メッセージのフィルタリング
リバランスなどの特定のシナリオでは、すでに処理されたメッセージが再配信される場合があります。フレームワークは、そのようなメッセージが処理されたかどうかを知ることができません。それはアプリケーションレベルの関数です。これはべき等レシーバー (英語) パターンとして知られており、Spring Integration はその実装を提供します。
Spring for Apache Kafka プロジェクトは、MessageListener
をラップできる FilteringMessageListenerAdapter
クラスを使用して支援も提供します。このクラスは RecordFilterStrategy
の実装を取り、filter
メソッドを実装して、メッセージが重複しているため破棄する必要があることを通知します。これには、アダプターが破棄されたレコードを確認する必要があるかどうかを示す ackDiscarded
と呼ばれる追加のプロパティがあります。デフォルトは false
です。
@KafkaListener
を使用する場合は、リスナーが適切なフィルタリングアダプターにラップされるように、コンテナーファクトリで RecordFilterStrategy
(およびオプションで ackDiscarded
)を設定します。
さらに、バッチメッセージリスナーを使用する場合に備えて、FilteringBatchMessageListenerAdapter
が提供されます。
ConsumerRecords は不変であるため、@KafkaListener が List<ConsumerRecord<?, ?>> ではなく ConsumerRecords<?, ?> を受信した場合、FilteringBatchMessageListenerAdapter は無視されます。 |
バージョン 2.8.4 以降では、リスナーアノテーションの filter
プロパティを使用して、リスナーコンテナーファクトリのデフォルトの RecordFilterStrategy
をオーバーライドできます。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}