Spring で管理される Producer Interceptor
バージョン 3.0.0 以降、プロデューサーインターセプターに関しては、インターセプターのクラス名を Apache Kafka プロデューサー構成に指定する代わりに、Spring がそれを Bean として直接管理できるようにすることができます。このアプローチを採用する場合は、このプロデューサーインターセプターを KafkaTemplate
に設定する必要があります。以下は、上記と同じ MyProducerInterceptor
を使用する例ですが、内部構成プロパティを使用しないように変更されています。
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private final SomeBean bean;
public MyProducerInterceptor(SomeBean bean) {
this.bean = bean;
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
return new MyProducerInterceptor(someBean);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}
レコードが送信される直前に、プロデューサーインターセプターの onSend
メソッドが呼び出されます。サーバーがデータの公開に関する確認を送信すると、onAcknowledgement
メソッドが呼び出されます。onAcknowledgement
は、プロデューサーがユーザーコールバックを呼び出す直前に呼び出されます。
Spring を介して管理され、KafkaTemplate
に適用する必要があるこのようなプロデューサーインターセプターが複数ある場合は、代わりに CompositeProducerInterceptor
を使用する必要があります。CompositeProducerInterceptor
では、個々のプロデューサーインターセプターを順番に追加できます。基盤となる ProducerInterceptor
実装のメソッドは、CompositeProducerInterceptor
に追加された順序で呼び出されます。