Kafka キュー (シェアコンシューマー)
バージョン 4.0 以降、Spring for Apache Kafka は、Apache Kafka 4.0.0 の一部であり KIP-932 (Queues for Kafka) [Apache] (英語) を実装する共有コンシューマーを通じて Kafka キューのサポートを提供します。この機能は現在、早期アクセス段階にあります。
Kafka キューは、従来のコンシューマーグループとは異なる消費モデルを実現します。各パーティションが 1 つのコンシューマーに排他的に割り当てられるパーティションベースの割り当てモデルではなく、共有コンシューマーが協力して同じパーティションから消費し、レコードは共有グループ内のコンシューマー間で分散されます。
シェアコンシューマーファクトリ
ShareConsumerFactory は共有コンシューマーインスタンスの作成を担当します。Spring と Kafka は DefaultShareConsumerFactory 実装を提供します。
構成
通常の ConsumerFactory を構成する方法と同様に、DefaultShareConsumerFactory を構成できます。
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}コンストラクターオプション
DefaultShareConsumerFactory にはいくつかのコンストラクターオプションが用意されています。
// Basic configuration
new DefaultShareConsumerFactory<>(configs);
// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);
// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);デシリアライザの設定
デシリアライザーはいくつかの方法で設定できます。
Via Configuration Properties (単純なケースに推奨):
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);Setter 経由 :
factory.setKeyDeserializer(new StringDeserializer()); factory.setValueDeserializer(new StringDeserializer());Via Suppliers (コンシューマーごとにデシリアライザーを作成する必要がある場合):
factory.setKeyDeserializerSupplier(() -> new StringDeserializer()); factory.setValueDeserializerSupplier(() -> new StringDeserializer());
デシリアライザーがすでに完全に構成されており、ファクトリで再構成する必要がない場合は、configureDeserializers を false に設定します。
ライフサイクルリスナー
共有コンシューマーのライフサイクルを監視するためにリスナーを追加できます。
factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
@Override
public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
// Called when a new consumer is created
System.out.println("Consumer added: " + id);
}
@Override
public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
// Called when a consumer is closed
System.out.println("Consumer removed: " + id);
}
});メッセージリスナーコンテナーを共有する
ShareKafkaMessageListenerContainer
ShareKafkaMessageListenerContainer は、同時処理をサポートする共有コンシューマー用のコンテナーを提供します。
@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
}
});
return container;
}コンテナーのプロパティ
共有コンテナーは、通常のコンシューマーが使用できるコンテナープロパティのサブセットをサポートします。
topics: サブスクライブするトピック名の配列groupId: 共有グループ IDclientId: コンシューマーのクライアント IDkafkaConsumerProperties: 追加のコンシューマープロパティ
共有コンシューマーは以下をサポートしません:
|
並行性
ShareKafkaMessageListenerContainer は、単一のコンテナー内に複数のコンシューマースレッドを作成することで、並行処理をサポートします。各スレッドは、同じ共有グループに参加する独自の ShareConsumer インスタンスを実行します。
従来のコンシューマーグループでは同時実行にパーティション分散が伴いますが、共有コンシューマーはブローカーにおける Kafka のレコードレベルの分散を活用します。つまり、同一コンテナー内の複数のコンシューマースレッドが共有グループの一部として連携し、Kafka ブローカーがすべてのコンシューマーインスタンスにレコードを分散します。
同時実行性はアプリケーションインスタンス間で加算される 共有グループの観点から見ると、各 次に例を示します: * アプリケーションインスタンス 1: つまり、単一のコンテナーに |
プログラムによる同時実行の設定
@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
// Set concurrency to create 5 consumer threads
container.setConcurrency(5);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
}
});
return container;
}ファクトリによる並行性の設定
ファクトリレベルでデフォルトの同時実行性を設定できます。これは、そのファクトリによって作成されたすべてのコンテナーに適用されます。
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set default concurrency for all containers created by this factory
factory.setConcurrency(3);
return factory;
}リスナーごとの同時実行
同時実行設定は、concurrency 属性を使用してリスナーごとにオーバーライドできます。
@Component
public class ConcurrentShareListener {
@KafkaListener(
topics = "high-throughput-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group",
concurrency = "10" // Override factory default
)
public void listen(ConsumerRecord<String, String> record) {
// This listener will use 10 consumer threads
System.out.println("Processing: " + record.value());
}
}同時実行に関する考慮事項
スレッドセーフ : 各コンシューマースレッドは独自の
ShareConsumerインスタンスを持ち、独自の確認応答を独立して管理します。Client IDs : 各コンシューマースレッドは、数字の接尾辞を持つ一意のクライアント ID を受け取ります。(e.g.,
myContainer-0,myContainer-1, etc.)メトリクス : すべてのコンシューマースレッドからのメトリクスは集約され、
container.metrics()経由でアクセス可能ライフサイクル : すべてのコンシューマースレッドはユニットとして同時に開始および停止します
Work Distribution : Kafka ブローカーは、共有グループ内のすべてのコンシューマーインスタンス間でレコードの配布を処理します。
Explicit Acknowledgment : 各スレッドは独立してレコードの確認応答を管理します。あるスレッドで確認応答のないレコードが他のスレッドをブロックすることはない
明示的な確認応答による並行性
同時実行は明示的な確認応答モードでシームレスに機能します。各コンシューマースレッドは独立して自身のレコードを追跡し、確認応答します。
@KafkaListener(
topics = "order-queue",
containerFactory = "explicitShareKafkaListenerContainerFactory",
groupId = "order-processors",
concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
try {
// Process the order
processOrderLogic(record.value());
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}レコードの取得と配布の動作: 共有コンシューマーはプルベースのモデルを採用しており、各コンシューマースレッドは
あるコンシューマーがレコードを取得している間は、他のコンシューマーはそれらのレコードを利用できません。取得ロックの有効期限が切れると、未確認のレコードは自動的に「利用可能」状態に戻り、他のコンシューマーに配信できるようになります。 ブローカーは、 並行性への影響:
構成:
|
アノテーション駆動型リスナー
@KafkaListener とシェアコンシューマー
ShareKafkaListenerContainerFactory を設定することで、@KafkaListener を共有コンシューマーで使用できます。
@Configuration
@EnableKafka
public class ShareConsumerConfig {
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
}次に、リスナー内でそれを使用します。
@Component
public class ShareMessageListener {
@KafkaListener(
topics = "my-queue-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group"
)
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received from queue: " + record.value());
// Record is automatically acknowledged with ACCEPT
}
}共有グループオフセットリセット
通常のコンシューマーグループとは異なり、共有グループではオフセットリセット動作に異なる設定が使用されます。この設定はプログラムで構成できます。
private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (Admin admin = Admin.create(adminProps)) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
);
admin.incrementalAlterConfigs(configs).all().get();
}
}Record Acknowledgment
Share consumers support two acknowledgment modes that control how records are acknowledged after processing.
Implicit Acknowledgment (デフォルト)
In implicit mode, records are automatically acknowledged based on processing outcome:
Successful processing: Records are acknowledged as ACCEPT Processing errors: Records are acknowledged as REJECT
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
// Implicit mode is the default - no additional configuration needed
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}Explicit Acknowledgment
In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment.
There are two ways to configure explicit acknowledgment mode:
オプション 1: Using Kafka Client Configuration
@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
return new DefaultShareConsumerFactory<>(props);
}オプション 2: Using Spring Container Configuration
@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Configure acknowledgment mode at container factory level
// true means explicit acknowledgment is required
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
return factory;
}Configuration Precedence
When both configuration methods are used, Spring Kafka follows this precedence order (highest to lowest):
コンテナーのプロパティ :
containerProperties.setExplicitShareAcknowledgment(true/false)Consumer Config :
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG("implicit" or "explicit")デフォルト :
false(implicit acknowledgment)
Acknowledgment Types
Share consumers support three acknowledgment types:
ACCEPT: Record processed successfully, mark as completed RELEASE: Temporary failure, make record available for redelivery REJECT: Permanent failure, do not retry
ShareAcknowledgment API
The ShareAcknowledgment interface provides methods for explicit acknowledgment:
public interface ShareAcknowledgment {
void acknowledge();
void release();
void reject();
}Listener Interfaces
Share consumers support specialized listener interfaces for different use cases:
Basic Message Listener
Use the standard MessageListener for simple cases:
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
// Automatically acknowledged in implicit mode
}AcknowledgingShareConsumerAwareMessageListener
This interface provides access to the ShareConsumer instance with optional acknowledgment support. The acknowledgment parameter is nullable and depends on the container’s acknowledgment mode:
Implicit Mode Example (acknowledgment is null)
@KafkaListener(
topics = "my-topic",
containerFactory = "shareKafkaListenerContainerFactory" // Implicit mode by default
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// In implicit mode, acknowledgment is null
System.out.println("Received: " + record.value());
// Access consumer metrics if needed
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
// Record is auto-acknowledged as ACCEPT on success, REJECT on error
}Explicit Mode Example (acknowledgment is non-null)
@Component
public class ExplicitAckListener {
@KafkaListener(
topics = "my-topic",
containerFactory = "explicitShareKafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// In explicit mode, acknowledgment is non-null
try {
processRecord(record);
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
private void processRecord(ConsumerRecord<String, String> record) {
// Business logic here
}
}Acknowledgment Constraints
In explicit acknowledgment mode, the container enforces important constraints:
Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged. One-time Acknowledgment: Each record can only be acknowledged once. Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`.
| In explicit mode, failing to acknowledge records will block further message processing. Always ensure records are acknowledged in all code paths. |
Acknowledgment Timeout Detection
To help identify missing acknowledgments, Spring Kafka provides configurable timeout detection. When a record is not acknowledged within the specified timeout, a warning is logged with details about the unacknowledged record.
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set acknowledgment timeout (default is 30 seconds)
factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));
return factory;
}When a record exceeds the timeout, you’ll see a warning like:
WARN: Record not acknowledged within timeout (30 seconds). In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(), or ack.reject() for every record.
This feature helps developers quickly identify when acknowledgment calls are missing from their code, preventing the common issue of "Spring Kafka does not consume new records any more" due to forgotten acknowledgments.
Acknowledgment Examples
Mixed Acknowledgment Patterns
@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
String orderId = record.key();
String orderData = record.value();
try {
if (isValidOrder(orderData)) {
if (processOrder(orderData)) {
acknowledgment.acknowledge(); // Success - ACCEPT
}
else {
acknowledgment.release(); // Temporary failure - retry later
}
}
else {
acknowledgment.reject(); // Invalid order - don't retry
}
}
catch (Exception e) {
// Exception automatically triggers REJECT
throw e;
}
}Conditional Acknowledgment
@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
ValidationResult result = validator.validate(record.value());
switch (result.getStatus()) {
case VALID:
acknowledgment.acknowledge(AcknowledgeType.ACCEPT);
break;
case INVALID_RETRYABLE:
acknowledgment.acknowledge(AcknowledgeType.RELEASE);
break;
case INVALID_PERMANENT:
acknowledgment.acknowledge(AcknowledgeType.REJECT);
break;
}
}Poison Message Protection and Delivery Count
KIP-932 includes broker-side poison message protection to prevent unprocessable records from being endlessly redelivered.
使い方
Every time a record is acquired by a consumer in a share group, the broker increments an internal delivery count. The first acquisition sets the delivery count to 1, and each subsequent acquisition increments it. When the delivery count reaches the configured limit (default: 5), the record moves to Archived state and is not eligible for additional delivery attempts.
Delivery Count is Not Exposed to Applications The delivery count is maintained internally by the broker and is not exposed to consumer applications. This is an intentional design decision in KIP-932. The delivery count is approximate and serves as a poison message protection mechanism, not a precise redelivery counter. Applications cannot query or access this value through any API. |
For application-level retry logic, use the acknowledgment types:
RELEASE- Make record available for redelivery (contributes to delivery count)REJECT- Mark as permanently failed (does not cause redelivery)ACCEPT- Successfully processed (does not cause redelivery)
The broker automatically prevents endless redelivery once group.share.delivery.count.limit is reached, moving the record to Archived state.
Retry Strategy Recommendations
Here is an example of how to use the various acknowledgement types based exception types.
@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
try {
// Attempt to process the order
orderService.process(record.value());
ack.acknowledge(); // ACCEPT - successfully processed
}
catch (TransientException e) {
// Temporary failure (network issue, service unavailable, etc.)
// Release the record for redelivery
// Broker will retry up to group.share.delivery.count.limit times
logger.warn("Transient error processing order, will retry: {}", e.getMessage());
ack.release(); // RELEASE - make available for retry
}
catch (ValidationException e) {
// Permanent semantic error (invalid data format, business rule violation, etc.)
// Do not retry - this record will never succeed
logger.error("Invalid order data, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - permanent failure, do not retry
}
catch (Exception e) {
// Unknown error - typically safer to reject to avoid infinite loops
// But could also release if you suspect it might be transient
logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - avoid poison message loops
}
}The broker’s poison message protection ensures that even if you always use RELEASE for errors, records won’t be retried endlessly. They will automatically be archived after reaching the delivery attempt limit.
Differences from Regular Consumers
Share consumers differ from regular consumers in several key ways:
No Partition Assignment : Share consumers cannot be assigned specific partitions
No Topic Patterns : Share consumers do not support subscribing to topic patterns
Cooperative Consumption : Multiple consumers in the same share group can consume from the same partitions simultaneously
Record-Level Acknowledgment : Supports explicit acknowledgment with
ACCEPT、RELEASE、REJECTtypesDifferent Group Management : Share groups use different coordinator protocols
No Batch Processing : Share consumers process records individually, not in batches
Broker-Side Retry Management : Delivery count tracking and poison message protection are managed by the broker, not exposed to applications
Limitations and Considerations
Current Limitations
In preview : This feature is in preview mode and may change in future versions
No Message Converters : Message converters are not yet supported for share consumers
No Batch Listeners : Batch processing is not supported with share consumers
Poll Constraints : In explicit acknowledgment mode, unacknowledged records block subsequent polls within each consumer thread