AMQP サポート
Spring Integration は、Advanced Message Queuing Protocol(AMQP)を使用してメッセージを送受信するためのチャネルアダプターを提供します。
この依存関係をプロジェクトに含める必要があります。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<version>5.5.16</version>
</dependency>
compile "org.springframework.integration:spring-integration-amqp:5.5.16"
次のアダプターが利用可能です。
Spring Integration は、AMQP Exchange および Queue によってサポートされるポイントツーポイントメッセージチャネルおよびパブリッシュ / サブスクライブメッセージチャネルも提供します。
AMQP サポートを提供するため、Spring Integration は(Spring AMQP)に依存しています。これは、コア Spring コンセプトを AMQP ベースのメッセージングソリューションの開発に適用します。Spring AMQP は(Spring JMS)と同様のセマンティクスを提供します。
提供されている AMQP チャネルアダプターは単方向メッセージング(送信または受信)のみを対象としていますが、Spring Integration はリクエスト応答操作用の受信および送信 AMQP ゲートウェイも提供します。
TIP: Spring AMQP プロジェクトのリファレンスドキュメントをよく理解しましょう。Spring の AMQP 全般および特に RabbitMQ との統合に関する詳細な情報を提供します。
受信チャネルアダプター
以下のリストは、AMQP 受信チャネルアダプターの可能な構成オプションを示しています。
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("aName");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
<int-amqp:inbound-channel-adapter
id="inboundAmqp" (1)
channel="inboundChannel" (2)
queue-names="si.test.queue" (3)
acknowledge-mode="AUTO" (4)
advice-chain="" (5)
channel-transacted="" (6)
concurrent-consumers="" (7)
connection-factory="" (8)
error-channel="" (9)
expose-listener-channel="" (10)
header-mapper="" (11)
mapped-request-headers="" (12)
listener-container="" (13)
message-converter="" (14)
message-properties-converter="" (15)
phase="" (16)
prefetch-count="" (17)
receive-timeout="" (18)
recovery-interval="" (19)
missing-queues-fatal="" (20)
shutdown-timeout="" (21)
task-executor="" (22)
transaction-attribute="" (23)
transaction-manager="" (24)
tx-size="" (25)
consumers-per-queue (26)
batch-mode="MESSAGES"/> (27)
1 | このアダプターの一意の ID。オプション。 |
2 | 変換されたメッセージの送信先のメッセージチャネル。必須。 |
3 | メッセージをコンシュームする AMQP キュー(コンマ区切りリスト)の名前。必須。 |
4 | MessageListenerContainer の確認モード。MANUAL に設定すると、配信タグとチャネルはそれぞれメッセージヘッダー amqp_deliveryTag と amqp_channel で提供されます。ユーザーアプリケーションは確認の責任を負います。NONE は、確認応答なし(autoAck )を意味します。AUTO は、ダウンストリームフローが完了すると、アダプターのコンテナーが確認することを意味します。オプション(デフォルトは AUTO)。受信エンドポイント確認モードを参照してください。 |
5 | この受信チャネルアダプターに関連付けられた横断的な動作を処理するための追加の AOP アドバイス。オプション。 |
6 | このコンポーネントによって作成されたチャネルがトランザクションであることを示すフラグ。true の場合、トランザクションチャネルを使用し、結果に応じて、ロールバックを通知する例外を除き、コミットまたはロールバックですべての操作(送信または受信)を終了するようにフレームワークに指示します。オプション(デフォルトは false)。 |
7 | 作成する同時コンシューマーの数を指定します。デフォルトは 1 です。同時コンシューマーの数を増やして、キューから受信するメッセージの消費を拡大することをお勧めします。ただし、複数のコンシューマーが登録されると、順序付けの保証は失われます。一般に、少量のキューには 1 つのコンシューマーを使用します。"consumers-per-queue" が設定されている場合は許可されません。オプション。 |
8 | RabbitMQ ConnectionFactory への Bean 参照。オプション(デフォルトは connectionFactory )。 |
9 | エラーメッセージの送信先のメッセージチャネル。オプション。 |
10 | リスナーチャネル (com.rabbitmq.client.Channel) が登録済みの ChannelAwareMessageListener に公開されるかどうか。オプション (デフォルトは true)。 |
11 | AMQP メッセージを受信するときに使用する AmqpHeaderMapper への参照。オプション。デフォルトでは、標準の AMQP プロパティ(contentType など)のみが Spring Integration MessageHeaders にコピーされます。AMQP MessageProperties 内のユーザー定義ヘッダーは、デフォルト DefaultAmqpHeaderMapper によってメッセージにコピーされません。"request-header-names" が指定されている場合は許可されません。 |
12 | AMQP リクエストから MessageHeaders にマップされる AMQP ヘッダーの名前のコンマ区切りリスト。これは、"header-mapper" 参照が提供されていない場合にのみ提供できます。このリストの値は、ヘッダー名( "*" または「thing1 *、thing2」または "* something" など)と照合する単純なパターンにすることもできます。 |
13 | AMQP メッセージの受信に使用する AbstractMessageListenerContainer への参照。この属性を指定する場合、リスナーコンテナーの構成に関連する他の属性は指定しないでください。つまり、この参照を設定することにより、リスナーコンテナーの構成について完全な責任を負う必要があります。唯一の例外は MessageListener 自体です。それが実際にこのチャネルアダプター実装の中心的な責任であるため、参照されるリスナーコンテナーは、独自の MessageListener をまだ持っていてはなりません。オプション。 |
14 | AMQP メッセージを受信するときに使用する MessageConverter 。オプション。 |
15 | AMQP メッセージを受信するときに使用する MessagePropertiesConverter 。オプション。 |
16 | 基礎となる AbstractMessageListenerContainer を開始および停止するフェーズを指定します。起動順序は最低から最高に進み、シャットダウンの順序はその逆です。デフォルトでは、この値は Integer.MAX_VALUE です。これは、このコンテナーが可能な限り遅く起動し、できるだけ早く停止することを意味します。オプション。 |
17 | AMQP ブローカーに、単一のリクエストで各コンシューマーに送信するメッセージの数を伝えます。多くの場合、スループットを改善するためにこの値を高く設定できます。トランザクションサイズ以上でなければなりません(このリストで後述する tx-size 属性を参照)。オプション(デフォルトは 1 )。 |
18 | ミリ秒単位の受信タイムアウト。オプション(デフォルトは 1000 )。 |
19 | 基礎となる AbstractMessageListenerContainer の回復試行の間隔を指定します(ミリ秒単位)。オプション(デフォルトは 5000 )。 |
20 | "true" であり、ブローカーで使用できるキューがない場合、コンテナーは起動時に致命的な例外をスローし、コンテナーの実行中にキューが削除されると停止します(キューを受動的に宣言しようと 3 回試行した後)。false の場合、コンテナーは例外をスローせず、リカバリモードに入り、recovery-interval に従って再起動を試みます。オプション(デフォルトは true )。 |
21 | 基礎となる AbstractMessageListenerContainer が停止してから AMQP 接続が強制的に閉じられるまでの、ワーカーを待機する時間(ミリ秒)。シャットダウン信号が来たときにワーカーがアクティブである場合、このタイムアウト内で終了できる限り、処理を終了することができます。それ以外の場合、接続は閉じられ、メッセージは未確認のままになります(チャネルがトランザクションの場合)。オプション(デフォルトは 5000 )。 |
22 | デフォルトでは、基礎となる AbstractMessageListenerContainer は SimpleAsyncTaskExecutor 実装を使用します。これは、各タスクに対して新しいスレッドを起動し、非同期で実行します。デフォルトでは、同時スレッドの数は無制限です。この実装はスレッドを再利用しないことに注意してください。代替手段として、スレッドプーリング TaskExecutor 実装の使用を検討してください。オプション(デフォルトは SimpleAsyncTaskExecutor )。 |
23 | デフォルトでは、基礎となる AbstractMessageListenerContainer は DefaultTransactionAttribute の新しいインスタンスを作成します(EJB アプローチを使用して実行時にロールバックしますが、チェック済み例外はありません)。オプション(デフォルトは DefaultTransactionAttribute )。 |
24 | 基になる AbstractMessageListenerContainer の外部 PlatformTransactionManager への Bean 参照を設定します。トランザクションマネージャーは、channel-transacted 属性と連携して機能します。フレームワークがメッセージを送信または受信していて、channelTransacted フラグが true であるときに進行中のトランザクションがすでに存在する場合、メッセージングトランザクションのコミットまたはロールバックは、現在のトランザクションが終了するまで延期されます。channelTransacted フラグが false の場合、トランザクションのセマンティクスはメッセージング操作に適用されません(自動確認されます)。詳細については、Spring AMQP を使用したトランザクションを参照してください。オプション。 |
25 | 1 つのトランザクションで処理するメッセージの数を SimpleMessageListenerContainer に伝えます(チャネルがトランザクションの場合)。最良の結果を得るには、prefetch-count で設定された値以下にする必要があります。"consumers-per-queue" が設定されている場合は許可されません。オプション(デフォルトは 1 )。 |
26 | 基礎となるリスナーコンテナーがデフォルトの SimpleMessageListenerContainer ではなく DirectMessageListenerContainer であることを示します。詳細については、Spring AMQP リファレンスマニュアルを参照してください。 |
27 | コンテナーの consumerBatchEnabled が true の場合、アダプターがメッセージペイロードでメッセージのバッチをどのように表示するかを決定します。MESSAGES (デフォルト)に設定すると、ペイロードは List<Message<?>> になり、各メッセージには受信 AMQP Message からマッピングされたヘッダーがあり、ペイロードは変換された body です。EXTRACT_PAYLOADS に設定した場合、ペイロードは List<?> であり、要素は AMQP Message 本体から変換されます。EXTRACT_PAYLOADS_WITH_HEADERS は EXTRACT_PAYLOADS に似ていますが、さらに、各メッセージのヘッダーは MessageProperties から対応するインデックスの List<Map<String, Object> にマップされます。ヘッダー名は AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS です。 |
コンテナー XML を使用して外部コンテナーを構成する場合、Spring AMQP 名前空間を使用してコンテナーを定義することはできないことに注意してください。これは、名前空間に少なくとも 1 つの
|
Spring Integration JMS と AMQP のサポートは似ていますが、重要な違いがあります。JMS 受信チャネルアダプターは、カバーで JmsDestinationPollingSource を使用しており、設定済みのポーラーを想定しています。AMQP 受信チャネルアダプターは AbstractMessageListenerContainer を使用し、メッセージ駆動型です。その点では、JMS メッセージ駆動型チャネルアダプターにより似ています。 |
バージョン 5.5 以降、AmqpInboundChannelAdapter
は、再試行操作が内部で呼び出されたときに RecoveryCallback
で使用される org.springframework.amqp.rabbit.retry.MessageRecoverer
戦略を使用して構成できます。詳細については、setMessageRecoverer()
JavaDocs を参照してください。
バッチメッセージ
バッチメッセージの詳細については、Spring AMQP ドキュメントを参照してください。
Spring Integration でバッチメッセージを作成するには、発信エンドポイントを BatchingRabbitTemplate
で構成するだけです。
バッチメッセージを受信すると、デフォルトで、リスナーコンテナーは各フラグメントメッセージを抽出し、アダプターは各フラグメントに対して Message<?>
を生成します。バージョン 5.2 以降、コンテナーの deBatchingEnabled
プロパティが false
に設定されている場合、代わりにアダプターによってデバッチ処理が実行され、ペイロードがフラグメントペイロードのリストである単一の Message<List<?>>
が生成されます(適切な場合は変換後)。
デフォルトの BatchingStrategy
は SimpleBatchingStrategy
ですが、これはアダプターでオーバーライドできます。
再試行操作でリカバリが必要な場合は、org.springframework.amqp.rabbit.retry.MessageBatchRecoverer をバッチで使用する必要があります。 |
ポーリングされた受信チャネルアダプター
概要
バージョン 5.0.1 では、ポーリングチャネルアダプターが導入され、MessageSourcePollingTemplate
またはポーラーなどを使用して、個々のメッセージをオンデマンドでフェッチできます。詳細については、遅延確認応答可能なメッセージソースを参照してください。
現在、XML 構成はサポートされていません。
次の例は、AmqpMessageSource
を構成する方法を示しています。
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))
.handle(p -> {
...
})
.get();
}
@Bean
public AmqpMessageSource source(ConnectionFactory connectionFactory) {
return new AmqpMessageSource(connectionFactory, "someQueue");
}
構成プロパティについては、Javadoc を参照してください。
This adapter currently does not have XML configuration support.
バッチメッセージ
バッチメッセージを参照してください。
ポーリングされたアダプターの場合、リスナーコンテナーはなく、バッチメッセージは常にデバッチされます(BatchingStrategy
がサポートしている場合)。
受信ゲートウェイ
受信ゲートウェイは、受信チャネルアダプターのすべての属性( "channel" が "request-channel" に置き換えられることを除く)、およびいくつかの追加属性をサポートします。次のリストは、使用可能な属性を示しています。
@Bean // return the upper cased payload
public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, "foo"))
.transform(String.class, String::toUpperCase)
.get();
}
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer);
gateway.setRequestChannel(channel);
gateway.setDefaultReplyTo("bar");
return gateway;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("foo");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new AbstractReplyProducingMessageHandler() {
@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
return "reply to " + requestMessage.getPayload();
}
};
}
<int-amqp:inbound-gateway
id="inboundGateway" (1)
request-channel="myRequestChannel" (2)
header-mapper="" (3)
mapped-request-headers="" (4)
mapped-reply-headers="" (5)
reply-channel="myReplyChannel" (6)
reply-timeout="1000" (7)
amqp-template="" (8)
default-reply-to="" /> (9)
1 | このアダプターの固有 ID。オプション。 |
2 | 変換されたメッセージの送信先のメッセージチャネル。必須。 |
3 | AMQP メッセージを受信するときに使用する AmqpHeaderMapper への参照。オプション。デフォルトでは、標準の AMQP プロパティ(contentType など)のみが Spring Integration MessageHeaders との間でコピーされます。AMQP MessageProperties 内のユーザー定義ヘッダーは、デフォルト DefaultAmqpHeaderMapper によって AMQP メッセージとの間でコピーされません。"request-header-names" または "reply-header-names" が指定されている場合は許可されません。 |
4 | AMQP リクエストから MessageHeaders にマップされる AMQP ヘッダーの名前のコンマ区切りリスト。この属性は、"header-mapper" 参照が提供されていない場合にのみ提供できます。このリストの値は、ヘッダー名と照合する単純なパターンにすることもできます(例: "*" または "thing1*, thing2" または "*thing1" )。 |
5 | AMQP 応答メッセージの AMQP メッセージプロパティにマッピングされる MessageHeaders の名前のカンマ区切りリスト。すべての標準ヘッダー(contentType など)は AMQP メッセージプロパティにマップされますが、ユーザー定義ヘッダーは 'headers' プロパティにマップされます。この属性は、"header-mapper" 参照が提供されていない場合にのみ提供できます。このリストの値は、ヘッダー名(たとえば、"*" または "foo*, bar" または "*foo" など)と照合する単純なパターンにすることもできます。 |
6 | 返信メッセージが期待されるメッセージチャネル。オプション。 |
7 | 応答チャネルからメッセージを受信するために、基礎となる o.s.i.core.MessagingTemplate に receiveTimeout を設定します。指定しない場合、このプロパティのデフォルトは 1000 (1 秒)です。応答が送信される前にコンテナースレッドが別のスレッドに渡される場合にのみ適用されます。 |
8 | カスタマイズされた AmqpTemplate Bean 参照(送信する応答メッセージをさらに制御するため)。RabbitTemplate の代替実装を提供できます。 |
9 | replyTo requestMessage に replyTo プロパティがない場合に使用される o.s.amqp.core.Address 。このオプションが指定されていない場合、amqp-template は提供されず、replyTo プロパティはリクエストメッセージに存在せず、応答をルーティングできないために IllegalStateException がスローされます。このオプションが指定されておらず、外部 amqp-template が提供されている場合、例外はスローされません。リクエストメッセージに replyTo プロパティが存在しない場合が予想される場合は、このオプションを指定するか、そのテンプレートにデフォルトの exchange および routingKey を構成する必要があります。 |
listener-container
属性の構成については、受信チャネルアダプターの注を参照してください。
バージョン 5.5 以降、AmqpInboundChannelAdapter
は、再試行操作が内部で呼び出されたときに RecoveryCallback
で使用される org.springframework.amqp.rabbit.retry.MessageRecoverer
戦略を使用して構成できます。詳細については、setMessageRecoverer()
JavaDocs を参照してください。
バッチメッセージ
バッチメッセージを参照してください。
受信エンドポイント確認モード
デフォルトでは、受信エンドポイントは AUTO
確認モードを使用します。これは、ダウンストリーム統合フローが完了すると(または、QueueChannel
または ExecutorChannel
を使用してメッセージが別のスレッドに渡されると)コンテナーがメッセージを自動的に確認することを意味します。モードを NONE
に設定すると、確認応答がまったく使用されないようにコンシューマーが構成されます(ブローカーは、送信されるとすぐにメッセージを自動的に確認応答します)。モードを MANUAL
に設定すると、ユーザーコードは処理中の他のポイントでメッセージを確認できます。これをサポートするために、このモードでは、エンドポイントはそれぞれ amqp_channel
および amqp_deliveryTag
ヘッダーで Channel
および deliveryTag
を提供します。
Channel
で任意の有効な Rabbit コマンドを実行できますが、通常は basicAck
と basicNack
(または basicReject
)のみが使用されます。コンテナーの操作を妨げないために、チャネルへの参照を保持せず、現在のメッセージのコンテキストでのみ使用する必要があります。
Channel は「ライブ」オブジェクトへの参照であるため、直列化できず、メッセージが永続化されると失われます。 |
次の例は、MANUAL
確認応答の使用方法を示しています。
@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {
// Do some processing
if (allOK) {
channel.basicAck(deliveryTag, false);
// perhaps do some more processing
}
else {
channel.basicNack(deliveryTag, false, true);
}
return someResultForDownStreamProcessing;
}
送信エンドポイント
次の発信エンドポイントには、多くの同様の構成オプションがあります。バージョン 5.2 から、confirm-timeout
が追加されました。通常、パブリッシャーが有効になっていることを確認すると、ブローカーはすぐに適切なチャネルに送信される ack(または nack)を返します。確認が受信される前にチャネルが閉じられると、Spring AMQP フレームワークはナックを合成します。「欠落」ACK は決して発生しませんが、このプロパティを設定すると、エンドポイントは定期的にチェックし、受信が確認されずに時間が経過すると、ACK を合成します。
送信チャネルアダプター
次の例は、AMQP 送信チャネルアダプターの使用可能なプロパティを示しています。
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
MessageChannel amqpOutboundChannel) {
return IntegrationFlows.from(amqpOutboundChannel)
.handle(Amqp.outboundAdapter(amqpTemplate)
.routingKey("queue1")) // default exchange - route to queue 'queue1'
.get();
}
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setRoutingKey("queue1"); // default exchange - route to queue 'queue1'
return outbound;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
<int-amqp:outbound-channel-adapter id="outboundAmqp" (1)
channel="outboundChannel" (2)
amqp-template="myAmqpTemplate" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
routing-key="" (7)
routing-key-expression="" (8)
default-delivery-mode"" (9)
confirm-correlation-expression="" (10)
confirm-ack-channel="" (11)
confirm-nack-channel="" (12)
confirm-timeout="" (13)
wait-for-confirm="" (14)
return-channel="" (15)
error-message-strategy="" (16)
header-mapper="" (17)
mapped-request-headers="" (18)
lazy-connect="true" (19)
multi-send="false"/> (20)
1 | このアダプターの一意の ID。オプション。 |
2 | メッセージを AMQP 交換に変換して公開するために送信するメッセージチャネル。必須。 |
3 | 構成された AMQP テンプレートへの Bean 参照。オプション(デフォルトは amqpTemplate )。 |
4 | メッセージの送信先の AMQP 交換の名前。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name-expression" と相互に排他的です。オプション。 |
5 | メッセージがルートオブジェクトとして送信される AMQP 交換の名前を決定するために評価される SpEL 式。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name" と相互に排他的。オプション。 |
6 | 複数のコンシューマーが登録されている場合のこのコンシューマーの順序。これにより、負荷分散とフェイルオーバーが可能になります。オプション(デフォルトは Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] )。 |
7 | メッセージを送信するときに使用する固定ルーティングキー。デフォルトでは、これは空の String です。"routing-key-expression" と相互に排他的です。オプション。 |
8 | メッセージをルートオブジェクトとして使用してメッセージを送信するときに使用するルーティングキーを決定するために評価される SpEL 式( "payload.key" など)。デフォルトでは、これは空の String です。「ルーティングキー」と相互に排他的。オプション。 |
9 | メッセージのデフォルト配信モード: PERSISTENT または NON_PERSISTENT header-mapper が配信モードを設定するとオーバーライドされます。Spring Integration メッセージヘッダー amqp_deliveryMode が存在する場合、DefaultHeaderMapper は値を設定します。この属性が指定されておらず、ヘッダーマッパーが設定しない場合、デフォルトは RabbitTemplate によって使用される基になる Spring AMQP MessagePropertiesConverter に依存します。それがまったくカスタマイズされていない場合、デフォルトは PERSISTENT です。オプション。 |
10 | 相関データを定義する式。提供される場合、これは、パブリッシャーの確認を受信するように基盤となる AMQP テンプレートを構成します。専用の RabbitTemplate と、publisherConfirms プロパティが true に設定された CachingConnectionFactory が必要です。パブリッシャーの確認を受信して相関データを提供すると、確認の種類に応じて、confirm-ack-channel または confirm-nack-channel のいずれかに書き込まれます。確認のペイロードは、この式で定義されている相関データです。メッセージには、'amqp_publishConfirm' ヘッダーが true (ack )または false (nack )に設定されています。例: headers['myCorrelationData'] および payload 。バージョン 4.1 では、amqp_publishConfirmNackCause メッセージヘッダーが導入されました。これには、発行者の確認用の「ナック」の cause が含まれています。バージョン 4.2 以降、式が Message<?> インスタンス(#this など)に解決される場合、ack /nack チャネルで発行されるメッセージはそのメッセージに基づいており、追加のヘッダーが追加されています。以前は、型に関係なく、ペイロードとして相関データを使用して新しいメッセージが作成されていました。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。 |
11 | ポジティブ(ack )パブリッシャーが確認するチャネルが送信されます。ペイロードは、confirm-correlation-expression によって定義された相関データです。式が #root または #this の場合、メッセージは元のメッセージから構築され、amqp_publishConfirm ヘッダーは true に設定されます。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel )。 |
12 | 否定的な(nack )パブリッシャー確認の送信先のチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです(ErrorMessageStrategy が構成されていない場合)。式が #root または #this の場合、メッセージは元のメッセージから構築され、amqp_publishConfirm ヘッダーは false に設定されます。ErrorMessageStrategy がある場合、メッセージは NackedAmqpMessageException ペイロードを持つ ErrorMessage です。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel )。 |
13 | 設定すると、この時間内にパブリッシャー確認が受信されない場合、アダプターは否定応答(nack)を合成します(ミリ秒単位)。保留中の確認はこの値の 50% ごとにチェックされるため、nack が送信される実際の時間は 1x と 1.5x の間になります。パブリッシャーの確認と return の代替メカニズムも参照してください。デフォルトはなし(nack は生成されません)。 |
14 | true に設定すると、呼び出し元のスレッドはブロックされ、発行者の確認を待ちます。これには、確認用に構成された RabbitTemplate と confirm-correlation-expression が必要です。スレッドは、最大 confirm-timeout (またはデフォルトでは 5 秒)ブロックします。タイムアウトが発生すると、MessageTimeoutException がスローされます。戻りが有効になっていて、メッセージが返されるか、確認の待機中に他の例外が発生すると、MessageHandlingException が適切なメッセージとともにスローされます。 |
15 | 返されたメッセージが送信されるチャネル。指定すると、基になる AMQP テンプレートは、配信不能メッセージをアダプターに返すように構成されます。ErrorMessageStrategy が構成されていない場合、メッセージは AMQP から受信したデータと次の追加ヘッダー amqp_returnReplyCode 、amqp_returnReplyText 、amqp_returnExchange 、amqp_returnRoutingKey から構成されます。ErrorMessageStrategy がある場合、メッセージは ReturnedAmqpMessageException ペイロードを持つ ErrorMessage です。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。 |
16 | 返されたメッセージまたは否定応答メッセージを送信するときに ErrorMessage インスタンスを構築するために使用される ErrorMessageStrategy 実装への参照。 |
17 | AMQP メッセージを送信するときに使用する AmqpHeaderMapper への参照。デフォルトでは、標準の AMQP プロパティ(contentType など)のみが Spring Integration MessageHeaders にコピーされます。デフォルトの `DefaultAmqpHeaderMapper` では、ユーザー定義のヘッダーはメッセージにコピーされません。'request-header-names' が指定されている場合は許可されません。オプション。 |
18 | MessageHeaders から AMQP メッセージにマッピングされる AMQP ヘッダーの名前のコンマ区切りリスト。"header-mapper" 参照が提供されている場合は許可されません。このリストの値は、ヘッダー名と照合する単純なパターンにすることもできます(例: "*" または "thing1*, thing2" または "*thing1" )。 |
19 | false に設定すると、エンドポイントはアプリケーションコンテキストの初期化中にブローカーへの接続を試行します。これにより、不良構成の「フェイルファースト」検出が可能になりますが、ブローカーがダウンしていると初期化が失敗します。true (デフォルト)の場合、最初のメッセージが送信されたときに接続が確立されます(他のコンポーネントが接続を確立したためにすでに存在しない場合)。 |
20 | true に設定すると、型 Iterable<Message<?>> のペイロードは、単一の RabbitTemplate 呼び出しのスコープ内の同じチャネルで個別のメッセージとして送信されます。RabbitTemplate が必要です。wait-for-confirms が true の場合、メッセージが送信された後に RabbitTemplate.waitForConfirmsOrDie() が呼び出されます。トランザクションテンプレートでは、送信は新しいトランザクションまたはすでに開始されているトランザクション(存在する場合)で実行されます。 |
return-channel
|
送信ゲートウェイ
次のリストは、AMQP 送信ゲートウェイの可能なプロパティを示しています。
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
return f -> f.handle(Amqp.outboundGateway(amqpTemplate)
.routingKey("foo")) // default exchange - route to queue 'foo'
.get();
}
@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")
public interface MyGateway {
String sendToRabbit(String data);
}
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setExpectReply(true);
outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
return outbound;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {
String sendToRabbit(String data);
}
<int-amqp:outbound-gateway id="outboundGateway" (1)
request-channel="myRequestChannel" (2)
amqp-template="" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
reply-channel="" (7)
reply-timeout="" (8)
requires-reply="" (9)
routing-key="" (10)
routing-key-expression="" (11)
default-delivery-mode"" (12)
confirm-correlation-expression="" (13)
confirm-ack-channel="" (14)
confirm-nack-channel="" (15)
confirm-timeout="" (16)
return-channel="" (17)
error-message-strategy="" (18)
lazy-connect="true" /> (19)
1 | このアダプターの一意の ID。オプション。 |
2 | メッセージを AMQP 交換に変換して公開するために送信されるメッセージチャネル。必須。 |
3 | 構成された AMQP テンプレートへの Bean 参照。オプション(デフォルトは amqpTemplate )。 |
4 | メッセージの送信先となる AMQP 交換の名前。指定しない場合、メッセージはデフォルトの名前なしの cxchange に送信されます。"exchange-name-expression" と相互に排他的です。オプション。 |
5 | メッセージをルートオブジェクトとして、メッセージの送信先 AMQP 交換の名前を決定するために評価される SpEL 式。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name" と相互に排他的。オプション。 |
6 | 複数のコンシューマーが登録されている場合のこのコンシューマーの順序。これにより、負荷分散とフェイルオーバーが可能になります。オプション(デフォルトは Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] )。 |
7 | AMQP キューから受信して変換した後、応答が送信されるメッセージチャネル。オプション。 |
8 | reply-channel に応答メッセージを送信するときにゲートウェイが待機する時間。これは、reply-channel がブロックできる場合にのみ適用されます。たとえば、容量制限が現在いっぱいの QueueChannel などです。デフォルトは無限大です。 |
9 | true の場合、AmqpTemplate’s `replyTimeout プロパティ内で応答メッセージが受信されない場合、ゲートウェイは例外をスローします。デフォルトは true です。 |
10 | メッセージを送信するときに使用する routing-key 。デフォルトでは、これは空の String です。"routing-key-expression" と相互に排他的です。オプション。 |
11 | メッセージをルートオブジェクトとして使用してメッセージを送信するときに使用する routing-key を決定するために評価される SpEL 式(たとえば、"payload.key" )。デフォルトでは、これは空の String です。「ルーティングキー」と相互に排他的。オプション。 |
12 | メッセージのデフォルト配信モード: PERSISTENT または NON_PERSISTENT header-mapper が配信モードを設定するとオーバーライドされます。Spring Integration メッセージヘッダー amqp_deliveryMode が存在する場合、DefaultHeaderMapper は値を設定します。この属性が指定されておらず、ヘッダーマッパーが設定しない場合、デフォルトは RabbitTemplate によって使用される基になる Spring AMQP MessagePropertiesConverter に依存します。それがまったくカスタマイズされていない場合、デフォルトは PERSISTENT です。オプション。 |
13 | バージョン 4.2 以降。相関データを定義する式。提供される場合、これは、発行者の確認を受信するように基盤となる AMQP テンプレートを構成します。専用の RabbitTemplate と、publisherConfirms プロパティが true に設定された CachingConnectionFactory が必要です。パブリッシャーの確認が受信され、相関データが提供されると、確認の型に応じて、confirm-ack-channel または confirm-nack-channel のいずれかに書き込まれます。確認のペイロードは、この式で定義されている相関データです。メッセージのヘッダー 'amqp_publishConfirm' は true (ack )または false (nack )に設定されています。nack 確認の場合、Spring Integration は追加のヘッダー amqp_publishConfirmNackCause を提供します。例: headers['myCorrelationData'] および payload 。式が Message<?> インスタンス(#this など)に解決される場合、ack /nack チャネルで発行されるメッセージは、追加のヘッダーが追加されたそのメッセージに基づいています。以前は、型に関係なく、ペイロードとして相関データを使用して新しいメッセージが作成されていました。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。 |
14 | 肯定的な(ack )パブリッシャー確認が送信されるチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです。式が #root または #this の場合、メッセージは元のメッセージから構築され、amqp_publishConfirm ヘッダーは true に設定されます。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel )。 |
15 | 否定的な(nack )パブリッシャー確認の送信先のチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです(ErrorMessageStrategy が構成されていない場合)。式が #root または #this の場合、メッセージは元のメッセージから構築され、amqp_publishConfirm ヘッダーは false に設定されます。ErrorMessageStrategy がある場合、メッセージは NackedAmqpMessageException ペイロードを持つ ErrorMessage です。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel )。 |
16 | 設定すると、パブリッシャーがミリ秒単位のこの時間内に確認を受信しない場合、ゲートウェイは否定応答(nack)を合成します。保留確認は、この値の 50% ごとにチェックされるため、ナックが送信される実際の時間は、この値 1x と 1.5x の間になります。デフォルトなし(ナックは生成されません)。 |
17 | 返されたメッセージが送信されるチャネル。指定すると、基になる AMQP テンプレートは、配信不能メッセージをアダプターに返すように構成されます。ErrorMessageStrategy が構成されていない場合、メッセージは AMQP から受信したデータと次の追加ヘッダー amqp_returnReplyCode 、amqp_returnReplyText 、amqp_returnExchange 、amqp_returnRoutingKey から構成されます。ErrorMessageStrategy がある場合、メッセージは ReturnedAmqpMessageException ペイロードを持つ ErrorMessage です。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。 |
18 | 返されたメッセージまたは否定応答メッセージを送信するときに ErrorMessage インスタンスを構築するために使用される ErrorMessageStrategy 実装への参照。 |
19 | false に設定すると、エンドポイントはアプリケーションコンテキストの初期化中にブローカーへの接続を試行します。これにより、ブローカがダウンしている場合にエラーメッセージを記録することにより、不正な構成の「フェイルファースト」検出が可能になります。true (デフォルト)の場合、最初のメッセージが送信されたときに接続が確立されます(他のコンポーネントが接続を確立したためにすでに存在しない場合)。 |
return-channel
|
基礎となる AmqpTemplate のデフォルトの replyTimeout は 5 秒です。より長いタイムアウトが必要な場合は、template で設定する必要があります。 |
送信アダプターと送信ゲートウェイの構成の唯一の違いは、expectReply
プロパティの設定であることに注意してください。
非同期送信ゲートウェイ
前のセクションで説明したゲートウェイは同期的であり、応答を受信する(またはタイムアウトが発生する)まで送信スレッドが中断されます。Spring Integration バージョン 4.3 は、Spring AMQP からの AsyncRabbitTemplate
を使用する非同期ゲートウェイを追加しました。メッセージが送信されると、スレッドは送信操作が完了した直後に戻り、メッセージが受信されると、テンプレートのリスナーコンテナースレッドで応答が送信されます。これは、ゲートウェイがポーラースレッドで呼び出されるときに便利です。スレッドがリリースされ、フレームワーク内の他のタスクで使用できます。
次のリストは、AMQP 非同期送信ゲートウェイの可能な構成オプションを示しています。
@Configuration
public class AmqpAsyncApplication {
@Bean
public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
return f -> f
.handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
.routingKey("queue1")); // default exchange - route to queue 'queue1'
}
@MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
public interface MyGateway {
String sendToRabbit(String data);
}
}
@Configuration
public class AmqpAsyncConfig {
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
return outbound;
}
@Bean
public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
SimpleMessageListenerContainer replyContainer) {
return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
}
@Bean
public SimpleMessageListenerContainer replyContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
container.setQueueNames("asyncRQ1");
return container;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
}
<int-amqp:outbound-async-gateway id="asyncOutboundGateway" (1)
request-channel="myRequestChannel" (2)
async-template="" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
reply-channel="" (7)
reply-timeout="" (8)
requires-reply="" (9)
routing-key="" (10)
routing-key-expression="" (11)
default-delivery-mode"" (12)
confirm-correlation-expression="" (13)
confirm-ack-channel="" (14)
confirm-nack-channel="" (15)
confirm-timeout="" (16)
return-channel="" (17)
lazy-connect="true" /> (18)
1 | このアダプターの一意の ID。オプション。 |
2 | メッセージを AMQP 交換に変換して公開するために送信するメッセージチャネル。必須。 |
3 | 構成された AsyncRabbitTemplate への Bean 参照。オプション(デフォルトは asyncRabbitTemplate )。 |
4 | メッセージの送信先となる AMQP 交換の名前。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name-expression" と相互に排他的です。オプション。 |
5 | メッセージがルートオブジェクトとして送信される AMQP 交換の名前を決定するために評価される SpEL 式。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name" と相互に排他的。オプション。 |
6 | 複数のコンシューマーが登録されている場合のこのコンシューマーの順序。これにより、負荷分散とフェイルオーバーが可能になります。オプション(デフォルトは Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] )。 |
7 | AMQP キューから受信して変換した後、応答が送信されるメッセージチャネル。オプション。 |
8 | reply-channel に応答メッセージを送信するときにゲートウェイが待機する時間。これは、reply-channel がブロックできる場合にのみ適用されます。たとえば、容量制限が現在いっぱいの QueueChannel などです。デフォルトは無限です。 |
9 | AsyncRabbitTemplate’s `receiveTimeout プロパティ内で応答メッセージが受信されず、この設定が true の場合、ゲートウェイは受信メッセージの errorChannel ヘッダーにエラーメッセージを送信します。AsyncRabbitTemplate’s `receiveTimeout プロパティ内で応答メッセージが受信されず、この設定が false である場合、ゲートウェイはデフォルトの errorChannel (利用可能な場合)にエラーメッセージを送信します。デフォルトは true です。 |
10 | メッセージを送信するときに使用するルーティングキー。デフォルトでは、これは空の String です。"routing-key-expression" と相互に排他的です。オプション。 |
11 | メッセージをルートオブジェクトとして使用してメッセージを送信するときに使用するルーティングキーを決定するために評価される SpEL 式(たとえば、"payload.key" )。デフォルトでは、これは空の String です。「ルーティングキー」と相互に排他的。オプション。 |
12 | メッセージのデフォルト配信モード: PERSISTENT または NON_PERSISTENT header-mapper が配信モードを設定するとオーバーライドされます。Spring Integration メッセージヘッダー(amqp_deliveryMode )が存在する場合、DefaultHeaderMapper は値を設定します。この属性が指定されておらず、ヘッダーマッパーが設定しない場合、デフォルトは RabbitTemplate によって使用される基になる Spring AMQP MessagePropertiesConverter に依存します。カスタマイズされていない場合、デフォルトは PERSISTENT です。オプション。 |
13 | 相関データを定義する式。提供される場合、これは、パブリッシャーの確認を受信するように基盤となる AMQP テンプレートを構成します。専用の RabbitTemplate と、publisherConfirms プロパティが true に設定された CachingConnectionFactory が必要です。パブリッシャーの確認を受け取り、相関データが提供されると、確認の型に応じて、確認が confirm-ack-channel または confirm-nack-channel のいずれかに書き込まれます。確認のペイロードは、この式で定義された相関データであり、メッセージの "amqp_publishConfirm" ヘッダーは true (ack )または false (nack )に設定されています。nack インスタンスの場合、追加のヘッダー(amqp_publishConfirmNackCause )が提供されます。例: headers['myCorrelationData'] 、payload 。式が Message<?> インスタンス( "#this" など)に解決される場合、ack /nack チャネルで発行されるメッセージは、追加のヘッダーが追加されたそのメッセージに基づいています。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。 |
14 | 肯定的な(ack )パブリッシャー確認が送信されるチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです。基になる AsyncRabbitTemplate の enableConfirms プロパティを true に設定する必要があります。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel )。 |
15 | バージョン 4.2 以降。否定的な(nack )パブリッシャー確認の送信先のチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです。基になる AsyncRabbitTemplate の enableConfirms プロパティを true に設定する必要があります。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel )。 |
16 | 設定すると、この時間内にパブリッシャー確認が受信されない場合、ゲートウェイは否定応答(nack)を合成します(ミリ秒単位)。保留中の確認はこの値の 50% ごとにチェックされるため、nack が送信される実際の時間は 1x と 1.5x の間になります。パブリッシャーの確認と return の代替メカニズムも参照してください。デフォルトはなし(nack は生成されません)。 |
17 | 返されたメッセージが送信されるチャネル。提供されると、基盤となる AMQP テンプレートは、配信不能メッセージをゲートウェイに返すように構成されます。メッセージは、AMQP から受信したデータと、次の追加ヘッダーで構成されます: amqp_returnReplyCode 、amqp_returnReplyText 、amqp_returnExchange 、amqp_returnRoutingKey 。基になる AsyncRabbitTemplate で、mandatory プロパティを true に設定する必要があります。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。 |
18 | false に設定すると、エンドポイントはアプリケーションコンテキストの初期化中にブローカーへの接続を試行します。これにより、ブローカがダウンしている場合にエラーメッセージを記録することにより、不正な構成の「フェイルファースト」検出が可能になります。true (デフォルト)の場合、最初のメッセージが送信されたときに接続が確立されます(他のコンポーネントが接続を確立したためにすでに存在しない場合)。 |
詳細については、非同期サービスアクティベーターも参照してください。
RabbitTemplate 確認と return を使用する場合、 |
パブリッシャーの確認と return の代替メカニズム
接続ファクトリがパブリッシャーの確認と戻りに対して構成されている場合、上記のセクションでは、確認を受け取り非同期で返すメッセージチャネルの構成について説明します。バージョン 5.4 以降、一般的に使いやすいメカニズムが追加されています。
この場合、confirm-correlation-expression
または確認チャネルと戻りチャネルを構成しないでください。代わりに、AmqpHeaders.PUBLISH_CONFIRM_CORRELATION
ヘッダーに CorrelationData
インスタンスを追加します。その後、メッセージを送信した CorrelationData
インスタンスの将来の状態を確認することにより、後で結果を待つことができます。returnedMessage
フィールドは、未来が完了する前に(メッセージが返された場合)常に入力されます。
CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returns
someFlow.getInputChannel().send(MessageBuilder.withPayload("test")
.setHeader("rk", "someKeyThatWontRoute")
.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
.build());
...
try {
Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS);
Message returned = corr.getReturnedMessage();
if (returned !- null) {
// message could not be routed
}
}
catch { ... }
パフォーマンスを向上させるために、一度に 1 つではなく、複数のメッセージを送信して確認を待つことができます。返されるメッセージは、変換後の生のメッセージです。CorrelationData
を必要な追加データでサブクラス化できます。
受信メッセージの変換
チャネルアダプターまたはゲートウェイに到着する受信メッセージは、メッセージコンバーターを使用して spring-messaging
Message<?>
ペイロードに変換されます。デフォルトでは、java の直列化とテキストを処理する SimpleMessageConverter
が使用されます。ヘッダーは、デフォルトで DefaultHeaderMapper.inboundMapper()
を使用してマップされます。変換エラーが発生し、エラーチャネルが定義されていない場合、例外はコンテナーにスローされ、リスナーコンテナーのエラーハンドラーによって処理されます。デフォルトのエラーハンドラーは変換エラーを致命的なものとして扱い、メッセージは拒否されます(キューがそのように構成されている場合は、配信不能交換にルーティングされます)。エラーチャネルが定義されている場合、ErrorMessage
ペイロードは、プロパティ failedMessage
(変換できなかった Spring AMQP メッセージ)および cause
を持つ ListenerExecutionFailedException
です。コンテナー AcknowledgeMode
が AUTO
(デフォルト)であり、エラーフローが例外をスローせずにエラーを消費する場合、元のメッセージは確認応答されます。エラーフローが例外をスローした場合、例外型は、コンテナーのエラーハンドラーと組み合わせて、メッセージが再キューイングされるかどうかを決定します。コンテナーが AcknowledgeMode.MANUAL
で構成されている場合、ペイロードは ManualAckListenerExecutionFailedException
であり、追加のプロパティ channel
および deliveryTag
があります。これにより、エラーフローは、メッセージの basicAck
または basicNack
(または basicReject
)を呼び出して、メッセージの処理を制御できます。
送信メッセージの変換
Spring AMQP 1.4 は ContentTypeDelegatingMessageConverter
を導入しましたが、実際のコンバーターは受信コンテンツ型メッセージプロパティに基づいて選択されます。これは、受信エンドポイントで使用できます。
Spring Integration バージョン 4.3 では、使用するコンバーターを指定する contentType
ヘッダーを使用して、送信エンドポイントでも ContentTypeDelegatingMessageConverter
を使用できます。
次の例では、ContentTypeDelegatingMessageConverter
を設定します。デフォルトのコンバーターは SimpleMessageConverter
(Java 直列化とプレーンテキストを処理します)で、JSON コンバーターを使用します。
<amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel"
exchange-name="someExchange"
routing-key="someKey"
amqp-template="amqpTemplateContentTypeConverter" />
<int:channel id="ctRequestChannel"/>
<rabbit:template id="amqpTemplateContentTypeConverter"
connection-factory="connectionFactory" message-converter="ctConverter" />
<bean id="ctConverter"
class="o.s.amqp.support.converter.ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json">
<bean class="o.s.amqp.support.converter.Jackson2JsonMessageConverter" />
</entry>
</map>
</property>
</bean>
contentType
ヘッダーを application/json
に設定してメッセージを ctRequestChannel
に送信すると、JSON コンバーターが選択されます。
これは、発信チャネルアダプターとゲートウェイの両方に適用されます。
バージョン 5.0 以降、発信メッセージの ただし、以前の動作が必要な場合があります。たとえば、JSON を含む 送信チャネルアダプターとゲートウェイ(および AMQP でバックアップされたチャネル)に バージョン 5.1.9 以降、同様の |
発信ユーザー ID
Spring AMQP バージョン 1.6 は、送信メッセージのデフォルトユーザー ID を指定できるメカニズムを導入しました。AmqpHeaders.USER_ID
ヘッダーを設定することは常に可能であり、現在ではデフォルトよりも優先されます。これは、メッセージの受信者に役立つ場合があります。受信メッセージの場合、メッセージ発行者がプロパティを設定すると、AmqpHeaders.RECEIVED_USER_ID
ヘッダーで使用可能になります。RabbitMQ は、ユーザー ID が接続の実際のユーザー ID であるか、接続で偽装が許可されていることを検証すること (英語) に注意してください。
送信メッセージのデフォルトユーザー ID を構成するには、RabbitTemplate
でそれを構成し、そのテンプレートを使用するように送信アダプターまたはゲートウェイを構成します。同様に、返信にユーザー ID プロパティを設定するには、適切に構成されたテンプレートを受信ゲートウェイに挿入します。詳細については、Spring AMQP ドキュメントを参照してください。
遅延メッセージ交換
Spring AMQP は RabbitMQ 遅延メッセージ交換プラグインをサポートします。受信メッセージの場合、x-delay
ヘッダーは AmqpHeaders.RECEIVED_DELAY
ヘッダーにマップされます。AMQPHeaders.DELAY
ヘッダーを設定すると、対応する x-delay
ヘッダーが送信メッセージに設定されます。発信エンドポイントで delay
および delayExpression
プロパティを指定することもできます(XML 構成を使用する場合は delay-expression
)。これらのプロパティは、AmqpHeaders.DELAY
ヘッダーよりも優先されます。
AMQP-backed メッセージチャネル
2 つのメッセージチャネル実装が利用可能です。1 つはポイントツーポイントで、もう 1 つはパブリッシュ / サブスクライブです。これらのチャネルは両方とも、基礎となる AmqpTemplate
および SimpleMessageListenerContainer
の幅広い構成属性を提供します(この章でチャネルアダプターとゲートウェイについて前述したとおり)。ただし、ここで示す例の構成は最小限です。XML スキーマを調べて、使用可能な属性を表示します。
ポイントツーポイントチャネルは、次の例のようになります。
<int-amqp:channel id="p2pChannel"/>
上記の例では、カバーで si.p2pChannel
という名前の Queue
が宣言され、このチャネルは(技術的には、この Queue
の名前と一致するルーティングキーを使用して、無名直接交換に送信することにより) Queue
に送信します。このチャネルは、Queue
のコンシューマーも登録します。チャネルをメッセージ駆動型ではなく「ポーリング可能」にする場合は、次の例に示すように、message-driven
フラグに false
の値を指定します。
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
パブリッシュ / サブスクライブチャネルは次のようになります。
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
上記の例では、カバーで、si.fanout.pubSubChannel
という名前のファンアウト交換が宣言され、このチャネルはそのファンアウト交換に送信します。このチャネルは、サーバー名の排他的、自動削除、非永続 Queue
も宣言し、メッセージを受信するためにその Queue
にコンシューマーを登録している間、それをファンアウト交換にバインドします。publish-subscribe-channel には「ポーリング可能な」オプションはありません。メッセージ駆動型でなければなりません。
バージョン 4.1 以降、AMQP-backed メッセージチャネルは(channel-transacted
と組み合わせて) template-channel-transacted
をサポートし、AbstractMessageListenerContainer
と RabbitTemplate
の transactional
構成を分離します。以前は、channel-transacted
はデフォルトで true
でした。現在、デフォルトでは、AbstractMessageListenerContainer
の場合は false
です。
バージョン 4.3 より前は、AMQP-backed チャネルは Serializable
ペイロードとヘッダーを持つメッセージのみをサポートしていました。メッセージ全体が変換(直列化)され、RabbitMQ に送信されました。これで、extract-payload
属性(または Java 構成を使用する場合は setExtractPayload()
)を true
に設定できます。このフラグが true
の場合、チャネルアダプターを使用する場合と同様の方法で、メッセージペイロードが変換され、ヘッダーがマップされます。この配置により、AMQP でバックアップされたチャネルを、直列化できないペイロード(おそらく、Jackson2JsonMessageConverter
などの別のメッセージコンバーター)で使用できます。デフォルトのマッピングされたヘッダーの詳細については、AMQP メッセージヘッダーを参照してください。outbound-header-mapper
および inbound-header-mapper
属性を使用するカスタムマッパーを提供することにより、マッピングを変更できます。default-delivery-mode
も指定できるようになりました。これは、amqp_deliveryMode
ヘッダーがない場合に配信モードを設定するために使用されます。デフォルトでは、Spring AMQP MessageProperties
は PERSISTENT
配信モードを使用します。
他の永続化に裏付けられたチャネルと同様に、AMQP に裏付けられたチャネルは、メッセージの永続化を提供してメッセージの損失を回避することを目的としています。他のピアアプリケーションに作業を分配することを目的としていません。そのためには、代わりにチャネルアダプターを使用してください。 |
バージョン 5.0 以降、ポーリング可能チャネルは、指定された receiveTimeout のポーラースレッドをブロックするようになりました(デフォルトは 1 秒です)。以前は、他の PollableChannel 実装とは異なり、受信タイムアウトに関係なく、メッセージが利用できない場合、スレッドはすぐにスケジューラーに戻りました。各メッセージを受信するにはコンシューマーを作成する必要があるため、ブロッキングは basicGet() を使用してメッセージを取得する(タイムアウトなし)よりも少し高負荷です。以前の動作を復元するには、ポーラーの receiveTimeout を 0 に設定します。 |
Java 構成を使用した構成
次の例は、Java 構成でチャネルを構成する方法を示しています。
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
Java DSL を使用した構成
次の例は、Java DSL でチャネルを構成する方法を示しています。
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}
AMQP メッセージヘッダー
概要
Spring Integration AMQP アダプターは、すべての AMQP プロパティとヘッダーを自動的にマッピングします。(これは 4.3 からの変更です - 以前は、標準ヘッダーのみがマップされていました)。デフォルトでは、これらのプロパティは、DefaultAmqpHeaderMapper
(Javadoc) を使用して Spring Integration MessageHeaders
との間でコピーされます。
アダプターには、それをサポートするプロパティがあるため、AMQP 固有のヘッダーマッパーの独自の実装を渡すことができます。
DefaultAmqpHeaderMapper
の requestHeaderNames
または replyHeaderNames
プロパティによって明示的に否定されない限り、AMQP MessageProperties
(Javadoc) 内のユーザー定義ヘッダーは AMQP メッセージとの間でコピーされます。デフォルトでは、送信マッパーの場合、x-*
ヘッダーはマップされません。理由については、このセクションで後述する注意を参照してください。
デフォルトを上書きして 4.3 より前の動作に戻すには、プロパティで STANDARD_REQUEST_HEADERS
および STANDARD_REPLY_HEADERS
を使用します。
ユーザー定義のヘッダーをマッピングする場合、値には、照合する単純なワイルドカードパターン(thing* や *thing など)も含めることができます。* はすべてのヘッダーに一致します。 |
バージョン 4.1 から、AbstractHeaderMapper
(DefaultAmqpHeaderMapper
スーパークラス)により、NON_STANDARD_HEADERS
トークンを(既存の STANDARD_REQUEST_HEADERS
および STANDARD_REPLY_HEADERS
に加えて) requestHeaderNames
および replyHeaderNames
プロパティ用に構成して、すべてのユーザー定義ヘッダーをマップできます。
org.springframework.amqp.support.AmqpHeaders
クラスは、DefaultAmqpHeaderMapper
によって使用されるデフォルトヘッダーを識別します。
amqp_appId
amqp_clusterId
amqp_contentEncoding
amqp_contentLength
content-type
(contentType
ヘッダーを参照してください)amqp_correlationId
amqp_delay
amqp_deliveryMode
amqp_deliveryTag
amqp_expiration
amqp_messageCount
amqp_messageId
amqp_receivedDelay
amqp_receivedDeliveryMode
amqp_receivedExchange
amqp_receivedRoutingKey
amqp_redelivered
amqp_replyTo
amqp_timestamp
amqp_type
amqp_userId
amqp_publishConfirm
amqp_publishConfirmNackCause
amqp_returnReplyCode
amqp_returnReplyText
amqp_returnExchange
amqp_returnRoutingKey
amqp_channel
amqp_consumerTag
amqp_consumerQueue
このセクションで前述したように、すべてのヘッダーをコピーする一般的な方法は、* のヘッダーマッピングパターンを使用することです。ただし、特定の RabbitMQ 独自のプロパティ / ヘッダーもコピーされるため、予期しない副作用が生じる可能性があります。例: federation (英語) を使用する場合、受信したメッセージには、メッセージを送信したノードを含む x-received-from という名前のプロパティがある場合があります。ワイルドカード文字 * を受信ゲートウェイのリクエストおよびリプライヘッダーマッピングに使用すると、このヘッダーがコピーされ、フェデレーションで問題が発生する可能性があります。この応答メッセージは送信ブローカーにフェデレートされ、メッセージがループしていると見なされ、結果としてメッセージを表示せずにドロップする場合があります。ワイルドカードヘッダーマッピングの便利な機能を使用する場合は、ダウンストリームフローの一部のヘッダーを除外する必要がある場合があります。例: x-received-from ヘッダーを返信にコピーしないようにするために、AMQP 受信ゲートウェイに返信を送信する前に <int:header-filter … header-names="x-received-from"> を使用できます。または、ワイルドカードを使用する代わりに、実際にマップしたいプロパティを明示的にリストすることもできます。これらの理由により、受信メッセージの場合、マッパーは(デフォルトで) x-* ヘッダーをマップしません。また、受信メッセージから送信メッセージへのヘッダーの伝搬を回避するために、deliveryMode を amqp_deliveryMode ヘッダーにマップしません。代わりに、このヘッダーは amqp_receivedDeliveryMode にマップされますが、出力にはマップされません。 |
バージョン 4.3 以降、パターンの前に !
を付けることで、ヘッダーマッピングのパターンを無効にすることができます。否定パターンが優先されるため、STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1
などのリストは thing1
をマップしません(thing2
も thing3
もマップしません)。標準ヘッダーと bad
および qux
がマッピングされます。たとえば、受信側のダウンストリームで JSON 逆直列化ロジックが異なる方法で JSON 逆直列化ロジックが実行されたときに、受信メッセージの JSON 型ヘッダーをマップしないようにするために、否定手法は役立ちます。この目的のために、受信チャネルアダプター / ゲートウェイのヘッダーマッパーに !json_*
パターンを構成する必要があります。
マッピングしたい ! で始まるユーザー定義ヘッダーがある場合、次のように \ でエスケープする必要があります: STANDARD_REQUEST_HEADERS,\!myBangHeader 。!myBangHeader という名前のヘッダーがマップされました。 |
バージョン 5.1 以降、対応する amqp_messageId または amqp_timestamp ヘッダーが送信メッセージに存在しない場合、DefaultAmqpHeaderMapper は、それぞれ MessageHeaders.ID および MessageHeaders.TIMESTAMP から MessageProperties.messageId および MessageProperties.timestamp へのマッピングにフォールバックします。受信プロパティは、以前と同様に amqp_* ヘッダーにマップされます。メッセージコンシューマーがステートフルリトライを使用している場合は、messageId プロパティを設定すると便利です。 |
contentType
ヘッダー
他のヘッダーとは異なり、AmqpHeaders.CONTENT_TYPE
の先頭には amqp_
が付きません。これにより、異なるテクノロジー間で contentType ヘッダーを透過的に渡すことができます。たとえば、RabbitMQ キューに送信される受信 HTTP メッセージ。
contentType
ヘッダーは Spring AMQP の MessageProperties.contentType
プロパティにマップされ、その後 RabbitMQ の content_type
プロパティにマップされます。
バージョン 5.1 より前のバージョンでは、このヘッダーも MessageProperties.headers
マップのエントリとしてマップされていました。これは誤りであり、さらに、基になる Spring AMQP メッセージコンバーターがコンテンツ型を変更した可能性があるため、値が誤っている可能性があります。このような変更は、ファーストクラスの content_type
プロパティに反映されますが、RabbitMQ ヘッダーマップには反映されません。受信マッピングでヘッダーマップ値が無視されました。contentType
は、ヘッダーマップのエントリにマップされなくなりました。
厳密なメッセージ順序
このセクションでは、受信および送信メッセージのメッセージ順序について説明します。
受信
受信メッセージの厳密な順序付けが必要な場合は、受信リスナーコンテナーの prefetchCount
プロパティを 1
に設定する必要があります。これは、メッセージが失敗して再配信された場合、既存のプリフェッチされたメッセージの後に到着するためです。Spring AMQP バージョン 2.0 以降、prefetchCount
はパフォーマンスを向上させるために 250
にデフォルト設定されています。厳密なオーダー要件には、パフォーマンスの低下という代償が伴います。
送信
次の統合フローを検討してください。
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlows.from(Gateway.class)
.split(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
メッセージ A
、B
、C
をゲートウェイに送信するとします。メッセージ A
、B
、C
が順番に送信される可能性がありますが、保証はありません。これは、テンプレートが送信操作ごとにキャッシュからチャネルを「借用」し、メッセージごとに同じチャネルが使用される保証がないためです。1 つの解決策は、スプリッターの前にトランザクションを開始することですが、RabbitMQ ではトランザクションが高負荷であり、パフォーマンスが数百分の 1 に低下する可能性があります。
この問題をより効率的な方法で解決するために、バージョン 5.1 以降、Spring Integration は HandleMessageAdvice
である BoundRabbitChannelAdvice
を提供しています。メッセージアドバイスの処理を参照してください。スプリッターの前に適用すると、すべてのダウンストリーム操作が同じチャネルで実行され、オプションで、すべての送信メッセージのパブリッシャー確認が受信されるまで待機できます(接続ファクトリが確認用に構成されている場合)。次の例は、BoundRabbitChannelAdvice
の使用方法を示しています。
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlows.from(Gateway.class)
.split(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
同じ RabbitTemplate
(RabbitOperations
を実装)がアドバイスと送信アダプターで使用されていることに注意してください。アドバイスは、すべての操作が同じチャネルで実行されるように、テンプレートの invoke
メソッド内でダウンストリームフローを実行します。オプションのタイムアウトが提供されている場合、フローが完了すると、アドバイスは waitForConfirmsOrDie
メソッドを呼び出します。waitForConfirmsOrDie
メソッドは、指定された時間内に確認が受信されない場合に例外をスローします。
ダウンストリームフロー(QueueChannel 、ExecutorChannel など)にスレッドハンドオフがあってはなりません。 |
AMQP サンプル
AMQP アダプターを試すには、https://github.com/SpringSource/spring-integration-samples (英語) の Spring Integration サンプル git リポジトリで入手可能なサンプルを確認してください。
現在、1 つのサンプルは、送信チャネルアダプターと受信チャネルアダプターを使用して、Spring Integration AMQP アダプターの基本機能を示しています。サンプルの AMQP ブローカー実装では RabbitMQ (英語) を使用します。
この例を実行するには、RabbitMQ の実行中のインスタンスが必要です。基本的なデフォルトだけのローカルインストールで十分です。RabbitMQ の詳細なインストール手順については、https://www.rabbitmq.com/install.html (英語) を参照してください |
サンプルアプリケーションが起動したら、コマンドプロンプトにテキストを入力すると、入力したテキストを含むメッセージが AMQP キューにディスパッチされます。その代わりに、そのメッセージは Spring Integration によって取得され、コンソールに出力されます。
次の図は、このサンプルで使用される Spring Integration コンポーネントの基本セットを示しています。