Apache Kafka サポート

Apache Kafka (英語) は、spring-kafka プロジェクトの自動構成を提供することによりサポートされます。

Kafka 構成は、spring.kafka.*外部構成プロパティによって制御されます。例: application.properties で次のセクションを宣言できます。

  • プロパティ

  • YAML

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
起動時にトピックを作成するには、型 NewTopic [Apache] (英語) の Bean を追加します。トピックがすでに存在する場合、Bean は無視されます。

サポートされるオプションの詳細については、KafkaProperties (Javadoc) を参照してください。

メッセージの送信

Spring の KafkaTemplate (Javadoc) は自動構成されており、次の例に示すように、独自の Bean に直接自動接続できます。

  • Java

  • Kotlin

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final KafkaTemplate<String, String> kafkaTemplate;

	public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	// ...

	public void someMethod() {
		this.kafkaTemplate.send("someTopic", "Hello");
	}

}
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {

	// ...

	fun someMethod() {
		kafkaTemplate.send("someTopic", "Hello")
	}

}
プロパティ spring.kafka.producer.transaction-id-prefix が定義されている場合、KafkaTransactionManager (Javadoc) が自動的に構成されます。また、RecordMessageConverter (Javadoc) Bean が定義されている場合、自動構成された KafkaTemplate (Javadoc) に自動的に関連付けられます。

メッセージの受信

Apache Kafka インフラストラクチャが存在する場合、任意の Bean に @KafkaListener (Javadoc) のアノテーションを付けてリスナーエンドポイントを作成できます。KafkaListenerContainerFactory (Javadoc) が定義されていない場合は、spring.kafka.listener.* で定義されたキーを使用してデフォルトの KafkaListenerContainerFactory (Javadoc) が自動的に構成されます。

次のコンポーネントは、someTopic トピックにリスナーエンドポイントを作成します。

  • Java

  • Kotlin

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@KafkaListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@KafkaListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

KafkaTransactionManager (Javadoc) Bean が定義されている場合、コンテナーファクトリに自動的に関連付けられます。同様に、RecordFilterStrategy (Javadoc) CommonErrorHandler (Javadoc) AfterRollbackProcessor (Javadoc) または ConsumerAwareRebalanceListener (Javadoc) Bean が定義されている場合、デフォルトのファクトリに自動的に関連付けられます。

リスナーの型に応じて、RecordMessageConverter (Javadoc) または BatchMessageConverter (Javadoc) Bean がデフォルトのファクトリに関連付けられます。バッチリスナーに RecordMessageConverter (Javadoc) Bean のみが存在する場合は、BatchMessageConverter (Javadoc) にラップされます。

カスタム ChainedKafkaTransactionManager (Javadoc) は通常、自動構成された KafkaTransactionManager (Javadoc) Bean を参照するため、@Primary (Javadoc) としてマークする必要があります。

Kafka ストリーム

Spring for Apache Kafka は、StreamsBuilder [Apache] (英語) オブジェクトを作成し、そのストリームのライフサイクルを管理するためのファクトリ Bean を提供します。kafka-streams がクラスパス上にあり、Kafka ストリームが @EnableKafkaStreams (Javadoc) アノテーションによって有効になっている限り、Spring Boot は必要な KafkaStreamsConfiguration (Javadoc) Bean を自動構成します。

Kafka ストリームを有効にすると、アプリケーション ID とブートストラップサーバーを設定する必要があります。前者は spring.kafka.streams.application-id を使用して設定でき、設定されていない場合はデフォルトで spring.application.name になります。後者はグローバルに設定することも、ストリームに対してのみ具体的にオーバーライドすることもできます。

専用プロパティを使用して、いくつかの追加プロパティを利用できます。他の任意の Kafka プロパティは、spring.kafka.streams.properties 名前空間を使用して設定できます。詳細については、Kafka の追加プロパティも参照してください。

提供時の Bean を使用するには、次の例に示すように、StreamsBuilder [Apache] (英語) @Bean (Javadoc) に接続します。

  • Java

  • Kotlin

import java.util.Locale;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

	@Bean
	public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
		KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
		stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
		return stream;
	}

	private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
		return new KeyValue<>(key, value.toUpperCase(Locale.getDefault()));
	}

}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {

	@Bean
	fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
		val stream = streamsBuilder.stream<Int, String>("ks1In")
		stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
		return stream
	}

	private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
		return KeyValue(key, value.uppercase())
	}

}

デフォルトでは、StreamsBuilder [Apache] (英語) オブジェクトによって管理されるストリームは自動的に開始されます。この動作は、spring.kafka.streams.auto-startup プロパティを使用してカスタマイズできます。

Kafka の追加プロパティ

自動構成でサポートされるプロパティは、付録の統合プロパティセクションに示されています。ほとんどの場合、これらのプロパティ (ハイフン付きまたは camelCase) は、Apache Kafka のドット付きプロパティに直接マップされることに注意してください。詳細については、Apache Kafka のドキュメントを参照してください。

名前にクライアント型 (producerconsumeradmin または streams) が含まれていないプロパティは共通とみなされ、すべてのクライアントに適用されます。これらの共通プロパティのほとんどは、必要に応じて 1 つ以上のクライアント型に対してオーバーライドできます。

Apache Kafka は、重要度が HIGH、MEDIUM、LOW のプロパティを指定します。Spring Boot 自動構成は、すべての HIGH 重要度プロパティ、一部の選択された MEDIUM および LOW プロパティ、およびデフォルト値を持たないプロパティをサポートします。

Kafka でサポートされているプロパティのサブセットのみが、KafkaProperties (Javadoc) クラスを通じて直接使用できます。直接サポートされていない追加のプロパティを使用して個々のクライアント型を構成する場合は、次のプロパティを使用します。

  • プロパティ

  • YAML

spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

これにより、共通の prop.one Kafka プロパティが first (プロデューサー、コンシューマー、管理者、ストリームに適用)、prop.two 管理プロパティが secondprop.three コンシューマープロパティが thirdprop.four プロデューサープロパティが fourth、および prop.five ストリームプロパティが fifth に設定されます。

Spring、Kafka、JsonDeserializer (Javadoc) を次のように構成することもできます。

  • プロパティ

  • YAML

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同様に、ヘッダーで型情報を送信する JsonSerializer (Javadoc) のデフォルトの動作を無効にすることもできます。

  • プロパティ

  • YAML

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
この方法で設定されたプロパティは、Spring Boot が明示的にサポートする構成アイテムをオーバーライドします。

組み込み Kafka を使用したテスト

Spring for Apache Kafka は、組み込みの Apache Kafka ブローカーを使用してプロジェクトをテストする便利な方法を提供します。この機能を使用するには、spring-kafka-test モジュールの @EmbeddedKafka (Javadoc) を使用してテストクラスにアノテーションを付けます。詳細については、Spring for Apache Kafka リファレンスマニュアルを参照してください。

Spring Boot の自動構成を前述の組み込み Apache Kafka ブローカーで動作させるには、組み込みブローカーアドレスのシステムプロパティ (EmbeddedKafkaBroker (Javadoc) によって設定される) を Apache Kafka の Spring Boot 構成プロパティに再マップする必要があります。これを行うには、いくつかの方法があります。

  • 組み込みブローカーアドレスをテストクラスの spring.kafka.bootstrap-servers にマップするシステムプロパティを提供します。

  • Java

  • Kotlin

	static {
		System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
	}
	init {
		System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
	}
  • Java

  • Kotlin

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

	// ...

}
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

	// ...

}
  • 構成プロパティでプレースホルダーを使用します。

  • プロパティ

  • YAML

spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"