メッセージのフィルタリング

リバランスなどの特定のシナリオでは、すでに処理されたメッセージが再配信される場合があります。フレームワークは、そのようなメッセージが処理されたかどうかを知ることができません。それはアプリケーションレベルの関数です。これはべき等レシーバー (英語) パターンとして知られており、Spring Integration はその実装を提供します。

Spring for Apache Kafka プロジェクトは、MessageListener をラップできる FilteringMessageListenerAdapter クラスを使用して支援も提供します。このクラスは RecordFilterStrategy の実装を取り、filter メソッドを実装して、メッセージが重複しているため破棄する必要があることを通知します。これには、アダプターが破棄されたレコードを確認する必要があるかどうかを示す ackDiscarded と呼ばれる追加のプロパティがあります。デフォルトは false です。

@KafkaListener を使用する場合は、リスナーが適切なフィルタリングアダプターにラップされるように、コンテナーファクトリで RecordFilterStrategy (およびオプションで ackDiscarded)を設定します。

さらに、バッチメッセージリスナーを使用する場合に備えて、FilteringBatchMessageListenerAdapter が提供されます。

ConsumerRecords は不変であるため、@KafkaListener が List<ConsumerRecord<?, ?>> ではなく ConsumerRecords<?, ?> を受信した場合、FilteringBatchMessageListenerAdapter は無視されます。

バージョン 2.8.4 以降では、リスナーアノテーションの filter プロパティを使用して、リスナーコンテナーファクトリのデフォルトの RecordFilterStrategy をオーバーライドできます。

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}

バージョン 3.3 以降では、RecordFilterStrategy によるフィルタリングの結果として生じる空のバッチを無視することがサポートされています。RecordFilterStrategy を実装する場合は、ignoreEmptyBatch() を通じて構成できます。デフォルト設定は false であり、すべての ConsumerRecord がフィルタリングされても KafkaListener が呼び出されることを示します。

true が返された場合、すべての ConsumerRecord が除外されると KafkaListener は呼び出されません。ただし、ブローカーへのコミットは引き続き実行されます。

false が返された場合、すべての ConsumerRecord がフィルタリングされると KafkaListener が呼び出されます

下記は用例です。

public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    ...
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return true;
    }
};

// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

この場合、IgnoreEmptyBatchRecordFilterStrategy は常に空のリストを返し、ignoreEmptyBatch() の結果として true を返します。KafkaListener#listen(…​) が呼び出されることはありません。

public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    ...
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return false;
    }
};

// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

ただし、この場合、IgnoreEmptyBatchRecordFilterStrategy は常に空のリストを返し、ignoreEmptyBatch() の結果として false を返します。KafkaListener#listen(…​) は常に呼び出されます。