Apache Kafka サポート
概要
Apache Kafka 用の Spring Integration は Spring for Apache Kafka プロジェクトに基づいています。
この依存関係をプロジェクトに含める必要があります。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>5.5.8</version>
</dependency>
compile "org.springframework.integration:spring-integration-kafka:5.5.8"
次のコンポーネントを提供します。
送信チャネルアダプター
送信チャネルアダプターは、Spring Integration チャネルから Apache Kafka トピックにメッセージを公開するために使用されます。チャネルはアプリケーションコンテキストで定義され、Apache Kafka にメッセージを送信するアプリケーションに接続されます。送信側アプリケーションは、次のように、送信チャネルアダプターによって内部的に Kafka レコードに変換される Spring Integration メッセージを使用して、Apache Kafka に公開できます。
Spring Integration メッセージのペイロードは、Kafka レコードのペイロードを設定するために使用されます。
デフォルトでは、Spring Integration メッセージの
kafka_messageKey
ヘッダーは、Kafka レコードのキーを設定するために使用されます。
kafka_topic
ヘッダーと kafka_partitionId
ヘッダーを介してメッセージをパブリッシュするためのターゲットトピックとパーティションをそれぞれカスタマイズできます。
さらに、<int-kafka:outbound-channel-adapter>
は、送信メッセージに SpEL 式を適用することにより、キー、ターゲットトピック、ターゲットパーティションを抽出する機能を提供します。そのために、相互に排他的な 3 つの属性のペアをサポートしています。
topic
およびtopic-expression
message-key
およびmessage-key-expression
partition-id
およびpartition-id-expression
これらを使用すると、topic
、message-key
、partition-id
をそれぞれアダプターの静的値として指定したり、実行時にリクエストメッセージに対してそれらの値を動的に評価したりできます。
KafkaHeaders インターフェース(spring-kafka によって提供)には、ヘッダーとの対話に使用される定数が含まれています。messageKey および topic のデフォルトヘッダーには、kafka_ プレフィックスが必要になりました。古いヘッダーを使用していた以前のバージョンから移行する場合、<int-kafka:outbound-channel-adapter> で message-key-expression="headers['messageKey']" および topic-expression="headers['topic']" を指定する必要があります。または、<header-enricher> または MessageBuilder を使用して、アップストリームのヘッダーを KafkaHeaders からの新しいヘッダーに変更することもできます。定数値を使用する場合は、topic および message-key を使用して、アダプターで構成することもできます。 |
注: アダプターがトピックまたはメッセージキー(定数または式のいずれか)で構成されている場合、それらが使用され、対応するヘッダーは無視されます。ヘッダーで構成をオーバーライドする場合は、次のような式で構成する必要があります。
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
アダプターには KafkaTemplate
が必要です。KafkaTemplate
には、適切に構成された KafkaProducerFactory
が必要です。
send-failure-channel
(sendFailureChannel
)が提供され、送信エラー(同期または非同期)が受信されると、ErrorMessage
がチャネルに送信されます。ペイロードは、failedMessage
、record
(ProducerRecord
)および cause
プロパティを持つ KafkaSendFailureException
です。error-message-strategy
プロパティを設定することにより、DefaultErrorMessageStrategy
をオーバーライドできます。
send-success-channel
(sendSuccessChannel
)が指定されている場合、送信が成功した後、型 org.apache.kafka.clients.producer.RecordMetadata
のペイロードを持つメッセージが送信されます。
アプリケーションがトランザクションを使用し、同じチャネルアダプターを使用して、リスナーコンテナーによってトランザクションが開始されるメッセージを公開する場合、および既存のトランザクションがない場合に公開する場合は、KafkaTemplate で transactionIdPrefix を構成して、によって使用されるプレフィックスをオーバーライドする必要があります。コンテナーまたはトランザクションマネージャー。コンテナーによって開始されるトランザクション(プロデューサーファクトリまたはトランザクションマネージャープロパティ)で使用されるプレフィックスは、すべてのアプリケーションインスタンスで同じである必要があります。プロデューサーのみのトランザクションに使用されるプレフィックスは、すべてのアプリケーションインスタンスで一意である必要があります。 |
ブール値に解決する必要がある flushExpression
を構成できます。linger.ms
および batch.size
Kafka プロデューサープロパティを使用している場合は、複数のメッセージを送信した後のフラッシュが役立つ場合があります。式は最後のメッセージで Boolean.TRUE
と評価され、不完全なバッチがすぐに送信されます。デフォルトでは、式は KafkaIntegrationHeaders.FLUSH
ヘッダー(kafka_flush
)で Boolean
値を検索します。フラッシュは、値が true
の場合に発生し、false
の場合、ヘッダーがない場合には発生しません。
KafkaProducerMessageHandler.sendTimeoutExpression
のデフォルトが 10 秒から delivery.timeout.ms
Kafka プロデューサープロパティ + 5000
に変更されたため、このフレームワークによって生成されたタイムアウトではなく、タイムアウト後の実際の Kafka エラーがアプリケーションに伝播されます。予期しない動作が発生する可能性があるため、これは一貫性を保つために変更されました(Spring は送信をタイムアウトする可能性がありますが、実際には最終的には成功します)。重要: このタイムアウトはデフォルトで 120 秒であるため、よりタイムリーな障害を取得するためにタイムアウトを短縮することをお勧めします。
Java 構成
次の例は、Java を使用して Apache Kafka の送信チャネルアダプターを構成する方法を示しています。
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
handler.setSuccessChannel(successes());
handler.setFailureChannel(failures());
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaProducerFactory<>(props);
}
Java DSL 設定
次の例は、Spring Integration Java DSL を使用して Apache Kafka の送信チャネルアダプターを構成する方法を示しています。
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}
@Bean
public IntegrationFlow sendToKafkaFlow() {
return f -> f
.<String>split(p -> Stream.generate(() -> p).limit(101).iterator(), null)
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')"),
e -> e.id("kafkaProducer1")))
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.timestamp(m -> 1487694048644L),
e -> e.id("kafkaProducer2")))
);
}
@Bean
public DefaultKafkaHeaderMapper mapper() {
return new DefaultKafkaHeaderMapper();
}
private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
ProducerFactory<Integer, String> producerFactory, String topic) {
return Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.headerMapper(mapper())
.partitionId(m -> 10)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
XML 構成
次の例は、XML を使用して Kafka 送信・チャネルアダプターを構成する方法を示しています。
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="template"
auto-startup="false"
channel="inputToKafka"
topic="foo"
sync="false"
message-key-expression="'bar'"
send-failure-channel="failures"
send-success-channel="successes"
error-message-strategy="ems"
partition-id-expression="2">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
... <!-- more producer properties -->
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
メッセージ駆動型チャネルアダプター
KafkaMessageDrivenChannelAdapter
(<int-kafka:message-driven-channel-adapter>
)は、spring-kafka
KafkaMessageListenerContainer
または ConcurrentListenerContainer
を使用します。
mode
属性も使用できます。record
または batch
(デフォルト: record
)の値を受け入れることができます。record
モードの場合、各メッセージペイロードは単一の ConsumerRecord
から変換されます。batch
モードの場合、ペイロードは、コンシューマーポーリングによって返されたすべての ConsumerRecord
インスタンスから変換されたオブジェクトのリストです。バッチ処理された @KafkaListener
と同様に、KafkaHeaders.RECEIVED_MESSAGE_KEY
、KafkaHeaders.RECEIVED_PARTITION_ID
、KafkaHeaders.RECEIVED_TOPIC
、KafkaHeaders.OFFSET
ヘッダーもリストであり、位置はペイロード内の位置に対応しています。
受信したメッセージには、特定のヘッダーが入力されています。詳細については、KafkaHeaders
クラス (Javadoc) を参照してください。
Consumer オブジェクト(kafka_consumer ヘッダー内)はスレッドセーフではありません。そのメソッドは、アダプター内でリスナーを呼び出すスレッドでのみ呼び出す必要があります。メッセージを別のスレッドに渡す場合は、そのメソッドを呼び出さないでください。 |
retry-template
が提供されている場合、配信の失敗はその再試行ポリシーに従って再試行されます。この場合、error-channel
は許可されません。recovery-callback
を使用して、再試行が終了したときのエラーを処理できます。ほとんどの場合、これは ErrorMessage
をチャネルに送信する ErrorMessageSendingRecoverer
です。
ErrorMessage
(error-channel
または recovery-callback
で使用するため)を作成する場合、error-message-strategy
プロパティを設定することでエラーメッセージをカスタマイズできます。デフォルトでは、RawRecordHeaderErrorMessageStrategy
が使用され、変換されたメッセージと生の ConsumerRecord
へのアクセスを提供します。
Java 構成
次の例は、Java を使用してメッセージ駆動型チャネルアダプターを設定する方法を示しています。
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties properties = new ContainerProperties(this.topic);
// set more properties
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaConsumerFactory<>(props);
}
Java DSL 設定
次の例は、Spring Integration JavaDSL を使用してメッセージ駆動型チャネルアダプターを設定する方法を示しています。
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
.configureListenerContainer(c ->
c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
.id("topic1ListenerContainer"))
.recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
f -> f.throwExceptionOnRejection(true))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults1"))
.get();
}
@KafkaListener
アノテーションに使用されるコンテナーファクトリを使用して、他の目的で ConcurrentMessageListenerContainer
インスタンスを作成することもできます。例については、Spring for Apache Kafka のドキュメントを参照してください。
Java DSL では、DSL がコンテナーを Bean として登録するため、コンテナーを @Bean
として構成する必要はありません。次の例は、その方法を示しています。
@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
KafkaMessageDrivenChannelAdapter.ListenerMode.record)
.id("topic2Adapter"))
...
get();
}
この場合、アダプターには id
(topic2Adapter
)が指定されていることに注意してください。コンテナーは、topic2Adapter.container
という名前でアプリケーションコンテキストに登録されます。アダプターに id
プロパティがない場合、コンテナーの Bean 名は、コンテナーの完全修飾クラス名に #n
を加えたものになります。ここで、n
はコンテナーごとに増分されます。
XML 構成
次の例は、XML を使用してメッセージ駆動型チャネルアダプターを設定する方法を示しています。
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
mode="record"
retry-template="template"
recovery-callback="callback"
error-message-strategy="ems"
channel="someChannel"
error-channel="errorChannel" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
...
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="foo" />
</bean>
</constructor-arg>
</bean>
受信チャネルアダプター
KafkaMessageSource
は、ポーリング可能なチャネルアダプターの実装を提供します。
Java 構成
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf) {
KafkaMessageSource<String, String> source = new KafkaMessageSource<>(cf, "myTopic");
source.setGroupId("myGroupId");
source.setClientId("myClientId");
return source;
}
使用可能なプロパティについては、javadoc を参照してください。
デフォルトでは、max.poll.records
はコンシューマーファクトリで明示的に設定する必要があります。コンシューマーファクトリが DefaultKafkaConsumerFactory
の場合は、強制的に 1 に設定されます。プロパティ allowMultiFetch
を true
に設定して、この動作をオーバーライドできます。
リバランスを回避するには、max.poll.interval.ms 内でコンシューマーをポーリングする必要があります。allowMultiFetch を true に設定した場合は、取得したすべてのレコードを処理し、max.poll.interval.ms 内で再度ポーリングする必要があります。 |
このアダプターによって発行されたメッセージには、前のポーリングから残っているレコードの数を含むヘッダー kafka_remainingRecords
が含まれています。
Java DSL 設定
@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
return IntegrationFlows.from(Kafka.inboundChannelAdapter(cf, "myTopic")
.groupId("myDslGroupId"), e -> e.poller(Pollers.fixedDelay(5000)))
.handle(System.out::println)
.get();
}
XML 構成
<int-kafka:inbound-channel-adapter
id="adapter1"
consumer-factory="consumerFactory"
ack-factory="ackFactory"
topics="topic1"
channel="inbound"
client-id="client"
group-id="group"
message-converter="converter"
payload-type="java.lang.String"
raw-header="true"
auto-startup="false"
rebalance-listener="rebal">
<int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>
送信ゲートウェイ
送信ゲートウェイは、リクエスト / 応答操作用です。ほとんどの Spring Integration ゲートウェイとは異なり、送信スレッドはゲートウェイでブロックされず、応答は応答リスナーコンテナースレッドで処理されます。コードが同期メッセージングゲートウェイの背後にあるゲートウェイを呼び出す場合、ユーザースレッドは、応答が受信されるまで(またはタイムアウトが発生するまで)そこでブロックします。
ゲートウェイは、応答コンテナーにトピックとパーティションが割り当てられるまでリクエストを受け入れません。テンプレートの応答コンテナープロパティに ConsumerRebalanceListener を追加し、onPartitionsAssigned 呼び出しを待ってから、ゲートウェイにメッセージを送信することをお勧めします。 |
KafkaProducerMessageHandler
sendTimeoutExpression
のデフォルトは delivery.timeout.ms
Kafka プロデューサープロパティ + 5000
であるため、このフレームワークによって生成されるタイムアウトではなく、タイムアウト後の実際の Kafka エラーがアプリケーションに伝播されます。予期しない動作が発生する可能性があるため、これは一貫性を保つために変更されました(Spring は送信をタイムアウトする可能性がありますが、実際には最終的には成功します)。重要: このタイムアウトはデフォルトで 120 秒であるため、よりタイムリーな障害を取得するためにタイムアウトを短縮することをお勧めします。
Java 構成
次の例は、Java を使用してゲートウェイを構成する方法を示しています。
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
使用可能なプロパティについては、javadoc を参照してください。
送信チャネルアダプターと同じクラスが使用されていることに注意してください。唯一の違いは、コンストラクターに渡される KafkaTemplate
が ReplyingKafkaTemplate
であるということです。詳細については、Spring for Apache Kafka のドキュメントを参照してください。
送信トピック、パーティション、キーなどは、送信アダプターと同じ方法で決定されます。返信トピックは次のように決定されます。
KafkaHeaders.REPLY_TOPIC
という名前のメッセージヘッダー(存在する場合は、String
またはbyte[]
値が必要です)は、テンプレートの応答コンテナーのサブスクライブされたトピックに対して検証されます。テンプレートの
replyContainer
が 1 つのトピックのみにサブスクライブされている場合は、それが使用されます。
KafkaHeaders.REPLY_PARTITION
ヘッダーを指定して、応答に使用する特定のパーティションを決定することもできます。繰り返しますが、これはテンプレートの応答コンテナーのサブスクリプションに対して検証されます。
Java DSL 設定
次の例は、Java DSL を使用して送信ゲートウェイを構成する方法を示しています。
@Bean
public IntegrationFlow outboundGateFlow(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return IntegrationFlows.from("kafkaRequests")
.handle(Kafka.outboundGateway(kafkaTemplate))
.channel("kafkaReplies")
.get();
}
または、次の Bean のような構成を使用することもできます。
@Bean
public IntegrationFlow outboundGateFlow() {
return IntegrationFlows.from("kafkaRequests")
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
.configureKafkaTemplate(t -> t.replyTimeout(30_000)))
.channel("kafkaReplies")
.get();
}
XML 構成
<int-kafka:outbound-gateway
id="allProps"
error-message-strategy="ems"
kafka-template="template"
message-key-expression="'key'"
order="23"
partition-id-expression="2"
reply-channel="replies"
reply-timeout="43"
request-channel="requests"
requires-reply="false"
send-success-channel="successes"
send-failure-channel="failures"
send-timeout-expression="44"
sync="true"
timestamp-expression="T(System).currentTimeMillis()"
topic-expression="'topic'"/>
受信ゲートウェイ
受信ゲートウェイは、リクエスト / 応答操作用です。
次の例は、Java で受信ゲートウェイを構成する方法を示しています。
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
AbstractMessageListenerContainer<Integer, String>container,
KafkaTemplate<Integer, String> replyTemplate) {
KafkaInboundGateway<Integer, String, String> gateway =
new KafkaInboundGateway<>(container, replyTemplate);
gateway.setRequestChannel(requests);
gateway.setReplyChannel(replies);
gateway.setReplyTimeout(30_000);
return gateway;
}
使用可能なプロパティについては、javadoc を参照してください。
次の例は、JavaDSL を使用して単純な大文字のコンバーターを構成する方法を示しています。
@Bean
public IntegrationFlow serverGateway(
ConcurrentMessageListenerContainer<Integer, String> container,
KafkaTemplate<Integer, String> replyTemplate) {
return IntegrationFlows
.from(Kafka.inboundGateway(container, replyTemplate)
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
または、次のようなコードを使用して大文字のコンバーターを構成することもできます。
@Bean
public IntegrationFlow serverGateway() {
return IntegrationFlows
.from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
producerFactory())
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
@KafkaListener
アノテーションに使用されるコンテナーファクトリを使用して、他の目的で ConcurrentMessageListenerContainer
インスタンスを作成することもできます。例については、Spring for Apache Kafka のドキュメントとメッセージ駆動型チャネルアダプターを参照してください。
XML 構成
<int-kafka:inbound-gateway
id="gateway1"
listener-container="container1"
kafka-template="template"
auto-startup="false"
phase="100"
request-timeout="5000"
request-channel="nullChannel"
reply-channel="errorChannel"
reply-timeout="43"
message-converter="messageConverter"
payload-type="java.lang.String"
error-message-strategy="ems"
retry-template="retryTemplate"
recovery-callback="recoveryCallback"/>
各プロパティの説明については、XML スキーマを参照してください。
Apache Kafka が支援するチャネルトピック
Spring Integration には、永続性のために Apache Kafka トピックによってサポートされる MessageChannel
実装があります。
各チャネルには、送信側に KafkaTemplate
が必要であり、リスナーコンテナーファクトリ(サブスクライブ可能なチャネルの場合)またはポーリング可能なチャネルに KafkaMessageSource
が必要です。
Java DSL 設定
@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlows.from(...)
...
.channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
...
.get();
}
@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlows.from(...)
...
.publishSubscribeChannel(pubSub(template, containerFactory),
pubsub -> pubsub
.subscribe(subflow -> ...)
.subscribe(subflow -> ...))
.get();
}
@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
.groupId("group2")
.get();
}
@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
KafkaMessageSource<Integer, String> source) {
return IntegrationFlows.from(...)
...
.channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
.handle(..., e -> e.poller(...))
...
.get();
}
Java 構成
/**
* Channel for a single subscriber.
**/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
KafkaListenerContainerFactory<String, String> factory)
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(template, factory, "topicA");
channel.setGroupId("group1");
return channel;
}
/**
* Channel for multiple subscribers.
**/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
KafkaListenerContainerFactory<String, String> factory)
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(template, factory, "topicB", true);
channel.setGroupId("group2");
return channel;
}
/**
* Pollable channel (topic is configured on the source)
**/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
KafkaMessageSource<String, String> source)
PollableKafkaChannel channel =
new PollableKafkaChannel(template, source);
channel.setGroupId("group3");
return channel;
}
XML 構成
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
container-factory="containerFactory" />
<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
group-id = "pollableGroup"/>
<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
group-id="pubSubGroup" container-factory="containerFactory" />
メッセージ変換
StringJsonMessageConverter
が提供されます。詳細については、Spring for Apache Kafka のドキュメントを参照してください。
このコンバーターをメッセージ駆動型チャネルアダプターとともに使用する場合、受信ペイロードを変換する型を指定できます。これは、アダプターに payload-type
属性(payloadType
プロパティ)を設定することによって実現されます。次の例は、XML 構成でこれを行う方法を示しています。
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
payload-type="com.example.Foo"
error-channel="errorChannel" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
次の例は、Java 構成でアダプターに payload-type
属性(payloadType
プロパティ)を設定する方法を示しています。
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
kafkaMessageDrivenChannelAdapter.setPayloadType(Foo.class);
return kafkaMessageDrivenChannelAdapter;
}
Null ペイロードとログ圧縮 "Tombstone" レコード
Spring メッセージング Message<?>
オブジェクトには null
ペイロードを含めることはできません。Apache Kafka のエンドポイントを使用する場合、null
ペイロード (tombstone レコードとも呼ばれる) は型 KafkaNull
のペイロードによって表されます。詳細については、Spring for Apache Kafka のドキュメントを参照してください。
Spring Integration エンドポイントの POJO メソッドは、KafkaNull
の代わりに真の null
値を使用できます。これを行うには、パラメーターに @Payload(required = false)
のマークを付けます。次の例は、その方法を示しています。
@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
KStream
から Spring Integration フローを呼び出す
MessagingTransformer
を使用して、KStream
から統合フローを呼び出すことができます。
@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
MessagingTransformer<byte[], byte[], byte[]> transformer) transformer) {
KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
...
.transform(() -> transformer)
.to(streamingTopic2);
stream.print(Printed.toSysOut());
return stream;
}
@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
MessagingFunction function) {
MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
return new MessagingTransformer<>(function, converter);
}
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(MessagingFunction.class)
...
.get();
}
統合フローがインターフェースで開始する場合、作成されるプロキシにはフロー Bean の名前があり、".gateway" が付加されているため、この Bean 名は必要に応じて @Qualifier
として使用できます。
読み取り / 処理 / 書き込みシナリオのパフォーマンスに関する考慮事項
多くのアプリケーションはトピックから消費し、いくつかの処理を実行し、別のトピックに書き込みます。ほとんどの場合、書き込みが失敗した場合、アプリケーションは例外をスローして、受信リクエストを再試行したり、デッドレタートピックに送信したりできます。この機能は、適切に構成されたエラーハンドラーとともに、基盤となるメッセージリスナーコンテナーによってサポートされます。ただし、これをサポートするには、書き込み操作が成功(または失敗)するまでリスナースレッドをブロックして、例外をコンテナーにスローできるようにする必要があります。単一レコードを使用する場合、これは、送信・アダプターで sync
プロパティを設定することによって実現されます。ただし、バッチを使用する場合、sync
を使用すると、アプリケーションが次のメッセージを送信する前に各送信の結果を待機するため、パフォーマンスが大幅に低下します。複数の送信を実行し、後でそれらの送信の結果を待つこともできます。これは、futuresChannel
をメッセージハンドラーに追加することで実現されます。この機能を有効にするには、送信メッセージに KafkaIntegrationHeaders.FUTURE_TOKEN
を追加します。次に、これを使用して、Future
を特定の送信済みメッセージに関連付けることができます。この機能の使用方法の例を次に示します。
@SpringBootApplication
public class FuturesChannelApplication {
public static void main(String[] args) {
SpringApplication.run(FuturesChannelApplication.class, args);
}
@Bean
IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
ListenerMode.batch, "inTopic"))
.handle(handler)
.get();
}
@Bean
IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
return IntegrationFlows.from(Gate.class)
.enrichHeaders(h -> h
.header(KafkaHeaders.TOPIC, "outTopic")
.headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.futuresChannel("futures"))
.get();
}
@Bean
PollableChannel futures() {
return new QueueChannel();
}
}
@Component
@DependsOn("outbound")
class Handler {
@Autowired
Gate gate;
@Autowired
PollableChannel futures;
public void handle(List<String> input) throws Exception {
System.out.println(input);
input.forEach(str -> this.gate.send(str.toUpperCase()));
for (int i = 0; i < input.size(); i++) {
Message<?> future = this.futures.receive(10000);
((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
}
}
}
interface Gate {
void send(String out);
}