メッセージの消費

目次

1. Pulsar リスナー

Pulsar コンシューマーに関しては、エンドユーザーアプリケーションで PulsarListener アノテーションを使用することをお勧めします。PulsarListener を使用するには、@EnablePulsar アノテーションを使用する必要があります。Spring Boot サポートを使用すると、このアノテーションが自動的に有効になり、メッセージリスナーインフラストラクチャ (Pulsar コンシューマーの作成を担当する) など、PulsarListener に必要なすべてのコンポーネントが構成されます。PulsarMessageListenerContainer は、PulsarConsumerFactory を使用して、メッセージを消費するために使用する基礎となる Pulsar コンシューマーである Pulsar コンシューマーを作成および管理します。

Spring Boot はこのコンシューマーファクトリを提供しますが、spring.pulsar.consumer.* (英語) アプリケーションプロパティを指定してさらに構成できます。

クイックツアーセクションで見た PulsarListener コードスニペットをもう一度見てみましょう。

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

この方法はさらに簡略化できます。

@PulsarListener
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

この最も基本的な形式では、subscriptionName が @PulsarListener アノテーションに指定されていない場合、自動生成されたサブスクリプション名が使用されます。同様に、topics が直接提供されない場合、トピック解決プロセスを使用して宛先トピックが決定されます。

前に示した PulsarListener メソッドでは、データを String として受け取りますが、スキーマ型を指定しません。内部的には、フレームワークは Pulsar のスキーマメカニズムに依存して、データを必要な型に変換します。フレームワークは、ユーザーが String 型を予期していることを検出し、その情報に基づいてスキーマ型を推測し、そのスキーマをコンシューマーに提供します。フレームワークは、すべてのプリミティブ型に対してこの推論を実行します。すべての非プリミティブ型では、デフォルトのスキーマは JSON であると想定されます。複合型が JSON 以外のもの (AVRO や KEY_VALUE など) を使用している場合は、schemaType プロパティを使用してアノテーションにスキーマ型を指定する必要があります。

次の例は、Integer を受け取る別の PulsarListener メソッドを示しています。

@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
   System.out.println(message);
}

次の PulsarListener メソッドは、トピックから複合型を使用する方法を示しています。

@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
    System.out.println(message);
}

さらにいくつかの方法を見てみましょう。

Pulsar メッセージを直接使用できます。

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
}

次の例では、Spring メッセージングエンベロープを使用してレコードを消費します。

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
}

次に、バッチでレコードを使用する方法を見てみましょう。次の例では、PulsarListener を使用してレコードを POJO としてバッチで消費します。

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

この例では、レコードをオブジェクトのコレクション (List) として受信することに注意してください。さらに、PulsarListener レベルでバッチ消費を有効にするには、アノテーションの batch プロパティを true に設定する必要があります。

List が保持する実際の型に基づいて、フレームワークは使用するスキーマを推測しようとします。List に JSON 以外の複合型が含まれている場合でも、PulsarListener に schemaType を指定する必要があります。

以下では、Pulsar Java クライアントによって提供される Message エンベロープを使用します。

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

次の例では、Spring メッセージング Message 型のエンベロープを含むバッチレコードを使用します。

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}

最後に、Pulsar の Messages ホルダーオブジェクトをバッチリスナーに使用することもできます。

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

PulsarListener を使用すると、アノテーション自体に Pulsar コンシューマープロパティを直接提供できます。これは、前述の Boot 構成プロパティを使用したくない場合、または複数の PulsarListener メソッドを使用したくない場合に便利です。

次の例では、Pulsar コンシューマープロパティを PulsarListener で直接使用します。

@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
使用されるプロパティは、spring.pulsar.consumer アプリケーション構成プロパティではなく、直接 Pulsar コンシューマープロパティです。

1.1. AUTO_CONSUME を使用した汎用レコード

Pulsar トピックのスキーマの型を事前に知る機会がない場合は、AUTO_CONSUME スキーマ型を使用して汎用レコードを使用できます。この場合、トピックは、トピックに関連付けられたスキーマ情報を使用して、メッセージを GenericRecord オブジェクトに逆直列化します。

汎用レコードを使用するには、以下に示すように、@PulsarListener で schemaType = SchemaType.AUTO_CONSUME を設定し、型 GenericRecord の Pulsar メッセージをメッセージパラメーターとして使用します。

@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
            System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
GenericRecord API を使用すると、フィールドとその関連値にアクセスできます。

1.2. ConsumerBuilder のカスタマイズ

以下に示すように、型 PulsarListenerConsumerBuilderCustomizer の @Bean を提供し、それを PulsarListener で使用できるようにすることで、PulsarListenerConsumerBuilderCustomizer を使用して ConsumerBuilder を通じて使用可能なフィールドをカスタマイズできます。

@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return (builder) -> builder.consumerName("myConsumer");
}
アプリケーションに 1 つの @PulsarListener と 1 つの PulsarListenerConsumerBuilderCustomizer Bean しか登録されていない場合、カスタマイザーは自動的に適用されます。

2. スキーマ情報の指定

前に示したように、Java プリミティブの場合、Spring for Apache Pulsar フレームワークは PulsarListener で使用する適切なスキーマを推測できます。非プリミティブ型の場合、アノテーションでスキーマが明示的に指定されていない場合、Spring for Apache Pulsar フレームワークはその型から Schema.JSON を構築しようとします。

現在サポートされている複雑なスキーマ型は、INLINE エンコードを使用した JSON、AVRO、PROTOBUF、AUTO_CONSUME、KEY_VALUE です。

