Java DSL

Spring Integration Java 構成および DSL は、Spring @Configuration クラスから Spring Integration メッセージフローを構成できる便利なビルダーと流れるような API のセットを提供します。

Kotlin DSL も参照してください)

Spring Integration の Java DSL は、本質的に Spring Integration のファサードです。DSL は、Spring Framework および Spring Integration からの既存の Java 構成とともに、流れるような Builder パターンを使用することにより、Spring Integration メッセージフローをアプリケーションに埋め込む簡単な方法を提供します。また、ラムダ(Java 8 で使用可能)を使用およびサポートして、Java 構成をさらに簡素化します。

カフェ [GitHub] (英語) は DSL を使用する良い例です。

DSL は、IntegrationFlowBuilder の IntegrationFlows ファクトリによって提示されます。これにより、IntegrationFlow コンポーネントが生成され、Spring Bean として登録する必要があります(@Bean アノテーションを使用)。ビルダーパターンは、任意の複雑な構造を、ラムダを引数として受け入れることができるメソッドの階層として表現するために使用されます。

IntegrationFlowBuilder は、IntegrationFlowBeanPostProcessor によるアプリケーションコンテキストでの具象 Bean のさらなる解析と登録のために、IntegrationFlow Bean の統合コンポーネント(MessageChannel インスタンス、AbstractEndpoint インスタンスなど)のみを収集します。

Java DSL は Spring Integration クラスを直接使用し、XML の生成と解析をバイパスします。ただし、DSL は XML に加えて構文上の砂糖以上のものを提供します。その最も魅力的な機能の 1 つは、インラインラムダを定義してエンドポイントロジックを実装する機能であり、カスタムロジックを実装するための外部クラスは不要です。ある意味では、Spring Integration の Spring Expression Language(SpEL)およびインラインスクリプトのサポートがこれに対処していますが、ラムダはより簡単で強力です。

次の例は、Spring Integration の Java 構成の使用方法を示しています。

@Configuration
@EnableIntegration
public class MyConfiguration {

    @Bean
    public AtomicInteger integerSource() {
        return new AtomicInteger();
    }

    @Bean
    public IntegrationFlow myFlow() {
        return IntegrationFlows.fromSupplier(integerSource()::getAndIncrement,
                                         c -> c.poller(Pollers.fixedRate(100)))
                    .channel("inputChannel")
                    .filter((Integer p) -> p > 0)
                    .transform(Object::toString)
                    .channel(MessageChannels.queue())
                    .get();
    }
}

上記の構成例の結果、ApplicationContext の起動後に Spring Integration エンドポイントとメッセージチャネルが作成されます。Java 構成は、XML 構成の置き換えと拡張の両方に使用できます。Java 構成を使用するために、既存の XML 構成をすべて置き換える必要はありません。

DSL の基本

org.springframework.integration.dsl パッケージには、前述の IntegrationFlowBuilder API と多くの IntegrationComponentSpec 実装が含まれています。これらは、ビルダーでもあり、具体的なエンドポイントを構成するための流れるような API を提供します。IntegrationFlowBuilder インフラストラクチャは、チャネル、エンドポイント、ポーラー、チャネルインターセプターなどのメッセージベースのアプリケーションに共通のエンタープライズ統合パターン (英語) (EIP)を提供します。

エンドポイントは、読みやすくするために DSL で動詞として表されます。次のリストには、一般的な DSL メソッド名と関連する EIP エンドポイントが含まれています。

  • 変換→ Transformer

  • フィルター→ Filter

  • ハンドル→ ServiceActivator

  • 分割→ Splitter

  • 集約→ Aggregator

  • ルート→ Router

  • ブリッジ→ Bridge

概念的には、統合プロセスは、これらのエンドポイントを 1 つ以上のメッセージフローに構成することによって構築されます。EIP は「メッセージフロー」という用語を正式に定義していませんが、よく知られているメッセージングパターンを使用する作業単位として考えると便利です。DSL は、チャネルとそれらの間のエンドポイントの構成を定義する IntegrationFlow コンポーネントを提供しますが、現在、IntegrationFlow は、アプリケーションコンテキストで実際の Bean を生成するための構成のロールのみを果たし、実行時には使用されません。ただし、IntegrationFlow の Bean を Lifecycle としてオートワイヤーして、この IntegrationFlow に関連付けられたすべての Spring Integration コンポーネントに委譲されたフロー全体の start() および stop() を制御できます。次の例では、IntegrationFlows ファクトリを使用して、IntegrationFlowBuilder の EIP メソッドを使用して IntegrationFlow Bean を定義しています。

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

transform メソッドは、メッセージペイロードを操作するためのエンドポイント引数としてラムダを受け入れます。このメソッドの本当の引数は GenericTransformer<S, T> です。ここでは、提供されているトランスフォーマー(ObjectToJsonTransformerFileToStringTransformer など)のいずれかを使用できます。

内部では、IntegrationFlowBuilder は MessageHandler とそのエンドポイントを、それぞれ MessageTransformingHandler と ConsumerEndpointFactoryBean で認識します。別の例を考えてみましょう:

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

上記の例は、Filter → Transformer → Service Activator のシーケンスを構成します。フローは " 'one way'" です。つまり、応答メッセージは提供せず、ペイロードを STDOUT に出力するだけです。エンドポイントは、直接チャネルを使用して自動的に相互接続されます。

ラムダと Message<?> 引数

EIP メソッドでラムダを使用する場合、「入力」引数は通常、メッセージペイロードです。メッセージ全体にアクセスする場合は、Class<?> を最初のパラメーターとして受け取るオーバーロードメソッドのいずれかを使用します。例: これは機能しません:

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

ラムダは引数型を保持せず、フレームワークはペイロードを Message<?> にキャストしようとするため、これは実行時に ClassCastException で失敗します。

代わりに、次を使用します。

