最新の安定バージョンについては、spring-cloud-stream 4.3.0 を使用してください。 |
レコードの消費
上記の upppercase 関数では、レコードを Flux<String> として消費し、それを Flux<String> として生成しています。元の受信形式 ( ReceiverRecord) でレコードを受信する必要がある場合があります。ここにそのような機能があります。
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
} この関数では、レコードを Flux<ReceiverRecord<byte[], byte[]>> として消費し、それを Flux<String> として生成していることに注意してください。ReceiverRecord は、Reactor Kafka に特化した Kafka ConsumerRecord である基本受信レコードです。リアクティブ Kafka バインダーを使用する場合、上記の関数により、受信レコードごとに ReceiverRecord 型にアクセスできます。ただし、この場合、RecordMessageConverter (Javadoc) のカスタム実装を提供する必要があります。デフォルトでは、リアクティブ Kafka バインダーは、ペイロードとヘッダーを ConsumerRecord から変換する MessagingMessageConverter (Javadoc) を使用します。ハンドラーメソッドがそれを受け取るまでに、ペイロードはすでに受信したレコードから抽出され、上記の最初の関数の場合と同様にメソッドに渡されます。アプリケーションでカスタム RecordMessageConverter 実装を提供することにより、デフォルトの動作をオーバーライドできます。例: レコードを生の Flux<ReceiverRecord<byte[], byte[]>> として使用する場合は、アプリケーションで次の Bean 定義を提供できます。
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
} 次に、必要なバインディングにこのコンバーターを使用するようにフレームワークに指示する必要があります。これは、lowercase 関数に基づく例です。
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"lowercase-in-0 は、lowercase 関数の入力バインディング名です。送信 (lowercase-out-0) については、引き続き通常の MessagingMessageConverter を使用します。
上記の toMessage 実装では、生の ConsumerRecord (リアクティブバインダーコンテキストにあるため ReceiverRecord ) を受け取り、それを Message 内にラップします。次に、ReceiverRecord であるメッセージペイロードがユーザーメソッドに提供されます。
reactiveAutoCommit が false (デフォルト) の場合、オフセットをコミットするために rec.receiverOffset().acknowledge() (または commit()) を呼び出します。reactiveAutoCommit が true の場合、flux は代わりに ConsumerRecord を提供します。詳細については、reactor-kafka のドキュメントと javadoc を参照してください。