メッセージの生成と消費

関数を記述して @Bean として公開するだけで、Spring Cloud Stream アプリケーションを記述できます。Spring Integration アノテーションベースの構成または Spring Cloud Stream アノテーションベースの構成を使用することもできますが、spring-cloud-stream 3.x 以降では、関数実装を使用することをお勧めします。

Spring Cloud Function サポート

概要

Spring Cloud Stream v2.1 以降、ストリームハンドラーソースを定義する別の方法として、Spring Cloud Function (英語) の組み込みサポートを使用して、java.util.function.[Supplier/Function/Consumer] 型の Bean として表現することもできます。

バインディングによって公開される外部宛先にバインドする関数 Bean を指定するには、spring.cloud.function.definition プロパティを指定する必要があります。

型 java.util.function.[Supplier/Function/Consumer] の Bean が 1 つしかない場合は、関数 Bean が自動検出されるため、spring.cloud.function.definition プロパティをスキップできます。ただし、混乱を避けるために、このようなプロパティを使用することをお勧めします。型 java.util.function.[Supplier/Function/Consumer] の単一の Bean は、メッセージの処理以外の目的で存在する可能性があるため、この自動検出が邪魔になる場合がありますが、単一であると自動検出され、自動バインドされます。これらのまれなシナリオでは、false に設定された値を spring.cloud.stream.function.autodetect プロパティに提供することにより、自動検出を無効にすることができます。

これは、メッセージハンドラーを java.util.function.Function として公開するアプリケーションの例であり、データのコンシューマーおよびプロデューサーとして機能することにより、パススルーセマンティクスを効果的にサポートします。

@SpringBootApplication
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class);
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

前の例では、toUpperCase と呼ばれる型 java.util.function.Function の Bean を定義して、指定された宛先バインダーによって公開される外部宛先に 'input' と 'output' をバインドする必要があるメッセージハンドラーとして動作させます。デフォルトでは、'input' と 'output' のバインディング名は toUpperCase-in-0 と toUpperCase-out-0 になります。バインディング名を確立するために使用される命名規則の詳細については、関数バインディング名セクションを参照してください。

以下は、他のセマンティクスをサポートするための単純な関数アプリケーションの例です。

これは、java.util.function.Supplier として公開されているソースセマンティクスの例です。

@SpringBootApplication
public static class SourceFromSupplier {

	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

これは、java.util.function.Consumer として公開されているシンクセマンティクスの例です。

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}

サプライヤー (ソース)

Function と Consumer は、呼び出しがどのようにトリガーされるかに関しては非常に簡単です。それらは、バインドされている宛先に送信されたデータ(イベント)に基づいてトリガーされます。言い換えれば、それらは典型的なイベント駆動型コンポーネントです。

ただし、トリガーに関しては、Supplier は独自のカテゴリにあります。定義上、データのソース(発信元)であるため、受信の宛先にサブスクライブせず、他のメカニズムによってトリガーされる必要があります。Supplier の実装についても問題があります。これは、必須またはリアクティブ的であり、そのようなサプライヤーのトリガーに直接関係します。

次のサンプルについて考えてみます。

@SpringBootApplication
public static class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

上記の Supplier Bean は、get() メソッドが呼び出されるたびに文字列を生成します。ただし、このメソッドを呼び出すのは誰で、どのくらいの頻度ですか? フレームワークは、サプライヤーの呼び出しをトリガーするデフォルトのポーリングメカニズム(「誰?」の質問に答える)を提供し、デフォルトでは毎秒(「どのくらいの頻度で?」の質問に答える)をトリガーします。言い換えると、上記の構成では、毎秒 1 つのメッセージが生成され、各メッセージは、バインダーによって公開される output 宛先に送信されます。ポーリングメカニズムをカスタマイズする方法については、ポーリング構成プロパティセクションを参照してください。

別の例を考えてみましょう。

@SpringBootApplication
public static class SupplierConfiguration {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore
                }
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }
}

前述の Supplier Bean は、リアクティブプログラミングスタイルを採用しています。通常、命令型サプライヤーとは異なり、get() メソッドの呼び出しにより、個々のメッセージではなくメッセージの連続ストリームが生成(サプライ)されるため、トリガーは 1 回だけにする必要があります。

フレームワークはプログラミングスタイルの違いを認識し、そのようなサプライヤーが 1 回だけトリガーされることを保証します。

ただし、あるデータソースをポーリングして、結果セットを表すデータの有限ストリームを返したいユースケースを想像してみてください。リアクティブプログラミングスタイルは、そのようなサプライヤーにとって完璧なメカニズムです。ただし、生成されるストリームの性質が有限であるため、このようなサプライヤーは定期的に呼び出す必要があります。