.(Message.class, m -> newFooFromMessage(m))
Bean 定義のオーバーライド

Java DSL は、フロー定義でインラインで定義されたオブジェクトの Bean を登録でき、既存の注入された Bean を再利用できます。インラインオブジェクトと既存の Bean 定義に同じ Bean 名が定義されている場合、そのような構成が間違っていることを示す BeanDefinitionOverrideException がスローされます。ただし、prototype Bean を扱う場合、BeanFactory から prototype Bean を呼び出すたびに新しいインスタンスが取得されるため、統合フロープロセッサーから既存の Bean 定義を検出する方法はありません。このようにして、Bean 登録および既存の prototype Bean 定義に対する可能なチェックなしで、IntegrationFlow で提供されたインスタンスがそのまま使用されます。ただし、明示的な id があり、この名前の Bean 定義が prototype スコープ内にある場合、BeanFactory.initializeBean() はこのオブジェクトに対して呼び出されます。

メッセージチャンネル

EIP メソッドを備えた IntegrationFlowBuilder に加えて、Java DSL は MessageChannel インスタンスを構成するための流れるような API を提供します。この目的のために、MessageChannels ビルダーファクトリが提供されています。次の例は、その使用方法を示しています。

@Bean
public MessageChannel priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap())
                        .get();
}

XML 構成で input-channel/output-channel ペアを接続するのと同様に、同じ MessageChannels ビルダーファクトリを IntegrationFlowBuilder から接続エンドポイントへの channel() EIP メソッドで使用できます。デフォルトでは、エンドポイントは DirectChannel インスタンスに接続されており、Bean 名は次のパターンに基づいています: [IntegrationFlow.beanName].channel#[channelNameIndex]。このルールは、インライン MessageChannels ビルダーファクトリの使用によって生成された名前のないチャネルにも適用されます。ただし、すべての MessageChannels メソッドには、MessageChannel インスタンスの Bean 名を設定するために使用できる channelId を認識するバリアントがあります。MessageChannel 参照と beanName は、Bean メソッドの呼び出しとして使用できます。次の例は、channel() EIP メソッドを使用するための可能な方法を示しています。

@Bean
public MessageChannel queueChannel() {
    return MessageChannels.queue().get();
}

@Bean
public MessageChannel publishSubscribe() {
    return MessageChannels.publishSubscribe().get();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlows.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input") は、「「「入力」ID で MessageChannel を見つけて使用するか、作成します」」という意味です。

  • fixedSubscriberChannel() は FixedSubscriberChannel のインスタンスを生成し、channelFlow.channel#0 という名前で登録します。

  • channel("queueChannel") も同じように機能しますが、既存の queueChannel Bean を使用します。

  • channel(publishSubscribe()) は Bean メソッドのリファレンスです。

  • channel(MessageChannels.executor("executorChannel", this.taskExecutor)) は、IntegrationComponentSpec を ExecutorChannel に公開し、それを executorChannel として登録する IntegrationFlowBuilder です。

  • channel("output") は、この名前の Bean がすでに存在しない限り、DirectChannel Bean を名前として output に登録します。

メモ: 上記の IntegrationFlow 定義は有効であり、そのチャネルはすべて BridgeHandler インスタンスを持つエンドポイントに適用されます。

異なる IntegrationFlow インスタンスから MessageChannels ファクトリを介して同じインラインチャネル定義を使用するよう注意してください。DSL パーサーが存在しないオブジェクトを Bean として登録しても、異なる IntegrationFlow コンテナーから同じオブジェクト(MessageChannel)を判別できません。次の例は間違っています。
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlows.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlows.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

その悪い例の結果は、次の例外です。

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

動作させるには、そのチャネルに対して @Bean を宣言し、異なる IntegrationFlow インスタンスから Bean メソッドを使用する必要があります。

ポーラー

Spring Integration は、AbstractPollingEndpoint 実装用に PollerMetadata を構成できる Fluent API も提供します。次の例に示すように、Pollers ビルダーファクトリを使用して、一般的な Bean 定義または IntegrationFlowBuilder EIP メソッドから作成された定義を構成できます。

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500)
        .errorChannel("myErrors");
}

詳細については、Javadoc の Pollers (Javadoc) および PollerSpec (Javadoc) を参照してください。

DSL を使用して PollerSpec を @Bean として作成する場合、Bean 定義の get() メソッドを呼び出さないでください。PollerSpec は、仕様から PollerMetadata オブジェクトを生成し、そのすべてのプロパティを初期化する FactoryBean です。

reactive() エンドポイント

バージョン 5.5 以降、ConsumerEndpointSpec は、オプションのカスタマイザー Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> を備えた reactive() 構成プロパティを提供します。このオプションは、IntegrationReactiveUtils.messageChannelToFlux() を介して Flux に変換される入力チャネル型とは関係なく、ターゲットエンドポイントを ReactiveStreamsConsumer インスタンスとして構成します。提供された関数は、Flux.transform() オペレーターから使用され、入力チャネルからのリアクティブストリームソースをカスタマイズします(publishOn()log()doOnNext() など)。

次の例は、最終的なサブスクライバーとプロデューサーに関係なく、公開スレッドを入力チャネルからその DirectChannel に変更する方法を示しています。

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlows
            .from("inputChannel")
            .<String, Integer>transform(Integer::parseInt,
                    e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
            .get();
}

詳細については、Reactive Streams サポートを参照してください。

DSL およびエンドポイントの構成

すべての IntegrationFlowBuilder EIP メソッドには、AbstractEndpoint インスタンスのオプションを提供するためにラムダパラメーターを適用するバリアントがあります: SmartLifecyclePollerMetadatarequest-handler-advice-chain など。それぞれに汎用引数があるため、次の例に示すように、コンテキストでエンドポイントとその MessageHandler を構成できます。

