最新の安定バージョンについては、spring-cloud-stream 4.2.1 を使用してください。 |
レコードの消費
上記の upppercase
関数では、レコードを Flux<String>
として消費し、それを Flux<String>
として生成しています。元の受信形式 ( ReceiverRecord
) でレコードを受信する必要がある場合があります。ここにそのような機能があります。
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
この関数では、レコードを Flux<ReceiverRecord<byte[], byte[]>>
として消費し、それを Flux<String>
として生成していることに注意してください。ReceiverRecord
は、Reactor Kafka に特化した Kafka ConsumerRecord
である基本受信レコードです。リアクティブ Kafka バインダーを使用する場合、上記の関数により、受信レコードごとに ReceiverRecord
型にアクセスできます。ただし、この場合、RecordMessageConverter (Javadoc) のカスタム実装を提供する必要があります。デフォルトでは、リアクティブ Kafka バインダーは、ペイロードとヘッダーを ConsumerRecord
から変換する MessagingMessageConverter (Javadoc) を使用します。ハンドラーメソッドがそれを受け取るまでに、ペイロードはすでに受信したレコードから抽出され、上記の最初の関数の場合と同様にメソッドに渡されます。アプリケーションでカスタム RecordMessageConverter
実装を提供することにより、デフォルトの動作をオーバーライドできます。例: レコードを生の Flux<ReceiverRecord<byte[], byte[]>>
として使用する場合は、アプリケーションで次の Bean 定義を提供できます。
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
}
次に、必要なバインディングにこのコンバーターを使用するようにフレームワークに指示する必要があります。これは、lowercase
関数に基づく例です。
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"
lowercase-in-0
は、lowercase
関数の入力バインディング名です。送信 (lowecase-out-0
) については、引き続き通常の MessagingMessageConverter
を使用します。
上記の toMessage
実装では、生の ConsumerRecord
(リアクティブバインダーコンテキストにあるため ReceiverRecord
) を受け取り、それを Message
内にラップします。次に、ReceiverRecord
であるメッセージペイロードがユーザーメソッドに提供されます。
reactiveAutoCommit
が false
(デフォルト) の場合、オフセットをコミットするために rec.receiverOffset().acknowledge()
(または commit()
) を呼び出します。reactiveAutoCommit
が true
の場合、flux は代わりに ConsumerRecord
を提供します。詳細については、reactor-kafka
のドキュメントと javadoc を参照してください。