有限のデータストリームを生成することでこのようなユースケースをエミュレートする次のサンプルについて考えてみます。

@SpringBootApplication
public static class SupplierConfiguration {

	@PollableBean
	public Supplier<Flux<String>> stringSupplier() {
		return () -> Flux.just("hello", "bye");
	}
}

Bean 自体は PollableBean アノテーション(@Bean のサブセット)でアノテーションが付けられているため、このようなサプライヤーの実装はリアクティブですが、ポーリングする必要があることをフレームワークに通知します。

PollableBean には、このアノテーションのポストプロセッサーに、アノテーション付きコンポーネントによって生成された結果を分割する必要があることを通知する splittable 属性が定義されており、デフォルトでは true に設定されています。つまり、フレームワークは、返される各項目を個別のメッセージとして送信して分割します。この動作が望ましくない場合は、false に設定できます。その場合、このようなサプライヤーは、生成された Flux を分割せずにそのまま返します。

サプライヤーとスレッド

これまでに学習したように、イベントによってトリガーされる Function や Consumer とは異なり(入力データがあります)、Supplier には入力がないため、別のメカニズムであるポーラーによってトリガーされます。ポーラーは、予測できないスレッドメカニズムを持っている可能性があります。また、ほとんどの場合、スレッドメカニズムの詳細は関数のダウンストリーム実行とは関係ありませんが、特定の場合、特にスレッドの親和性に一定の期待がある統合フレームワークでは問題が発生する可能性があります。例: スレッドローカルに格納されているトレースデータに依存する Spring Cloud Sleuth。このような場合、StreamBridge を介した別のメカニズムがあり、ユーザーはスレッドメカニズムをより細かく制御できます。詳細については、任意のデータを出力に送信する (たとえば外国のイベント駆動型ソース) セクションを参照してください。

コンシューマー (リアクティブ)

リアクティブ Consumer は、戻り値の型が void であり、サブスクライブする参照がないフレームワークを残すため、少し特別です。ほとんどの場合、Consumer<Flux<?>> を記述する必要はなく、代わりに、ストリームの最後の演算子として then 演算子を呼び出す Function<Flux<?>, Mono<Void>> として記述します。

例:

public Function<Flux<?>, Mono<Void>> consumer() {
	return flux -> flux.map(..).filter(..).then();
}

ただし、明示的に Consumer<Flux<?>> を記述する必要がある場合は、受信 Flux をサブスクライブすることを忘れないでください。

また、リアクティブ関数と命令関数を混合する場合、関数の合成にも同じルールが適用されることに注意してください。Spring Cloud Function は確かに命令型のリアクティブ関数の作成をサポートしていますが、特定の制限に注意する必要があります。例: 命令型のコンシューマーとリアクティブ関数を構成したと仮定します。そのような組成の結果は、リアクティブ Consumer です。ただし、このセクションで前述したようなコンシューマーをサブスクライブする方法はないため、この制限に対処するには、コンシューマーをリアクティブにして手動でサブスクライブするか(前述のように)、機能を必須に変更する必要があります。

ポーリング構成プロパティ

次のプロパティは Spring Cloud Stream によって公開され、接頭辞 spring.integration.poller. が付けられます。

fixedDelay

デフォルトのポーラーの遅延をミリ秒単位で修正しました。

デフォルト: 1000L.

maxMessagesPerPoll

デフォルトのポーラーの各ポーリングイベントの最大メッセージ。

デフォルト: 1L.

cron

cron トリガーの cron 式の値。

デフォルト: なし。

initialDelay

定期的なトリガーの初期遅延。

デフォルト: 0.

timeUnit

遅延値に適用する TimeUnit。

デフォルト: MILLISECONDS.

たとえば、--spring.integration.poller.fixed-delay=2000 は、2 秒ごとにポーリングするようにポーラー間隔を設定します。

バインディングごとのポーリング構成

前のセクションでは、すべてのバインディングに適用される単一のデフォルトポーラーを構成する方法を示しました。各マイクロサービスが単一のコンポーネント(サプライヤーなど)を表すように設計されたマイクロサービス spring-cloud-stream のモデルにうまく適合しているため、デフォルトのポーラー構成で十分ですが、異なるポーリング構成を必要とする複数のコンポーネントがある場合があります。

このような場合は、バインディングごとの方法でポーラーを構成してください。例: supply-out-0 をバインドする出力があると仮定します。この場合、spring.cloud.stream.bindings.supply-out-0.producer.poller.. プレフィックス(spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000 など)を使用して、このようなバインディング用にポーラーを構成できます。

任意のデータを出力に送信する (たとえば外国のイベント駆動型ソース)

