アプリケーションイベント

次の Spring アプリケーションイベントは、リスナーコンテナーとそのコンシューマーによって公開されます。

  • ConsumerStartingEvent: コンシューマースレッドが最初に開始されたとき、ポーリングを開始する前に公開されます。

  • ConsumerStartedEvent: コンシューマーがポーリングを開始しようとしているときに公開されます。

  • ConsumerFailedToStartEventconsumerStartTimeout コンテナープロパティ内に ConsumerStartingEvent が公開されていない場合に公開されます。このイベントは、構成されたタスクエグゼキューターに、使用されているコンテナーとその並行性をサポートするのに十分なスレッドがないことを示している可能性があります。この状態が発生すると、エラーメッセージもログに記録されます。

  • ListenerContainerIdleEventidleInterval でメッセージが受信されていないときに公開されます(構成されている場合)。

  • ListenerContainerNoLongerIdleEvent: 以前に ListenerContainerIdleEvent を公開した後にレコードが消費されたときに公開されます。

  • ListenerContainerPartitionIdleEventidlePartitionEventInterval のそのパーティションからメッセージが受信されていないときに公開されます(構成されている場合)。

  • ListenerContainerPartitionNoLongerIdleEvent: 以前に ListenerContainerPartitionIdleEvent を公開したパーティションからレコードが消費されたときに公開されます。

  • NonResponsiveConsumerEvent: コンシューマーが poll メソッドでブロックされているように見えるときに公開されます。

  • ConsumerPartitionPausedEvent: パーティションが一時停止されたときに各コンシューマーによって公開されます。

  • ConsumerPartitionResumedEvent: パーティションが再開されたときに各コンシューマーによって公開されます。

  • ConsumerPausedEvent: コンテナーが一時停止されたときに各コンシューマーによって公開されます。

  • ConsumerResumedEvent: コンテナーが再開されたときに各コンシューマーによって公開されます。

  • ConsumerStoppingEvent: 停止する直前に各コンシューマーによって公開されました。

  • ConsumerStoppedEvent: コンシューマーが閉鎖された後に公開されます。スレッドセーフを参照してください。

  • ConsumerRetryAuthEvent: コンシューマーの認証または認可が失敗し、再試行されているときに発行されます。

  • ConsumerRetryAuthSuccessfulEvent: 認証または認可が正常に再試行されたときに発行されます。以前に ConsumerRetryAuthEvent があった場合にのみ発生する可能性があります。

  • ContainerStoppedEvent: すべてのコンシューマーが停止したときに公開されます。

デフォルトでは、アプリケーションコンテキストのイベントマルチキャストは、呼び出し元のスレッドでイベントリスナーを呼び出します。非同期エグゼキュータを使用するようにマルチキャストを変更する場合、イベントにコンシューマーへの参照が含まれているときに Consumer メソッドを呼び出さないでください。

ListenerContainerIdleEvent には次のプロパティがあります。

  • source: イベントを公開したリスナーコンテナーインスタンス。

  • container: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。

  • id: リスナー ID(またはコンテナー Bean 名)。

  • idleTime: イベントが公開されたときにコンテナーがアイドル状態だった時間。

  • topicPartitions: イベントの生成時にコンテナーに割り当てられたトピックとパーティション。

  • consumer: Kafka Consumer オブジェクトへの参照。例: コンシューマーの pause() メソッドが以前に呼び出された場合、イベントの受信時に resume() を実行できます。

  • paused: コンテナーが現在一時停止されているかどうか。詳細については、リスナーコンテナーの一時停止と再開を参照してください。

ListenerContainerNoLongerIdleEvent は、idleTime と paused を除いて、同じプロパティを持っています。

ListenerContainerPartitionIdleEvent には次のプロパティがあります。

  • source: イベントを公開したリスナーコンテナーインスタンス。

  • container: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。

  • id: リスナー ID(またはコンテナー Bean 名)。

  • idleTime: イベントが公開されたとき、時間パーティションの消費はアイドル状態でした。

  • topicPartition: イベントをトリガーしたトピックとパーティション。

  • consumer: Kafka Consumer オブジェクトへの参照。例: コンシューマーの pause() メソッドが以前に呼び出された場合、イベントの受信時に resume() を実行できます。

  • paused: そのパーティションの消費がそのコンシューマーに対して現在一時停止されているかどうか。詳細については、リスナーコンテナーの一時停止と再開を参照してください。

ListenerContainerPartitionNoLongerIdleEvent は、idleTime と paused を除いて、同じプロパティを持っています。

NonResponsiveConsumerEvent には次のプロパティがあります。

  • source: イベントを公開したリスナーコンテナーインスタンス。

  • container: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。

  • id: リスナー ID(またはコンテナー Bean 名)。

  • timeSinceLastPoll: コンテナーが最後に poll() と呼ばれる直前の時間。

  • topicPartitions: イベントの生成時にコンテナーに割り当てられたトピックとパーティション。

  • consumer: Kafka Consumer オブジェクトへの参照。例: コンシューマーの pause() メソッドが以前に呼び出された場合、イベントの受信時に resume() を実行できます。

  • paused: コンテナーが現在一時停止されているかどうか。詳細については、リスナーコンテナーの一時停止と再開を参照してください。

ConsumerPausedEventConsumerResumedEventConsumerStopping イベントには、次のプロパティがあります。

  • source: イベントを公開したリスナーコンテナーインスタンス。

  • container: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。

  • partitions: 関係する TopicPartition インスタンス。

