Spring で管理される Producer Interceptor
バージョン 3.0.0 以降、プロデューサーインターセプターに関しては、インターセプターのクラス名を Apache Kafka プロデューサー構成に指定する代わりに、Spring がそれを Bean として直接管理できるようにすることができます。このアプローチを採用する場合は、このプロデューサーインターセプターを KafkaTemplate に設定する必要があります。以下は、上記と同じ MyProducerInterceptor を使用する例ですが、内部構成プロパティを使用しないように変更されています。
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private final SomeBean bean;
public MyProducerInterceptor(SomeBean bean) {
this.bean = bean;
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
return new MyProducerInterceptor(someBean);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
} レコードが送信される直前に、プロデューサーインターセプターの onSend メソッドが呼び出されます。サーバーがデータの公開に関する確認を送信すると、onAcknowledgement メソッドが呼び出されます。onAcknowledgement は、プロデューサーがユーザーコールバックを呼び出す直前に呼び出されます。
Spring を介して管理され、KafkaTemplate に適用する必要があるこのようなプロデューサーインターセプターが複数ある場合は、代わりに CompositeProducerInterceptor を使用する必要があります。CompositeProducerInterceptor では、個々のプロデューサーインターセプターを順番に追加できます。基盤となる ProducerInterceptor 実装のメソッドは、CompositeProducerInterceptor に追加された順序で呼び出されます。