実際のデータソースは、バインダーではない外部(外部)システムからのものである場合があります。例: データのソースは、従来の REST エンドポイントである可能性があります。このようなソースを spring-cloud-stream で使用される機能メカニズムとどのように橋渡ししますか?

Spring Cloud Stream には 2 つのメカニズムがあるため、それらについて詳しく見ていきましょう。

ここでは、両方のサンプルで、ルート Web コンテキストにバインドされた delegateToSupplier と呼ばれる標準の MVC エンドポイントメソッドを使用し、受信リクエストを StreamBridge メカニズムを介してストリーミングに委譲します。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream", body);
	}
}

ここでは、StreamBridge Bean をオートワイヤーします。これにより、データを出力バインディングに送信して、非ストリームアプリケーションと spring-cloud-stream を効果的にブリッジできます。上記の例では、ソース関数が定義されていないことに注意してください(Supplier Bean など)。フレームワークには、事前にソースバインディングを作成するトリガーがありません。これは、構成に関数 Bean が含まれている場合に一般的です。StreamBridge は、send(..) 操作への最初の呼び出しで存在しないバインディングの出力バインディングの作成(および必要に応じて宛先の自動プロビジョニング)を開始し、その後の再利用のためにそれをキャッシュするため、これは問題ありません(詳細については StreamBridge と動的宛先を参照)。

ただし、初期化 (起動) 時に出力バインディングを事前に作成したい場合は、ソースの名前を宣言できる spring.cloud.stream.output-bindings プロパティを利用すると便利です。指定された名前は、ソースバインディングを作成するトリガーとして使用されます。; を使用して複数のソースを示すことができます (複数の出力バインディング) (例: --spring.cloud.stream.output-bindings=foo;bar)

また、streamBridge.send(..) メソッドはデータに Object を使用することに注意してください。つまり、POJO または Message を送信でき、出力を送信するときに、関数と同じレベルの一貫性を提供する関数またはサプライヤーからのものであるかのように、同じルーチンを実行します。これは、出力型の変換、パーティショニングなどが、関数によって生成された出力からのものであるかのように尊重されることを意味します。

明示的なバインディングの作成で説明されているのとは異なり、StreamBridge はパフォーマンスと、必要に応じて任意の数のバインディングをオンザフライで作成する機能の両方を考慮して設計されています。そのため、StreamBridge によって作成された実際のバインディングはアプリケーションコンテキストにキャッシュされず、結合の視覚化と制御に従って管理することはできません。しかし、それでも StreamBridge を使用してバインディングを動的に作成し、後で管理したい場合は、StreamBridge を使用する前に、次のメカニズムを使用して明示的にバインディングを作成してください - ref:spring-cloud-stream/binding_visualization_control.adocl#_define_new_and_manage_existing_bindings[ プログラムによる新しいバインディングの定義 ]

非同期送信付き StreamBridge

StreamBridge は、Spring Cloud Stream の核となる Spring Integration フレームワークによって提供される送信メカニズムを使用します。デフォルトでは、このメカニズムは送信者のスレッドを使用します。つまり、送信はブロックされています。多くの場合これで問題ありませんが、送信を非同期にしたい場合もあります。これを行うには、いずれかの送信メソッドを呼び出す前に、StreamBridge の setAsync(true) メソッドを使用します。

非同期送信による可観測性コンテキスト伝播

Spring フレームワークのサポートだけでなく、フレームワークによって提供される Observability サポートを使用する場合、スレッド境界を壊すと、Observability コンテキスト、つまりトレース履歴の一貫性に影響します。これを回避するには、Micrometer から context-propagation 依存関係を追加するだけです。(下記を参照)

<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>context-propagation</artifactId>
    <version>1.1.0</version>
</dependency>

StreamBridge と動的宛先

StreamBridge は、FROM コンシューマーのルーティングのセクションで説明されている使用例と同様に、出力先が事前にわからない場合にも使用できます。

例を見てみましょう

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, args);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("myDestination", body);
	}
}

ご覧のとおり、前の例は、spring.cloud.stream.output-bindings プロパティ(提供されていない)を介して提供される明示的なバインディング命令を除いて、前の例と非常に似ています。ここでは、バインディングとして存在しない myDestination 名にデータを送信しています。そのような名前は、FROM コンシューマーのルーティングセクションに従って動的宛先として扱われます。

前の例では、ストリームをフィードするための外部ソースとして ApplicationRunner を使用しています。

より実用的な例では、外部ソースが REST エンドポイントです。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		streamBridge.send("myBinding", body);
	}
}

delegateToSupplier メソッドの内部を見るとわかるように、StreamBridge を使用して myBinding バインディングにデータを送信しています。また、ここでは StreamBridge の動的機能の恩恵を受けており、myBinding が存在しない場合は自動的に作成されてキャッシュされ、そうでない場合は既存のバインディングが使用されます。

