Apache Kafka サポート

Apache Kafka (英語) は、spring-kafka プロジェクトの自動構成を提供することによりサポートされます。

Kafka 構成は、spring.kafka.*外部構成プロパティによって制御されます。例: application.properties で次のセクションを宣言できます。

  • プロパティ

  • YAML

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
起動時にトピックを作成するには、型 NewTopic の Bean を追加します。トピックがすでに存在する場合、Bean は無視されます。

サポートされるオプションの詳細については、KafkaProperties (Javadoc) を参照してください。

メッセージの送信

Spring の KafkaTemplate は自動構成されており、次の例に示すように、独自の Bean に直接オートワイヤーできます。

  • Java

  • Kotlin

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final KafkaTemplate<String, String> kafkaTemplate;

	public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	// ...

	public void someMethod() {
		this.kafkaTemplate.send("someTopic", "Hello");
	}

}
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {

	// ...

	fun someMethod() {
		kafkaTemplate.send("someTopic", "Hello")
	}

}
プロパティ spring.kafka.producer.transaction-id-prefix が定義されている場合、KafkaTransactionManager が自動的に構成されます。また、RecordMessageConverter Bean が定義されている場合、自動構成された KafkaTemplate に自動的に関連付けられます。

メッセージの受信

Apache Kafka インフラストラクチャが存在する場合、任意の Bean に @KafkaListener のアノテーションを付けてリスナーエンドポイントを作成できます。KafkaListenerContainerFactory が定義されていない場合は、spring.kafka.listener.* で定義されたキーを使用してデフォルトの KafkaListenerContainerFactory が自動的に構成されます。

次のコンポーネントは、someTopic トピックにリスナーエンドポイントを作成します。

  • Java

  • Kotlin

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@KafkaListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@KafkaListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

KafkaTransactionManager Bean が定義されている場合、自動的にコンテナーファクトリに関連付けられます。同様に、RecordFilterStrategyCommonErrorHandlerAfterRollbackProcessor または ConsumerAwareRebalanceListener Bean が定義されている場合、自動的にデフォルトのファクトリに関連付けられます。

リスナーの型に応じて、RecordMessageConverter または BatchMessageConverter Bean がデフォルトファクトリに関連付けられます。RecordMessageConverter Bean のみがバッチリスナーに存在する場合、BatchMessageConverter にラップされます。

カスタム ChainedKafkaTransactionManager は、通常、自動構成された KafkaTransactionManager Bean を参照するため、@Primary とマークする必要があります。

Kafka ストリーム

Spring for Apache Kafka は、StreamsBuilder オブジェクトを作成し、そのストリームのライフサイクルを管理するためのファクトリ Bean を提供します。Spring Boot は、kafka-streams がクラスパス上にあり、Kafka ストリームが @EnableKafkaStreams アノテーションによって有効になっている限り、必要な KafkaStreamsConfiguration Bean を自動構成します。

Kafka ストリームを有効にすると、アプリケーション ID とブートストラップサーバーを設定する必要があります。前者は spring.kafka.streams.application-id を使用して設定でき、設定されていない場合はデフォルトで spring.application.name になります。後者はグローバルに設定することも、ストリームに対してのみ具体的にオーバーライドすることもできます。

専用プロパティを使用して、いくつかの追加プロパティを利用できます。他の任意の Kafka プロパティは、spring.kafka.streams.properties 名前空間を使用して設定できます。詳細については、Kafka の追加プロパティも参照してください。

ファクトリ Bean を使用するには、次の例に示すように、StreamsBuilder を @Bean に接続します。

  • Java

  • Kotlin

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

	@Bean
	public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
		KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
		stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
		return stream;
	}

	private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
		return new KeyValue<>(key, value.toUpperCase());
	}

}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {

	@Bean
	fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
		val stream = streamsBuilder.stream<Int, String>("ks1In")
		stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
		return stream
	}

	private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
		return KeyValue(key, value.uppercase())
	}

}

デフォルトでは、StreamBuilder オブジェクトによって管理されるストリームは自動的に開始されます。spring.kafka.streams.auto-startup プロパティを使用して、この動作をカスタマイズできます。

Kafka の追加プロパティ

自動構成でサポートされるプロパティは、付録の “統合プロパティ” セクションに示されています。ほとんどの場合、これらのプロパティ (ハイフン付きまたはキャメルケース) は、Apache Kafka のドット付きプロパティに直接マップされることに注意してください。詳細については、Apache Kafka のドキュメントを参照してください。

名前にクライアント型 (producerconsumeradmin または streams) が含まれていないプロパティは共通とみなされ、すべてのクライアントに適用されます。これらの共通プロパティのほとんどは、必要に応じて 1 つ以上のクライアント型に対してオーバーライドできます。

Apache Kafka は、重要度が HIGH、MEDIUM、LOW のプロパティを指定します。Spring Boot 自動構成は、すべての HIGH 重要度プロパティ、一部の選択された MEDIUM および LOW プロパティ、およびデフォルト値を持たないプロパティをサポートします。

Kafka でサポートされるプロパティのサブセットのみが、KafkaProperties クラスを通じて直接使用できます。直接サポートされていない追加のプロパティを使用して個々のクライアント型を構成する場合は、次のプロパティを使用します。

  • プロパティ

  • YAML

spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

これにより、共通の prop.one Kafka プロパティが first (プロデューサー、コンシューマー、管理者、ストリームに適用)、prop.two 管理プロパティが secondprop.three コンシューマープロパティが thirdprop.four プロデューサープロパティが fourth、および prop.five ストリームプロパティが fifth に設定されます。

Spring Kafka JsonDeserializer は、次のように構成することもできます。

  • プロパティ

  • YAML

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同様に、型情報をヘッダーで送信する JsonSerializer のデフォルトの動作を無効にできます。

  • プロパティ

  • YAML

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
この方法で設定されたプロパティは、Spring Boot が明示的にサポートする構成アイテムをオーバーライドします。

組み込み Kafka を使用したテスト

Spring for Apache Kafka は、組み込みの Apache Kafka ブローカーを使用してプロジェクトをテストする便利な方法を提供します。この機能を使用するには、spring-kafka-test モジュールの @EmbeddedKafka を使用してテストクラスにアノテーションを付けます。詳細については、Spring for Apache Kafka リファレンスマニュアルを参照してください。

Spring Boot 自動構成を前述の組み込み Apache Kafka ブローカーで動作させるには、組み込みブローカーアドレスのシステムプロパティ (EmbeddedKafkaBroker によって設定される) を Apache Kafka の Spring Boot 構成プロパティに再マップする必要があります。これを行うには、いくつかの方法があります。

  • 組み込みブローカーアドレスをテストクラスの spring.kafka.bootstrap-servers にマップするシステムプロパティを提供します。

  • Java

  • Kotlin

	static {
		System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
	}
	init {
		System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
	}
  • @EmbeddedKafka アノテーションでプロパティ名を構成します。

  • Java

  • Kotlin

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

	// ...

}
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

	// ...

}
  • 構成プロパティでプレースホルダーを使用します。

  • プロパティ

  • YAML

spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"