直列化、逆直列化、メッセージ変換

概要

Apache Kafka は、レコード値とそのキーを直列化およびデ直列化するための高レベル API を提供します。これは、いくつかの組み込み実装を備えた org.apache.kafka.common.serialization.Serializer<T> および org.apache.kafka.common.serialization.Deserializer<T> 抽象化に存在します。一方、Producer または Consumer 構成プロパティを使用して、シリアライザークラスとデシリアライザークラスを指定できます。次の例は、その方法を示しています。

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

より複雑なケースや特殊なケースでは、KafkaConsumer (したがって KafkaProducer) は、keys および values のそれぞれの Serializer および Deserializer インスタンスを受け入れるオーバーロードされたコンストラクターを提供します。

この API を使用する場合、DefaultKafkaProducerFactory および DefaultKafkaConsumerFactory は (コンストラクターまたは setter メソッドを介して) プロパティを提供し、カスタム Serializer および Deserializer インスタンスをターゲット Producer または Consumer に挿入します。また、コンストラクターを介して Supplier<Serializer> または Supplier<Deserializer> インスタンスを渡すこともできます。これらの Supplier は、各 Producer または Consumer の作成時に呼び出されます。

文字列の直列化

バージョン 2.5 以降、Spring for Apache Kafka は、エンティティのストリング表現を使用する ToStringSerializer および ParseStringDeserializer クラスを提供します。それらは、メソッド toString といくつかの Function<String> または BiFunction<String, Headers> に依存して、文字列を解析し、インスタンスのプロパティを設定します。通常、これは parse などのクラスの静的メソッドを呼び出します。

ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);

デフォルトでは、ToStringSerializer は、レコード Headers 内の直列化されたエンティティに関する型情報を伝えるように構成されています。これを無効にするには、addTypeInfo プロパティを false に設定します。この情報は受信側の ParseStringDeserializer で利用できます。

  • ToStringSerializer.ADD_TYPE_INFO_HEADERS (デフォルト true): これを false に設定して、ToStringSerializer でこの機能を無効にすることができます (addTypeInfo プロパティを設定します)。

ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
    byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
    String entityType = new String(header);

    if (entityType.contains("Thing")) {
        return Thing.parse(str);
    }
    else {
        // ...parsing logic
    }
});

String を byte[] との間で変換するために使用される Charset を構成できます。デフォルトは UTF-8 です。

ConsumerConfig プロパティを使用して、パーサーメソッドの名前でデシリアライザーを構成できます。

  • ParseStringDeserializer.KEY_PARSER

  • ParseStringDeserializer.VALUE_PARSER

プロパティには、クラスの完全修飾名とそれに続くメソッド名をピリオド . で区切って含める必要があります。メソッドは静的で、(String, Headers) または (String) のいずれかの署名を持つ必要があります。

Kafka ストリームで使用するための ToFromStringSerde も提供されます。

JSON

Spring for Apache Kafka は、Jackson JSON オブジェクトマッパーに基づく JsonSerializer および JsonDeserializer 実装も提供します。JsonSerializer では、任意の Java オブジェクトを JSON byte[] として書き込むことができます。JsonDeserializer には、消費された byte[] を適切なターゲットオブジェクトに逆直列化できるように、追加の Class<?> targetType 引数が必要です。次の例は、JsonDeserializer を作成する方法を示しています。

JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);

JsonSerializer と JsonDeserializer の両方を ObjectMapper でカスタマイズできます。継承して、configure(Map<String, ?> configs, boolean isKey) メソッドで特定の構成ロジックを実装することもできます。

バージョン 2.3 以降、すべての JSON 対応コンポーネントはデフォルトで JacksonUtils.enhancedObjectMapper() インスタンスで構成され、MapperFeature.DEFAULT_VIEW_INCLUSION および DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES 機能が無効になっています。また、このようなインスタンスには、Java 時間や Kotlin サポートなどのカスタムデータ型用のよく知られたモジュールが付属しています。詳細については、JacksonUtils.enhancedObjectMapper() JavaDocs を参照してください。このメソッドは、ネットワークを介したプラットフォーム間の互換性のために、org.springframework.util.MimeType オブジェクトの直列化用の org.springframework.kafka.support.JacksonMimeTypeModule をプレーン文字列に登録します。JacksonMimeTypeModule は、アプリケーションコンテキストで Bean として登録でき、Spring Boot ObjectMapper インスタンスに自動構成されます。