動的な宛先(バインディング)をキャッシュすると、動的な宛先が多数ある場合にメモリリークが発生する機能があります。ある程度の制御を行うために、デフォルトのキャッシュサイズが 10 の出力バインディングに自己排除キャッシュメカニズムを提供します。これは、動的宛先サイズがその数を超えると、既存のバインディングが排除される可能性があることを意味します。再作成する必要があり、パフォーマンスがわずかに低下する機能があります。spring.cloud.stream.dynamic-destination-cache-size プロパティを使用してキャッシュサイズを増やし、目的の値に設定できます。
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" http://localhost:8080/

2 つの例を示すことにより、このアプローチがあらゆる型の外国の情報源で機能することを強調したいと思います。

Solace PubSub+ バインダーを使用している場合、Spring Cloud Stream は scst_targetDestination ヘッダー (BinderHeaders.TARGET_DESTINATION 経由で取得可能) を予約しています。これにより、バインディングの構成された宛先からこのヘッダーで指定されたターゲット宛先にメッセージをリダイレクトできます。これにより、バインダーは動的宛先に発行するために必要なリソースを管理できるようになり、フレームワークによる管理の必要性が軽減され、前のメモで説明したキャッシュの課題が回避されます。詳細については、こちらを参照してください [GitHub] (英語)

StreamBridge を使用した出力コンテンツ型

必要に応じて、次のメソッドシグネチャー public boolean send(String bindingName, Object data, MimeType outputContentType) を使用して特定のコンテンツ型を提供することもできます。または、Message としてデータを送信する場合、そのコンテンツ型が尊重されます。

StreamBridge で特定のバインダー型を使用する

Spring Cloud Stream は、複数のバインダーシナリオをサポートします。たとえば、Kafka からデータを受信し、それを RabbitMQ に送信している場合があります。

複数のバインダーのシナリオの詳細については、バインダーセクション、特にクラスパス上の複数のバインダーを参照してください。

StreamBridge の使用を計画していて、アプリケーションで複数のバインダーを構成している場合は、使用するバインダーも StreamBridge に指示する必要があります。そのために、send メソッドにはさらに 2 つのバリエーションがあります。

public boolean send(String bindingName, @Nullable String binderType, Object data)

public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)

ご覧のとおり、提供できる追加の引数が 1 つあります。binderType は、動的バインディングを作成するときに使用するバインダーを BindingService に指示します。

spring.cloud.stream.output-bindings プロパティが使用されている場合、またはバインディングが別のバインダーですでに作成されている場合、binderType 引数は効果がありません。

StreamBridge でのチャネルインターセプターの使用

StreamBridge は MessageChannel を使用して出力バインディングを確立するため、StreamBridge を介してデータを送信するときにチャネルインターセプターをアクティブ化できます。StreamBridge に適用するチャネルインターセプターを決定するのはアプリケーション次第です。Spring Cloud Stream は、@GlobalChannelInterceptor(patterns = "*") で通知されない限り、検出されたすべてのチャネルインターセプターを StreamBridge に注入するわけではありません。

アプリケーションに次の 2 つの異なる StreamBridge バインディングがあると仮定します。

streamBridge.send("foo-out-0", message);

および

streamBridge.send("bar-out-0", message);

ここで、両方の StreamBridge バインディングにチャネルインターセプターを適用する場合は、次の GlobalChannelInterceptor Bean を宣言できます。

@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

ただし、上記のグローバルアプローチが気に入らず、バインディングごとに専用のインターセプターが必要な場合は、次のようにすることができます。

@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

および

@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

パターンをより厳密にしたり、ビジネスニーズに合わせてカスタマイズしたりする柔軟性があります。

このアプローチにより、アプリケーションは、使用可能なすべてのインターセプターを適用するのではなく、StreamBridge に注入するインターセプターを決定することができます。

StreamBridge は、StreamBridge のすべての send メソッドを含む StreamOperations インターフェースを介して契約を提供します。アプリケーションは StreamOperations を使用してオートワイヤーすることを選択できます。これは、StreamOperations インターフェースにモックまたは同様のメカニズムを提供することにより、StreamBridge を使用するコードの単体テストに関して便利です。

リアクティブ機能のサポート

Spring Cloud Function はプロジェクト Reactor (英語) 上に構築されているため、SupplierFunctionConsumer を実装する際に、リアクティブプログラミングモデルの恩恵を受けるために行う必要のあることはあまりありません。

例:

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
		return flux -> flux.map(val -> val.toUpperCase());
	}
}

リアクティブまたは命令型プログラミングモデルを選択する際に理解しておく必要がある重要なことはほとんどありません。

