クラス ReactiveKafkaConsumerTemplate<K,V>
java.lang.ObjectSE
org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate<K,V>
- 型パラメーター:
K
- 鍵の型。V
- 値の型。
リアクティブ kafka コンシューマー操作の実装。
- 導入:
- 2.3.0
- 作成者:
- Mark Norkin, Adrian Chlebosz, Marcus Voltolim
コンストラクターのサマリー
コンストラクターコンストラクター説明ReactiveKafkaConsumerTemplate
(reactor.kafka.receiver.KafkaReceiver<K, V> kafkaReceiver) ReactiveKafkaConsumerTemplate
(reactor.kafka.receiver.ReceiverOptions<K, V> receiverOptions) メソッドのサマリー
修飾子と型メソッド説明reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition>
reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,
LongSE>> beginningOffsets
(org.apache.kafka.common.TopicPartition... partitions) reactor.core.publisher.Mono<MapSE<org.apache.kafka.common.TopicPartition,
org.apache.kafka.clients.consumer.OffsetAndMetadata>> <T> reactor.core.publisher.Mono<T>
doOnConsumer
(FunctionSE<org.apache.kafka.clients.consumer.Consumer<K, V>, ? extends T> function) reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,
LongSE>> endOffsets
(org.apache.kafka.common.TopicPartition... partitions) reactor.core.publisher.Flux<reactor.util.function.Tuple2<StringSE,
ListSE<org.apache.kafka.common.PartitionInfo>>> reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,
? extends org.apache.kafka.common.Metric>> reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,
org.apache.kafka.clients.consumer.OffsetAndTimestamp>> offsetsForTimes
(MapSE<org.apache.kafka.common.TopicPartition, LongSE> timestampsToSearch) reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo>
reactor.core.publisher.Mono<VoidSE>
pause
(org.apache.kafka.common.TopicPartition... partitions) reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition>
paused()
reactor.core.publisher.Mono<LongSE>
position
(org.apache.kafka.common.TopicPartition partition) receive()
reactor.core.publisher.Flux<reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,
V>>> reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,
V>>> receiveExactlyOnce
(reactor.kafka.sender.TransactionManager transactionManager) 一度だけの配信セマンティクスに使用できるコンシューマーレコードバッチのFlux
を返します。reactor.core.publisher.Mono<VoidSE>
resume
(org.apache.kafka.common.TopicPartition... partitions) reactor.core.publisher.Mono<VoidSE>
seek
(org.apache.kafka.common.TopicPartition partition, long offset) reactor.core.publisher.Mono<VoidSE>
seekToBeginning
(org.apache.kafka.common.TopicPartition... partitions) reactor.core.publisher.Mono<VoidSE>
seekToEnd
(org.apache.kafka.common.TopicPartition... partitions) reactor.core.publisher.Flux<StringSE>
コンストラクターの詳細
ReactiveKafkaConsumerTemplate
ReactiveKafkaConsumerTemplate
メソッドの詳細
receive
receiveBatch
receiveAutoAck
receiveAtMostOnce
receiveExactlyOnce
public reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> receiveExactlyOnce(reactor.kafka.sender.TransactionManager transactionManager) 一度だけの配信セマンティクスに使用できるコンシューマーレコードバッチのFlux
を返します。内部フラックスごとに新しいトランザクションが開始され、フラックスの処理後にTransactionManager.commit()
またはTransactionManager.abort()
を使用してトランザクションをコミットまたは中止するのは、消費側アプリケーションの責任です。コンシューマーレコードの次のバッチは、前のフラックスが終了した後にのみ配信されます。各内部フラックスにディスパッチされたレコードのオフセットは、そのフラックスに対して開始されたトランザクション内で提供されたtransactionManager
を使用してコミットされます。使用例:
KafkaSender<Integer, Person> sender = sender(senderOptions()); ReceiverOptions<Integer, Person> receiverOptions = receiverOptions(Collections.singleton(sourceTopic)); KafkaReceiver<Integer, Person> receiver = KafkaReceiver.create(receiverOptions); receiver.receiveExactlyOnce(sender.transactionManager()) .concatMap(f -> sendAndCommit(f)) .onErrorResume(e -> sender.transactionManager().abort().then(Mono.error(e))) .doOnCancel(() -> close()); Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, Person>> flux) { return sender.send(flux.map(r -> SenderRecord.<Integer, Person, Integer>create(transform(r.value()), r.key()))) .concatWith(sender.transactionManager().commit()); }
- パラメーター:
transactionManager
- トランザクションマネージャーは、各内部フラックスの新しいトランザクションを開始し、そのトランザクション内でオフセットをコミットするために使用されていました- 戻り値:
- トランザクション内で処理されるコンシューマーレコードバッチのフラックス
doOnConsumer
public <T> reactor.core.publisher.Mono<T> doOnConsumer(FunctionSE<org.apache.kafka.clients.consumer.Consumer<K, V>, ? extends T> function) assignment
public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> assignment()subscription
seek
public reactor.core.publisher.Mono<VoidSE> seek(org.apache.kafka.common.TopicPartition partition, long offset) seekToBeginning
public reactor.core.publisher.Mono<VoidSE> seekToBeginning(org.apache.kafka.common.TopicPartition... partitions) seekToEnd
public reactor.core.publisher.Mono<VoidSE> seekToEnd(org.apache.kafka.common.TopicPartition... partitions) position
public reactor.core.publisher.Mono<LongSE> position(org.apache.kafka.common.TopicPartition partition) committed
partitionsFromConsumerFor
public reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo> partitionsFromConsumerFor(StringSE topic) paused
public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> paused()pause
public reactor.core.publisher.Mono<VoidSE> pause(org.apache.kafka.common.TopicPartition... partitions) resume
public reactor.core.publisher.Mono<VoidSE> resume(org.apache.kafka.common.TopicPartition... partitions) metricsFromConsumer
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>> metricsFromConsumer()listTopics
offsetsForTimes
beginningOffsets
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,LongSE>> beginningOffsets(org.apache.kafka.common.TopicPartition... partitions) endOffsets
public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,LongSE>> endOffsets(org.apache.kafka.common.TopicPartition... partitions)