Spring Framework は、JmsTemplate を使用した JMS API の簡易使用から、メッセージを非同期で受信するための完全なインフラストラクチャまで、メッセージングシステムとの統合を幅広くサポートしています。Spring AMQP は、Advanced Message Queuing Protocol に同様の機能セットを提供します。Spring Boot は、RabbitTemplate および RabbitMQ の自動構成オプションも提供します。Spring WebSocket にはネイティブで STOMP メッセージングのサポートが含まれており、Spring Boot はスターターと少量の自動構成を通じてそれをサポートしています。Spring Boot は、Apache Kafka と Apache Pulsar もサポートしています。

1. JMS

jakarta.jms.ConnectionFactory インターフェースは、JMS ブローカーと対話するための jakarta.jms.Connection を作成する標準的な方法を提供します。Spring は JMS を使用するために ConnectionFactory を必要としますが、通常は直接使用する必要はなく、代わりに高レベルのメッセージング抽象化に依存できます。(詳細については、Spring Framework リファレンスドキュメントの関連セクションを参照してください)Spring Boot は、メッセージを送受信するために必要なインフラストラクチャーを自動構成します。

1.1. ActiveMQ「クラシック」サポート

ActiveMQ「クラシック」 [Apache] (英語) がクラスパスで使用可能な場合、Spring Boot は ConnectionFactory を構成できます。

spring-boot-starter-activemq を使用する場合、JMS と統合するための Spring インフラストラクチャと同様に、ActiveMQ の「クラシック」インスタンスに接続するために必要な依存関係が提供されます。

ActiveMQ の「クラシック」構成は、spring.activemq.*外部構成プロパティによって制御されます。デフォルトでは、ActiveMQ "Classic" は TCP トランスポート [Apache] (英語) を使用するように自動構成され、デフォルトで tcp://localhost:61616 に接続します。次の例は、デフォルトのブローカー URL を変更する方法を示しています。

Properties
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
Yaml
spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

デフォルトでは、CachingConnectionFactory は、spring.jms.* の外部構成プロパティによって制御できる適切な設定でネイティブ ConnectionFactory をラップします。

Properties
spring.jms.cache.session-cache-size=5
Yaml
spring:
  jms:
    cache:
      session-cache-size: 5

ネイティブプーリングを使用する場合は、次の例に示すように、org.messaginghub:pooled-jms に依存関係を追加し、それに応じて JmsPoolConnectionFactory を構成することにより、使用できます。

Properties
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
Yaml
spring:
  activemq:
    pool:
      enabled: true
      max-connections: 50
サポートされるオプションの詳細については、ActiveMQProperties [GitHub] (英語) を参照してください。より高度なカスタマイズのために、ActiveMQConnectionFactoryCustomizer を実装する任意の数の Bean を登録することもできます。

デフォルトでは、ActiveMQ "Classic" は宛先がまだ存在しない場合にそれを作成し、指定された名前に対して宛先が解決されるようにします。

1.2. ActiveMQArtemis サポート

Spring Boot は、クラスパスで ActiveMQ Artemis [Apache] (英語) が使用可能であることを検出すると、ConnectionFactory を自動構成できます。ブローカーが存在する場合、埋め込みモードのブローカーは自動的に起動および構成されます(モードプロパティが明示的に設定されていない場合)。サポートされるモードは、embedded (組み込みブローカーが必要であり、ブローカーがクラスパスで使用できない場合にエラーが発生することを明示するため)および native (netty トランスポートプロトコルを使用してブローカーに接続するため)です。後者が構成されている場合、Spring Boot は、デフォルト設定でローカルマシンで実行されているブローカーに接続する ConnectionFactory を構成します。

spring-boot-starter-artemis を使用する場合は、既存の ActiveMQ Artemis インスタンスに接続するために必要な依存関係と、JMS と統合するための Spring インフラストラクチャが提供されます。アプリケーションに org.apache.activemq:artemis-jakarta-server を追加すると、組み込みモードを使用できます。

ActiveMQ Artemis 構成は、spring.artemis.* の外部構成プロパティによって制御されます。例: application.properties で次のセクションを宣言できます:

Properties
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
Yaml
spring:
  artemis:
    mode: native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

ブローカーを埋め込むときに、永続性を有効にし、使用可能にする必要がある宛先をリストするかどうかを選択できます。これらをコンマ区切りリストとして指定して、デフォルトのオプションで作成するか、型 org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration または org.apache.activemq.artemis.jms.server.config.TopicConfiguration の Bean をそれぞれ高度なキューおよびトピック構成に定義できます。

デフォルトでは、CachingConnectionFactory は、spring.jms.* の外部構成プロパティによって制御できる適切な設定でネイティブ ConnectionFactory をラップします。

Properties
spring.jms.cache.session-cache-size=5
Yaml
spring:
  jms:
    cache:
      session-cache-size: 5

ネイティブプーリングを使用したい場合は、次の例に示すように、org.messaginghub:pooled-jms に依存関係を追加し、それに応じて JmsPoolConnectionFactory を構成することで実行できます。

Properties
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
Yaml
spring:
  artemis:
    pool:
      enabled: true
      max-connections: 50

サポートされるオプションの詳細については、ArtemisProperties [GitHub] (英語) を参照してください。

JNDI ルックアップは含まれず、宛先は、ActiveMQ Artemis 構成の name 属性、または構成を通じて提供された名前のいずれかを使用して、その名前に対して解決されます。

1.3. JNDI ConnectionFactory を使用する