また、バージョン 2.3 以降、JsonDeserializer は、ターゲットジェネリクスコンテナー型の処理を改善するための TypeReference ベースのコンストラクターを提供します。

バージョン 2.1 以降、レコード Headers で型情報を伝達できるようになり、複数の型を処理できるようになりました。さらに、次の Kafka プロパティを使用してシリアライザーとデシリアライザーを構成できます。KafkaConsumer および KafkaProducer にそれぞれ Serializer および Deserializer インスタンスを提供した場合、効果がありません。

プロパティの構成

  • JsonSerializer.ADD_TYPE_INFO_HEADERS (デフォルト true): これを false に設定して、JsonSerializer でこの機能を無効にすることができます (addTypeInfo プロパティを設定します)。

  • JsonSerializer.TYPE_MAPPINGS (デフォルト empty): マッピング型を参照してください。

  • JsonDeserializer.USE_TYPE_INFO_HEADERS (デフォルト true): シリアライザーによって設定されたヘッダーを無視するには、これを false に設定できます。

  • JsonDeserializer.REMOVE_TYPE_INFO_HEADERS (デフォルト true): これを false に設定すると、シリアライザーによって設定されたヘッダーを保持できます。

  • JsonDeserializer.KEY_DEFAULT_TYPE: ヘッダー情報が存在しない場合のキーの逆直列化のフォールバック型。

  • JsonDeserializer.VALUE_DEFAULT_TYPE: ヘッダー情報が存在しない場合の値の逆直列化のフォールバック型。

  • JsonDeserializer.TRUSTED_PACKAGES (デフォルトは java.utiljava.lang): 逆直列化が許可されるパッケージパターンのカンマ区切りのリスト。* はすべてを逆直列化することを意味します。

  • JsonDeserializer.TYPE_MAPPINGS (デフォルト empty): マッピング型を参照してください。

  • JsonDeserializer.KEY_TYPE_METHOD (デフォルト empty): メソッドを使用して型を決定するを参照してください。

  • JsonDeserializer.VALUE_TYPE_METHOD (デフォルト empty): メソッドを使用して型を決定するを参照してください。

バージョン 2.2 以降、型情報ヘッダー(シリアライザーによって追加された場合)はデシリアライザーによって削除されます。removeTypeHeaders プロパティを false に設定することにより、デシリアライザで直接、前述の構成プロパティを使用して、以前の動作に戻すことができます。

バージョン 2.8 以降、プログラム構築に示すようにプログラムでシリアライザーまたはデシリアライザーを構築する場合、プロパティを明示的に設定していない限り(set*() メソッドまたは流れるような API を使用)、上記のプロパティがファクトリによって適用されます。以前は、プログラムで作成するときに、構成プロパティが適用されることはありませんでした。これは、オブジェクトにプロパティを直接明示的に設定した場合にも当てはまります。

マッピング型

バージョン 2.2 以降、JSON を使用する場合、前述のリストのプロパティを使用して型 マッピングを提供できるようになりました。以前は、シリアライザーとデシリアライザー内の型マッパーをカスタマイズする必要がありました。マッピングは、token:className ペアのコンマ区切りリストで構成されます。送信では、ペイロードのクラス名が対応するトークンにマッピングされます。受信では、型 ヘッダーのトークンが対応するクラス名にマップされます。

次の例では、一連のマッピングを作成します。

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
対応するオブジェクトには互換性が必要です。

Spring Boot を使用する場合、これらのプロパティを application.properties (または yaml) ファイルで提供できます。次の例は、その方法を示しています。

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat

プロパティを使用した簡単な構成のみを実行できます。より高度な構成 (シリアライザーとデシリアライザーでカスタム ObjectMapper を使用するなど) の場合は、事前に構築されたシリアライザーとデシリアライザーを受け入れるプロデューサーとコンシューマーファクトリコンストラクターを使用する必要があります。次の Spring Boot の例は、デフォルトのファクトリをオーバーライドします。

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(JsonSerializer customValueSerializer) {
    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        new StringSerializer(), customValueSerializer);
}

これらのコンストラクターを使用する代わりに、Setter も提供されています。