完全にリアクティブか、それとも単に API か ?

リアクティブ API を使用しても、そのような API のすべてのリアクティブ機能を利用できるとは限りません。つまり、バックプレッシャーやその他の高度な機能などは、互換性のあるシステム (Reactive Kafka バインダーなど) で動作している場合にのみ機能します。通常の Kafka または Rabbit またはその他の非リアクティブバインダーを使用している場合、ストリームの実際のソースまたはターゲットはリアクティブがないため、リアクティブ API 自体の便利さからのみ恩恵を受け、その高度な機能から恩恵を受けることはできません。

エラー処理と再試行

このマニュアル全体を通して、フレームワークベースのエラー処理、再試行、その他の機能、それらに関連する構成プロパティに関する参照がいくつかあります。これらは命令型関数にのみ影響し、リアクティブ関数に関しては同じ期待を抱くべきではないことを理解することが重要です。そして、ここに理由があります... リアクティブ関数と命令型関数には基本的な違いがあります。命令型関数は、フレームワークが受信する各メッセージで呼び出されるメッセージハンドラーです。N 個のメッセージに対して、このような関数が N 回呼び出されるため、このような関数をラップして、エラー処理、再試行などの追加機能を追加できます。リアクティブ関数は初期化関数です。これは、フレームワークによって提供されたものに接続するために、ユーザーによって提供された Flux/Mono への参照を取得するために 1 回だけ呼び出されます。その後、私たち (フレームワーク) はストリームをまったく表示または制御できません。リアクティブ関数では、エラー処理と再試行 (doOnError().onError*() など) に関しては、リアクティブ API の豊富な機能に頼る必要があります。

関数構成

関数型プログラミングモデルを使用すると、一連の単純な関数から複雑なハンドラーを動的に合成できる関数型合成の恩恵を受けることもできます。例として、上記で定義したアプリケーションに次の関数 Bean を追加しましょう。

@Bean
public Function<String, String> wrapInQuotes() {
	return s -> "\"" + s + "\"";
}

そして、spring.cloud.function.definition プロパティを変更して、‘ toUpperCase ’ と ‘ wrapInQuotes ’ の両方から新しい関数を作成するという意図を反映します。これを行うには、Spring Cloud Function は | (パイプ) シンボルに依存します。例を終了すると、プロパティは次のようになります。

--spring.cloud.function.definition=toUpperCase|wrapInQuotes
Spring Cloud Function によって提供される関数合成サポートの大きな利点の 1 つは、反応型および命令型の関数を合成できるという事実です。

コンポジションの結果は単一の関数であり、ご想像のとおり、他の構成プロパティに関しては非常に不便な非常に長くてわかりにくい名前(foo|bar|baz|xyz. . . など)を持つ可能性があります。これは、関数バインディング名セクションで説明されている説明的なバインディング名機能が役立つ場合があります。

例: toUpperCase|wrapInQuotes にわかりやすい名前を付けたい場合は、次のプロパティ spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput を使用して、他の構成プロパティがそのバインディング名(spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination など)を参照できるようにします。

機能構成と横断的関心事

関数の合成により、実行時に 1 つとして表すことができる、単純で個別に管理 / テスト可能なコンポーネントのセットに分解することで、複雑さに効果的に対処できます。しかし、それだけがメリットではありません。

また、コンポジションを使用して、コンテンツの強化など、特定の横断的非機能関心事に対処することもできます。例: 特定のヘッダーが不足している可能性のある受信メッセージがある場合、または一部のヘッダーがビジネス機能が期待する正確な状態にない場合を想定します。これで、これらの関心事に対処する別の機能を実装し、それをメインのビジネス機能で構成することができます。

例を見てみましょう

@SpringBootApplication
public class DemoStreamApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoStreamApplication.class,
				"--spring.cloud.function.definition=enrich|echo",
				"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
				"--spring.cloud.stream.bindings.input.destination=myDestination",
				"--spring.cloud.stream.bindings.input.group=myGroup");

	}

	@Bean
	public Function<Message<String>, Message<String>> enrich() {
		return message -> {
			Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
			return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
		};
	}

	@Bean
	public Function<Message<String>, Message<String>> echo() {
		return message -> {
			Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
			System.out.println("Incoming message " + message);
			return message;
		};
	}
}

些細なことですが、この例は、1 つの関数が受信メッセージを追加のヘッダー(機能以外の問題)で強化する方法を示しているため、他の関数(echo)はそれから利益を得ることができます。echo 関数はクリーンな状態を保ち、ビジネスロジックのみに焦点を合わせます。構成されたバインディング名を単純化するための spring.cloud.stream.function.bindings プロパティの使用箇所も確認できます。

