トピックの構成

アプリケーションコンテキストで KafkaAdmin Bean を定義すると、ブローカーにトピックが自動的に追加されます。これを行うには、トピックごとに NewTopic @Bean をアプリケーションコンテキストに追加します。バージョン 2.3 では、このような Bean の作成をより便利にするために、新しいクラス TopicBuilder が導入されました。次の例は、その方法を示しています。

  • Java

  • Kotlin

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, List.of(0, 1))
            .assignReplicas(1, List.of(1, 2))
            .assignReplicas(2, List.of(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))

@Bean
fun topic1() =
    TopicBuilder.name("thing1")
        .partitions(10)
        .replicas(3)
        .compact()
        .build()

@Bean
fun topic2() =
    TopicBuilder.name("thing2")
        .partitions(10)
        .replicas(3)
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

@Bean
fun topic3() =
    TopicBuilder.name("thing3")
        .assignReplicas(0, Arrays.asList(0, 1))
        .assignReplicas(1, Arrays.asList(1, 2))
        .assignReplicas(2, Arrays.asList(2, 0))
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

バージョン 2.6 以降では、partitions() または replicas()、あるいはその両方を省略でき、ブローカーのデフォルトがこれらのプロパティに適用されます。この機能をサポートするには、ブローカーのバージョンが少なくとも 2.4.0 である必要があります。KIP-464 [Apache] (英語) を参照してください。

  • Java

  • Kotlin

@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()

@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()

@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()

バージョン 2.7 以降、単一の KafkaAdmin.NewTopics Bean 定義で複数の NewTopic を宣言できます。

  • Java

  • Kotlin

@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
    TopicBuilder.name("defaultBoth")
        .build(),
    TopicBuilder.name("defaultPart")
        .replicas(1)
        .build(),
    TopicBuilder.name("defaultRepl")
        .partitions(3)
        .build()
)
Spring Boot を使用する場合、KafkaAdmin Bean が自動的に登録されるため、必要なのは NewTopic (および / または NewTopics) @Bean のみです。

デフォルトでは、ブローカーが使用できない場合、メッセージはログに記録されますが、コンテキストは引き続きロードされます。プログラムで管理者の initialize() メソッドを呼び出して、後で再試行できます。この状態を致命的と見なす場合は、管理者の fatalIfBrokerNotAvailable プロパティを true に設定します。その後、コンテキストは初期化に失敗します。

ブローカーがそれをサポートしている場合(1.0.0 以降)、既存のトピックのパーティションが NewTopic.numPartitions より少ないことがわかった場合、管理者はパーティションの数を増やします。

バージョン 2.7 以降、KafkaAdmin は、実行時にトピックを作成および検査するためのメソッドを提供します。

  • createOrModifyTopics

  • describeTopics

より高度な機能については、AdminClient を直接使用できます。次の例は、その方法を示しています。

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

バージョン 2.9.10、3.0.9 以降では、特定の NewTopic Bean を作成または変更するかどうかを決定するために使用できる Predicate<NewTopic> を提供できるようになりました。これは、たとえば、異なるクラスターを指す複数の KafkaAdmin インスタンスがあり、各管理者が作成または変更する必要があるトピックを選択したい場合に便利です。

admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));