Apache Kafka ストリームのサポート
バージョン 1.1.4 以降、Spring for Apache Kafka は Kafka ストリーム [Apache] (英語) のファーストクラスのサポートを提供します。Spring アプリケーションから使用するには、kafka-streams
jar がクラスパスに存在する必要があります。これは Spring for Apache Kafka プロジェクトのオプションの依存関係であり、推移的にダウンロードされることはありません。
基本
リファレンス Apache Kafka Streams ドキュメントでは、API を使用する次の方法が提案されています。
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
2 つの主要なコンポーネントがあります。
StreamsBuilder
:KStream
(またはKTable
) インスタンスを構築するための API を使用します。KafkaStreams
: それらのインスタンスのライフサイクルを管理するため。
単一の StreamsBuilder によって KafkaStreams インスタンスに公開されるすべての KStream インスタンスは、ロジックが異なっていても、同時に開始および停止されます。つまり、StreamsBuilder によって定義されるすべてのストリームは、単一のライフサイクルコントロールに関連付けられます。KafkaStreams インスタンスが streams.close() によって閉じられると、再起動することはできません。代わりに、ストリーム処理を再開するための新しい KafkaStreams インスタンスを作成する必要があります。 |
Spring 管理
Spring アプリケーションコンテキストの観点から Kafka ストリームの使用を簡素化し、コンテナーを介したライフサイクル管理を使用するために、Spring for Apache Kafka には StreamsBuilderFactoryBean
が導入されています。これは、StreamsBuilder
シングルトンインスタンスを Bean として公開する AbstractFactoryBean
実装です。次の例では、そのような Bean を作成します。
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
バージョン 2.2 以降、ストリーム設定は StreamsConfig ではなく KafkaStreamsConfiguration オブジェクトとして提供されるようになりました。 |
StreamsBuilderFactoryBean
は、内部 KafkaStreams
インスタンスのライフサイクルを管理するために SmartLifecycle
も実装します。Kafka ストリーム API と同様に、KafkaStreams
を開始する前に KStream
インスタンスを定義する必要があります。これは、Kafka ストリームの Spring API にも当てはまります。StreamsBuilderFactoryBean
でデフォルトの autoStartup = true
を使用する場合、アプリケーションコンテキストがリフレッシュされる前に StreamsBuilder
で KStream
インスタンスを宣言する必要があります。例: KStream
は通常の Bean 定義にすることができますが、Kafka ストリーム API は影響なしに使用されます。次の例は、その方法を示しています。
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
ライフサイクルを手動で制御する場合 (たとえば、何らかの条件による停止と開始)、ファクトリ Bean (&
) プレフィックスを使用して、StreamsBuilderFactoryBean
Bean を直接参照できます。StreamsBuilderFactoryBean
は内部 KafkaStreams
インスタンスを使用するため、安全に停止して再起動できます。新しい KafkaStreams
が各 start()
上に作成されます。KStream
インスタンスのライフサイクルを個別に制御したい場合は、別の StreamsBuilderFactoryBean
インスタンスの使用を検討することもできます。
StreamsBuilderFactoryBean
で KafkaStreams.StateListener
、Thread.UncaughtExceptionHandler
、StateRestoreListener
オプションを指定することもできます。これは、内部 KafkaStreams
インスタンスに委譲されます。また、これらのオプションを StreamsBuilderFactoryBean
で間接的に設定する以外に、バージョン 2.1.5 以降、KafkaStreamsCustomizer
コールバックインターフェースを使用して内部 KafkaStreams
インスタンスを構成できます。KafkaStreamsCustomizer
は、StreamsBuilderFactoryBean
によって提供されるオプションをオーバーライドすることに注意してください。一部の KafkaStreams
操作を直接実行する必要がある場合は、StreamsBuilderFactoryBean.getKafkaStreams()
を使用してその内部 KafkaStreams
インスタンスにアクセスできます。StreamsBuilderFactoryBean
Bean を型別にオートワイヤーできますが、次の例に示すように、Bean 定義で完全な型を使用する必要があります。
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
または、インターフェース Bean 定義を使用する場合は、名前で @Qualifier
をインジェクション用に追加できます。次の例は、その方法を示しています。
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
バージョン 2.4.1 以降、ファクトリ Bean には型 KafkaStreamsInfrastructureCustomizer
の新しいプロパティ infrastructureCustomizer
があります。これにより、ストリームが作成される前に、StreamsBuilder
(状態ストアを追加するなど) および / または Topology
のカスタマイズが可能になります。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
デフォルトの no-op 実装は、どちらかが必要でない場合に両方のメソッドを実装する必要がないように提供されています。
複数のカスタマイザを適用する必要がある場合のために、CompositeKafkaStreamsInfrastructureCustomizer
が提供されます。
KafkaStreams Micrometer サポート
バージョン 2.5.3 で導入され、ファクトリ Bean によって管理される KafkaStreams
オブジェクトの micrometer メーターを自動的に登録するように KafkaStreamsMicrometerListener
を構成できます。
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
JSON の直列化と逆直列化をストリーミングします
JSON 形式でトピックまたは状態ストアを読み書きするときにデータをシリアライズおよびデシリアライズするために、Spring for Apache Kafka は、JSON を使用する JsonSerde
実装を提供し、直列化、逆直列化、メッセージ変換で説明されている JsonSerializer
および JsonDeserializer
に委譲します。JsonSerde
実装は、コンストラクター (ターゲット型または ObjectMapper
) を通じて同じ構成オプションを提供します。次の例では、JsonSerde
を使用して Kafka ストリームの Cat
ペイロードをシリアライズおよびデシリアライズします (インスタンスが必要な場合はいつでも JsonSerde
を同様の方法で使用できます)。
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
プロデューサー / コンシューマーファクトリで使用するためにシリアライザー / デシリアライザーをプログラムで構築する場合、バージョン 2.3 以降、流れるような API を使用できるため、構成が簡単になります。
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
KafkaStreamBrancher
を使用する
KafkaStreamBrancher
クラスは、KStream
の上に条件付き ブランチ を構築するためのより便利な方法を導入します。
KafkaStreamBrancher
を使用しない次の例を考えてみましょう。
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
次の例では、KafkaStreamBrancher
を使用しています。
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
構成
Kafka ストリーム環境を構成するには、StreamsBuilderFactoryBean
に KafkaStreamsConfiguration
インスタンスが必要です。すべての可能なオプションについては、Apache Kafka のドキュメント [Apache] (英語) を参照してください。
バージョン 2.2 以降、ストリーム設定は StreamsConfig ではなく KafkaStreamsConfiguration オブジェクトとして提供されるようになりました。 |
特にマイクロサービスを開発する場合、ほとんどの場合、定型コードを回避するために、Spring for Apache Kafka は @EnableKafkaStreams
アノテーションを提供します。これは、@Configuration
クラスに配置する必要があります。必要なのは、defaultKafkaStreamsConfig
という名前の KafkaStreamsConfiguration
Bean を宣言することだけです。defaultKafkaStreamsBuilder
という名前の StreamsBuilderFactoryBean
Bean は、アプリケーションコンテキストで自動的に宣言されます。追加の StreamsBuilderFactoryBean
Bean を宣言して使用することもできます。StreamsBuilderFactoryBeanConfigurer
を実装する Bean を提供することにより、その Bean の追加のカスタマイズを実行できます。そのような Bean が複数ある場合、Ordered.order
プロパティに従って適用されます。
デフォルトでは、ファクトリ Bean が停止すると、KafkaStreams.cleanUp()
メソッドが呼び出されます。バージョン 2.1.2 以降、ファクトリ Bean には追加のコンストラクターがあり、start()
または stop()
中に cleanUp()
メソッドを呼び出すかどうかを制御できるプロパティを持つ CleanupConfig
オブジェクトを取得します。バージョン 2.7 以降、デフォルトではローカル状態をクリーンアップしません。
ヘッダーエンリッチャー
バージョン 3.0 は ContextualProcessor
の HeaderEnricherProcessor
拡張を追加しました。非推奨の Transformer
インターフェースを実装した非推奨の HeaderEnricher
と同じ機能を提供します。これは、ストリーム処理内でヘッダーを追加するために使用できます。ヘッダー値は SpEL 式です。式評価のルートオブジェクトには 3 つのプロパティがあります。
record
-org.apache.kafka.streams.processor.api.Record
(key
、value
、timestamp
、headers
)key
- 現在のレコードのキーvalue
- 現在のレコードの値context
-ProcessorContext
、現在のレコードのメタデータへのアクセスを許可します
式は byte[]
または String
( UTF-8
を使用して byte[]
に変換される) を返す必要があります。
ストリーム内でエンリッチャーを使用するには:
.process(() -> new HeaderEnricherProcessor(expressions))
プロセッサーは key
または value
を変更しません。ヘッダーを追加するだけです。
レコードごとに新しいインスタンスが必要です。 |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
これは簡単な例で、1 つのリテラルヘッダーと 1 つの変数を追加します。
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
MessagingProcessor
バージョン 3.0 では、ContextualProcessor
の MessagingProcessor
拡張機能が追加され、非推奨の Transformer
インターフェースを実装した非推奨の MessagingTransformer
と同じ機能が提供されました。これにより、Kafka ストリームトポロジが Spring Integration フローなどの Spring メッセージングコンポーネントと対話できるようになります。トランスには MessagingFunction
の実装が必要です。
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration は、その GatewayProxyFactoryBean
を使用した実装を自動的に提供します。また、キー、値、メタデータ (ヘッダーを含む) を Spring メッセージング Message<?>
との間で変換するために MessagingMessageConverter
が必要です。詳細については、【 KStream
から Spring Integration フローを呼び出す ] を参照してください。
デシリアライズ例外からの回復
バージョン 2.3 では、逆直列化例外が発生したときに何らかのアクションを実行できる RecoveringDeserializationExceptionHandler
が導入されました。RecoveringDeserializationExceptionHandler
が実装である DeserializationExceptionHandler
については、Kafka のドキュメントを参照してください。RecoveringDeserializationExceptionHandler
は ConsumerRecordRecoverer
実装で構成されます。フレームワークは、失敗したレコードを配信不能トピックに送信する DeadLetterPublishingRecoverer
を提供します。この回復プログラムの詳細については、デッドレターレコードの公開を参照してください。
回復者を構成するには、次のプロパティをストリーム構成に追加します。
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
もちろん、recoverer()
Bean は ConsumerRecordRecoverer
の独自の実装にすることができます。
Kafka ストリームの例
次の例は、この章で取り上げたすべてのトピックを組み合わせたものです。
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}