プログラミングモデルの付属品
単一のアプリケーション内の複数の Kafka ストリームプロセッサー
バインダーを使用すると、単一の Spring Cloud Stream アプリケーション内に複数の Kafka ストリームプロセッサーを含めることができます。以下のような申し込みが可能です。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
この場合、バインダーは、異なるアプリケーション ID を持つ 3 つの個別の Kafka Streams オブジェクトを作成します(これについては以下で詳しく説明します)。ただし、アプリケーションに複数のプロセッサーがある場合は、どの機能をアクティブにする必要があるかを Spring Cloud Stream に通知する必要があります。機能を有効にする方法は次のとおりです。
spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess
特定の機能をすぐにアクティブにしたくない場合は、このリストから削除できます。
これは、同じアプリケーションに単一の Kafka Streams プロセッサーと他の型の Function
Bean があり、異なるバインダーを介して処理される場合にも当てはまります。(たとえば、通常の Kafka メッセージチャネルバインダーに基づく関数 Bean)
Kafka ストリームアプリケーション ID
アプリケーション ID は、Kafka Streams アプリケーションに提供する必要のある必須のプロパティです。Spring Cloud Stream Kafka Streams バインダーを使用すると、このアプリケーション ID を複数の方法で構成できます。
アプリケーションにプロセッサーが 1 つしかない場合は、次のプロパティを使用してバインダーレベルでこれを設定できます。
spring.cloud.stream.kafka.streams.binder.applicationId
.
便宜上、プロセッサーが 1 つしかない場合は、プロパティとして spring.application.name
を使用してアプリケーション ID を委譲することもできます。
アプリケーションに複数の Kafka Streams プロセッサーがある場合は、プロセッサーごとにアプリケーション ID を設定する必要があります。機能モデルの場合は、プロパティとして各関数に接続できます。
たとえば次の機能があると想像してください。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
および
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
次に、次のバインダーレベルのプロパティを使用して、それぞれのアプリケーション ID を設定できます。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
および
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
関数ベースのモデルの場合も、アプリケーション ID をバインディングレベルで設定するこのアプローチは機能します。ただし、機能モデルを使用している場合は、上記のようにバインダーレベルで関数ごとに設定する方がはるかに簡単です。
本番デプロイの場合、構成を通じてアプリケーション ID を明示的に指定することを強くお勧めします。これは、アプリケーションを自動スケーリングする場合に特に重要になります。その場合、同じアプリケーション ID で各インスタンスをデプロイしていることを確認する必要があります。
アプリケーションがアプリケーション ID を提供しない場合、バインダーは静的アプリケーション ID を自動生成します。これは、アプリケーション ID を明示的に指定する必要がないため、開発シナリオで便利です。この方法で生成されたアプリケーション ID は、アプリケーションの再起動後も静的になります。機能モデルの場合、生成されたアプリケーション ID は、関数 Bean 名の後にリテラル applicationID
が続きます。たとえば、process
の場合は process-applicationID
、関数 Bean 名の場合は process-applicationID
です。
アプリケーション ID 設定の概要
デフォルトでは、バインダーは関数メソッドごとにアプリケーション ID を自動生成します。
単一のプロセッサーを使用している場合は、
spring.kafka.streams.applicationId
、spring.application.name
、spring.cloud.stream.kafka.streams.binder.applicationId
を使用できます。複数のプロセッサーがある場合は、プロパティ
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
を使用して関数ごとにアプリケーション ID を設定できます。
バインダーによって生成されたデフォルトのバインディング名を機能スタイルでオーバーライドする
デフォルトでは、バインダーは上記の戦略を使用して、機能スタイルを使用するときにバインディング名を生成します。つまり、<function-bean-name>-<in> | <out>-[0..n]、例: process-in-0、process-out-0 など。これらのバインディング名をオーバーライドする場合は、次のプロパティを指定することでそれを行うことができます。
spring.cloud.stream.function.bindings.<default binding name>
。デフォルトのバインディング名は、バインダーによって生成された元のバインディング名です。
たとえばたとえば、この機能があります。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
バインダーは、process-in-0
、process-in-1
、process-out-0
という名前のバインディングを生成します。完全に別のもの、おそらくさらにドメイン固有のバインディング名に変更したい場合は、以下のように行うことができます。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
および
spring.cloud.stream.function.bindings.process-out-0=clicks
その後、これらの新しいバインディング名にすべてのバインディングレベルプロパティを設定する必要があります。
上記の関数型プログラミングモデルでは、ほとんどの状況でデフォルトのバインディング名を順守することが理にかなっていることに注意してください。このオーバーライドを実行する必要がある唯一の理由は、構成プロパティの数が多く、バインディングをよりドメインに適したものにマップする場合です。
ブートストラップサーバー構成のセットアップ
Kafka Streams アプリケーションを実行するときは、Kafka ブローカーサーバー情報を提供する必要があります。この情報を提供しない場合、バインダーは、デフォルトの localhost:9092
でブローカーを実行していることを想定しています。そうでない場合は、それをオーバーライドする必要があります。これを行うにはいくつかの方法があります。
Boot プロパティの使用 -
spring.kafka.bootstrapServers
バインダーレベルのプロパティ -
spring.cloud.stream.kafka.streams.binder.brokers
バインダーレベルのプロパティに関しては、通常の Kafka バインダーである spring.cloud.stream.kafka.binder.brokers
を介して提供されるブローカープロパティを使用するかどうかは関係ありません。Kafka Streams バインダーは、最初に Kafka Streams バインダー固有のブローカープロパティが設定されているかどうかを確認し(spring.cloud.stream.kafka.streams.binder.brokers
)、見つからない場合は spring.cloud.stream.kafka.binder.brokers
を探します。