Spring Framework は、JmsTemplate を使用した JMS API の簡易使用から、メッセージを非同期で受信するための完全なインフラストラクチャまで、メッセージングシステムとの統合を幅広くサポートしています。Spring AMQP は、Advanced Message Queuing Protocol に同様の機能セットを提供します。Spring Boot は、RabbitTemplate および RabbitMQ の自動構成オプションも提供します。Spring WebSocket にはネイティブで STOMP メッセージングのサポートが含まれており、Spring Boot はスターターと少量の自動構成を通じてそれをサポートしています。Spring Boot は Apache Kafka もサポートしています。

1. JMS

javax.jms.ConnectionFactory インターフェースは、JMS ブローカーと対話するための javax.jms.Connection を作成する標準的な方法を提供します。Spring は JMS を使用するために ConnectionFactory を必要としますが、通常は直接使用する必要はなく、代わりに高レベルのメッセージング抽象化に依存できます。(詳細については、Spring Framework リファレンスドキュメントの関連セクションを参照してください)Spring Boot は、メッセージを送受信するために必要なインフラストラクチャーを自動構成します。

1.1. ActiveMQ サポート

ActiveMQ [Apache] (英語) がクラスパスで使用可能な場合、Spring Boot は ConnectionFactory を構成することもできます。ブローカーが存在する場合、組み込みブローカーが自動的に開始および構成されます(構成を通じてブローカー URL が指定されておらず、組み込みブローカーが構成で無効になっていない場合)。

spring-boot-starter-activemq を使用する場合、JMS と統合する Spring インフラストラクチャと同様に、ActiveMQ インスタンスの接続または埋め込みに必要な依存関係が提供されます。

ActiveMQ 構成は、spring.activemq.*外部構成プロパティによって制御されます。

デフォルトでは、ActiveMQ は VM トランスポート [Apache] (英語) を使用するように自動構成されており、同じ JVM インスタンスに埋め込まれたブローカーを開始します。

次の例に示すように、spring.activemq.in-memory プロパティを構成することにより、組み込みブローカーを無効にできます。

Properties
spring.activemq.in-memory=false
Yaml
spring:
  activemq:
    in-memory: false

次の例に示すように、ブローカー URL を構成すると、組み込みブローカーも無効になります。

Properties
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
Yaml
spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

組み込みブローカーを完全に制御したい場合は、詳細について ActiveMQ のドキュメント [Apache] (英語) を参照してください。

デフォルトでは、CachingConnectionFactory は、spring.jms.* の外部構成プロパティによって制御できる適切な設定でネイティブ ConnectionFactory をラップします。

Properties
spring.jms.cache.session-cache-size=5
Yaml
spring:
  jms:
    cache:
      session-cache-size: 5

ネイティブプーリングを使用する場合は、次の例に示すように、org.messaginghub:pooled-jms に依存関係を追加し、それに応じて JmsPoolConnectionFactory を構成することにより、使用できます。

Properties
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
Yaml
spring:
  activemq:
    pool:
      enabled: true
      max-connections: 50
サポートされるオプションの詳細については、ActiveMQProperties [GitHub] (英語) を参照してください。より高度なカスタマイズのために、ActiveMQConnectionFactoryCustomizer を実装する任意の数の Bean を登録することもできます。

デフォルトでは、ActiveMQ は宛先がまだ存在しない場合に宛先を作成し、指定された名前に対して宛先が解決されるようにします。

1.2. ActiveMQArtemis サポート

Spring Boot は、クラスパスで ActiveMQ Artemis [Apache] (英語) が使用可能であることを検出すると、ConnectionFactory を自動構成できます。ブローカーが存在する場合、埋め込みモードのブローカーは自動的に起動および構成されます(モードプロパティが明示的に設定されていない場合)。サポートされるモードは、embedded (組み込みブローカーが必要であり、ブローカーがクラスパスで使用できない場合にエラーが発生することを明示するため)および native (netty トランスポートプロトコルを使用してブローカーに接続するため)です。後者が構成されている場合、Spring Boot は、デフォルト設定でローカルマシンで実行されているブローカーに接続する ConnectionFactory を構成します。

spring-boot-starter-artemis を使用する場合は、既存の ActiveMQ Artemis インスタンスに接続するために必要な依存関係と、JMS と統合するための Spring インフラストラクチャが提供されます。アプリケーションに org.apache.activemq:artemis-jms-server を追加すると、組み込みモードを使用できます。