2.1. カスタムスキーママッピング

PulsarListener で複合型のスキーマを指定する代わりに、型のマッピングを使用してスキーマリゾルバーを構成できます。これにより、フレームワークが受信メッセージ型を使用してリゾルバーを参照するため、リスナーにスキーマを設定する必要がなくなります。

2.1.1. 構成プロパティ

スキーママッピングは、spring.pulsar.defaults.type-mappings プロパティを使用して構成できます。次の例では、application.yml を使用して、それぞれ AVRO スキーマと JSON スキーマを使用する User および Address 複合オブジェクトのマッピングを追加します。

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
message-type は、メッセージクラスの完全修飾名です。

2.1.2. スキーマリゾルバーカスタマイザー

マッピングを追加する推奨される方法は、上記のプロパティを使用することです。ただし、より詳細な制御が必要な場合は、スキーマリゾルバーカスタマイザーを提供してマッピングを追加できます。

次の例では、スキーマリゾルバーカスタマイザーを使用して、AVRO スキーマと JSON スキーマをそれぞれ使用する User および Address 複合オブジェクトのマッピングを追加します。

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 型マッピングのアノテーション

特定のメッセージ型に使用するデフォルトのスキーマ情報を指定するためのもう 1 つのオプションは、メッセージクラスに @PulsarMessage アノテーションをマークすることです。スキーマ情報は、アノテーションの schemaType 属性を介して指定できます。

次の例では、Foo 型のメッセージを生成または消費するときに、デフォルトのスキーマとして JSON を使用するようにシステムを構成します。

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

この設定を行うと、リスナーにスキーマを設定する必要はありません。例:

@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
    System.out.println(user);
}

3. Pulsar コンシューマーオブジェクトへのアクセス

場合によっては、Pulsar Consumer オブジェクトに直接アクセスする必要があります。次の例は、それを取得する方法を示しています。

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
    System.out.println("Message Received: " + message);
    ConsumerStats stats = consumer.getStats();
    ...
}
この方法で Consumer オブジェクトにアクセスするときは、受信メソッドを呼び出してコンシューマーのカーソル位置を変更する操作を呼び出さないでください。このような操作はすべてコンテナーによって実行される必要があります。

4. Pulsar メッセージリスナーコンテナー

これで、PulsarListener を介したコンシューマー側の基本的な対話を見てきました。ここで、PulsarListener が基礎となる Pulsar コンシューマーとどのように対話するかという内部動作を詳しく見てみましょう。エンドユーザーアプリケーションの場合、ほとんどのシナリオで、Spring for Apache Pulsar を使用するときに Pulsar トピックから利用するために PulsarListener アノテーションを直接使用することをお勧めします。これは、そのモデルが幅広いアプリケーションユースケースをカバーしているためです。ただし、PulsarListener が内部でどのように動作するかを理解することが重要です。このセクションでは、それらの詳細について説明します。

前に簡単に説明したように、Spring for Apache Pulsar を使用する場合、メッセージリスナーコンテナーがメッセージ消費の中心となります。PulsarListener は、バックグラウンドでメッセージリスナーコンテナーインフラストラクチャを使用して、Pulsar コンシューマーを作成および管理します。Spring for Apache Pulsar は、PulsarMessageListenerContainer を通じてこのメッセージリスナーコンテナーの契約を提供します。このメッセージリスナーコンテナーのデフォルトの実装は、DefaultPulsarMessageListenerContainer を通じて提供されます。名前が示すように、PulsarMessageListenerContainer にはメッセージリスナーが含まれています。コンテナーは Pulsar コンシューマーを作成し、別のスレッドを実行してデータを受信して処理します。データは、提供されたメッセージリスナー実装によって処理されます。

メッセージリスナーコンテナーは、コンシューマーの batchReceive メソッドを使用してデータをバッチで消費します。データが受信されると、選択されたメッセージリスナー実装に渡されます。

Spring for Apache Pulsar を使用する場合、次のメッセージリスナー型を使用できます。

これらのさまざまなメッセージリスナーの詳細については、次のセクションで説明します。

ただし、その前に、コンテナー自体を詳しく見てみましょう。

4.1. DefaultPulsarMessageListenerContainer

これは、単一のコンシューマーベースのメッセージリスナーコンテナーです。次のリストは、そのコンストラクターを示しています。

public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
			PulsarContainerProperties pulsarContainerProperties)
}

PulsarConsumerFactory (コンシューマーの作成に使用) と PulsarContainerProperties オブジェクト (コンテナーのプロパティに関する情報が含まれる) を受け取ります。PulsarContainerProperties には次のコンストラクターがあります。

public PulsarContainerProperties(String... topics)

public PulsarContainerProperties(Pattern topicPattern)

トピック情報は、PulsarContainerProperties を通じて、またはコンシューマーファクトリに提供されるコンシューマープロパティとして提供できます。次の例では DefaultPulsarMessageListenerContainer を使用します。

Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();

pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
		});

DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
        pulsarContainerProperties);

return pulsarListenerContainer;
リスナーコンテナーを直接使用するときにトピック情報が指定されていない場合は、「メッセージ型のデフォルト」ステップが省略されることを除いて、PulsarListener で使用されるのと同じトピック解決プロセスが使用されます。

DefaultPulsarMessageListenerContainer は単一のコンシューマーのみを作成します。複数のスレッドを通じて複数のコンシューマーを管理したい場合は、ConcurrentPulsarMessageListenerContainer を使用する必要があります。

4.2. ConcurrentPulsarMessageListenerContainer

