Apache Kafka サポート

概要

Apache Kafka 用の Spring Integration は Spring for Apache Kafka プロジェクトに基づいています。

この依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>6.4.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-kafka:6.4.1"

次のコンポーネントを提供します。

送信チャネルアダプター

送信チャネルアダプターは、Spring Integration チャネルから Apache Kafka トピックにメッセージを公開するために使用されます。チャネルはアプリケーションコンテキストで定義され、Apache Kafka にメッセージを送信するアプリケーションに接続されます。送信側アプリケーションは、次のように、送信チャネルアダプターによって内部的に Kafka レコードに変換される Spring Integration メッセージを使用して、Apache Kafka に公開できます。

  • Spring Integration メッセージのペイロードは、Kafka レコードのペイロードを設定するために使用されます。

  • デフォルトでは、Spring Integration メッセージの kafka_messageKey ヘッダーは、Kafka レコードのキーを設定するために使用されます。

kafka_topic ヘッダーと kafka_partitionId ヘッダーを介してメッセージをパブリッシュするためのターゲットトピックとパーティションをそれぞれカスタマイズできます。

さらに、<int-kafka:outbound-channel-adapter> は、送信メッセージに SpEL 式を適用することにより、キー、ターゲットトピック、ターゲットパーティションを抽出する機能を提供します。そのために、相互に排他的な 3 つの属性のペアをサポートしています。

  • topic および topic-expression

  • message-key および message-key-expression

  • partition-id および partition-id-expression

これらを使用すると、topicmessage-keypartition-id をそれぞれアダプターの静的値として指定したり、実行時にリクエストメッセージに対してそれらの値を動的に評価したりできます。

KafkaHeaders インターフェース(spring-kafka によって提供)には、ヘッダーとの対話に使用される定数が含まれています。messageKey および topic のデフォルトヘッダーには、kafka_ プレフィックスが必要になりました。古いヘッダーを使用していた以前のバージョンから移行する場合、<int-kafka:outbound-channel-adapter> で message-key-expression="headers['messageKey']" および topic-expression="headers['topic']" を指定する必要があります。または、<header-enricher> または MessageBuilder を使用して、アップストリームのヘッダーを KafkaHeaders からの新しいヘッダーに変更することもできます。定数値を使用する場合は、topic および message-key を使用して、アダプターで構成することもできます。

注: アダプターがトピックまたはメッセージキー(定数または式のいずれか)で構成されている場合、それらが使用され、対応するヘッダーは無視されます。ヘッダーで構成をオーバーライドする場合は、次のような式で構成する必要があります。

topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"

アダプターには KafkaTemplate が必要です。KafkaTemplate には、適切に構成された KafkaProducerFactory が必要です。

send-failure-channel (sendFailureChannel) が提供され、send() 障害 (同期または非同期) が受信された場合、ErrorMessage がチャネルに送信されます。ペイロードは、failedMessagerecord ( ProducerRecord) および cause プロパティを持つ KafkaSendFailureException です。error-message-strategy プロパティを設定することで、DefaultErrorMessageStrategy をオーバーライドできます。

send-success-channel (sendSuccessChannel)が指定されている場合、送信が成功した後、型 org.apache.kafka.clients.producer.RecordMetadata のペイロードを持つメッセージが送信されます。

アプリケーションがトランザクションを使用し、同じチャネルアダプターを使用して、リスナーコンテナーによってトランザクションが開始されるメッセージを公開する場合、および既存のトランザクションがない場合に公開する場合は、KafkaTemplate で transactionIdPrefix を構成して、によって使用されるプレフィックスをオーバーライドする必要があります。コンテナーまたはトランザクションマネージャー。コンテナーによって開始されるトランザクション(プロデューサーファクトリまたはトランザクションマネージャープロパティ)で使用されるプレフィックスは、すべてのアプリケーションインスタンスで同じである必要があります。プロデューサーのみのトランザクションに使用されるプレフィックスは、すべてのアプリケーションインスタンスで一意である必要があります。