ActiveMQ Artemis 構成は、spring.artemis.* の外部構成プロパティによって制御されます。例: application.properties で次のセクションを宣言できます:

Properties
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
Yaml
spring:
  artemis:
    mode: native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

ブローカーを埋め込むときに、永続性を有効にし、使用可能にする必要がある宛先をリストするかどうかを選択できます。これらをコンマ区切りリストとして指定して、デフォルトのオプションで作成するか、型 org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration または org.apache.activemq.artemis.jms.server.config.TopicConfiguration の Bean をそれぞれ高度なキューおよびトピック構成に定義できます。

デフォルトでは、CachingConnectionFactory は、spring.jms.* の外部構成プロパティによって制御できる適切な設定でネイティブ ConnectionFactory をラップします。

Properties
spring.jms.cache.session-cache-size=5
Yaml
spring:
  jms:
    cache:
      session-cache-size: 5

ネイティブプーリングを使用する場合は、次の例に示すように、org.messaginghub:pooled-jms に依存関係を追加し、それに応じて JmsPoolConnectionFactory を構成することにより、使用できます。

Properties
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
Yaml
spring:
  artemis:
    pool:
      enabled: true
      max-connections: 50

サポートされるオプションの詳細については、ArtemisProperties [GitHub] (英語) を参照してください。

JNDI ルックアップは含まれず、宛先は Artemis 構成の name 属性または構成を通じて提供された名前を使用して、名前に対して解決されます。

1.3. JNDI ConnectionFactory を使用する

アプリケーションサーバーでアプリケーションを実行している場合、Spring Boot は JNDI を使用して JMS ConnectionFactory を見つけようとします。デフォルトでは、java:/JmsXA と java:/XAConnectionFactory の場所がチェックされます。次の例に示すように、別の場所を指定する必要がある場合は、spring.jms.jndi-name プロパティを使用できます。

Properties
spring.jms.jndi-name=java:/MyConnectionFactory
Yaml
spring:
  jms:
    jndi-name: "java:/MyConnectionFactory"

1.4. メッセージの送信

Spring の JmsTemplate は自動構成されており、次の例に示すように、独自の Bean に直接オートワイヤーできます。

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final JmsTemplate jmsTemplate;

    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // ...

    public void someMethod() {
        this.jmsTemplate.convertAndSend("hello");
    }

}
JmsMessagingTemplate (Javadoc) は、同様の方法で注入できます。DestinationResolver または MessageConverter Bean が定義されている場合、自動構成された JmsTemplate に自動的に関連付けられます。

1.5. メッセージの受信

JMS インフラストラクチャが存在する場合、任意の Bean に @JmsListener のアノテーションを付けて、リスナーエンドポイントを作成できます。JmsListenerContainerFactory が定義されていない場合、デフォルトの JmsListenerContainerFactory が自動的に構成されます。DestinationResolverMessageConverterjavax.jms.ExceptionListener Bean が定義されている場合、デフォルトのファクトリに自動的に関連付けられます。

デフォルトでは、デフォルトのファクトリはトランザクションです。JtaTransactionManager が存在するインフラストラクチャで実行すると、デフォルトでリスナーコンテナーに関連付けられます。そうでない場合、sessionTransacted フラグが有効になります。後者のシナリオでは、リスナーメソッド(またはそのデリゲート)に @Transactional を追加することにより、ローカルデータストアトランザクションを受信メッセージの処理に関連付けることができます。これにより、ローカルトランザクションが完了すると、受信メッセージが確認されます。これには、同じ JMS セッションで実行されたレスポンスメッセージの送信も含まれます。

次のコンポーネントは、someQueue 宛先にリスナーエンドポイントを作成します。

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
詳細については、@EnableJms の Javadoc を参照してください。

さらに JmsListenerContainerFactory インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合は、Spring Boot が提供する DefaultJmsListenerContainerFactoryConfigurer を使用して、自動構成されたものと同じ設定で DefaultJmsListenerContainerFactory を初期化できます。

たとえば、次の例では、特定の MessageConverter を使用する別のファクトリを公開しています。

import javax.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}

次に、次のように、@JmsListener アノテーション付きメソッドでファクトリを使用できます。

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}

2. AMQP