@Bean
public IntegrationFlow flow2() {
    return IntegrationFlows.from(this.inputChannel)
                .transform(new PayloadSerializingTransformer(),
                       c -> c.autoStartup(false).id("payloadSerializingTransformer"))
                .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
                .get();
}

さらに、EndpointSpec は id() メソッドを提供し、エンドポイント Bean を、生成された名前ではなく、指定された Bean 名で登録できるようにします。

MessageHandler が Bean として参照されている場合、.advice() メソッドが DSL 定義に存在する場合、既存の adviceChain 構成はオーバーライドされます。

@Bean
public TcpOutboundGateway tcpOut() {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(cf());
    gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
    return gateway;
}

@Bean
public IntegrationFlow clientTcpFlow() {
    return f -> f
        .handle(tcpOut(), e -> e.advice(testAdvice()))
        .transform(Transformers.objectToString());
}

つまり、それらはマージされず、testAdvice() Bean のみがこの場合に使用されます。

Transformers

DSL API は、.transform() EIP メソッド内でインラインターゲットオブジェクト定義として使用される便利で流れるような Transformers ファクトリを提供します。次の例は、その使用方法を示しています。

@Bean
public IntegrationFlow transformFlow() {
    return IntegrationFlows.from("input")
            .transform(Transformers.fromJson(MyPojo.class))
            .transform(Transformers.serializer())
            .get();
}

setter を使用した不便なコーディングを回避し、フロー定義をより簡単にします。Transformers を使用してターゲット Transformer インスタンスを @Bean インスタンスとして宣言し、再び IntegrationFlow 定義から Bean メソッドとして使用できることに注意してください。それでも、DSL オブジェクトがまだ Bean として定義されていない場合、DSL パーサーはインラインオブジェクトの Bean 宣言を処理します。

詳細およびサポートされるファクトリメソッドについては、Javadoc の Transformers (Javadoc) を参照してください。

ラムダと Message<?> 引数も参照してください。

受信チャネルアダプター

通常、メッセージフローは受信チャネルアダプター(<int-jdbc:inbound-channel-adapter> など)から始まります。アダプターは <poller> で構成されており、MessageSource<?> に定期的にメッセージを生成するように要求します。Java DSL では、MessageSource<?> から IntegrationFlow を開始することもできます。このために、IntegrationFlows ビルダーファクトリは、オーバーロードされた IntegrationFlows.from(MessageSource<?> messageSource) メソッドを提供します。MessageSource<?> を Bean として構成し、そのメソッドの引数として提供できます。IntegrationFlows.from() の 2 番目のパラメーターは、SourcePollingChannelAdapter のオプション(PollerMetadata や SmartLifecycle など)を提供できる Consumer<SourcePollingChannelAdapterSpec> ラムダです。次の例は、流れるような API とラムダを使用して IntegrationFlow を作成する方法を示しています。

@Bean
public MessageSource<Object> jdbcMessageSource() {
    return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}

@Bean
public IntegrationFlow pollingFlow() {
    return IntegrationFlows.from(jdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
            .transform(Transformers.toJson())
            .channel("furtherProcessChannel")
            .get();
}

Message オブジェクトを直接構築する必要がない場合は、java.util.function.Supplier に基づく IntegrationFlows.fromSupplier() バリアントを使用できます。Supplier.get() の結果は、自動的に Message にラップされます(まだ Message でない場合)。

メッセージルーター

Spring Integration は、次のような特殊なルーター型をネイティブで提供します。

  • HeaderValueRouter

  • PayloadTypeRouter

  • ExceptionTypeRouter

  • RecipientListRouter

  • XPathRouter

他の多くの DSL IntegrationFlowBuilder EIP メソッドと同様に、route() メソッドは、任意の AbstractMessageRouter 実装、または便宜上、SpEL 式としての Stringref-method ペアを適用できます。さらに、ラムダを使用して route() を構成し、Consumer<RouterSpec<MethodInvokingRouter>> にラムダを使用することができます。次の例に示すように、Fluent API は、channelMapping(String key, String channelName) ペアなどの AbstractMappingMessageRouter オプションも提供します。

@Bean
public IntegrationFlow routeFlowByLambda() {
    return IntegrationFlows.from("routerInput")
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.suffix("Channel")
                            .channelMapping(true, "even")
                            .channelMapping(false, "odd")
            )
            .get();
}

次の例は、単純な式ベースのルーターを示しています。

@Bean
public IntegrationFlow routeFlowByExpression() {
    return IntegrationFlows.from("routerInput")
            .route("headers['destChannel']")
            .get();
}

次の例に示すように、routeToRecipients() メソッドは Consumer<RecipientListRouterSpec> を取ります。

@Bean
public IntegrationFlow recipientListFlow() {
    return IntegrationFlows.from("recipientListInput")
            .<String, String>transform(p -> p.replaceFirst("Payload", ""))
            .routeToRecipients(r -> r
                    .recipient("thing1-channel", "'thing1' == payload")
                    .recipientMessageSelector("thing2-channel", m ->
                            m.getHeaders().containsKey("recipient")
                                    && (boolean) m.getHeaders().get("recipient"))
                    .recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
                            f -> f.<String, String>transform(String::toUpperCase)
                                    .channel(c -> c.queue("recipientListSubFlow1Result")))
                    .recipientFlow((String p) -> p.startsWith("thing3"),
                            f -> f.transform("Hello "::concat)
                                    .channel(c -> c.queue("recipientListSubFlow2Result")))
                    .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                    "thing3".equals(m.getPayload())),
                            f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
                    .defaultOutputToParentFlow())
            .get();
}

.routeToRecipients() 定義の .defaultOutputToParentFlow() を使用すると、ルーターの defaultOutput をゲートウェイとして設定して、メインフロー内の一致しないメッセージのプロセスを続行できます。

ラムダと Message<?> 引数も参照してください。

スプリッター