ConcurrentPulsarMessageListenerContainer には次のコンストラクターがあります。

public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
    PulsarContainerProperties pulsarContainerProperties)

ConcurrentPulsarMessageListenerContainer を使用すると、setter を通じて concurrency プロパティを指定できます。1 を超える同時実行は、非排他的サブスクリプション (failoversharedkey-shared) でのみ許可されます。排他的サブスクリプションモードを使用している場合、同時実行のためにデフォルトの 1 のみを使用できます。

次の例では、failover サブスクリプションの PulsarListener アノテーションを通じて concurrency を有効にします。

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
    ...
    System.out.println("Current Thread: " + Thread.currentThread().getName());
    System.out.println("Current Consumer: " + consumer.getConsumerName());
}

前述のリスナーでは、トピック my-topic に 3 つのパーティションがあると想定しています。パーティション化されていないトピックの場合、同時実行性を 3 に設定しても何も起こりません。メインのアクティブなコンシューマーに加えて、2 つのアイドル状態のコンシューマーが取得されます。トピックに 3 つ以上のパーティションがある場合、メッセージは、コンテナーが作成するコンシューマー間で負荷分散されます。この PulsarListener を実行すると、前の例のスレッド名とコンシューマー名の出力が示すように、異なるパーティションからのメッセージが異なるコンシューマーを介して消費されることがわかります。

パーティション化されたトピックでこの方法で Failover サブスクリプションを使用すると、Pulsar はメッセージの順序を保証します。

次のリストは、PulsarListener の別の例を示していますが、Shared サブスクリプションと concurrency が有効になっています。

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
    ...
}

前の例では、PulsarListener は 5 つの異なるコンシューマーを作成します (今回は、トピックに 5 つのパーティションがあると仮定します)。

Shared サブスクリプションでは Pulsar でのメッセージの順序が保証されないため、このバージョンではメッセージの順序はありません。

メッセージの順序付けが必要で、共有サブスクリプション型が必要な場合は、Key_Shared サブスクリプション型を使用する必要があります。

4.3. レコードの消費

メッセージリスナーコンテナーが単一レコードとバッチベースのメッセージの両方の消費をどのように可能にするかを見てみましょう。

単一レコードの消費

この説明のために、基本的な PulsarListener をもう一度見てみましょう。

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

この PulsarListener メソッドでは、毎回 1 つのレコードでリスナーメソッドを呼び出すように Spring for Apache Pulsar に要求する必要があります。メッセージリスナーコンテナーは、コンシューマー上で batchReceive メソッドを使用してデータをバッチで消費すると述べました。この場合、フレームワークは PulsarListener が単一のレコードを受信することを検出します。これは、メソッドを呼び出すたびに 1 つのレコードが必要であることを意味します。レコードはメッセージリスナーコンテナーによってバッチで消費されますが、受信したバッチを反復処理し、PulsarRecordMessageListener のアダプターを介してリスナーメソッドを呼び出します。前のセクションでわかるように、PulsarRecordMessageListener は Pulsar Java クライアントによって提供される MessageListener を継承し、基本的な received メソッドをサポートします。

バッチ消費

次の例は、PulsarListener がレコードをバッチで消費することを示しています。

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

この型の PulsarListener を使用すると、フレームワークはバッチモードであることを検出します。Consumer の batchReceive メソッドを使用してデータをバッチですでに受信しているため、PulsarBatchMessageListener のアダプターを通じてバッチ全体をリスナーメソッドに渡します。

5. Pulsar ヘッダー

Pulsar メッセージメタデータは、Spring メッセージヘッダーとして使用できます。使用可能なヘッダーのリストは PulsarHeaders.java [GitHub] (英語) にあります。

5.1. 単一レコードベースのコンシューマーでのアクセス

次の例は、単一レコード消費モードを使用するアプリケーションでさまざまな Pulsar ヘッダーにアクセスする方法を示しています。

@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
                @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
                @Header("foo") String foo) {

}

前述の例では、messageId および rawData メッセージメタデータの値と、foo という名前のカスタムメッセージプロパティにアクセスします。Spring @Header アノテーションは各ヘッダーフィールドに使用されます。

Pulsar の Message をペイロードを運ぶエンベロープとして使用することもできます。その際、ユーザーはメタデータを取得するために Pulsar メッセージ上の対応するメソッドを直接呼び出すことができます。ただし、便宜上、Header アノテーションを使用して取得することもできます。Spring メッセージング Message エンベロープを使用してペイロードを伝送し、@Header を使用して Pulsar ヘッダーを取得することもできることに注意してください。

5.2. バッチレコードベースのコンシューマーでのアクセス

このセクションでは、バッチコンシューマーを使用するアプリケーションでさまざまな Pulsar ヘッダーにアクセスする方法を説明します。

@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
					@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
					@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {

}

前の例では、データを List<String> として消費します。さまざまなヘッダーを抽出するときも、List<> として抽出します。Spring for Apache Pulsar は、ヘッダーリストがデータリストに対応していることを保証します。

バッチリスナーを使用し、ペイロードを List<org.apache.pulsar.client.api.Message<?>org.apache.pulsar.client.api.Messages<?>、または org.springframework.messaging.Messsge<?> として受信する場合も、同じ方法でヘッダーを抽出できます。

6. メッセージの確認

Spring for Apache Pulsar を使用する場合、アプリケーションによってオプトアウトされない限り、メッセージ確認応答はフレームワークによって処理されます。このセクションでは、フレームワークがメッセージ確認応答を処理する方法について詳しく説明します。