Advanced Message Queuing Protocol(AMQP)は、メッセージ指向ミドルウェア向けのプラットフォームに依存しないワイヤーレベルのプロトコルです。Spring AMQP プロジェクトは、コア Spring コンセプトを AMQP ベースのメッセージングソリューションの開発に適用します。Spring Boot は、spring-boot-starter-amqp 「スターター」など、RabbitMQ を介して AMQP を操作するためのいくつかの便利な機能を提供します。

2.1. RabbitMQ サポート

RabbitMQ (英語) は、AMQP プロトコルに基づいた、軽量で信頼性が高く、スケーラブルでポータブルなメッセージブローカーです。Spring は、RabbitMQ を使用して AMQP プロトコルを介して通信します。

RabbitMQ 構成は、spring.rabbitmq.* の外部構成プロパティによって制御されます。例: application.properties で次のセクションを宣言できます。

Properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
Yaml
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

または、addresses 属性を使用して同じ接続を構成できます。

Properties
spring.rabbitmq.addresses=amqp://admin:secret@localhost
Yaml
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
その方法でアドレスを指定する場合、host および port プロパティは無視されます。アドレスが amqps プロトコルを使用している場合、SSL サポートは自動的に有効になります。

サポートされているプロパティベースの構成オプションの詳細については、RabbitProperties [GitHub] (英語) を参照してください。Spring AMQP で使用される RabbitMQ ConnectionFactory の下位レベルの詳細を構成するには、ConnectionFactoryCustomizer Bean を定義します。

ConnectionNameStrategy Bean がコンテキストに存在する場合、自動構成された CachingConnectionFactory によって作成された接続に名前を付けるために自動的に使用されます。

詳細については、RabbitMQ で使用されるプロトコルである AMQP を理解する (英語) を参照してください。

2.2. メッセージの送信

Spring の AmqpTemplate および AmqpAdmin は自動構成され、次の例に示すように、独自の Bean に直接オートワイヤーできます。

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;

    private final AmqpTemplate amqpTemplate;

    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

    public void someMethod() {
        this.amqpAdmin.getQueueInfo("someQueue");
    }

    public void someOtherMethod() {
        this.amqpTemplate.convertAndSend("hello");
    }

}
RabbitMessagingTemplate (Javadoc) は、同様の方法で注入できます。MessageConverter Bean が定義されている場合、自動構成された AmqpTemplate に自動的に関連付けられます。

必要に応じて、Bean として定義されている org.springframework.amqp.core.Queue は、RabbitMQ インスタンスで対応するキューを宣言するために自動的に使用されます。

操作を再試行するには、AmqpTemplate で再試行を有効にします(たとえば、ブローカー接続が失われた場合)。

Properties
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
Yaml
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

再試行はデフォルトで無効になっています。RabbitRetryTemplateCustomizer Bean を宣言することにより、プログラムで RetryTemplate をカスタマイズすることもできます。

さらに RabbitTemplate インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は RabbitTemplateConfigurer Bean を提供します。これを使用して、自動構成で使用されるファクトリと同じ設定で RabbitTemplate を初期化できます。

2.3. メッセージの受信

Rabbit インフラストラクチャが存在する場合、任意の Bean に @RabbitListener のアノテーションを付けて、リスナーエンドポイントを作成できます。RabbitListenerContainerFactory が定義されていない場合、デフォルトの SimpleRabbitListenerContainerFactory が自動的に構成され、spring.rabbitmq.listener.type プロパティを使用して直接コンテナーに切り替えることができます。MessageConverter または MessageRecoverer Bean が定義されている場合、デフォルトのファクトリに自動的に関連付けられます。

次のサンプルコンポーネントは、someQueue キューにリスナーエンドポイントを作成します。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
詳細については、@EnableRabbit の Javadoc を参照してください。

さらに RabbitListenerContainerFactory インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は SimpleRabbitListenerContainerFactoryConfigurer および DirectRabbitListenerContainerFactoryConfigurer を提供します。これらを使用して、自動構成で使用されるファクトリと同じ設定で SimpleRabbitListenerContainerFactory および DirectRabbitListenerContainerFactory を初期化できます。

選択したコンテナーの種類は関係ありません。これらの 2 つの Bean は、自動構成によって公開されます。

たとえば、次の構成クラスは、特定の MessageConverter を使用する別のファクトリを公開します。

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}

次に、次のように、@RabbitListener アノテーション付きメソッドでファクトリを使用できます。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}