Spring Boot を使用して、上記のように ConsumerFactory と ProducerFactory をオーバーライドする場合、ワイルドカードジェネリクス型を Bean メソッドの戻り値の型とともに使用する必要があります。代わりに具体的なジェネリクス型が指定されている場合、Spring Boot はこれらの Bean を無視し、デフォルトの Bean を使用します。

バージョン 2.2 以降では、ブール値 useHeadersIfPresent 引数 (デフォルトでは true ) を持つオーバーロードされたコンストラクターの 1 つを使用して、提供されたターゲット型を使用し、ヘッダー内の型情報を無視するようにデシリアライザーを明示的に構成できます。次の例は、その方法を示しています。

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

メソッドを使用して型を決定する

バージョン 2.5 以降、プロパティを介してデシリアライザーを構成し、ターゲット型を決定するメソッドを呼び出すことができるようになりました。存在する場合、これは上で議論された他のテクニックのいずれかを上書きします。これは、データが Spring シリアライザーを使用しないアプリケーションによって公開され、データまたは他のヘッダーに応じて異なる型に逆直列化する必要がある場合に役立ちます。これらのプロパティをメソッド名に設定します。完全修飾クラス名の後にメソッド名が続き、ピリオド . で区切られます。メソッドは public static として宣言する必要があり、3 つの署名 (String topic, byte[] data, Headers headers)(byte[] data, Headers headers) または (byte[] data) のいずれかを持ち、Jackson JavaType を返します。

  • JsonDeserializer.KEY_TYPE_METHOD : spring.json.key.type.method

  • JsonDeserializer.VALUE_TYPE_METHOD : spring.json.value.type.method

任意のヘッダーを使用するか、データをインスペクションして型を判別できます。

JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
    // {"thisIsAFieldInThing1":"value", ...
    if (data[21] == '1') {
        return thing1Type;
    }
    else {
        return thing2Type;
    }
}

より高度なデータの場合、インスペクション は JsonPath などの使用を検討しますが、型を判別するためのテストが単純なほど、プロセスはより効率的になります。

以下は、デシリアライザーをプログラムで作成する例です (コンストラクターでコンシューマーファクトリにデシリアライザーを提供する場合)。

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}

プログラム構築

プロデューサー / コンシューマーファクトリで使用するためにシリアライザー / デシリアライザーをプログラムで構築する場合、バージョン 2.3 以降、流れるような API を使用できるため、構成が簡単になります。

@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JsonSerializer<MyKeyType>()
            .forKeys()
            .noTypeInfo(),
        new JsonSerializer<MyValueType>()
            .noTypeInfo());
    return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
        new JsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JsonDeserializer<>(MyValueType.class)
            .ignoreTypeHeaders());
    return cf;
}

メソッドを使用して型を決定すると同様に、プログラムで型マッピングを提供するには、typeFunction プロパティを使用します。

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeFunction(MyUtils::thingOneOrThingTwo);

または、流れるような API を使用してプロパティを構成したり、set*() メソッドを使用して設定したりしない限り、ファクトリは構成プロパティを使用してシリアライザー / デシリアライザーを構成します。プロパティの構成を参照してください。

シリアライザーとデシリアライザーの委譲

ヘッダーの使用

バージョン 2.3 は DelegatingSerializer および DelegatingDeserializer を導入しました。これにより、さまざまなキーおよび / または値型でレコードを生成および使用できます。プロデューサーは、ヘッダー DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR を、値に使用するシリアライザーとキーに DelegatingSerializer.KEY_SERIALIZATION_SELECTOR を選択するために使用されるセレクター値に設定する必要があります。一致するものが見つからない場合は、IllegalStateException がスローされます。

受信レコードの場合、デシリアライザーは同じヘッダーを使用して、使用するデシリアライザーを選択します。一致するものが見つからないか、ヘッダーが存在しない場合は、生の byte[] が返されます。

コンストラクターを介してセレクターのマップを Serializer / Deserializer に構成するか、キー DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG および DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG を使用して Kafka プロデューサー / コンシューマープロパティを介して構成できます。シリアライザーの場合、プロデューサープロパティは、キーがセレクターで値が Serializer インスタンス、シリアライザー Class、またはクラス名である Map<String, Object> にすることができます。プロパティは、次に示すように、カンマ区切りのマップエントリの文字列にすることもできます。