アプリケーションサーバーでアプリケーションを実行している場合、Spring Boot は JNDI を使用して JMS ConnectionFactory を見つけようとします。デフォルトでは、java:/JmsXA と java:/XAConnectionFactory の場所がチェックされます。次の例に示すように、別の場所を指定する必要がある場合は、spring.jms.jndi-name プロパティを使用できます。

Properties
spring.jms.jndi-name=java:/MyConnectionFactory
Yaml
spring:
  jms:
    jndi-name: "java:/MyConnectionFactory"

1.4. メッセージの送信

Spring の JmsTemplate は自動構成されており、次の例に示すように、独自の Bean に直接オートワイヤーできます。

Java
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final JmsTemplate jmsTemplate;

    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // ...

    public void someMethod() {
        this.jmsTemplate.convertAndSend("hello");
    }

}
Kotlin
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val jmsTemplate: JmsTemplate) {

    // ...

    fun someMethod() {
        jmsTemplate.convertAndSend("hello")
    }

}
JmsMessagingTemplate (Javadoc) は、同様の方法で注入できます。DestinationResolver または MessageConverter Bean が定義されている場合、自動構成された JmsTemplate に自動的に関連付けられます。

1.5. メッセージの受信

JMS インフラストラクチャが存在する場合、任意の Bean に @JmsListener のアノテーションを付けて、リスナーエンドポイントを作成できます。JmsListenerContainerFactory が定義されていない場合、デフォルトの JmsListenerContainerFactory が自動的に構成されます。DestinationResolverMessageConverterjakarta.jms.ExceptionListener Bean が定義されている場合、デフォルトのファクトリに自動的に関連付けられます。

デフォルトでは、デフォルトのファクトリはトランザクションです。JtaTransactionManager が存在するインフラストラクチャで実行すると、デフォルトでリスナーコンテナーに関連付けられます。そうでない場合、sessionTransacted フラグが有効になります。後者のシナリオでは、リスナーメソッド(またはそのデリゲート)に @Transactional を追加することにより、ローカルデータストアトランザクションを受信メッセージの処理に関連付けることができます。これにより、ローカルトランザクションが完了すると、受信メッセージが確認されます。これには、同じ JMS セッションで実行されたレスポンスメッセージの送信も含まれます。

次のコンポーネントは、someQueue 宛先にリスナーエンドポイントを作成します。

Java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @JmsListener(destination = "someQueue")
    fun processMessage(content: String?) {
        // ...
    }

}
詳細については、@EnableJms の Javadoc を参照してください。

さらに JmsListenerContainerFactory インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合は、Spring Boot が提供する DefaultJmsListenerContainerFactoryConfigurer を使用して、自動構成されたものと同じ設定で DefaultJmsListenerContainerFactory を初期化できます。

たとえば、次の例では、特定の MessageConverter を使用する別のファクトリを公開しています。

Java
import jakarta.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}
Kotlin
import jakarta.jms.ConnectionFactory
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory

@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {

    @Bean
    fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer): DefaultJmsListenerContainerFactory {
        val factory = DefaultJmsListenerContainerFactory()
        val connectionFactory = getCustomConnectionFactory()
        configurer.configure(factory, connectionFactory)
        factory.setMessageConverter(MyMessageConverter())
        return factory
    }

    fun getCustomConnectionFactory() : ConnectionFactory? {
        return ...
    }

}

次に、次のように、@JmsListener アノテーション付きメソッドでファクトリを使用できます。

Java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    fun processMessage(content: String?) {
        // ...
    }

}

2. AMQP

Advanced Message Queuing Protocol(AMQP)は、メッセージ指向ミドルウェア向けのプラットフォームに依存しないワイヤーレベルのプロトコルです。Spring AMQP プロジェクトは、コア Spring コンセプトを AMQP ベースのメッセージングソリューションの開発に適用します。Spring Boot は、spring-boot-starter-amqp 「スターター」など、RabbitMQ を介して AMQP を操作するためのいくつかの便利な機能を提供します。

2.1. RabbitMQ のサポート

RabbitMQ (英語) は、AMQP プロトコルに基づく、軽量で信頼性が高く、スケーラブルでポータブルなメッセージブローカーです。Spring は、RabbitMQ を使用して AMQP プロトコルを介して通信します。

RabbitMQ 構成は、spring.rabbitmq.* の外部構成プロパティによって制御されます。例: application.properties で次のセクションを宣言できます。

Properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
Yaml
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

または、addresses 属性を使用して同じ接続を構成できます。

Properties
spring.rabbitmq.addresses=amqp://admin:secret@localhost
Yaml
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
その方法でアドレスを指定する場合、host および port プロパティは無視されます。アドレスが amqps プロトコルを使用している場合、SSL サポートは自動的に有効になります。

サポートされているプロパティベースの構成オプションの詳細については、RabbitProperties [GitHub] (英語) を参照してください。Spring AMQP で使用される RabbitMQ ConnectionFactory の下位レベルの詳細を構成するには、ConnectionFactoryCustomizer Bean を定義します。

ConnectionNameStrategy Bean がコンテキストに存在する場合、自動構成された CachingConnectionFactory によって作成された接続に名前を付けるために自動的に使用されます。

アプリケーション全体の追加のカスタマイズを RabbitTemplate に行うには、RabbitTemplateCustomizer Bean を使用します。

詳細については、RabbitMQ で使用されるプロトコルである AMQP を理解する (英語) を参照してください。

2.2. メッセージの送信

Spring の AmqpTemplate および AmqpAdmin は自動構成され、次の例に示すように、独自の Bean に直接オートワイヤーできます。

