メッセージのフィルタリング
リバランスなどの特定のシナリオでは、すでに処理されたメッセージが再配信される場合があります。フレームワークは、そのようなメッセージが処理されたかどうかを知ることができません。それはアプリケーションレベルの関数です。これはべき等レシーバー (英語) パターンとして知られており、Spring Integration はその実装を提供します。
Spring for Apache Kafka プロジェクトは、MessageListener
をラップできる FilteringMessageListenerAdapter
クラスを使用して支援も提供します。このクラスは RecordFilterStrategy
の実装を取り、filter
メソッドを実装して、メッセージが重複しているため破棄する必要があることを通知します。これには、アダプターが破棄されたレコードを確認する必要があるかどうかを示す ackDiscarded
と呼ばれる追加のプロパティがあります。デフォルトは false
です。
@KafkaListener
を使用する場合は、リスナーが適切なフィルタリングアダプターにラップされるように、コンテナーファクトリで RecordFilterStrategy
(およびオプションで ackDiscarded
)を設定します。
さらに、バッチメッセージリスナーを使用する場合に備えて、FilteringBatchMessageListenerAdapter
が提供されます。
ConsumerRecords は不変であるため、@KafkaListener が List<ConsumerRecord<?, ?>> ではなく ConsumerRecords<?, ?> を受信した場合、FilteringBatchMessageListenerAdapter は無視されます。 |
バージョン 2.8.4 以降では、リスナーアノテーションの filter
プロパティを使用して、リスナーコンテナーファクトリのデフォルトの RecordFilterStrategy
をオーバーライドできます。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
バージョン 3.3 以降では、RecordFilterStrategy
によるフィルタリングの結果として生じる空のバッチを無視することがサポートされています。RecordFilterStrategy
を実装する場合は、ignoreEmptyBatch()
を通じて構成できます。デフォルト設定は false
であり、すべての ConsumerRecord
がフィルタリングされても KafkaListener
が呼び出されることを示します。
true
が返された場合、すべての ConsumerRecord
が除外されると KafkaListener
は呼び出されません。ただし、ブローカーへのコミットは引き続き実行されます。
false
が返された場合、すべての ConsumerRecord
がフィルタリングされると KafkaListener
が呼び出されます。
下記は用例です。
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return true;
}
};
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
この場合、IgnoreEmptyBatchRecordFilterStrategy
は常に空のリストを返し、ignoreEmptyBatch()
の結果として true
を返します。KafkaListener#listen(…)
が呼び出されることはありません。
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return false;
}
};
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
ただし、この場合、IgnoreEmptyBatchRecordFilterStrategy
は常に空のリストを返し、ignoreEmptyBatch()
の結果として false
を返します。KafkaListener#listen(…)
は常に呼び出されます。