最新の安定バージョンについては、Spring for Apache Kafka 3.3.10 を使用してください!

@KafkaListener ライフサイクル管理

@KafkaListener アノテーション用に作成されたリスナーコンテナーは、アプリケーションコンテキストの Bean ではありません。代わりに、型 KafkaListenerEndpointRegistry のインフラストラクチャ Bean に登録されます。この Bean はフレームワークによって自動的に宣言され、コンテナーのライフサイクルを管理します。autoStartup が true に設定されているすべてのコンテナーを自動起動します。すべてのコンテナーファクトリによって作成されたすべてのコンテナーは、同じ phase 内にある必要があります。詳細については、リスナーコンテナーの自動起動を参照してください。レジストリを使用して、プログラムでライフサイクルを管理できます。レジストリを開始または停止すると、登録されているすべてのコンテナーが開始または停止します。または、id 属性を使用して、個々のコンテナーへの参照を取得することもできます。アノテーションに autoStartup を設定できます。これは、コンテナーファクトリに設定されているデフォルト設定を上書きします。オートワイヤーなどのアプリケーションコンテキストから Bean への参照を取得して、登録されているコンテナーを管理できます。次の例は、その方法を示しています。

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

レジストリは、管理するコンテナーのライフサイクルのみを維持します。Bean として宣言されたコンテナーはレジストリによって管理されず、アプリケーションコンテキストから取得できます。管理対象コンテナーのコレクションは、レジストリの getListenerContainers() メソッドを呼び出すことで取得できます。バージョン 2.2.5 は、便利なメソッド getAllListenerContainers() を追加しました。これは、レジストリによって管理されるコンテナーと Bean として宣言されたコンテナーを含むすべてのコンテナーのコレクションを返します。返されるコレクションには、初期化されたプロトタイプ Bean が含まれますが、遅延 Bean 宣言は初期化されません。

アプリケーションコンテキストがリフレッシュされた後に登録されたエンドポイントは、autoStartup プロパティに関係なく、SmartLifecycle 契約に準拠するためにすぐに開始されます。autoStartup は、アプリケーションコンテキストの初期化中にのみ考慮されます。遅延登録の例は、プロトタイプスコープに @KafkaListener を含む Bean であり、コンテキストが初期化された後にインスタンスが作成されます。バージョン 2.8.7 以降では、レジストリの alwaysStartAfterRefresh プロパティを false に設定できます。その後、コンテナーの autoStartup プロパティは、コンテナーが開始されるかどうかを定義します。

KafkaListenerEndpointRegistry から MessageListenerContainers を取得

KafkaListenerEndpointRegistry は、さまざまな管理シナリオに対応するために MessageListenerContainer インスタンスを取得するためのメソッドを提供します。

すべてのコンテナー : すべてのリスナーコンテナーを対象とする操作の場合は、getListenerContainers() を使用して包括的なコレクションを取得します。

Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();

ID による特定のコンテナー : 個々のコンテナーを管理するために、getListenerContainer(String id) では ID による取得が可能になります。

MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");

動的コンテナーフィルタリング : バージョン 3.2 で導入された 2 つのオーバーロードされた getListenerContainersMatching メソッドにより、コンテナーの詳細な選択が可能になります。1 つのメソッドは ID ベースのフィルタリング用の Predicate<String> をパラメーターとして受け取り、もう 1 つのメソッドはコンテナーのプロパティや状態を含む可能性のあるより高度な条件用の BiPredicate<String, MessageListenerContainer> をパラメーターとして受け取ります。

// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
    registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));

// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
    registry.getListenerContainersMatching(myPattern::matches);

// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
    registry.getListenerContainersMatching(myIdSet::contains);

// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
    registry.getListenerContainersMatching(
        (id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
    );

これらのメソッドを利用して、アプリケーション内の MessageListenerContainer インスタンスを効率的に管理およびクエリします。