Java
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;

    private final AmqpTemplate amqpTemplate;

    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

    public void someMethod() {
        this.amqpAdmin.getQueueInfo("someQueue");
    }

    public void someOtherMethod() {
        this.amqpTemplate.convertAndSend("hello");
    }

}
Kotlin
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

    // ...

    fun someMethod() {
        amqpAdmin.getQueueInfo("someQueue")
    }

    fun someOtherMethod() {
        amqpTemplate.convertAndSend("hello")
    }

}
RabbitMessagingTemplate (Javadoc) は、同様の方法で注入できます。MessageConverter Bean が定義されている場合、自動構成された AmqpTemplate に自動的に関連付けられます。

必要に応じて、Bean として定義されている org.springframework.amqp.core.Queue は、RabbitMQ インスタンスで対応するキューを宣言するために自動的に使用されます。

操作を再試行するには、AmqpTemplate で再試行を有効にします(たとえば、ブローカー接続が失われた場合)。

Properties
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
Yaml
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

再試行はデフォルトで無効になっています。RabbitRetryTemplateCustomizer Bean を宣言することにより、プログラムで RetryTemplate をカスタマイズすることもできます。

さらに RabbitTemplate インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は RabbitTemplateConfigurer Bean を提供します。これを使用して、自動構成で使用されるファクトリと同じ設定で RabbitTemplate を初期化できます。

2.3. ストリームへのメッセージの送信

特定のストリームにメッセージを送信するには、次の例に示すように、ストリームの名前を指定します。

Properties
spring.rabbitmq.stream.name=my-stream
Yaml
spring:
  rabbitmq:
    stream:
      name: "my-stream"

MessageConverterStreamMessageConverterProducerCustomizer Bean が定義されている場合、自動構成された RabbitStreamTemplate に自動的に関連付けられます。

さらに RabbitStreamTemplate インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は RabbitStreamTemplateConfigurer Bean を提供します。これを使用して、自動構成で使用されるファクトリと同じ設定で RabbitStreamTemplate を初期化できます。

2.4. メッセージの受信

Rabbit インフラストラクチャが存在する場合、任意の Bean に @RabbitListener のアノテーションを付けて、リスナーエンドポイントを作成できます。RabbitListenerContainerFactory が定義されていない場合、デフォルトの SimpleRabbitListenerContainerFactory が自動的に構成され、spring.rabbitmq.listener.type プロパティを使用して直接コンテナーに切り替えることができます。MessageConverter または MessageRecoverer Bean が定義されている場合、デフォルトのファクトリに自動的に関連付けられます。

次のサンプルコンポーネントは、someQueue キューにリスナーエンドポイントを作成します。

Java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @RabbitListener(queues = ["someQueue"])
    fun processMessage(content: String?) {
        // ...
    }

}
詳細については、@EnableRabbit の Javadoc を参照してください。

さらに RabbitListenerContainerFactory インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は SimpleRabbitListenerContainerFactoryConfigurer および DirectRabbitListenerContainerFactoryConfigurer を提供します。これらを使用して、自動構成で使用されるファクトリと同じ設定で SimpleRabbitListenerContainerFactory および DirectRabbitListenerContainerFactory を初期化できます。

選択したコンテナーの種類は関係ありません。これらの 2 つの Bean は、自動構成によって公開されます。

たとえば、次の構成クラスは、特定の MessageConverter を使用する別のファクトリを公開します。

Java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

    @Bean
    fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
        val factory = SimpleRabbitListenerContainerFactory()
        val connectionFactory = getCustomConnectionFactory()
        configurer.configure(factory, connectionFactory)
        factory.setMessageConverter(MyMessageConverter())
        return factory
    }

    fun getCustomConnectionFactory() : ConnectionFactory? {
        return ...
    }

}

次に、次のように、@RabbitListener アノテーション付きメソッドでファクトリを使用できます。

Java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
    fun processMessage(content: String?) {
        // ...
    }

}

再試行を有効にして、リスナーが例外をスローする状況を処理できます。デフォルトでは、RejectAndDontRequeueRecoverer が使用されますが、独自の MessageRecoverer を定義できます。再試行が使い果たされると、ブローカーがそうするように構成されている場合、メッセージは拒否され、送達不能交換にドロップまたはルーティングされます。デフォルトでは、再試行は無効になっています。RabbitRetryTemplateCustomizer Bean を宣言することにより、RetryTemplate をプログラムでカスタマイズすることもできます。

デフォルトでは、再試行が無効になっていて、リスナーが例外をスローした場合、配信は無期限に再試行されます。この動作は次の 2 つの方法で変更できます。defaultRequeueRejected プロパティを false に設定して再配信をゼロにするか、AmqpRejectAndDontRequeueException をスローしてメッセージを拒否することを通知します。後者は、再試行が有効で、配信試行の最大数に達したときに使用されるメカニズムです。

3. Apache Kafka サポート

Apache Kafka (英語) は、spring-kafka プロジェクトの自動構成を提供することによりサポートされます。

Kafka 構成は、spring.kafka.* の外部構成プロパティによって制御されます。例: application.properties で次のセクションを宣言できます。

Properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
Yaml
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
起動時にトピックを作成するには、型 NewTopic の Bean を追加します。トピックがすでに存在する場合、Bean は無視されます。

サポートされるオプションの詳細については、KafkaProperties [GitHub] (英語) を参照してください。

3.1. メッセージの送信

