Apache Pulsar サポート
Apache Pulsar (英語) は、Spring for Apache Pulsar プロジェクトの自動構成を提供することによってサポートされます。
Spring Boot は、org.springframework.pulsar:spring-pulsar
がクラスパス上にある場合、クラシック (必須) Spring for Apache Pulsar コンポーネントを自動構成して登録します。org.springframework.pulsar:spring-pulsar-reactive
がクラスパス上にある場合、リアクティブコンポーネントに対しても同じことが行われます。
命令的使用とリアクティブ的使用のためにそれぞれ依存関係を便利に収集するための spring-boot-starter-pulsar
と spring-boot-starter-pulsar-reactive
の「スターター」があります。
Pulsar への接続
Pulsar スターターを使用すると、Spring Boot は PulsarClient
Bean を自動構成して登録します。
デフォルトでは、アプリケーションは pulsar://localhost:6650
にあるローカル Pulsar インスタンスへの接続を試行します。これは、spring.pulsar.client.service-url
プロパティを別の値に設定することで調整できます。
値は有効な Pulsar プロトコル [Apache] (英語) URL である必要があります |
spring.pulsar.client.*
というプレフィックスが付いたアプリケーションプロパティのいずれかを指定して、クライアントを構成できます。
構成をさらに制御する必要がある場合は、1 つ以上の PulsarClientBuilderCustomizer
Bean を登録することを検討してください。
認証
認証が必要な Pulsar クラスターに接続するには、pluginClassName
とプラグインに必要なパラメーターを設定して、使用する認証プラグインを指定する必要があります。パラメーターをパラメーター名とパラメーター値のマップとして設定できます。次の例は、AuthenticationOAuth2
プラグインを構成する方法を示しています。
プロパティ
YAML
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
例: また、バインディングが緩和されていないため、変換中に大文字と小文字の区別が失われるため、認証パラメーターに環境変数を使用することが問題になります。パラメーターに環境変数を使用する場合、正しく動作させるには、Spring for Apache Pulsar リファレンスドキュメントの次の手順に従う必要があります。 |
SSL
デフォルトでは、Pulsar クライアントは Pulsar サービスとプレーンテキストで通信します。Spring for Apache Pulsar リファレンスドキュメントの次の手順に従って、TLS 暗号化を有効にできます。
クライアントと認証の詳細については、Spring for Apache Pulsar リファレンスドキュメントを参照してください。
Pulsar へのリアクティブな接続
リアクティブ自動構成がアクティブ化されると、Spring Boot は ReactivePulsarClient
Bean を自動構成して登録します。
ReactivePulsarClient
は、前述の PulsarClient
のインスタンスを適応させます。前のセクションに従って、ReactivePulsarClient
によって使用される PulsarClient
を構成します。
Pulsar 管理への接続
Spring for Apache Pulsar の PulsarAdministration
クライアントも自動構成されます。
デフォルトでは、アプリケーションは http://localhost:8080
にあるローカル Pulsar インスタンスへの接続を試行します。これは、spring.pulsar.admin.service-url
プロパティを (http|https)://<host>:<port>
の形式で別の値に設定することで調整できます。
構成をさらに制御する必要がある場合は、1 つ以上の PulsarAdminBuilderCustomizer
Bean を登録することを検討してください。
認証
認証が必要な Pulsar クラスターにアクセスする場合、管理クライアントには通常の Pulsar クライアントと同じセキュリティ構成が必要です。spring.pulsar.client.authentication
を spring.pulsar.admin.authentication
に置き換えることで、前述の認証構成を使用できます。
起動時にトピックを作成するには、型 PulsarTopic の Bean を追加します。トピックがすでに存在する場合、Bean は無視されます。 |
メッセージの送信
Spring の PulsarTemplate
は自動構成されており、次の例に示すように、それを使用してメッセージを送信できます。
Java
Kotlin
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() throws PulsarClientException {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
PulsarTemplate
は、PulsarProducerFactory
に依存して、基礎となる Pulsar プロデューサーを作成します。Spring Boot 自動構成では、このプロデューサーファクトリも提供されます。デフォルトでは、作成したプロデューサーがキャッシュされます。spring.pulsar.producer.*
および spring.pulsar.producer.cache.*
プレフィックスのいずれかのアプリケーションプロパティを指定することで、プロデューサーファクトリとキャッシュの設定を構成できます。
プロデューサーファクトリ構成をより詳細に制御する必要がある場合は、1 つ以上の ProducerBuilderCustomizer
Bean を登録することを検討してください。これらのカスタマイザーは、作成されたすべてのプロデューサーに適用されます。メッセージを送信するときに ProducerBuilderCustomizer
を渡して、現在のプロデューサーにのみ影響を与えることもできます。
送信されるメッセージをさらに制御する必要がある場合は、メッセージの送信時に TypedMessageBuilderCustomizer
を渡すことができます。
メッセージをリアクティブ的に送信する
リアクティブ自動構成がアクティブ化されると、Spring の ReactivePulsarTemplate
が自動構成され、次の例に示すように、それを使用してメッセージを送信できます。
Java
Kotlin
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarTemplate<String> pulsarTemplate;
public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello").subscribe();
}
}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello").subscribe()
}
}
ReactivePulsarTemplate
は ReactivePulsarSenderFactory
に依存して、基礎となる送信者を実際に作成します。Spring Boot 自動構成では、この送信者ファクトリも提供され、デフォルトで、作成したプロデューサーがキャッシュされます。spring.pulsar.producer.*
および spring.pulsar.producer.cache.*
プレフィックスの付いたアプリケーションプロパティのいずれかを指定することで、送信側ファクトリとキャッシュの設定を構成できます。
送信側ファクトリ構成をさらに制御する必要がある場合は、1 つ以上の ReactiveMessageSenderBuilderCustomizer
Bean を登録することを検討してください。これらのカスタマイザーは、作成されたすべての送信者に適用されます。メッセージを送信するときに ReactiveMessageSenderBuilderCustomizer
を渡して、現在の送信者にのみ影響を与えることもできます。
送信されるメッセージをさらに制御する必要がある場合は、メッセージの送信時に MessageSpecBuilderCustomizer
を渡すことができます。
メッセージの受信
Apache Pulsar インフラストラクチャが存在する場合、Bean に @PulsarListener
のアノテーションを付けてリスナーエンドポイントを作成できます。次のコンポーネントは、someTopic
トピック上にリスナーエンドポイントを作成します。
Java
Kotlin
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot 自動構成は、PulsarListenerContainerFactory
や、基礎となる Pulsar コンシューマーを構築するために使用するコンシューマーファクトリなど、PulsarListener
に必要なすべてのコンポーネントを提供します。これらのコンポーネントは、spring.pulsar.listener.*
および spring.pulsar.consumer.*
プレフィックスの付いたアプリケーションプロパティのいずれかを指定することで構成できます。
コンシューマーファクトリ構成をさらに制御する必要がある場合は、1 つ以上の ConsumerBuilderCustomizer
Bean を登録することを検討してください。これらのカスタマイザーは、ファクトリによって作成されたすべてのコンシューマー、つまりすべての @PulsarListener
インスタンスに適用されます。@PulsarListener
アノテーションの consumerCustomizer
属性を設定して、単一のリスナーをカスタマイズすることもできます。
メッセージをリアクティブ的に受信する
Apache Pulsar インフラストラクチャが存在し、リアクティブ自動構成がアクティブ化されている場合、Bean に @ReactivePulsarListener
のアノテーションを付けて、リアクティブリスナーエンドポイントを作成できます。次のコンポーネントは、someTopic
トピック上にリアクティブリスナーエンドポイントを作成します。
Java
Kotlin
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@ReactivePulsarListener(topics = "someTopic")
public Mono<Void> processMessage(String content) {
// ...
return Mono.empty();
}
}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
@Component
class MyBean {
@ReactivePulsarListener(topics = ["someTopic"])
fun processMessage(content: String?): Mono<Void> {
// ...
return Mono.empty()
}
}
Spring Boot 自動構成は、ReactivePulsarListenerContainerFactory
や、基礎となるリアクティブ Pulsar コンシューマーを構築するために使用するコンシューマーファクトリなど、ReactivePulsarListener
に必要なすべてのコンポーネントを提供します。これらのコンポーネントは、spring.pulsar.listener.
および spring.pulsar.consumer.
プレフィックスの付いたアプリケーションプロパティのいずれかを指定することで構成できます。
コンシューマーファクトリ構成をさらに制御する必要がある場合は、1 つ以上の ReactiveMessageConsumerBuilderCustomizer
Bean を登録することを検討してください。これらのカスタマイザーは、ファクトリによって作成されたすべてのコンシューマー、つまりすべての @ReactivePulsarListener
インスタンスに適用されます。@ReactivePulsarListener
アノテーションの consumerCustomizer
属性を設定して、単一のリスナーをカスタマイズすることもできます。
メッセージを読む
Pulsar リーダーインターフェースを使用すると、アプリケーションでカーソルを手動で管理できます。リーダーを使用してトピックに接続する場合、リーダーがトピックに接続するときにどのメッセージから読み始めるかを指定する必要があります。
Apache Pulsar インフラストラクチャが存在する場合、任意の Bean に @PulsarReader
のアノテーションを付けて、リーダーを使用してメッセージを消費できます。次のコンポーネントは、someTopic
トピックの先頭からメッセージの読み取りを開始するリーダーエンドポイントを作成します。
Java
Kotlin
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
@PulsarReader
は、PulsarReaderFactory
に依存して、基礎となる Pulsar リーダーを作成します。Spring Boot 自動構成は、spring.pulsar.reader.*
プレフィックスの付いたアプリケーションプロパティのいずれかを設定することによってカスタマイズできるこのリーダーファクトリを提供します。
リーダーのファクトリ構成をさらに制御する必要がある場合は、1 つ以上の ReaderBuilderCustomizer
Bean を登録することを検討してください。これらのカスタマイザーは、ファクトリによって作成されたすべてのリーダー、つまりすべての @PulsarReader
インスタンスに適用されます。@PulsarReader
アノテーションの readerCustomizer
属性を設定して、単一のリスナーをカスタマイズすることもできます。
メッセージをリアクティブ的に読む
Apache Pulsar インフラストラクチャが存在し、リアクティブ自動構成がアクティブ化されている場合、Spring の ReactivePulsarReaderFactory
が提供され、これを使用して、リアクティブな方法でメッセージを読み取るためのリーダーを作成できます。次のコンポーネントは、提供されたファクトリを使用してリーダーを作成し、someTopic
トピックから 5 分前の単一メッセージを読み取ります。
Java
Kotlin
import java.time.Instant;
import java.util.List;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;
public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
this.pulsarReaderFactory = pulsarReaderFactory;
}
public void someMethod() {
ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
Mono<Message<String>> message = this.pulsarReaderFactory
.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
.readOne();
// ...
}
}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant
@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {
fun someMethod() {
val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
readerBuilder: ReactiveMessageReaderBuilder<String> ->
readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
}
val message = pulsarReaderFactory
.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
.readOne()
// ...
}
}
Spring Boot 自動構成は、spring.pulsar.reader.*
プレフィックスの付いたアプリケーションプロパティのいずれかを設定することによってカスタマイズできるこのリーダーファクトリを提供します。
リーダーのファクトリ構成をさらに制御する必要がある場合は、ファクトリを使用してリーダーを作成するときに 1 つ以上の ReactiveMessageReaderBuilderCustomizer
インスタンスを渡すことを検討してください。
リーダーのファクトリ構成をさらに制御する必要がある場合は、1 つ以上の ReactiveMessageReaderBuilderCustomizer
Bean を登録することを検討してください。これらのカスタマイザーは、作成されたすべてのリーダーに適用されます。リーダーの作成時に 1 つ以上の ReactiveMessageReaderBuilderCustomizer
を渡して、作成したリーダーにのみカスタマイズを適用することもできます。
上記のコンポーネントの詳細と、その他の利用可能な機能については、Spring for Apache Pulsar リファレンスドキュメントを参照してください。 |
トランザクションサポート
Spring for Apache Pulsar は、PulsarTemplate
および @PulsarListener
を使用するときにトランザクションをサポートします。
リアクティブバリアントを使用する場合、トランザクションは現在サポートされていません。 |
spring.pulsar.transaction.enabled
プロパティを true
に設定すると、次のようになります。
PulsarTransactionManager
Bean を構成するPulsarTemplate
のトランザクションサポートを有効にする@PulsarListener
メソッドのトランザクションサポートを有効にする
@PulsarListener
の transactional
属性を使用すると、リスナーでトランザクションを使用するタイミングを微調整できます。
Spring for Apache Pulsar トランザクション機能をさらに制御するには、独自の PulsarTemplate
および / または ConcurrentPulsarListenerContainerFactory
Bean を定義する必要があります。また、デフォルトで自動構成された PulsarTransactionManager
が適していない場合は、PulsarAwareTransactionManager
Bean を定義することもできます。
Pulsar の追加プロパティ
自動構成でサポートされるプロパティは、付録の “統合プロパティ” セクションに示されています。ほとんどの場合、これらのプロパティ (ハイフン付きまたはキャメルケース) は Apache Pulsar 構成プロパティに直接マッピングされることに注意してください。詳細については、Apache Pulsar のドキュメントを参照してください。
Pulsar でサポートされるプロパティのサブセットのみが、PulsarProperties
クラスを通じて直接使用できます。直接サポートされていない追加プロパティを使用して自動構成コンポーネントを調整する場合は、前述の各コンポーネントでサポートされているカスタマイザーを使用できます。