Apache Pulsar サポート

Apache Pulsar (英語) は、Spring for Apache Pulsar プロジェクトの自動構成を提供することによってサポートされます。

Spring Boot は、org.springframework.pulsar:spring-pulsar がクラスパス上にある場合、クラシック (必須) Spring for Apache Pulsar コンポーネントを自動構成して登録します。org.springframework.pulsar:spring-pulsar-reactive がクラスパス上にある場合、リアクティブコンポーネントに対しても同じことが行われます。

命令型およびリアクティブ型の使用のために依存関係を便利に収集するための spring-boot-starter-pulsar スターターと spring-boot-starter-pulsar-reactive スターターがそれぞれあります。

Pulsar への接続

Pulsar スターターを使用すると、Spring Boot は PulsarClient (英語) Bean を自動的に構成して登録します。

デフォルトでは、アプリケーションは pulsar://localhost:6650 にあるローカル Pulsar インスタンスへの接続を試行します。これは、spring.pulsar.client.service-url プロパティを別の値に設定することで調整できます。

値は有効な Pulsar プロトコル [Apache] (英語) URL である必要があります

spring.pulsar.client.* というプレフィックスが付いたアプリケーションプロパティのいずれかを指定して、クライアントを構成できます。

構成をさらに細かく制御する必要がある場合は、1 つ以上の PulsarClientBuilderCustomizer (Javadoc) Bean を登録することを検討してください。

認証

認証が必要な Pulsar クラスターに接続するには、pluginClassName とプラグインに必要なパラメーターを設定して、使用する認証プラグインを指定する必要があります。パラメーターをパラメーター名とパラメーター値のマップとして設定できます。次の例は、AuthenticationOAuth2 プラグインを構成する方法を示しています。

  • プロパティ

  • YAML

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

spring.pulsar.client.authentication.param.* で定義された名前が、認証プラグイン (通常はキャメルケース) で予期される名前と正確に一致することを確認する必要があります。Spring Boot は、これらのエントリに対していかなる種類の緩和バインディングも試行しません。

例: AuthenticationOAuth2 認証プラグインの発行者 URL を構成する場合は、spring.pulsar.client.authentication.param.issuerUrl を使用する必要があります。issuerurl や issuer-url などの他の形式を使用する場合、設定はプラグインに適用されません。

また、バインディングが緩和されていないため、変換中に大文字と小文字の区別が失われるため、認証パラメーターに環境変数を使用することが問題になります。パラメーターに環境変数を使用する場合、正しく動作させるには、Spring for Apache Pulsar リファレンスドキュメントの次の手順に従う必要があります。

SSL

デフォルトでは、Pulsar クライアントは Pulsar サービスとプレーンテキストで通信します。Spring for Apache Pulsar リファレンスドキュメントの次の手順に従って、TLS 暗号化を有効にできます。

クライアントと認証の詳細については、Spring for Apache Pulsar リファレンスドキュメントを参照してください。

Pulsar へのリアクティブな接続

リアクティブ自動構成がアクティブになると、Spring Boot は ReactivePulsarClient (英語) Bean を自動構成して登録します。

ReactivePulsarClient (英語) は、前述の PulsarClient (英語) のインスタンスを適応させます。前のセクションに従って、ReactivePulsarClient (英語) で使用される PulsarClient (英語) を構成します。

Pulsar 管理への接続

Spring for Apache Pulsar の PulsarAdministration (Javadoc) クライアントも自動構成されます。

デフォルトでは、アプリケーションは http://localhost:8080 にあるローカル Pulsar インスタンスへの接続を試行します。これは、spring.pulsar.admin.service-url プロパティを (http|https)://<host>:<port> の形式で別の値に設定することで調整できます。

構成をさらに細かく制御する必要がある場合は、1 つ以上の PulsarAdminBuilderCustomizer (Javadoc) Bean を登録することを検討してください。

認証

認証が必要な Pulsar クラスターにアクセスする場合、管理クライアントには通常の Pulsar クライアントと同じセキュリティ構成が必要です。spring.pulsar.client.authentication を spring.pulsar.admin.authentication に置き換えることで、前述の認証構成を使用できます。

起動時にトピックを作成するには、型 PulsarTopic (Javadoc) の Bean を追加します。トピックがすでに存在する場合、Bean は無視されます。

メッセージの送信

Spring の PulsarTemplate (Javadoc) は自動構成されており、次の例に示すように、これを使用してメッセージを送信できます。

  • Java

  • Kotlin

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

PulsarTemplate (Javadoc) は、基盤となる Pulsar プロデューサーを作成するために PulsarProducerFactory (Javadoc) に依存します。Spring Boot 自動構成では、このプロデューサーファクトリも提供され、デフォルトでは、作成したプロデューサーがキャッシュされます。spring.pulsar.producer.* および spring.pulsar.producer.cache.* プレフィックス付きアプリケーションプロパティのいずれかを指定して、プロデューサーファクトリとキャッシュ設定を構成できます。

