Reactive Kafka Binder を使用した基本的な例

このセクションでは、リアクティブバインダーを使用してリアクティブ Kafka アプリケーションを作成するための基本的なコードスニペットとその詳細を示します。

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}

上記の upppercase 関数は、メッセージチャネルベースの Kafka バインダー (spring-cloud-stream-binder-kafka) と、このセクションで説明するリアクティブ Kafka バインダー (spring-cloud-stream-binder-kafka-reactive) の両方で使用できます。この関数を通常の Kafka バインダーで使用する場合、アプリケーション (つまり、uppercase 関数) でリアクティブ型を使用していても、関数の実行内でリアクティブストリームのみを取得します。関数の実行コンテキストの外では、基礎となるバインダーがリアクティブスタックに基づいていないため、リアクティブのメリットはありません。これは完全なエンドツーエンドのリアクティブスタックをもたらしているように見えるかもしれませんが、このアプリケーションは部分的にしかリアクティブではありません。

ここで、上記の関数のアプリケーションで Kafka - spring-cloud-stream-binder-kafka-reactive に適切なリアクティブバインダーを使用していると仮定します。このバインダーの実装により、チェーン のトップエンドでの消費からボトムエンドでの公開まで、完全なリアクティブの利点が得られます。これは、基礎となるバインダーが Reactor Kafka (英語) のコア API 上に構築されているためです。コンシューマー側では、Kafka コンシューマーのリアクティブ実装である KafkaReceiver (英語) を利用します。同様に、プロデューサー側では、Kafka プロデューサーのリアクティブ実装である KafkaSender (英語) API を使用します。リアクティブ Kafka バインダーの基盤は、適切なリアクティブ Kafka API に基づいて構築されているため、アプリケーションはリアクティブテクノロジを使用する利点を最大限に活用できます。このリアクティブ Kafka バインダーを使用する場合、他のリアクティブ機能の中でも自動バックプレッシャーなどの機能がアプリケーションに組み込まれています。

バージョン 4.0.2 以降では、1 つ以上の ReceiverOptionsCustomizer または SenderOptionsCustomizer Bean をそれぞれ提供することで、ReceiverOptions および SenderOptions をカスタマイズできます。これらはバインディング名と初期オプションを受け取り、カスタマイズされたオプションを返す BiFunction です。インターフェースは Ordered を継承するため、複数のカスタマイザーが存在する場合、必要な順序でカスタマイザーが適用されます。

デフォルトでは、バインダーはオフセットをコミットしません。バージョン 4.0.2 以降、KafkaHeaders.ACKNOWLEDGMENT ヘッダーには ReceiverOffset オブジェクトが含まれており、acknowledge() または commit() メソッドを呼び出してオフセットをコミットさせることができます。
@Bean
public Consumer<Flux<Message<String>> consume() {
    return msg -> {
        process(msg.getPayload());
        msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
    }
}

詳細については、reactor-kafka のドキュメントと javadoc を参照してください。

さらに、バージョン 4.0.3 以降では、Kafka コンシューマープロパティ reactiveAtmostOnce を true に設定でき、バインダーは各ポーリングによって返されたレコードが処理される前にオフセットを自動的にコミットします。また、バージョン 4.0.3 以降では、コンシューマープロパティ reactiveAutoCommit を true に設定でき、各ポーリングによって返されたレコードが処理された後にバインダーが自動的にオフセットをコミットします。このような場合、確認応答ヘッダーは存在しません。

4.0.2 reactiveAutoCommit も提供されましたが、実装が間違っており、reactiveAtMostOnce と同様に動作しました。

以下は reaciveAutoCommit の使用方法の例です。

@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
	return flux -> flux
			.doOnNext(inner -> inner
				.doOnNext(val -> {
					log.info(val.value());
				})
				.subscribe())
			.subscribe();
}

自動コミットを使用する場合、reactor-kafka は Flux<Flux<ConsumerRecord<?, ?>>> を返すことに注意してください。Spring が内部フラックスの内容にアクセスできないことを考えると、アプリケーションはネイティブ ConsumerRecord を処理する必要があります。メッセージ変換やコンテンツに適用される変換サービスはありません。これには、(構成で適切な型の Deserializer を指定することにより) ネイティブデコードを使用して、必要な型のレコードキー / 値を返す必要があります。