非同期コンシューマー

Spring AMQP は、@RabbitListener アノテーションを使用してアノテーション付きリスナーエンドポイントもサポートし、エンドポイントをプログラムで登録するためのオープンインフラストラクチャを提供します。これは、非同期コンシューマーを設定する最も便利な方法です。詳細については、アノテーション駆動型のリスナーエンドポイントを参照してください。

以前のプリフェッチのデフォルト値は 1 であり、効率的なコンシューマーが十分に活用されない可能性がありました。バージョン 2.0 以降、デフォルトのプリフェッチ値は 250 になりました。これにより、ほとんどの一般的なシナリオでコンシューマーがビジー状態になり、スループットが向上します。

それにもかかわらず、プリフェッチ値を低くする必要があるシナリオがあります。

  • 大きなメッセージの場合、特に処理が遅い場合 (メッセージは、クライアントプロセスに大量のメモリを追加する可能性があります。)

  • 厳密なメッセージ順序が必要な場合 (この場合、プリフェッチ値を 1 に戻す必要があります。)

  • その他の特殊なケース

また、少量のメッセージングと複数のコンシューマー (単一のリスナーコンテナーインスタンス内での同時実行を含む) では、プリフェッチを減らして、コンシューマー間でメッセージをより均等に分散させたい場合があります。

プリフェッチの背景について詳しくは、RabbitMQ でのコンシューマーの使用 (英語) に関するこの投稿とキューイング理論 (英語) に関するこの投稿を参照してください。

メッセージリスナー

非同期 Message 受信の場合、専用コンポーネント ( AmqpTemplate ではない) が関与します。そのコンポーネントは、Message -consuming コールバックのコンテナーです。コンテナーとそのプロパティについては、このセクションの後半で検討します。ただし、最初にコールバックを確認する必要があります。これは、アプリケーションコードがメッセージングシステムと統合される場所であるためです。次のように、MessageListener インターフェースの実装から始めて、コールバックにはいくつかのオプションがあります。

public interface MessageListener {
    void onMessage(Message message);
}

何らかの理由でコールバックロジックが AMQP Channel インスタンスに依存している場合は、代わりに ChannelAwareMessageListener を使用できます。見た目は似ていますが、追加のパラメーターがあります。次のリストは、ChannelAwareMessageListener インターフェース定義を示しています。

public interface ChannelAwareMessageListener {
    void onMessage(Message message, Channel channel) throws Exception;
}
バージョン 2.1 では、このインターフェースはパッケージ o.s.amqp.rabbit.core から o.s.amqp.rabbit.listener.api に移動しました。

MessageListenerAdapter

アプリケーションロジックとメッセージング API をより厳密に分離したい場合は、フレームワークによって提供されるアダプターの実装を利用できます。これは、「メッセージ駆動型 POJO」サポートと呼ばれることがよくあります。

バージョン 1.5 では、POJO メッセージングのより柔軟なメカニズムである @RabbitListener アノテーションが導入されました。詳細については、アノテーション駆動型のリスナーエンドポイントを参照してください。

アダプターを使用する場合、アダプター自体が呼び出すインスタンスへの参照のみを提供する必要があります。次の例は、その方法を示しています。

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");

アダプターをサブクラス化し、getListenerMethodName() の実装を提供して、メッセージに基づいてさまざまなメソッドを動的に選択できます。このメソッドには originalMessage と extractedMessage の 2 つのパラメーターがあり、後者は変換の結果です。デフォルトでは、SimpleMessageConverter が設定されています。詳細および利用可能なその他のコンバーターについては、SimpleMessageConverter を参照してください。

バージョン 1.4.2 以降、元のメッセージには consumerQueue および consumerTag プロパティがあり、メッセージを受信したキューを特定するために使用できます。

バージョン 1.5 以降、コンシューマーキューまたはタグのメソッド名へのマップを構成して、呼び出すメソッドを動的に選択できます。マップにエントリがない場合は、デフォルトのリスナーメソッドにフォールバックします。デフォルトのリスナーメソッド (設定されていない場合) は handleMessage です。

バージョン 2.0 から、便利な FunctionalInterface が提供されました。次のリストは、FunctionalInterface の定義を示しています。

@FunctionalInterface
public interface ReplyingMessageListener<T, R> {

    R handleMessage(T t);

}

このインターフェースは、次の例に示すように、Java 8 ラムダを使用して、アダプターの便利な構成を容易にします。

new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
    ...
    return result;
}));