複数の入力引数と出力引数を持つ関数

バージョン 3.0 以降 spring-cloud-stream は、複数の入力および / または複数の出力(戻り値)を持つ関数のサポートを提供します。これは実際にはどういう意味で、どのような種類のユースケースを対象としていますか?

  • ビッグデータ: 扱っているデータのソースが非常に整理されておらず、さまざまな型のデータ要素(オーダー、トランザクションなど)が含まれており、効果的に整理する必要があると想像してください

  • データ集約: 別のユースケースでは、2 つ以上の受信 _stream からのデータ要素をマージする必要がある場合があります

上記は、データの複数のストリームを受け入れたり生成したりするために単一の関数を使用する必要がある可能性があるいくつかのユースケースについて説明しています。これが、ここで対象としている型のユースケースです。

また、ここではストリームの概念が少し異なることに注意してください。このような関数は、(個々の要素ではなく)実際のデータストリームへのアクセスが許可されている場合にのみ価値があると想定されています。そのため、spring-cloud-functions によってもたらされる依存関係の一部として、クラスパスですでに利用可能なプロジェクト Reactor (英語) によって提供される抽象化(つまり、Flux および Mono)に依存しています。

もう 1 つの重要な側面は、複数の入力と出力の表現です。java は、さまざまな異なる抽象化を提供して、それらの抽象化が a)無制限b)アリティの欠如、c)このコンテキストですべて重要な型情報の欠如 であるものの複数を表します。例として、Collection または配列を見てみましょう。これにより、単一の型の複数を記述したり、すべてを Object にアップキャストしたりして、spring-cloud-stream の透過型変換機能に影響を与えることができます。

これらすべての要件に対応するために、最初のサポートは、Project Reactor によって提供される別の抽象化である Tuples を利用する署名に依存しています。ただし、より柔軟な署名を許可するよう取り組んでいます。

このようなアプリケーションで使用されるバインディング名を確立するために使用される命名規則を理解するには、バインディングとバインディング名セクションを参照してください。

いくつかのサンプルを見てみましょう。

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
		return tuple -> {
			Flux<String> stringStream = tuple.getT1();
			Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
			return Flux.merge(stringStream, intStream);
		};
	}
}

上記の例は、2 つの入力(最初の型 String と 2 番目の型 Integer)を受け取り、型 String の単一の出力を生成する関数を示しています。

上記の例では、2 つの入力バインディングは gather-in-0 と gather-in-1 になり、一貫性を保つために、出力バインディングも同じ規則に従い、gather-out-0 という名前が付けられます。

それを知っていると、バインディング固有のプロパティを設定できます。例: 以下は、gather-in-0 バインディングのコンテンツ型をオーバーライドします。

--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {

	@Bean
	public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
		return flux -> {
			Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
			UnicastProcessor even = UnicastProcessor.create();
			UnicastProcessor odd = UnicastProcessor.create();
			Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
			Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));

			return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
		};
	}
}

上記の例は、前のサンプルとは多少逆であり、型 Integer の単一の入力を受け取り、2 つの出力(両方とも型 String)を生成する関数を示しています。

上記の例では、入力バインディングは scatter-in-0 であり、出力バインディングは scatter-out-0 と scatter-out-1 です。

そして、次のコードでテストします。

@Test
public void testSingleInputMultiOutput() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleApplication.class))
							.run("--spring.cloud.function.definition=scatter")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		for (int i = 0; i < 10; i++) {
			inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
		}

		int counter = 0;
		for (int i = 0; i < 5; i++) {
			Message<byte[]> even = outputDestination.receive(0, 0);
			assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
			Message<byte[]> odd = outputDestination.receive(0, 1);
			assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
		}
	}
}

1 つのアプリケーションで複数の機能

複数のメッセージハンドラーを 1 つのアプリケーションにグループ化する必要がある場合もあります。これを行うには、いくつかの関数を定義します。

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

上記の例では、2 つの関数 uppercase と reverse を定義する構成があります。最初に、前述のように、競合(複数の関数)があることに注意する必要があります。バインドする実際の関数を指す spring.cloud.function.definition プロパティを提供して、競合を解決する必要があります。ここを除いて、; 区切り文字を使用して両方の関数を指します(以下のテストケースを参照)。

複数の入力 / 出力を持つ関数と同様に、【バインディングとバインディング名】セクションを参照して、そのようなアプリケーションで使用されるバインディング名を確立するために使用される命名規則を理解しましょう。

そして、次のコードでテストします。

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					ReactiveFunctionConfiguration.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

バッチコンシューマー

バッチリスナーをサポートする MessageChannelBinder を使用していて、この機能がコンシューマーバインディングに対して有効になっている場合、spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode を true に設定して、メッセージのバッチ全体を List の関数に渡すことができます。

