クラス ReactiveKafkaConsumerTemplate<K,V>

java.lang.ObjectSE
org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate<K,V>
型パラメーター:
K - 鍵の型。
V - 値の型。

public class ReactiveKafkaConsumerTemplate<K,V> extends ObjectSE
リアクティブ kafka コンシューマー操作の実装。
導入:
2.3.0
作成者:
Mark Norkin, Adrian Chlebosz, Marcus Voltolim
  • コンストラクターのサマリー

    コンストラクター
    コンストラクター
    説明
    ReactiveKafkaConsumerTemplate(reactor.kafka.receiver.KafkaReceiver<K,V> kafkaReceiver)
    ReactiveKafkaConsumerTemplate(reactor.kafka.receiver.ReceiverOptions<K,V> receiverOptions)
  • メソッドのサマリー

    修飾子と型
    メソッド
    説明
    reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition>
    reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,LongSE>>
    beginningOffsets(org.apache.kafka.common.TopicPartition... partitions)
    reactor.core.publisher.Mono<MapSE<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata>>
    committed(SetSE<org.apache.kafka.common.TopicPartition> partitions)
    <T> reactor.core.publisher.Mono<T>
    doOnConsumer(FunctionSE<org.apache.kafka.clients.consumer.Consumer<K,V>,? extends T> function)
    reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,LongSE>>
    endOffsets(org.apache.kafka.common.TopicPartition... partitions)
    reactor.core.publisher.Flux<reactor.util.function.Tuple2<StringSE,ListSE<org.apache.kafka.common.PartitionInfo>>>
    reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>>
    reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp>>
    offsetsForTimes(MapSE<org.apache.kafka.common.TopicPartition,LongSE> timestampsToSearch)
    reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo>
    reactor.core.publisher.Mono<VoidSE>
    pause(org.apache.kafka.common.TopicPartition... partitions)
    reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition>
    reactor.core.publisher.Mono<LongSE>
    position(org.apache.kafka.common.TopicPartition partition)
    reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,V>>
    reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>
    reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>
    reactor.core.publisher.Flux<reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,V>>>
    reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>>
    receiveExactlyOnce(reactor.kafka.sender.TransactionManager transactionManager)
    一度だけの配信セマンティクスに使用できるコンシューマーレコードバッチの Flux を返します。
    reactor.core.publisher.Mono<VoidSE>
    resume(org.apache.kafka.common.TopicPartition... partitions)
    reactor.core.publisher.Mono<VoidSE>
    seek(org.apache.kafka.common.TopicPartition partition, long offset)
    reactor.core.publisher.Mono<VoidSE>
    seekToBeginning(org.apache.kafka.common.TopicPartition... partitions)
    reactor.core.publisher.Mono<VoidSE>
    seekToEnd(org.apache.kafka.common.TopicPartition... partitions)
    reactor.core.publisher.Flux<StringSE>

    クラス java.lang.ObjectSE から継承されたメソッド

    clone, equalsSE, finalize, getClass, hashCode, notify, notifyAll, toString, wait, waitSE, waitSE
  • コンストラクターの詳細

    • ReactiveKafkaConsumerTemplate

      public ReactiveKafkaConsumerTemplate(reactor.kafka.receiver.ReceiverOptions<K,V> receiverOptions)
    • ReactiveKafkaConsumerTemplate

      public ReactiveKafkaConsumerTemplate(reactor.kafka.receiver.KafkaReceiver<K,V> kafkaReceiver)
  • メソッドの詳細

    • receive

      public reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,V>> receive()
    • receiveBatch

      public reactor.core.publisher.Flux<reactor.core.publisher.Flux<reactor.kafka.receiver.ReceiverRecord<K,V>>> receiveBatch()
    • receiveAutoAck

      public reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> receiveAutoAck()
    • receiveAtMostOnce

      public reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> receiveAtMostOnce()
    • receiveExactlyOnce

      public reactor.core.publisher.Flux<reactor.core.publisher.Flux<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>>> receiveExactlyOnce(reactor.kafka.sender.TransactionManager transactionManager)
      一度だけの配信セマンティクスに使用できるコンシューマーレコードバッチの Flux を返します。内部フラックスごとに新しいトランザクションが開始され、フラックスの処理後に TransactionManager.commit() または TransactionManager.abort() を使用してトランザクションをコミットまたは中止するのは、消費側アプリケーションの責任です。コンシューマーレコードの次のバッチは、前のフラックスが終了した後にのみ配信されます。各内部フラックスにディスパッチされたレコードのオフセットは、そのフラックスに対して開始されたトランザクション内で提供された transactionManager を使用してコミットされます。

      使用例:

       
       KafkaSender<Integer, Person> sender = sender(senderOptions());
       ReceiverOptions<Integer, Person> receiverOptions = receiverOptions(Collections.singleton(sourceTopic));
       KafkaReceiver<Integer, Person> receiver = KafkaReceiver.create(receiverOptions);
       receiver.receiveExactlyOnce(sender.transactionManager())
       	 .concatMap(f -> sendAndCommit(f))
      	 .onErrorResume(e -> sender.transactionManager().abort().then(Mono.error(e)))
      	 .doOnCancel(() -> close());
      
       Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, Person>> flux) {
       	return sender.send(flux.map(r -> SenderRecord.<Integer, Person, Integer>create(transform(r.value()), r.key())))
      			.concatWith(sender.transactionManager().commit());
       }
       
       
      パラメーター:
      transactionManager - トランザクションマネージャーは、各内部フラックスの新しいトランザクションを開始し、そのトランザクション内でオフセットをコミットするために使用されていました
      戻り値:
      トランザクション内で処理されるコンシューマーレコードバッチのフラックス
    • doOnConsumer

      public <T> reactor.core.publisher.Mono<T> doOnConsumer(FunctionSE<org.apache.kafka.clients.consumer.Consumer<K,V>,? extends T> function)
    • assignment

      public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> assignment()
    • subscription

      public reactor.core.publisher.Flux<StringSE> subscription()
    • seek

      public reactor.core.publisher.Mono<VoidSE> seek(org.apache.kafka.common.TopicPartition partition, long offset)
    • seekToBeginning

      public reactor.core.publisher.Mono<VoidSE> seekToBeginning(org.apache.kafka.common.TopicPartition... partitions)
    • seekToEnd

      public reactor.core.publisher.Mono<VoidSE> seekToEnd(org.apache.kafka.common.TopicPartition... partitions)
    • position

      public reactor.core.publisher.Mono<LongSE> position(org.apache.kafka.common.TopicPartition partition)
    • committed

      public reactor.core.publisher.Mono<MapSE<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata>> committed(SetSE<org.apache.kafka.common.TopicPartition> partitions)
    • partitionsFromConsumerFor

      public reactor.core.publisher.Flux<org.apache.kafka.common.PartitionInfo> partitionsFromConsumerFor(StringSE topic)
    • paused

      public reactor.core.publisher.Flux<org.apache.kafka.common.TopicPartition> paused()
    • pause

      public reactor.core.publisher.Mono<VoidSE> pause(org.apache.kafka.common.TopicPartition... partitions)
    • resume

      public reactor.core.publisher.Mono<VoidSE> resume(org.apache.kafka.common.TopicPartition... partitions)
    • metricsFromConsumer

      public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>> metricsFromConsumer()
    • listTopics

      public reactor.core.publisher.Flux<reactor.util.function.Tuple2<StringSE,ListSE<org.apache.kafka.common.PartitionInfo>>> listTopics()
    • offsetsForTimes

      public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp>> offsetsForTimes(MapSE<org.apache.kafka.common.TopicPartition,LongSE> timestampsToSearch)
    • beginningOffsets

      public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,LongSE>> beginningOffsets(org.apache.kafka.common.TopicPartition... partitions)
    • endOffsets

      public reactor.core.publisher.Flux<reactor.util.function.Tuple2<org.apache.kafka.common.TopicPartition,LongSE>> endOffsets(org.apache.kafka.common.TopicPartition... partitions)