6.1. メッセージ ACK モード

Spring for Apache Pulsar は、メッセージを確認するために次のモードを提供します。

  • BATCH

  • RECORD

  • MANUAL

BATCH 確認応答モードはデフォルトですが、メッセージリスナーコンテナーで変更できます。次のセクションでは、PulsarListener の単一バージョンとバッチバージョンの両方を使用する場合に確認応答がどのように機能するか、またそれらがバッキングメッセージリスナーコンテナー (そして最終的には Pulsar コンシューマー) にどのように変換されるかを見ていきます。

6.2. シングルレコードモードでの自動メッセージ確認応答

基本的な単一メッセージベースの PulsarListener をもう一度見てみましょう。

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

特に Pulsar コンシューマーを直接使用することに慣れている場合は、PulsarListener を使用するときに確認応答がどのように機能するか疑問に思うのは自然なことです。答えはメッセージリスナーコンテナーにあります。これは、コンシューマー関連のすべてのアクティビティを調整する Spring for Apache Pulsar の中心的な場所です。

デフォルトの動作をオーバーライドしないと仮定すると、前述の PulsarListener を使用するとバックグラウンドで次のことが起こります。

  1. まず、リスナーコンテナーは、Pulsar コンシューマーからメッセージをバッチとして受信します。

  2. 受信されたメッセージは、一度に 1 メッセージずつ PulsarListener に渡されます。

  3. すべてのレコードがリスナーメソッドに渡され、正常に処理されると、コンテナーは元のバッチからのすべてのメッセージを承認します。

これが通常の流れです。元のバッチのいずれかのレコードが例外をスローした場合、Spring for Apache Pulsar はそれらのレコードを個別に追跡します。バッチからのすべてのレコードが処理されると、Spring for Apache Pulsar は成功したすべてのメッセージを肯定応答し、失敗したすべてのメッセージを否定的に肯定応答 (ナック) します。つまり、PulsarRecordMessageListener を使用して単一レコードを消費し、BATCH のデフォルトの ack モードが使用される場合、フレームワークは batchReceive 呼び出しから受信したすべてのレコードが正常に処理されるのを待ってから、Pulsar コンシューマーで acknowledge メソッドを呼び出します。ハンドラーメソッドの呼び出し時に特定のレコードが例外をスローした場合、Spring for Apache Pulsar はそれらのレコードを追跡し、バッチ全体が処理された後にそれらのレコードに対して個別に negativeAcknowledge を呼び出します。

アプリケーションがレコードごとに肯定応答または否定応答を発生させたい場合は、RECORD ACK モードを有効にすることができます。この場合、各レコードを処理した後、エラーがなければメッセージは肯定応答され、エラーがあった場合は否定応答されます。次の例では、Pulsar リスナーで RECORD ACK モードを有効にします。

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

6.3. シングルレコードモードでの手動メッセージ確認応答

フレームワークが常に確認応答を送信する必要があるわけではなく、アプリケーション自体から直接送信する必要がある場合があります。Spring for Apache Pulsar では、手動のメッセージ確認応答を有効にする方法がいくつか提供されています。次の例は、そのうちの 1 つを示しています。

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
    System.out.println("Message Received: " + message.getValue());
	acknowledgment.acknowledge();
}

ここで説明する価値のあることがいくつかあります。まず、PulsarListener で ackMode を設定することで、手動 ACK モードを有効にします。手動 ACK モードを有効にすると、Spring for Apache Pulsar はアプリケーションに Acknowledgment オブジェクトを挿入させます。フレームワークは、互換性のあるメッセージリスナーコンテナー (単一レコードベースの消費用の PulsarAcknowledgingMessageListener ) を選択することでこれを実現します。これにより、Acknowledgment オブジェクトへのアクセスが可能になります。

Acknowledgment オブジェクトは、次の API メソッドを提供します。

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

MANUAL ACK モードを使用しているときに、この Acknowledgment オブジェクトを PulsarListener に挿入し、対応するメソッドの 1 つを呼び出すことができます。

前述の PulsarListener の例では、パラメーターのない acknowledge メソッドを呼び出します。これは、フレームワークが現在どの Message で動作しているかを認識しているためです。acknowledge() を呼び出す場合、Message エンベロープでペイロードを受信する必要はなく、ターゲット型 (この例では String) を使用します。メッセージ ID: acknowledge.acknowledge(message.getMessageId()); を指定して、acknowledge の別のバリアントを呼び出すこともできます。acknowledge(messageId) を使用する場合は、Message<?> エンベロープを使用してペイロードを受信する必要があります。

確認応答で可能なことと同様に、Acknowledgment API は否定応答のオプションも提供します。前に示した nack メソッドを参照してください。

Pulsar コンシューマー上で acknowledge を直接呼び出すこともできます。

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
    System.out.println("Message Received: " + message.getValue());
	try {
		consumer.acknowledge(message);
	}
	catch (Exception e) {
		....
	}
}

基盤となるコンシューマーで acknowledge を直接呼び出す場合は、自分でエラー処理を行う必要があります。Acknowledgment を使用する場合は、フレームワークが自動的に実行してくれるため、その必要はありません。手動確認応答を使用する場合は、Acknowledgment オブジェクトアプローチを使用する必要があります。

手動確認を使用する場合、フレームワークでは確認がまったく行われないことを理解することが重要です。アプリケーションを設計する際には、適切な確認戦略を熟考することが非常に重要です。

6.4. バッチ消費における自動メッセージ確認応答