プロデューサーファクトリ構成をさらに制御する必要がある場合は、1 つ以上の ProducerBuilderCustomizer (Javadoc) Bean を登録することを検討してください。これらのカスタマイザーは、作成されたすべてのプロデューサーに適用されます。メッセージを送信するときに、現在のプロデューサーにのみ影響を与える ProducerBuilderCustomizer (Javadoc) を渡すこともできます。

送信されるメッセージをさらに制御する必要がある場合は、メッセージを送信するときに TypedMessageBuilderCustomizer (Javadoc) を渡すことができます。

メッセージをリアクティブ的に送信する

Reactive 自動構成を有効にすると、Spring の ReactivePulsarTemplate (Javadoc) が自動構成され、次の例に示すように、これを使用してメッセージを送信できます。

  • Java

  • Kotlin

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarTemplate<String> pulsarTemplate;

	public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello").subscribe();
	}

}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello").subscribe()
	}

}

ReactivePulsarTemplate (Javadoc) は、実際に基礎となる送信者を作成するために ReactivePulsarSenderFactory (Javadoc) に依存します。Spring Boot 自動構成では、この送信者ファクトリも提供され、デフォルトでは、作成したプロデューサーをキャッシュします。spring.pulsar.producer.* および spring.pulsar.producer.cache.* プレフィックスの付いたアプリケーションプロパティのいずれかを指定して、送信者ファクトリとキャッシュ設定を構成できます。

送信者ファクトリ構成をさらに制御する必要がある場合は、1 つ以上の ReactiveMessageSenderBuilderCustomizer (Javadoc) Bean を登録することを検討してください。これらのカスタマイザーは、作成されたすべての送信者に適用されます。メッセージを送信するときに、現在の送信者にのみ影響を与える ReactiveMessageSenderBuilderCustomizer (Javadoc) を渡すこともできます。

送信されるメッセージをさらに制御する必要がある場合は、メッセージを送信するときに MessageSpecBuilderCustomizer (Javadoc) を渡すことができます。

メッセージの受信

Apache Pulsar インフラストラクチャが存在する場合、任意の Bean に @PulsarListener (Javadoc) のアノテーションを付けてリスナーエンドポイントを作成できます。次のコンポーネントは、someTopic トピックにリスナーエンドポイントを作成します。

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自動構成は、PulsarListenerContainerFactory (Javadoc) や、基礎となる Pulsar コンシューマーの構築に使用するコンシューマーファクトリなど、PulsarListener (Javadoc) に必要なすべてのコンポーネントを提供します。これらのコンポーネントは、spring.pulsar.listener.* および spring.pulsar.consumer.* プレフィックス付きアプリケーションプロパティのいずれかを指定して構成できます。

コンシューマーファクトリの構成をさらに細かく制御する必要がある場合は、1 つ以上の ConsumerBuilderCustomizer (Javadoc) Bean を登録することを検討してください。これらのカスタマイザーは、ファクトリによって作成されたすべてのコンシューマーに適用され、すべての @PulsarListener (Javadoc) インスタンスに適用されます。@PulsarListener (Javadoc) アノテーションの consumerCustomizer 属性を設定することで、単一のリスナーをカスタマイズすることもできます。

実際のコンテナーファクトリ構成をさらに制御する必要がある場合は、1 つ以上の PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> Bean を登録することを検討してください。

メッセージをリアクティブ的に受信する

Apache Pulsar インフラストラクチャが存在し、Reactive 自動構成がアクティブになっている場合、任意の Bean に @ReactivePulsarListener (Javadoc) のアノテーションを付けて、リアクティブリスナーエンドポイントを作成できます。次のコンポーネントは、someTopic トピックにリアクティブリスナーエンドポイントを作成します。

  • Java

  • Kotlin

import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@ReactivePulsarListener(topics = "someTopic")
	public Mono<Void> processMessage(String content) {
		// ...
		return Mono.empty();
	}

}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

	@ReactivePulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?): Mono<Void> {
		// ...
		return Mono.empty()
	}

}

Spring Boot 自動構成は、ReactivePulsarListenerContainerFactory (Javadoc) や、基盤となるリアクティブ Pulsar コンシューマーの構築に使用するコンシューマーファクトリなど、ReactivePulsarListener (Javadoc) に必要なすべてのコンポーネントを提供します。これらのコンポーネントは、spring.pulsar.listener.* および spring.pulsar.consumer.* プレフィックス付きアプリケーションプロパティのいずれかを指定して構成できます。

コンシューマーファクトリの構成をさらに細かく制御する必要がある場合は、1 つ以上の ReactiveMessageConsumerBuilderCustomizer (Javadoc) Bean を登録することを検討してください。これらのカスタマイザーは、ファクトリによって作成されたすべてのコンシューマーに適用され、すべての @ReactivePulsarListener (Javadoc) インスタンスに適用されます。@ReactivePulsarListener (Javadoc) アノテーションの consumerCustomizer 属性を設定することで、単一のリスナーをカスタマイズすることもできます。

実際のコンテナーファクトリ構成をさらに制御する必要がある場合は、1 つ以上の PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> Bean を登録することを検討してください。

メッセージを読む

