メッセージのフィルタリング

リバランスなどの特定のシナリオでは、すでに処理されたメッセージが再配信される場合があります。フレームワークは、そのようなメッセージが処理されたかどうかを知ることができません。それはアプリケーションレベルの関数です。これはべき等レシーバー (英語) パターンとして知られており、Spring Integration はその実装を提供します。

Spring for Apache Kafka プロジェクトは、MessageListener をラップできる FilteringMessageListenerAdapter クラスを使用して支援も提供します。このクラスは RecordFilterStrategy の実装を取り、filter メソッドを実装して、メッセージが重複しているため破棄する必要があることを通知します。これには、アダプターが破棄されたレコードを確認する必要があるかどうかを示す ackDiscarded と呼ばれる追加のプロパティがあります。デフォルトは false です。

@KafkaListener を使用する場合は、リスナーが適切なフィルタリングアダプターにラップされるように、コンテナーファクトリで RecordFilterStrategy (およびオプションで ackDiscarded)を設定します。

さらに、バッチメッセージリスナーを使用する場合に備えて、FilteringBatchMessageListenerAdapter が提供されます。

ConsumerRecords は不変であるため、@KafkaListener が List<ConsumerRecord<?, ?>> ではなく ConsumerRecords<?, ?> を受信した場合、FilteringBatchMessageListenerAdapter は無視されます。

バージョン 2.8.4 以降では、リスナーアノテーションの filter プロパティを使用して、リスナーコンテナーファクトリのデフォルトの RecordFilterStrategy をオーバーライドできます。

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}