再試行とデッドレター処理
デフォルトでは、コンシューマーバインディングで再試行(maxAttemts
など)と enableDlq
を構成すると、これらの機能はバインダー内で実行され、リスナーコンテナーや Kafka コンシューマーは関与しません。
次のように、この機能をリスナーコンテナーに移動することが望ましい場合があります。
再試行と遅延の合計は、コンシューマーの
max.poll.interval.ms
プロパティを超え、パーティションのリバランスを引き起こす可能性があります。デッドレターを別の Kafka クラスターに公開したいとします。
エラーハンドラーに再試行リスナーを追加したいとします。
…
この機能をバインダーからコンテナーに移動するように構成するには、型 ListenerContainerWithDlqAndRetryCustomizer
の @Bean
を定義します。このインターフェースには次のメソッドがあります。
/**
* Configure the container.
* @param container the container.
* @param destinationName the destination name.
* @param group the group.
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
* enableDlq).
* @param backOff the backOff using retry properties (if configured).
* @see #retryAndDlqInBinding(String, String)
*/
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff);
/**
* Return false to move retries and DLQ from the binding to a customized error handler
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
* configured via
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
* @param destinationName the destination name.
* @param group the group.
* @return false to disable retries and DLQ in the binding
*/
default boolean retryAndDlqInBinding(String destinationName, String group) {
return true;
}
宛先リゾルバーと BackOff
は、バインディングプロパティから作成されます (構成されている場合)。KafkaTemplate
は、spring.kafka….
プロパティからの構成を使用します。これらを使用して、カスタムエラーハンドラーとデッドレターパブリッシャーを作成できます。例:
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
if (destinationName.equals("topicWithLongTotalRetryConfig")) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return !destinationName.contains("topicWithLongTotalRetryConfig");
}
};
}
これで、1 回の再試行遅延のみがコンシューマーの max.poll.interval.ms
プロパティより大きくなる必要があります。
複数のバインダーを使用する場合、"ListenerContainerWithDlqAndRetryCustomizer" Bean は "DefaultBinderFactory" によってオーバーライドされます。Bean を適用するには、'BinderCustomizer' を使用してコンテナーカスタマイザーを設定する必要があります ( 【バインダーカスタマイザー】を参照)。
@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}