Pulsar ヘッダー
Pulsar にはファーストクラスの「ヘッダー」概念はありませんが、代わりにカスタムユーザープロパティのマップと、通常はメッセージヘッダーに保存されるメッセージメタデータ (例: id および event-time) にアクセスするメソッドが提供されます。「Pulsar メッセージヘッダー」と「Pulsar メッセージメタデータ」という用語は同じ意味で使用されます。利用可能なメッセージメタデータ (ヘッダー) のリストは、PulsarHeaders.java [GitHub] (英語) にあります。
Spring ヘッダー
Spring メッセージングは、MessageHeaders 抽象化を通じてファーストクラスの「ヘッダー」サポートを提供します。
Pulsar メッセージメタデータは、Spring メッセージヘッダーとして使用できます。使用可能なヘッダーのリストは PulsarHeaders.java [GitHub] (英語) にあります。
単一レコードベースのコンシューマーでのアクセス
次の例は、単一レコード消費モードを使用するアプリケーションでさまざまな Pulsar ヘッダーにアクセスする方法を示しています。
@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("foo") String foo) {
} 前述の例では、messageId および rawData メッセージメタデータの値と、foo という名前のカスタムメッセージプロパティにアクセスします。Spring @Header アノテーションは各ヘッダーフィールドに使用されます。
Pulsar の Message をペイロードを運ぶエンベロープとして使用することもできます。その際、ユーザーはメタデータを取得するために Pulsar メッセージ上の対応するメソッドを直接呼び出すことができます。ただし、便宜上、Header アノテーションを使用して取得することもできます。Spring メッセージング Message エンベロープを使用してペイロードを伝送し、@Header を使用して Pulsar ヘッダーを取得することもできることに注意してください。
バッチレコードベースのコンシューマーでのアクセス
このセクションでは、バッチコンシューマーを使用するアプリケーションでさまざまな Pulsar ヘッダーにアクセスする方法を説明します。
@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {
} 前の例では、データを List<String> として消費します。さまざまなヘッダーを抽出するときも、List<> として抽出します。Spring for Apache Pulsar は、ヘッダーリストがデータリストに対応していることを保証します。
バッチリスナーを使用し、ペイロードを List<org.apache.pulsar.client.api.Message<?>、org.apache.pulsar.client.api.Messages<?>、または org.springframework.messaging.Messsge<?> として受信する場合も、同じ方法でヘッダーを抽出できます。
メッセージヘッダーのマッピング
PulsarHeaderMapper 戦略は、ヘッダーを Pulsar ユーザープロパティおよび Spring MessageHeaders との間でマップするために提供されます。
そのインターフェース定義は次のとおりです。
public interface PulsarHeaderMapper {
Map<String, String> toPulsarHeaders(MessageHeaders springHeaders);
MessageHeaders toSpringHeaders(Message<?> pulsarMessage);
}フレームワークは、いくつかのマッパー実装を提供します。
JsonPulsarHeaderMapperは、豊富なヘッダー型をサポートするためにヘッダーを JSON としてマップし、Jackson JSON ライブラリがクラスパス上にある場合のデフォルトです。ToStringPulsarHeaderMapperは、ヘッダー値のtoString()メソッドを使用してヘッダーを文字列としてマップし、フォールバックマッパーです。
JSON ヘッダーマッパー
JsonPulsarHeaderMapper は、<key>:<type> の JSON マップを含む「特別な」ヘッダー ( spring_json_header_types のキーを持つ) を使用します。このヘッダーは受信側 (Pulsar → Spring) で使用され、各ヘッダー値を元の型に適切に変換します。
信頼できるパッケージ
デフォルトでは、JSON マッパーはすべてのパッケージ内のクラスを逆直列化します。ただし、信頼できないソースからメッセージを受信した場合は、提供するカスタム構成の JsonPulsarHeaderMapper Bean の trustedPackages プロパティを介して、信頼できるパッケージのみを追加することもできます。
ToString クラス
特定の型は JSON 直列化に適していないため、これらの型には単純な toString() 直列化が推奨される場合があります。JsonPulsarHeaderMapper には addToStringClasses() と呼ばれるプロパティがあり、送信 マッピングでこのように処理する必要があるクラスの名前を指定できます。受信マッピング中に、これらは String としてマッピングされます。デフォルトでは、org.springframework.util.MimeType と org.springframework.http.MediaType のみがこの方法でマップされます。
カスタム ObjectMapper
JSON マッパーは、ヘッダー値の直列化を処理するために、適切に構成された Jackson 2 ObjectMapper を使用します。ただし、カスタムオブジェクトマッパーを提供するには、pulsarHeaderObjectMapper という名前の ObjectMapper Bean を提供する必要があります。たとえば、次のようになります。
@Configuration(proxyBeanMethods = false)
static class PulsarHeadersCustomObjectMapperTestConfig {
@Bean(name = "pulsarHeaderObjectMapper")
ObjectMapper customObjectMapper() {
var objectMapper = new ObjectMapper();
// do things with your special header object mapper here
return objectMapper;
}
} 上記の例のオブジェクトマッパーは、影付きの org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper ではなく、com.fasterxml.jackson.databind.ObjectMapper のインスタンスである必要があります。 |
| ここでも、Jackson 2 と Jackson 3 に関する同じ制限が適用されます。 |
受信 / 送信パターン
受信側では、デフォルトでは、すべての Pulsar ヘッダー (メッセージメタデータとユーザープロパティ) が MessageHeaders にマップされます。送信側では、デフォルトでは、id、timestamp、Pulsar メッセージメタデータを表すヘッダー (つまり、pulsar_message_ で始まるヘッダー) を除くすべての MessageHeaders がマップされます。提供したマッパー Bean で inboundPatterns および outboundPatterns を構成することにより、受信メッセージと送信メッセージにマップされるヘッダーを指定できます。メタデータヘッダーではパターンがサポートされていないため、outboundPatterns に正確なヘッダー名を追加することにより、送信メッセージに Pulsar メッセージメタデータヘッダーを含めることができます。パターンはかなり単純で、先頭のワイルドカード (*)、末尾のワイルドカード、またはその両方 (たとえば、*.cat.*) を含めることができます。先頭の ! でパターンを否定できます。ヘッダー名に一致する最初のパターン (正か負かに関係なく) が優先されます。
独自のパターンを提供する場合は、!id および !timestamp を含めることをお勧めします。これらのヘッダーは受信側で読み取り専用であるためです。 |