スプリッターを作成するには、split() EIP メソッドを使用します。デフォルトでは、ペイロードが IterableIteratorArrayStream、リアクティブ Publisher の場合、split() メソッドは各アイテムを個別のメッセージとして出力します。ラムダ、SpEL 式、AbstractMessageSplitter 実装を受け入れます。または、パラメーターなしで使用して DefaultMessageSplitter を提供できます。次の例は、ラムダを提供して split() メソッドを使用する方法を示しています。

@Bean
public IntegrationFlow splitFlow() {
    return IntegrationFlows.from("splitInput")
              .split(s -> s.applySequence(false).delimiters(","))
              .channel(MessageChannels.executor(taskExecutor()))
              .get();
}

上記の例では、コンマ区切りの String を含むメッセージを分割するスプリッターを作成します。

ラムダと Message<?> 引数も参照してください。

アグリゲーターとリシーケンサー

Aggregator は概念的に Splitter の反対です。個々のメッセージのシーケンスを単一のメッセージに集約し、必然的に複雑になります。デフォルトでは、アグリゲーターは受信メッセージからのペイロードのコレクションを含むメッセージを返します。同じルールが Resequencer に適用されます。次の例は、スプリッターアグリゲーターパターンの標準的な例を示しています。

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlows.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split() メソッドは、リストを個々のメッセージに分割し、ExecutorChannel に送信します。resequence() メソッドは、メッセージヘッダーにあるシーケンスの詳細でメッセージを並べ替えます。aggregate() メソッドは、これらのメッセージを収集します。

ただし、リリース戦略と相関戦略などを指定することにより、デフォルトの動作を変更できます。次の例を考えてみましょう。

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

上記の例は、myCorrelationKey ヘッダーを持つメッセージを関連付け、少なくとも 10 が蓄積されるとメッセージを解放します。

同様のラムダ構成が resequence() EIP メソッドに提供されています。

サービスアクティベーターと .handle() メソッド

.handle() EIP メソッドのゴールは、MessageHandler 実装または POJO 上のメソッドを呼び出すことです。別のオプションは、ラムダ式を使用して「アクティビティ」を定義することです。その結果、一般的な GenericHandler<P> 関数インターフェースを導入しました。handle メソッドには、P payload と MessageHeaders headers (バージョン 5.1 以降)の 2 つの引数が必要です。それができたら、次のようにフローを定義できます。

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from("flow3Input")
        .<Integer>handle((p, h) -> p * 2)
        .get();
}

上記の例は、受け取った整数を 2 倍にします。

ただし、Spring Integration の主なゴールの 1 つは、メッセージペイロードからメッセージハンドラーのターゲット引数へのランタイム型変換を介した loose coupling です。Java はラムダクラスのジェネリクス型解決をサポートしていないため、ほとんどの EIP メソッドと LambdaMessageProcessor に追加の payloadType 引数を使用した回避策を導入しました。そうすることで、ハード変換作業を Spring の ConversionService に委譲します。payloadType は、提供された type とリクエストされたメッセージを使用してメソッド引数をターゲットにします。次の例は、結果の IntegrationFlow がどのようになるかを示しています。

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            .<byte[], String>transform(p - > new String(p, "UTF-8"))
            .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

ConversionService 内に BytesToIntegerConverter を登録して、追加の .transform() を削除することもできます。

@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
   return new BytesToIntegerConverter();
}

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
             .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

ラムダと Message<?> 引数も参照してください。

オペレーター gateway()

IntegrationFlow 定義の gateway() オペレーターは、特別なサービスアクティベーターの実装であり、入力チャネルを介して他のエンドポイントまたは統合フローを呼び出し、応答を待ちます。技術的には、<chain> 定義(チェーン内からチェーンを呼び出すを参照)のネストされた <gateway> コンポーネントと同じロールを果たし、フローをよりクリーンでわかりやすくします。論理的に、ビジネスの観点から、ターゲット統合ソリューションの異なる部分の間で機能の分散と再利用を可能にするメッセージングゲートウェイです(メッセージングゲートウェイを参照)。この演算子には、さまざまなゴールに対していくつかのオーバーロードがあります。

  • gateway(String requestChannel) は、名前でエンドポイントの入力チャネルにメッセージを送信します。

  • gateway(MessageChannel requestChannel) は、直接注入によってエンドポイントの入力チャネルにメッセージを送信します。

  • gateway(IntegrationFlow flow) は、提供された IntegrationFlow の入力チャネルにメッセージを送信します。

これらはすべて、ターゲット GatewayMessageHandler とそれぞれの AbstractEndpoint を構成するための 2 番目の Consumer<GatewayEndpointSpec> 引数を持つバリアントを持っています。また、IntegrationFlow ベースのメソッドを使用すると、既存の IntegrationFlow Bean を呼び出すか、IntegrationFlow 関数インターフェースのインプレースラムダを介してフローをサブフローとして宣言するか、private メソッドクリーナーコードスタイルで抽出することができます。

@Bean
IntegrationFlow someFlow() {
        return IntegrationFlows
                .from(...)
                .gateway(subFlow())
                .handle(...)
                .get();
}

private static IntegrationFlow subFlow() {
        return f -> f
                .scatterGather(s -> s.recipientFlow(...),
                        g -> g.outputProcessor(MessageGroup::getOne))
}
ダウンストリームフローが常に応答を返すとは限らない場合は、requestTimeout を 0 に設定して、呼び出し元のスレッドが無期限にハングしないようにする必要があります。その場合、フローはその時点で終了し、スレッドはさらなる作業のために解放されます。

オペレーター log()

便宜上、Spring Integration フロー(<logging-channel-adapter>)を介したメッセージの移動をログに記録するために、log() オペレーターが示されています。内部的には、LoggingHandler をサブスクライバーとする WireTapChannelInterceptor で表されます。受信メッセージを次のエンドポイントまたは現在のチャネルに記録するロールを果たします。次の例は、LoggingHandler の使用方法を示しています。

