動的およびランタイム統合フロー

IntegrationFlow およびそのすべての依存コンポーネントは、実行時に登録できます。バージョン 5.0 より前は、BeanFactory.registerSingleton() フックを使用していました。Spring Framework 5.0 から開始して、プログラムによる BeanDefinition 登録に instanceSupplier フックを使用します。次の例は、Bean をプログラムで登録する方法を示しています。

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

前の例では、instanceSupplier フックが genericBeanDefinition メソッドの最後のパラメーターであり、この場合はラムダによって提供されることに注意してください。

必要な Bean の初期化とライフサイクルはすべて、標準のコンテキスト構成 Bean 定義と同様に自動的に行われます。

開発エクスペリエンスを簡素化するために、Spring Integration は IntegrationFlowContext を導入して、次の例に示すように、実行時に IntegrationFlow インスタンスを登録および管理します。

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

これは、複数の構成オプションがあり、同様のフローの複数のインスタンスを作成する必要がある場合に役立ちます。これを行うには、オプションを反復し、ループ内で IntegrationFlow インスタンスを作成して登録します。もう 1 つのバリエーションは、データのソースが Spring ベースではない場合で、その場で作成する必要があります。次の例に示すように、そのようなサンプルは Reactive Streams イベントソースです。

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlow.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

IntegrationFlowContext.registration() の結果として) IntegrationFlowRegistrationBuilder を使用して、IntegrationFlow の Bean 名を指定し、登録、autoStartup の制御、非 Spring Integration Bean の登録を行うことができます。通常、これらの追加 Bean は、接続ファクトリ(AMQP、JMS、(S)FTP、TCP/UDP など)、シリアライザーとデシリアライザー、その他の必要なサポートコンポーネントです。

IntegrationFlowRegistration.destroy() コールバックを使用して、動的に登録された IntegrationFlow およびその依存 Bean が不要になったときに削除できます。詳細については、IntegrationFlowContext Javadoc を参照してください。

バージョン 5.0.6 以降、IntegrationFlow 定義で生成されたすべての Bean 名の先頭にフロー ID が付加されます。常に明示的なフロー ID を指定することをお勧めします。それ以外の場合、IntegrationFlowContext で同期バリアが開始され、IntegrationFlow の Bean 名が生成され、その Bean が登録されます。生成された同じ Bean 名が異なる IntegrationFlow インスタンスに使用される可能性がある場合、競合状態を回避するために、これら 2 つの操作を同期します。

また、バージョン 5.0.6 以降、登録ビルダー API には新しいメソッド useFlowIdAsPrefix() があります。これは、次の例に示すように、同じフローの複数のインスタンスを宣言し、フロー内のコンポーネントの ID が同じ場合に Bean 名の衝突を回避する場合に役立ちます。

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

この場合、最初のフローのメッセージハンドラーは、tcp1.client.handler という名前の Bean で参照できます。

useFlowIdAsPrefix() を使用する場合は、id 属性が必要です。