再試行を有効にして、リスナーが例外をスローする状況を処理できます。デフォルトでは、RejectAndDontRequeueRecoverer が使用されますが、独自の MessageRecoverer を定義できます。再試行が使い果たされると、ブローカーがそうするように構成されている場合、メッセージは拒否され、送達不能交換にドロップまたはルーティングされます。デフォルトでは、再試行は無効になっています。RabbitRetryTemplateCustomizer Bean を宣言することにより、RetryTemplate をプログラムでカスタマイズすることもできます。

デフォルトでは、再試行が無効になっていて、リスナーが例外をスローした場合、配信は無期限に再試行されます。この動作は次の 2 つの方法で変更できます。defaultRequeueRejected プロパティを false に設定して再配信をゼロにするか、AmqpRejectAndDontRequeueException をスローしてメッセージを拒否することを通知します。後者は、再試行が有効で、配信試行の最大数に達したときに使用されるメカニズムです。

3. Apache Kafka サポート

Apache Kafka (英語) は、spring-kafka プロジェクトの自動構成を提供することによりサポートされます。

Kafka 構成は、spring.kafka.* の外部構成プロパティによって制御されます。例: application.properties で次のセクションを宣言できます。

Properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
Yaml
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
起動時にトピックを作成するには、型 NewTopic の Bean を追加します。トピックがすでに存在する場合、Bean は無視されます。

サポートされるオプションの詳細については、KafkaProperties [GitHub] (英語) を参照してください。

3.1. メッセージの送信

Spring の KafkaTemplate は自動構成されており、次の例に示すように、独自の Bean に直接オートワイヤーできます。

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

    public void someMethod() {
        this.kafkaTemplate.send("someTopic", "Hello");
    }

}
プロパティ spring.kafka.producer.transaction-id-prefix が定義されている場合、KafkaTransactionManager が自動的に構成されます。また、RecordMessageConverter Bean が定義されている場合、自動構成された KafkaTemplate に自動的に関連付けられます。

3.2. メッセージの受信

Apache Kafka インフラストラクチャが存在する場合、任意の Bean に @KafkaListener のアノテーションを付けてリスナーエンドポイントを作成できます。KafkaListenerContainerFactory が定義されていない場合は、spring.kafka.listener.* で定義されたキーを使用してデフォルトの KafkaListenerContainerFactory が自動的に構成されます。

次のコンポーネントは、someTopic トピックにリスナーエンドポイントを作成します。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}

KafkaTransactionManager Bean が定義されている場合、自動的にコンテナーファクトリに関連付けられます。同様に、RecordFilterStrategyErrorHandlerCommonErrorHandlerAfterRollbackProcessor または ConsumerAwareRebalanceListener Bean が定義されている場合、デフォルトのファクトリに自動的に関連付けられます。

リスナーの型に応じて、RecordMessageConverter または BatchMessageConverter Bean がデフォルトファクトリに関連付けられます。RecordMessageConverter Bean のみがバッチリスナーに存在する場合、BatchMessageConverter にラップされます。

カスタム ChainedKafkaTransactionManager は、通常、自動構成された KafkaTransactionManager Bean を参照するため、@Primary とマークする必要があります。

3.3. Kafka ストリーム

Spring for Apache Kafka は、StreamsBuilder オブジェクトを作成し、そのストリームのライフサイクルを管理するためのファクトリ Bean を提供します。Spring Boot は、kafka-streams がクラスパス上にあり、Kafka ストリームが @EnableKafkaStreams アノテーションによって有効になっている限り、必要な KafkaStreamsConfiguration Bean を自動構成します。

Kafka ストリームを有効にすると、アプリケーション ID とブートストラップサーバーを設定する必要があります。前者は spring.kafka.streams.application-id を使用して設定でき、設定されていない場合はデフォルトで spring.application.name になります。後者はグローバルに設定することも、ストリームに対してのみ具体的にオーバーライドすることもできます。

専用プロパティを使用して、いくつかの追加プロパティを利用できます。他の任意の Kafka プロパティは、spring.kafka.streams.properties 名前空間を使用して設定できます。詳細については、features.html も参照してください。

ファクトリ Bean を使用するには、次の例に示すように、StreamsBuilder を @Bean に接続します。

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}

デフォルトでは、作成する StreamBuilder オブジェクトによって管理されるストリームは自動的に開始されます。この動作は、spring.kafka.streams.auto-startup プロパティを使用してカスタマイズできます。

3.4. Kafka の追加プロパティ