.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)

上記の例では、id ヘッダーは、フィルターを通過してルーティングされる前のメッセージについてのみ、ERROR レベルで test.category に記録されます。

この演算子がフローの最後で使用される場合、これは一方向ハンドラーであり、フローは終了します。応答生成フローとして作成するには、log() の後に単純な bridge() を使用するか、バージョン 5.1 から開始して、代わりに logAndReply() 演算子を使用できます。logAndReply は、フローの最後でのみ使用できます。

オペレーター intercept()

バージョン 5.3 以降、intercept() オペレーターを使用すると、フロー内の現在の MessageChannel に 1 つ以上の ChannelInterceptor インスタンスを登録できます。これは、MessageChannels API を介して明示的な MessageChannel を作成する代わりになります。次の例では、MessageSelectingInterceptor を使用して、例外を含む特定のメッセージを拒否します。

.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)

MessageChannelSpec.wireTap()

Spring Integration には、.wireTap() fluent API MessageChannelSpec ビルダーが含まれています。次の例は、wireTap メソッドを使用して入力を記録する方法を示しています。

@Bean
public QueueChannelSpec myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input");
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

MessageChannel が InterceptableChannel のインスタンスである場合、log()wireTap()intercept() 演算子が現在の MessageChannel に適用されます。それ以外の場合は、現在構成されているエンドポイントのフローに中間 DirectChannel が挿入されます。次の例では、DirectChannel が InterceptableChannel を実装しているため、WireTap インターセプターが myChannel に直接追加されます。

@Bean
MessageChannel myChannel() {
    return new DirectChannel();
}

...
    .channel(myChannel())
    .log()
}

現在の MessageChannel が InterceptableChannel を実装しない場合、暗黙の DirectChannel と BridgeHandler が IntegrationFlow に注入され、WireTap がこの新しい DirectChannel に追加されます。次の例には、チャネル宣言がありません。

.handle(...)
.log()
}

上記の例(およびチャネルが宣言されていない場合)では、暗黙の DirectChannel が IntegrationFlow の現在の位置に挿入され、現在構成されている ServiceActivatingHandler の出力チャネルとして使用されます(前述の .handle() から)。

メッセージフローの操作

IntegrationFlowBuilder は、メッセージフローに接続された統合コンポーネントを生成するためのトップレベル API を提供します。単一のフローで統合を達成できる場合(多くの場合)、これは便利です。または、MessageChannel インスタンスを介して IntegrationFlow インスタンスを結合できます。

デフォルトでは、MessageFlow は Spring Integration の用語では「チェーン」として動作します。つまり、エンドポイントは、DirectChannel インスタンスによって自動的かつ暗黙的に接続されます。メッセージフローは実際にはチェーンとして構築されておらず、はるかに柔軟性があります。例: inputChannel 名がわかっている場合(つまり、明示的に定義している場合)、フロー内の任意のコンポーネントにメッセージを送信できます。フロー内で外部定義されたチャネルを参照して、直接チャネルの代わりにチャネルアダプターの使用を許可することもできます(リモートトランスポートプロトコル、ファイル I/O などを有効にするため)。そのため、DSL は Spring Integration chain 要素をサポートしません。この場合、多くの価値を追加しないからです。

Spring Integration Java DSL は、他の構成オプションと同じ Bean 定義モデルを生成し、既存の Spring Framework @Configuration インフラストラクチャに基づいているため、XML 定義と併用し、Spring Integration メッセージングアノテーション構成と接続できます。

ラムダを使用して、直接 IntegrationFlow インスタンスを定義することもできます。次の例は、その方法を示しています。

@Bean
public IntegrationFlow lambdaFlow() {
    return f -> f.filter("World"::equals)
                   .transform("Hello "::concat)
                   .handle(System.out::println);
}

この定義の結果は、暗黙的な直接チャネルで接続された統合コンポーネントの同じセットです。ここでの唯一の制限は、このフローが名前付き直接チャネル - lambdaFlow.input で開始されることです。また、Lambda フローは MessageSource または MessageProducer から開始できません。

バージョン 5.1 以降、この種類の IntegrationFlow はプロキシにラップされ、ライフサイクル制御を公開し、内部で関連付けられた StandardIntegrationFlow の inputChannel へのアクセスを提供します。

バージョン 5.0.6 以降、IntegrationFlow のコンポーネントに対して生成された Bean 名には、フロー Bean の後に接頭辞としてドット(.)が続きます。例: 前のサンプルの .transform("Hello "::concat) の ConsumerEndpointFactoryBean は、Bean 名が lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0 になります。(o.s.i は、ページに収まるように org.springframework.integration から短縮されています)そのエンドポイントの Transformer 実装 Bean は、lambdaFlow.transformer#0 (バージョン 5.1 で始まる)の Bean 名を持ち、MethodInvokingTransformer クラスの完全修飾名の代わりに、そのコンポーネント型使用されています。Bean 名をフロー内で生成する必要がある場合、すべての NamedComponent に同じパターンが適用されます。これらの生成された Bean 名には、ログの解析や分析ツールでのコンポーネントのグループ化などの目的で、また実行時に統合フローを同時に登録する際の競合状態を回避するために、フロー ID が付加されます。詳細については、動的およびランタイム統合フローを参照してください。

FunctionExpression

ラムダと generics を使用できるようにするため、FunctionExpression クラス(SpEL の Expression インターフェースの実装)を導入しました。Function<T, R> オプションは、Core Spring Integration から暗黙的な Strategy バリアントが存在する場合、expression オプションとともに DSL コンポーネントに提供されます。次の例は、関数式の使用方法を示しています。

.enrich(e -> e.requestChannel("enrichChannel")
            .requestPayload(Message::getPayload)
            .propertyFunction("date", m -> new Date()))

FunctionExpression は、SpelExpression で行われているように、ランタイム型変換もサポートしています。

