サンプルアプリケーション
Spring AMQP サンプル [GitHub] (英語) プロジェクトには、2 つのサンプルアプリケーションが含まれています。1 つ目は、同期と非同期の両方のメッセージ受信を示す単純な "Hello World" の例です。重要なコンポーネントを理解するための優れた出発点を提供します。2 番目のサンプルは、株式取引のユースケースに基づいており、実際のアプリケーションで一般的な相互作用の型を示しています。この章では、最も重要なコンポーネントに集中できるように、各サンプルの簡単なウォークスルーを提供します。サンプルはどちらも Maven ベースであるため、Maven 対応の IDE ( SpringSource ツールスイート (英語) など) に直接インポートできるはずです。
"Hello World" サンプル
"Hello World" サンプルは、同期と非同期の両方のメッセージ受信を示しています。spring-rabbit-helloworld
サンプルを IDE にインポートしてから、以下の説明に従ってください。
同期の例
src/main/java
ディレクトリ内で、org.springframework.amqp.helloworld
パッケージに移動します。HelloWorldConfiguration
クラスを開き、クラスレベルで @Configuration
アノテーションが含まれていることを確認し、メソッドレベルでいくつかの @Bean
アノテーションを確認します。これは、Spring の Java ベースの構成の例です。詳細については、こちらを参照してください。
次のリストは、接続ファクトリの作成方法を示しています。
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
構成には RabbitAdmin
のインスタンスも含まれています。これは、デフォルトで、型 exchange、queue、binding の任意の Bean を検索し、ブローカーで宣言します。実際、HelloWorldConfiguration
で生成される helloWorldQueue
Bean は Queue
のインスタンスであるため、例です。
次のリストは、helloWorldQueue
Bean 定義を示しています。
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
rabbitTemplate
Bean 構成を振り返ると、queue
プロパティ (メッセージの受信用) および routingKey
プロパティ (メッセージの送信用) として helloWorldQueue
の名前が設定されていることがわかります。
構成を調べたため、これらのコンポーネントを実際に使用するコードを見ることができます。まず、同じパッケージ内から Producer
クラスを開きます。これには、Spring ApplicationContext
が作成される main()
メソッドが含まれています。
次のリストは、main
メソッドを示しています。
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
前の例では、AmqpTemplate
Bean が取得され、Message
の送信に使用されます。クライアントコードは可能な限りインターフェースに依存する必要があるため、型は RabbitTemplate
ではなく AmqpTemplate
です。HelloWorldConfiguration
で作成された Bean は RabbitTemplate
のインスタンスですが、インターフェースに依存することは、このコードの移植性が高いことを意味します (コードとは別に構成を変更できます)。convertAndSend()
メソッドが呼び出されるため、テンプレートはその MessageConverter
インスタンスに委譲します。この場合、デフォルトの SimpleMessageConverter
を使用しますが、HelloWorldConfiguration
で定義されているように、rabbitTemplate
Bean に別の実装を提供できます。
Consumer
クラスを開きます。実際には、同じ構成基本クラスを共有しています。つまり、rabbitTemplate
Bean を共有しています。そのため、routingKey
(送信用) と queue
(受信用) の両方を使用してそのテンプレートを構成しました。AmqpTemplate
で説明したように、代わりに "routingKey" 引数を send メソッドに渡し、"queue" 引数を receive メソッドに渡すことができます。Consumer
コードは基本的に Producer のミラーイメージであり、convertAndSend()
ではなく receiveAndConvert()
を呼び出します。
次のリストは、Consumer
の主なメソッドを示しています。
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
Producer
を実行してから Consumer
を実行すると、コンソール出力に Received: Hello World
が表示されます。
非同期の例
同期の例は、同期 Hello World サンプルをウォークスルーしました。このセクションでは、少し高度ですが、より強力なオプションについて説明します。いくつかの変更により、Hello World サンプルは、メッセージ駆動型 POJO とも呼ばれる非同期受信の例を提供できます。実際、まさにそれを提供するサブパッケージ org.springframework.amqp.samples.helloworld.async
があります。
繰り返しますが、送信側から始めます。ProducerConfiguration
クラスを開くと、connectionFactory
と rabbitTemplate
Bean が作成されることに注意してください。今回は、メッセージ送信側専用の構成なので、キュー定義すら必要なく、RabbitTemplate
には "routingKey" プロパティセットのみがあります。メッセージはキューに直接送信されるのではなく、交換に送信されることを思い出してください。AMQP デフォルト交換は、名前のない直接交換です。すべてのキューは、その名前をルーティングキーとしてデフォルトの交換にバインドされます。そのため、ここではルーティングキーのみを提供する必要があります。
次のリストは、rabbitTemplate
定義を示しています。
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
このサンプルは非同期メッセージ受信を示しているため、プロデュース側はメッセージを継続的に送信するように設計されています (同期バージョンのような実行ごとのメッセージモデルの場合、実際にはメッセージであることがそれほど明白ではありません)。駆動コンシューマー)。メッセージを継続的に送信するコンポーネントは、ProducerConfiguration
内の内部クラスとして定義されます。3 秒ごとに実行するように構成されています。
次のリストは、コンポーネントを示しています。
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
すべての詳細を理解する必要はありません。実際の焦点は受信側にあります (次に説明します)。ただし、Spring タスクスケジューリングのサポートにまだ慣れていない場合は、こちらで詳細を確認できます。簡単に言うと、ProducerConfiguration
内の postProcessor
Bean がタスクをスケジューラーに登録します。
これで、受信側に目を向けることができます。メッセージ駆動型の POJO の動作を強調するために、メッセージに反応するコンポーネントから始めます。このクラスは HelloWorldHandler
と呼ばれ、次のリストに示されています。
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
そのクラスは POJO です。基本クラスを継承せず、インターフェースを実装せず、インポートも含みません。Spring AMQP MessageListenerAdapter
によって MessageListener
インターフェースに「適応」されています。その後、そのアダプターを SimpleMessageListenerContainer
で構成できます。このサンプルでは、コンテナーは ConsumerConfiguration
クラスで作成されます。そこでアダプターにラップされた POJO を確認できます。
次のリストは、listenerContainer
の定義方法を示しています。
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
SimpleMessageListenerContainer
は Spring ライフサイクルコンポーネントであり、デフォルトで自動的に開始されます。Consumer
クラスを見ると、その main()
メソッドが、ApplicationContext
を作成するための 1 行のブートストラップだけで構成されていることがわかります。Producer の main()
メソッドも 1 行のブートストラップです。これは、メソッドに @Scheduled
のアノテーションが付けられたコンポーネントも自動的に開始されるためです。Producer
と Consumer
は任意の順序で開始でき、メッセージが 3 秒ごとに送受信されることを確認できます。
株取引
Stock Trading サンプルは、Hello World サンプルよりも高度なメッセージングシナリオを示しています。ただし、構成は非常に似ていますが、もう少し複雑です。Hello World の構成について詳しく説明したため、ここでは、このサンプルの違いに焦点を当てます。マーケットプレースデータ (株価) をトピックエクスチェンジにプッシュするサーバーがあります。その後、クライアントはキューをルーティングパターン (たとえば、app.stock.quotes.nasdaq.*
) にバインドすることで、マーケットプレースデータフィードをサブスクライブできます。このデモのもう 1 つの主な機能は、クライアントによって開始され、サーバーによって処理される、リクエストと応答の「株式取引」対話です。これには、オーダーリクエストメッセージ自体の中でクライアントによって送信されるプライベート replyTo
キューが含まれます。
サーバーのコア構成は、org.springframework.amqp.rabbit.stocks.config.server
パッケージ内の RabbitServerConfiguration
クラスにあります。AbstractStockAppRabbitConfiguration
を継承します。ここで、サーバーとクライアントに共通のリソースが定義されます。これには、マーケットプレースデータトピック交換 (名前は "app.stock.marketdata" ) やサーバーが株式取引用に公開するキュー (名前は "app.stock" ) が含まれます。リクエスト ')。その共通構成ファイルでは、Jackson2JsonMessageConverter
が RabbitTemplate
で構成されていることもわかります。
サーバー固有の構成は、2 つの要素で構成されます。まず、Message
を送信するためのすべての呼び出しでその交換名を提供する必要がないように、RabbitTemplate
でマーケットプレースデータ交換を構成します。これは、基本構成クラスで定義された抽象コールバックメソッド内で行われます。次のリストは、その方法を示しています。
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
次に、在庫リクエストキューが宣言されます。この場合、ルーティングキーとして独自の名前を持つデフォルトの名前なし交換にバインドされるため、明示的なバインドは必要ありません。前述のように、AMQP 仕様ではその動作が定義されています。次のリストは、stockRequestQueue
Bean の定義を示しています。
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
サーバーの AMQP リソースの構成を確認したため、src/test/java
ディレクトリの org.springframework.amqp.rabbit.stocks
パッケージに移動します。そこには、main()
メソッドを提供する実際の Server
クラスが表示されます。server-bootstrap.xml
構成ファイルに基づいて ApplicationContext
を作成します。そこには、ダミーのマーケットプレースデータを発行するスケジュールされたタスクが表示されます。その構成は、Spring の task
名前空間のサポートに依存しています。ブートストラップ構成ファイルは、他のいくつかのファイルもインポートします。最も興味深いのは、src/main/resources
の直下にある server-messaging.xml
です。そこには、株式取引リクエストの処理を担当する messageListenerContainer
Bean が表示されます。最後に、server-handlers.xml
('src/main/resources' にもあります) で定義されている serverHandler
Bean を参照してください。その Bean は ServerHandler
クラスのインスタンスであり、応答メッセージも送信できるメッセージ駆動型 POJO の良い例です。それ自体は、フレームワークまたは AMQP の概念のいずれにも結合されていないことに注意してください。TradeRequest
を受け取り、TradeResponse
を返します。次のリストは、handleMessage
メソッドの定義を示しています。
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
サーバーの最も重要な構成とコードを確認したため、クライアントに目を向けることができます。最良の出発点は、おそらく org.springframework.amqp.rabbit.stocks.config.client
パッケージの RabbitClientConfiguration
です。明示的な名前を指定せずに 2 つのキューを宣言していることに注意してください。次のリストは、2 つのキューの Bean 定義を示しています。
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
これらはプライベートキューであり、一意の名前が自動的に生成されます。最初に生成されたキューは、サーバーによって公開されたマーケットプレースデータ交換にバインドするためにクライアントによって使用されます。AMQP では、コンシューマーはキューと対話し、プロデューサーはエクスチェンジと対話することを思い出してください。エクスチェンジへのキューの「バインド」は、ブローカに特定のエクスチェンジからキューにメッセージを配信 (またはルーティング) するよう指示するものです。マーケットプレースデータ交換はトピック交換であるため、バインディングはルーティングパターンで表現できます。RabbitClientConfiguration
は Binding
オブジェクトでこれを行い、そのオブジェクトは BindingBuilder
fluent API で生成されます。次のリストは Binding
を示しています。
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
実際の値はプロパティファイル ( src/main/resources
の client.properties
) で外部化されており、Spring の @Value
アノテーションを使用してその値を挿入していることに注意してください。これは一般的に良い考えです。そうしないと、値がクラスにハードコードされ、再コンパイルしないと変更できなくなります。この場合、バインドに使用されるルーティングパターンを変更しながら、複数のバージョンのクライアントを実行する方がはるかに簡単です。今すぐ試すことができます。
まず org.springframework.amqp.rabbit.stocks.Server
を実行し、次に org.springframework.amqp.rabbit.stocks.Client
を実行します。client.properties の 'stocks.quote.pattern' キーに関連付けられている現在の値は 'app.stock.quotes.nasdaq. '。既存の Server
と Client
を実行したまま、そのプロパティ値を 'app.stock.quotes.nyse.' であるため、NASDAQ
株のダミー相場が表示されるはずです。次に、2 番目の Client
インスタンスを開始します。最初のクライアントは引き続き NASDAQ 相場を受信し、2 番目のクライアントは NYSE 相場を受信することがわかります。代わりに、パターンを変更してすべての株または個別のティッカーを取得することもできます。
調査する最後の機能は、クライアントの観点からのリクエストと応答の相互作用です。TradeRequest
オブジェクトを受け取り、TradeResponse
オブジェクトを返す ServerHandler
をすでに見たことを思い出してください。Client
側の対応コードは org.springframework.amqp.rabbit.stocks.gateway
パッケージの RabbitStockServiceGateway
です。メッセージを送信するために RabbitTemplate
に委譲します。次のリストは、send
メソッドを示しています。
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
メッセージを送信する前に、replyTo
アドレスを設定することに注意してください。これは、traderJoeQueue
Bean 定義 (前に示した) によって生成されたキューを提供します。次のリストは、StockServiceGateway
クラス自体の @Bean
定義を示しています。
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
サーバーとクライアントを実行していない場合は、今すぐ開始してください。"100 TCKR" の形式でリクエストを送信してみてください。リクエストの「処理」をシミュレートする人為的な短い遅延の後、クライアントに確認メッセージが表示されます。
Spring 以外のアプリケーションからの JSON の受信
Spring アプリケーションは、JSON を送信するときに、TypeId
ヘッダーを完全修飾クラス名に設定して、受信アプリケーションが JSON を Java オブジェクトに変換し直すのを支援します。
spring-rabbit-json
サンプルでは、Spring 以外のアプリケーションから JSON を変換するためのいくつかの手法を探っています。
[json- メッセージコンバーター ] と DefaultClassMapper
の Javadoc も参照してください。