ブール値に解決する必要がある flushExpression を構成できます。linger.ms および batch.size Kafka プロデューサープロパティを使用している場合は、複数のメッセージを送信した後のフラッシュが役立つ場合があります。式は最後のメッセージで Boolean.TRUE と評価され、不完全なバッチがすぐに送信されます。デフォルトでは、式は KafkaIntegrationHeaders.FLUSH ヘッダー(kafka_flush)で Boolean 値を検索します。フラッシュは、値が true の場合に発生し、false の場合、ヘッダーがない場合には発生しません。

KafkaProducerMessageHandler.sendTimeoutExpression のデフォルトが 10 秒から delivery.timeout.ms Kafka プロデューサープロパティ + 5000 に変更されたため、このフレームワークによって生成されたタイムアウトではなく、タイムアウト後の実際の Kafka エラーがアプリケーションに伝播されます。予期しない動作が発生する可能性があるため、これは一貫性を保つために変更されました(Spring は送信をタイムアウトする可能性がありますが、実際には最終的には成功します)。重要: このタイムアウトはデフォルトで 120 秒であるため、よりタイムリーな障害を取得するためにタイムアウトを短縮することをお勧めします。

構成

次の例は、Apache Kafka の送信チャネルアダプターを構成する方法を示しています。

  • Java DSL

  • Java

  • XML

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}

@Bean
public IntegrationFlow sendToKafkaFlow() {
    return f -> f
            .splitWith(s -> s.<String>function(p -> Stream.generate(() -> p).limit(101).iterator()))
            .publishSubscribeChannel(c -> c
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
                                    .timestampExpression("T(Long).valueOf('1487694048633')"),
                            e -> e.id("kafkaProducer1")))
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
                                   .timestamp(m -> 1487694048644L),
                            e -> e.id("kafkaProducer2")))
            );
}

@Bean
public DefaultKafkaHeaderMapper mapper() {
    return new DefaultKafkaHeaderMapper();
}

private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
        ProducerFactory<Integer, String> producerFactory, String topic) {
    return Kafka
            .outboundChannelAdapter(producerFactory)
            .messageKey(m -> m
                    .getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .headerMapper(mapper())
            .partitionId(m -> 10)
            .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
            .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    handler.setSuccessChannel(successes());
    handler.setFailureChannel(failures());
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaProducerFactory<>(props);
}
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    error-message-strategy="ems"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ... <!-- more producer properties -->
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

メッセージ駆動型チャネルアダプター

KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>)は、spring-kafka KafkaMessageListenerContainer または ConcurrentListenerContainer を使用します。

また、mode 属性も使用できます。record または batch の値を受け入れることができます (デフォルト: record)。record モードの場合、各メッセージペイロードは単一の ConsumerRecord から変換されます。batch モードの場合、ペイロードは、コンシューマーポーリングによって返されたすべての ConsumerRecord インスタンスから変換されたオブジェクトのリストです。バッチ化された @KafkaListener と同様に、KafkaHeaders.RECEIVED_KEYKafkaHeaders.RECEIVED_PARTITIONKafkaHeaders.RECEIVED_TOPICKafkaHeaders.OFFSET ヘッダーもリストであり、位置はペイロード内の位置に対応しています。

受信したメッセージには、特定のヘッダーが入力されています。詳細については、KafkaHeaders クラス (Javadoc) を参照してください。

Consumer オブジェクト(kafka_consumer ヘッダー内)はスレッドセーフではありません。そのメソッドは、アダプター内でリスナーを呼び出すスレッドでのみ呼び出す必要があります。メッセージを別のスレッドに渡す場合は、そのメソッドを呼び出さないでください。

retry-template が指定されている場合、配信の失敗はその再試行ポリシーに従って再試行されます。error-channel も指定されている場合、再試行が使い果たされた後、デフォルトの ErrorMessageSendingRecoverer がリカバリコールバックとして使用されます。recovery-callback を使用して、その場合に実行する他のアクションを指定するか、null に設定して最終例外をリスナーコンテナーにスローし、そこで処理されるようにすることもできます。

ErrorMessage (error-channel または recovery-callback で使用するため)を作成する場合、error-message-strategy プロパティを設定することでエラーメッセージをカスタマイズできます。デフォルトでは、RawRecordHeaderErrorMessageStrategy が使用され、変換されたメッセージと生の ConsumerRecord へのアクセスを提供します。