デシリアライザーの場合、コンシューマープロパティは、キーがセレクターで値が Deserializer インスタンス、デシリアライザー Class、またはクラス名である Map<String, Object> にすることができます。プロパティは、次に示すように、カンマ区切りのマップエントリの文字列にすることもできます。

プロパティを使用して構成するには、次の構文を使用します。

producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")

その後、プロデューサーは DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR ヘッダーを thing1 または thing2 に設定します。

この手法は、異なる型を同じトピック (または異なるトピック) に送信することをサポートします。

バージョン 2.5.1 以降、型 (キーまたは値) が Serdes でサポートされている標準型 (LongInteger など) のいずれかである場合、セレクターヘッダーを設定する必要はありません。代わりに、シリアライザーはヘッダーを型のクラス名に設定します。これらの型のシリアライザーまたはデシリアライザーを構成する必要はありません。動的に (1 回だけ) 作成されます。

異なる型を異なるトピックに送信する別の手法については、RoutingKafkaTemplate を使用するを参照してください。

タイプ順

バージョン 2.8 は DelegatingByTypeSerializer を導入しました。

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}

バージョン 2.8.3 以降では、マップキーがターゲットオブジェクトから割り当て可能かどうかを確認するようにシリアライザーを構成できます。これは、デリゲートシリアライザーがサブクラスをシリアライズできる場合に役立ちます。この場合、不一致の一致がある場合は、LinkedHashMap などの順序付けられた Map を提供する必要があります。

トピック別

バージョン 2.8 以降、DelegatingByTopicSerializer および DelegatingByTopicDeserializer では、トピック名に基づいてシリアライザー / デシリアライザーを選択できます。正規表現 Pattern は、使用するインスタンスを検索するために使用されます。マップは、コンストラクターを使用して、またはプロパティ(pattern:serializer のコンマ区切りリスト)を介して構成できます。

producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArraySerializer.class.getName()
        + ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArrayDeserializer.class.getName()
        + ", topic[5-9]:" + StringDeserializer.class.getName());

これをキーに使用する場合は、KEY_SERIALIZATION_TOPIC_CONFIG を使用してください。

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            new IntegerSerializer(),
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer())),
                    new JsonSerializer<Object>());  // default
}

DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT および DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT を使用して、パターン一致がない場合に使用するデフォルトのシリアライザー / デシリアライザーを指定できます。

追加のプロパティ DelegatingByTopicSerialization.CASE_SENSITIVE (デフォルトは true)を false に設定すると、トピックルックアップで大文字と小文字が区別されなくなります。

デシリアライザーの再試行

RetryingDeserializer は、デリゲートの直列化解除中にネットワークの課題などの一時的なエラーが発生した可能性がある場合に、デリゲート Deserializer および RetryTemplate を使用して直列化解除を再試行します。

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));

バージョン 3.1.2 以降、オプションで RecoveryCallback を RetryingDeserializer に設定できます。

再試行ポリシー、バックオフポリシーなどを使用した RetryTemplate の構成については、spring-retry [GitHub] (英語) プロジェクトを参照してください。

Spring メッセージングメッセージ変換

