アノテーションインターフェース KafkaListener


指定されたトピックで Kafka メッセージリスナーのターゲットとなるメソッドをマークするアノテーション。containerFactory() は、Kafka リスナーコンテナーの構築に使用する KafkaListenerContainerFactory を識別します。設定されていない場合、構成によって明示的なデフォルトが提供されていない限り、デフォルトのコンテナーファクトリは kafkaListenerContainerFactory の Bean 名で使用可能であると見なされます。

@KafkaListener アノテーションの処理は、KafkaListenerAnnotationBeanPostProcessor を登録することによって実行されます。これは手動で行うことも、EnableKafka アノテーションを介して行うのがより便利です。

アノテーション付きのメソッドは、MessageMapping が提供するものと同様の柔軟な署名を持つことができます。つまり

  • 生の Kafka メッセージにアクセスするための ConsumerRecord 
  • Acknowledgment を手動で確認する
  • @Payload - 検証のサポートを含むアノテーション付きメソッド引数
  • @Header -KafkaHeaders で定義された、特定のヘッダー値を抽出するためのアノテーション付きメソッド引数
  • @Headers - すべてのヘッダーにアクセスするために MapSE にも割り当て可能でなければならないアノテーション付き引数。
  • すべてのヘッダーにアクセスするための MessageHeaders 引数。
  • すべてのメソッド引数への便利なアクセスのための MessageHeaderAccessor

メソッドレベルで定義すると、メソッドごとにリスナーコンテナーが作成されます。MessageListenerMessagingMessageListenerAdapter であり、MethodKafkaListenerEndpoint で構成されています。

クラスレベルで定義されている場合、単一のメッセージリスナーコンテナーを使用して、@KafkaHandler アノテーションが付けられたすべてのメソッドにサービスを提供します。このようなアノテーション付きメソッドのメソッドシグネチャーは、特定の受信メッセージに対して単一のメソッドを解決できるようなあいまいさを引き起こしてはなりません。MessagingMessageListenerAdapterMultiMethodKafkaListenerEndpoint で構成されます。