この形式の再試行はブロックされており、ポーリングされたすべてのレコードの合計再試行遅延が max.poll.interval.ms コンシューマープロパティを超える可能性がある場合、再調整が発生する可能性があります。代わりに、KafkaErrorSendingMessageRecoverer で構成されたリスナーコンテナーに DefaultErrorHandler を追加することを検討してください。

構成

次の例は、メッセージ駆動型チャネルアダプターを構成する方法を示しています。

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
                    .configureListenerContainer(c ->
                            c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
                                    .id("topic1ListenerContainer"))
                    .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
                            new RawRecordHeaderErrorMessageStrategy()))
                    .retryTemplate(new RetryTemplate())
                    .filterInRetry(true))
            .filter(Message.class, m ->
                            m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
                    f -> f.throwExceptionOnRejection(true))
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("listeningFromKafkaResults1"))
            .get();
}
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    // set more properties
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaConsumerFactory<>(props);
}
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        retry-template="template"
        recovery-callback="callback"
        error-message-strategy="ems"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>

</bean>

@KafkaListener アノテーションに使用されるコンテナーファクトリを使用して、他の目的で ConcurrentMessageListenerContainer インスタンスを作成することもできます。例については、Spring for Apache Kafka のドキュメントを参照してください。

Java DSL では、DSL がコンテナーを Bean として登録するため、コンテナーを @Bean として構成する必要はありません。次の例は、その方法を示しています。

@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
            KafkaMessageDrivenChannelAdapter.ListenerMode.record)
                .id("topic2Adapter"))
            ...
            get();
}

この場合、アダプターには id (topic2Adapter)が指定されていることに注意してください。コンテナーは、topic2Adapter.container という名前でアプリケーションコンテキストに登録されます。アダプターに id プロパティがない場合、コンテナーの Bean 名は、コンテナーの完全修飾クラス名に #n を加えたものになります。ここで、n はコンテナーごとに増分されます。

受信チャネルアダプター

KafkaMessageSource は、ポーリング可能なチャネルアダプターの実装を提供します。

構成

  • Java DSL

  • Kotlin

  • Java

  • XML

@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf)  {
    return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
                          e -> e.poller(Pollers.fixedDelay(5000)))
            .handle(System.out::println)
            .get();
}
@Bean
fun sourceFlow(cf: ConsumerFactory<String, String>) =
    integrationFlow(Kafka.inboundChannelAdapter(cf,
        ConsumerProperties(TEST_TOPIC3).also {
            it.groupId = "kotlinMessageSourceGroup"
        }),
        { poller(Pollers.fixedDelay(100)) }) {
        handle { m ->

        }
    }
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf)  {
    ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
	consumerProperties.setGroupId("myGroupId");
	consumerProperties.setClientId("myClientId");
    retunr new KafkaMessageSource<>(cf, consumerProperties);
}
<int-kafka:inbound-channel-adapter
        id="adapter1"
        consumer-factory="consumerFactory"
        consumer-properties="consumerProperties1"
        ack-factory="ackFactory"
        channel="inbound"
        message-converter="converter"
        payload-type="java.lang.String"
        raw-header="true"
        auto-startup="false">
    <int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>

<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <map>
            <entry key="max.poll.records" value="1"/>
        </map>
    </constructor-arg>
</bean>

<bean id="consumerProperties1" class="org.springframework.kafka.listener.ConsumerProperties">
    <constructor-arg name="topics" value="topic1"/>
    <property name="groupId" value="group"/>
    <property name="clientId" value="client"/>
</bean>

使用可能なプロパティについては、javadoc を参照してください。

デフォルトでは、max.poll.records はコンシューマーファクトリで明示的に設定する必要があります。コンシューマーファクトリが DefaultKafkaConsumerFactory の場合は、強制的に 1 に設定されます。プロパティ allowMultiFetch を true に設定して、この動作をオーバーライドできます。

リバランスを回避するには、max.poll.interval.ms 内でコンシューマーをポーリングする必要があります。allowMultiFetch を true に設定した場合は、取得したすべてのレコードを処理し、max.poll.interval.ms 内で再度ポーリングする必要があります。

このアダプターによって発行されたメッセージには、前のポーリングから残っているレコードの数を含むヘッダー kafka_remainingRecords が含まれています。