@Bean
public Function<List<Person>, Person> findFirstPerson() {
    return persons -> persons.get(0);
}

バッチ型変換

単一のメッセージコンシューマーの型変換と同様に、バッチ処理ではバッチ内のすべてのメッセージをリクエストされた型に変換する必要があります。たとえば、前の例では、この型は Person です。

また、バッチ内の各メッセージのヘッダーは、バッチ全体を表すメッセージの MessageHeaders に個別に提供されることを理解することも重要です。これらのメッセージとそれに対応するバッチヘッダーは、それぞれのバインダーによって作成され、その構造が異なる場合があります。バッチヘッダーの構造を理解するには、バインダーのドキュメントを参照する必要があります。Kafka と Rabbit の場合は、それぞれ amqp_batchedHeaders と kafka_batchConvertedHeaders を検索できます。

つまり、5 つのペイロードを含むバッチを表すメッセージがある場合、同じメッセージにヘッダーのセットが含まれ、各ヘッダーは同じインデックスを持つペイロードに対応します。

しかし、特定のペイロードが変換に失敗した場合はどうなるでしょうか ? 単一メッセージのシナリオでは、単に null を返し、変換されていないメッセージでメソッドを呼び出します。これにより、関数のシグネチャーに応じて、例外が発生するか、生のメッセージを処理できるようになります。

バッチ処理の場合は、状況が少し複雑になります。変換されていないペイロードに対して null を返すと、バッチサイズが効果的に削減されます。例: 元のバッチに 5 つのメッセージが含まれていて、そのうち 2 つが変換に失敗した場合には、変換されたバッチには 3 つのメッセージのみが含まれます。これは許容できるかもしれませんが、対応するバッチヘッダーはどうでしょうか。バインダーが最初のバッチを形成したときに作成されたため、ヘッダーは 5 つ残ります。この不一致により、ヘッダーと対応するペイロードを関連付けることが難しくなります。

この課題に対処するために、MessageConverterHelper インターフェースを提供します。

public interface MessageConverterHelper {

	/**
	 * This method will be called by the framework in cases when a message failed to convert.
	 * It allows you to signal to the framework if such failure should be considered fatal or not.
	 *
	 * @param message failed message
	 * @return true if conversion failure must be considered fatal.
	 */
	default boolean shouldFailIfCantConvert(Message<?> message) {
		return false;
	}

	/**
	 * This method will be called by the framework in cases when a single message within batch of messages failed to convert.
	 * It provides a place for providing post-processing logic before message converter returns.
	 *
	 * @param message failed message.
	 * @param index index of failed message within the batch
	 */
	default void postProcessBatchMessageOnFailure(Message<?> message, int index) {
	}
}

実装されている場合、このインターフェースはフレームワークのメッセージ変換ロジックによって呼び出され、特定のペイロードを変換できない場合にバッチメッセージに対して後処理を実行します。

Kafka および Rabbit のデフォルト実装では、バッチペイロードとそのヘッダー間の相関関係を維持するために、対応するバッチヘッダーが自動的に削除されます。ただし、このような場合にカスタム動作を追加する必要がある場合は、独自の実装を提供して Bean として登録できます。

さらに、このインターフェースは、変換エラーをより決定論的に処理できるメソッドを提供します。デフォルトでは、このメソッドは false を返しますが、変換エラーが発生したときにプロセス全体を失敗させたい場合は、実装をカスタマイズできます。

バッチプロデューサー

メッセージのコレクションを返すことにより、プロデューサー側でバッチ処理の概念を使用することもできます。これにより、コレクション内の各メッセージがバインダーによって個別に送信されるという逆の効果が効果的に提供されます。

次の関数について考えてみます。

@Bean
public Function<String, List<Message<String>>> batch() {
	return p -> {
		List<Message<String>> list = new ArrayList<>();
		list.add(MessageBuilder.withPayload(p + ":1").build());
		list.add(MessageBuilder.withPayload(p + ":2").build());
		list.add(MessageBuilder.withPayload(p + ":3").build());
		list.add(MessageBuilder.withPayload(p + ":4").build());
		return list;
	};
}

返されたリストの各メッセージは個別に送信され、4 つのメッセージが出力先に送信されます。

関数としての Spring Integration フロー

関数を実装する場合、エンタープライズ統合パターン (英語) (EIP)のカテゴリに適合する複雑な要件がある場合があります。これらは、EIP のリファレンス実装である Spring Integration(SI)などのフレームワークを使用して処理するのが最適です。

ありがたいことに、SI はすでにゲートウェイとしての統合フローを介して統合フローを関数として公開するためのサポートを提供しています。次のサンプルを検討してください。