レコードをバッチで消費し ( "メッセージ ACK モード" を参照)、デフォルトの ACK モード BATCH を使用する場合、バッチ全体が正常に処理されると、バッチ全体が承認されます。いずれかのレコードが例外をスローした場合、バッチ全体が否定的に認識されます。これは、プロデューサー側でバッチ化されたバッチと同じではない可能性があることに注意してください。むしろ、これはコンシューマーでの batchReceive の呼び出しから返されたバッチです。

次のバッチリスナーを考えてみましょう。

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
    for (Foo foo : messages) {
		...
    }
}

受信コレクション (この例では messages ) 内のすべてのメッセージが処理されると、フレームワークはすべてのメッセージを確認します。

バッチモードで使用する場合、RECORD は許可された ACK モードではありません。アプリケーションはバッチ全体の再配信を望まない場合があるため、問題が発生する可能性があります。このような状況では、MANUAL 確認モードを使用する必要があります。

6.5. バッチ消費における手動メッセージ確認応答

前のセクションで見たように、MANUAL ACK モードがメッセージリスナーコンテナーに設定されている場合、フレームワークは肯定的または否定的な確認を行いません。このような関心事に対処できるかどうかは完全にアプリケーション次第です。MANUAL ACK モードが設定されている場合、Spring for Apache Pulsar は互換性のあるメッセージリスナーコンテナー (バッチ消費用の PulsarBatchAcknowledgingMessageListener ) を選択します。これにより、Acknowledgment オブジェクトへのアクセスが可能になります。Acknowledgment API で使用できるメソッドは次のとおりです。

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

MANUAL ACK モードを使用しているときに、この Acknowledgment オブジェクトを PulsarListener に挿入できます。次のリストは、バッチベースのリスナーの基本的な例を示しています。

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
    for (Message<String> message : messages) {
		try {
			...
	        acknowledgment.acknowledge(message.getMessageId());
		}
		catch (Exception e) {
			acknowledgment.nack(message.getMessageId());
		}
    }
}

バッチリスナーを使用する場合、メッセージリスナーコンテナーは現在どのレコードを操作しているかを認識できません。手動で確認するには、MessageId または List<MessageId> を受け取るオーバーロードされた acknowledge メソッドのいずれかを使用する必要があります。MessageId を使用してバッチリスナーに対して否定応答することもできます。

7. メッセージの再配信とエラー処理

PulsarListener とメッセージリスナーコンテナーのインフラストラクチャとそのさまざまな機能について説明したため、次にメッセージの再配信とエラー処理を理解してみましょう。Apache Pulsar は、メッセージの再配信とエラー処理のためのさまざまなネイティブ戦略を提供します。見て、Spring for Apache Pulsar を通じてどのように使用できるかを見てみましょう。

7.1. メッセージ再配信の確認応答タイムアウトの指定

デフォルトでは、Pulsar コンシューマーはクラッシュしない限りメッセージを再配信しませんが、Pulsar コンシューマーに ack タイムアウトを設定することでこの動作を変更できます。ack タイムアウトプロパティの値が 0 より大きく、Pulsar コンシューマーがタイムアウト期間内にメッセージを確認応答しない場合、メッセージは再配信されます。

Spring for Apache Pulsar を使用する場合、コンシューマーカスタマイザーを介して、または @PulsarListener の properties 属性のネイティブ Pulsar ackTimeoutMillis プロパティを使用して、このプロパティを設定できます。

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"ackTimeoutMillis=60000"})
public void listen(String s) {
    ...
}

ack タイムアウトを指定した場合、コンシューマーが 60 秒以内に確認応答を送信しない場合、メッセージは Pulsar によってコンシューマーに再配信されます。

さまざまな遅延を伴う ACK タイムアウトの高度なバックオフオプションを指定する場合は、次の手順を実行できます。

@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {

    @PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
            topics = "withAckTimeoutRedeliveryBackoff-test-topic",
            ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
            properties = { "ackTimeoutMillis=60000" })
    void listen(String msg) {
        // some long-running process that may cause an ack timeout
    }

    @Bean
    RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

前述の例では、Pulsar の RedeliveryBackoff の Bean を、最小遅延 1 秒、最大遅延 10 秒、バックオフ乗数 2 で指定します。最初の ACK タイムアウトが発生した後、メッセージの再配信はこのバックオフ Bean によって制御されます。ackTimeoutRedeliveryBackoff プロパティを実際の Bean 名 (この場合は ackTimeoutRedeliveryBackoff) に設定することで、バックオフ Bean を PulsarListener アノテーションに提供します。

7.2. 否定応答の再配信の指定

否定応答する場合、Pulsar コンシューマーでは、アプリケーションがメッセージを再配信する方法を指定できます。デフォルトでは、メッセージは 1 分以内に再配信されますが、コンシューマーカスタマイザーを使用するか、@PulsarListener の properties 属性のネイティブ Pulsar negativeAckRedeliveryDelay プロパティを使用して変更できます。

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
    ...
}

次のように、RedeliveryBackoff Bean を指定し、PulsarProducer の negativeAckRedeliveryBackoff プロパティとして Bean 名を指定することで、乗数を使用してさまざまな遅延とバックオフメカニズムを指定することもできます。

@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {

    @PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
            topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
            subscriptionType = SubscriptionType.Shared)
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @Bean
    RedeliveryBackoff redeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

7.3. Apache Pulsar からのデッドレタートピックをメッセージ再配信とエラー処理に使用する

