最新の安定バージョンについては、Spring for Apache Kafka 3.3.5 を使用してください! |
アプリケーションイベント
次の Spring アプリケーションイベントは、リスナーコンテナーとそのコンシューマーによって公開されます。
ConsumerStartingEvent
: コンシューマースレッドが最初に開始されたとき、ポーリングを開始する前に公開されます。ConsumerStartedEvent
: コンシューマーがポーリングを開始しようとしているときに公開されます。ConsumerFailedToStartEvent
:consumerStartTimeout
コンテナープロパティ内にConsumerStartingEvent
が公開されていない場合に公開されます。このイベントは、構成されたタスクエグゼキューターに、使用されているコンテナーとその並行性をサポートするのに十分なスレッドがないことを示している可能性があります。この状態が発生すると、エラーメッセージもログに記録されます。ListenerContainerIdleEvent
:idleEventInterval
でメッセージが受信されていないときに公開されます(構成されている場合)。ListenerContainerNoLongerIdleEvent
: 以前にListenerContainerIdleEvent
を公開した後にレコードが消費されたときに公開されます。ListenerContainerPartitionIdleEvent
:idlePartitionEventInterval
のそのパーティションからメッセージが受信されていないときに公開されます(構成されている場合)。ListenerContainerPartitionNoLongerIdleEvent
: 以前にListenerContainerPartitionIdleEvent
を公開したパーティションからレコードが消費されたときに公開されます。NonResponsiveConsumerEvent
: コンシューマーがpoll
メソッドでブロックされているように見えるときに公開されます。ConsumerPartitionPausedEvent
: パーティションが一時停止されたときに各コンシューマーによって公開されます。ConsumerPartitionResumedEvent
: パーティションが再開されたときに各コンシューマーによって公開されます。ConsumerPausedEvent
: コンテナーが一時停止されたときに各コンシューマーによって公開されます。ConsumerResumedEvent
: コンテナーが再開されたときに各コンシューマーによって公開されます。ConsumerStoppingEvent
: 停止する直前に各コンシューマーによって公開されました。ConsumerStoppedEvent
: コンシューマーが閉鎖された後に公開されます。スレッドセーフを参照してください。ConsumerRetryAuthEvent
: コンシューマーの認証または認可が失敗し、再試行されているときに発行されます。ConsumerRetryAuthSuccessfulEvent
: 認証または認可が正常に再試行されたときに発行されます。以前にConsumerRetryAuthEvent
があった場合にのみ発生する可能性があります。ContainerStoppedEvent
: すべてのコンシューマーが停止したときに公開されます。
デフォルトでは、アプリケーションコンテキストのイベントマルチキャストは、呼び出し元のスレッドでイベントリスナーを呼び出します。非同期エグゼキュータを使用するようにマルチキャストを変更する場合、イベントにコンシューマーへの参照が含まれているときに Consumer メソッドを呼び出さないでください。 |
ListenerContainerIdleEvent
には次のプロパティがあります。
source
: イベントを公開したリスナーコンテナーインスタンス。container
: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。id
: リスナー ID(またはコンテナー Bean 名)。idleTime
: イベントが公開されたときにコンテナーがアイドル状態だった時間。topicPartitions
: イベントの生成時にコンテナーに割り当てられたトピックとパーティション。consumer
: KafkaConsumer
オブジェクトへの参照。例: コンシューマーのpause()
メソッドが以前に呼び出された場合、イベントの受信時にresume()
を実行できます。paused
: コンテナーが現在一時停止されているかどうか。詳細については、リスナーコンテナーの一時停止と再開を参照してください。
ListenerContainerNoLongerIdleEvent
は、idleTime
と paused
を除いて、同じプロパティを持っています。
ListenerContainerPartitionIdleEvent
には次のプロパティがあります。
source
: イベントを公開したリスナーコンテナーインスタンス。container
: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。id
: リスナー ID(またはコンテナー Bean 名)。idleTime
: イベントが公開されたとき、時間パーティションの消費はアイドル状態でした。topicPartition
: イベントをトリガーしたトピックとパーティション。consumer
: KafkaConsumer
オブジェクトへの参照。例: コンシューマーのpause()
メソッドが以前に呼び出された場合、イベントの受信時にresume()
を実行できます。paused
: そのパーティションの消費がそのコンシューマーに対して現在一時停止されているかどうか。詳細については、リスナーコンテナーの一時停止と再開を参照してください。
ListenerContainerPartitionNoLongerIdleEvent
は、idleTime
と paused
を除いて、同じプロパティを持っています。
NonResponsiveConsumerEvent
には次のプロパティがあります。
source
: イベントを公開したリスナーコンテナーインスタンス。container
: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。id
: リスナー ID(またはコンテナー Bean 名)。timeSinceLastPoll
: コンテナーが最後にpoll()
と呼ばれる直前の時間。topicPartitions
: イベントの生成時にコンテナーに割り当てられたトピックとパーティション。consumer
: KafkaConsumer
オブジェクトへの参照。例: コンシューマーのpause()
メソッドが以前に呼び出された場合、イベントの受信時にresume()
を実行できます。paused
: コンテナーが現在一時停止されているかどうか。詳細については、リスナーコンテナーの一時停止と再開を参照してください。
ConsumerPausedEvent
、ConsumerResumedEvent
、ConsumerStopping
イベントには、次のプロパティがあります。
source
: イベントを公開したリスナーコンテナーインスタンス。container
: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。partitions
: 関係するTopicPartition
インスタンス。
ConsumerPartitionPausedEvent
、ConsumerPartitionResumedEvent
イベントには、次のプロパティがあります。
source
: イベントを公開したリスナーコンテナーインスタンス。container
: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。partition
: 関係するTopicPartition
インスタンス。
ConsumerRetryAuthEvent
イベントには次のプロパティがあります。
source
: イベントを公開したリスナーコンテナーインスタンス。container
: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。reason
:AUTHENTICATION
- 認証例外のため、イベントが発行されました。AUTHORIZATION
- 認可例外のため、イベントが発行されました。
ConsumerStartingEvent
、ConsumerStartingEvent
、ConsumerFailedToStartEvent
、ConsumerStoppedEvent
、ConsumerRetryAuthSuccessfulEvent
、ContainerStoppedEvent
イベントには、次のプロパティがあります。
source
: イベントを公開したリスナーコンテナーインスタンス。container
: ソースコンテナーが子の場合は、リスナーコンテナーまたは親リスナーコンテナー。
すべてのコンテナー(子または親)は ContainerStoppedEvent
を公開します。親コンテナーの場合、ソースプロパティとコンテナープロパティは同じです。
さらに、ConsumerStoppedEvent
には次の追加プロパティがあります。
reason
:NORMAL
- コンシューマーは正常に停止しました(コンテナーは停止しました)。ERROR
-java.lang.Error
がスローされました。FENCED
- トランザクションプロデューサーはフェンスで囲まれ、stopContainerWhenFenced
コンテナープロパティはtrue
です。AUTH
-AuthenticationException
またはAuthorizationException
がスローされ、authExceptionRetryInterval
が構成されていません。NO_OFFSET
- パーティションのオフセットはなく、auto.offset.reset
ポリシーはnone
です。
このイベントを使用して、次のような状態の後にコンテナーを再起動できます。
if (event.getReason.equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
アイドル状態のコンシューマーと無反応なコンシューマーの検出
効率的ではありますが、非同期コンシューマーの問題の 1 つは、アイドル状態を検出することです。一定期間メッセージが到着しない場合は、何らかのアクションを実行することをお勧めします。
メッセージが配信されずに時間が経過したときに ListenerContainerIdleEvent
を公開するようにリスナーコンテナーを構成できます。コンテナーがアイドル状態の間、イベントは idleEventInterval
ミリ秒ごとに発行されます。
この機能を構成するには、コンテナーに idleEventInterval
を設定します。次の例は、その方法を示しています。
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
次の例は、@KafkaListener
の idleEventInterval
を設定する方法を示しています。
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
いずれの場合も、コンテナーがアイドル状態のときにイベントが 1 分に 1 回公開されます。
何らかの理由で、コンシューマー poll()
メソッドが終了しない場合、メッセージは受信されず、アイドルイベントを生成できません(これは、ブローカーに到達できなかった初期バージョンの kafka-clients
の問題でした)。この場合、ポーリングが 3x
内で pollTimeout
プロパティを返さない場合、コンテナーは NonResponsiveConsumerEvent
を公開します。デフォルトでは、このチェックは各コンテナーで 30 秒ごとに 1 回実行されます。この動作を変更するには、リスナーコンテナーを構成するときに、ContainerProperties
で monitorInterval
(デフォルトは 30 秒)および noPollThreshold
(デフォルトは 3.0)プロパティを設定します。noPollThreshold
は、競合状態が原因で誤ったイベントが発生しないように、1.0
よりも大きくする必要があります。このようなイベントを受信すると、コンテナーを停止できるため、コンシューマーを起こして停止させることができます。
バージョン 2.6.2 以降、コンテナーが ListenerContainerIdleEvent
を公開している場合、その後レコードが受信されたときに ListenerContainerNoLongerIdleEvent
が公開されます。
イベントの消費
これらのイベントは、ApplicationListener
を実装することでキャプチャーできます。一般的なリスナー、またはこの特定のイベントのみを受信するように絞り込まれたリスナーのいずれかです。Spring Framework 4.2 で導入された @EventListener
も使用できます。
次の例では、@KafkaListener
と @EventListener
を単一のクラスに結合します。アプリケーションリスナーはすべてのコンテナーのイベントを取得するため、どのコンテナーがアイドル状態であるかに基づいて特定のアクションを実行する場合は、リスナー ID を確認する必要があることを理解する必要があります。この目的に @EventListener
の condition
を使用することもできます。
イベントのプロパティについては、アプリケーションイベントを参照してください。
イベントは通常、コンシューマースレッドで公開されるため、Consumer
オブジェクトと安全にやり取りできます。
次の例では、@KafkaListener
と @EventListener
の両方を使用しています。
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
イベントリスナーは、すべてのコンテナーのイベントを表示します。前の例では、リスナー ID に基づいて受信するイベントを絞り込みます。@KafkaListener 用に作成されたコンテナーは並行性をサポートするため、実際のコンテナーには id-n という名前が付けられます。n は、並行性をサポートする各インスタンスの一意の値です。ということでコンディションで startsWith を採用。 |
アイドルイベントを使用してリスタコンテナーを停止する場合は、リスナーを呼び出すスレッドで container.stop() を呼び出さないでください。これを行うと、遅延や不要なログメッセージが発生します。代わりに、コンテナーを停止できる別のスレッドにイベントを渡す必要があります。また、コンテナーインスタンスが子コンテナーである場合、コンテナーインスタンスを stop() しないでください。代わりに、並行コンテナーを停止する必要があります。 |
アイドル時の現在位置
なお、リスナーに ConsumerSeekAware
を実装することで、アイドル検出時に現在位置を取得できます。シークの onIdleContainer()
を参照してください。