Kafka への接続

バージョン 2.5 以降、これらはそれぞれ KafkaResourceFactory を継承します。これにより、Supplier<String> を構成に追加することにより、実行時にブートストラップサーバーを変更できます: setBootstrapServersSupplier(() -> …​)。これは、サーバーのリストを取得するために、すべての新しい接続に対して呼び出されます。コンシューマーとプロデューサーは一般的に長命です。既存のプロデューサーを閉じるには、DefaultKafkaProducerFactory で reset() を呼び出します。既存のコンシューマーを閉じるには、KafkaListenerEndpointRegistry で stop() (次に start())を呼び出すか、他のリスナーコンテナー Bean で stop() と start() を呼び出します。

便宜上、フレームワークは 2 セットのブートストラップサーバーをサポートする ABSwitchCluster も提供します。そのうちの 1 つはいつでもアクティブです。setBootstrapServersSupplier() を呼び出して、ABSwitchCluster を構成し、それをプロデューサーファクトリとコンシューマーファクトリ、および KafkaAdmin に追加します。切り替えたい場合は、primary() または secondary() を呼び出し、プロデューサーファクトリで reset() を呼び出して、新しい接続を確立します。コンシューマーの場合、stop() および start() はすべてリスナーコンテナーです。@KafkaListenerstop()start() を使用する場合は、KafkaListenerEndpointRegistry Bean。

詳細については、Javadoc を参照してください。

ファクトリリスナー

バージョン 2.5 以降、DefaultKafkaProducerFactory および DefaultKafkaConsumerFactory は、プロデューサーまたはコンシューマーが作成またはクローズされるたびに通知を受信するように Listener で構成できます。

プロデューサーファクトリリスナー
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
コンシューマーファクトリリスナー
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

いずれの場合も、id は、. で区切られたファクトリ beanName プロパティに client-id プロパティ(作成後に metrics() から取得)を追加することによって作成されます。

これらのリスナーは、たとえば、新しいクライアントが作成されたときに Micrometer KafkaClientMetrics インスタンスを作成してバインドするために使用できます(クライアントが閉じられたときに閉じます)。

フレームワークは、まさにそれを行うリスナーを提供します。Micrometer ネイティブメトリクスを参照してください。