Apache Pulsar を使用すると、アプリケーションは Shared サブスクリプション型のコンシューマーでデッドレタートピックを使用できます。Exclusive および Failover サブスクリプション型の場合、この機能は利用できません。基本的な考え方は、メッセージが一定回数再試行され (ACK タイムアウトまたは NACK 再配信が原因である可能性があります)、再試行回数がなくなると、メッセージをデッドレターキューと呼ばれる特別なトピックに送信できるというものです。(DLQ)。いくつかのコードスニペットを調べて、この機能の動作に関する詳細を確認してみましょう。

@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {

    @PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
            topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
            subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1000" })
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

}

まず、DeadLetterPolicy 用の特別な Bean があり、deadLetterPolicy という名前が付けられています (任意の名前にすることができます)。この Bean は、最大配信数 (この場合は 10) やデッドレタートピックの名前 (この場合は my-dlq-topic) など、いくつかのことを指定します。DLQ トピック名を指定しない場合は、Pulsar の <topicname>-<subscriptionname>-DLQ がデフォルトになります。次に、deadLetterPolicy プロパティを設定することで、この Bean 名を PulsarListener に提供します。DLQ 機能は共有サブスクリプションでのみ機能するため、PulsarListener のサブスクリプション型は Shared であることに注意してください。このコードは主にデモンストレーション目的であるため、ackTimeoutMillis 値として 1000 を提供します。このコードは例外をスローし、Pulsar が 1 秒以内に ack を受信しない場合は再試行するという考え方です。このサイクルが 10 回続くと (これは DeadLetterPolicy での最大再配信回数です)、Pulsar コンシューマーはメッセージを DLQ トピックに公開します。DLQ トピックをリッスンして、DLQ トピックに公開されたデータを受信するための別の PulsarListener があります。

パーティション化されたトピックを使用する場合の DLQ トピックに関する特別な注意事項

