コンテナーの動的作成
実行時にリスナーコンテナーを作成するために使用できる手法がいくつかあります。このセクションでは、これらのテクニックのいくつかを探ります。
MessageListener の実装
独自のリスナーを直接実装する場合は、コンテナーファクトリを使用して、そのリスナーの生のコンテナーを作成するだけです。
ユーザーリスナー
Java
Kotlin
public class MyListener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
// ...
}
}
private ConcurrentMessageListenerContainer<String, String> createContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(new MyListener());
container.getContainerProperties().setGroupId(group);
container.setBeanName(group);
container.start();
return container;
}
class MyListener : MessageListener<String?, String?> {
override fun onMessage(data: ConsumerRecord<String?, String?>) {
// ...
}
}
private fun createContainer(
factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
): ConcurrentMessageListenerContainer<String, String> {
val container = factory.createContainer(topic)
container.containerProperties.messageListener = MyListener()
container.containerProperties.groupId = group
container.beanName = group
container.start()
return container
}
試作 Bean
@KafkaListener
でアノテーションが付けられたメソッドのコンテナーは、Bean をプロトタイプとして宣言することで動的に作成できます。
プロトタイプ
Java
Kotlin
public class MyPojo {
private final String id;
private final String topic;
public MyPojo(String id, String topic) {
this.id = id;
this.topic = topic;
}
public String getId() {
return this.id;
}
public String getTopic() {
return this.topic;
}
@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
public void listen(String in) {
System.out.println(in);
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
return new MyPojo(id, topic);
}
applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
class MyPojo(val id: String, val topic: String) {
@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topic}"])
fun listen(`in`: String?) {
println(`in`)
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun pojo(id: String, topic: String): MyPojo {
return MyPojo(id, topic)
}
applicationContext.getBean(MyPojo::class.java, "one", "topic2")
applicationContext.getBean(MyPojo::class.java, "two", "topic3")
リスナーには一意の ID が必要です。バージョン 2.8.9 以降、KafkaListenerEndpointRegistry には新しいメソッド unregisterListenerContainer(String id) があり、ID を再利用できます。コンテナーの登録を解除しても、コンテナーは stop() されません。自分で行う必要があります。 |