バージョン 2.2 以降、buildListenerArguments(Object) は廃止され、代わりに新しい buildListenerArguments(Object, Channel, Message) が導入されました。新しいメソッドは、リスナーが Channel および Message 引数を取得して、手動確認モードで channel.basicReject(long, boolean) を呼び出すなど、より多くのことを行うのに役立ちます。次のリストは、最も基本的な例を示しています。

public class ExtendedListenerAdapter extends MessageListenerAdapter {

    @Override
    protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
        return new Object[]{extractedMessage, channel, message};
    }

}

「チャネル」と「メッセージ」を受信する必要がある場合は、MessageListenerAdapter と同じように ExtendedListenerAdapter を設定できます。次のリスナーの例に示すように、リスナーのパラメーターは、buildListenerArguments(Object, Channel, Message) が返すように設定する必要があります。

public void handleMessage(Object object, Channel channel, Message message) throws IOException {
    ...
}

コンテナー

Message -listening コールバックのさまざまなオプションを確認したため、コンテナーに注意を向けることができます。基本的に、コンテナーは「アクティブな」責任を処理するため、リスナーのコールバックはパッシブのままになります。コンテナーは「ライフサイクル」コンポーネントの一例です。開始と停止のメソッドを提供します。コンテナーを構成するときは、基本的に AMQP キューと MessageListener インスタンス間のギャップを埋めます。ConnectionFactory への参照と、そのリスナーがメッセージを消費するキュー名または Queue インスタンスを提供する必要があります。

バージョン 2.0 より前は、SimpleMessageListenerContainer という 1 つのリスナーコンテナーがありました。DirectMessageListenerContainer という 2 つ目のコンテナーが追加されました。どちらを使用するかを選択する際に適用できるコンテナーと条件の違いについては、コンテナーの選択で説明されています。

次のリストは、SimpleMessageListenerContainer を使用して機能する最も基本的な例を示しています。

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));

「アクティブな」コンポーネントとして、バックグラウンドで実行できるように、Bean 定義を使用してリスナーコンテナーを作成するのが最も一般的です。次の例は、XML でこれを行う 1 つの方法を示しています。

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

次のリストは、XML でこれを行う別の方法を示しています。

<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

上記の例はどちらも DirectMessageListenerContainer を作成します ( type 属性に注意してください — デフォルトは simple です)。

または、前のコードスニペットに似た Java 構成を使用することもできます。

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + message);
            }
        };
    }
}

コンシューマー優先

RabbitMQ バージョン 3.2 から、ブローカーはコンシューマー優先度をサポートするようになりました ( RabbitMQ でのコンシューマー優先度の使用 (英語) を参照)。これは、コンシューマーで x-priority 引数を設定することで有効になります。次の例に示すように、SimpleMessageListenerContainer はコンシューマー引数の設定をサポートするようになりました。

container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));

次の例に示すように、便宜上、名前空間は listener 要素に priority 属性を提供します。

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>

バージョン 1.3 以降、コンテナーが実行時にリッスンするキューを変更できます。リスナーコンテナーキューを参照してください。

auto-delete キュー

コンテナーが auto-delete キューをリッスンするように構成されている場合、キューに x-expires オプションがある場合、またはブローカーで有効期間 (英語) ポリシーが構成されている場合、コンテナーが停止したとき (つまり、最後のコンシューマーがキャンセルされたとき) にブローカーによってキューが削除されます)。バージョン 1.3 より前では、キューが欠落していたため、コンテナーを再起動できませんでした。RabbitAdmin は、接続が閉じられたとき、または接続が開かれたときにのみ、キューなどを自動的に再宣言します。これは、コンテナーが停止して開始されたときには発生しません。

バージョン 1.3 から、コンテナーは RabbitAdmin を使用して、起動時に欠落しているキューを再宣言します。

条件付き宣言 ( 条件宣言を参照) を auto-startup="false" 管理者と共に使用して、コンテナーが開始されるまでキュー宣言を延期することもできます。次の例は、その方法を示しています。

<rabbit:queue id="otherAnon" declared-by="containerAdmin" />

<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="otherAnon" key="otherAnon" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container id="container2" auto-startup="false">
    <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>

<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
    auto-startup="false" />

この場合、キューと交換は、コンテキストの初期化中に要素が宣言されないように、auto-startup="false" を持つ containerAdmin によって宣言されます。また、コンテナーは同じ理由で開始されません。コンテナーが後で開始されると、containerAdmin への参照を使用して要素を宣言します。