最新の安定バージョンについては、Spring for Apache Kafka 3.3.5 を使用してください! |
Kafka への接続
バージョン 2.5 以降、これらはそれぞれ KafkaResourceFactory
を継承します。これにより、Supplier<String>
を構成に追加することにより、実行時にブートストラップサーバーを変更できます: setBootstrapServersSupplier(() -> …)
。これは、サーバーのリストを取得するために、すべての新しい接続に対して呼び出されます。コンシューマーとプロデューサーは一般的に長命です。既存のプロデューサーを閉じるには、DefaultKafkaProducerFactory
で reset()
を呼び出します。既存のコンシューマーを閉じるには、KafkaListenerEndpointRegistry
で stop()
(次に start()
)を呼び出すか、他のリスナーコンテナー Bean で stop()
と start()
を呼び出します。
便宜上、フレームワークは 2 セットのブートストラップサーバーをサポートする ABSwitchCluster
も提供します。そのうちの 1 つはいつでもアクティブです。setBootstrapServersSupplier()
を呼び出して、ABSwitchCluster
を構成し、それをプロデューサーファクトリとコンシューマーファクトリ、および KafkaAdmin
に追加します。切り替えたい場合は、primary()
または secondary()
を呼び出し、プロデューサーファクトリで reset()
を呼び出して、新しい接続を確立します。コンシューマーの場合、stop()
および start()
はすべてリスナーコンテナーです。@KafkaListener
、stop()
、start()
を使用する場合は、KafkaListenerEndpointRegistry
Bean。
詳細については、Javadoc を参照してください。
ファクトリリスナー
バージョン 2.5 以降、DefaultKafkaProducerFactory
および DefaultKafkaConsumerFactory
は、プロデューサーまたはコンシューマーが作成またはクローズされるたびに通知を受信するように Listener
で構成できます。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
いずれの場合も、id
は、.
で区切られたファクトリ beanName
プロパティに client-id
プロパティ(作成後に metrics()
から取得)を追加することによって作成されます。
これらのリスナーは、たとえば、新しいクライアントが作成されたときに Micrometer KafkaClientMetrics
インスタンスを作成してバインドするために使用できます(クライアントが閉じられたときに閉じます)。
フレームワークは、まさにそれを行うリスナーを提供します。Micrometer ネイティブメトリクスを参照してください。