Kafka バインダーリスナーコンテナーカスタマイザー
Spring Cloud Stream は、カスタマイザーを使用することで、メッセージリスナーコンテナーの強力なカスタマイズオプションを提供します。このセクションでは、Kafka で使用できるカスタマイザーインターフェースである ListenerContainerCustomizer、Kafka 固有の拡張機能である KafkaListenerContainerCustomizer、および特殊な ListenerContainerWithDlqAndRetryCustomizer について説明します。
ListenerContainerCustomizer
ListenerContainerCustomizer は、メッセージリスナーコンテナーのカスタマイズを可能にする Spring Cloud Stream の汎用インターフェースです。
使用方法
ListenerContainerCustomizer を使用するには、構成でこのインターフェースを実装する Bean を作成します。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
return (container, destinationName, group) -> {
// Customize the container here
};
}ListenerContainerCustomizer インターフェースは次のメソッドを定義します。
void configure(C container, String destinationName, String group);container: カスタマイズするメッセージリスナーコンテナー。destinationName: 宛先(トピック)の名前。group: コンシューマーグループ ID。
KafkaListenerContainerCustomizer
KafkaListenerContainerCustomizer インターフェースは、ListenerContainerCustomizer を継承してリスナーコンテナーの動作を変更し、バインディング固有の拡張 Kafka コンシューマープロパティへのアクセスを提供します。
使用方法
KafkaListenerContainerCustomizer を使用するには、構成でこのインターフェースを実装する Bean を作成します。
@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
return (container, destinationName, group, properties) -> {
// Customize the Kafka container here
};
}KafkaListenerContainerCustomizer インターフェースは次のメソッドを追加します。
default void configureKafkaListenerContainer(
C container,
String destinationName,
String group,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
configure(container, destinationName, group);
} このメソッドは、追加のパラメーターを使用して基本 configure メソッドを継承します。
extendedConsumerProperties: Kafka 固有のプロパティを含む、拡張されたコンシューマープロパティ。
ListenerContainerWithDlqAndRetryCustomizer
ListenerContainerWithDlqAndRetryCustomizer インターフェースは、デッドレターキュー (DLQ) と再試行メカニズムを含むシナリオに対して追加のカスタマイズオプションを提供します。
使用方法
ListenerContainerWithDlqAndRetryCustomizer を使用するには、構成でこのインターフェースを実装する Bean を作成します。
@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
// Access the container here with access to the extended consumer binding properties.
};
}ListenerContainerWithDlqAndRetryCustomizer インターフェースは次のメソッドを定義します。
void configure(
AbstractMessageListenerContainer<?, ?> container,
String destinationName,
String group,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
BackOff backOff,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);container: カスタマイズする Kafka リスナーコンテナー。destinationName: 宛先(トピック)の名前。group: コンシューマーグループ ID。dlqDestinationResolver: 失敗したレコードの DLQ 宛先を解決する関数。backOff: 再試行のバックオフポリシー。extendedConsumerProperties: Kafka 固有のプロパティを含む、拡張されたコンシューマープロパティ。