Serializer および Deserializer API は、低レベルの Kafka Consumer および Producer の観点からは非常にシンプルで柔軟ですが、@KafkaListener または Spring Integration の Apache Kafka サポートを使用する場合は、Spring メッセージングレベルでより柔軟性が必要になる場合があります。org.springframework.messaging.Message との間で簡単に変換できるようにするために、Spring for Apache Kafka は MessagingMessageConverter 実装とその JsonMessageConverter (およびサブクラス) のカスタマイズで MessageConverter 抽象化を提供します。@KafkaListener.containerFactory() プロパティの AbstractKafkaListenerContainerFactory Bean 定義を使用して、MessageConverter を KafkaTemplate インスタンスに直接挿入できます。次の例は、その方法を示しています。

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordMessageConverter(new JsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

Spring Boot を使用する場合は、コンバーターを @Bean として定義するだけで、Spring Boot の自動構成によって自動構成されたテンプレートとコンテナーファクトリに接続されます。

@KafkaListener を使用する場合、パラメーター型がメッセージコンバーターに提供され、変換を支援します。

この型推論は、@KafkaListener アノテーションがメソッドレベルで宣言されている場合にのみ実現できます。クラスレベルの @KafkaListener では、ペイロード型を使用して呼び出す @KafkaHandler メソッドを選択するため、メソッドを選択する前にすでに変換されている必要があります。

コンシューマー側では、JsonMessageConverter を構成できます。型 byte[]BytesString の ConsumerRecord 値を処理できるため、ByteArrayDeserializerBytesDeserializerStringDeserializer と組み合わせて使用する必要があります。(byte[] および Bytes は、不必要な byte[] から String への変換を回避するため、より効率的です)。必要に応じて、デシリアライザーに対応する JsonMessageConverter の特定のサブクラスを構成することもできます。

プロデューサー側では、Spring Integration または KafkaTemplate.send(Message<?> message) メソッド ( KafkaTemplate を使用するを参照) を使用する場合、構成済みの Kafka Serializer と互換性のあるメッセージコンバーターを構成する必要があります。

  • StringJsonMessageConverter と StringSerializer

  • BytesJsonMessageConverter と BytesSerializer

  • ByteArrayJsonMessageConverter と ByteArraySerializer

ここでも、byte[] または Bytes を使用すると、String から byte[] への変換が回避されるため、より効率的です。

便宜上、バージョン 2.3 以降、フレームワークは 3 つの値型すべてを直列化できる StringOrBytesSerializer も提供し、任意のメッセージコンバーターで使用できるようにします。

バージョン 2.7.1 以降、メッセージペイロードの変換を spring-messagingSmartMessageConverter に委譲できます。これにより、たとえば、MessageHeaders.CONTENT_TYPE ヘッダーに基づく変換が可能になります。

KafkaMessageConverter.fromMessage() メソッドは、ProducerRecord.value() プロパティ内のメッセージペイロードを使用して ProducerRecord に送信変換するために呼び出されます。KafkaMessageConverter.toMessage() メソッドは、ペイロードが ConsumerRecord.value() プロパティである ConsumerRecord からの受信変換のために呼び出されます。SmartMessageConverter.toMessage() メソッドは、fromMessage() (通常は KafkaTemplate.send(Message<?> msg) によって) に渡される Message から新しい送信 Message<?> を作成するために呼び出されます。同様に、KafkaMessageConverter.toMessage() メソッドでは、コンバーターが ConsumerRecord から新しい Message<?> を作成した後、SmartMessageConverter.fromMessage() メソッドが呼び出され、新しく変換されたペイロードを使用して最終受信メッセージが作成されます。いずれの場合も、SmartMessageConverter が null を返す場合は、元のメッセージが使用されます。

デフォルトのコンバーターが KafkaTemplate およびリスナーコンテナーファクトリで使用される場合、テンプレートで setMessagingConverter() を呼び出し、@KafkaListener メソッドの contentTypeConverter プロパティを介して SmartMessageConverter を構成します。

例:

template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
    contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
    ...
}

Spring Data 射影 インターフェースの使用

バージョン 2.1.1 以降、JSON を具象型の代わりに Spring Data Projection インターフェースに変換できます。これにより、JSON ドキュメント内の複数の場所からの値のルックアップなど、データへの非常に選択的で低結合のバインドが可能になります。たとえば、次のインターフェースをメッセージペイロード型として定義できます。

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

デフォルトでは、アクセサーメソッドを使用して、受信した JSON ドキュメント内のフィールドとしてプロパティ名を検索します。@JsonPath 式を使用すると、値の検索をカスタマイズしたり、複数の JSON パス式を定義して、式が実際の値を返すまで複数の場所から値を検索したりできます。

この機能を有効にするには、適切なデリゲートコンバーター (送信変換および非射影 インターフェースの変換に使用) で構成された ProjectingMessageConverter を使用します。spring-data:spring-data-commons および com.jayway.jsonpath:json-path もクラスパスに追加する必要があります。

@KafkaListener メソッドへのパラメーターとして使用される場合、インターフェース型は通常どおりコンバーターに自動的に渡されます。

ErrorHandlingDeserializer を使用する

