レコードの消費

上記の upppercase 関数では、レコードを Flux<String> として消費し、それを Flux<String> として生成しています。元の受信形式 ( ReceiverRecord) でレコードを受信する必要がある場合があります。ここにそのような機能があります。

@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}

この関数では、レコードを Flux<ReceiverRecord<byte[], byte[]>> として消費し、それを Flux<String> として生成していることに注意してください。ReceiverRecord は、Reactor Kafka に特化した Kafka ConsumerRecord である基本受信レコードです。リアクティブ Kafka バインダーを使用する場合、上記の関数により、受信レコードごとに ReceiverRecord 型にアクセスできます。ただし、この場合、RecordMessageConverter (Javadoc) のカスタム実装を提供する必要があります。デフォルトでは、リアクティブ Kafka バインダーは、ペイロードとヘッダーを ConsumerRecord から変換する MessagingMessageConverter (Javadoc) を使用します。ハンドラーメソッドがそれを受け取るまでに、ペイロードはすでに受信したレコードから抽出され、上記の最初の関数の場合と同様にメソッドに渡されます。アプリケーションでカスタム RecordMessageConverter 実装を提供することにより、デフォルトの動作をオーバーライドできます。例: レコードを生の Flux<ReceiverRecord<byte[], byte[]>> として使用する場合は、アプリケーションで次の Bean 定義を提供できます。

@Bean
RecordMessageConverter fullRawReceivedRecord() {
    return new RecordMessageConverter() {

        private final RecordMessageConverter converter = new MessagingMessageConverter();

        @Override
        public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer, Type payloadType) {
            return MessageBuilder.withPayload(record).build();
        }

        @Override
        public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
            return this.converter.fromMessage(message, defaultTopic);
        }

    };
}

次に、必要なバインディングにこのコンバーターを使用するようにフレームワークに指示する必要があります。これは、lowercase 関数に基づく例です。

spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"

lowercase-in-0 は、lowercase 関数の入力バインディング名です。送信 (lowecase-out-0) については、引き続き通常の MessagingMessageConverter を使用します。

上記の toMessage 実装では、生の ConsumerRecord (リアクティブバインダーコンテキストにあるため ReceiverRecord ) を受け取り、それを Message 内にラップします。次に、ReceiverRecord であるメッセージペイロードがユーザーメソッドに提供されます。

reactiveAutoCommit が false (デフォルト) の場合、rec.receiverOffset().acknowledge() (または commit()) を呼び出してオフセットをコミットします。reactiveAutoCommit が true の場合、フラックスは代わりに ConsumerRecord を提供します。詳細については、reactor-kafka のドキュメントと javadoc を参照してください。