メッセージの消費
1. Pulsar リスナー
Pulsar コンシューマーに関しては、エンドユーザーアプリケーションで PulsarListener
アノテーションを使用することをお勧めします。PulsarListener
を使用するには、@EnablePulsar
アノテーションを使用する必要があります。Spring Boot サポートを使用すると、このアノテーションが自動的に有効になり、メッセージリスナーインフラストラクチャ (Pulsar コンシューマーの作成を担当する) など、PulsarListener
に必要なすべてのコンポーネントが構成されます。PulsarMessageListenerContainer
は、PulsarConsumerFactory
を使用して、メッセージを消費するために使用する基礎となる Pulsar コンシューマーである Pulsar コンシューマーを作成および管理します。
Spring Boot はこのコンシューマーファクトリを提供しますが、spring.pulsar.consumer.*
(英語) アプリケーションプロパティを指定してさらに構成できます。
クイックツアーセクションで見た PulsarListener
コードスニペットをもう一度見てみましょう。
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
この方法はさらに簡略化できます。
@PulsarListener
public void listen(String message) {
System.out.println("Message Received: " + message);
}
この最も基本的な形式では、subscriptionName
が @PulsarListener
アノテーションに指定されていない場合、自動生成されたサブスクリプション名が使用されます。同様に、topics
が直接提供されない場合、トピック解決プロセスを使用して宛先トピックが決定されます。
前に示した PulsarListener
メソッドでは、データを String
として受け取りますが、スキーマ型を指定しません。内部的には、フレームワークは Pulsar のスキーマメカニズムに依存して、データを必要な型に変換します。フレームワークは、ユーザーが String
型を予期していることを検出し、その情報に基づいてスキーマ型を推測し、そのスキーマをコンシューマーに提供します。フレームワークは、すべてのプリミティブ型に対してこの推論を実行します。すべての非プリミティブ型では、デフォルトのスキーマは JSON であると想定されます。複合型が JSON 以外のもの (AVRO や KEY_VALUE など) を使用している場合は、schemaType
プロパティを使用してアノテーションにスキーマ型を指定する必要があります。
次の例は、Integer
を受け取る別の PulsarListener
メソッドを示しています。
@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
System.out.println(message);
}
次の PulsarListener
メソッドは、トピックから複合型を使用する方法を示しています。
@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
System.out.println(message);
}
さらにいくつかの方法を見てみましょう。
Pulsar メッセージを直接使用できます。
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
}
次の例では、Spring メッセージングエンベロープを使用してレコードを消費します。
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
}
次に、バッチでレコードを使用する方法を見てみましょう。次の例では、PulsarListener
を使用してレコードを POJO としてバッチで消費します。
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
この例では、レコードをオブジェクトのコレクション (List
) として受信することに注意してください。さらに、PulsarListener
レベルでバッチ消費を有効にするには、アノテーションの batch
プロパティを true
に設定する必要があります。
List
が保持する実際の型に基づいて、フレームワークは使用するスキーマを推測しようとします。List
に JSON 以外の複合型が含まれている場合でも、PulsarListener
に schemaType
を指定する必要があります。
以下では、Pulsar Java クライアントによって提供される Message
エンベロープを使用します。
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
次の例では、Spring メッセージング Message
型のエンベロープを含むバッチレコードを使用します。
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}
最後に、Pulsar の Messages
ホルダーオブジェクトをバッチリスナーに使用することもできます。
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
PulsarListener
を使用すると、アノテーション自体に Pulsar コンシューマープロパティを直接提供できます。これは、前述の Boot 構成プロパティを使用したくない場合、または複数の PulsarListener
メソッドを使用したくない場合に便利です。
次の例では、Pulsar コンシューマープロパティを PulsarListener
で直接使用します。
@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
使用されるプロパティは、spring.pulsar.consumer アプリケーション構成プロパティではなく、直接 Pulsar コンシューマープロパティです。 |
1.1. AUTO_CONSUME を使用した汎用レコード
Pulsar トピックのスキーマの型を事前に知る機会がない場合は、AUTO_CONSUME
スキーマ型を使用して汎用レコードを使用できます。この場合、トピックは、トピックに関連付けられたスキーマ情報を使用して、メッセージを GenericRecord
オブジェクトに逆直列化します。
汎用レコードを使用するには、以下に示すように、@PulsarListener
で schemaType = SchemaType.AUTO_CONSUME
を設定し、型 GenericRecord
の Pulsar メッセージをメッセージパラメーターとして使用します。
@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
GenericRecord API を使用すると、フィールドとその関連値にアクセスできます。 |
1.2. ConsumerBuilder のカスタマイズ
以下に示すように、型 PulsarListenerConsumerBuilderCustomizer
の @Bean
を提供し、それを PulsarListener
で使用できるようにすることで、PulsarListenerConsumerBuilderCustomizer
を使用して ConsumerBuilder
を通じて使用可能なフィールドをカスタマイズできます。
@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return (builder) -> builder.consumerName("myConsumer");
}
アプリケーションに 1 つの @PulsarListener と 1 つの PulsarListenerConsumerBuilderCustomizer Bean しか登録されていない場合、カスタマイザーは自動的に適用されます。 |
2. スキーマ情報の指定
前に示したように、Java プリミティブの場合、Spring for Apache Pulsar フレームワークは PulsarListener
で使用する適切なスキーマを推測できます。非プリミティブ型の場合、アノテーションでスキーマが明示的に指定されていない場合、Spring for Apache Pulsar フレームワークはその型から Schema.JSON
を構築しようとします。
現在サポートされている複雑なスキーマ型は、INLINE エンコードを使用した JSON、AVRO、PROTOBUF、AUTO_CONSUME、KEY_VALUE です。 |
2.1. カスタムスキーママッピング
PulsarListener
で複合型のスキーマを指定する代わりに、型のマッピングを使用してスキーマリゾルバーを構成できます。これにより、フレームワークが受信メッセージ型を使用してリゾルバーを参照するため、リスナーにスキーマを設定する必要がなくなります。
2.1.1. 構成プロパティ
スキーママッピングは、spring.pulsar.defaults.type-mappings
プロパティを使用して構成できます。次の例では、application.yml
を使用して、それぞれ AVRO
スキーマと JSON
スキーマを使用する User
および Address
複合オブジェクトのマッピングを追加します。
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
message-type は、メッセージクラスの完全修飾名です。 |
2.1.2. スキーマリゾルバーカスタマイザー
マッピングを追加する推奨される方法は、上記のプロパティを使用することです。ただし、より詳細な制御が必要な場合は、スキーマリゾルバーカスタマイザーを提供してマッピングを追加できます。
次の例では、スキーマリゾルバーカスタマイザーを使用して、AVRO
スキーマと JSON
スキーマをそれぞれ使用する User
および Address
複合オブジェクトのマッピングを追加します。
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 型マッピングのアノテーション
特定のメッセージ型に使用するデフォルトのスキーマ情報を指定するためのもう 1 つのオプションは、メッセージクラスに @PulsarMessage
アノテーションをマークすることです。スキーマ情報は、アノテーションの schemaType
属性を介して指定できます。
次の例では、Foo
型のメッセージを生成または消費するときに、デフォルトのスキーマとして JSON を使用するようにシステムを構成します。
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
この設定を行うと、リスナーにスキーマを設定する必要はありません。例:
@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
System.out.println(user);
}
3. Pulsar コンシューマーオブジェクトへのアクセス
場合によっては、Pulsar Consumer オブジェクトに直接アクセスする必要があります。次の例は、それを取得する方法を示しています。
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
System.out.println("Message Received: " + message);
ConsumerStats stats = consumer.getStats();
...
}
この方法で Consumer オブジェクトにアクセスするときは、受信メソッドを呼び出してコンシューマーのカーソル位置を変更する操作を呼び出さないでください。このような操作はすべてコンテナーによって実行される必要があります。 |
4. Pulsar メッセージリスナーコンテナー
これで、PulsarListener
を介したコンシューマー側の基本的な対話を見てきました。ここで、PulsarListener
が基礎となる Pulsar コンシューマーとどのように対話するかという内部動作を詳しく見てみましょう。エンドユーザーアプリケーションの場合、ほとんどのシナリオで、Spring for Apache Pulsar を使用するときに Pulsar トピックから利用するために PulsarListener
アノテーションを直接使用することをお勧めします。これは、そのモデルが幅広いアプリケーションユースケースをカバーしているためです。ただし、PulsarListener
が内部でどのように動作するかを理解することが重要です。このセクションでは、それらの詳細について説明します。
前に簡単に説明したように、Spring for Apache Pulsar を使用する場合、メッセージリスナーコンテナーがメッセージ消費の中心となります。PulsarListener
は、バックグラウンドでメッセージリスナーコンテナーインフラストラクチャを使用して、Pulsar コンシューマーを作成および管理します。Spring for Apache Pulsar は、PulsarMessageListenerContainer
を通じてこのメッセージリスナーコンテナーの契約を提供します。このメッセージリスナーコンテナーのデフォルトの実装は、DefaultPulsarMessageListenerContainer
を通じて提供されます。名前が示すように、PulsarMessageListenerContainer
にはメッセージリスナーが含まれています。コンテナーは Pulsar コンシューマーを作成し、別のスレッドを実行してデータを受信して処理します。データは、提供されたメッセージリスナー実装によって処理されます。
メッセージリスナーコンテナーは、コンシューマーの batchReceive
メソッドを使用してデータをバッチで消費します。データが受信されると、選択されたメッセージリスナー実装に渡されます。
Spring for Apache Pulsar を使用する場合、次のメッセージリスナー型を使用できます。
これらのさまざまなメッセージリスナーの詳細については、次のセクションで説明します。
ただし、その前に、コンテナー自体を詳しく見てみましょう。
4.1. DefaultPulsarMessageListenerContainer
これは、単一のコンシューマーベースのメッセージリスナーコンテナーです。次のリストは、そのコンストラクターを示しています。
public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
}
PulsarConsumerFactory
(コンシューマーの作成に使用) と PulsarContainerProperties
オブジェクト (コンテナーのプロパティに関する情報が含まれる) を受け取ります。PulsarContainerProperties
には次のコンストラクターがあります。
public PulsarContainerProperties(String... topics)
public PulsarContainerProperties(Pattern topicPattern)
トピック情報は、PulsarContainerProperties
を通じて、またはコンシューマーファクトリに提供されるコンシューマープロパティとして提供できます。次の例では DefaultPulsarMessageListenerContainer
を使用します。
Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
});
DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
pulsarContainerProperties);
return pulsarListenerContainer;
リスナーコンテナーを直接使用するときにトピック情報が指定されていない場合は、「メッセージ型のデフォルト」ステップが省略されることを除いて、PulsarListener で使用されるのと同じトピック解決プロセスが使用されます。 |
DefaultPulsarMessageListenerContainer
は単一のコンシューマーのみを作成します。複数のスレッドを通じて複数のコンシューマーを管理したい場合は、ConcurrentPulsarMessageListenerContainer
を使用する必要があります。
4.2. ConcurrentPulsarMessageListenerContainer
ConcurrentPulsarMessageListenerContainer
には次のコンストラクターがあります。
public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
ConcurrentPulsarMessageListenerContainer
を使用すると、setter を通じて concurrency
プロパティを指定できます。1
を超える同時実行は、非排他的サブスクリプション (failover
、shared
、key-shared
) でのみ許可されます。排他的サブスクリプションモードを使用している場合、同時実行のためにデフォルトの 1
のみを使用できます。
次の例では、failover
サブスクリプションの PulsarListener
アノテーションを通じて concurrency
を有効にします。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
...
System.out.println("Current Thread: " + Thread.currentThread().getName());
System.out.println("Current Consumer: " + consumer.getConsumerName());
}
前述のリスナーでは、トピック my-topic
に 3 つのパーティションがあると想定しています。パーティション化されていないトピックの場合、同時実行性を 3
に設定しても何も起こりません。メインのアクティブなコンシューマーに加えて、2 つのアイドル状態のコンシューマーが取得されます。トピックに 3 つ以上のパーティションがある場合、メッセージは、コンテナーが作成するコンシューマー間で負荷分散されます。この PulsarListener
を実行すると、前の例のスレッド名とコンシューマー名の出力が示すように、異なるパーティションからのメッセージが異なるコンシューマーを介して消費されることがわかります。
パーティション化されたトピックでこの方法で Failover サブスクリプションを使用すると、Pulsar はメッセージの順序を保証します。 |
次のリストは、PulsarListener
の別の例を示していますが、Shared
サブスクリプションと concurrency
が有効になっています。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
...
}
前の例では、PulsarListener
は 5 つの異なるコンシューマーを作成します (今回は、トピックに 5 つのパーティションがあると仮定します)。
Shared サブスクリプションでは Pulsar でのメッセージの順序が保証されないため、このバージョンではメッセージの順序はありません。 |
メッセージの順序付けが必要で、共有サブスクリプション型が必要な場合は、Key_Shared
サブスクリプション型を使用する必要があります。
4.3. レコードの消費
メッセージリスナーコンテナーが単一レコードとバッチベースのメッセージの両方の消費をどのように可能にするかを見てみましょう。
単一レコードの消費
この説明のために、基本的な PulsarListener
をもう一度見てみましょう。
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
この PulsarListener
メソッドでは、毎回 1 つのレコードでリスナーメソッドを呼び出すように Spring for Apache Pulsar に要求する必要があります。メッセージリスナーコンテナーは、コンシューマー上で batchReceive
メソッドを使用してデータをバッチで消費すると述べました。この場合、フレームワークは PulsarListener
が単一のレコードを受信することを検出します。これは、メソッドを呼び出すたびに 1 つのレコードが必要であることを意味します。レコードはメッセージリスナーコンテナーによってバッチで消費されますが、受信したバッチを反復処理し、PulsarRecordMessageListener
のアダプターを介してリスナーメソッドを呼び出します。前のセクションでわかるように、PulsarRecordMessageListener
は Pulsar Java クライアントによって提供される MessageListener
を継承し、基本的な received
メソッドをサポートします。
バッチ消費
次の例は、PulsarListener
がレコードをバッチで消費することを示しています。
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
この型の PulsarListener
を使用すると、フレームワークはバッチモードであることを検出します。Consumer の batchReceive
メソッドを使用してデータをバッチですでに受信しているため、PulsarBatchMessageListener
のアダプターを通じてバッチ全体をリスナーメソッドに渡します。
5. Pulsar ヘッダー
Pulsar メッセージメタデータは、Spring メッセージヘッダーとして使用できます。使用可能なヘッダーのリストは PulsarHeaders.java [GitHub] (英語) にあります。
5.1. 単一レコードベースのコンシューマーでのアクセス
次の例は、単一レコード消費モードを使用するアプリケーションでさまざまな Pulsar ヘッダーにアクセスする方法を示しています。
@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("foo") String foo) {
}
前述の例では、messageId
および rawData
メッセージメタデータの値と、foo
という名前のカスタムメッセージプロパティにアクセスします。Spring @Header
アノテーションは各ヘッダーフィールドに使用されます。
Pulsar の Message
をペイロードを運ぶエンベロープとして使用することもできます。その際、ユーザーはメタデータを取得するために Pulsar メッセージ上の対応するメソッドを直接呼び出すことができます。ただし、便宜上、Header
アノテーションを使用して取得することもできます。Spring メッセージング Message
エンベロープを使用してペイロードを伝送し、@Header
を使用して Pulsar ヘッダーを取得することもできることに注意してください。
5.2. バッチレコードベースのコンシューマーでのアクセス
このセクションでは、バッチコンシューマーを使用するアプリケーションでさまざまな Pulsar ヘッダーにアクセスする方法を説明します。
@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {
}
前の例では、データを List<String>
として消費します。さまざまなヘッダーを抽出するときも、List<>
として抽出します。Spring for Apache Pulsar は、ヘッダーリストがデータリストに対応していることを保証します。
バッチリスナーを使用し、ペイロードを List<org.apache.pulsar.client.api.Message<?>
、org.apache.pulsar.client.api.Messages<?>
、または org.springframework.messaging.Messsge<?>
として受信する場合も、同じ方法でヘッダーを抽出できます。
6. メッセージの確認
Spring for Apache Pulsar を使用する場合、アプリケーションによってオプトアウトされない限り、メッセージ確認応答はフレームワークによって処理されます。このセクションでは、フレームワークがメッセージ確認応答を処理する方法について詳しく説明します。
6.1. メッセージ ACK モード
Spring for Apache Pulsar は、メッセージを確認するために次のモードを提供します。
BATCH
RECORD
MANUAL
BATCH
確認応答モードはデフォルトですが、メッセージリスナーコンテナーで変更できます。次のセクションでは、PulsarListener
の単一バージョンとバッチバージョンの両方を使用する場合に確認応答がどのように機能するか、またそれらがバッキングメッセージリスナーコンテナー (そして最終的には Pulsar コンシューマー) にどのように変換されるかを見ていきます。
6.2. シングルレコードモードでの自動メッセージ確認応答
基本的な単一メッセージベースの PulsarListener
をもう一度見てみましょう。
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
特に Pulsar コンシューマーを直接使用することに慣れている場合は、PulsarListener
を使用するときに確認応答がどのように機能するか疑問に思うのは自然なことです。答えはメッセージリスナーコンテナーにあります。これは、コンシューマー関連のすべてのアクティビティを調整する Spring for Apache Pulsar の中心的な場所です。
デフォルトの動作をオーバーライドしないと仮定すると、前述の PulsarListener
を使用するとバックグラウンドで次のことが起こります。
まず、リスナーコンテナーは、Pulsar コンシューマーからメッセージをバッチとして受信します。
受信されたメッセージは、一度に 1 メッセージずつ
PulsarListener
に渡されます。すべてのレコードがリスナーメソッドに渡され、正常に処理されると、コンテナーは元のバッチからのすべてのメッセージを承認します。
これが通常の流れです。元のバッチのいずれかのレコードが例外をスローした場合、Spring for Apache Pulsar はそれらのレコードを個別に追跡します。バッチからのすべてのレコードが処理されると、Spring for Apache Pulsar は成功したすべてのメッセージを肯定応答し、失敗したすべてのメッセージを否定的に肯定応答 (ナック) します。つまり、PulsarRecordMessageListener
を使用して単一レコードを消費し、BATCH
のデフォルトの ack モードが使用される場合、フレームワークは batchReceive
呼び出しから受信したすべてのレコードが正常に処理されるのを待ってから、Pulsar コンシューマーで acknowledge
メソッドを呼び出します。ハンドラーメソッドの呼び出し時に特定のレコードが例外をスローした場合、Spring for Apache Pulsar はそれらのレコードを追跡し、バッチ全体が処理された後にそれらのレコードに対して個別に negativeAcknowledge
を呼び出します。
アプリケーションがレコードごとに肯定応答または否定応答を発生させたい場合は、RECORD
ACK モードを有効にすることができます。この場合、各レコードを処理した後、エラーがなければメッセージは肯定応答され、エラーがあった場合は否定応答されます。次の例では、Pulsar リスナーで RECORD
ACK モードを有効にします。
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
System.out.println("Message Received: " + message);
}
6.3. シングルレコードモードでの手動メッセージ確認応答
フレームワークが常に確認応答を送信する必要があるわけではなく、アプリケーション自体から直接送信する必要がある場合があります。Spring for Apache Pulsar では、手動のメッセージ確認応答を有効にする方法がいくつか提供されています。次の例は、そのうちの 1 つを示しています。
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
System.out.println("Message Received: " + message.getValue());
acknowledgment.acknowledge();
}
ここで説明する価値のあることがいくつかあります。まず、PulsarListener
で ackMode
を設定することで、手動 ACK モードを有効にします。手動 ACK モードを有効にすると、Spring for Apache Pulsar はアプリケーションに Acknowledgment
オブジェクトを挿入させます。フレームワークは、互換性のあるメッセージリスナーコンテナー (単一レコードベースの消費用の PulsarAcknowledgingMessageListener
) を選択することでこれを実現します。これにより、Acknowledgment
オブジェクトへのアクセスが可能になります。
Acknowledgment
オブジェクトは、次の API メソッドを提供します。
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
MANUAL
ACK モードを使用しているときに、この Acknowledgment
オブジェクトを PulsarListener
に挿入し、対応するメソッドの 1 つを呼び出すことができます。
前述の PulsarListener
の例では、パラメーターのない acknowledge
メソッドを呼び出します。これは、フレームワークが現在どの Message
で動作しているかを認識しているためです。acknowledge()
を呼び出す場合、Message
エンベロープでペイロードを受信する必要はなく、ターゲット型 (この例では String
) を使用します。メッセージ ID: acknowledge.acknowledge(message.getMessageId());
を指定して、acknowledge
の別のバリアントを呼び出すこともできます。acknowledge(messageId)
を使用する場合は、Message<?>
エンベロープを使用してペイロードを受信する必要があります。
確認応答で可能なことと同様に、Acknowledgment
API は否定応答のオプションも提供します。前に示した nack メソッドを参照してください。
Pulsar コンシューマー上で acknowledge
を直接呼び出すこともできます。
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
System.out.println("Message Received: " + message.getValue());
try {
consumer.acknowledge(message);
}
catch (Exception e) {
....
}
}
基盤となるコンシューマーで acknowledge
を直接呼び出す場合は、自分でエラー処理を行う必要があります。Acknowledgment
を使用する場合は、フレームワークが自動的に実行してくれるため、その必要はありません。手動確認応答を使用する場合は、Acknowledgment
オブジェクトアプローチを使用する必要があります。
手動確認を使用する場合、フレームワークでは確認がまったく行われないことを理解することが重要です。アプリケーションを設計する際には、適切な確認戦略を熟考することが非常に重要です。 |
6.4. バッチ消費における自動メッセージ確認応答
レコードをバッチで消費し ( "メッセージ ACK モード" を参照)、デフォルトの ACK モード BATCH
を使用する場合、バッチ全体が正常に処理されると、バッチ全体が承認されます。いずれかのレコードが例外をスローした場合、バッチ全体が否定的に認識されます。これは、プロデューサー側でバッチ化されたバッチと同じではない可能性があることに注意してください。むしろ、これはコンシューマーでの batchReceive
の呼び出しから返されたバッチです。
次のバッチリスナーを考えてみましょう。
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
for (Foo foo : messages) {
...
}
}
受信コレクション (この例では messages
) 内のすべてのメッセージが処理されると、フレームワークはすべてのメッセージを確認します。
バッチモードで使用する場合、RECORD
は許可された ACK モードではありません。アプリケーションはバッチ全体の再配信を望まない場合があるため、問題が発生する可能性があります。このような状況では、MANUAL
確認モードを使用する必要があります。
6.5. バッチ消費における手動メッセージ確認応答
前のセクションで見たように、MANUAL
ACK モードがメッセージリスナーコンテナーに設定されている場合、フレームワークは肯定的または否定的な確認を行いません。このような関心事に対処できるかどうかは完全にアプリケーション次第です。MANUAL
ACK モードが設定されている場合、Spring for Apache Pulsar は互換性のあるメッセージリスナーコンテナー (バッチ消費用の PulsarBatchAcknowledgingMessageListener
) を選択します。これにより、Acknowledgment
オブジェクトへのアクセスが可能になります。Acknowledgment
API で使用できるメソッドは次のとおりです。
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
MANUAL
ACK モードを使用しているときに、この Acknowledgment
オブジェクトを PulsarListener
に挿入できます。次のリストは、バッチベースのリスナーの基本的な例を示しています。
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
for (Message<String> message : messages) {
try {
...
acknowledgment.acknowledge(message.getMessageId());
}
catch (Exception e) {
acknowledgment.nack(message.getMessageId());
}
}
}
バッチリスナーを使用する場合、メッセージリスナーコンテナーは現在どのレコードを操作しているかを認識できません。手動で確認するには、MessageId
または List<MessageId>
を受け取るオーバーロードされた acknowledge
メソッドのいずれかを使用する必要があります。MessageId
を使用してバッチリスナーに対して否定応答することもできます。
7. メッセージの再配信とエラー処理
PulsarListener
とメッセージリスナーコンテナーのインフラストラクチャとそのさまざまな機能について説明したため、次にメッセージの再配信とエラー処理を理解してみましょう。Apache Pulsar は、メッセージの再配信とエラー処理のためのさまざまなネイティブ戦略を提供します。見て、Spring for Apache Pulsar を通じてどのように使用できるかを見てみましょう。
7.1. メッセージ再配信の確認応答タイムアウトの指定
デフォルトでは、Pulsar コンシューマーはクラッシュしない限りメッセージを再配信しませんが、Pulsar コンシューマーに ack タイムアウトを設定することでこの動作を変更できます。ack タイムアウトプロパティの値が 0 より大きく、Pulsar コンシューマーがタイムアウト期間内にメッセージを確認応答しない場合、メッセージは再配信されます。
Spring for Apache Pulsar を使用する場合、コンシューマーカスタマイザーを介して、または @PulsarListener
の properties
属性のネイティブ Pulsar ackTimeoutMillis
プロパティを使用して、このプロパティを設定できます。
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"ackTimeoutMillis=60000"})
public void listen(String s) {
...
}
ack タイムアウトを指定した場合、コンシューマーが 60 秒以内に確認応答を送信しない場合、メッセージは Pulsar によってコンシューマーに再配信されます。
さまざまな遅延を伴う ACK タイムアウトの高度なバックオフオプションを指定する場合は、次の手順を実行できます。
@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {
@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
properties = { "ackTimeoutMillis=60000" })
void listen(String msg) {
// some long-running process that may cause an ack timeout
}
@Bean
RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
前述の例では、Pulsar の RedeliveryBackoff
の Bean を、最小遅延 1 秒、最大遅延 10 秒、バックオフ乗数 2 で指定します。最初の ACK タイムアウトが発生した後、メッセージの再配信はこのバックオフ Bean によって制御されます。ackTimeoutRedeliveryBackoff
プロパティを実際の Bean 名 (この場合は ackTimeoutRedeliveryBackoff
) に設定することで、バックオフ Bean を PulsarListener
アノテーションに提供します。
7.2. 否定応答の再配信の指定
否定応答する場合、Pulsar コンシューマーでは、アプリケーションがメッセージを再配信する方法を指定できます。デフォルトでは、メッセージは 1 分以内に再配信されますが、コンシューマーカスタマイザーを使用するか、@PulsarListener
の properties
属性のネイティブ Pulsar negativeAckRedeliveryDelay
プロパティを使用して変更できます。
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
...
}
次のように、RedeliveryBackoff
Bean を指定し、PulsarProducer の negativeAckRedeliveryBackoff
プロパティとして Bean 名を指定することで、乗数を使用してさまざまな遅延とバックオフメカニズムを指定することもできます。
@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {
@PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
subscriptionType = SubscriptionType.Shared)
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@Bean
RedeliveryBackoff redeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
7.3. Apache Pulsar からのデッドレタートピックをメッセージ再配信とエラー処理に使用する
Apache Pulsar を使用すると、アプリケーションは Shared
サブスクリプション型のコンシューマーでデッドレタートピックを使用できます。Exclusive
および Failover
サブスクリプション型の場合、この機能は利用できません。基本的な考え方は、メッセージが一定回数再試行され (ACK タイムアウトまたは NACK 再配信が原因である可能性があります)、再試行回数がなくなると、メッセージをデッドレターキューと呼ばれる特別なトピックに送信できるというものです。(DLQ)。いくつかのコードスニペットを調べて、この機能の動作に関する詳細を確認してみましょう。
@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1000" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
}
まず、DeadLetterPolicy
用の特別な Bean があり、deadLetterPolicy
という名前が付けられています (任意の名前にすることができます)。この Bean は、最大配信数 (この場合は 10) やデッドレタートピックの名前 (この場合は my-dlq-topic
) など、いくつかのことを指定します。DLQ トピック名を指定しない場合は、Pulsar の <topicname>-<subscriptionname>-DLQ
がデフォルトになります。次に、deadLetterPolicy
プロパティを設定することで、この Bean 名を PulsarListener
に提供します。DLQ 機能は共有サブスクリプションでのみ機能するため、PulsarListener
のサブスクリプション型は Shared
であることに注意してください。このコードは主にデモンストレーション目的であるため、ackTimeoutMillis
値として 1000 を提供します。このコードは例外をスローし、Pulsar が 1 秒以内に ack を受信しない場合は再試行するという考え方です。このサイクルが 10 回続くと (これは DeadLetterPolicy
での最大再配信回数です)、Pulsar コンシューマーはメッセージを DLQ トピックに公開します。DLQ トピックをリッスンして、DLQ トピックに公開されたデータを受信するための別の PulsarListener
があります。
7.4. Spring for Apache Pulsar のネイティブエラー処理
前に記述されていたように、Apache Pulsar の DLQ 機能は共有サブスクリプションに対してのみ機能します。非共有サブスクリプションに対して同様の機能を使用する必要がある場合、アプリケーションはどうすればよいでしょうか ? Pulsar が排他的およびフェイルオーバーサブスクリプションで DLQ をサポートしない主な理由は、これらのサブスクリプション型が順序保証されているためです。再配信や DLQ などを許可すると、メッセージを順番どおりに受信できなくなります。ただし、アプリケーションでは問題ないが、さらに重要なことに、非共有サブスクリプションに対してこの DLQ 機能が必要な場合はどうなるでしょうか ? そのために、Spring for Apache Pulsar は PulsarConsumerErrorHandler
を提供します。これは、Pulsar のあらゆるサブスクリプション型 ( Exclusive
、Failover
、Shared
または Key_Shared
) で使用できます。
Spring for Apache Pulsar から PulsarConsumerErrorHandler
を使用する場合は、リスナーに ACK タイムアウトプロパティを設定しないようにしてください。
いくつかのコードスニペットを調べて詳細を見てみましょう。
@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
topics = "pulsarConsumerErrorHandler-topic",
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
void listenDlt(String msg) {
System.out.println("From DLT: " + msg);
}
}
pulsarConsumerErrorHandler
Bean を検討してください。これにより、型 PulsarConsumerErrorHandler
の Bean が作成され、Spring for Apache Pulsar によってすぐに提供されるデフォルトの実装 ( DefaultPulsarConsumerErrorHandler
) が使用されます。DefaultPulsarConsumerErrorHandler
には、PulsarMessageRecovererFactory
と org.springframework.util.backoff.Backoff
を受け取るコンストラクターがあります。PulsarMessageRecovererFactory
は、次の API との関数インターフェースです。
@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {
/**
* Provides a message recoverer {@link PulsarMessageRecoverer}.
* @param consumer Pulsar consumer
* @return {@link PulsarMessageRecoverer}.
*/
PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);
}
recovererForConsumer
メソッドは Pulsar コンシューマーを受け取り、別の関数インターフェースである PulsarMessageRecoverer
を返します。PulsarMessageRecoverer
の API は次のとおりです。
public interface PulsarMessageRecoverer<T> {
/**
* Recover a failed message, for e.g. send the message to a DLT.
* @param message Pulsar message
* @param exception exception from failed message
*/
void recoverMessage(Message<T> message, Exception exception);
}
Spring for Apache Pulsar は、PulsarDeadLetterPublishingRecoverer
と呼ばれる PulsarMessageRecovererFactory
の実装を提供します。これは、メッセージをデッドレタートピック (DLT) に送信することでメッセージを回復できるデフォルトの実装を提供します。この実装を前述の DefaultPulsarConsumerErrorHandler
のコンストラクターに提供します。2 番目の引数として FixedBackOff
を指定します。高度なバックオフ機能のために Spring の ExponentialBackoff
を提供することもできます。次に、この Bean 名を PulsarConsumerErrorHandler
のプロパティとして PulsarListener
に提供します。このプロパティは pulsarConsumerErrorHandler
と呼ばれます。PulsarListener
メソッドはメッセージに対して失敗するたびに再試行されます。再試行の回数は、Backoff
が提供する実装値によって制御されます。この例では、10 回の再試行を実行します (最初の再試行とその後 10 回の再試行で合計 11 回の試行)。すべての再試行が完了すると、メッセージは DLT トピックに送信されます。
提供する PulsarDeadLetterPublishingRecoverer
実装では、DLT にメッセージを発行するために使用される PulsarTemplate
が使用されます。ほとんどの場合、パーティション化されたトピックに関する注意事項については、Spring Boot からの同じ自動構成 PulsarTemplate
で十分です。パーティション化されたトピックを使用し、メイントピックにカスタムメッセージルーティングを使用する場合は、message-routing-mode
の値 custompartition
が設定される自動構成 PulsarProducerFactory
を使用しない別の PulsarTemplate
を使用する必要があります。次のブループリントで PulsarConsumerErrorHandler
を使用できます。
@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
(c, m) -> "my-foo-dlt";
PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
new FixedBackOff(100, 5));
}
2 番目のコンストラクター引数として PulsarDeadLetterPublishingRecoverer
に宛先リゾルバーを提供していることに注意してください。指定しない場合、PulsarDeadLetterPublishingRecoverer
は DLT トピック名として <subscription-name>-<topic-name>-DLT>
を使用します。この機能を使用する場合は、デフォルトを使用するのではなく、宛先リゾルバーを設定して適切な宛先名を使用する必要があります。
PulsarConsumerErrorHnadler
で行ったように、単一のレコードメッセージリスナーを使用する場合、および手動確認応答を使用する場合は、例外がスローされたときにメッセージを否定的に確認しないようにしてください。むしろ、例外をコンテナーに再スローしてください。それ以外の場合、コンテナーはメッセージが個別に処理されると認識し、エラー処理はトリガーされません。
最後に、DLT トピックからメッセージを受信する 2 番目の PulsarListener
があります。
これまでこのセクションで説明した例では、単一のレコードメッセージリスナーで PulsarConsumerErrorHandler
を使用する方法のみを説明しました。次に、これをバッチリスナーで使用する方法を見ていきます。
7.5. PulsarConsumerErrorHandler によるバッチリスナー
まず、バッチ PulsarListener
メソッドを見てみましょう。
@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
subscriptionType = SubscriptionType.Failover,
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
for (Message<Integer> datum : data) {
if (datum.getValue() == 5) {
throw new PulsarBatchListenerFailedException("failed", datum);
}
acknowledgement.acknowledge(datum.getMessageId());
}
}
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
System.out.println("DLT - RECEIVED: " + message.getValue());
}
もう一度、pulsarConsumerErrorHandler
プロパティに PulsarConsumerErrorHandler
Bean という名前を付けます。(前の例に示すように) バッチリスナーを使用し、Spring for Apache Pulsar から PulsarConsumerErrorHandler
を使用する場合は、手動確認を使用する必要があります。こうすることで、成功した個々のメッセージをすべて確認できます。失敗したものについては、失敗したメッセージを含む PulsarBatchListenerFailedException
をスローする必要があります。この例外がなければ、フレームワークは失敗をどう処理すればよいのかわかりません。再試行時に、コンテナーは失敗したメッセージから始まるメッセージの新しいバッチをリスナーに送信します。再度失敗した場合は、再試行が完了するまで再試行され、その時点でメッセージが DLT に送信されます。その時点で、メッセージはコンテナーによって確認され、元のバッチ内の後続のメッセージがリスナーに渡されます。
8. PulsarListener でのコンシューマーのカスタマイズ
Spring for Apache Pulsar は、PulsarListener
で使用されるコンテナーによって作成されたコンシューマーをカスタマイズする便利な方法を提供します。アプリケーションは PulsarListenerConsumerBuilderCustomizer
に Bean を提供できます。ここに一例を示します。
@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> {
cb.subscriptionName("modified-subscription-name");
};
}
次に、以下に示すように、このカスタマイザー Bean 名を PuslarListener
アノテーションの属性として提供できます。
@PulsarListener(subscriptionName = "my-subscription",
topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {
}
フレームワークは、PulsarListener
を通じて提供された Bean を検出し、Pulsar Consumer を作成する前にこのカスタマイザーを Consumer ビルダーに適用します。
複数の PulsarListener
メソッドがあり、それぞれに異なるカスタマイズルールがある場合は、複数のカスタマイザー Bean を作成し、各 PulsarListener
に適切なカスタマイザーをアタッチする必要があります。
9. メッセージリスナーコンテナーのライフサイクル
9.1. 一時停止と再開
アプリケーションがメッセージの消費を一時的に停止し、後で再開したい場合があります。Spring for Apache Pulsar は、基になるメッセージリスナーコンテナーを一時停止および再開する機能を提供します。Pulsar メッセージリスナーコンテナーが一時停止されると、Pulsar コンシューマーからデータを受信するためにコンテナーによって行われるポーリングはすべて一時停止されます。同様に、コンテナーが再開されると、一時停止中にトピックに新しいレコードが追加された場合、次のポーリングでデータが返され始めます。
リスナーコンテナーを一時停止または再開するには、以下のスニペットに示すように、まず PulsarListenerEndpointRegistry
Bean 経由でコンテナーインスタンスを取得し、次にコンテナーインスタンスで一時停止 / 再開 API を呼び出します。
@Autowired
private PulsarListenerEndpointRegistry registry;
void someMethod() {
PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
container.pause();
}
getListenerContainer に渡される id パラメーターはコンテナー ID です。これは、@PulsarListener を一時停止 / 再開するときの @PulsarListener id 属性の値になります。 |
9.2. 起動失敗の処理
メッセージリスナーコンテナーは、アプリケーションコンテキストがリフレッシュされると起動されます。デフォルトでは、起動中に発生した障害はすべて再スローされ、アプリケーションの起動は失敗します。この動作は、対応するコンテナープロパティの StartupFailurePolicy
を使用して調整できます。
利用可能なオプションは次のとおりです。
Stop
(デフォルト) - 例外をログに記録して再スローし、アプリケーションを事実上停止します。Continue
- 例外をログに記録し、コンテナーを実行していない状態のままにしますが、アプリケーションは停止しません。Retry
- 例外をログに記録し、コンテナーを非同期的に起動しようとしますが、アプリケーションは停止しません。
デフォルトの再試行動作は、各試行の間に 10 秒の遅延を置いて 3 回再試行することです。ただし、対応するコンテナープロパティでカスタム再試行テンプレートを指定できます。再試行回数が尽きた後にコンテナーが再起動に失敗すると、コンテナーは非実行状態のままになります。
10. Pulsar リーダーのサポート
このフレームワークは、PulsarReaderFactory
経由で Pulsar リーダー [Apache] (英語) の使用をサポートします。
Spring Boot は、spring.pulsar.reader.*
(英語) アプリケーションプロパティのいずれかを指定することでさらに構成できるこのリーダーファクトリを提供します。
10.1. PulsarReader アノテーション
PulsarReaderFactory
を直接使用することもできますが、Spring for Apache Pulsar はリーダーファクトリを自分で設定せずにトピックからすばやく読み取るために使用できる PulsarReader
アノテーションを提供します。これは、PulsarListener.
の背後にある同じアイデアに似ています。簡単な例を次に示します。
@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
//...
}
id
属性はオプションですが、アプリケーションにとって意味のある値を指定することがベストプラクティスです。指定しない場合は、自動生成された ID が使用されます。一方、topics
属性と startMessageId
属性は必須です。topics
属性には、単一のトピックまたはトピックのコンマ区切りリストを指定できます。startMessageId
属性は、トピック内の特定のメッセージから始めるようにリーダーに指示します。startMessageId
の有効な値は、earliest
または latest.
です。閲覧者が、利用可能な最も古いメッセージまたは最新のメッセージ以外のトピックから任意にメッセージの読み取りを開始できるようにしたいとします。その場合、ReaderBuilderCustomizer
を使用して ReaderBuilder
をカスタマイズし、開始する適切な MessageId
を認識する必要があります。
10.2. ReaderBuilder のカスタマイズ
Spring for Apache Pulsar の PulsarReaderReaderBuilderCustomizer
を使用して、ReaderBuilder
を通じて利用可能なフィールドをカスタマイズできます。以下のように、型 PulsarReaderReaderBuilderCustomizer
の @Bean
を提供し、それを PulsarReader
で使用できるようにすることができます。
@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
readerCustomizer = "myCustomizer")
void read(String message) {
//...
}
@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
return readerBuilder -> {
readerBuilder.startMessageId(messageId); // the first message read is after this message id.
// Any other customizations on the readerBuilder
};
}
アプリケーションに 1 つの @PulsarReader と 1 つの PulsarReaderReaderBuilderCustomizer Bean しか登録されていない場合、カスタマイザーは自動的に適用されます。 |
10.3. 起動失敗の処理
メッセージリスナーコンテナーは、アプリケーションコンテキストがリフレッシュされると起動されます。デフォルトでは、起動中に発生した障害はすべて再スローされ、アプリケーションの起動は失敗します。この動作は、対応するコンテナープロパティの StartupFailurePolicy
を使用して調整できます。
利用可能なオプションは次のとおりです。
Stop
(デフォルト) - 例外をログに記録して再スローし、アプリケーションを事実上停止します。Continue
- 例外をログに記録し、コンテナーを実行していない状態のままにしますが、アプリケーションは停止しません。Retry
- 例外をログに記録し、コンテナーを非同期的に起動しようとしますが、アプリケーションは停止しません。
デフォルトの再試行動作は、各試行の間に 10 秒の遅延を置いて 3 回再試行することです。ただし、対応するコンテナープロパティでカスタム再試行テンプレートを指定できます。再試行回数が尽きた後にコンテナーが再起動に失敗すると、コンテナーは非実行状態のままになります。