@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {

	public static void main(String[] args) {
		SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
	}

	@Bean
	public IntegrationFlow uppercaseFlow() {
		return IntegrationFlow.from(MessageFunction.class, spec -> spec.beanName("uppercase"))
				.<String, String>transform(String::toUpperCase)
				.log(LoggingHandler.Level.WARN)
				.bridge()
				.get();
	}

	public interface MessageFunction extends Function<Message<String>, Message<String>> {

	}
}

SI に精通している人のために、IntegrationFlow 型の Bean を定義し、uppercase と呼ばれる Function<String, String> (SI DSL を使用)として公開する統合フローを宣言していることがわかります。MessageFunction インターフェースを使用すると、適切な型変換のために入力と出力の型を明示的に宣言できます。型変換の詳細については、コンテンツ型の交渉セクションを参照してください。

生の入力を受け取るには、from(Function.class, …​) を使用できます。

結果の関数は、ターゲットバインダーによって公開される入力および出力の宛先にバインドされます。

このようなアプリケーションで使用されるバインディング名を確立するために使用される命名規則を理解するには、【バインディングとバインディング名】セクションを参照してください。

特に関数型プログラミングモデルに関する Spring Integration と Spring Cloud Stream の相互運用性の詳細については、この投稿 (英語) が非常に興味深いと思うかもしれません。Spring Integration と Spring Cloud Stream/ 関数の最高のものをマージすることで適用できるさまざまなパターンを少し深く掘り下げているからです。

ポーリングされたコンシューマーの使用

概要

ポーリングされたコンシューマーを使用する場合は、オンデマンドで PollableMessageSource をポーリングします。ポーリングされたコンシューマーのバインディングを定義するには、spring.cloud.stream.pollable-source プロパティを提供する必要があります。

ポーリングされたコンシューマーバインディングの次の例について考えてみます。

--spring.cloud.stream.pollable-source=myDestination

前の例のポーリング可能なソース名 myDestination は、myDestination-in-0 バインディング名が機能プログラミングモデルと一貫性を保つようにします。

前の例でポーリングされたコンシューマーを考えると、次のように使用できます。

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure
            }
        }
    };
}

手動ではなく、Spring に似た代替手段は、スケジュールされたタスク Bean を構成することです。例:

@Scheduled(fixedDelay = 5_000)
public void poll() {
	System.out.println("Polling...");
	this.source.poll(m -> {
		System.out.println(m.getPayload());

	}, new ParameterizedTypeReference<Foo>() { });
}

PollableMessageSource.poll() メソッドは MessageHandler 引数を取ります(ここに示すように、多くの場合ラムダ式)。メッセージが受信され、正常に処理された場合は、true を返します。

メッセージ駆動型コンシューマーと同様に、MessageHandler が例外をスローすると、Error Handling に従って、メッセージはエラーチャネルに公開されます。

通常、poll() メソッドは、MessageHandler が終了したときにメッセージを確認応答します。メソッドが異常終了した場合、メッセージは拒否されます(再キューイングされません)が、エラーの処理を参照してください。次の例に示すように、確認応答の責任を取ることで、その動作をオーバーライドできます。

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}
リソースリークを回避するために、ある時点でメッセージを ack (または nack)する必要があります。
一部のメッセージングシステム (Apache Kafka など) では、ログに単純なオフセットが保持されます。配信が失敗し、StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE); で再キューに入れられた場合、その後正常に確認されたメッセージは再配信されます。

オーバーロードされた poll メソッドもあり、その定義は次のとおりです。

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

type は、次の例に示すように、受信メッセージのペイロードを変換できるようにする変換ヒントです。

boolean result = pollableSource.poll(received -> {
			Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
            ...

		}, new ParameterizedTypeReference<Map<String, Foo>>() {});

エラーの処理

デフォルトでは、ポーリング可能なソースに対してエラーチャネルが構成されています。コールバックが例外をスローすると、ErrorMessage がエラーチャネル(<destination>.<group>.errors)に送信されます。このエラーチャネルは、グローバル Spring Integration errorChannel にもブリッジされます。

エラーを処理するために、@ServiceActivator を使用していずれかのエラーチャネルにサブスクライブできます。サブスクリプションがない場合、エラーは単にログに記録され、メッセージは成功したものとして確認されます。エラーチャネルサービスアクティベータが例外をスローした場合、メッセージは(デフォルトで)拒否され、再配信されません。サービスアクティベータが RequeueCurrentMessageException をスローした場合、メッセージはブローカで再キューイングされ、後続のポーリングで再度取得されます。

リスナーが RequeueCurrentMessageException を直接スローした場合、メッセージは上記のように再キューイングされ、エラーチャネルに送信されません。