デシリアライザーがメッセージのデシリアライズに失敗すると、poll() が戻る前に問題が発生するため、Spring は問題を処理できません。この問題を解決するために、ErrorHandlingDeserializer が導入されました。このデシリアライザーは、実際のデシリアライザー (キーまたは値) に委譲します。デリゲートがレコードコンテンツの逆直列化に失敗した場合、ErrorHandlingDeserializer は、原因と生のバイトを含むヘッダーに null 値と DeserializationException を返します。レコードレベルの MessageListener を使用する場合、ConsumerRecord にキーまたは値の DeserializationException ヘッダーが含まれている場合、コンテナーの ErrorHandler は失敗した ConsumerRecord で呼び出されます。レコードはリスナーに渡されません。

または、Function<FailedDeserializationInfo, T> である failedDeserializationFunction を提供することにより、カスタム値を作成するように ErrorHandlingDeserializer を構成できます。この関数は、通常の方法でリスナーに渡される T のインスタンスを作成するために呼び出されます。すべてのコンテキスト情報を含む型 FailedDeserializationInfo のオブジェクトが関数に提供されます。ヘッダーに DeserializationException (シリアライズされた Java オブジェクトとして) があります。詳細については、ErrorHandlingDeserializer の Javadoc を参照してください。

キーと値の Deserializer オブジェクトを受け取り、適切なデリゲートで構成した適切な ErrorHandlingDeserializer インスタンスに接続する DefaultKafkaConsumerFactory コンストラクターを使用できます。または、コンシューマー構成プロパティ ( ErrorHandlingDeserializer によって使用される) を使用してデリゲートをインスタンス化することもできます。プロパティ名は ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS および ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS です。プロパティ値には、クラスまたはクラス名を指定できます。次の例は、これらのプロパティを設定する方法を示しています。

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

次の例では、failedDeserializationFunction を使用しています。

public class BadThing extends Thing {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {

  @Override
  public Thing apply(FailedDeserializationInfo info) {
    return new BadThing(info);
  }

}

前の例では、次の構成を使用しています。

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
コンシューマーが ErrorHandlingDeserializer で構成されている場合は、通常のオブジェクトだけでなく、逆直列化例外から生じる生の byte[] 値も処理できるシリアライザーを使用して KafkaTemplate とそのプロデューサーを構成することが重要です。テンプレートの汎用値型は Object である必要があります。1 つの手法は、DelegatingByTypeSerializer を使用することです。以下に例を示します。
@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

バッチリスナーで ErrorHandlingDeserializer を使用する場合は、メッセージヘッダーで逆直列化の例外を確認する必要があります。DefaultBatchErrorHandler とともに使用すると、そのヘッダーを使用して、例外が失敗したレコードを判別し、BatchListenerFailedException を介してエラーハンドラーと通信できます。

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            try {
                DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
                        headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
                if (deserEx != null) {
                    logger.error(deserEx, "Record at index " + i + " could not be deserialized");
                }
            }
            catch (Exception ex) {
                logger.error(ex, "Record at index " + i + " could not be deserialized");
            }
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}

SerializationUtils.byteArrayToDeserializationException() は、ヘッダーを DeserializationException に変換するために使用できます。

List<ConsumerRecord<?, ?> を消費する場合、代わりに SerializationUtils.getExceptionFromHeader() が使用されます。

@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
    for (int i = 0; i < in.size(); i++) {
        ConsumerRecord<String, Thing> rec = in.get(i);
        if (rec.value() == null) {
            DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
                    SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            if (deserEx != null) {
                logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
                throw new BatchListenerFailedException("Deserialization", deserEx, i);
            }
        }
        process(rec.value());
    }
}
DeadLetterPublishingRecoverer も使用している場合、DeserializationException 用に公開されたレコードには、型 byte[] の record.value() が含まれます。これは直列化しないでください。byte[] には ByteArraySerializer を使用するように構成された DelegatingByTypeSerializer を使用し、他のすべての型には通常のシリアライザー (Json、Avro など) を使用することを検討してください。

バージョン 3.1 以降では、ErrorHandlingDeserializer に Validator を追加できます。デリゲート Deserializer がオブジェクトを正常に逆直列化したが、そのオブジェクトが検証に失敗した場合は、逆直列化例外の発生に似た例外がスローされます。これにより、元の生のデータをエラーハンドラーに渡すことができます。デシリアライザーを自分で作成する場合は、setValidator を呼び出すだけです。プロパティを使用してシリアライザーを構成する場合は、コンシューマー構成プロパティ ErrorHandlingDeserializer.VALIDATOR_CLASS を Validator のクラスまたは完全修飾クラス名に設定します。Spring Boot を使用する場合、このプロパティ名は spring.kafka.consumer.properties.spring.deserializer.validator.class です。

バッチリスナーを使用したペイロード変換

バッチリスナーコンテナーファクトリを使用する場合、BatchMessagingMessageConverter 内で JsonMessageConverter を使用してバッチメッセージを変換することもできます。詳細については、直列化、逆直列化、メッセージ変換および Spring メッセージングメッセージ変換を参照してください。

デフォルトでは、変換の型はリスナー引数から推測されます。JsonMessageConverter を DefaultJackson2TypeMapper で構成し、その TypePrecedence が (デフォルトの INFERRED ではなく) TYPE_ID に設定されている場合、コンバーターはヘッダー (存在する場合) の型情報を代わりに使用します。これにより、たとえば、リスナーメソッドを具象クラスの代わりにインターフェースで宣言できます。また、型コンバーターはマッピングをサポートしているため、(データに互換性がある限り) ソースとは異なる型に逆直列化できます。これは、クラスレベルの @KafkaListener インスタンスを使用する場合にも役立ちます。この場合、呼び出すメソッドを決定するためにペイロードがすでに変換されている必要があります。次の例では、このメソッドを使用する Bean を作成します。

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public JsonMessageConverter converter() {
    return new JsonMessageConverter();
}

これが機能するには、変換ターゲットのメソッドシグネチャーが、次のような単一のジェネリクスパラメーター型を持つコンテナーオブジェクトである必要があることに注意してください。

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

バッチヘッダーには引き続きアクセスできることに注意してください。

バッチコンバーターにそれをサポートするレコードコンバーターがある場合は、ペイロードがジェネリクス型に従って変換されたメッセージのリストを受け取ることもできます。次の例は、その方法を示しています。

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
    ...
}

ConversionService のカスタマイズ

バージョン 2.1.1 以降、リスナーメソッドの呼び出しのパラメーターを解決するためにデフォルトの org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory によって使用される org.springframework.core.convert.ConversionService は、次のいずれかのインターフェースを実装するすべての Bean で提供されます。

  • org.springframework.core.convert.converter.Converter

  • org.springframework.core.convert.converter.GenericConverter

  • org.springframework.format.Formatter

これにより、ConsumerFactory および KafkaListenerContainerFactory のデフォルト構成を変更せずに、リスナーの逆直列化をさらにカスタマイズできます。

KafkaListenerConfigurer Bean を介して KafkaListenerEndpointRegistrar にカスタム MessageHandlerMethodFactory を設定すると、この機能が無効になります。

カスタム HandlerMethodArgumentResolver を @KafkaListener に追加する

バージョン 2.4.2 以降、独自の HandlerMethodArgumentResolver を追加し、カスタムメソッドパラメーターを解決できます。必要なのは KafkaListenerConfigurer を実装し、クラス KafkaListenerEndpointRegistrar のメソッド setCustomMethodArgumentResolvers() を使用することだけです。

@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
            new HandlerMethodArgumentResolver() {

                @Override
                public boolean supportsParameter(MethodParameter parameter) {
                    return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                }

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                    return new CustomMethodArgument(
                        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
                    );
                }
            }
        );
    }

}

カスタム MessageHandlerMethodFactory を KafkaListenerEndpointRegistrar Bean に追加することで、フレームワークの引数解決を完全に置き換えることもできます。これを行う場合、アプリケーションが nullvalue() (圧縮されたトピックなどから) を使用して tombstone レコードを処理する必要がある場合は、KafkaNullAwarePayloadArgumentResolver をファクトリに追加する必要があります。これは、すべての型をサポートし、@Payload アノテーションなしで引数と一致できるため、最後のリゾルバーである必要があります。DefaultMessageHandlerMethodFactory を使用している場合は、このリゾルバーを最後のカスタムリゾルバーとして設定します。ファクトリでは、このリゾルバーが KafkaNull ペイロードの知識を持たない標準 PayloadMethodArgumentResolver の前に使用されるようにします。