コンテナーの動的作成

実行時にリスナーコンテナーを作成するために使用できる手法がいくつかあります。このセクションでは、これらのテクニックのいくつかを探ります。

MessageListener の実装

独自のリスナーを直接実装する場合は、コンテナーファクトリを使用して、そのリスナーの生のコンテナーを作成するだけです。

ユーザーリスナー
  • Java

  • Kotlin

public class MyListener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        // ...
    }

}

private ConcurrentMessageListenerContainer<String, String> createContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {

    ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
    container.getContainerProperties().setMessageListener(new MyListener());
    container.getContainerProperties().setGroupId(group);
    container.setBeanName(group);
    container.start();
    return container;
}
class MyListener : MessageListener<String?, String?> {

    override fun onMessage(data: ConsumerRecord<String?, String?>) {
        // ...
    }

}

private fun createContainer(
    factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
): ConcurrentMessageListenerContainer<String, String> {
    val container = factory.createContainer(topic)
    container.containerProperties.messageListener = MyListener()
    container.containerProperties.groupId = group
    container.beanName = group
    container.start()
    return container
}

試作 Bean

@KafkaListener でアノテーションが付けられたメソッドのコンテナーは、Bean をプロトタイプとして宣言することで動的に作成できます。

プロトタイプ
  • Java

  • Kotlin

public class MyPojo {

    private final String id;

    private final String topic;

    public MyPojo(String id, String topic) {
        this.id = id;
        this.topic = topic;
    }

    public String getId() {
        return this.id;
    }

    public String getTopic() {
        return this.topic;
    }

    @KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
    public void listen(String in) {
        System.out.println(in);
    }

}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
    return new MyPojo(id, topic);
}

applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
class MyPojo(id: String?, topic: String?) {

    @KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topics}"])
    fun listen(`in`: String?) {
        println(`in`)
    }

}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun pojo(id: String?, topic: String?): MyPojo {
    return MyPojo(id, topic)
}

applicationContext.getBean(MyPojo::class.java, "one", arrayOf("topic2"))
applicationContext.getBean(MyPojo::class.java, "two", arrayOf("topic3"))
リスナーには一意の ID が必要です。バージョン 2.8.9 以降、KafkaListenerEndpointRegistry には新しいメソッド unregisterListenerContainer(String id) があり、ID を再利用できます。コンテナーの登録を解除しても、コンテナーは stop() されません。自分で行う必要があります。