バージョン 6.2 以降、KafkaMessageSource はコンシューマープロパティで提供される ErrorHandlingDeserializer をサポートします。DeserializationException はレコードヘッダーから抽出され、呼び出された側にスローされます。SourcePollingChannelAdapter では、この例外は ErrorMessage にラップされ、その errorChannel に発行されます。詳細については、ErrorHandlingDeserializer のドキュメントを参照してください。

送信ゲートウェイ

送信ゲートウェイは、リクエスト / 応答操作用です。送信スレッドがゲートウェイでブロックされず、応答が応答リスナーコンテナースレッドで処理されるという点で、ほとんどの Spring Integration ゲートウェイとは異なります。コードが同期メッセージングゲートウェイの背後でゲートウェイを呼び出す場合、ユーザースレッドは応答が受信される (またはタイムアウトが発生する) までそこでブロックされます。

KafkaProducerMessageHandler sendTimeoutExpression のデフォルトは delivery.timeout.ms Kafka プロデューサープロパティ + 5000 であるため、このフレームワークによって生成されるタイムアウトではなく、タイムアウト後の実際の Kafka エラーがアプリケーションに伝播されます。これは、予期しない動作が発生する可能性があるため、一貫性を保つために変更されました (Spring は、実際には最終的には成功しますが、send() がタイムアウトになる可能性があります)。重要: タイムアウトはデフォルトで 120 秒であるため、よりタイムリーに失敗するようにタイムアウトを減らすこともできます。

構成