自動構成でサポートされるプロパティは、application-properties.html に示されています。ほとんどの場合、これらのプロパティ (ハイフン付きまたはキャメルケース) は、Apache Kafka のドット付きプロパティに直接マップされることに注意してください。詳細については、Apache Kafka のドキュメントを参照してください。

これらのプロパティの最初のいくつかは、すべてのコンポーネント (プロデューサー、コンシューマー、管理者、ストリーム) に適用されますが、異なる値を使用する場合はコンポーネントレベルで指定できます。Apache Kafka は、プロパティを HIGH、MEDIUM、LOW の重要度で指定します。Spring Boot の自動構成では、すべての HIGH 重要度プロパティ、一部の選択された MEDIUM および LOW プロパティ、およびデフォルト値がないすべてのプロパティがサポートされます。

Kafka でサポートされているプロパティのサブセットのみが、KafkaProperties クラスから直接利用できます。直接サポートされていない追加のプロパティを使用してプロデューサーまたはコンシューマーを構成する場合は、次のプロパティを使用します。

Properties
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
Yaml
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

これにより、共通の prop.one Kafka プロパティが first (プロデューサー、コンシューマー、管理者に適用)、prop.two admin プロパティが second に、prop.three コンシューマープロパティが third に、prop.four プロデューサープロパティが fourth に、prop.five ストリームプロパティが fifth に設定されます。

Spring Kafka JsonDeserializer は、次のように構成することもできます。

Properties
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
Yaml
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同様に、型情報をヘッダーで送信する JsonSerializer のデフォルトの動作を無効にできます。

Properties
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
Yaml
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
この方法で設定されたプロパティは、Spring Boot が明示的にサポートする構成アイテムをオーバーライドします。

3.5. 組み込み Kafka を使用したテスト

Spring for Apache Kafka は、組み込みの Apache Kafka ブローカーを使用してプロジェクトをテストする便利な方法を提供します。この機能を使用するには、spring-kafka-test モジュールの @EmbeddedKafka を使用してテストクラスにアノテーションを付けます。詳細については、Spring for Apache Kafka リファレンスマニュアルを参照してください。

Spring Boot 自動構成を前述の組み込み Apache Kafka ブローカーで動作させるには、組み込みブローカーアドレスのシステムプロパティ (EmbeddedKafkaBroker によって設定される) を Apache Kafka の Spring Boot 構成プロパティに再マップする必要があります。これを行うには、いくつかの方法があります。

  • 組み込みブローカーアドレスをテストクラスの spring.kafka.bootstrap-servers にマップするシステムプロパティを提供します。

static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
  • @EmbeddedKafka アノテーションでプロパティ名を構成します。

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}
  • 構成プロパティでプレースホルダーを使用します。

Properties
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Yaml
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"

4. RSocket

RSocket (英語) は、バイトストリームトランスポートで使用するためのバイナリプロトコルです。これにより、単一の接続を介して渡される非同期メッセージを介して対称的な相互作用モデルが可能になります。

Spring Framework の spring-messaging モジュールは、クライアント側とサーバー側の両方で RSocket リクエスターとレスポンダーのサポートを提供します。RSocket プロトコルの概要など、詳細については、Spring Framework リファレンスの RSocket セクションを参照してください。

4.1. RSocket 戦略の自動構成

Spring Boot は、RSocket ペイロードのエンコードおよびデコードに必要なすべてのインフラストラクチャを提供する RSocketStrategies Bean を自動構成します。デフォルトでは、自動構成は以下を順番に構成しようとします。

  1. Jackson を使用した CBOR (英語) コーデック

  2. Jackson を使用した JSON コーデック

spring-boot-starter-rsocket スターターは両方の依存関係を提供します。カスタマイズの可能性について詳しくは、Jackson サポートセクションを参照してください。

開発者は、RSocketStrategiesCustomizer インターフェースを実装する Bean を作成することにより、RSocketStrategies コンポーネントをカスタマイズできます。コーデックの順序を決定するため、@Order は重要であることに注意してください。

4.2. RSocket サーバーの自動構成

Spring Boot は、RSocket サーバーの自動構成を提供します。必要な依存関係は spring-boot-starter-rsocket によって提供されます。

Spring Boot を使用すると、WebFlux サーバーから WebSocket を介して RSocket を公開したり、独立した RSocket サーバーを立ち上げることができます。これは、アプリケーションの型とその構成によって異なります。

