アプリケーションのテスト

spring-kafka-test jar には、アプリケーションのテストに役立つ便利なユーティリティがいくつか含まれています。

組み込み Kafka ブローカー

次の 2 つの実装が提供されています。

  • EmbeddedKafkaZKBroker - 埋め込み Zookeeper インスタンスを起動するレガシー実装 (EmbeddedKafka を使用する場合は依然としてデフォルト)。

  • EmbeddedKafkaKraftBroker - 複合コントローラーおよびブローカーモードでは、Zookeeper ではなく Kraft を使用します (3.1 以降)。

次のセクションで説明するように、ブローカーを構成するにはいくつかの手法があります。

KafkaTestUtils

org.springframework.kafka.test.utils.KafkaTestUtils は、レコードを消費したり、さまざまなレコードオフセットを取得したりするための多くの静的ヘルパーメソッドを提供します。詳細については、Javadoc を参照してください。

JUnit

org.springframework.kafka.test.utils.KafkaTestUtils は、プロデューサーとコンシューマーのプロパティを設定するための静的メソッドもいくつか提供します。次のリストは、それらのメソッドシグネチャーを示しています。

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit,
                                       EmbeddedKafkaBroker embeddedKafka) { ... }

/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }

バージョン 2.5 以降、consumerProps メソッドは ConsumerConfig.AUTO_OFFSET_RESET_CONFIG を earliest に設定します。これは、ほとんどの場合、テストケースで送信されたメッセージをコンシューマーが消費する必要があるためです。ConsumerConfig のデフォルトは latest です。つまり、コンシューマーが開始する前にテストによって送信されたメッセージは、それらのレコードを受信しません。以前の動作に戻すには、メソッドを呼び出した後にプロパティを latest に設定します。

組み込みブローカーを使用する場合、クロストークを防ぐために、テストごとに異なるトピックを使用するのが一般的なベストプラクティスです。何らかの理由でこれが不可能な場合、consumeFromEmbeddedTopics メソッドのデフォルトの動作は、割り当て後に割り当てられたパーティションを先頭にシークすることであることに注意してください。コンシューマープロパティにアクセスできないため、seekToEnd ブール値パラメーターを受け取るオーバーロードされたメソッドを使用して、最初ではなく最後までシークする必要があります。

組み込み Kafka サーバーおよび組み込み Zookeeper サーバーを作成するために、EmbeddedKafkaZKBroker 用の JUnit 4 @Rule ラッパーが提供されています。(JUnit 5 での @EmbeddedKafka の使用については、"@EmbeddedKafka アノテーション" を参照してください)。次のリストは、これらのメソッドのシグネチャーを示しています。

/**
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param topics the topics to create (2 partitions per).
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }

/**
 *
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param partitions partitions per topic.
 * @param topics the topics to create.
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
EmbeddedKafkaKraftBroker は JUnit4 ではサポートされていません。

EmbeddedKafkaBroker クラスには、作成したすべてのトピックを使用できるユーティリティメソッドがあります。次の例は、その使用方法を示しています。

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

KafkaTestUtils には、コンシューマーから結果を取得するためのユーティリティメソッドがいくつかあります。次のリストは、それらのメソッドシグネチャーを示しています。

/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer.
 * @param topic the topic.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
 * Poll the consumer for records.
 * @param consumer the consumer.
 * @return the records.
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }

次の例は、KafkaTestUtils の使用方法を示しています。

...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...

組み込み Kafka および組み込み Zookeeper サーバーが EmbeddedKafkaBroker によって開始されると、spring.embedded.kafka.brokers という名前のシステムプロパティが Kafka ブローカーのアドレスに設定され、spring.embedded.zookeeper.connect という名前のシステムプロパティが Zookeeper のアドレスに設定されます。このプロパティには便利な定数 (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS および EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT) が提供されています。

デフォルトの spring.embedded.kafka.brokers システムプロパティの代わりに、Kafka ブローカーのアドレスを任意の便利なプロパティに公開できます。この目的のために、組み込み Kafka を開始する前に spring.embedded.kafka.brokers.property (EmbeddedKafkaBroker.BROKER_LIST_PROPERTY) システムプロパティを設定できます。例: Spring Boot では、spring.kafka.bootstrap-servers 構成プロパティが Kafka クライアントの自動構成用にそれぞれ設定されることが期待されます。ランダムポートで組み込み Kafka を使用してテストを実行する前に、spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers をシステムプロパティとして設定できます。これにより、EmbeddedKafkaBroker はそれを使用してブローカーアドレスを公開します。これがこのプロパティのデフォルト値になりました (バージョン 3.0.10 以降)。

EmbeddedKafkaBroker.brokerProperties(Map<String, String>) を使用すると、Kafka サーバーに追加のプロパティを提供できます。可能なブローカープロパティの詳細については、Kafka 構成 [Apache] (英語) を参照してください。

トピックの構成

次の構成例では、5 つのパーティションを持つ cat および hat というトピック、10 のパーティションを持つ thing1 というトピック、および 15 のパーティションを持つ thing2 というトピックを作成します。

public class MyTests {

    @ClassRule
    private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");

    @Test
    public void test() {
        embeddedKafkaRule.getEmbeddedKafka()
              .addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        ...
    }

}

デフォルトでは、addTopics は問題が発生した場合 (すでに存在するトピックの追加など) に例外をスローします。バージョン 2.6 は、Map<String, Exception> を返すそのメソッドの新しいバージョンを追加しました。キーはトピック名で、値は成功の場合は null、失敗の場合は Exception です。

複数のテストクラスに同じブローカーを使用する

次のようなものを使用して、複数のテストクラスに同じブローカーを使用できます。

public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
        super();
    }

}

これは、Spring Boot 環境を想定しており、組み込みブローカーがブートストラップサーバーのプロパティを置き換えます。

次に、各テストクラスで、次のようなものを使用できます。

static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

Spring Boot を使用していない場合は、broker.getBrokersAsString() を使用してブートストラップサーバーを取得できます。

上記の例では、すべてのテストが完了したときにブローカーをシャットダウンするメカニズムは提供されていません。たとえば、Gradle デーモンでテストを実行する場合、これは問題になる可能性があります。このような状況ではこの手法を使用しないでください。または、テストが完了したら、何かを使用して EmbeddedKafkaBroker で destroy() を呼び出す必要があります。

バージョン 3.0 以降、フレームワークは JUnit プラットフォームの GlobalEmbeddedKafkaTestExecutionListener を公開します。デフォルトでは無効になっています。これには、JUnit プラットフォーム 1.8 以降が必要です。このリスナーの目的は、テスト計画全体に対して 1 つのグローバル EmbeddedKafkaBroker を開始し、計画の最後で停止することです。このリスナーを有効にして、プロジェクト内のすべてのテストに対して 1 つのグローバル組み込み Kafka クラスターを使用するには、システムプロパティまたは JUnit プラットフォーム構成を介して spring.kafka.global.embedded.enabled プロパティを true に設定する必要があります。さらに、次のプロパティを提供できます。

  • spring.kafka.embedded.count - 管理する Kafka ブローカーの数。

  • spring.kafka.embedded.ports - 起動するすべての Kafka ブローカーのポート (カンマ区切り値)、ランダムポートが優先される場合は 0。値の数は、前述の count と同じである必要があります。

  • spring.kafka.embedded.topics - 開始された Kafka クラスターで作成するトピック (コンマ区切り値)。

  • spring.kafka.embedded.partitions - 作成されたトピック用にプロビジョニングするパーティションの数。

  • spring.kafka.embedded.broker.properties.location - 追加の Kafka ブローカー構成プロパティのファイルの場所。このプロパティの値は、Spring リソース抽象化パターンに従う必要があります。

  • spring.kafka.embedded.kraft - デフォルトは false です。true の場合は、EmbeddedKafkaZKBroker ではなく EmbeddedKafkaKraftBroker を使用します。

基本的に、これらのプロパティは @EmbeddedKafka 属性の一部を模倣しています。

構成プロパティと JUnit 5 ユーザーガイド (英語) で指定する方法の詳細については、を参照してください。例: spring.embedded.kafka.brokers.property=my.bootstrap-servers エントリは、テストクラスパスの junit-platform.properties ファイルに追加できます。バージョン 3.0.10 以降、ブローカーは、Spring Boot アプリケーションでのテストのために、これをデフォルトで自動的に spring.kafka.bootstrap-servers に設定します。

グローバルな組み込み Kafka とテストごとのクラスを 1 つのテストスイートに結合しないことをお勧めします。どちらも同じシステムプロパティを共有しているため、予期しない動作が発生する可能性が非常に高くなります。
spring-kafka-test は、junit-jupiter-api および junit-platform-launcher (後者はグローバル組み込みブローカーをサポートするため) に推移的な依存関係があります。組み込みブローカーを使用したいが、JUnit を使用していない場合は、これらの依存関係を除外することをお勧めします。

@EmbeddedKafka アノテーション

一般に、ルールを @ClassRule として使用して、テスト間でブローカーを開始および停止しないようにすることをお勧めします (テストごとに異なるトピックを使用します)。バージョン 2.0 以降、Spring のテストアプリケーションコンテキストキャッシングを使用する場合、EmbeddedKafkaBroker Bean も宣言できるため、単一のブローカーを複数のテストクラスで使用できます。便宜上、EmbeddedKafkaBroker Bean を登録するための @EmbeddedKafka というテストクラスレベルのアノテーションを提供します。次の例は、その使用方法を示しています。

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
         topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

    @Configuration
    @EnableKafkaStreams
    public static class KafkaStreamsConfiguration {

        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);
        }

    }

}

バージョン 2.2.4 以降、@EmbeddedKafka アノテーションを使用して Kafka ポートプロパティを指定することもできます。

バージョン 3.2 以降では、EmbeddedKafkaZKBroker の代わりに EmbeddedKafkaKraftBroker を使用するには、kraft プロパティを true に設定します。

次の例では、@EmbeddedKafka サポートプロパティプレースホルダー解決の topicsbrokerPropertiesbrokerPropertiesLocation 属性を設定します。

@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
        brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
                            "listeners=PLAINTEXT://localhost:${kafka.broker.port}",
                            "auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
        brokerPropertiesLocation = "classpath:/broker.properties")

前の例では、プロパティプレースホルダー ${kafka.topics.another-topic}${kafka.broker.logs-dir}${kafka.broker.port} は Spring Environment から解決されます。さらに、ブローカーのプロパティは、brokerPropertiesLocation によって指定された broker.properties クラスパスリソースからロードされます。プロパティプレースホルダーは、brokerPropertiesLocation URL およびリソース内で見つかったすべてのプロパティプレースホルダーに対して解決されます。brokerProperties によって定義されたプロパティは、brokerPropertiesLocation にあるプロパティをオーバーライドします。

@EmbeddedKafka アノテーションは JUnit 4 または JUnit 5 で使用できます。

JUnit5 による @EmbeddedKafka アノテーション

バージョン 2.3 以降、JUnit5 で @EmbeddedKafka アノテーションを使用するには 2 つの方法があります。@SpringJunitConfig アノテーションとともに使用すると、組み込みブローカーがテストアプリケーションコンテキストに追加されます。クラスまたはメソッドレベルでブローカーをテストにオートワイヤーして、ブローカーのアドレスリストを取得できます。

Spring Test コンテキストを使用しない場合、EmbdeddedKafkaCondition はブローカーを作成します。条件にはパラメーターリゾルバーが含まれているため、テストメソッドでブローカーにアクセスできます。

@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }

}

@EmbeddedKafka アノテーションが付けられたクラスに ExtendWith(SpringExtension.class) アノテーション (またはメタアノテーション) が付けられていない限り、スタンドアロンブローカー (Spring の TestContext の外部) が作成されます。@SpringJunitConfig と @SpringBootTest にはメタアノテーションが付けられており、これらのアノテーションのいずれかが存在する場合にはコンテキストベースのブローカーが使用されます。

使用可能な Spring Test アプリケーションコンテキストがある場合、トピックとブローカープロパティにはプロパティプレースホルダーを含めることができ、プロパティがどこかで定義されている限り解決されます。使用可能な Spring コンテキストがない場合、これらのプレースホルダーは解決されません。

@SpringBootTest アノテーションに埋め込まれたブローカー

Spring Initializr は、テストスコープの spring-kafka-test 依存関係をプロジェクト構成に自動的に追加するようになりました。

アプリケーションが spring-cloud-stream で Kafka バインダーを使用し、テストに組み込みブローカーを使用する場合は、spring-cloud-stream-test-support 依存関係を削除する必要があります。これは、実際のバインダーがテストケースのテストバインダーに置き換わるためです。一部のテストでテストバインダーを使用し、一部のテストで埋め込みブローカーを使用する場合、実際のバインダーを使用するテストでは、テストクラスのバインダー自動構成を除外して、テストバインダーを無効にする必要があります。次の例は、その方法を示しています。

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
    + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
    ...
}

Spring Boot アプリケーションテストで組み込みブローカーを使用するには、いくつかの方法があります。

それらには次のものが含まれます。

JUnit4 クラスルール

次の例は、JUnit4 クラスルールを使用して組み込みブローカーを作成する方法を示しています。

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "someTopic")
            .brokerListProperty("spring.kafka.bootstrap-servers");

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

これは Spring Boot アプリケーションであるため、ブローカーリストプロパティをオーバーライドして Spring Boot のプロパティを設定していることに注意してください。

@EmbeddedKafka と @SpringJunitConfig

@EmbeddedKafka を @SpringJUnitConfig と一緒に使用する場合、テストクラスで @DirtiesContext を使用することをお勧めします。これは、テストスイートで複数のテストを実行した後、JVM のシャットダウン中に発生する可能性のある競合状態を防ぐためです。例: @DirtiesContext を使用しない場合、アプリケーションコンテキストがまだリソースを必要としているときに、EmbeddedKafkaBroker が早期にシャットダウンする可能性があります。すべての EmbeddedKafka テスト実行で独自の一時ディレクトリが作成されるため、この競合状態が発生すると、削除またはクリーンアップしようとしているファイルが使用できなくなったことを示すエラーログメッセージが生成されます。@DirtiesContext を追加すると、アプリケーションコンテキストが各テスト後にクリーンアップされ、キャッシュされなくなるため、このような潜在的なリソース競合状態に対する脆弱性が低くなります。

@EmbeddedKafka アノテーションまたは EmbeddedKafkaBroker Bean

次の例は、@EmbeddedKafka アノテーションを使用して組み込みブローカーを作成する方法を示しています。

@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}
バージョン 3.0.10 以降、bootstrapServersProperty はデフォルトで自動的に spring.kafka.bootstrap-servers に設定されます。

Hamcrest マッチャー

org.springframework.kafka.test.hamcrest.KafkaMatchers は、次のマッチャーを提供します。

/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
 * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
 *
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
  return hasTimestamp(TimestampType.CREATE_TIME, ts);
}

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord}
 * @param type timestamp type of the record
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
  return new ConsumerRecordTimestampMatcher(type, ts);
}

AssertJ 条件

次の AssertJ 条件を使用できます。

/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param key the key.
 * @param value the value.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a Condition that matches the key in a consumer record.
 * @since 2.2.12
 */
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