次の例は、ゲートウェイを構成する方法を示しています。

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow outboundGateFlow(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {

    return IntegrationFlow.from("kafkaRequests")
            .handle(Kafka.outboundGateway(kafkaTemplate))
            .channel("kafkaReplies")
            .get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
    return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
<int-kafka:outbound-gateway
    id="allProps"
    error-message-strategy="ems"
    kafka-template="template"
    message-key-expression="'key'"
    order="23"
    partition-id-expression="2"
    reply-channel="replies"
    reply-timeout="43"
    request-channel="requests"
    requires-reply="false"
    send-success-channel="successes"
    send-failure-channel="failures"
    send-timeout-expression="44"
    sync="true"
    timestamp-expression="T(System).currentTimeMillis()"
    topic-expression="'topic'"/>

使用可能なプロパティについては、javadoc を参照してください。

送信チャネルアダプターと同じクラスが使用されていることに注意してください。唯一の違いは、コンストラクターに渡される KafkaTemplate が ReplyingKafkaTemplate であるということです。詳細については、Spring for Apache Kafka のドキュメントを参照してください。

送信トピック、パーティション、キーなどは、送信アダプターと同じ方法で決定されます。返信トピックは次のように決定されます。

  1. KafkaHeaders.REPLY_TOPIC という名前のメッセージヘッダー(存在する場合は、String または byte[] 値が必要です)は、テンプレートの応答コンテナーのサブスクライブされたトピックに対して検証されます。

  2. テンプレートの replyContainer が 1 つのトピックのみにサブスクライブされている場合は、それが使用されます。

KafkaHeaders.REPLY_PARTITION ヘッダーを指定して、応答に使用する特定のパーティションを決定することもできます。繰り返しますが、これはテンプレートの応答コンテナーのサブスクリプションに対して検証されます。

または、次の Bean のような構成を使用することもできます。

@Bean
public IntegrationFlow outboundGateFlow() {
    return IntegrationFlow.from("kafkaRequests")
            .handle(Kafka.outboundGateway(producerFactory(), replyContainer())
                .configureKafkaTemplate(t -> t.replyTimeout(30_000)))
            .channel("kafkaReplies")
            .get();
}

受信ゲートウェイ

受信ゲートウェイは、リクエスト / 応答操作用です。

構成

次の例は、受信ゲートウェイを構成する方法を示しています。

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow serverGateway(
        ConcurrentMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {
    return IntegrationFlow
            .from(Kafka.inboundGateway(container, replyTemplate)
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
        AbstractMessageListenerContainer<Integer, String>container,
        KafkaTemplate<Integer, String> replyTemplate) {

    KafkaInboundGateway<Integer, String, String> gateway =
        new KafkaInboundGateway<>(container, replyTemplate);
    gateway.setRequestChannel(requests);
    gateway.setReplyChannel(replies);
    gateway.setReplyTimeout(30_000);
    return gateway;
}
<int-kafka:inbound-gateway
        id="gateway1"
        listener-container="container1"
        kafka-template="template"
        auto-startup="false"
        phase="100"
        request-timeout="5000"
        request-channel="nullChannel"
        reply-channel="errorChannel"
        reply-timeout="43"
        message-converter="messageConverter"
        payload-type="java.lang.String"
        error-message-strategy="ems"
        retry-template="retryTemplate"
        recovery-callback="recoveryCallback"/>

使用可能なプロパティについては、javadoc を参照してください。

RetryTemplate が指定されている場合、配信の失敗はその再試行ポリシーに従って再試行されます。error-channel も指定されている場合、再試行が使い果たされた後、デフォルトの ErrorMessageSendingRecoverer がリカバリコールバックとして使用されます。recovery-callback を使用して、その場合に実行する他のアクションを指定するか、null に設定して最終例外をリスナーコンテナーにスローし、そこで処理されるようにすることもできます。

ErrorMessage (error-channel または recovery-callback で使用するため)を作成する場合、error-message-strategy プロパティを設定することでエラーメッセージをカスタマイズできます。デフォルトでは、RawRecordHeaderErrorMessageStrategy が使用され、変換されたメッセージと生の ConsumerRecord へのアクセスを提供します。

この形式の再試行はブロックされており、ポーリングされたすべてのレコードの合計再試行遅延が max.poll.interval.ms コンシューマープロパティを超える可能性がある場合、再調整が発生する可能性があります。代わりに、KafkaErrorSendingMessageRecoverer で構成されたリスナーコンテナーに DefaultErrorHandler を追加することを検討してください。

次の例は、JavaDSL を使用して単純な大文字のコンバーターを構成する方法を示しています。

または、次のようなコードを使用して大文字のコンバーターを構成することもできます。

@Bean
public IntegrationFlow serverGateway() {
    return IntegrationFlow
            .from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
                    producerFactory())
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}

@KafkaListener アノテーションに使用されるコンテナーファクトリを使用して、他の目的で ConcurrentMessageListenerContainer インスタンスを作成することもできます。例については、Spring for Apache Kafka のドキュメントメッセージ駆動型チャネルアダプターを参照してください。

Apache Kafka が支援するチャネルトピック

Spring Integration には、永続性のために Apache Kafka トピックによってサポートされる MessageChannel 実装があります。

各チャネルには、送信側に KafkaTemplate が必要であり、リスナーコンテナーファクトリ(サブスクライブ可能なチャネルの場合)またはポーリング可能なチャネルに KafkaMessageSource が必要です。

Java DSL 設定

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlow.from(...)
            ...
            .channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
            ...
            .get();
}

@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlow.from(...)
            ...
            .publishSubscribeChannel(pubSub(template, containerFactory),
                pubsub -> pubsub
                            .subscribe(subflow -> ...)
                            .subscribe(subflow -> ...))
            .get();
}

@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
            .groupId("group2")
            .get();
}

@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
        KafkaMessageSource<Integer, String> source) {

    return IntegrationFlow.from(...)
            ...
            .channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
            .handle(...,  e -> e.poller(...))
            ...
            .get();
}
/**
 * Channel for a single subscriber.
 **/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<String, String> factory)

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicA");
    channel.setGroupId("group1");
    return channel;
}

/**
 * Channel for multiple subscribers.
 **/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<String, String> factory)

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicB", true);
    channel.setGroupId("group2");
    return channel;
}

/**
 * Pollable channel (topic is configured on the source)
 **/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
    KafkaMessageSource<String, String> source)

    PollableKafkaChannel channel =
        new PollableKafkaChannel(template, source);
    channel.setGroupId("group3");
    return channel;
}
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
    container-factory="containerFactory" />

<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
    group-id = "pollableGroup"/>

<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
    group-id="pubSubGroup" container-factory="containerFactory" />

メッセージ変換

StringJsonMessageConverter が提供されます。詳細については、Spring for Apache Kafka のドキュメントを参照してください。