サブフローのサポート

if…​else および publish-subscribe コンポーネントの一部には、サブフローを使用してロジックまたはマッピングを指定する機能があります。次の例が示すように、最も単純なサンプルは .publishSubscribeChannel() です。

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

個別の IntegrationFlow@Bean 定義を使用しても同じ結果を得ることができますが、論理構成のサブフロースタイルが役立つことを願っています。その結果、コードが短く(そして読みやすく)なることがわかりました。

バージョン 5.3 以降、BroadcastCapableChannel ベースの publishSubscribeChannel() 実装が提供され、ブローカーが支援するメッセージチャネルでサブフローサブスクライバーを構成します。たとえば、Jms.publishSubscribeChannel() のサブフローとして複数のサブスクライバーを構成できるようになりました。

@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub")
                .get();
}

@Bean
public IntegrationFlow pubSubFlow() {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel(),
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
    return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
            .destination("pubsub")
            .get();
}

同様の publish-subscribe サブフロー構成は、.routeToRecipients() メソッドを提供します。

別の例は、.filter() メソッドで .discardChannel() の代わりに .discardFlow() を使用しています。

.route() には特別な注意が必要です。次の例を考えてみましょう。

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

.channelMapping() は、通常の Router マッピングと同じように機能し続けますが、.subFlowMapping() はそのサブフローをメインフローに結び付けました。つまり、ルーターのサブフローは .route() の後にメインフローに戻ります。

場合によっては、.subFlowMapping() から既存の IntegrationFlow@Bean を参照する必要があります。次の例は、その方法を示しています。

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}


この場合、そのようなサブフローから応答を受信してメインフローを続行する必要がある場合、この IntegrationFlow Bean 参照(またはその入力チャネル)は、前の例に示すように .gateway() でラップする必要があります。前の例の oddFlow() 参照は、.gateway() にラップされていません。このルーティングブランチからの応答は期待していません。そうしないと、次のような例外が発生します。

Caused by: org.springframework.beans.factory.BeanCreationException:
    The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
    is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
    This is the end of the integration flow.

サブフローをラムダとして構成すると、フレームワークがサブフローとのリクエストと応答の対話を処理し、ゲートウェイは必要ありません。

サブフローは任意の深さにネストできますが、ネストすることはお勧めしません。実際、ルーターの場合でも、フロー内に複雑なサブフローを追加すると、すぐにスパゲッティのプレートのように見え始め、人間が解析するのが難しくなります。

DSL がサブフロー構成をサポートする場合、構成するコンポーネントに通常チャネルが必要であり、そのサブフローが channel() 要素で始まる場合、フレームワークはコンポーネントの出力チャネルとフローの入力チャネルの間に bridge() を暗黙的に配置します。例: この filter 定義では:

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

フレームワークは、MessageFilter.discardChannel に注入するために DirectChannel Bean を内部的に作成します。次に、サブフローをサブスクリプションのこの暗黙的なチャネルで始まる IntegrationFlow にラップし、フローで指定された channel() の前に bridge を配置します。既存の IntegrationFlow Bean を(ラムダなどのインラインサブフローの代わりに)サブフロー参照として使用する場合、フレームワークがフロー Bean から最初のチャネルを解決できるため、そのようなブリッジは必要ありません。インラインサブフローでは、入力チャネルはまだ使用できません。

プロトコルアダプターの使用

これまでに示したすべての例は、Spring Integration プログラミングモデルを使用して、DSL がメッセージングアーキテクチャをサポートする方法を示しています。ただし、実際の統合はまだ行われていません。これを行うには、HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP などを介してリモートリソースにアクセスするか、ローカルファイルシステムにアクセスする必要があります。Spring Integration は、これらすべてをサポートしています。理想的には、DSL はそれらすべてに対してファーストクラスのサポートを提供する必要がありますが、これらすべてを実装し、Spring Integration に新しいアダプターが追加されるのに遅れを取らないようにするのは困難な作業です。DSL が Spring Integration に継続的に追いつくことが期待されます。

そのため、プロトコル固有のメッセージングをシームレスに定義するための高レベル API を提供します。これは、ファクトリパターンとビルダーパターン、およびラムダを使用して行います。ファクトリクラスは、具体的なプロトコル固有の Spring Integration モジュールのコンポーネントの XML 名前空間と同じロールを果たすため、「名前空間ファクトリ」と考えることができます。現在、Spring Integration JavaDSL は AmqpFeedJmsFiles(S)FtpHttpJPAMongoDbTCP/UDPMailWebFluxScripts 名前空間ファクトリをサポートしています。次の例は、そのうちの 3 つ(AmqpJmsMail)の使用方法を示しています。

@Bean
public IntegrationFlow amqpFlow() {
    return IntegrationFlows.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
            .transform("hello "::concat)
            .transform(String.class, String::toUpperCase)
            .get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return IntegrationFlows.from("jmsOutboundGatewayChannel")
            .handle(Jms.outboundGateway(this.jmsConnectionFactory)
                        .replyContainer(c ->
                                    c.concurrentConsumers(3)
                                            .sessionTransacted(true))
                        .requestDestination("jmsPipelineTest"))
            .get();
}

@Bean
public IntegrationFlow sendMailFlow() {
    return IntegrationFlows.from("sendMailChannel")
            .handle(Mail.outboundAdapter("localhost")
                            .port(smtpPort)
                            .credentials("user", "pw")
                            .protocol("smtp")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.id("sendMailEndpoint"))
            .get();
}

上記の例は、「名前空間ファクトリ」をインラインアダプター宣言として使用する方法を示しています。ただし、@Bean 定義からこれらを使用して、IntegrationFlow メソッドチェーンをより読みやすくすることができます。

他の人に努力する前に、これらの名前空間ファクトリに関するコミュニティのフィードバックを募集しています。また、次にサポートする必要のあるアダプターとゲートウェイの優先順位付けに関する意見も歓迎します。

