アノテーションインターフェース EnableKafka


AbstractListenerContainerFactory によって内部で作成される Kafka リスナーアノテーション付きエンドポイントを有効にします。次のように Configuration クラスで使用します。
 @Configuration
 @EnableKafka
 public class AppConfig {
        @Bean
        public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
                ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
                factory.setConsumerFactory(consumerFactory());
                factory.setConcurrency(4);
                return factory;
        }
        // other @Bean definitions
 }
 
KafkaListenerContainerFactory は、特定のエンドポイントのリスナーコンテナーを作成するロールを果たします。上記のサンプルで使用されている ConcurrentKafkaListenerContainerFactory のような一般的な実装は、基盤となる MessageListenerContainer でサポートされている必要な構成オプションを提供します。

@EnableKafka は、コンテナー内の Spring 管理の Bean 上の KafkaListener アノテーションの検出を可能にします。例: MyService クラスが与えられた場合:

 package com.acme.foo;

 public class MyService {
        @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
        public void process(String msg) {
                // process incoming message
        }
 }
 
使用するコンテナーファクトリは、使用する KafkaListenerContainerFactory Bean の名前を定義する containerFactory 属性によって識別されます。何も設定されていない場合、kafkaListenerContainerFactory という名前の KafkaListenerContainerFactory Bean が存在すると想定されます。

次の構成により、トピック "myQueue" からメッセージが受信されるたびに、メッセージの内容とともに MyService.process() が呼び出されます。

 @Configuration
 @EnableKafka
 public class AppConfig {
        @Bean
        public MyService myService() {
                return new MyService();
        }

        // Kafka infrastructure setup
 }
 
あるいは、MyService に @Component のアノテーションが付けられている場合、次の構成により、@KafkaListener アノテーション付きメソッドが一致する受信メッセージで呼び出されることが保証されます。
 @Configuration
 @EnableKafka
 @ComponentScan(basePackages = "com.acme.foo")
 public class AppConfig {
 }
 
作成されたコンテナーはアプリケーションコンテキストに登録されていませんが、KafkaListenerEndpointRegistry を使用して管理目的で簡単に見つけることができることに注意してください。

アノテーション付きメソッドは柔軟な署名を使用できます。特に、Message 抽象化と関連するアノテーションを使用することが可能です。詳細については、KafkaListener Javadoc を参照してください。たとえば、次のようにすると、メッセージの内容と kafka パーティションヘッダーが挿入されます。

 @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
 public void process(String msg, @Header("kafka_partition") int partition) {
        // process incoming message
 }
 
これらの機能は、アノテーション付きメソッドを処理するために必要な呼び出し側を構築する MessageHandlerMethodFactory によって抽象化されます。デフォルトでは、DefaultMessageHandlerMethodFactory が使用されます。

さらに制御が必要な場合は、@Configuration クラスで KafkaListenerConfigurer を実装できます。これにより、基になる KafkaListenerEndpointRegistrar インスタンスにアクセスできます。次の例は、明示的なデフォルト KafkaListenerContainerFactory を指定する方法を示しています

        @Configuration
        @EnableKafka
        public class AppConfig implements KafkaListenerConfigurer {
                @Override
                public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
                        registrar.setContainerFactory(myKafkaListenerContainerFactory());
                }

                @Bean
                public KafkaListenerContainerFactory<?, ?> myKafkaListenerContainerFactory() {
                        // factory settings
                }

                @Bean
                public MyService myService() {
                        return new MyService();
                }
        }
 
コンテナーの作成および管理方法をさらに制御する必要がある場合は、カスタム KafkaListenerEndpointRegistry を指定することもできます。以下の例は、DefaultMessageHandlerMethodFactory をカスタマイズする方法と、カスタム Validator を提供して、Validated でアノテーションが付けられたペイロードがカスタム Validator に対して最初に検証される方法も示しています。
        @Configuration
        @EnableKafka
        public class AppConfig implements KafkaListenerConfigurer {
                @Override
                public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
                        registrar.setEndpointRegistry(myKafkaListenerEndpointRegistry());
                        registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory);
                        registrar.setValidator(new MyValidator());
                }

                @Bean
                public KafkaListenerEndpointRegistry myKafkaListenerEndpointRegistry() {
                        // registry configuration
                }

                @Bean
                public MessageHandlerMethodFactory myMessageHandlerMethodFactory() {
                        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
                        // factory configuration
                        return factory;
                }

                @Bean
                public MyService myService() {
                        return new MyService();
                }
        }
 
KafkaListenerConfigurer を実装すると、KafkaListenerEndpointRegistrar を介したエンドポイント登録のきめ細かい制御も可能になります。例: 以下は、追加のエンドポイントを構成します。
        @Configuration
        @EnableKafka
        public class AppConfig implements KafkaListenerConfigurer {
                @Override
                public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
                        SimpleKafkaListenerEndpoint myEndpoint = new SimpleKafkaListenerEndpoint();
                        // ... configure the endpoint
                        registrar.registerEndpoint(endpoint, anotherKafkaListenerContainerFactory());
                }

                @Bean
                public MyService myService() {
                        return new MyService();
                }

                @Bean
                public KafkaListenerContainerFactory<?, ?> anotherKafkaListenerContainerFactory() {
                        // ...
                }

                // Kafka infrastructure setup
        }
 
KafkaListenerConfigurer を実装するすべての Bean が同様の方法で検出され、呼び出されることに注意してください。上記の例は、XML 構成を使用する場合に、コンテキストに登録されている通常の Bean 定義に変換できます。
作成者:
Stephane Nicoll, Gary Russell, Artem Bilan
関連事項: