最新の安定バージョンについては、Spring for Apache Kafka 3.3.10 を使用してください!  | 
クイックツアー
 前提条件: Apache Kafka をインストールして実行する必要があります。次に、Spring for Apache Kafka (spring-kafka) JAR とそのすべての依存関係をクラスパスに配置する必要があります。これを行う最も簡単な方法は、ビルドツールで依存関係を宣言することです。
Spring Boot を使用していない場合は、プロジェクトの依存関係として spring-kafka jar を宣言してください。
| Spring Boot を使用している場合(および start.spring.io を使用してプロジェクトを作成していない場合)、バージョンを省略すると、Boot は Boot バージョンと互換性のある正しいバージョンを自動的に取り込みます。 | 
Maven
Gradle
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
implementation 'org.springframework.kafka:spring-kafka'
ただし、開始する最も簡単な方法は、start.spring.io(または Spring ToolSuits と IntellijIDEA のウィザード)を使用してプロジェクトを作成し、依存関係として "Spring for Apache Kafka" を選択することです。
入門
開始する最も簡単な方法は、start.spring.io(または Spring ToolSuits と IntellijIDEA のウィザード)を使用してプロジェクトを作成し、依存関係として "Spring for Apache Kafka" を選択することです。インフラストラクチャ Bean の自動構成の詳細については、Spring Boot ドキュメントを参照してください。
これが最小限のコンシューマーアプリケーションです。
Spring Boot コンシューマーアプリ
Java
Kotlin
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }
    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }
}
@SpringBootApplication
class Application {
    @Bean
    fun topic() = NewTopic("topic1", 10, 1)
    @KafkaListener(id = "myId", topics = ["topic1"])
    fun listen(value: String?) {
        println(value)
    }
}
fun main(args: Array<String>) = runApplication<Application>(*args)
spring.kafka.consumer.auto-offset-reset=earliestNewTopic Bean により、トピックがブローカー上に作成されます。トピックがすでに存在する場合は必要ありません。
Spring Boot プロデューサーアプリ
Java
Kotlin
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic1", "test");
        };
    }
}
@SpringBootApplication
class Application {
    @Bean
    fun topic() = NewTopic("topic1", 10, 1)
    @Bean
    fun runner(template: KafkaTemplate<String?, String?>) =
        ApplicationRunner { template.send("topic1", "test") }
    companion object {
        @JvmStatic
        fun main(args: Array<String>) = runApplication<Application>(*args)
    }
}
Java 構成を使用 (Spring Boot なし)
Spring for Apache Kafka は、Spring アプリケーションコンテキストで使用するように設計されています。例: Spring コンテキストの外部でリスナーコンテナーを自分で作成した場合、コンテナーが実装するすべての ...Aware インターフェースを満たさない限り、すべての関数が機能するとは限りません。 | 
SpringBoot を使用しないアプリケーションの例を次に示します。Consumer と Producer の両方があります。
Java
Kotlin
public class Sender {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
        context.getBean(Sender.class).send("test", 42);
    }
    private final KafkaTemplate<Integer, String> template;
    public Sender(KafkaTemplate<Integer, String> template) {
        this.template = template;
    }
    public void send(String toSend, int key) {
        this.template.send("topic1", key, toSend);
    }
}
public class Listener {
    @KafkaListener(id = "listen1", topics = "topic1")
    public void listen1(String in) {
        System.out.println(in);
    }
}
@Configuration
@EnableKafka
public class Config {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // ...
        return props;
    }
    @Bean
    public Sender sender(KafkaTemplate<Integer, String> template) {
        return new Sender(template);
    }
    @Bean
    public Listener listener() {
        return new Listener();
    }
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }
    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //...
        return props;
    }
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
class Sender(private val template: KafkaTemplate<Int, String>) {
    fun send(toSend: String, key: Int) {
        template.send("topic1", key, toSend)
    }
}
class Listener {
    @KafkaListener(id = "listen1", topics = ["topic1"])
    fun listen1(`in`: String) {
        println(`in`)
    }
}
@Configuration
@EnableKafka
class Config {
    @Bean
    fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, String>) =
        ConcurrentKafkaListenerContainerFactory<Int, String>().also { it.consumerFactory = consumerFactory }
    @Bean
    fun consumerFactory() = DefaultKafkaConsumerFactory<Int, String>(consumerProps)
    val consumerProps = mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
        ConsumerConfig.GROUP_ID_CONFIG to "group",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
    )
    @Bean
    fun sender(template: KafkaTemplate<Int, String>) = Sender(template)
    @Bean
    fun listener() = Listener()
    @Bean
    fun producerFactory() = DefaultKafkaProducerFactory<Int, String>(senderProps)
    val senderProps = mapOf(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
        ProducerConfig.LINGER_MS_CONFIG to 10,
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to IntegerSerializer::class.java,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
    )
    @Bean
    fun kafkaTemplate(producerFactory: ProducerFactory<Int, String>) = KafkaTemplate(producerFactory)
}
ご覧のとおり、Spring Boot を使用しない場合は、いくつかのインフラストラクチャ Bean を定義する必要があります。