サンプルアプリケーション

Spring AMQP サンプル [GitHub] (英語) プロジェクトには、2 つのサンプルアプリケーションが含まれています。1 つ目は、同期と非同期の両方のメッセージ受信を示す単純な "Hello World" の例です。重要なコンポーネントを理解するための優れた出発点を提供します。2 番目のサンプルは、株式取引のユースケースに基づいており、実際のアプリケーションで一般的な相互作用の型を示しています。この章では、最も重要なコンポーネントに集中できるように、各サンプルの簡単なウォークスルーを提供します。サンプルはどちらも Maven ベースであるため、Maven 対応の IDE ( SpringSource ツールスイート (英語) など) に直接インポートできるはずです。

"Hello World" サンプル

"Hello World" サンプルは、同期と非同期の両方のメッセージ受信を示しています。spring-rabbit-helloworld サンプルを IDE にインポートしてから、以下の説明に従ってください。

同期の例

src/main/java ディレクトリ内で、org.springframework.amqp.helloworld パッケージに移動します。HelloWorldConfiguration クラスを開き、クラスレベルで @Configuration アノテーションが含まれていることを確認し、メソッドレベルでいくつかの @Bean アノテーションを確認します。これは、Spring の Java ベースの構成の例です。詳細については、こちらを参照してください。

次のリストは、接続ファクトリの作成方法を示しています。

@Bean
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory =
        new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}

構成には RabbitAdmin のインスタンスも含まれています。これは、デフォルトで、型 exchange、queue、binding の任意の Bean を検索し、ブローカーで宣言します。実際、HelloWorldConfiguration で生成される helloWorldQueue Bean は Queue のインスタンスであるため、例です。

次のリストは、helloWorldQueue Bean 定義を示しています。

@Bean
public Queue helloWorldQueue() {
    return new Queue(this.helloWorldQueueName);
}

rabbitTemplate Bean 構成を振り返ると、queue プロパティ (メッセージの受信用) および routingKey プロパティ (メッセージの送信用) として helloWorldQueue の名前が設定されていることがわかります。

構成を調べたため、これらのコンポーネントを実際に使用するコードを見ることができます。まず、同じパッケージ内から Producer クラスを開きます。これには、Spring ApplicationContext が作成される main() メソッドが含まれています。

次のリストは、main メソッドを示しています。

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    amqpTemplate.convertAndSend("Hello World");
    System.out.println("Sent: Hello World");
}

前の例では、AmqpTemplate Bean が取得され、Message の送信に使用されます。クライアントコードは可能な限りインターフェースに依存する必要があるため、型は RabbitTemplate ではなく AmqpTemplate です。HelloWorldConfiguration で作成された Bean は RabbitTemplate のインスタンスですが、インターフェースに依存することは、このコードの移植性が高いことを意味します (コードとは別に構成を変更できます)。convertAndSend() メソッドが呼び出されるため、テンプレートはその MessageConverter インスタンスに委譲します。この場合、デフォルトの SimpleMessageConverter を使用しますが、HelloWorldConfiguration で定義されているように、rabbitTemplate Bean に別の実装を提供できます。

Consumer クラスを開きます。実際には、同じ構成基本クラスを共有しています。つまり、rabbitTemplate Bean を共有しています。そのため、routingKey (送信用) と queue (受信用) の両方を使用してそのテンプレートを構成しました。AmqpTemplate で説明したように、代わりに "routingKey" 引数を send メソッドに渡し、"queue" 引数を receive メソッドに渡すことができます。Consumer コードは基本的に Producer のミラーイメージであり、convertAndSend() ではなく receiveAndConvert() を呼び出します。

次のリストは、Consumer の主なメソッドを示しています。

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}

Producer を実行してから Consumer を実行すると、コンソール出力に Received: Hello World が表示されます。

非同期の例

同期の例は、同期 Hello World サンプルをウォークスルーしました。このセクションでは、少し高度ですが、より強力なオプションについて説明します。いくつかの変更により、Hello World サンプルは、メッセージ駆動型 POJO とも呼ばれる非同期受信の例を提供できます。実際、まさにそれを提供するサブパッケージ org.springframework.amqp.samples.helloworld.async があります。

繰り返しますが、送信側から始めます。ProducerConfiguration クラスを開くと、connectionFactory と rabbitTemplate Bean が作成されることに注意してください。今回は、メッセージ送信側専用の構成なので、キュー定義すら必要なく、RabbitTemplate には "routingKey" プロパティセットのみがあります。メッセージはキューに直接送信されるのではなく、交換に送信されることを思い出してください。AMQP デフォルト交換は、名前のない直接交換です。すべてのキューは、その名前をルーティングキーとしてデフォルトの交換にバインドされます。そのため、ここではルーティングキーのみを提供する必要があります。

次のリストは、rabbitTemplate 定義を示しています。

public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(this.helloWorldQueueName);
    return template;
}