/**
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
  return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}

/**
 * @param type the type of timestamp
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
  return new ConsumerRecordTimestampCondition(type, value);
}

サンプル

次の例は、この章で説明するほとんどのトピックをまとめたものです。

public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
            embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container =
                            new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                            embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        Map<String, Object> producerProps =
                            KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> pf =
                            new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }

}

前の例では、Hamcrest マッチャーを使用しています。AssertJ では、最終的な部分は次のコードのようになります。

assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));

コンシューマーとプロデューサーのモックアップ

kafka-clients ライブラリは、テスト目的で MockConsumer クラスと MockProducer クラスを提供します。

バージョン 3.0.7 以降、リスナーコンテナーまたは KafkaTemplate を使用してテストの一部でこれらのクラスを使用したい場合、フレームワークは MockConsumerFactory および MockProducerFactory 実装を提供するようになりました。

これらのファクトリは、実行中の (または組み込みの) ブローカーを必要とするデフォルトのファクトリの代わりに、リスナーコンテナーおよびテンプレートで使用できます。

単一のコンシューマーを返す単純な実装の例を次に示します。

@Bean
ConsumerFactory<String, String> consumerFactory() {
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition topicPartition0 = new TopicPartition("topic", 0);
    List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
    Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
            .toMap(Function.identity(), tp -> 0L));
    consumer.updateBeginningOffsets(beginningOffsets);
    consumer.schedulePollTask(() -> {
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
                        new RecordHeaders(), Optional.empty()));
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
                        new RecordHeaders(), Optional.empty()));
    });
    return new MockConsumerFactory(() -> consumer);
}

同時実行でテストしたい場合は、ファクトリのコンストラクター内の Supplier ラムダで毎回新しいインスタンスを作成する必要があります。

MockProducerFactory には 2 つのコンストラクターがあります。1 つは単純なファクトリを作成するもの、もう 1 つはトランザクションをサポートするファクトリを作成するものです。

以下に例を示します。

@Bean
ProducerFactory<String, String> nonTransFactory() {
    return new MockProducerFactory<>(() ->
            new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}

@Bean
ProducerFactory<String, String> transFactory() {
    MockProducer<String, String> mockProducer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    mockProducer.initTransactions();
    return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}

2 番目のケースでは、ラムダは BiFunction<Boolean, String> であることに注意してください。呼び出し元がトランザクションプロデューサーを必要とする場合、最初のパラメーターは true になります。オプションの 2 番目のパラメーターにはトランザクション ID が含まれます。これは、(コンストラクターで提供される) デフォルトにすることも、KafkaTransactionManager (またはローカルトランザクションの場合は KafkaTemplate ) によってオーバーライドすることもできます (そのように構成されている場合)。トランザクション ID は、この値に基づいて別の MockProducer を使用する場合に提供されます。

マルチスレッド環境でプロデューサーを使用している場合、BiFunction は複数のプロデューサーを返す必要があります (おそらく ThreadLocal を使用してスレッドバインドされている)。

トランザクション MockProducer は、initTransaction() を呼び出してトランザクション用に初期化する必要があります。

MockProducer を使用する場合、送信のたびにプロデューサーを閉じたくない場合は、スーパークラスから close メソッドを呼び出さない close メソッドをオーバーライドするカスタム MockProducer 実装を提供できます。これは、同じプロデューサーを閉じずに複数のパブリッシュを検証する場合のテストに便利です。

次に例を示します。

@Bean
MockProducer<String, String> mockProducer() {
    return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
        @Override
        public void close() {

        }
    };
}

@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
    return new MockProducerFactory<>(() -> mockProducer);
}