Spring の KafkaTemplate は自動構成されており、次の例に示すように、独自の Bean に直接オートワイヤーできます。

Java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

    public void someMethod() {
        this.kafkaTemplate.send("someTopic", "Hello");
    }

}
Kotlin
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {

    // ...

    fun someMethod() {
        kafkaTemplate.send("someTopic", "Hello")
    }

}
プロパティ spring.kafka.producer.transaction-id-prefix が定義されている場合、KafkaTransactionManager が自動的に構成されます。また、RecordMessageConverter Bean が定義されている場合、自動構成された KafkaTemplate に自動的に関連付けられます。

3.2. メッセージの受信

Apache Kafka インフラストラクチャが存在する場合、任意の Bean に @KafkaListener のアノテーションを付けてリスナーエンドポイントを作成できます。KafkaListenerContainerFactory が定義されていない場合は、spring.kafka.listener.* で定義されたキーを使用してデフォルトの KafkaListenerContainerFactory が自動的に構成されます。

次のコンポーネントは、someTopic トピックにリスナーエンドポイントを作成します。

Java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @KafkaListener(topics = ["someTopic"])
    fun processMessage(content: String?) {
        // ...
    }

}

KafkaTransactionManager Bean が定義されている場合、自動的にコンテナーファクトリに関連付けられます。同様に、RecordFilterStrategyCommonErrorHandlerAfterRollbackProcessor または ConsumerAwareRebalanceListener Bean が定義されている場合、自動的にデフォルトのファクトリに関連付けられます。

リスナーの型に応じて、RecordMessageConverter または BatchMessageConverter Bean がデフォルトファクトリに関連付けられます。RecordMessageConverter Bean のみがバッチリスナーに存在する場合、BatchMessageConverter にラップされます。

カスタム ChainedKafkaTransactionManager は、通常、自動構成された KafkaTransactionManager Bean を参照するため、@Primary とマークする必要があります。

3.3. Kafka ストリーム

Spring for Apache Kafka は、StreamsBuilder オブジェクトを作成し、そのストリームのライフサイクルを管理するためのファクトリ Bean を提供します。Spring Boot は、kafka-streams がクラスパス上にあり、Kafka ストリームが @EnableKafkaStreams アノテーションによって有効になっている限り、必要な KafkaStreamsConfiguration Bean を自動構成します。

Kafka ストリームを有効にすると、アプリケーション ID とブートストラップサーバーを設定する必要があります。前者は spring.kafka.streams.application-id を使用して設定でき、設定されていない場合はデフォルトで spring.application.name になります。後者はグローバルに設定することも、ストリームに対してのみ具体的にオーバーライドすることもできます。

専用プロパティを使用して、いくつかの追加プロパティを利用できます。他の任意の Kafka プロパティは、spring.kafka.streams.properties 名前空間を使用して設定できます。詳細については、Kafka の追加プロパティも参照してください。

ファクトリ Bean を使用するには、次の例に示すように、StreamsBuilder を @Bean に接続します。

Java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}
Kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {

    @Bean
    fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
        val stream = streamsBuilder.stream<Int, String>("ks1In")
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
        return stream
    }

    private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
        return KeyValue(key, value.uppercase())
    }

}

デフォルトでは、StreamBuilder オブジェクトによって管理されるストリームは自動的に開始されます。spring.kafka.streams.auto-startup プロパティを使用して、この動作をカスタマイズできます。

3.4. Kafka の追加プロパティ

自動構成でサポートされるプロパティは、付録の “統合プロパティ” セクションに示されています。ほとんどの場合、これらのプロパティ (ハイフン付きまたはキャメルケース) は、Apache Kafka のドット付きプロパティに直接マップされることに注意してください。詳細については、Apache Kafka のドキュメントを参照してください。

名前にクライアント型 (producerconsumeradmin または streams) が含まれていないプロパティは共通とみなされ、すべてのクライアントに適用されます。これらの共通プロパティのほとんどは、必要に応じて 1 つ以上のクライアント型に対してオーバーライドできます。

Apache Kafka は、重要度が HIGH、MEDIUM、LOW のプロパティを指定します。Spring Boot 自動構成は、すべての HIGH 重要度プロパティ、一部の選択された MEDIUM および LOW プロパティ、およびデフォルト値を持たないプロパティをサポートします。

Kafka でサポートされるプロパティのサブセットのみが、KafkaProperties クラスを通じて直接使用できます。直接サポートされていない追加のプロパティを使用して個々のクライアント型を構成する場合は、次のプロパティを使用します。

Properties
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
Yaml
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

これにより、共通の prop.one Kafka プロパティが first (プロデューサー、コンシューマー、管理者、ストリームに適用)、prop.two 管理プロパティが secondprop.three コンシューマープロパティが thirdprop.four プロデューサープロパティが fourth、および prop.five ストリームプロパティが fifth に設定されます。

Spring Kafka JsonDeserializer は、次のように構成することもできます。

Properties
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
Yaml
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同様に、型情報をヘッダーで送信する JsonSerializer のデフォルトの動作を無効にできます。

Properties
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
Yaml
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
この方法で設定されたプロパティは、Spring Boot が明示的にサポートする構成アイテムをオーバーライドします。

3.5. 組み込み Kafka を使用したテスト

Spring for Apache Kafka は、組み込みの Apache Kafka ブローカーを使用してプロジェクトをテストする便利な方法を提供します。この機能を使用するには、spring-kafka-test モジュールの @EmbeddedKafka を使用してテストクラスにアノテーションを付けます。詳細については、Spring for Apache Kafka リファレンスマニュアルを参照してください。

