Debezium サポート

Debezium エンジン (英語) 、変更データキャプチャー (CDC) 受信 チャネルアダプター。DebeziumMessageProducer を使用すると、データベース変更イベントをキャプチャーしてメッセージに変換し、後で送信チャネルにストリーミングできます。

Spring Integration Debezium 依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-debezium</artifactId>
    <version>6.5.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:6.5.1"

また、入力データベースに debezium コネクター (英語) の依存関係を含める必要があります。たとえば、Debezium を PostgreSQL で使用するには、postgres debezium コネクターが必要になります。

  • Maven

  • Gradle

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"

debezium-version を、使用されている spring-integration-debezium バージョンと互換性のあるバージョンに置き換えます。

受信 Debezium チャネルアダプター

Debezium アダプターは、事前構成された DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> インスタンスを想定しています。

debezium サプライヤー [GitHub] (英語) は、便利な DebeziumProperties [GitHub] (英語) 構成抽象化を備えた、すぐに使える DebeziumEngine.Builder Spring Boot 自動構成を提供します。

Debezium Java DSL は、提供された DebeziumEngine.Builder だけでなく、プレーンな Debezium 構成 (例: java.util.Properties) から DebeziumMessageProducer インスタンスを作成できます。Later は、独自の構成と直列化形式を使用するいくつかの一般的なユースケースに便利です。

さらに、DebeziumMessageProducer は次の構成プロパティを使用して調整できます。

  • contentType - JSON (デフォルト)、AVROPROTOBUF メッセージコンテンツの処理を許可します。contentType must は、提供された DebeziumEngine.Builder 用に構成された SerializationFormat と揃えられます。

  • enableBatch - false (デフォルト) に設定すると、debezium アダプターは、ソースデータベースから受信した ChangeEvent データ変更イベントごとに新しい Message を送信します。true に設定すると、アダプターは Debezium エンジンから受信した ChangeEvent のバッチごとに 1 つの Message をダウンストリームに送信します。このようなペイロードは直列化できないため、カスタムの直列化 / 逆直列化の実装が必要になります。

  • enableEmptyPayload - tombstone (別名削除) メッセージのサポートを有効にします。データベース行の削除時に、Debezium は、削除された行と同じキーと Optional.empty の値を持つ tombstone 変更イベントを送信できます。デフォルトは false です。

  • headerMapper - ChangeEvent ヘッダーを選択して Message ヘッダーに変換できるカスタム HeaderMapper 実装。デフォルトの DefaultDebeziumHeaderMapper 実装では、setHeaderNamesToMap に setter が提供されます。デフォルトでは、すべてのヘッダーがマップされます。

  • taskExecutor - Debezium エンジンにカスタム TaskExecutor を設定します。

次のコードスニペットは、このチャネルアダプターのさまざまな構成を示しています。

Java 構成を使用した構成

次の Spring Boot アプリケーションは、Java 構成で受信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }

    @Bean
    public MessageChannel debeziumInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer debeziumMessageProducer(
            DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
            MessageChannel debeziumInputChannel) {

        DebeziumMessageProducer debeziumMessageProducer =
            new DebeziumMessageProducer(debeziumEngineBuilder);
        debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
        return debeziumMessageProducer;
    }

    @ServiceActivator(inputChannel = "debeziumInputChannel")
    public void handler(Message<?> message) {

        Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)

        String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)

        String payload = new String((byte[]) message.getPayload()); (3)

        System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
    }

}
1 イベントの対象となる論理宛先の名前。通常、宛先は topic.prefix 構成オプション、データベース名、テーブル名で構成されます。例: my-topic.inventory.orders
2 変更されたテーブルのキーと変更された行の実際のキーのスキーマが含まれます。キースキーマとそれに対応するキーペイロードの両方には、コネクターがイベントを作成した時点で、変更されたテーブルの PRIMARY KEY (または一意制約) の各列のフィールドが含まれています。
3 キーと同様に、ペイロードにはスキーマセクションとペイロード値セクションがあります。スキーマセクションには、ネストされたフィールドを含む、ペイロード値セクションのエンベロープ構造を記述するスキーマが含まれています。データを作成、更新、削除する操作の変更イベントはすべて、エンベロープ構造の値ペイロードを持っています。

key.converter.schemas.enable=false および / または value.converter.schemas.enable=false では、それぞれキーまたはペイロードのメッセージ内スキーマコンテンツを無効にすることができます。

同様に、受信した変更イベントをバッチで処理するように DebeziumMessageProducer を構成できます。

@Bean
public MessageProducer debeziumMessageProducer(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
        MessageChannel debeziumInputChannel) {

    DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
	debeziumMessageProducer.setEnableBatch(true);
    debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
    return debeziumMessageProducer;
}

@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
    System.out.println(payload);
}

Debezium Java DSL サポート

spring-integration-debezium は、Debezium ファクトリと DebeziumMessageProducerSpec 実装を介して、便利な Java DSL Fluent API を提供します。

Debezium Java DSL の受信チャネルアダプターは次のとおりです。

 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>   debeziumEngineBuilder = ...
 IntegrationFlow.from(
    Debezium.inboundChannelAdapter(debeziumEngineBuilder)
        .headerNames("special*")
        .contentType("application/json")
        .enableBatch(false))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

または、ネイティブの debezium 構成プロパティから DebeziumMessageProducerSpec インスタンスを作成し、デフォルトで JSON 直列化形式に設定します。

 Properties debeziumConfig = ...
 IntegrationFlow
    .from(Debezium.inboundChannelAdapter(debeziumConfig))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

次の Spring Boot アプリケーションは、Java DSL を使用して受信アダプターを構成する例を示しています。

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow debeziumInbound(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {

        return IntegrationFlow
                .from(Debezium
                        .inboundChannelAdapter(debeziumEngineBuilder)
					    .headerNames("special*")
					    .contentType("application/json")
					    .enableBatch(false))
                .handle(m -> System.out.println(new String((byte[]) m.getPayload())))
                .get();
    }

}