作成者:
Gary Russell, Venil Noronha
関連事項:
  • オプション要素のサマリー

    オプション要素
    修飾子と型
    オプションの要素
    説明
    コンテナーファクトリのデフォルト設定を上書きするには、true または false に設定します。
    コンテナーファクトリの batchListener プロパティをオーバーライドします。
    このアノテーション内の SpEL 式で使用される疑似 Bean 名は、このリスナーが定義されている現在の Bean を参照します。
    提供されると、コンシューマーファクトリ構成のクライアント ID プロパティをオーバーライドします。
    このリスナーのコンテナーファクトリの concurrency 設定を上書きします。
    このエンドポイントのサービスを担当するメッセージリスナーコンテナーを作成するために使用する KafkaListenerContainerFactory の Bean 名。
    提供されている場合、このリスナーのリスナーコンテナーは、この値を名前として Collection<MessageListenerContainer> 型の Bean に追加されます。
    ContainerPostProcessor の Bean 名を設定して、コンテナーの作成と構成後にコンテナーをカスタマイズできるようにします。
    必要な型への変換を実行するために MessageHeaders.CONTENT_TYPE ヘッダーと組み合わせて使用する SmartMessageConverter ( CompositeMessageConverter など) の Bean 名を設定します。
    リスナーメソッドが例外をスローした場合に呼び出す KafkaListenerErrorHandler Bean 名を設定します。
    RecordFilterStrategy Bean 名を設定して、コンテナーファクトリで構成された戦略をオーバーライドします。
    コンシューマーファクトリの group.id プロパティを、このリスナーに対してのみこの値でオーバーライドします。
    このリスナーのコンテナーの一意の識別子。
    boolean
    groupId が提供されていない場合は、コンシューマーの group.id プロパティとして id(提供されている場合)を使用します。
    キー KafkaHeaders.LISTENER_INFO のヘッダーとして追加される静的情報。
    Kafka コンシューマープロパティ ; それらは、コンシューマーファクトリで定義された同じ名前のプロパティに優先します(コンシューマーファクトリがプロパティのオーバーライドをサポートしている場合)。
    boolean
    false で戻り値の型が IterableSE の場合、各要素の個々のレコードではなく、単一の応答レコードの値として結果が返されます。
    手動のトピック / パーティション割り当てを使用する場合のこのリスナーの topicPartitions。
    このリスナーのトピックパターン。
    このリスナーのトピック。
  • 要素の詳細

    • id

      このリスナーのコンテナーの一意の識別子。

      何も指定されていない場合、自動生成された ID が使用されます。

      メモ: 指定すると、idIsGroup() が false に設定されているか、groupId() が指定されていない限り、この値はコンシューマーファクトリ構成のグループ ID プロパティをオーバーライドします。

      SpEL #{...} およびプロパティプレースホルダー ${...} がサポートされています。

      戻り値:
      このエンドポイントを管理するコンテナーの id
      関連事項:
      デフォルト:
      ""
    • containerFactory

      StringSE containerFactory
      このエンドポイントのサービスを担当するメッセージリスナーコンテナーを作成するために使用する KafkaListenerContainerFactory の Bean 名。

      指定しない場合、デフォルトのコンテナーファクトリが使用されます(存在する場合)。SpEL 式が提供されている場合(#{...})、式はコンテナーファクトリインスタンスまたは Bean 名のいずれかに評価できます。

      戻り値:
      コンテナーファクトリ Bean 名。
      デフォルト:
      ""
    • topics

      StringSE[] topics
      このリスナーのトピック。エントリは、「トピック名」、「プロパティプレースホルダーキー」、「式」にすることができます。式はトピック名に解決する必要があります。これはグループ管理を使用し、Kafka はグループメンバーにパーティションを割り当てます。

      topicPattern() および topicPartitions() と相互に排他的です。

      戻り値:
      聞くトピック名または表現(SpEL)。
      デフォルト:
      {}
    • topicPattern

      StringSE topicPattern
      このリスナーのトピックパターン。エントリは、「トピックパターン」、「プロパティプレースホルダーキー」、「式」にすることができます。フレームワークは、指定されたパターンに一致するすべてのトピックにサブスクライブするコンテナーを作成して、動的に割り当てられたパーティションを取得します。チェック時に存在するトピックに対して定期的にパターンマッチングを行います。式はトピックパターンに解決する必要があります(文字列またはパターンの結果型がサポートされています)。これはグループ管理を使用し、Kafka はグループメンバーにパーティションを割り当てます。

      topics() および topicPartitions() と相互に排他的です。

      戻り値:
      トピックパターンまたは表現(SpEL)。
      関連事項:
      • CommonClientConfigs.METADATA_MAX_AGE_CONFIG
      デフォルト:
      ""
    • topicPartitions

      TopicPartition[] topicPartitions
      手動のトピック / パーティション割り当てを使用する場合のこのリスナーの topicPartitions。

      topicPattern() および topics() と相互に排他的です。

      戻り値:
      聞くトピック名または表現(SpEL)。
      デフォルト:
      {}
    • containerGroup

      StringSE containerGroup
      提供されている場合、このリスナーのリスナーコンテナーは、この値を名前として Collection<MessageListenerContainer> 型の Bean に追加されます。これにより、たとえば、コレクションを反復処理して、コンテナーのサブセットを開始 / 停止できます。Collection Bean は、バージョン 2.7.3 で非推奨になり、2.8 で削除される予定です。代わりに、名前が containerGroup + ".group" で型が ContainerGroup の Bean を使用する必要があります。

      SpEL #{...} およびプロパティプレースホルダー ${...} がサポートされています。

      戻り値:
      グループの Bean 名。
      デフォルト:
      ""
    • errorHandler

      StringSE errorHandler
      リスナーメソッドが例外をスローした場合に呼び出す KafkaListenerErrorHandler Bean 名を設定します。SpEL 式が提供されている場合(#{...})、式は KafkaListenerErrorHandler インスタンスまたは Bean 名のいずれかに評価できます。
      戻り値:
      エラーハンドラー。
      導入:
      1.3
      デフォルト:
      ""
    • groupId

      StringSE groupId
      コンシューマーファクトリの group.id プロパティを、このリスナーに対してのみこの値でオーバーライドします。

      SpEL #{...} およびプロパティプレースホルダー ${...} がサポートされています。

      戻り値:
      グループ ID。
      導入:
      1.3
      デフォルト:
      ""
    • idIsGroup

      boolean idIsGroup
      groupId が提供されていない場合は、コンシューマーの group.id プロパティとして id(提供されている場合)を使用します。コンシューマーファクトリの group.id を使用するには、false に設定します。
      戻り値:
      無効にする場合は false。
      導入:
      1.3
      デフォルト:
      true
    • clientIdPrefix

      StringSE clientIdPrefix
      提供されると、コンシューマーファクトリ構成のクライアント ID プロパティをオーバーライドします。同時実行性が使用されるときに一意性を確保するために、各コンテナーインスタンスにサフィックス('-n')が追加されます。

      SpEL #{...} およびプロパティプレースホルダー ${...} がサポートされています。

      戻り値:
      クライアント ID プレフィックス。
      導入:
      2.1.1
      デフォルト:
      ""
    • beanRef

      StringSE beanRef
      このアノテーション内の SpEL 式で使用される疑似 Bean 名は、このリスナーが定義されている現在の Bean を参照します。これにより、囲んでいる Bean 内のプロパティとメソッドにアクセスできます。デフォルトの "__listener"。

      サンプル: topics = "#{__listener.topicList}".

      戻り値:
      疑似 Bean 名。
      導入:
      2.1.2
      デフォルト:
      "__ リスナー "
    • concurrency

      StringSE concurrency
      このリスナーのコンテナーファクトリの concurrency 設定をオーバーライドします。NumberSE に評価されるプロパティプレースホルダーまたは SpEL 式の場合があります。この場合、値を取得するために Number.intValue() が使用されます。

      SpEL #{...} およびプロパティプレースホルダー ${...} がサポートされています。

      戻り値:
      並行性。
      導入:
      2.2
      デフォルト:
      ""
    • autoStartup

      StringSE autoStartup
      true または false に設定して、コンテナーファクトリのデフォルト設定をオーバーライドします。BooleanSE または StringSE に評価されるプロパティプレースホルダーまたは SpEL 式の場合があります。この場合、値を取得するために Boolean.parseBoolean(String)SE が使用されます。

      SpEL #{...} およびプロパティプレースホルダー ${...} がサポートされています。

      戻り値:
      自動起動の場合は true、自動起動しない場合は false。
      導入:
      2.2
      デフォルト:
      ""
    • properties

      StringSE[] properties
      Kafka コンシューマープロパティ ; それらは、コンシューマーファクトリで定義された同じ名前のプロパティに優先します(コンシューマーファクトリがプロパティのオーバーライドをサポートしている場合)。

      サポートされている構文

      キーと値のペアでサポートされている構文は、JavaプロパティファイルSEのエントリに対して定義されている構文と同じです。

      • key=value
      • key:value
      • key value
      group.id と client.id は無視されます。

      SpEL #{...} とプロパティプレースホルダー ${...} がサポートされています。SpEL 式は、配列またはコレクションの各メンバーが上記の形式のプロパティ名 + 値である StringSE、@{link String[]} または Collection<String> に解決される必要があります。

      戻り値:
      プロパティ。
      導入:
      2.2.4
      関連事項:
      デフォルト:
      {}
    • splitIterables

      boolean splitIterables
      false で戻り値の型が IterableSE の場合、各要素の個々のレコードではなく、単一の応答レコードの値として結果が返されます。デフォルトは真です。応答の型が Iterable<Message<?>> の場合は無視されます。
      戻り値:
      単一の応答レコードを作成する場合は false。
      導入:
      2.3.5
      デフォルト:
      true
    • contentTypeConverter

      StringSE contentTypeConverter
      必要な型への変換を実行するために MessageHeaders.CONTENT_TYPE ヘッダーと組み合わせて使用する SmartMessageConverter ( CompositeMessageConverter など) の Bean 名を設定します。SpEL 式が指定されている場合 (#{...})、式は SmartMessageConverter インスタンスまたは Bean 名に評価されます。
      戻り値:
      Bean 名。
      導入:
      2.7.1
      デフォルト:
      ""
    • batch

      StringSE batch
      コンテナーファクトリの batchListener プロパティをオーバーライドします。リスナーメソッドのシグネチャーは List<?> を受け取る必要があります。リファレンスドキュメントを参照してください。これにより、単一のコンテナーファクトリをレコードリスナーとバッチリスナーの両方に使用できます。以前は別々のコンテナーファクトリが必要でした。
      戻り値:
      アノテーション付きメソッドがバッチリスナーの場合は "true"、レコードリスナーの場合は "false"。設定されていない場合は、コンテナーの提供時の設定が使用されます。リスナー型は可変ではないため、SpEL およびプロパティプレースホルダーはサポートされていません。
      導入:
      2.8
      関連事項:
      デフォルト:
      ""
    • filter

      StringSE filter
      RecordFilterStrategy Bean 名を設定して、コンテナーファクトリで構成された戦略をオーバーライドします。SpEL 式が提供されている場合(#{...})、式は RecordFilterStrategy インスタンスまたは Bean 名のいずれかに評価できます。
      戻り値:
      フィルター。
      導入:
      2.8.4
      デフォルト:
      ""
    • info

      StringSE info
      キー KafkaHeaders.LISTENER_INFO のヘッダーとして追加される静的情報。これは、たとえば、RecordInterceptorRecordFilterStrategy、リスナー自体で、任意の目的に使用できます。

      SpEL #{...} およびプロパティプレースホルダー ${...} はサポートされていますが、文字列または byte[] に解決する必要があります。

      入力レコードのヘッダーを使用して送信レコードが作成された場合、このヘッダーは削除されます。

      戻り値:
      情報。
      導入:
      2.8.4
      デフォルト:
      ""
    • containerPostProcessor

      StringSE containerPostProcessor
      ContainerPostProcessor の Bean 名を設定して、コンテナーの作成と構成後にコンテナーをカスタマイズできるようにします。すべてのリスナーコンテナーに適用される ContainerCustomizer とは対照的に、このポストプロセッサーは現在のリスナーコンテナーにのみ適用されます。このポストプロセッサーは、コンテナーカスタマイザ (存在する場合) の後に適用されます。
      戻り値:
      コンテナーポストプロセッサーの Bean 名。
      導入:
      3.1
      デフォルト:
      ""