Spring で管理される Producer Interceptor

バージョン 3.0.0 以降、プロデューサーインターセプターに関しては、インターセプターのクラス名を Apache Kafka プロデューサー構成に指定する代わりに、Spring がそれを Bean として直接管理できるようにすることができます。このアプローチを採用する場合は、このプロデューサーインターセプターを KafkaTemplate に設定する必要があります。以下は、上記と同じ MyProducerInterceptor を使用する例ですが、内部構成プロパティを使用しないように変更されています。

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private final SomeBean bean;

    public MyProducerInterceptor(SomeBean bean) {
        this.bean = bean;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
  return new MyProducerInterceptor(someBean);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
   KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
   kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}

レコードが送信される直前に、プロデューサーインターセプターの onSend メソッドが呼び出されます。サーバーがデータの公開に関する確認を送信すると、onAcknowledgement メソッドが呼び出されます。onAcknowledgement は、プロデューサーがユーザーコールバックを呼び出す直前に呼び出されます。

Spring を介して管理され、KafkaTemplate に適用する必要があるこのようなプロデューサーインターセプターが複数ある場合は、代わりに CompositeProducerInterceptor を使用する必要があります。CompositeProducerInterceptor では、個々のプロデューサーインターセプターを順番に追加できます。基盤となる ProducerInterceptor 実装のメソッドは、CompositeProducerInterceptor に追加された順序で呼び出されます。