このサンプルは非同期メッセージ受信を示しているため、プロデュース側はメッセージを継続的に送信するように設計されています (同期バージョンのような実行ごとのメッセージモデルの場合、実際にはメッセージであることがそれほど明白ではありません)。駆動コンシューマー)。メッセージを継続的に送信するコンポーネントは、ProducerConfiguration 内の内部クラスとして定義されます。3 秒ごとに実行するように構成されています。

次のリストは、コンポーネントを示しています。

static class ScheduledProducer {

    @Autowired
    private volatile RabbitTemplate rabbitTemplate;

    private final AtomicInteger counter = new AtomicInteger();

    @Scheduled(fixedRate = 3000)
    public void sendMessage() {
        rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
    }
}

すべての詳細を理解する必要はありません。実際の焦点は受信側にあります (次に説明します)。ただし、Spring タスクスケジューリングのサポートにまだ慣れていない場合は、こちらで詳細を確認できます。簡単に言うと、ProducerConfiguration 内の postProcessor Bean がタスクをスケジューラーに登録します。

これで、受信側に目を向けることができます。メッセージ駆動型の POJO の動作を強調するために、メッセージに反応するコンポーネントから始めます。このクラスは HelloWorldHandler と呼ばれ、次のリストに示されています。

public class HelloWorldHandler {

    public void handleMessage(String text) {
        System.out.println("Received: " + text);
    }

}

そのクラスは POJO です。基本クラスを継承せず、インターフェースを実装せず、インポートも含みません。Spring AMQP MessageListenerAdapter によって MessageListener インターフェースに「適応」されています。その後、そのアダプターを SimpleMessageListenerContainer で構成できます。このサンプルでは、コンテナーは ConsumerConfiguration クラスで作成されます。そこでアダプターにラップされた POJO を確認できます。

次のリストは、listenerContainer の定義方法を示しています。

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueName(this.helloWorldQueueName);
    container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
    return container;
}

SimpleMessageListenerContainer は Spring ライフサイクルコンポーネントであり、デフォルトで自動的に開始されます。Consumer クラスを見ると、その main() メソッドが、ApplicationContext を作成するための 1 行のブートストラップだけで構成されていることがわかります。Producer の main() メソッドも 1 行のブートストラップです。これは、メソッドに @Scheduled のアノテーションが付けられたコンポーネントも自動的に開始されるためです。Producer と Consumer は任意の順序で開始でき、メッセージが 3 秒ごとに送受信されることを確認できます。

株取引

Stock Trading サンプルは、Hello World サンプルよりも高度なメッセージングシナリオを示しています。ただし、構成は非常に似ていますが、もう少し複雑です。Hello World の構成について詳しく説明したため、ここでは、このサンプルの違いに焦点を当てます。マーケットプレースデータ (株価) をトピックエクスチェンジにプッシュするサーバーがあります。その後、クライアントはキューをルーティングパターン (たとえば、app.stock.quotes.nasdaq.*) にバインドすることで、マーケットプレースデータフィードをサブスクライブできます。このデモのもう 1 つの主な機能は、クライアントによって開始され、サーバーによって処理される、リクエストと応答の「株式取引」対話です。これには、オーダーリクエストメッセージ自体の中でクライアントによって送信されるプライベート replyTo キューが含まれます。

サーバーのコア構成は、org.springframework.amqp.rabbit.stocks.config.server パッケージ内の RabbitServerConfiguration クラスにあります。AbstractStockAppRabbitConfiguration を継承します。ここで、サーバーとクライアントに共通のリソースが定義されます。これには、マーケットプレースデータトピック交換 (名前は "app.stock.marketdata" ) やサーバーが株式取引用に公開するキュー (名前は "app.stock" ) が含まれます。リクエスト ')。その共通構成ファイルでは、Jackson2JsonMessageConverter が RabbitTemplate で構成されていることもわかります。

サーバー固有の構成は、2 つの要素で構成されます。まず、Message を送信するためのすべての呼び出しでその交換名を提供する必要がないように、RabbitTemplate でマーケットプレースデータ交換を構成します。これは、基本構成クラスで定義された抽象コールバックメソッド内で行われます。次のリストは、その方法を示しています。

public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
    rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}

次に、在庫リクエストキューが宣言されます。この場合、ルーティングキーとして独自の名前を持つデフォルトの名前なし交換にバインドされるため、明示的なバインドは必要ありません。前述のように、AMQP 仕様ではその動作が定義されています。次のリストは、stockRequestQueue Bean の定義を示しています。

@Bean
public Queue stockRequestQueue() {
    return new Queue(STOCK_REQUEST_QUEUE_NAME);
}