このリファレンスマニュアル全体のプロトコル固有の章で、さらに多くの Java DSL サンプルを見つけることができます。

他のすべてのプロトコルチャネルアダプターは、次の例に示すように、汎用 Bean として構成され、IntegrationFlow に接続されます。

@Bean
public QueueChannelSpec wrongMessagesChannel() {
    return MessageChannels
            .queue()
            .wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
    return IntegrationFlows.from("inputChannel")
            .filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
                    e -> e.discardChannel(wrongMessagesChannel))
            .log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
            .route(xpathRouter(wrongMessagesChannel))
            .get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
    XPathRouter router = new XPathRouter("local-name(/*)");
    router.setEvaluateAsString(true);
    router.setResolutionRequired(false);
    router.setDefaultOutputChannel(wrongMessagesChannel);
    router.setChannelMapping("Tags", "splittingChannel");
    router.setChannelMapping("Tag", "receivedChannel");
    return router;
}

IntegrationFlowAdapter

次の例に示すように、IntegrationFlow インターフェースを直接実装し、スキャン用のコンポーネントとして指定できます。

@Component
public class MyFlow implements IntegrationFlow {

    @Override
    public void configure(IntegrationFlowDefinition<?> f) {
        f.<String, String>transform(String::toUpperCase);
    }

}

IntegrationFlowBeanPostProcessor によってピックアップされ、アプリケーションコンテキストで正しく解析および登録されます。

便宜上、疎結合アーキテクチャの利点を得るために、IntegrationFlowAdapter 基本クラスの実装を提供しています。次の例に示すように、from() メソッドのいずれかを使用して IntegrationFlowDefinition を生成するには、buildFlow() メソッドの実装が必要です。

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    private final AtomicBoolean invoked = new AtomicBoolean();

    public Date nextExecutionTime(TriggerContext triggerContext) {
          return this.invoked.getAndSet(true) ? null : new Date();
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(this::messageSource,
                      e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                 .split(this)
                 .transform(this)
                 .aggregate(a -> a.processor(this, null), null)
                 .enrichHeaders(Collections.singletonMap("thing1", "THING1"))
                 .filter(this)
                 .handle(this)
                 .channel(c -> c.queue("myFlowAdapterOutput"));
    }

    public String messageSource() {
         return "T,H,I,N,G,2";
    }

    @Splitter
    public String[] split(String payload) {
         return StringUtils.commaDelimitedListToStringArray(payload);
    }

    @Transformer
    public String transform(String payload) {
         return payload.toLowerCase();
    }

    @Aggregator
    public String aggregate(List<String> payloads) {
           return payloads.stream().collect(Collectors.joining());
    }

    @Filter
    public boolean filter(@Header Optional<String> thing1) {
            return thing1.isPresent();
    }

    @ServiceActivator
    public String handle(String payload, @Header String thing1) {
           return payload + ":" + thing1;
    }

}

動的およびランタイム統合フロー

IntegrationFlow およびそのすべての依存コンポーネントは、実行時に登録できます。バージョン 5.0 より前は、BeanFactory.registerSingleton() フックを使用していました。Spring Framework 5.0 から開始して、プログラムによる BeanDefinition 登録に instanceSupplier フックを使用します。次の例は、Bean をプログラムで登録する方法を示しています。

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

前の例では、instanceSupplier フックが genericBeanDefinition メソッドの最後のパラメーターであり、この場合はラムダによって提供されることに注意してください。

必要な Bean の初期化とライフサイクルはすべて、標準のコンテキスト構成 Bean 定義と同様に自動的に行われます。

開発エクスペリエンスを簡素化するために、Spring Integration は IntegrationFlowContext を導入して、次の例に示すように、実行時に IntegrationFlow インスタンスを登録および管理します。

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

これは、複数の構成オプションがあり、同様のフローの複数のインスタンスを作成する必要がある場合に役立ちます。そのために、オプションを繰り返して、IntegrationFlow インスタンスをループ内で作成および登録できます。もう 1 つの変形は、データのソースが Spring ベースではなく、その場で作成する必要がある場合です。このようなサンプルは、次の例が示すように、Reactive Streams イベントソースです。

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlows.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

IntegrationFlowContext.registration() の結果として) IntegrationFlowRegistrationBuilder を使用して、IntegrationFlow の Bean 名を指定し、登録、autoStartup の制御、非 Spring Integration Bean の登録を行うことができます。通常、これらの追加 Bean は、接続ファクトリ(AMQP、JMS、(S)FTP、TCP/UDP など)、シリアライザーとデシリアライザー、その他の必要なサポートコンポーネントです。

IntegrationFlowRegistration.destroy() コールバックを使用して、動的に登録された IntegrationFlow およびその依存 Bean が不要になったときに削除できます。詳細については、IntegrationFlowContext Javadoc を参照してください。

バージョン 5.0.6 以降、IntegrationFlow 定義で生成されたすべての Bean 名の先頭にフロー ID が付加されます。常に明示的なフロー ID を指定することをお勧めします。それ以外の場合、IntegrationFlowContext で同期バリアが開始され、IntegrationFlow の Bean 名が生成され、その Bean が登録されます。生成された同じ Bean 名が異なる IntegrationFlow インスタンスに使用される可能性がある場合、競合状態を回避するために、これら 2 つの操作を同期します。

また、バージョン 5.0.6 以降、登録ビルダー API には新しいメソッド useFlowIdAsPrefix() があります。これは、次の例に示すように、同じフローの複数のインスタンスを宣言し、フロー内のコンポーネントの ID が同じ場合に Bean 名の衝突を回避する場合に役立ちます。

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

この場合、最初のフローのメッセージハンドラーは、tcp1.client.handler という名前の Bean で参照できます。

useFlowIdAsPrefix() を使用する場合は、id 属性が必要です。

