クラス MessagingMessageConverter

java.lang.ObjectSE
org.springframework.kafka.support.converter.MessagingMessageConverter
実装されているすべてのインターフェース:
MessageConverterRecordMessageConverter
既知の直属サブクラス
JacksonJsonMessageConverterJacksonProjectingMessageConverterJsonMessageConverterProjectingMessageConverter

public class MessagingMessageConverter extends ObjectSE implements RecordMessageConverter
個々のメッセージを受信するメッセージリスナーのメッセージング MessageConverter 実装。

返されるメッセージに ConsumerRecord に基づいて KafkaHeaders を移入します。

作成者:
Marius Bogoevici, Gary Russell, Dariusz Szablinski, Biju Kunjummen, Soby Chacko
  • フィールドの詳細

  • コンストラクターの詳細

    • MessagingMessageConverter

      public MessagingMessageConverter()
      KafkaHeaders.PARTITION を使用してターゲットパーティションを決定するインスタンスを構築します。
    • MessagingMessageConverter

      public MessagingMessageConverter(FunctionSE<Message<?>, @Nullable IntegerSE> partitionProvider)
      提供されたパーティションプロバイダー関数を使用するインスタンスを構築します。この関数は null を返して、パーティションの選択を Kafka クライアントに委譲できます。
      パラメーター:
      partitionProvider - プロバイダー。
      導入:
      3.0.8
  • メソッドの詳細

    • setGenerateMessageId

      public void setGenerateMessageId(boolean generateMessageId)
      作成されたメッセージに対して Message ids を生成します。false に設定すると、デフォルト値を使用しようとします。デフォルトでは false に設定されています。
      パラメーター:
      generateMessageId - メッセージ ID を生成する必要がある場合は true
    • setGenerateTimestamp

      public void setGenerateTimestamp(boolean generateTimestamp)
      生成されたメッセージに対して timestamp を生成します。false に設定すると、代わりに -1 が使用されます。デフォルトでは false に設定されています。
      パラメーター:
      generateTimestamp - タイムスタンプを生成する必要がある場合は true
    • setHeaderMapper

      public void setHeaderMapper(KafkaHeaderMapper headerMapper)
      ヘッダーマッパーを設定してヘッダーをマップします。
      パラメーター:
      headerMapper - マッパー。
      導入:
      1.3
    • setRawRecordHeader

      public void setRawRecordHeader(boolean rawRecordHeader)
      true に設定すると、生の ConsumerRecord がヘッダー KafkaHeaders.RAW_DATA として追加されます。
      パラメーター:
      rawRecordHeader - true の場合、ヘッダーを追加します。
      導入:
      2.7
    • getMessagingConverter

      protected MessageConverter getMessagingConverter()
    • setMessagingConverter

      public void setMessagingConverter(@Nullable SmartMessageConverter messagingConverter)
      spring-messaging SmartMessageConverter を設定して、レコード値を目的の型に変換します。これにより、受信にマップされるときに MessageHeaders.CONTENT_TYPE が文字列に変換されます。

      IMPORTANT: このコンバーターの fromMessage(Message, String) メソッドは、ProducerRecord.value() プロパティにメッセージペイロードを含む ProducerRecord への送信変換のために呼び出されます。toMessage(ConsumerRecord, Object, Object, Type) は、ペイロードが ConsumerRecord.value() プロパティである ConsumerRecord からの受信変換のために呼び出されます。

      MessageConverter.toMessage(Object, MessageHeaders) メソッドは、fromMessage(Message, String) に渡された Message から新しい送信 Message を作成するために呼び出されます。同様に、toMessage(ConsumerRecord, Acknowledgment, Consumer, Type) では、このコンバーターが ConsumerRecord から新しい Message を作成した後、MessageConverter.fromMessage(Message, Class) メソッドが呼び出され、最終的な受信メッセージが新しく変換されたペイロードで作成されます。

      どちらの場合でも、SmartMessageConverter が null を返す場合は、元のメッセージが使用されます。

      パラメーター:
      messagingConverter - コンバーター。
      導入:
      2.7.1
    • toMessage

      public Message<?> toMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, @Nullable ObjectSE acknowledgment, @Nullable ObjectSE consumer, @Nullable TypeSE type)
      インターフェースからコピーされた説明: RecordMessageConverter
      ConsumerRecord を Message に変換します。
      次で指定:
      インターフェース RecordMessageConvertertoMessage 
      パラメーター:
      record - レコード。
      acknowledgment - the acknowledgment (can be Acknowledgment or ShareAcknowledgment).
      consumer - the consumer (can be Consumer or ShareConsumer).
      type - 必要なペイロード型。
      戻り値:
      メッセージ。
    • fromMessage

      public org.apache.kafka.clients.producer.ProducerRecord<?,?> fromMessage(Message<?> messageArg, @Nullable StringSE defaultTopic)
      インターフェースからコピーされた説明: RecordMessageConverter
      メッセージをプロデューサーレコードに変換します。
      次で指定:
      インターフェース RecordMessageConverterfromMessage 
      パラメーター:
      messageArg - メッセージ。
      defaultTopic - ヘッダーが見つからない場合に使用するデフォルトのトピック。
      戻り値:
      プロデューサーの記録。
    • initialRecordHeaders

      protected org.apache.kafka.common.header.Headers initialRecordHeaders(Message<?> message)
      サブクラスは、マップされる前に追加のヘッダーを設定できます。
      パラメーター:
      message - メッセージ。
      戻り値:
      ヘッダー
      導入:
      2.1
    • convertPayload

      protected @Nullable ObjectSE convertPayload(Message<?> message)
      サブクラスはペイロードを変換できます。デフォルトでは、変更されずに Kafka に送信されます。
      パラメーター:
      message - メッセージ。
      戻り値:
      ペイロード。
    • extractAndConvertValue

      protected ObjectSE extractAndConvertValue(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, @Nullable TypeSE type)
      サブクラスは値を変換できます。デフォルトでは、変換できる SmartMessageConverter がない限り、Kafka によって提供されるものとして返されます。
      パラメーター:
      record - レコード。
      type - 必要な型。
      戻り値:
      値。