メイントピックがパーティション化されている場合、Pulsar はバックグラウンドで各パーティションを別のトピックとして扱います。Pulsar は partition-<n> を追加します。n はメイントピック名のパーティション番号を表します。問題は、(上記とは対照的に) DLQ トピックを指定しない場合、Pulsar は、この `partition-<n> 情報を含むデフォルトのトピック名 (例: topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ) にパブリッシュすることです。これを解決する簡単なメソッドは、DLQ トピック名を常に指定することです。

7.4. Spring for Apache Pulsar のネイティブエラー処理

前に記述されていたように、Apache Pulsar の DLQ 機能は共有サブスクリプションに対してのみ機能します。非共有サブスクリプションに対して同様の機能を使用する必要がある場合、アプリケーションはどうすればよいでしょうか ? Pulsar が排他的およびフェイルオーバーサブスクリプションで DLQ をサポートしない主な理由は、これらのサブスクリプション型が順序保証されているためです。再配信や DLQ などを許可すると、メッセージを順番どおりに受信できなくなります。ただし、アプリケーションでは問題ないが、さらに重要なことに、非共有サブスクリプションに対してこの DLQ 機能が必要な場合はどうなるでしょうか ? そのために、Spring for Apache Pulsar は PulsarConsumerErrorHandler を提供します。これは、Pulsar のあらゆるサブスクリプション型 ( ExclusiveFailoverShared または Key_Shared) で使用できます。

Spring for Apache Pulsar から PulsarConsumerErrorHandler を使用する場合は、リスナーに ACK タイムアウトプロパティを設定しないようにしてください。

いくつかのコードスニペットを調べて詳細を見てみましょう。

@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {

    @Bean
    PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
            PulsarTemplate<String> pulsarTemplate) {
        return new DefaultPulsarConsumerErrorHandler<>(
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
    }

    @PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
                    topics = "pulsarConsumerErrorHandler-topic",
					pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
    void listenDlt(String msg) {
        System.out.println("From DLT: " + msg);
    }

}

pulsarConsumerErrorHandler Bean を検討してください。これにより、型 PulsarConsumerErrorHandler の Bean が作成され、Spring for Apache Pulsar によってすぐに提供されるデフォルトの実装 ( DefaultPulsarConsumerErrorHandler) が使用されます。DefaultPulsarConsumerErrorHandler には、PulsarMessageRecovererFactory と org.springframework.util.backoff.Backoff を受け取るコンストラクターがあります。PulsarMessageRecovererFactory は、次の API との関数インターフェースです。

@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {

	/**
	 * Provides a message recoverer {@link PulsarMessageRecoverer}.
	 * @param consumer Pulsar consumer
	 * @return {@link PulsarMessageRecoverer}.
	 */
	PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);

}

recovererForConsumer メソッドは Pulsar コンシューマーを受け取り、別の関数インターフェースである PulsarMessageRecoverer を返します。PulsarMessageRecoverer の API は次のとおりです。

public interface PulsarMessageRecoverer<T> {

	/**
	 * Recover a failed message, for e.g. send the message to a DLT.
	 * @param message Pulsar message
	 * @param exception exception from failed message
	 */
	void recoverMessage(Message<T> message, Exception exception);

}

Spring for Apache Pulsar は、PulsarDeadLetterPublishingRecoverer と呼ばれる PulsarMessageRecovererFactory の実装を提供します。これは、メッセージをデッドレタートピック (DLT) に送信することでメッセージを回復できるデフォルトの実装を提供します。この実装を前述の DefaultPulsarConsumerErrorHandler のコンストラクターに提供します。2 番目の引数として FixedBackOff を指定します。高度なバックオフ機能のために Spring の ExponentialBackoff を提供することもできます。次に、この Bean 名を PulsarConsumerErrorHandler のプロパティとして PulsarListener に提供します。このプロパティは pulsarConsumerErrorHandler と呼ばれます。PulsarListener メソッドはメッセージに対して失敗するたびに再試行されます。再試行の回数は、Backoff が提供する実装値によって制御されます。この例では、10 回の再試行を実行します (最初の再試行とその後 10 回の再試行で合計 11 回の試行)。すべての再試行が完了すると、メッセージは DLT トピックに送信されます。

提供する PulsarDeadLetterPublishingRecoverer 実装では、DLT にメッセージを発行するために使用される PulsarTemplate が使用されます。ほとんどの場合、パーティション化されたトピックに関する注意事項については、Spring Boot からの同じ自動構成 PulsarTemplate で十分です。パーティション化されたトピックを使用し、メイントピックにカスタムメッセージルーティングを使用する場合は、message-routing-mode の値 custompartition が設定される自動構成 PulsarProducerFactory を使用しない別の PulsarTemplate を使用する必要があります。次のブループリントで PulsarConsumerErrorHandler を使用できます。

@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
    PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
        PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);

        BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
                (c, m) -> "my-foo-dlt";

        PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);

        return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
                new FixedBackOff(100, 5));
}

2 番目のコンストラクター引数として PulsarDeadLetterPublishingRecoverer に宛先リゾルバーを提供していることに注意してください。指定しない場合、PulsarDeadLetterPublishingRecoverer は DLT トピック名として <subscription-name>-<topic-name>-DLT> を使用します。この機能を使用する場合は、デフォルトを使用するのではなく、宛先リゾルバーを設定して適切な宛先名を使用する必要があります。

PulsarConsumerErrorHnadler で行ったように、単一のレコードメッセージリスナーを使用する場合、および手動確認応答を使用する場合は、例外がスローされたときにメッセージを否定的に確認しないようにしてください。むしろ、例外をコンテナーに再スローしてください。それ以外の場合、コンテナーはメッセージが個別に処理されると認識し、エラー処理はトリガーされません。

最後に、DLT トピックからメッセージを受信する 2 番目の PulsarListener があります。

これまでこのセクションで説明した例では、単一のレコードメッセージリスナーで PulsarConsumerErrorHandler を使用する方法のみを説明しました。次に、これをバッチリスナーで使用する方法を見ていきます。

7.5. PulsarConsumerErrorHandler によるバッチリスナー

まず、バッチ PulsarListener メソッドを見てみましょう。

@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
            subscriptionType = SubscriptionType.Failover,
            pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
    for (Message<Integer> datum : data) {
        if (datum.getValue() == 5) {
            throw new PulsarBatchListenerFailedException("failed", datum);
        }
        acknowledgement.acknowledge(datum.getMessageId());
    }
}

@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
        PulsarTemplate<String> pulsarTemplate) {
    return new DefaultPulsarConsumerErrorHandler<>(
            new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}

@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
    System.out.println("DLT - RECEIVED: " + message.getValue());
}

もう一度、pulsarConsumerErrorHandler プロパティに PulsarConsumerErrorHandler Bean という名前を付けます。(前の例に示すように) バッチリスナーを使用し、Spring for Apache Pulsar から PulsarConsumerErrorHandler を使用する場合は、手動確認を使用する必要があります。こうすることで、成功した個々のメッセージをすべて確認できます。失敗したものについては、失敗したメッセージを含む PulsarBatchListenerFailedException をスローする必要があります。この例外がなければ、フレームワークは失敗をどう処理すればよいのかわかりません。再試行時に、コンテナーは失敗したメッセージから始まるメッセージの新しいバッチをリスナーに送信します。再度失敗した場合は、再試行が完了するまで再試行され、その時点でメッセージが DLT に送信されます。その時点で、メッセージはコンテナーによって確認され、元のバッチ内の後続のメッセージがリスナーに渡されます。

8. PulsarListener でのコンシューマーのカスタマイズ

Spring for Apache Pulsar は、PulsarListener で使用されるコンテナーによって作成されたコンシューマーをカスタマイズする便利な方法を提供します。アプリケーションは PulsarListenerConsumerBuilderCustomizer に Bean を提供できます。ここに一例を示します。

@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return cb -> {
        cb.subscriptionName("modified-subscription-name");
    };
}

次に、以下に示すように、このカスタマイザー Bean 名を PuslarListener アノテーションの属性として提供できます。

@PulsarListener(subscriptionName = "my-subscription",
        topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {

}

フレームワークは、PulsarListener を通じて提供された Bean を検出し、Pulsar Consumer を作成する前にこのカスタマイザーを Consumer ビルダーに適用します。

複数の PulsarListener メソッドがあり、それぞれに異なるカスタマイズルールがある場合は、複数のカスタマイザー Bean を作成し、各 PulsarListener に適切なカスタマイザーをアタッチする必要があります。

9. メッセージリスナーコンテナーのライフサイクル

9.1. 一時停止と再開

アプリケーションがメッセージの消費を一時的に停止し、後で再開したい場合があります。Spring for Apache Pulsar は、基になるメッセージリスナーコンテナーを一時停止および再開する機能を提供します。Pulsar メッセージリスナーコンテナーが一時停止されると、Pulsar コンシューマーからデータを受信するためにコンテナーによって行われるポーリングはすべて一時停止されます。同様に、コンテナーが再開されると、一時停止中にトピックに新しいレコードが追加された場合、次のポーリングでデータが返され始めます。

リスナーコンテナーを一時停止または再開するには、以下のスニペットに示すように、まず PulsarListenerEndpointRegistry Bean 経由でコンテナーインスタンスを取得し、次にコンテナーインスタンスで一時停止 / 再開 API を呼び出します。

@Autowired
private PulsarListenerEndpointRegistry registry;

void someMethod() {
  PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
  container.pause();
}
getListenerContainer に渡される id パラメーターはコンテナー ID です。これは、@PulsarListener を一時停止 / 再開するときの @PulsarListener id 属性の値になります。

9.2. 起動失敗の処理

メッセージリスナーコンテナーは、アプリケーションコンテキストがリフレッシュされると起動されます。デフォルトでは、起動中に発生した障害はすべて再スローされ、アプリケーションの起動は失敗します。この動作は、対応するコンテナープロパティの StartupFailurePolicy を使用して調整できます。

利用可能なオプションは次のとおりです。

  • Stop (デフォルト) - 例外をログに記録して再スローし、アプリケーションを事実上停止します。

  • Continue - 例外をログに記録し、コンテナーを実行していない状態のままにしますが、アプリケーションは停止しません。

  • Retry - 例外をログに記録し、コンテナーを非同期的に起動しようとしますが、アプリケーションは停止しません。

デフォルトの再試行動作は、各試行の間に 10 秒の遅延を置いて 3 回再試行することです。ただし、対応するコンテナープロパティでカスタム再試行テンプレートを指定できます。再試行回数が尽きた後にコンテナーが再起動に失敗すると、コンテナーは非実行状態のままになります。

9.2.1. 構成

Spring Boot 付

Spring Boot を使用する場合、コンテナーの起動プロパティを設定する PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> Bean を登録できます。

Spring Boot なし

ただし、代わりにコンポーネントを手動で構成する場合は、メッセージリスナーコンテナーファクトリを構築するときに、コンテナーの起動プロパティをそれに応じて更新する必要があります。

10. Pulsar リーダーのサポート

このフレームワークは、PulsarReaderFactory 経由で Pulsar リーダー [Apache] (英語) の使用をサポートします。

Spring Boot は、spring.pulsar.reader.* (英語) アプリケーションプロパティのいずれかを指定することでさらに構成できるこのリーダーファクトリを提供します。

10.1. PulsarReader アノテーション

PulsarReaderFactory を直接使用することもできますが、Spring for Apache Pulsar はリーダーファクトリを自分で設定せずにトピックからすばやく読み取るために使用できる PulsarReader アノテーションを提供します。これは、PulsarListener. の背後にある同じアイデアに似ています。簡単な例を次に示します。

@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
    //...
}

id 属性はオプションですが、アプリケーションにとって意味のある値を指定することがベストプラクティスです。指定しない場合は、自動生成された ID が使用されます。一方、topics 属性と startMessageId 属性は必須です。topics 属性には、単一のトピックまたはトピックのコンマ区切りリストを指定できます。startMessageId 属性は、トピック内の特定のメッセージから始めるようにリーダーに指示します。startMessageId の有効な値は、earliest または latest. です。閲覧者が、利用可能な最も古いメッセージまたは最新のメッセージ以外のトピックから任意にメッセージの読み取りを開始できるようにしたいとします。その場合、ReaderBuilderCustomizer を使用して ReaderBuilder をカスタマイズし、開始する適切な MessageId を認識する必要があります。

10.2. ReaderBuilder のカスタマイズ

Spring for Apache Pulsar の PulsarReaderReaderBuilderCustomizer を使用して、ReaderBuilder を通じて利用可能なフィールドをカスタマイズできます。以下のように、型 PulsarReaderReaderBuilderCustomizer の @Bean を提供し、それを PulsarReader で使用できるようにすることができます。

@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
    readerCustomizer = "myCustomizer")
void read(String message) {
    //...
}

@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
    return readerBuilder -> {
        readerBuilder.startMessageId(messageId); // the first message read is after this message id.
        // Any other customizations on the readerBuilder
    };
}
アプリケーションに 1 つの @PulsarReader と 1 つの PulsarReaderReaderBuilderCustomizer Bean しか登録されていない場合、カスタマイザーは自動的に適用されます。

10.3. 起動失敗の処理

メッセージリスナーコンテナーは、アプリケーションコンテキストがリフレッシュされると起動されます。デフォルトでは、起動中に発生した障害はすべて再スローされ、アプリケーションの起動は失敗します。この動作は、対応するコンテナープロパティの StartupFailurePolicy を使用して調整できます。

利用可能なオプションは次のとおりです。

  • Stop (デフォルト) - 例外をログに記録して再スローし、アプリケーションを事実上停止します。

  • Continue - 例外をログに記録し、コンテナーを実行していない状態のままにしますが、アプリケーションは停止しません。

  • Retry - 例外をログに記録し、コンテナーを非同期的に起動しようとしますが、アプリケーションは停止しません。

デフォルトの再試行動作は、各試行の間に 10 秒の遅延を置いて 3 回再試行することです。ただし、対応するコンテナープロパティでカスタム再試行テンプレートを指定できます。再試行回数が尽きた後にコンテナーが再起動に失敗すると、コンテナーは非実行状態のままになります。

10.3.1. 構成

Spring Boot 付

Spring Boot を使用する場合、コンテナーの起動プロパティを設定する PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> Bean を登録できます。

Spring Boot なし

ただし、代わりにコンポーネントを手動で構成する場合は、メッセージリスナーコンテナーファクトリを構築するときに、コンテナーの起動プロパティをそれに応じて更新する必要があります。