WebFlux アプリケーション(つまり、型 WebApplicationType.REACTIVE)の場合、RSocket サーバーは、次のプロパティが一致する場合にのみ Web サーバーに接続されます。

Properties
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
Yaml
spring:
  rsocket:
    server:
      mapping-path: "/rsocket"
      transport: "websocket"
RSocket を Web サーバーに接続することは、Reactor Netty でのみサポートされています。RSocket 自体はそのライブラリで構築されています。

または、RSocket TCP または websocket サーバーは、独立した組み込みサーバーとして起動されます。依存関係の要件に加えて、唯一の必要な構成は、そのサーバーのポートを定義することです。

Properties
spring.rsocket.server.port=9898
Yaml
spring:
  rsocket:
    server:
      port: 9898

4.3. Spring メッセージング RSocket サポート

Spring Boot は、RSocket の Spring メッセージングインフラストラクチャを自動構成します。

これは、Spring Boot が、アプリケーションへの RSocket リクエストを処理する RSocketMessageHandler Bean を作成することを意味します。

4.4. RSocketRequester で RSocket サービスを呼び出す

サーバーとクライアントの間に RSocket チャネルが確立されると、どのパーティも相手とリクエストを送受信できます。

サーバーとして、RSocket @Controller の任意のハンドラーメソッドで RSocketRequester インスタンスを注入できます。クライアントとして、最初に RSocket 接続を構成して確立する必要があります。Spring Boot は、このような場合に予想されるコーデックを使用して RSocketRequester.Builder を自動構成し、任意の RSocketConnectorConfigurer Bean を適用します。

RSocketRequester.Builder インスタンスはプロトタイプ Bean です。つまり、各インジェクションポイントが新しいインスタンスを提供します。このビルダーはステートフルであり、同じインスタンスを使用して異なるセットアップでリクエスターを作成するべきではないため、これは意図的に行われます。

次のコードは典型的な例を示しています。

import reactor.core.publisher.Mono;

import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    private final RSocketRequester rsocketRequester;

    public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
        this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
    }

    public Mono<User> someRSocketCall(String name) {
        return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
    }

}

5. Spring Integration

Spring Boot は、spring-boot-starter-integration 「スターター」を含む、Spring Integration で作業するためのいくつかの便利な機能を提供します。Spring Integration は、メッセージング、HTTP、TCP などのその他のトランスポートを抽象化します。Spring Integration がクラスパスで使用可能な場合、@EnableIntegration アノテーションを介して初期化されます。

Spring Integration ポーリングロジックは、自動構成された TaskScheduler に依存しています。デフォルトの PollerMetadata (毎秒無制限のメッセージ数をポーリング)は、spring.integration.poller.* 構成プロパティを使用してカスタマイズできます。

Spring Boot は、追加の Spring Integration モジュールの存在によってトリガーされるいくつかの機能も構成します。spring-integration-jmx もクラスパス上にある場合、メッセージ処理統計は JMX を介して公開されます。spring-integration-jdbc が利用可能な場合、次の行に示すように、起動時にデフォルトのデータベーススキーマを作成できます。

Properties
spring.integration.jdbc.initialize-schema=always
Yaml
spring:
  integration:
    jdbc:
      initialize-schema: "always"

spring-integration-rsocket が使用可能な場合、開発者は "spring.rsocket.server.*" プロパティを使用して RSocket サーバーを構成し、IntegrationRSocketEndpoint または RSocketOutboundGateway コンポーネントを使用して受信 RSocket メッセージを処理できます。このインフラストラクチャは、Spring Integration RSocket チャネルアダプターと @MessageMapping ハンドラーを処理できます("spring.integration.rsocket.server.message-mapping-enabled" が構成されている場合)。

Spring Boot は、構成プロパティを使用して ClientRSocketConnector を自動構成することもできます。

Properties
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
Yaml
# Connecting to a RSocket server over TCP
spring:
  integration:
    rsocket:
      client:
        host: "example.org"
        port: 9898
Properties
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
Yaml
# Connecting to a RSocket Server over WebSocket
spring:
  integration:
    rsocket:
      client:
        uri: "ws://example.org"

詳細については、IntegrationAutoConfiguration [GitHub] (英語) および IntegrationProperties [GitHub] (英語) クラスを参照してください。

6. 次のステップ

次のセクションでは、アプリケーションで IO 機能を有効にする方法について説明します。このセクションでは、キャッシュメール検証REST クライアントなどについて読むことができます。