Spring Framework は、JmsTemplate
を使用した JMS API の簡易使用から、メッセージを非同期で受信するための完全なインフラストラクチャまで、メッセージングシステムとの統合を幅広くサポートしています。Spring AMQP は、Advanced Message Queuing Protocol に同様の機能セットを提供します。Spring Boot は、RabbitTemplate
および RabbitMQ の自動構成オプションも提供します。Spring WebSocket にはネイティブで STOMP メッセージングのサポートが含まれており、Spring Boot はスターターと少量の自動構成を通じてそれをサポートしています。Spring Boot は Apache Kafka もサポートしています。
1. JMS
javax.jms.ConnectionFactory
インターフェースは、JMS ブローカーと対話するための javax.jms.Connection
を作成する標準的な方法を提供します。Spring は JMS を使用するために ConnectionFactory
を必要としますが、通常は直接使用する必要はなく、代わりに高レベルのメッセージング抽象化に依存できます。(詳細については、Spring Framework リファレンスドキュメントの関連セクションを参照してください)Spring Boot は、メッセージを送受信するために必要なインフラストラクチャーを自動構成します。
1.1. ActiveMQ サポート
ActiveMQ [Apache] (英語) がクラスパスで使用可能な場合、Spring Boot は ConnectionFactory
を構成することもできます。ブローカーが存在する場合、組み込みブローカーが自動的に開始および構成されます(構成を通じてブローカー URL が指定されておらず、組み込みブローカーが構成で無効になっていない場合)。
spring-boot-starter-activemq を使用する場合、JMS と統合する Spring インフラストラクチャと同様に、ActiveMQ インスタンスの接続または埋め込みに必要な依存関係が提供されます。 |
ActiveMQ 構成は、spring.activemq.*
の外部構成プロパティによって制御されます。
デフォルトでは、ActiveMQ は VM トランスポート [Apache] (英語) を使用するように自動構成されており、同じ JVM インスタンスに埋め込まれたブローカーを開始します。
次の例に示すように、spring.activemq.in-memory
プロパティを構成することにより、組み込みブローカーを無効にできます。
spring.activemq.in-memory=false
spring:
activemq:
in-memory: false
次の例に示すように、ブローカー URL を構成すると、組み込みブローカーも無効になります。
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
spring:
activemq:
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
組み込みブローカーを完全に制御したい場合は、詳細について ActiveMQ のドキュメント [Apache] (英語) を参照してください。
デフォルトでは、CachingConnectionFactory
は、spring.jms.*
の外部構成プロパティによって制御できる適切な設定でネイティブ ConnectionFactory
をラップします。
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
ネイティブプーリングを使用する場合は、次の例に示すように、org.messaginghub:pooled-jms
に依存関係を追加し、それに応じて JmsPoolConnectionFactory
を構成することにより、使用できます。
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring:
activemq:
pool:
enabled: true
max-connections: 50
サポートされるオプションの詳細については、ActiveMQProperties [GitHub] (英語) を参照してください。より高度なカスタマイズのために、ActiveMQConnectionFactoryCustomizer を実装する任意の数の Bean を登録することもできます。 |
デフォルトでは、ActiveMQ は宛先がまだ存在しない場合に宛先を作成し、指定された名前に対して宛先が解決されるようにします。
1.2. ActiveMQArtemis サポート
Spring Boot は、クラスパスで ActiveMQ Artemis [Apache] (英語) が使用可能であることを検出すると、ConnectionFactory
を自動構成できます。ブローカーが存在する場合、埋め込みモードのブローカーは自動的に起動および構成されます(モードプロパティが明示的に設定されていない場合)。サポートされるモードは、embedded
(組み込みブローカーが必要であり、ブローカーがクラスパスで使用できない場合にエラーが発生することを明示するため)および native
(netty
トランスポートプロトコルを使用してブローカーに接続するため)です。後者が構成されている場合、Spring Boot は、デフォルト設定でローカルマシンで実行されているブローカーに接続する ConnectionFactory
を構成します。
spring-boot-starter-artemis を使用する場合は、既存の ActiveMQ Artemis インスタンスに接続するために必要な依存関係と、JMS と統合するための Spring インフラストラクチャが提供されます。アプリケーションに org.apache.activemq:artemis-jms-server を追加すると、組み込みモードを使用できます。 |
ActiveMQ Artemis 構成は、spring.artemis.*
の外部構成プロパティによって制御されます。例: application.properties
で次のセクションを宣言できます:
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
spring:
artemis:
mode: native
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
ブローカーを埋め込むときに、永続性を有効にし、使用可能にする必要がある宛先をリストするかどうかを選択できます。これらをコンマ区切りリストとして指定して、デフォルトのオプションで作成するか、型 org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration
または org.apache.activemq.artemis.jms.server.config.TopicConfiguration
の Bean をそれぞれ高度なキューおよびトピック構成に定義できます。
デフォルトでは、CachingConnectionFactory
は、spring.jms.*
の外部構成プロパティによって制御できる適切な設定でネイティブ ConnectionFactory
をラップします。
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
ネイティブプーリングを使用する場合は、次の例に示すように、org.messaginghub:pooled-jms
に依存関係を追加し、それに応じて JmsPoolConnectionFactory
を構成することにより、使用できます。
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
spring:
artemis:
pool:
enabled: true
max-connections: 50
サポートされるオプションの詳細については、ArtemisProperties
[GitHub] (英語) を参照してください。
JNDI ルックアップは含まれず、宛先は Artemis 構成の name
属性または構成を通じて提供された名前を使用して、名前に対して解決されます。
1.3. JNDI ConnectionFactory を使用する
アプリケーションサーバーでアプリケーションを実行している場合、Spring Boot は JNDI を使用して JMS ConnectionFactory
を見つけようとします。デフォルトでは、java:/JmsXA
と java:/XAConnectionFactory
の場所がチェックされます。次の例に示すように、別の場所を指定する必要がある場合は、spring.jms.jndi-name
プロパティを使用できます。
spring.jms.jndi-name=java:/MyConnectionFactory
spring:
jms:
jndi-name: "java:/MyConnectionFactory"
1.4. メッセージの送信
Spring の JmsTemplate
は自動構成されており、次の例に示すように、独自の Bean に直接オートワイヤーできます。
@Component
public class MyBean {
private final JmsTemplate jmsTemplate;
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
@Component
class MyBean(private val jmsTemplate: JmsTemplate) {
}
JmsMessagingTemplate (Javadoc) は、同様の方法で注入できます。DestinationResolver または MessageConverter Bean が定義されている場合、自動構成された JmsTemplate に自動的に関連付けられます。 |
1.5. メッセージの受信
JMS インフラストラクチャが存在する場合、任意の Bean に @JmsListener
のアノテーションを付けて、リスナーエンドポイントを作成できます。JmsListenerContainerFactory
が定義されていない場合、デフォルトの JmsListenerContainerFactory
が自動的に構成されます。DestinationResolver
、MessageConverter
、javax.jms.ExceptionListener
Bean が定義されている場合、デフォルトのファクトリに自動的に関連付けられます。
デフォルトでは、デフォルトのファクトリはトランザクションです。JtaTransactionManager
が存在するインフラストラクチャで実行すると、デフォルトでリスナーコンテナーに関連付けられます。そうでない場合、sessionTransacted
フラグが有効になります。後者のシナリオでは、リスナーメソッド(またはそのデリゲート)に @Transactional
を追加することにより、ローカルデータストアトランザクションを受信メッセージの処理に関連付けることができます。これにより、ローカルトランザクションが完了すると、受信メッセージが確認されます。これには、同じ JMS セッションで実行されたレスポンスメッセージの送信も含まれます。
次のコンポーネントは、someQueue
宛先にリスナーエンドポイントを作成します。
@Component
public class MyBean {
@JmsListener(destination = "someQueue")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@JmsListener(destination = "someQueue")
fun processMessage(content: String?) {
// ...
}
}
詳細については、@EnableJms の Javadoc を参照してください。 |
さらに JmsListenerContainerFactory
インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合は、Spring Boot が提供する DefaultJmsListenerContainerFactoryConfigurer
を使用して、自動構成されたものと同じ設定で DefaultJmsListenerContainerFactory
を初期化できます。
たとえば、次の例では、特定の MessageConverter
を使用する別のファクトリを公開しています。
@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {
@Bean
fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer): DefaultJmsListenerContainerFactory {
val factory = DefaultJmsListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
次に、次のように、@JmsListener
アノテーション付きメソッドでファクトリを使用できます。
@Component
public class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
2. AMQP
Advanced Message Queuing Protocol(AMQP)は、メッセージ指向ミドルウェア向けのプラットフォームに依存しないワイヤーレベルのプロトコルです。Spring AMQP プロジェクトは、コア Spring コンセプトを AMQP ベースのメッセージングソリューションの開発に適用します。Spring Boot は、spring-boot-starter-amqp
「スターター」など、RabbitMQ を介して AMQP を操作するためのいくつかの便利な機能を提供します。
2.1. RabbitMQ のサポート
RabbitMQ (英語) は、AMQP プロトコルに基づいた、軽量で信頼性が高く、スケーラブルでポータブルなメッセージブローカーです。Spring は、RabbitMQ
を使用して AMQP プロトコルを介して通信します。
RabbitMQ 構成は、spring.rabbitmq.*
の外部構成プロパティによって制御されます。例: application.properties
で次のセクションを宣言できます。
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
または、addresses
属性を使用して同じ接続を構成できます。
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
その方法でアドレスを指定する場合、host および port プロパティは無視されます。アドレスが amqps プロトコルを使用している場合、SSL サポートは自動的に有効になります。 |
サポートされているプロパティベースの構成オプションの詳細については、RabbitProperties
[GitHub] (英語) を参照してください。Spring AMQP で使用される RabbitMQ ConnectionFactory
の下位レベルの詳細を構成するには、ConnectionFactoryCustomizer
Bean を定義します。
ConnectionNameStrategy
Bean がコンテキストに存在する場合、自動構成された CachingConnectionFactory
によって作成された接続に名前を付けるために自動的に使用されます。
詳細については、RabbitMQ で使用されるプロトコルである AMQP を理解する (英語) を参照してください。 |
2.2. メッセージの送信
Spring の AmqpTemplate
および AmqpAdmin
は自動構成され、次の例に示すように、独自の Bean に直接オートワイヤーできます。
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
}
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
}
RabbitMessagingTemplate (Javadoc) は、同様の方法で注入できます。MessageConverter Bean が定義されている場合、自動構成された AmqpTemplate に自動的に関連付けられます。 |
必要に応じて、Bean として定義されている org.springframework.amqp.core.Queue
は、RabbitMQ インスタンスで対応するキューを宣言するために自動的に使用されます。
操作を再試行するには、AmqpTemplate
で再試行を有効にします(たとえば、ブローカー接続が失われた場合)。
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
再試行はデフォルトで無効になっています。RabbitRetryTemplateCustomizer
Bean を宣言することにより、プログラムで RetryTemplate
をカスタマイズすることもできます。
さらに RabbitTemplate
インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は RabbitTemplateConfigurer
Bean を提供します。これを使用して、自動構成で使用されるファクトリと同じ設定で RabbitTemplate
を初期化できます。
2.3. ストリームへのメッセージの送信
特定のストリームにメッセージを送信するには、次の例に示すように、ストリームの名前を指定します。
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
MessageConverter
、StreamMessageConverter
、ProducerCustomizer
Bean が定義されている場合、自動構成された RabbitStreamTemplate
に自動的に関連付けられます。
さらに RabbitStreamTemplate
インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は RabbitStreamTemplateConfigurer
Bean を提供します。これを使用して、自動構成で使用されるファクトリと同じ設定で RabbitStreamTemplate
を初期化できます。
2.4. メッセージの受信
Rabbit インフラストラクチャが存在する場合、任意の Bean に @RabbitListener
のアノテーションを付けて、リスナーエンドポイントを作成できます。RabbitListenerContainerFactory
が定義されていない場合、デフォルトの SimpleRabbitListenerContainerFactory
が自動的に構成され、spring.rabbitmq.listener.type
プロパティを使用して直接コンテナーに切り替えることができます。MessageConverter
または MessageRecoverer
Bean が定義されている場合、デフォルトのファクトリに自動的に関連付けられます。
次のサンプルコンポーネントは、someQueue
キューにリスナーエンドポイントを作成します。
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
詳細については、@EnableRabbit の Javadoc を参照してください。 |
さらに RabbitListenerContainerFactory
インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は SimpleRabbitListenerContainerFactoryConfigurer
および DirectRabbitListenerContainerFactoryConfigurer
を提供します。これらを使用して、自動構成で使用されるファクトリと同じ設定で SimpleRabbitListenerContainerFactory
および DirectRabbitListenerContainerFactory
を初期化できます。
選択したコンテナーの種類は関係ありません。これらの 2 つの Bean は、自動構成によって公開されます。 |
たとえば、次の構成クラスは、特定の MessageConverter
を使用する別のファクトリを公開します。
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
次に、次のように、@RabbitListener
アノテーション付きメソッドでファクトリを使用できます。
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
再試行を有効にして、リスナーが例外をスローする状況を処理できます。デフォルトでは、RejectAndDontRequeueRecoverer
が使用されますが、独自の MessageRecoverer
を定義できます。再試行が使い果たされると、ブローカーがそうするように構成されている場合、メッセージは拒否され、送達不能交換にドロップまたはルーティングされます。デフォルトでは、再試行は無効になっています。RabbitRetryTemplateCustomizer
Bean を宣言することにより、RetryTemplate
をプログラムでカスタマイズすることもできます。
デフォルトでは、再試行が無効になっていて、リスナーが例外をスローした場合、配信は無期限に再試行されます。この動作は次の 2 つの方法で変更できます。defaultRequeueRejected プロパティを false に設定して再配信をゼロにするか、AmqpRejectAndDontRequeueException をスローしてメッセージを拒否することを通知します。後者は、再試行が有効で、配信試行の最大数に達したときに使用されるメカニズムです。 |
3. Apache Kafka サポート
Apache Kafka (英語) は、spring-kafka
プロジェクトの自動構成を提供することによりサポートされます。
Kafka 構成は、spring.kafka.*
の外部構成プロパティによって制御されます。例: application.properties
で次のセクションを宣言できます。
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
起動時にトピックを作成するには、型 NewTopic の Bean を追加します。トピックがすでに存在する場合、Bean は無視されます。 |
サポートされるオプションの詳細については、KafkaProperties
[GitHub] (英語) を参照してください。
3.1. メッセージの送信
Spring の KafkaTemplate
は自動構成されており、次の例に示すように、独自の Bean に直接オートワイヤーできます。
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
}
@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {
}
プロパティ spring.kafka.producer.transaction-id-prefix が定義されている場合、KafkaTransactionManager が自動的に構成されます。また、RecordMessageConverter Bean が定義されている場合、自動構成された KafkaTemplate に自動的に関連付けられます。 |
3.2. メッセージの受信
Apache Kafka インフラストラクチャが存在する場合、任意の Bean に @KafkaListener
のアノテーションを付けてリスナーエンドポイントを作成できます。KafkaListenerContainerFactory
が定義されていない場合は、spring.kafka.listener.*
で定義されたキーを使用してデフォルトの KafkaListenerContainerFactory
が自動的に構成されます。
次のコンポーネントは、someTopic
トピックにリスナーエンドポイントを作成します。
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@KafkaListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
KafkaTransactionManager
Bean が定義されている場合、自動的にコンテナーファクトリに関連付けられます。同様に、RecordFilterStrategy
、CommonErrorHandler
、AfterRollbackProcessor
または ConsumerAwareRebalanceListener
Bean が定義されている場合、自動的にデフォルトのファクトリに関連付けられます。
リスナーの型に応じて、RecordMessageConverter
または BatchMessageConverter
Bean がデフォルトファクトリに関連付けられます。RecordMessageConverter
Bean のみがバッチリスナーに存在する場合、BatchMessageConverter
にラップされます。
カスタム ChainedKafkaTransactionManager は、通常、自動構成された KafkaTransactionManager Bean を参照するため、@Primary とマークする必要があります。 |
3.3. Kafka ストリーム
Spring for Apache Kafka は、StreamsBuilder
オブジェクトを作成し、そのストリームのライフサイクルを管理するためのファクトリ Bean を提供します。Spring Boot は、kafka-streams
がクラスパス上にあり、Kafka ストリームが @EnableKafkaStreams
アノテーションによって有効になっている限り、必要な KafkaStreamsConfiguration
Bean を自動構成します。
Kafka ストリームを有効にすると、アプリケーション ID とブートストラップサーバーを設定する必要があります。前者は spring.kafka.streams.application-id
を使用して設定でき、設定されていない場合はデフォルトで spring.application.name
になります。後者はグローバルに設定することも、ストリームに対してのみ具体的にオーバーライドすることもできます。
専用プロパティを使用して、いくつかの追加プロパティを利用できます。他の任意の Kafka プロパティは、spring.kafka.streams.properties
名前空間を使用して設定できます。詳細については、Kafka の追加プロパティも参照してください。
ファクトリ Bean を使用するには、次の例に示すように、StreamsBuilder
を @Bean
に接続します。
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase());
}
}
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
val stream = streamsBuilder.stream<Int, String>("ks1In")
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
return stream
}
private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
return KeyValue(key, value.uppercase())
}
}
デフォルトでは、StreamBuilder
オブジェクトによって管理されるストリームは自動的に開始されます。spring.kafka.streams.auto-startup
プロパティを使用して、この動作をカスタマイズできます。
3.4. Kafka の追加プロパティ
自動構成でサポートされるプロパティは、付録の “統合プロパティ” セクションに示されています。ほとんどの場合、これらのプロパティ (ハイフン付きまたはキャメルケース) は、Apache Kafka のドット付きプロパティに直接マップされることに注意してください。詳細については、Apache Kafka のドキュメントを参照してください。
これらのプロパティの最初のいくつかは、すべてのコンポーネント (プロデューサー、コンシューマー、管理者、ストリーム) に適用されますが、異なる値を使用する場合はコンポーネントレベルで指定できます。Apache Kafka は、プロパティを HIGH、MEDIUM、LOW の重要度で指定します。Spring Boot の自動構成では、すべての HIGH 重要度プロパティ、一部の選択された MEDIUM および LOW プロパティ、およびデフォルト値がないすべてのプロパティがサポートされます。
Kafka でサポートされているプロパティのサブセットのみが、KafkaProperties
クラスから直接利用できます。直接サポートされていない追加のプロパティを使用してプロデューサーまたはコンシューマーを構成する場合は、次のプロパティを使用します。
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
kafka:
properties:
"[prop.one]": "first"
admin:
properties:
"[prop.two]": "second"
consumer:
properties:
"[prop.three]": "third"
producer:
properties:
"[prop.four]": "fourth"
streams:
properties:
"[prop.five]": "fifth"
これにより、共通の prop.one
Kafka プロパティが first
(プロデューサー、コンシューマー、管理者に適用)、prop.two
admin プロパティが second
に、prop.three
コンシューマープロパティが third
に、prop.four
プロデューサープロパティが fourth
に、prop.five
ストリームプロパティが fifth
に設定されます。
Spring Kafka JsonDeserializer
は、次のように構成することもできます。
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
同様に、型情報をヘッダーで送信する JsonSerializer
のデフォルトの動作を無効にできます。
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
この方法で設定されたプロパティは、Spring Boot が明示的にサポートする構成アイテムをオーバーライドします。 |
3.5. 組み込み Kafka を使用したテスト
Spring for Apache Kafka は、組み込みの Apache Kafka ブローカーを使用してプロジェクトをテストする便利な方法を提供します。この機能を使用するには、spring-kafka-test
モジュールの @EmbeddedKafka
を使用してテストクラスにアノテーションを付けます。詳細については、Spring for Apache Kafka リファレンスマニュアルを参照してください。
Spring Boot 自動構成を前述の組み込み Apache Kafka ブローカーで動作させるには、組み込みブローカーアドレスのシステムプロパティ (EmbeddedKafkaBroker
によって設定される) を Apache Kafka の Spring Boot 構成プロパティに再マップする必要があります。これを行うには、いくつかの方法があります。
組み込みブローカーアドレスをテストクラスの
spring.kafka.bootstrap-servers
にマップするシステムプロパティを提供します。
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
init {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
@EmbeddedKafka
アノテーションでプロパティ名を構成します。
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
構成プロパティでプレースホルダーを使用します。
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
kafka:
bootstrap-servers: "${spring.embedded.kafka.brokers}"
4. RSocket
RSocket (英語) は、バイトストリームトランスポートで使用するためのバイナリプロトコルです。これにより、単一の接続を介して渡される非同期メッセージを介して対称的な相互作用モデルが可能になります。
Spring Framework の spring-messaging
モジュールは、クライアント側とサーバー側の両方で RSocket リクエスターとレスポンダーのサポートを提供します。RSocket プロトコルの概要など、詳細については、Spring Framework リファレンスの RSocket セクションを参照してください。
4.1. RSocket 戦略の自動構成
Spring Boot は、RSocket ペイロードのエンコードおよびデコードに必要なすべてのインフラストラクチャを提供する RSocketStrategies
Bean を自動構成します。デフォルトでは、自動構成は以下を順番に構成しようとします。
Jackson を使用した CBOR (英語) コーデック
Jackson を使用した JSON コーデック
spring-boot-starter-rsocket
スターターは両方の依存関係を提供します。カスタマイズの可能性について詳しくは、Jackson サポートセクションを参照してください。
開発者は、RSocketStrategiesCustomizer
インターフェースを実装する Bean を作成することにより、RSocketStrategies
コンポーネントをカスタマイズできます。コーデックの順序を決定するため、@Order
は重要であることに注意してください。
4.2. RSocket サーバーの自動構成
Spring Boot は、RSocket サーバーの自動構成を提供します。必要な依存関係は spring-boot-starter-rsocket
によって提供されます。
Spring Boot を使用すると、WebFlux サーバーから WebSocket を介して RSocket を公開したり、独立した RSocket サーバーを立ち上げることができます。これは、アプリケーションの型とその構成によって異なります。
WebFlux アプリケーション(つまり、型 WebApplicationType.REACTIVE
)の場合、RSocket サーバーは、次のプロパティが一致する場合にのみ Web サーバーに接続されます。
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
spring:
rsocket:
server:
mapping-path: "/rsocket"
transport: "websocket"
RSocket を Web サーバーに接続することは、Reactor Netty でのみサポートされています。RSocket 自体はそのライブラリで構築されています。 |
または、RSocket TCP または websocket サーバーは、独立した組み込みサーバーとして起動されます。依存関係の要件に加えて、唯一の必要な構成は、そのサーバーのポートを定義することです。
spring.rsocket.server.port=9898
spring:
rsocket:
server:
port: 9898
4.3. Spring メッセージング RSocket サポート
Spring Boot は、RSocket の Spring メッセージングインフラストラクチャを自動構成します。
これは、Spring Boot が、アプリケーションへの RSocket リクエストを処理する RSocketMessageHandler
Bean を作成することを意味します。
4.4. RSocketRequester で RSocket サービスを呼び出す
サーバーとクライアントの間に RSocket
チャネルが確立されると、どのパーティも相手とリクエストを送受信できます。
サーバーとして、RSocket @Controller
の任意のハンドラーメソッドで RSocketRequester
インスタンスを注入できます。クライアントとして、最初に RSocket 接続を構成して確立する必要があります。Spring Boot は、このような場合に予想されるコーデックを使用して RSocketRequester.Builder
を自動構成し、任意の RSocketConnectorConfigurer
Bean を適用します。
RSocketRequester.Builder
インスタンスはプロトタイプ Bean です。つまり、各インジェクションポイントが新しいインスタンスを提供します。このビルダーはステートフルであり、同じインスタンスを使用して異なるセットアップでリクエスターを作成するべきではないため、これは意図的に行われます。
次のコードは典型的な例を示しています。
@Service
public class MyService {
private final RSocketRequester rsocketRequester;
public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
}
public Mono<User> someRSocketCall(String name) {
return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
}
}
@Service
class MyService(rsocketRequesterBuilder: RSocketRequester.Builder) {
private val rsocketRequester: RSocketRequester
init {
rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898)
}
fun someRSocketCall(name: String): Mono<User> {
return rsocketRequester.route("user").data(name).retrieveMono(
User::class.java
)
}
}
5. Spring Integration
Spring Boot は、spring-boot-starter-integration
「スターター」を含む、Spring Integration で作業するためのいくつかの便利な機能を提供します。Spring Integration は、メッセージング、HTTP、TCP などのその他のトランスポートを抽象化します。Spring Integration がクラスパスで使用可能な場合、@EnableIntegration
アノテーションを介して初期化されます。
Spring Integration ポーリングロジックは、自動構成された TaskScheduler
に依存しています。デフォルトの PollerMetadata
(毎秒無制限のメッセージ数をポーリング)は、spring.integration.poller.*
構成プロパティを使用してカスタマイズできます。
Spring Boot は、追加の Spring Integration モジュールの存在によってトリガーされるいくつかの機能も構成します。spring-integration-jmx
もクラスパス上にある場合、メッセージ処理統計は JMX を介して公開されます。spring-integration-jdbc
が利用可能な場合、次の行に示すように、起動時にデフォルトのデータベーススキーマを作成できます。
spring.integration.jdbc.initialize-schema=always
spring:
integration:
jdbc:
initialize-schema: "always"
spring-integration-rsocket
が使用可能な場合、開発者は "spring.rsocket.server.*"
プロパティを使用して RSocket サーバーを構成し、IntegrationRSocketEndpoint
または RSocketOutboundGateway
コンポーネントを使用して受信 RSocket メッセージを処理できます。このインフラストラクチャは、Spring Integration RSocket チャネルアダプターと @MessageMapping
ハンドラーを処理できます("spring.integration.rsocket.server.message-mapping-enabled"
が構成されている場合)。
Spring Boot は、構成プロパティを使用して ClientRSocketConnector
を自動構成することもできます。
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
# Connecting to a RSocket server over TCP
spring:
integration:
rsocket:
client:
host: "example.org"
port: 9898
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
# Connecting to a RSocket Server over WebSocket
spring:
integration:
rsocket:
client:
uri: "ws://example.org"
詳細については、IntegrationAutoConfiguration
[GitHub] (英語) および IntegrationProperties
[GitHub] (英語) クラスを参照してください。
6. WebSocket
Spring Boot は、組み込み Tomcat、Jetty、Undertow の WebSockets 自動構成を提供します。war ファイルをスタンドアロンコンテナーにデプロイする場合、Spring Boot は、コンテナーが WebSocket サポートの構成を担当していると見なします。
Spring Framework は、spring-boot-starter-websocket
モジュールを介して簡単にアクセスできる MVC Web アプリケーションの豊富な WebSocket サポートを提供します。
WebSocket サポートは、リアクティブ Web アプリケーションでも利用可能であり、spring-boot-starter-webflux
と一緒に WebSocket API を含める必要があります。
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
</dependency>
7. 次のステップ
次のセクションでは、アプリケーションで IO 機能を有効にする方法について説明します。このセクションでは、キャッシュ、メール、検証、REST クライアントなどについて読むことができます。