Spring Boot 自動構成を前述の組み込み Apache Kafka ブローカーで動作させるには、組み込みブローカーアドレスのシステムプロパティ (EmbeddedKafkaBroker によって設定される) を Apache Kafka の Spring Boot 構成プロパティに再マップする必要があります。これを行うには、いくつかの方法があります。

  • 組み込みブローカーアドレスをテストクラスの spring.kafka.bootstrap-servers にマップするシステムプロパティを提供します。

Java
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
Kotlin
init {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
  • @EmbeddedKafka アノテーションでプロパティ名を構成します。

Java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}
Kotlin
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}
  • 構成プロパティでプレースホルダーを使用します。

Properties
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Yaml
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"

4. Apache Pulsar サポート

Apache Pulsar (英語) は、Spring for Apache Pulsar プロジェクトの自動構成を提供することによってサポートされます。

Spring Boot は、org.springframework.pulsar:spring-pulsar がクラスパス上にある場合、クラシック (必須) Spring for Apache Pulsar コンポーネントを自動構成して登録します。org.springframework.pulsar:spring-pulsar-reactive がクラスパス上にある場合、リアクティブコンポーネントに対しても同じことが行われます。

命令的使用とリアクティブ的使用のためにそれぞれ依存関係を便利に収集するための spring-boot-starter-pulsar と spring-boot-starter-pulsar-reactive の「スターター」があります。

4.1. Pulsar への接続

Pulsar スターターを使用すると、Spring Boot は PulsarClient Bean を自動構成して登録します。

デフォルトでは、アプリケーションは pulsar://localhost:6650 にあるローカル Pulsar インスタンスへの接続を試行します。これは、spring.pulsar.client.service-url プロパティを別の値に設定することで調整できます。

値は有効な Pulsar プロトコル [Apache] (英語) URL である必要があります

spring.pulsar.client.* というプレフィックスが付いたアプリケーションプロパティのいずれかを指定して、クライアントを構成できます。

構成をさらに制御する必要がある場合は、1 つ以上の PulsarClientBuilderCustomizer Bean を登録することを検討してください。

4.1.1. 認証

認証が必要な Pulsar クラスターに接続するには、pluginClassName とプラグインに必要なパラメーターを設定して、使用する認証プラグインを指定する必要があります。パラメーターをパラメーター名とパラメーター値のマップとして設定できます。次の例は、AuthenticationOAuth2 プラグインを構成する方法を示しています。

Properties
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param[issuerUrl]=https://auth.server.cloud/
spring.pulsar.client.authentication.param[privateKey]=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
Yaml
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

spring.pulsar.client.authentication.param.* で定義された名前が、認証プラグイン (通常はキャメルケース) で予期される名前と正確に一致することを確認する必要があります。Spring Boot は、これらのエントリに対していかなる種類の緩和バインディングも試行しません。

例: AuthenticationOAuth2 認証プラグインの発行者 URL を構成する場合は、spring.pulsar.client.authentication.param.issuerUrl を使用する必要があります。issuerurl や issuer-url などの他の形式を使用する場合、設定はプラグインに適用されません。

4.1.2. SSL

デフォルトでは、Pulsar クライアントは Pulsar サービスとプレーンテキストで通信します。Spring for Apache Pulsar リファレンスドキュメントの次の手順に従って、TLS 暗号化を有効にできます。

クライアントと認証の詳細については、Spring for Apache Pulsar リファレンスドキュメントを参照してください。

4.2. Pulsar へのリアクティブな接続

リアクティブ自動構成がアクティブ化されると、Spring Boot は ReactivePulsarClient Bean を自動構成して登録します。

ReactivePulsarClient は、前述の PulsarClient のインスタンスを適応させます。前のセクションに従って、ReactivePulsarClient によって使用される PulsarClient を構成します。

4.3. Pulsar 管理への接続

Spring for Apache Pulsar の PulsarAdministration クライアントも自動構成されます。

デフォルトでは、アプリケーションは http://localhost:8080 にあるローカル Pulsar インスタンスへの接続を試行します。これは、spring.pulsar.admin.service-url プロパティを (http|https)://<host>:<port> の形式で別の値に設定することで調整できます。

構成をさらに制御する必要がある場合は、1 つ以上の PulsarAdminBuilderCustomizer Bean を登録することを検討してください。

4.3.1. 認証

認証が必要な Pulsar クラスターにアクセスする場合、管理クライアントには通常の Pulsar クライアントと同じセキュリティ構成が必要です。spring.pulsar.client.authentication を spring.pulsar.admin.authentication に置き換えることで、前述の認証構成を使用できます。

起動時にトピックを作成するには、型 PulsarTopic の Bean を追加します。トピックがすでに存在する場合、Bean は無視されます。

4.4. メッセージの送信

Spring の PulsarTemplate は自動構成されており、次の例に示すように、それを使用してメッセージを送信できます。

Java
import org.apache.pulsar.client.api.PulsarClientException;

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final PulsarTemplate<String> pulsarTemplate;

    public MyBean(PulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    public void someMethod() throws PulsarClientException {
        this.pulsarTemplate.send("someTopic", "Hello");
    }

}
Kotlin
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

    @Throws(PulsarClientException::class)
    fun someMethod() {
        pulsarTemplate.send("someTopic", "Hello")
    }

}