このコンバーターをメッセージ駆動型チャネルアダプターとともに使用する場合、受信ペイロードを変換する型を指定できます。これは、アダプターに payload-type 属性(payloadType プロパティ)を設定することによって実現されます。次の例は、XML 構成でこれを行う方法を示しています。

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        payload-type="com.example.Thing"
        error-channel="errorChannel" />

<bean id="messageConverter"
    class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>

次の例は、Java 構成でアダプターに payload-type 属性(payloadType プロパティ)を設定する方法を示しています。

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
    kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class);
    return kafkaMessageDrivenChannelAdapter;
}

Null ペイロードとログ圧縮 "Tombstone" レコード

Spring メッセージング Message<?> オブジェクトには null ペイロードを含めることはできません。Apache Kafka のエンドポイントを使用する場合、null ペイロード (tombstone レコードとも呼ばれます) は型 KafkaNull のペイロードによって表されます。詳細については、Spring for Apache Kafka のドキュメントを参照してください。

Spring Integration エンドポイントの POJO メソッドは、KafkaNull の代わりに真の null 値を使用できます。これを行うには、パラメーターを @Payload(required = false) でマークします。次の例は、その方法を示しています。

@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_KEY) String key,
               @Payload(required = false) Customer customer) {
    // customer is null if a tombstone record
    ...
}

KStream から Spring Integration フローを呼び出す

MessagingTransformer を使用して、KStream から統合フローを呼び出すことができます。

@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
        MessagingTransformer<byte[], byte[], byte[]> transformer)  transformer) {
    KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
            ...
            .transform(() -> transformer)
            .to(streamingTopic2);

    stream.print(Printed.toSysOut());

    return stream;
}

@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
        MessagingFunction function) {

    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
    return new MessagingTransformer<>(function, converter);
}

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(MessagingFunction.class)
        ...
        .get();
}

統合フローがインターフェースで開始する場合、作成されるプロキシにはフロー Bean の名前があり、".gateway" が付加されているため、この Bean 名は必要に応じて @Qualifier として使用できます。

読み取り / 処理 / 書き込みシナリオのパフォーマンスに関する考慮事項

多くのアプリケーションは、トピックから消費し、何らかの処理を実行して、別のトピックに書き込みます。ほとんどの場合、write が失敗した場合、アプリケーションは例外をスローして、受信リクエストを再試行したり、デッドレタートピックに送信したりします。この機能は、適切に構成されたエラーハンドラーと共に、基になるメッセージリスナーコンテナーによってサポートされます。ただし、これをサポートするには、書き込み操作が成功 (または失敗) するまでリスナースレッドをブロックして、コンテナーに例外をスローできるようにする必要があります。単一のレコードを使用する場合、これは送信アダプターで sync プロパティを設定することによって実現されます。ただし、バッチを使用する場合、sync を使用すると、アプリケーションが次のメッセージを送信する前に各送信の結果を待機するため、パフォーマンスが大幅に低下します。複数の送信を実行してから、それらの送信の結果を後で待つこともできます。これは、メッセージハンドラーに futuresChannel を追加することによって実現されます。この機能を有効にするには、送信メッセージに KafkaIntegrationHeaders.FUTURE_TOKEN を追加します。これを使用して、Future を特定の送信メッセージに関連付けることができます。この機能の使用例を次に示します。

@SpringBootApplication
public class FuturesChannelApplication {

    public static void main(String[] args) {
        SpringApplication.run(FuturesChannelApplication.class, args);
    }

    @Bean
    IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
        return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
                    ListenerMode.batch, "inTopic"))
                .handle(handler)
                .get();
    }

    @Bean
    IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlow.from(Gate.class)
                .enrichHeaders(h -> h
                        .header(KafkaHeaders.TOPIC, "outTopic")
                        .headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .futuresChannel("futures"))
                .get();
    }

    @Bean
    PollableChannel futures() {
        return new QueueChannel();
    }

}

@Component
@DependsOn("outbound")
class Handler {

    @Autowired
    Gate gate;

    @Autowired
    PollableChannel futures;

    public void handle(List<String> input) throws Exception {
        System.out.println(input);
        input.forEach(str -> this.gate.send(str.toUpperCase()));
        for (int i = 0; i < input.size(); i++) {
            Message<?> future = this.futures.receive(10000);
            ((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
        }
    }

}

interface Gate {

    void send(String out);

}