Kafka バインダーリスナーコンテナーカスタマイザー

Spring Cloud Stream は、カスタマイザーを使用することで、メッセージリスナーコンテナーの強力なカスタマイズオプションを提供します。このセクションでは、Kafka で使用できるカスタマイザーインターフェースである ListenerContainerCustomizer、Kafka 固有の拡張機能である KafkaListenerContainerCustomizer、および特殊な ListenerContainerWithDlqAndRetryCustomizer について説明します。

ListenerContainerCustomizer

ListenerContainerCustomizer は、メッセージリスナーコンテナーのカスタマイズを可能にする Spring Cloud Stream の汎用インターフェースです。

目的

リスナーコンテナーの動作を変更する必要がある場合は、このカスタマイザーを使用します。

使用方法

ListenerContainerCustomizer を使用するには、構成でこのインターフェースを実装する Bean を作成します。

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
    return (container, destinationName, group) -> {
        // Customize the container here
    };
}

ListenerContainerCustomizer インターフェースは次のメソッドを定義します。

void configure(C container, String destinationName, String group);
  • container: カスタマイズするメッセージリスナーコンテナー。

  • destinationName: 宛先(トピック)の名前。

  • group: コンシューマーグループ ID。

KafkaListenerContainerCustomizer

KafkaListenerContainerCustomizer インターフェースは、ListenerContainerCustomizer を継承してリスナーコンテナーの動作を変更し、バインディング固有の拡張 Kafka コンシューマープロパティへのアクセスを提供します。

目的

リスナーコンテナーをカスタマイズするときに、バインディング固有の拡張 Kafka コンシューマープロパティにアクセスする必要がある場合は、このカスタマイザーを使用します。

使用方法

KafkaListenerContainerCustomizer を使用するには、構成でこのインターフェースを実装する Bean を作成します。

@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
    return (container, destinationName, group, properties) -> {
        // Customize the Kafka container here
    };
}

KafkaListenerContainerCustomizer インターフェースは次のメソッドを追加します。

default void configureKafkaListenerContainer(
    C container,
    String destinationName,
    String group,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        configure(container, destinationName, group);
}

このメソッドは、追加のパラメーターを使用して基本 configure メソッドを継承します。

  • extendedConsumerProperties: Kafka 固有のプロパティを含む、拡張されたコンシューマープロパティ。

ListenerContainerWithDlqAndRetryCustomizer

ListenerContainerWithDlqAndRetryCustomizer インターフェースは、デッドレターキュー (DLQ) と再試行メカニズムを含むシナリオに対して追加のカスタマイズオプションを提供します。

目的

DLQ の動作を微調整したり、Kafka コンシューマーのカスタム再試行ロジックを実装したりする必要がある場合は、このカスタマイザーを使用します。

使用方法

ListenerContainerWithDlqAndRetryCustomizer を使用するには、構成でこのインターフェースを実装する Bean を作成します。

@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
    return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
        // Access the container here with access to the extended consumer binding properties.
    };
}

ListenerContainerWithDlqAndRetryCustomizer インターフェースは次のメソッドを定義します。

void configure(
    AbstractMessageListenerContainer<?, ?> container,
    String destinationName,
    String group,
    BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
    BackOff backOff,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
  • container: カスタマイズする Kafka リスナーコンテナー。

  • destinationName: 宛先(トピック)の名前。

  • group: コンシューマーグループ ID。

  • dlqDestinationResolver: 失敗したレコードの DLQ 宛先を解決する関数。

  • backOff: 再試行のバックオフポリシー。

  • extendedConsumerProperties: Kafka 固有のプロパティを含む、拡張されたコンシューマープロパティ。

要約

  • DLQ が有効な場合は ListenerContainerWithDlqAndRetryCustomizer が使用されます。

  • KafkaListenerContainerCustomizer は、DLQ なしの Kafka 固有のカスタマイズに使用されます。

  • ベースとなる ListenerContainerCustomizer は、汎用カスタマイズに使用されます。

この階層的なアプローチにより、Spring Cloud Stream アプリケーション内の Kafka リスナーコンテナーを柔軟かつ具体的にカスタマイズできます。