このバージョンはまだ開発中であり、まだ安定しているとは考えられていません。最新の安定バージョンについては、spring-cloud-function 4.3.0 を使用してください。

Spring Integration インタラクション

Spring Integration フレームワークは、よく知られたエンタープライズ統合パターンをサポートするために Spring プログラミングモデルを継承します。Spring ベースのアプリケーション内で軽量のメッセージングを可能にし、宣言アダプターを介した外部システムとの統合をサポートします。また、さまざまな操作 (エンドポイント) を論理統合フローに組み込むための高レベル DSL も提供します。この DSL 構成のラムダスタイルにより、Spring Integration はすでに十分なレベルの java.util.function インターフェースを採用しています。@MessagingGateway プロキシインターフェースは、Function または Consumer としても使用でき、Spring Cloud Function 環境に従って機能カタログに登録できます。関数のサポートについては、Spring Integration ReferenceManual の詳細を参照してください。

一方、バージョン 4.0.3 以降、Spring Cloud Function には、Spring Integration DSL の観点から FunctionCatalog と対話するための、より深くクラウド固有の自動構成ベースの API を提供する spring-cloud-function-integration モジュールが導入されています。FunctionFlowBuilder は自動構成され、FunctionCatalog とオートワイヤーされ、ターゲット IntegrationFlow インスタンスの機能固有の DSL のエントリポイントを表します。(便宜上) 標準の IntegrationFlow.from() ファクトリに加えて、FunctionFlowBuilder は、提供された FunctionCatalog 内のターゲット Supplier を検索するための fromSupplier(String supplierDefinition) ファクトリを公開します。そして、この FunctionFlowBuilder は FunctionFlowDefinition につながります。この FunctionFlowDefinition は IntegrationFlowExtension の実装であり、apply(String functionDefinition) および accept(String consumerDefinition) 演算子を公開して、それぞれ FunctionCatalog から Function または Consumer を検索します。詳細については、Javadoc を参照してください。

次の例は、FunctionFlowBuilder の動作と残りの IntegrationFlow API の機能を示しています。

@Configuration
public class IntegrationConfiguration {

    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}

FunctionCatalog.lookup() 機能は単純な関数名だけに限定されないため、前述の apply() および accept() 演算子でも関数合成機能を使用できます。

@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}

Spring Cloud アプリケーションに事前定義された関数の自動構成の依存関係を追加すると、この API の関連性がさらに高まります。たとえば、ストリームアプリケーションプロジェクトは、アプリケーションイメージに加えて、さまざまな統合ユースケース向けの機能を備えたアーティファクトを提供します。debezium-supplierelasticsearch-consumeraggregator-function など

次の構成は、それぞれ http-supplierspel-functionfile-consumer に基づいています。

@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}

他に必要なのは、(必要に応じて) 設定を application.properties に追加することだけです。

http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt