直列化、逆直列化、メッセージ変換
概要
Apache Kafka は、レコード値とそのキーを直列化およびデ直列化するための高レベル API を提供します。これは、いくつかの組み込み実装を備えた org.apache.kafka.common.serialization.Serializer<T>
および org.apache.kafka.common.serialization.Deserializer<T>
抽象化に存在します。一方、Producer
または Consumer
構成プロパティを使用して、シリアライザークラスとデシリアライザークラスを指定できます。次の例は、その方法を示しています。
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
より複雑なケースや特殊なケースでは、KafkaConsumer
(したがって KafkaProducer
) は、keys
および values
のそれぞれの Serializer
および Deserializer
インスタンスを受け入れるオーバーロードされたコンストラクターを提供します。
この API を使用する場合、DefaultKafkaProducerFactory
および DefaultKafkaConsumerFactory
は (コンストラクターまたは setter メソッドを介して) プロパティを提供し、カスタム Serializer
および Deserializer
インスタンスをターゲット Producer
または Consumer
に挿入します。また、コンストラクターを介して Supplier<Serializer>
または Supplier<Deserializer>
インスタンスを渡すこともできます。これらの Supplier
は、各 Producer
または Consumer
の作成時に呼び出されます。
文字列の直列化
バージョン 2.5 以降、Spring for Apache Kafka は、エンティティのストリング表現を使用する ToStringSerializer
および ParseStringDeserializer
クラスを提供します。それらは、メソッド toString
といくつかの Function<String>
または BiFunction<String, Headers>
に依存して、文字列を解析し、インスタンスのプロパティを設定します。通常、これは parse
などのクラスの静的メソッドを呼び出します。
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
デフォルトでは、ToStringSerializer
は、レコード Headers
内の直列化されたエンティティに関する型情報を伝えるように構成されています。これを無効にするには、addTypeInfo
プロパティを false
に設定します。この情報は受信側の ParseStringDeserializer
で利用できます。
ToStringSerializer.ADD_TYPE_INFO_HEADERS
(デフォルトtrue
): これをfalse
に設定して、ToStringSerializer
でこの機能を無効にすることができます (addTypeInfo
プロパティを設定します)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
String
を byte[]
との間で変換するために使用される Charset
を構成できます。デフォルトは UTF-8
です。
ConsumerConfig
プロパティを使用して、パーサーメソッドの名前でデシリアライザーを構成できます。
ParseStringDeserializer.KEY_PARSER
ParseStringDeserializer.VALUE_PARSER
プロパティには、クラスの完全修飾名とそれに続くメソッド名をピリオド .
で区切って含める必要があります。メソッドは静的で、(String, Headers)
または (String)
のいずれかの署名を持つ必要があります。
Kafka ストリームで使用するための ToFromStringSerde
も提供されます。
JSON
Spring for Apache Kafka は、Jackson JSON オブジェクトマッパーに基づく JsonSerializer
および JsonDeserializer
実装も提供します。JsonSerializer
では、任意の Java オブジェクトを JSON byte[]
として書き込むことができます。JsonDeserializer
には、消費された byte[]
を適切なターゲットオブジェクトに逆直列化できるように、追加の Class<?> targetType
引数が必要です。次の例は、JsonDeserializer
を作成する方法を示しています。
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
JsonSerializer
と JsonDeserializer
の両方を ObjectMapper
でカスタマイズできます。継承して、configure(Map<String, ?> configs, boolean isKey)
メソッドで特定の構成ロジックを実装することもできます。
バージョン 2.3 以降、すべての JSON 対応コンポーネントはデフォルトで JacksonUtils.enhancedObjectMapper()
インスタンスで構成され、MapperFeature.DEFAULT_VIEW_INCLUSION
および DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
機能が無効になっています。また、このようなインスタンスには、Java 時間や Kotlin サポートなどのカスタムデータ型用のよく知られたモジュールが付属しています。詳細については、JacksonUtils.enhancedObjectMapper()
JavaDocs を参照してください。このメソッドは、ネットワークを介したプラットフォーム間の互換性のために、org.springframework.util.MimeType
オブジェクトの直列化用の org.springframework.kafka.support.JacksonMimeTypeModule
をプレーン文字列に登録します。JacksonMimeTypeModule
は、アプリケーションコンテキストで Bean として登録でき、Spring Boot ObjectMapper
インスタンスに自動構成されます。
また、バージョン 2.3 以降、JsonDeserializer
は、ターゲットジェネリクスコンテナー型の処理を改善するための TypeReference
ベースのコンストラクターを提供します。
バージョン 2.1 以降、レコード Headers
で型情報を伝達できるようになり、複数の型を処理できるようになりました。さらに、次の Kafka プロパティを使用してシリアライザーとデシリアライザーを構成できます。KafkaConsumer
および KafkaProducer
にそれぞれ Serializer
および Deserializer
インスタンスを提供した場合、効果がありません。
プロパティの構成
JsonSerializer.ADD_TYPE_INFO_HEADERS
(デフォルトtrue
): これをfalse
に設定して、JsonSerializer
でこの機能を無効にすることができます (addTypeInfo
プロパティを設定します)。JsonSerializer.TYPE_MAPPINGS
(デフォルトempty
): マッピング型を参照してください。JsonDeserializer.USE_TYPE_INFO_HEADERS
(デフォルトtrue
): シリアライザーによって設定されたヘッダーを無視するには、これをfalse
に設定できます。JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(デフォルトtrue
): これをfalse
に設定すると、シリアライザーによって設定されたヘッダーを保持できます。JsonDeserializer.KEY_DEFAULT_TYPE
: ヘッダー情報が存在しない場合のキーの逆直列化のフォールバック型。JsonDeserializer.VALUE_DEFAULT_TYPE
: ヘッダー情報が存在しない場合の値の逆直列化のフォールバック型。JsonDeserializer.TRUSTED_PACKAGES
(デフォルトはjava.util
、java.lang
): 逆直列化が許可されるパッケージパターンのカンマ区切りのリスト。*
はすべてを逆直列化することを意味します。JsonDeserializer.TYPE_MAPPINGS
(デフォルトempty
): マッピング型を参照してください。JsonDeserializer.KEY_TYPE_METHOD
(デフォルトempty
): メソッドを使用して型を決定するを参照してください。JsonDeserializer.VALUE_TYPE_METHOD
(デフォルトempty
): メソッドを使用して型を決定するを参照してください。
バージョン 2.2 以降、型情報ヘッダー(シリアライザーによって追加された場合)はデシリアライザーによって削除されます。removeTypeHeaders
プロパティを false
に設定することにより、デシリアライザで直接、前述の構成プロパティを使用して、以前の動作に戻すことができます。
バージョン 2.8 以降、プログラム構築に示すようにプログラムでシリアライザーまたはデシリアライザーを構築する場合、プロパティを明示的に設定していない限り(set*() メソッドまたは流れるような API を使用)、上記のプロパティがファクトリによって適用されます。以前は、プログラムで作成するときに、構成プロパティが適用されることはありませんでした。これは、オブジェクトにプロパティを直接明示的に設定した場合にも当てはまります。 |
マッピング型
バージョン 2.2 以降、JSON を使用する場合、前述のリストのプロパティを使用して型 マッピングを提供できるようになりました。以前は、シリアライザーとデシリアライザー内の型マッパーをカスタマイズする必要がありました。マッピングは、token:className
ペアのコンマ区切りリストで構成されます。送信では、ペイロードのクラス名が対応するトークンにマッピングされます。受信では、型 ヘッダーのトークンが対応するクラス名にマップされます。
次の例では、一連のマッピングを作成します。
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
対応するオブジェクトには互換性が必要です。 |
Spring Boot を使用する場合、これらのプロパティを application.properties
(または yaml) ファイルで提供できます。次の例は、その方法を示しています。
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
プロパティを使用した簡単な構成のみを実行できます。より高度な構成 (シリアライザーとデシリアライザーでカスタム
これらのコンストラクターを使用する代わりに、Setter も提供されています。 |
Spring Boot を使用して、上記のように ConsumerFactory と ProducerFactory をオーバーライドする場合、ワイルドカードジェネリクス型を Bean メソッドの戻り値の型とともに使用する必要があります。代わりに具体的なジェネリクス型が指定されている場合、Spring Boot はこれらの Bean を無視し、デフォルトの Bean を使用します。 |
バージョン 2.2 以降では、ブール値 useHeadersIfPresent
引数 (デフォルトでは true
) を持つオーバーロードされたコンストラクターの 1 つを使用して、提供されたターゲット型を使用し、ヘッダー内の型情報を無視するようにデシリアライザーを明示的に構成できます。次の例は、その方法を示しています。
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
メソッドを使用して型を決定する
バージョン 2.5 以降、プロパティを介してデシリアライザーを構成し、ターゲット型を決定するメソッドを呼び出すことができるようになりました。存在する場合、これは上で議論された他のテクニックのいずれかを上書きします。これは、データが Spring シリアライザーを使用しないアプリケーションによって公開され、データまたは他のヘッダーに応じて異なる型に逆直列化する必要がある場合に役立ちます。これらのプロパティをメソッド名に設定します。完全修飾クラス名の後にメソッド名が続き、ピリオド .
で区切られます。メソッドは public static
として宣言する必要があり、3 つの署名 (String topic, byte[] data, Headers headers)
、(byte[] data, Headers headers)
または (byte[] data)
のいずれかを持ち、Jackson JavaType
を返します。
JsonDeserializer.KEY_TYPE_METHOD
:spring.json.key.type.method
JsonDeserializer.VALUE_TYPE_METHOD
:spring.json.value.type.method
任意のヘッダーを使用するか、データをインスペクションして型を判別できます。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
より高度なデータの場合、インスペクション は JsonPath
などの使用を検討しますが、型を判別するためのテストが単純なほど、プロセスはより効率的になります。
以下は、デシリアライザーをプログラムで作成する例です (コンストラクターでコンシューマーファクトリにデシリアライザーを提供する場合)。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
プログラム構築
プロデューサー / コンシューマーファクトリで使用するためにシリアライザー / デシリアライザーをプログラムで構築する場合、バージョン 2.3 以降、流れるような API を使用できるため、構成が簡単になります。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
メソッドを使用して型を決定すると同様に、プログラムで型マッピングを提供するには、typeFunction
プロパティを使用します。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
または、流れるような API を使用してプロパティを構成したり、set*()
メソッドを使用して設定したりしない限り、ファクトリは構成プロパティを使用してシリアライザー / デシリアライザーを構成します。プロパティの構成を参照してください。
シリアライザーとデシリアライザーの委譲
ヘッダーの使用
バージョン 2.3 は DelegatingSerializer
および DelegatingDeserializer
を導入しました。これにより、さまざまなキーおよび / または値型でレコードを生成および使用できます。プロデューサーは、ヘッダー DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
を、値に使用するシリアライザーとキーに DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
を選択するために使用されるセレクター値に設定する必要があります。一致するものが見つからない場合は、IllegalStateException
がスローされます。
受信レコードの場合、デシリアライザーは同じヘッダーを使用して、使用するデシリアライザーを選択します。一致するものが見つからないか、ヘッダーが存在しない場合は、生の byte[]
が返されます。
コンストラクターを介してセレクターのマップを Serializer
/ Deserializer
に構成するか、キー DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
および DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
を使用して Kafka プロデューサー / コンシューマープロパティを介して構成できます。シリアライザーの場合、プロデューサープロパティは、キーがセレクターで値が Serializer
インスタンス、シリアライザー Class
、またはクラス名である Map<String, Object>
にすることができます。プロパティは、次に示すように、カンマ区切りのマップエントリの文字列にすることもできます。
デシリアライザーの場合、コンシューマープロパティは、キーがセレクターで値が Deserializer
インスタンス、デシリアライザー Class
、またはクラス名である Map<String, Object>
にすることができます。プロパティは、次に示すように、カンマ区切りのマップエントリの文字列にすることもできます。
プロパティを使用して構成するには、次の構文を使用します。
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
その後、プロデューサーは DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
ヘッダーを thing1
または thing2
に設定します。
この手法は、異なる型を同じトピック (または異なるトピック) に送信することをサポートします。
バージョン 2.5.1 以降、型 (キーまたは値) が Serdes でサポートされている標準型 (Long 、Integer など) のいずれかである場合、セレクターヘッダーを設定する必要はありません。代わりに、シリアライザーはヘッダーを型のクラス名に設定します。これらの型のシリアライザーまたはデシリアライザーを構成する必要はありません。動的に (1 回だけ) 作成されます。 |
異なる型を異なるトピックに送信する別の手法については、RoutingKafkaTemplate
を使用するを参照してください。
タイプ順
バージョン 2.8 は DelegatingByTypeSerializer
を導入しました。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
バージョン 2.8.3 以降では、マップキーがターゲットオブジェクトから割り当て可能かどうかを確認するようにシリアライザーを構成できます。これは、デリゲートシリアライザーがサブクラスをシリアライズできる場合に役立ちます。この場合、不一致の一致がある場合は、LinkedHashMap
などの順序付けられた Map
を提供する必要があります。
トピック別
バージョン 2.8 以降、DelegatingByTopicSerializer
および DelegatingByTopicDeserializer
では、トピック名に基づいてシリアライザー / デシリアライザーを選択できます。正規表現 Pattern
は、使用するインスタンスを検索するために使用されます。マップは、コンストラクターを使用して、またはプロパティ(pattern:serializer
のコンマ区切りリスト)を介して構成できます。
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
これをキーに使用する場合は、KEY_SERIALIZATION_TOPIC_CONFIG
を使用してください。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
および DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
を使用して、パターン一致がない場合に使用するデフォルトのシリアライザー / デシリアライザーを指定できます。
追加のプロパティ DelegatingByTopicSerialization.CASE_SENSITIVE
(デフォルトは true
)を false
に設定すると、トピックルックアップで大文字と小文字が区別されなくなります。
デシリアライザーの再試行
RetryingDeserializer
は、デリゲートの直列化解除中にネットワークの課題などの一時的なエラーが発生した可能性がある場合に、デリゲート Deserializer
および RetryTemplate
を使用して直列化解除を再試行します。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
バージョン 3.1.2
以降、オプションで RecoveryCallback
を RetryingDeserializer
に設定できます。
再試行ポリシー、バックオフポリシーなどを使用した RetryTemplate
の構成については、spring-retry [GitHub] (英語) プロジェクトを参照してください。
Spring メッセージングメッセージ変換
Serializer
および Deserializer
API は、低レベルの Kafka Consumer
および Producer
の観点からは非常にシンプルで柔軟ですが、@KafkaListener
または Spring Integration の Apache Kafka サポートを使用する場合は、Spring メッセージングレベルでより柔軟性が必要になる場合があります。org.springframework.messaging.Message
との間で簡単に変換できるようにするために、Spring for Apache Kafka は MessagingMessageConverter
実装とその JsonMessageConverter
(およびサブクラス) のカスタマイズで MessageConverter
抽象化を提供します。@KafkaListener.containerFactory()
プロパティの AbstractKafkaListenerContainerFactory
Bean 定義を使用して、MessageConverter
を KafkaTemplate
インスタンスに直接挿入できます。次の例は、その方法を示しています。
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
Spring Boot を使用する場合は、コンバーターを @Bean
として定義するだけで、Spring Boot の自動構成によって自動構成されたテンプレートとコンテナーファクトリに接続されます。
@KafkaListener
を使用する場合、パラメーター型がメッセージコンバーターに提供され、変換を支援します。
この型推論は、 |
コンシューマー側では、 プロデューサー側では、Spring Integration または
ここでも、 便宜上、バージョン 2.3 以降、フレームワークは 3 つの値型すべてを直列化できる |
バージョン 2.7.1 以降、メッセージペイロードの変換を spring-messaging
SmartMessageConverter
に委譲できます。これにより、たとえば、MessageHeaders.CONTENT_TYPE
ヘッダーに基づく変換が可能になります。
KafkaMessageConverter.fromMessage() メソッドは、ProducerRecord.value() プロパティ内のメッセージペイロードを使用して ProducerRecord に送信変換するために呼び出されます。KafkaMessageConverter.toMessage() メソッドは、ペイロードが ConsumerRecord.value() プロパティである ConsumerRecord からの受信変換のために呼び出されます。SmartMessageConverter.toMessage() メソッドは、fromMessage() (通常は KafkaTemplate.send(Message<?> msg) によって) に渡される Message から新しい送信 Message<?> を作成するために呼び出されます。同様に、KafkaMessageConverter.toMessage() メソッドでは、コンバーターが ConsumerRecord から新しい Message<?> を作成した後、SmartMessageConverter.fromMessage() メソッドが呼び出され、新しく変換されたペイロードを使用して最終受信メッセージが作成されます。いずれの場合も、SmartMessageConverter が null を返す場合は、元のメッセージが使用されます。 |
デフォルトのコンバーターが KafkaTemplate
およびリスナーコンテナーファクトリで使用される場合、テンプレートで setMessagingConverter()
を呼び出し、@KafkaListener
メソッドの contentTypeConverter
プロパティを介して SmartMessageConverter
を構成します。
例:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
Spring Data 射影 インターフェースの使用
バージョン 2.1.1 以降、JSON を具象型の代わりに Spring Data Projection インターフェースに変換できます。これにより、JSON ドキュメント内の複数の場所からの値のルックアップなど、データへの非常に選択的で低結合のバインドが可能になります。たとえば、次のインターフェースをメッセージペイロード型として定義できます。
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
デフォルトでは、アクセサーメソッドを使用して、受信した JSON ドキュメント内のフィールドとしてプロパティ名を検索します。@JsonPath
式を使用すると、値の検索をカスタマイズしたり、複数の JSON パス式を定義して、式が実際の値を返すまで複数の場所から値を検索したりできます。
この機能を有効にするには、適切なデリゲートコンバーター (送信変換および非射影 インターフェースの変換に使用) で構成された ProjectingMessageConverter
を使用します。spring-data:spring-data-commons
および com.jayway.jsonpath:json-path
もクラスパスに追加する必要があります。
@KafkaListener
メソッドへのパラメーターとして使用される場合、インターフェース型は通常どおりコンバーターに自動的に渡されます。
ErrorHandlingDeserializer
を使用する
デシリアライザーがメッセージのデシリアライズに失敗すると、poll()
が戻る前に問題が発生するため、Spring は問題を処理できません。この問題を解決するために、ErrorHandlingDeserializer
が導入されました。このデシリアライザーは、実際のデシリアライザー (キーまたは値) に委譲します。デリゲートがレコードコンテンツの逆直列化に失敗した場合、ErrorHandlingDeserializer
は、原因と生のバイトを含むヘッダーに null
値と DeserializationException
を返します。レコードレベルの MessageListener
を使用する場合、ConsumerRecord
にキーまたは値の DeserializationException
ヘッダーが含まれている場合、コンテナーの ErrorHandler
は失敗した ConsumerRecord
で呼び出されます。レコードはリスナーに渡されません。
または、Function<FailedDeserializationInfo, T>
である failedDeserializationFunction
を提供することにより、カスタム値を作成するように ErrorHandlingDeserializer
を構成できます。この関数は、通常の方法でリスナーに渡される T
のインスタンスを作成するために呼び出されます。すべてのコンテキスト情報を含む型 FailedDeserializationInfo
のオブジェクトが関数に提供されます。ヘッダーに DeserializationException
(シリアライズされた Java オブジェクトとして) があります。詳細については、ErrorHandlingDeserializer
の Javadoc を参照してください。
キーと値の Deserializer
オブジェクトを受け取り、適切なデリゲートで構成した適切な ErrorHandlingDeserializer
インスタンスに接続する DefaultKafkaConsumerFactory
コンストラクターを使用できます。または、コンシューマー構成プロパティ ( ErrorHandlingDeserializer
によって使用される) を使用してデリゲートをインスタンス化することもできます。プロパティ名は ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
および ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
です。プロパティ値には、クラスまたはクラス名を指定できます。次の例は、これらのプロパティを設定する方法を示しています。
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
次の例では、failedDeserializationFunction
を使用しています。
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
前の例では、次の構成を使用しています。
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
コンシューマーが ErrorHandlingDeserializer で構成されている場合は、通常のオブジェクトだけでなく、逆直列化例外から生じる生の byte[] 値も処理できるシリアライザーを使用して KafkaTemplate とそのプロデューサーを構成することが重要です。テンプレートの汎用値型は Object である必要があります。1 つの手法は、DelegatingByTypeSerializer を使用することです。以下に例を示します。 |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
バッチリスナーで ErrorHandlingDeserializer
を使用する場合は、メッセージヘッダーで逆直列化の例外を確認する必要があります。DefaultBatchErrorHandler
とともに使用すると、そのヘッダーを使用して、例外が失敗したレコードを判別し、BatchListenerFailedException
を介してエラーハンドラーと通信できます。
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException()
は、ヘッダーを DeserializationException
に変換するために使用できます。
List<ConsumerRecord<?, ?>
を消費する場合、代わりに SerializationUtils.getExceptionFromHeader()
が使用されます。
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
DeadLetterPublishingRecoverer も使用している場合、DeserializationException 用に公開されたレコードには、型 byte[] の record.value() が含まれます。これは直列化しないでください。byte[] には ByteArraySerializer を使用するように構成された DelegatingByTypeSerializer を使用し、他のすべての型には通常のシリアライザー (Json、Avro など) を使用することを検討してください。 |
バージョン 3.1 以降では、ErrorHandlingDeserializer
に Validator
を追加できます。デリゲート Deserializer
がオブジェクトを正常に逆直列化したが、そのオブジェクトが検証に失敗した場合は、逆直列化例外の発生に似た例外がスローされます。これにより、元の生のデータをエラーハンドラーに渡すことができます。デシリアライザーを自分で作成する場合は、setValidator
を呼び出すだけです。プロパティを使用してシリアライザーを構成する場合は、コンシューマー構成プロパティ ErrorHandlingDeserializer.VALIDATOR_CLASS
を Validator
のクラスまたは完全修飾クラス名に設定します。Spring Boot を使用する場合、このプロパティ名は spring.kafka.consumer.properties.spring.deserializer.validator.class
です。
バッチリスナーを使用したペイロード変換
バッチリスナーコンテナーファクトリを使用する場合、BatchMessagingMessageConverter
内で JsonMessageConverter
を使用してバッチメッセージを変換することもできます。詳細については、直列化、逆直列化、メッセージ変換および Spring メッセージングメッセージ変換を参照してください。
デフォルトでは、変換の型はリスナー引数から推測されます。JsonMessageConverter
を DefaultJackson2TypeMapper
で構成し、その TypePrecedence
が (デフォルトの INFERRED
ではなく) TYPE_ID
に設定されている場合、コンバーターはヘッダー (存在する場合) の型情報を代わりに使用します。これにより、たとえば、リスナーメソッドを具象クラスの代わりにインターフェースで宣言できます。また、型コンバーターはマッピングをサポートしているため、(データに互換性がある限り) ソースとは異なる型に逆直列化できます。これは、クラスレベルの @KafkaListener
インスタンスを使用する場合にも役立ちます。この場合、呼び出すメソッドを決定するためにペイロードがすでに変換されている必要があります。次の例では、このメソッドを使用する Bean を作成します。
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
これが機能するには、変換ターゲットのメソッドシグネチャーが、次のような単一のジェネリクスパラメーター型を持つコンテナーオブジェクトである必要があることに注意してください。
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
バッチヘッダーには引き続きアクセスできることに注意してください。
バッチコンバーターにそれをサポートするレコードコンバーターがある場合は、ペイロードがジェネリクス型に従って変換されたメッセージのリストを受け取ることもできます。次の例は、その方法を示しています。
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
ConversionService
のカスタマイズ
バージョン 2.1.1 以降、リスナーメソッドの呼び出しのパラメーターを解決するためにデフォルトの org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory
によって使用される org.springframework.core.convert.ConversionService
は、次のいずれかのインターフェースを実装するすべての Bean で提供されます。
org.springframework.core.convert.converter.Converter
org.springframework.core.convert.converter.GenericConverter
org.springframework.format.Formatter
これにより、ConsumerFactory
および KafkaListenerContainerFactory
のデフォルト構成を変更せずに、リスナーの逆直列化をさらにカスタマイズできます。
KafkaListenerConfigurer Bean を介して KafkaListenerEndpointRegistrar にカスタム MessageHandlerMethodFactory を設定すると、この機能が無効になります。 |
カスタム HandlerMethodArgumentResolver
を @KafkaListener
に追加する
バージョン 2.4.2 以降、独自の HandlerMethodArgumentResolver
を追加し、カスタムメソッドパラメーターを解決できます。必要なのは KafkaListenerConfigurer
を実装し、クラス KafkaListenerEndpointRegistrar
のメソッド setCustomMethodArgumentResolvers()
を使用することだけです。
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
カスタム MessageHandlerMethodFactory
を KafkaListenerEndpointRegistrar
Bean に追加することで、フレームワークの引数解決を完全に置き換えることもできます。これを行う場合、アプリケーションが null
value()
(圧縮されたトピックなどから) を使用して tombstone レコードを処理する必要がある場合は、KafkaNullAwarePayloadArgumentResolver
をファクトリに追加する必要があります。これは、すべての型をサポートし、@Payload
アノテーションなしで引数と一致できるため、最後のリゾルバーである必要があります。DefaultMessageHandlerMethodFactory
を使用している場合は、このリゾルバーを最後のカスタムリゾルバーとして設定します。ファクトリでは、このリゾルバーが KafkaNull
ペイロードの知識を持たない標準 PayloadMethodArgumentResolver
の前に使用されるようにします。
NULL ペイロードと Tombstone
レコードのログ圧縮も参照してください。