メッセージヘッダー

0.11.0.0 クライアントでは、メッセージ内のヘッダーのサポートが導入されました。バージョン 2.0 の時点で、Spring for Apache Kafka はこれらのヘッダーと spring-messagingMessageHeaders 間のマッピングをサポートするようになりました。

以前のバージョンでは、ConsumerRecord および ProducerRecord を spring-messaging Message<?> にマップしていました。ここで、値プロパティは payload との間でマップされ、他のプロパティ (topicpartition など) はヘッダーにマップされていました。これは引き続き当てはまりますが、追加の (任意の) ヘッダーをマップできるようになりました。

Apache Kafka ヘッダーには、次のインターフェース定義に示すようなシンプルな API があります。

public interface Header {

    String key();

    byte[] value();

}

KafkaHeaderMapper 戦略は、Kafka Headers と MessageHeaders の間でヘッダーエントリをマップするために提供されます。そのインターフェースの定義は次のとおりです。

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

SimpleKafkaHeaderMapper は、生のヘッダーを byte[] としてマップし、String 値に変換するための構成オプションを備えています。

DefaultKafkaHeaderMapper はキーを MessageHeaders ヘッダー名にマップし、送信メッセージの豊富なヘッダー型をサポートするために、JSON 変換が実行されます。"special" ヘッダー (spring_json_header_types のキーを持つ) には、<key>:<type> の JSON マップが含まれています。このヘッダーは、各ヘッダー値を元の型に適切に変換するために受信側で使用されます。

受信側では、すべての Kafka Header インスタンスが MessageHeaders にマップされます。送信側では、デフォルトで、idtimestampConsumerRecord プロパティにマッピングされるヘッダーを除くすべての MessageHeaders がマッピングされます。

マッパーにパターンを提供することで、送信 メッセージにマップするヘッダーを指定できます。次のリストは、多くのマッピング例を示しています。

public DefaultKafkaHeaderMapper() { (1)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { (3)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
    ...
}
1 例の前に説明したように、デフォルトの Jackson ObjectMapper を使用し、ほとんどのヘッダーをマップします。
2 例の前に説明したように、提供された Jackson ObjectMapper を使用し、ほとんどのヘッダーをマップします。
3 デフォルトの Jackson ObjectMapper を使用し、提供されたパターンに従ってヘッダーをマップします。
4 提供された Jackson ObjectMapper を使用し、提供されたパターンに従ってヘッダーをマッピングします。

パターンはかなり単純で、先頭のワイルドカード (*)、末尾のワイルドカード、またはその両方 (たとえば、*.cat.*) を含めることができます。先行する ! でパターンを否定できます。ヘッダー名 (正か負かにかかわらず) に一致する最初のパターンが勝ちます。

独自のパターンを提供する場合は、!id および !timestamp を含めることをお勧めします。これらのヘッダーは受信側で読み取り専用であるためです。

デフォルトでは、マッパーは java.lang および java.util のクラスのみをデシリアライズします。addTrustedPackages メソッドを使用して信頼できるパッケージを追加することにより、他の (またはすべての) パッケージを信頼できます。信頼できないソースからメッセージを受信した場合、信頼できるパッケージのみを追加することをお勧めします。すべてのパッケージを信頼するには、mapper.addTrustedPackages("*") を使用できます。
String ヘッダー値を未加工の形式でマッピングすると、マッパーの JSON 形式を認識していないシステムと通信する場合に役立ちます。

バージョン 2.2.5 以降、特定の文字列値のヘッダーを JSON を使用してマップするのではなく、生の byte[] との間でマップするように指定できます。AbstractKafkaHeaderMapper には新しいプロパティがあります。mapAllStringsOut が true に設定されている場合、すべての文字列値ヘッダーは charset プロパティ (デフォルトは UTF-8) を使用して byte[] に変換されます。さらに、header name : boolean のマップであるプロパティ rawMappedHeaders があります。マップにヘッダー名が含まれていて、ヘッダーに String 値が含まれている場合、文字セットを使用して生の byte[] としてマップされます。このマップは、マップ値のブール値が true である場合にのみ、文字セットを使用して生の受信 byte[] ヘッダーを String にマップするためにも使用されます。ブール値が false である場合、またはヘッダー名が true 値を持つマップにない場合、受信ヘッダーは単に生のマップされていないヘッダーとしてマップされます。

次のテストケースは、このメカニズムを示しています。

@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

デフォルトでは、両方のヘッダーマッパーがすべての受信ヘッダーをマップします。バージョン 2.8.8 以降、パターンは受信マッピングにも適用できます。受信マッピング用のマッパーを作成するには、それぞれのマッパーで静的メソッドの 1 つを使用します。

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

例:

DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

これにより、abc で始まるすべてのヘッダーが除外され、他のすべてのヘッダーが含まれます。

デフォルトでは、Jackson がクラスパス上にある限り、DefaultKafkaHeaderMapper は MessagingMessageConverter および BatchMessagingMessageConverter で使用されます。

バッチコンバーターを使用すると、変換されたヘッダーは KafkaHeaders.BATCH_CONVERTED_HEADERS で List<Map<String, Object>> として使用でき、リストの位置にあるマップはペイロードのデータ位置に対応します。

コンバーターがない場合 ( Jackson が存在しないか、明示的に null に設定されているため)、コンシューマーレコードのヘッダーは変換されずに KafkaHeaders.NATIVE_HEADERS ヘッダーに提供されます。このヘッダーは Headers オブジェクト (バッチコンバーターの場合は List<Headers> ) であり、リスト内の位置はペイロード内のデータの位置に対応します。

特定の型は JSON 直列化に適していないため、これらの型には単純な toString() 直列化が適している場合があります。DefaultKafkaHeaderMapper には addToStringClasses() というメソッドがあり、送信 マッピングでこの方法で処理する必要があるクラスの名前を指定できます。受信・マッピング中に、それらは String としてマッピングされます。デフォルトでは、org.springframework.util.MimeType と org.springframework.http.MediaType のみがこの方法でマッピングされます。
バージョン 2.3 以降、文字列値ヘッダーの処理が簡素化されています。このようなヘッダーは、デフォルトでは JSON エンコードされなくなりました (つまり、エンクロージング "..." が追加されていません)。型は引き続き JSON_TYPES ヘッダーに追加されるため、受信側システムは ( byte[] から) String に変換し直すことができます。マッパーは、古いバージョンで生成されたヘッダーを処理 (デコード) できます (先頭の " をチェックします)。このようにして、2.3 を使用するアプリケーションは、古いバージョンのレコードを使用できます。
2.3 を使用するバージョンによって生成されたレコードが、以前のバージョンを使用するアプリケーションによって使用される可能性がある場合、以前のバージョンとの互換性を維持するには、encodeStrings を true に設定します。すべてのアプリケーションが 2.3 以上を使用している場合は、プロパティをデフォルト値の false のままにすることができます。
@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

Spring Boot を使用している場合、このコンバーター Bean を自動構成された KafkaTemplate に自動構成します。それ以外の場合は、このコンバーターをテンプレートに追加する必要があります。