PulsarTemplate は、PulsarProducerFactory に依存して、基礎となる Pulsar プロデューサーを作成します。Spring Boot 自動構成では、このプロデューサーファクトリも提供されます。デフォルトでは、作成したプロデューサーがキャッシュされます。spring.pulsar.producer.* および spring.pulsar.producer.cache.* プレフィックスのいずれかのアプリケーションプロパティを指定することで、プロデューサーファクトリとキャッシュの設定を構成できます。

プロデューサーファクトリ構成をより詳細に制御する必要がある場合は、1 つ以上の ProducerBuilderCustomizer Bean を登録することを検討してください。これらのカスタマイザーは、作成されたすべてのプロデューサーに適用されます。メッセージを送信するときに ProducerBuilderCustomizer を渡して、現在のプロデューサーにのみ影響を与えることもできます。

送信されるメッセージをさらに制御する必要がある場合は、メッセージの送信時に TypedMessageBuilderCustomizer を渡すことができます。

4.5. メッセージをリアクティブ的に送信する

リアクティブ自動構成がアクティブ化されると、Spring の ReactivePulsarTemplate が自動構成され、次の例に示すように、それを使用してメッセージを送信できます。

Java
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final ReactivePulsarTemplate<String> pulsarTemplate;

    public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    public void someMethod() {
        this.pulsarTemplate.send("someTopic", "Hello").subscribe();
    }

}
Kotlin
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

    fun someMethod() {
        pulsarTemplate.send("someTopic", "Hello").subscribe()
    }

}

ReactivePulsarTemplate は ReactivePulsarSenderFactory に依存して、基礎となる送信者を実際に作成します。Spring Boot 自動構成では、この送信者ファクトリも提供され、デフォルトで、作成したプロデューサーがキャッシュされます。spring.pulsar.producer.* および spring.pulsar.producer.cache.* プレフィックスの付いたアプリケーションプロパティのいずれかを指定することで、送信側ファクトリとキャッシュの設定を構成できます。

送信側ファクトリ構成をさらに制御する必要がある場合は、1 つ以上の ReactiveMessageSenderBuilderCustomizer Bean を登録することを検討してください。これらのカスタマイザーは、作成されたすべての送信者に適用されます。メッセージを送信するときに ReactiveMessageSenderBuilderCustomizer を渡して、現在の送信者にのみ影響を与えることもできます。

送信されるメッセージをさらに制御する必要がある場合は、メッセージの送信時に MessageSpecBuilderCustomizer を渡すことができます。

4.6. メッセージの受信

Apache Pulsar インフラストラクチャが存在する場合、Bean に @PulsarListener のアノテーションを付けてリスナーエンドポイントを作成できます。次のコンポーネントは、someTopic トピック上にリスナーエンドポイントを作成します。

Java
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @PulsarListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @PulsarListener(topics = ["someTopic"])
    fun processMessage(content: String?) {
        // ...
    }

}

Spring Boot 自動構成は、PulsarListenerContainerFactory や、基礎となる Pulsar コンシューマーを構築するために使用するコンシューマーファクトリなど、PulsarListener に必要なすべてのコンポーネントを提供します。これらのコンポーネントは、spring.pulsar.listener.* および spring.pulsar.consumer.* プレフィックスの付いたアプリケーションプロパティのいずれかを指定することで構成できます。

コンシューマーファクトリ構成をさらに制御する必要がある場合は、1 つ以上の ConsumerBuilderCustomizer Bean を登録することを検討してください。これらのカスタマイザーは、ファクトリによって作成されたすべてのコンシューマー、つまりすべての @PulsarListener インスタンスに適用されます。@PulsarListener アノテーションの consumerCustomizer 属性を設定して、単一のリスナーをカスタマイズすることもできます。

4.7. メッセージをリアクティブ的に受信する

Apache Pulsar インフラストラクチャが存在し、リアクティブ自動構成がアクティブ化されている場合、Bean に @ReactivePulsarListener のアノテーションを付けて、リアクティブリスナーエンドポイントを作成できます。次のコンポーネントは、someTopic トピック上にリアクティブリスナーエンドポイントを作成します。

Java
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @ReactivePulsarListener(topics = "someTopic")
    public Mono<Void> processMessage(String content) {
        // ...
        return Mono.empty();
    }

}
Kotlin
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

    @ReactivePulsarListener(topics = ["someTopic"])
    fun processMessage(content: String?): Mono<Void> {
        // ...
        return Mono.empty()
    }

}

Spring Boot 自動構成は、ReactivePulsarListenerContainerFactory や、基礎となるリアクティブ Pulsar コンシューマーを構築するために使用するコンシューマーファクトリなど、ReactivePulsarListener に必要なすべてのコンポーネントを提供します。これらのコンポーネントは、spring.pulsar.listener. および spring.pulsar.consumer. プレフィックスの付いたアプリケーションプロパティのいずれかを指定することで構成できます。

コンシューマーファクトリ構成をさらに制御する必要がある場合は、1 つ以上の ReactiveMessageConsumerBuilderCustomizer Bean を登録することを検討してください。これらのカスタマイザーは、ファクトリによって作成されたすべてのコンシューマー、つまりすべての @ReactivePulsarListener インスタンスに適用されます。@ReactivePulsarListener アノテーションの consumerCustomizer 属性を設定して、単一のリスナーをカスタマイズすることもできます。

4.8. メッセージを読む

Pulsar リーダーインターフェースを使用すると、アプリケーションでカーソルを手動で管理できます。リーダーを使用してトピックに接続する場合、リーダーがトピックに接続するときにどのメッセージから読み始めるかを指定する必要があります。

