メッセージリスナー
メッセージリスナーコンテナーを使用する場合は、データを受信するためのリスナーを提供する必要があります。現在、メッセージリスナー用に 8 つのインターフェースがサポートされています。次のリストは、これらのインターフェースを示しています。
public interface MessageListener<K, V> { (1)
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> { (2)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (3)
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (4)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> { (5)
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> { (6)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (7)
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (8)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
1 | 自動コミットまたはコンテナー管理のコミット方法のいずれかを使用する場合、Kafka コンシューマー poll() 操作から受け取った個々の ConsumerRecord インスタンスを処理するには、このインターフェースを使用します。 |
2 | 手動コミット方法の 1 つを使用する場合、Kafka コンシューマー poll() 操作から受け取った個々の ConsumerRecord インスタンスを処理するには、このインターフェースを使用します。 |
3 | 自動コミットまたはコンテナー管理のコミット方法のいずれかを使用する場合、Kafka コンシューマー poll() 操作から受け取った個々の ConsumerRecord インスタンスを処理するには、このインターフェースを使用します。Consumer オブジェクトへのアクセスが提供されます。 |
4 | 手動コミット方法の 1 つを使用する場合、Kafka コンシューマー poll() 操作から受け取った個々の ConsumerRecord インスタンスを処理するには、このインターフェースを使用します。Consumer オブジェクトへのアクセスが提供されます。 |
5 | 自動コミットまたはコンテナー管理のコミット方法のいずれかを使用する場合、Kafka コンシューマー poll() 操作から受け取ったすべての ConsumerRecord インスタンスを処理するために、このインターフェースを使用します。リスナーには完全なバッチが与えられるため、このインターフェースを使用する場合、AckMode.RECORD はサポートされません。 |
6 | 手動コミット方法の 1 つを使用する場合、Kafka コンシューマー poll() 操作から受け取ったすべての ConsumerRecord インスタンスを処理するために、このインターフェースを使用します。 |
7 | 自動コミットまたはコンテナー管理のコミット方法のいずれかを使用する場合、Kafka コンシューマー poll() 操作から受け取ったすべての ConsumerRecord インスタンスを処理するために、このインターフェースを使用します。リスナーには完全なバッチが与えられるため、このインターフェースを使用する場合、AckMode.RECORD はサポートされません。Consumer オブジェクトへのアクセスが提供されます。 |
8 | 手動コミット方法の 1 つを使用する場合、Kafka コンシューマー poll() 操作から受け取ったすべての ConsumerRecord インスタンスを処理するために、このインターフェースを使用します。Consumer オブジェクトへのアクセスが提供されます。 |
Consumer オブジェクトはスレッドセーフではありません。リスナーを呼び出すスレッドでのみメソッドを呼び出す必要があります。 |
リスナー内のコンシューマーの位置やコミットされたオフセットに影響を与える Consumer<?, ?> メソッドを実行しないでください。コンテナーはそのような情報を管理する必要があります。 |