ConsumerPartitionPausedEventConsumerPartitionResumedEvent イベントには、次のプロパティがあります。

  • source: イベントを公開したリスナーコンテナーインスタンス。

  • container: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。

  • partition: 関係する TopicPartition インスタンス。

ConsumerRetryAuthEvent イベントには次のプロパティがあります。

  • source: イベントを公開したリスナーコンテナーインスタンス。

  • container: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。

  • reason:

    • AUTHENTICATION - 認証例外のため、イベントが発行されました。

    • AUTHORIZATION - 認可例外のため、イベントが発行されました。

ConsumerStartingEventConsumerStartedEventConsumerFailedToStartEventConsumerStoppedEventConsumerRetryAuthSuccessfulEventContainerStoppedEvent イベントには、次のプロパティがあります。

  • source: イベントを公開したリスナーコンテナーインスタンス。

  • container: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。

すべてのコンテナー(子または親)は ContainerStoppedEvent を公開します。親コンテナーの場合、ソースプロパティとコンテナープロパティは同じです。

さらに、ConsumerStoppedEvent には次の追加プロパティがあります。

  • reason:

    • NORMAL - コンシューマーは正常に停止しました(コンテナーは停止しました)。

    • ERROR - java.lang.Error がスローされました。

    • FENCED - トランザクションプロデューサーはフェンスで囲まれ、stopContainerWhenFenced コンテナープロパティは true です。

    • AUTH - AuthenticationException または AuthorizationException がスローされ、authExceptionRetryInterval が構成されていません。

    • NO_OFFSET - パーティションのオフセットはなく、auto.offset.reset ポリシーは none です。

このイベントを使用して、次のような状態の後にコンテナーを再起動できます。

if (event.getReason.equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}

アイドル状態のコンシューマーと無反応なコンシューマーの検出

効率的ではありますが、非同期コンシューマーの問題の 1 つは、アイドル状態を検出することです。一定期間メッセージが到着しない場合は、何らかのアクションを実行することをお勧めします。

メッセージが配信されずに時間が経過したときに ListenerContainerIdleEvent を公開するようにリスナーコンテナーを構成できます。コンテナーがアイドル状態の間、イベントは idleEventInterval ミリ秒ごとに発行されます。

この機能を構成するには、コンテナーに idleEventInterval を設定します。次の例は、その方法を示しています。

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}

次の例は、@KafkaListener の idleEventInterval を設定する方法を示しています。

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

いずれの場合も、コンテナーがアイドル状態のときにイベントが 1 分に 1 回公開されます。

何らかの理由で、コンシューマー poll() メソッドが終了しない場合、メッセージは受信されず、アイドルイベントを生成できません(これは、ブローカーに到達できなかった初期バージョンの kafka-clients の問題でした)。この場合、ポーリングが 3x 内で pollTimeout プロパティを返さない場合、コンテナーは NonResponsiveConsumerEvent を公開します。デフォルトでは、このチェックは各コンテナーで 30 秒ごとに 1 回実行されます。この動作を変更するには、リスナーコンテナーを構成するときに、ContainerProperties で monitorInterval (デフォルトは 30 秒)および noPollThreshold (デフォルトは 3.0)プロパティを設定します。noPollThreshold は、競合状態が原因で誤ったイベントが発生しないように、1.0 よりも大きくする必要があります。このようなイベントを受信すると、コンテナーを停止できるため、コンシューマーを起こして停止させることができます。

バージョン 2.6.2 以降、コンテナーが ListenerContainerIdleEvent を公開している場合、その後レコードが受信されたときに ListenerContainerNoLongerIdleEvent が公開されます。

イベントの消費

これらのイベントは、ApplicationListener を実装することでキャプチャーできます。一般的なリスナー、またはこの特定のイベントのみを受信するように絞り込まれたリスナーのいずれかです。Spring Framework 4.2 で導入された @EventListener も使用できます。

次の例では、@KafkaListener と @EventListener を単一のクラスに結合します。アプリケーションリスナーはすべてのコンテナーのイベントを取得するため、どのコンテナーがアイドル状態であるかに基づいて特定のアクションを実行する場合は、リスナー ID を確認する必要があることを理解する必要があります。この目的に @EventListener の condition を使用することもできます。

イベントのプロパティについては、アプリケーションイベントを参照してください。

イベントは通常、コンシューマースレッドで公開されるため、Consumer オブジェクトと安全にやり取りできます。

次の例では、@KafkaListener と @EventListener の両方を使用しています。

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
イベントリスナーは、すべてのコンテナーのイベントを表示します。前の例では、リスナー ID に基づいて受信するイベントを絞り込みます。@KafkaListener 用に作成されたコンテナーは並行性をサポートするため、実際のコンテナーには id-n という名前が付けられます。n は、並行性をサポートする各インスタンスの一意の値です。ということでコンディションで startsWith を採用。
アイドルイベントを使用してリスタコンテナーを停止する場合は、リスナーを呼び出すスレッドで container.stop() を呼び出さないでください。これを行うと、遅延や不要なログメッセージが発生します。代わりに、コンテナーを停止できる別のスレッドにイベントを渡す必要があります。また、コンテナーインスタンスが子コンテナーである場合、コンテナーインスタンスを stop() しないでください。代わりに、並行コンテナーを停止する必要があります。

アイドル時の現在位置

なお、リスナーに ConsumerSeekAware を実装することで、アイドル検出時に現在位置を取得できます。シークの onIdleContainer() を参照してください。