ゲートウェイとしての IntegrationFlow 

IntegrationFlow は、次の例に示すように、GatewayProxyFactoryBean コンポーネントを提供するサービスインターフェースから開始できます。

public interface ControlBusGateway {

    void send(String command);
}

...

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from(ControlBusGateway.class)
            .controlBus()
            .get();
}

インターフェースメソッドのすべてのプロキシには、IntegrationFlow の次の統合コンポーネントにメッセージを送信するためのチャネルが提供されます。@MessagingGateway アノテーションを使用してサービスインターフェースをマークし、@Gateway アノテーションを使用してメソッドをマークできます。それでも、requestChannel は無視され、IntegrationFlow の次のコンポーネントの内部チャネルでオーバーライドされます。そうでない場合、IntegrationFlow を使用してこのような構成を作成することは意味がありません。

デフォルトでは、GatewayProxyFactoryBean は [FLOW_BEAN_NAME.gateway] などの従来の Bean 名を取得します。その ID は、@MessagingGateway.name() 属性またはオーバーロードされた IntegrationFlows.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer) ファクトリメソッドを使用して変更できます。また、インターフェースの @MessagingGateway アノテーションのすべての属性がターゲット GatewayProxyFactoryBean に適用されます。アノテーション構成が適用されない場合、Consumer<GatewayProxySpec> バリアントを使用して、ターゲットプロキシに適切なオプションを提供できます。この DSL メソッドは、バージョン 5.2 以降で使用できます。

Java 8 では、次の例に示すように、java.util.function インターフェースを使用して統合ゲートウェイを作成することもできます。

@Bean
public IntegrationFlow errorRecovererFlow() {
    return IntegrationFlows.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
            .handle((GenericHandler<?>) (p, h) -> {
                throw new RuntimeException("intentional");
            }, e -> e.advice(retryAdvice()))
            .get();
}

その errorRecovererFlow は次のように使用できます。

@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;

DSL 拡張

バージョン 5.3 以降、IntegrationFlowExtension が導入され、カスタムまたは構成された EIP オペレーターで既存の Java DSL を拡張できるようになりました。必要なのは、IntegrationFlow Bean 定義で使用できるメソッドを提供するこのクラスの拡張だけです。拡張クラスは、カスタム IntegrationComponentSpec 構成にも使用できます。たとえば、既存の IntegrationComponentSpec 拡張機能で、欠落またはデフォルトのオプションを実装できます。以下のサンプルは、複合カスタムオペレーターと、デフォルトのカスタム outputProcessor に対する AggregatorSpec 拡張の使用箇所を示しています。

public class CustomIntegrationFlowDefinition
        extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {

    public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
        return split()
                .transform("payload.toUpperCase()");
    }

    public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
        return register(new CustomAggregatorSpec(), aggregator);
    }

}

public class CustomAggregatorSpec extends AggregatorSpec {

    CustomAggregatorSpec() {
        outputProcessor(group ->
                group.getMessages()
                        .stream()
                        .map(Message::getPayload)
                        .map(String.class::cast)
                        .collect(Collectors.joining(", ")));
    }

}

メソッドチェーンフローの場合、これらの拡張機能の新しい DSL オペレーターは拡張機能クラスを返す必要があります。このようにして、ターゲット IntegrationFlow 定義は、新規および既存の DSL オペレーターで機能します。

@Bean
public IntegrationFlow customFlowDefinition() {
    return
            new CustomIntegrationFlowDefinition()
                    .log()
                    .upperCaseAfterSplit()
                    .channel("innerChannel")
                    .customAggregate(customAggregatorSpec ->
                            customAggregatorSpec.expireGroupsUponCompletion(true))
                    .logAndReply();
}

統合フローの構成

Spring Integration の第一級オブジェクトとしての MessageChannel 抽象化では、統合フローの構成が常に想定されていました。フロー内の任意のエンドポイントの入力チャネルを使用して、このチャネルを出力として持つエンドポイントからだけでなく、他の任意のエンドポイントからメッセージを送信できます。さらに、@MessagingGateway 契約、Content Enricher コンポーネント、<chain> などの複合エンドポイント、IntegrationFlow Bean(IntegrationFlowAdapter など)を使用すると、ビジネスロジックをより短く再利用可能なパーツに分散するのに十分簡単です。最終的な構成に必要なのは、送信または受信する MessageChannel に関する知識だけです。

バージョン 5.5.4 以降、MessageChannel からさらに抽象化し、実装の詳細をエンドユーザーから隠すために、IntegrationFlows は from(IntegrationFlow) ファクトリメソッドを導入して、既存のフローの出力から現在の IntegrationFlow を開始できるようにします。

@Bean
IntegrationFlow templateSourceFlow() {
    return IntegrationFlows.fromSupplier(() -> "test data")
            .channel("sourceChannel")
            .get();
}

@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
    return IntegrationFlows.from(templateSourceFlow)
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("compositionMainFlowResult"))
            .get();
}

一方、IntegrationFlowDefinition には、他のフローの入力チャネルでカレントフローを継続するための to(IntegrationFlow) ターミナルオペレーターが追加されています。

@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
    return f -> f
            .<String, String>transform(String::toUpperCase)
            .to(otherFlow);
}

@Bean
IntegrationFlow otherFlow() {
    return f -> f
            .<String, String>transform(p -> p + " from other flow")
            .channel(c -> c.queue("otherFlowResultChannel"));
}

フローの途中での構成は、既存の gateway(IntegrationFlow) EIP メソッドを使用して簡単に実現できます。このように、より単純で再利用可能な論理ブロックからフローを構成することにより、複雑なフローを構築できます。例: IntegrationFlow Bean のライブラリを依存関係として追加することができ、それらの構成クラスを最終プロジェクトにインポートして、IntegrationFlow 定義用にオートワイヤーするだけで十分です。