Apache Pulsar インフラストラクチャが存在する場合、任意の Bean に @PulsarReader のアノテーションを付けて、リーダーを使用してメッセージを消費できます。次のコンポーネントは、someTopic トピックの先頭からメッセージの読み取りを開始するリーダーエンドポイントを作成します。

Java
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @PulsarReader(topics = "someTopic", startMessageId = "earliest")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

    @PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
    fun processMessage(content: String?) {
        // ...
    }

}

@PulsarReader は、PulsarReaderFactory に依存して、基礎となる Pulsar リーダーを作成します。Spring Boot 自動構成は、spring.pulsar.reader.* プレフィックスの付いたアプリケーションプロパティのいずれかを設定することによってカスタマイズできるこのリーダーファクトリを提供します。

リーダーのファクトリ構成をさらに制御する必要がある場合は、1 つ以上の ReaderBuilderCustomizer Bean を登録することを検討してください。これらのカスタマイザーは、ファクトリによって作成されたすべてのリーダー、つまりすべての @PulsarReader インスタンスに適用されます。@PulsarReader アノテーションの readerCustomizer 属性を設定して、単一のリスナーをカスタマイズすることもできます。

4.9. メッセージをリアクティブ的に読む

Apache Pulsar インフラストラクチャが存在し、リアクティブ自動構成がアクティブ化されている場合、Spring の ReactivePulsarReaderFactory が提供され、これを使用して、リアクティブな方法でメッセージを読み取るためのリーダーを作成できます。次のコンポーネントは、提供されたファクトリを使用してリーダーを作成し、someTopic トピックから 5 分前の単一メッセージを読み取ります。

Java
import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

    public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
        this.pulsarReaderFactory = pulsarReaderFactory;
    }

    public void someMethod() {
        ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
            .topic("someTopic")
            .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
        Mono<Message<String>> message = this.pulsarReaderFactory
            .createReader(Schema.STRING, List.of(readerBuilderCustomizer))
            .readOne();
        // ...
    }

}
Kotlin
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

    fun someMethod() {
        val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
            readerBuilder: ReactiveMessageReaderBuilder<String> ->
                readerBuilder
                    .topic("someTopic")
                    .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
        }
        val message = pulsarReaderFactory
                .createReader(Schema.STRING, listOf(readerBuilderCustomizer))
                .readOne()
        // ...
    }

}

Spring Boot 自動構成は、spring.pulsar.reader.* プレフィックスの付いたアプリケーションプロパティのいずれかを設定することによってカスタマイズできるこのリーダーファクトリを提供します。

リーダーのファクトリ構成をさらに制御する必要がある場合は、ファクトリを使用してリーダーを作成するときに 1 つ以上の ReactiveMessageReaderBuilderCustomizer インスタンスを渡すことを検討してください。

リーダーのファクトリ構成をさらに制御する必要がある場合は、1 つ以上の ReactiveMessageReaderBuilderCustomizer Bean を登録することを検討してください。これらのカスタマイザーは、作成されたすべてのリーダーに適用されます。リーダーの作成時に 1 つ以上の ReactiveMessageReaderBuilderCustomizer を渡して、作成したリーダーにのみカスタマイズを適用することもできます。

上記のコンポーネントの詳細と、その他の利用可能な機能については、Spring for Apache Pulsar リファレンスドキュメントを参照してください。

4.10. Pulsar の追加プロパティ

自動構成でサポートされるプロパティは、付録の “統合プロパティ” セクションに示されています。ほとんどの場合、これらのプロパティ (ハイフン付きまたはキャメルケース) は Apache Pulsar 構成プロパティに直接マッピングされることに注意してください。詳細については、Apache Pulsar のドキュメントを参照してください。

Pulsar でサポートされるプロパティのサブセットのみが、PulsarProperties クラスを通じて直接使用できます。直接サポートされていない追加プロパティを使用して自動構成コンポーネントを調整する場合は、前述の各コンポーネントでサポートされているカスタマイザーを使用できます。

5. RSocket

RSocket (英語) は、バイトストリームトランスポートで使用するためのバイナリプロトコルです。これにより、単一の接続を介して渡される非同期メッセージを介して対称的な相互作用モデルが可能になります。

Spring Framework の spring-messaging モジュールは、クライアント側とサーバー側の両方で RSocket リクエスターとレスポンダーのサポートを提供します。RSocket プロトコルの概要など、詳細については、Spring Framework リファレンスの RSocket セクションを参照してください。

5.1. RSocket 戦略の自動構成

Spring Boot は、RSocket ペイロードのエンコードおよびデコードに必要なすべてのインフラストラクチャを提供する RSocketStrategies Bean を自動構成します。デフォルトでは、自動構成は以下を順番に構成しようとします。

  1. Jackson を使用した CBOR (英語) コーデック

  2. Jackson を使用した JSON コーデック

spring-boot-starter-rsocket スターターは両方の依存関係を提供します。カスタマイズの可能性について詳しくは、Jackson サポートセクションを参照してください。

開発者は、RSocketStrategiesCustomizer インターフェースを実装する Bean を作成することにより、RSocketStrategies コンポーネントをカスタマイズできます。コーデックの順序を決定するため、@Order は重要であることに注意してください。

5.2. RSocket サーバーの自動構成

Spring Boot は、RSocket サーバーの自動構成を提供します。必要な依存関係は spring-boot-starter-rsocket によって提供されます。

Spring Boot を使用すると、WebFlux サーバーから WebSocket を介して RSocket を公開したり、独立した RSocket サーバーを立ち上げることができます。これは、アプリケーションの型とその構成によって異なります。

