Debezium サポート
Debezium エンジン (英語) 、変更データキャプチャー (CDC) 受信 チャネルアダプター。DebeziumMessageProducer
を使用すると、データベース変更イベントをキャプチャーしてメッセージに変換し、後で送信チャネルにストリーミングできます。
Spring Integration Debezium 依存関係をプロジェクトに含める必要があります。
また、入力データベースに debezium コネクター (英語) の依存関係を含める必要があります。たとえば、Debezium を PostgreSQL で使用するには、postgres debezium コネクターが必要になります。
Maven
Gradle
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"
|
受信 Debezium チャネルアダプター
Debezium アダプターは、事前構成された DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>
インスタンスを想定しています。
debezium サプライヤー [GitHub] (英語) は、便利な DebeziumProperties [GitHub] (英語) 構成抽象化を備えた、すぐに使える |
Debezium Java DSL は、提供された |
さらに、DebeziumMessageProducer
は次の構成プロパティを使用して調整できます。
contentType
-JSON
(デフォルト)、AVRO
、PROTOBUF
メッセージコンテンツの処理を許可します。contentTypemust
は、提供されたDebeziumEngine.Builder
用に構成されたSerializationFormat
と揃えられます。enableBatch
-false
(デフォルト) に設定すると、debezium アダプターは、ソースデータベースから受信したChangeEvent
データ変更イベントごとに新しいMessage
を送信します。true
に設定すると、アダプターは Debezium エンジンから受信したChangeEvent
のバッチごとに 1 つのMessage
をダウンストリームに送信します。このようなペイロードは直列化できないため、カスタムの直列化 / 逆直列化の実装が必要になります。enableEmptyPayload
- tombstone (別名削除) メッセージのサポートを有効にします。データベース行の削除時に、Debezium は、削除された行と同じキーとOptional.empty
の値を持つ tombstone 変更イベントを送信できます。デフォルトはfalse
です。headerMapper
-ChangeEvent
ヘッダーを選択してMessage
ヘッダーに変換できるカスタムHeaderMapper
実装。デフォルトのDefaultDebeziumHeaderMapper
実装では、setHeaderNamesToMap
に setter が提供されます。デフォルトでは、すべてのヘッダーがマップされます。taskExecutor
- Debezium エンジンにカスタムTaskExecutor
を設定します。
次のコードスニペットは、このチャネルアダプターのさまざまな構成を示しています。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java 構成で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public MessageChannel debeziumInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer =
new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(Message<?> message) {
Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)
String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)
String payload = new String((byte[]) message.getPayload()); (3)
System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
}
}
1 | イベントの対象となる論理宛先の名前。通常、宛先は topic.prefix 構成オプション、データベース名、テーブル名で構成されます。例: my-topic.inventory.orders 。 |
2 | 変更されたテーブルのキーと変更された行の実際のキーのスキーマが含まれます。キースキーマとそれに対応するキーペイロードの両方には、コネクターがイベントを作成した時点で、変更されたテーブルの PRIMARY KEY (または一意制約) の各列のフィールドが含まれています。 |
3 | キーと同様に、ペイロードにはスキーマセクションとペイロード値セクションがあります。スキーマセクションには、ネストされたフィールドを含む、ペイロード値セクションのエンベロープ構造を記述するスキーマが含まれています。データを作成、更新、削除する操作の変更イベントはすべて、エンベロープ構造の値ペイロードを持っています。 |
|
同様に、受信した変更イベントをバッチで処理するように DebeziumMessageProducer
を構成できます。
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setEnableBatch(true);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
System.out.println(payload);
}
Debezium Java DSL サポート
spring-integration-debezium
は、Debezium
ファクトリと DebeziumMessageProducerSpec
実装を介して、便利な Java DSL Fluent API を提供します。
Debezium Java DSL の受信チャネルアダプターは次のとおりです。
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder = ...
IntegrationFlow.from(
Debezium.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
または、ネイティブの debezium 構成プロパティから DebeziumMessageProducerSpec
インスタンスを作成し、デフォルトで JSON
直列化形式に設定します。
Properties debeziumConfig = ...
IntegrationFlow
.from(Debezium.inboundChannelAdapter(debeziumConfig))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
次の Spring Boot アプリケーションは、Java DSL を使用して受信アダプターを構成する例を示しています。
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow debeziumInbound(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
return IntegrationFlow
.from(Debezium
.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
.get();
}
}