Pulsar リーダーインターフェースを使用すると、アプリケーションでカーソルを手動で管理できます。リーダーを使用してトピックに接続する場合、リーダーがトピックに接続するときにどのメッセージから読み始めるかを指定する必要があります。

Apache Pulsar インフラストラクチャが存在する場合、任意の Bean に @PulsarReader (Javadoc) のアノテーションを付けて、リーダーを使用してメッセージを消費できます。次のコンポーネントは、someTopic トピックの先頭からメッセージの読み取りを開始するリーダーエンドポイントを作成します。

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

@PulsarReader (Javadoc) は、基盤となる Pulsar リーダーを作成するために PulsarReaderFactory (Javadoc) に依存しています。Spring Boot 自動構成では、このリーダーファクトリが提供されており、spring.pulsar.reader.* プレフィックスの付いたアプリケーションプロパティのいずれかを設定することでカスタマイズできます。

リーダーファクトリの構成をさらに細かく制御する必要がある場合は、1 つ以上の ReaderBuilderCustomizer (Javadoc) Bean を登録することを検討してください。これらのカスタマイザは、ファクトリによって作成されたすべてのリーダーに適用され、すべての @PulsarReader (Javadoc) インスタンスに適用されます。@PulsarReader (Javadoc) アノテーションの readerCustomizer 属性を設定することで、単一のリスナーをカスタマイズすることもできます。

実際のコンテナーファクトリ構成をさらに制御する必要がある場合は、1 つ以上の PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> Bean を登録することを検討してください。

メッセージをリアクティブ的に読む

Apache Pulsar インフラストラクチャが存在し、Reactive 自動構成がアクティブになっている場合、Spring の ReactivePulsarReaderFactory (Javadoc) が提供され、それを使用してリーダーを作成し、リアクティブにメッセージを読み取ることができます。次のコンポーネントは、提供されたファクトリを使用してリーダーを作成し、someTopic トピックから 5 分前のメッセージを 1 つ読み取ります。

  • Java

  • Kotlin

import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

	public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
		this.pulsarReaderFactory = pulsarReaderFactory;
	}

	public void someMethod() {
		ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
			.topic("someTopic")
			.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
		Mono<Message<String>> message = this.pulsarReaderFactory
			.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
			.readOne();
		// ...
	}

}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

	fun someMethod() {
		val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
			readerBuilder: ReactiveMessageReaderBuilder<String> ->
				readerBuilder
					.topic("someTopic")
					.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
		}
		val message = pulsarReaderFactory
				.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
				.readOne()
		// ...
	}

}

Spring Boot 自動構成は、spring.pulsar.reader.* プレフィックスの付いたアプリケーションプロパティのいずれかを設定することによってカスタマイズできるこのリーダーファクトリを提供します。

リーダーファクトリの構成をさらに制御する必要がある場合は、ファクトリを使用してリーダーを作成するときに、1 つ以上の ReactiveMessageReaderBuilderCustomizer (Javadoc) インスタンスを渡すことを検討してください。

リーダーファクトリの構成をさらに制御する必要がある場合は、1 つ以上の ReactiveMessageReaderBuilderCustomizer (Javadoc) Bean を登録することを検討してください。これらのカスタマイザーは、作成されたすべてのリーダーに適用されます。リーダーを作成するときに 1 つ以上の ReactiveMessageReaderBuilderCustomizer (Javadoc) を渡して、作成されたリーダーにのみカスタマイズを適用することもできます。

上記のコンポーネントの詳細と、その他の利用可能な機能については、Spring for Apache Pulsar リファレンスドキュメントを参照してください。

トランザクションサポート

Spring for Apache Pulsar は、PulsarTemplate (Javadoc) および @PulsarListener (Javadoc) を使用するときにトランザクションをサポートします。

リアクティブバリアントを使用する場合、トランザクションは現在サポートされていません。

spring.pulsar.transaction.enabled プロパティを true に設定すると、次のようになります。

@PulsarListener (Javadoc) の transactional 属性を使用すると、リスナーでトランザクションを使用するタイミングを微調整できます。

Spring for Apache Pulsar トランザクション機能をさらに制御するには、独自の PulsarTemplate (Javadoc) および / または ConcurrentPulsarListenerContainerFactory (Javadoc) Bean を定義する必要があります。また、デフォルトで自動構成された PulsarTransactionManager (Javadoc) が適していない場合は、PulsarAwareTransactionManager (Javadoc) Bean を定義することもできます。

Pulsar の追加プロパティ

自動構成でサポートされるプロパティは、付録の統合プロパティセクションに示されています。ほとんどの場合、これらのプロパティ (ハイフン付きまたは camelCase) は、Apache Pulsar 構成プロパティに直接マップされることに注意してください。詳細については、Apache Pulsar のドキュメントを参照してください。

Pulsar でサポートされているプロパティのサブセットのみが、PulsarProperties (Javadoc) クラスを通じて直接使用できます。直接サポートされていない追加のプロパティを使用して自動構成されたコンポーネントを調整する場合は、前述の各コンポーネントでサポートされているカスタマイザーを使用できます。