WebFlux アプリケーション(つまり、型 WebApplicationType.REACTIVE)の場合、RSocket サーバーは、次のプロパティが一致する場合にのみ Web サーバーに接続されます。

Properties
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
Yaml
spring:
  rsocket:
    server:
      mapping-path: "/rsocket"
      transport: "websocket"
RSocket を Web サーバーに接続することは、Reactor Netty でのみサポートされています。RSocket 自体はそのライブラリで構築されています。

または、RSocket TCP または websocket サーバーは、独立した組み込みサーバーとして起動されます。依存関係の要件に加えて、唯一の必要な構成は、そのサーバーのポートを定義することです。

Properties
spring.rsocket.server.port=9898
Yaml
spring:
  rsocket:
    server:
      port: 9898

5.3. Spring メッセージング RSocket サポート

Spring Boot は、RSocket の Spring メッセージングインフラストラクチャを自動構成します。

これは、Spring Boot が、アプリケーションへの RSocket リクエストを処理する RSocketMessageHandler Bean を作成することを意味します。

5.4. RSocketRequester で RSocket サービスを呼び出す

サーバーとクライアントの間に RSocket チャネルが確立されると、どのパーティも相手とリクエストを送受信できます。

サーバーとして、RSocket @Controller の任意のハンドラーメソッドで RSocketRequester インスタンスを注入できます。クライアントとして、最初に RSocket 接続を構成して確立する必要があります。Spring Boot は、このような場合に予想されるコーデックを使用して RSocketRequester.Builder を自動構成し、任意の RSocketConnectorConfigurer Bean を適用します。

RSocketRequester.Builder インスタンスはプロトタイプ Bean です。つまり、各インジェクションポイントが新しいインスタンスを提供します。このビルダーはステートフルであり、同じインスタンスを使用して異なるセットアップでリクエスターを作成するべきではないため、これは意図的に行われます。

次のコードは典型的な例を示しています。

Java
import reactor.core.publisher.Mono;

import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    private final RSocketRequester rsocketRequester;

    public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
        this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
    }

    public Mono<User> someRSocketCall(String name) {
        return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
    }

}
Kotlin
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono

@Service
class MyService(rsocketRequesterBuilder: RSocketRequester.Builder) {

    private val rsocketRequester: RSocketRequester

    init {
        rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898)
    }

    fun someRSocketCall(name: String): Mono<User> {
        return rsocketRequester.route("user").data(name).retrieveMono(
            User::class.java
        )
    }

}

6. Spring Integration

Spring Boot は、spring-boot-starter-integration 「スターター」を含む、Spring Integration で作業するためのいくつかの便利な機能を提供します。Spring Integration は、メッセージング、HTTP、TCP などのその他のトランスポートを抽象化します。Spring Integration がクラスパスで使用可能な場合、@EnableIntegration アノテーションを介して初期化されます。

Spring Integration ポーリングロジックは、自動構成された TaskScheduler に依存しています。デフォルトの PollerMetadata (毎秒無制限のメッセージ数をポーリング)は、spring.integration.poller.* 構成プロパティを使用してカスタマイズできます。

Spring Boot は、追加の Spring Integration モジュールの存在によってトリガーされるいくつかの機能も構成します。spring-integration-jmx もクラスパス上にある場合、メッセージ処理統計は JMX を介して公開されます。spring-integration-jdbc が利用可能な場合、次の行に示すように、起動時にデフォルトのデータベーススキーマを作成できます。

Properties
spring.integration.jdbc.initialize-schema=always
Yaml
spring:
  integration:
    jdbc:
      initialize-schema: "always"

spring-integration-rsocket が使用可能な場合、開発者は "spring.rsocket.server.*" プロパティを使用して RSocket サーバーを構成し、IntegrationRSocketEndpoint または RSocketOutboundGateway コンポーネントを使用して受信 RSocket メッセージを処理できます。このインフラストラクチャは、Spring Integration RSocket チャネルアダプターと @MessageMapping ハンドラーを処理できます("spring.integration.rsocket.server.message-mapping-enabled" が構成されている場合)。

Spring Boot は、構成プロパティを使用して ClientRSocketConnector を自動構成することもできます。

Properties
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
Yaml
# Connecting to a RSocket server over TCP
spring:
  integration:
    rsocket:
      client:
        host: "example.org"
        port: 9898
Properties
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
Yaml
# Connecting to a RSocket Server over WebSocket
spring:
  integration:
    rsocket:
      client:
        uri: "ws://example.org"

詳細については、IntegrationAutoConfiguration [GitHub] (英語) および IntegrationProperties [GitHub] (英語) クラスを参照してください。

7. WebSocket

Spring Boot は、組み込み Tomcat、Jetty、Undertow の WebSockets 自動構成を提供します。war ファイルをスタンドアロンコンテナーにデプロイする場合、Spring Boot は、コンテナーが WebSocket サポートの構成を担当していると見なします。

Spring Framework は、spring-boot-starter-websocket モジュールを介して簡単にアクセスできる MVC Web アプリケーションの豊富な WebSocket サポートを提供します。

WebSocket サポートは、リアクティブ Web アプリケーションでも利用可能であり、spring-boot-starter-webflux と一緒に WebSocket API を含める必要があります。

<dependency>
    <groupId>jakarta.websocket</groupId>
    <artifactId>jakarta.websocket-api</artifactId>
</dependency>

8. 次のステップ

次のセクションでは、アプリケーションで IO 機能を有効にする方法について説明します。このセクションでは、キャッシュメール検証REST クライアントなどについて読むことができます。