メッセージチャンネル

EIP メソッドを備えた IntegrationFlowBuilder に加えて、Java DSL は MessageChannel インスタンスを構成するための流れるような API を提供します。この目的のために、MessageChannels ビルダーファクトリが提供されています。次の例は、その使用方法を示しています。

@Bean
public PriorityChannelSpec priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap());
}

XML 構成で input-channel/output-channel ペアを接続するのと同様に、同じ MessageChannels ビルダーファクトリを IntegrationFlowBuilder から channel() EIP メソッドで使用してエンドポイントを接続することができます。デフォルトでは、エンドポイントは Bean 名がパターン [IntegrationFlow.beanName].channel#[channelNameIndex] に基づく DirectChannel インスタンスと接続されます。このルールは、インライン MessageChannels ビルダーファクトリの使用によって生成される名前のないチャネルにも適用されます。ただし、すべての MessageChannels メソッドには、MessageChannel インスタンスの Bean 名を設定するために使用できる channelId を認識するバリアントがあります。MessageChannel 参照と beanName は、Bean メソッド呼び出しとして使用できます。次の例は、channel() EIP メソッドを使用する可能な方法を示しています。

@Bean
public QueueChannelSpec queueChannel() {
    return MessageChannels.queue();
}

@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
    return MessageChannels.publishSubscribe();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlow.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input") は、「「「入力」ID で MessageChannel を見つけて使用するか、作成します」」という意味です。

  • fixedSubscriberChannel() は FixedSubscriberChannel のインスタンスを生成し、channelFlow.channel#0 という名前で登録します。

  • channel("queueChannel") も同じように機能しますが、既存の queueChannel Bean を使用します。

  • channel(publishSubscribe()) は Bean メソッドのリファレンスです。

  • channel(MessageChannels.executor("executorChannel", this.taskExecutor)) は、IntegrationComponentSpec を ExecutorChannel に公開し、それを executorChannel として登録する IntegrationFlowBuilder です。

  • channel("output") は、この名前の Bean がすでに存在しない限り、DirectChannel Bean を名前として output に登録します。

メモ: 上記の IntegrationFlow 定義は有効であり、そのチャネルはすべて BridgeHandler インスタンスを持つエンドポイントに適用されます。

異なる IntegrationFlow インスタンスから MessageChannels ファクトリを介して同じインラインチャネル定義を使用するよう注意してください。DSL パーサーが存在しないオブジェクトを Bean として登録しても、異なる IntegrationFlow コンテナーから同じオブジェクト(MessageChannel)を判別できません。次の例は間違っています。
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlow.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

その悪い例の結果は、次の例外です。

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

動作させるには、そのチャネルに対して @Bean を宣言し、異なる IntegrationFlow インスタンスから Bean メソッドを使用する必要があります。