サーバーの AMQP リソースの構成を確認したため、src/test/java ディレクトリの org.springframework.amqp.rabbit.stocks パッケージに移動します。そこには、main() メソッドを提供する実際の Server クラスが表示されます。server-bootstrap.xml 構成ファイルに基づいて ApplicationContext を作成します。そこには、ダミーのマーケットプレースデータを発行するスケジュールされたタスクが表示されます。その構成は、Spring の task 名前空間のサポートに依存しています。ブートストラップ構成ファイルは、他のいくつかのファイルもインポートします。最も興味深いのは、src/main/resources の直下にある server-messaging.xml です。そこには、株式取引リクエストの処理を担当する messageListenerContainer Bean が表示されます。最後に、server-handlers.xml ('src/main/resources' にもあります) で定義されている serverHandler Bean を参照してください。その Bean は ServerHandler クラスのインスタンスであり、応答メッセージも送信できるメッセージ駆動型 POJO の良い例です。それ自体は、フレームワークまたは AMQP の概念のいずれにも結合されていないことに注意してください。TradeRequest を受け取り、TradeResponse を返します。次のリストは、handleMessage メソッドの定義を示しています。

public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}

サーバーの最も重要な構成とコードを確認したため、クライアントに目を向けることができます。最良の出発点は、おそらく org.springframework.amqp.rabbit.stocks.config.client パッケージの RabbitClientConfiguration です。明示的な名前を指定せずに 2 つのキューを宣言していることに注意してください。次のリストは、2 つのキューの Bean 定義を示しています。

@Bean
public Queue marketDataQueue() {
    return amqpAdmin().declareQueue();
}

@Bean
public Queue traderJoeQueue() {
    return amqpAdmin().declareQueue();
}

これらはプライベートキューであり、一意の名前が自動的に生成されます。最初に生成されたキューは、サーバーによって公開されたマーケットプレースデータ交換にバインドするためにクライアントによって使用されます。AMQP では、コンシューマーはキューと対話し、プロデューサーはエクスチェンジと対話することを思い出してください。エクスチェンジへのキューの「バインド」は、ブローカに特定のエクスチェンジからキューにメッセージを配信 (またはルーティング) するよう指示するものです。マーケットプレースデータ交換はトピック交換であるため、バインディングはルーティングパターンで表現できます。RabbitClientConfiguration は Binding オブジェクトでこれを行い、そのオブジェクトは BindingBuilder fluent API で生成されます。次のリストは Binding を示しています。

@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;

@Bean
public Binding marketDataBinding() {
    return BindingBuilder.bind(
        marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}

実際の値はプロパティファイル ( src/main/resourcesclient.properties ) で外部化されており、Spring の @Value アノテーションを使用してその値を挿入していることに注意してください。これは一般的に良い考えです。そうしないと、値がクラスにハードコードされ、再コンパイルしないと変更できなくなります。この場合、バインドに使用されるルーティングパターンを変更しながら、複数のバージョンのクライアントを実行する方がはるかに簡単です。今すぐ試すことができます。

まず org.springframework.amqp.rabbit.stocks.Server を実行し、次に org.springframework.amqp.rabbit.stocks.Client を実行します。client.properties の 'stocks.quote.pattern' キーに関連付けられている現在の値は 'app.stock.quotes.nasdaq. '。既存の Server と Client を実行したまま、そのプロパティ値を 'app.stock.quotes.nyse.' であるため、NASDAQ 株のダミー相場が表示されるはずです。次に、2 番目の Client インスタンスを開始します。最初のクライアントは引き続き NASDAQ 相場を受信し、2 番目のクライアントは NYSE 相場を受信することがわかります。代わりに、パターンを変更してすべての株または個別のティッカーを取得することもできます。

調査する最後の機能は、クライアントの観点からのリクエストと応答の相互作用です。TradeRequest オブジェクトを受け取り、TradeResponse オブジェクトを返す ServerHandler をすでに見たことを思い出してください。Client 側の対応コードは org.springframework.amqp.rabbit.stocks.gateway パッケージの RabbitStockServiceGateway です。メッセージを送信するために RabbitTemplate に委譲します。次のリストは、send メソッドを示しています。

public void send(TradeRequest tradeRequest) {
    getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
            try {
                message.getMessageProperties().setCorrelationId(
                    UUID.randomUUID().toString().getBytes("UTF-8"));
            }
            catch (UnsupportedEncodingException e) {
                throw new AmqpException(e);
            }
            return message;
        }
    });
}

メッセージを送信する前に、replyTo アドレスを設定することに注意してください。これは、traderJoeQueue Bean 定義 (前に示した) によって生成されたキューを提供します。次のリストは、StockServiceGateway クラス自体の @Bean 定義を示しています。

@Bean
public StockServiceGateway stockServiceGateway() {
    RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
    gateway.setRabbitTemplate(rabbitTemplate());
    gateway.setDefaultReplyToQueue(traderJoeQueue());
    return gateway;
}

サーバーとクライアントを実行していない場合は、今すぐ開始してください。"100 TCKR" の形式でリクエストを送信してみてください。リクエストの「処理」をシミュレートする人為的な短い遅延の後、クライアントに確認メッセージが表示されます。

Spring 以外のアプリケーションからの JSON の受信

Spring アプリケーションは、JSON を送信するときに、TypeId ヘッダーを完全修飾クラス名に設定して、受信アプリケーションが JSON を Java オブジェクトに変換し直すのを支援します。

spring-rabbit-json サンプルでは、Spring 以外のアプリケーションから JSON を変換するためのいくつかの手法を探っています。