JDBC メッセージストア
Spring Integration は、2 つの JDBC 固有のメッセージストア実装を提供します。JdbcMessageStore
は、アグリゲーターおよびクレームチェックパターンでの使用に適しています。JdbcChannelMessageStore
実装は、メッセージチャネル専用の、よりターゲットを絞ったスケーラブルな実装を提供します。
JdbcMessageStore
を使用してメッセージチャネルをバックアップできることに注意してください。JdbcChannelMessageStore
はその目的に最適化されています。
バージョン 5.0.11、5.1.2 以降、JdbcChannelMessageStore のインデックスが最適化されました。このようなストアに大きなメッセージグループがある場合は、インデックスを変更することができます。さらに、PriorityChannel のインデックスはコメント化されています。これは、JDBC によってサポートされているチャネルを使用しない限り、インデックスが必要ないためです。 |
OracleChannelMessageStoreQueryProvider を使用する場合、優先チャネルインデックスはクエリのヒントに含まれているため、追加する必要があります。 |
データベースの初期化
JDBC メッセージストアコンポーネントの使用を開始する前に、適切なオブジェクトでターゲットデータベースをプロビジョニングする必要があります。
Spring Integration には、データベースの初期化に使用できるサンプルスクリプトがいくつか付属しています。spring-integration-jdbc
JAR ファイルには、org.springframework.integration.jdbc
パッケージ内のスクリプトが含まれています。さまざまな一般的なデータベースプラットフォームの作成例と削除スクリプトの例を提供します。これらのスクリプトを使用する一般的な方法は、Spring JDBC データソース初期化子で参照することです。スクリプトは、サンプルとして、必要なテーブル名と列名の仕様として提供されていることに注意してください。本番環境で使用するために拡張する必要がある場合があります(たとえば、インデックス宣言を追加することによって)。
バージョン 6.2 以降、JdbcMessageStore
、JdbcChannelMessageStore
、JdbcMetadataStore
、DefaultLockRepository
は SmartLifecycle
を実装し、start()
メソッドでそれぞれのテーブルに対して `SELECT COUNT` クエリを実行して、必要なテーブル (指定されたプレフィックスに従って) がターゲットデータベースに存在することを確認します。必要なテーブルが存在しない場合、アプリケーションコンテキストは起動に失敗します。このチェックは setCheckDatabaseOnStart(false)
によって無効にできます。
汎用 JDBC メッセージストア
JDBC モジュールは、データベースに裏付けされた Spring Integration MessageStore
(クレームチェックパターンで重要)および MessageGroupStore
(アグリゲーターなどのステートフルパターンで重要)の実装を提供します。両方のインターフェースは JdbcMessageStore
によって実装されており、次の例に示すように、XML でストアインスタンスを構成するためのサポートがあります。
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
DataSource
の代わりに JdbcTemplate
を指定できます。
次の例は、他のいくつかのオプション属性を示しています。
<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/>
前の例では、ストアによって生成されたクエリ内のテーブル名のプレフィックスを指定しました。テーブル名のプレフィックスのデフォルトは INT_
です。
バッキングメッセージチャネル
JDBC でメッセージチャネルをサポートする場合は、JdbcChannelMessageStore
実装を使用することをお勧めします。メッセージチャネルと組み合わせてのみ機能します。
サポートされているデータベース
JdbcChannelMessageStore
は、データベース固有の SQL クエリを使用して、データベースからメッセージを取得します。JdbcChannelMessageStore
で ChannelMessageStoreQueryProvider
プロパティを設定する必要があります。この channelMessageStoreQueryProvider
は、指定した特定のデータベースの SQL クエリを提供します。Spring Integration は、次のリレーショナルデータベースのサポートを提供します。
PostgreSQL
HSQLDB
MySQL
Oracle
Derby
H2
SqlServer
Sybase
DB2
データベースがリストにない場合は、ChannelMessageStoreQueryProvider
インターフェースを実装し、独自のカスタムクエリを提供できます。
バージョン 4.0 は、メッセージが同じミリ秒で保存されている場合でも先入れ先出し(FIFO)キューを確保するために、MESSAGE_SEQUENCE
列をテーブルに追加しました。
バージョン 6.2 以降、ChannelMessageStoreQueryProvider
は isSingleStatementForPoll
フラグを公開し、PostgresChannelMessageStoreQueryProvider
は true
を返し、ポーリングのクエリは単一の DELETE…RETURNING
ステートメントに基づくようになりました。単一のポーリングステートメントのみがサポートされている場合、JdbcChannelMessageStore
は isSingleStatementForPoll
オプションを参照し、別の DELETE
ステートメントをスキップします。
カスタムメッセージ挿入
バージョン 5.0 以降、ChannelMessageStorePreparedStatementSetter
クラスをオーバーロードすることにより、JdbcChannelMessageStore
へのメッセージ挿入のカスタム実装を提供できます。これを使用して、異なる列を設定したり、テーブル構造や直列化戦略を変更したりできます。例: byte[]
へのデフォルトの直列化の代わりに、その構造を JSON 文字列として保存できます。
次の例では、setValues
のデフォルト実装を使用して共通の列を格納し、メッセージペイロードを varchar
として格納する動作をオーバーライドします。
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
通常、キューイングにリレーショナルデータベースを使用することはお勧めしません。代わりに、可能であれば、代わりに JMS または AMQP でバックアップされたチャネルの使用を検討してください。詳細については、次のリソースを参照してください。 データベースをキューとして使用する予定がある場合は、PostgreSQL とその通知メカニズムの使用を検討してください。これについては、後続のセクションで説明します。 |
同時ポーリング
メッセージチャネルをポーリングする場合、Poller
を TaskExecutor
参照で構成するオプションがあります。
ただし、JDBC でバックアップされたメッセージチャネルを使用し、チャネルをポーリングして、複数のスレッドでメッセージストアのトランザクションを実行する予定の場合は、マルチバージョン同時実行制御 [Wikipedia] (英語) (MVCC) をサポートするリレーショナルデータベースを使用する必要があります。そうしないと、ロックが課題になる可能性があり、複数のスレッドを使用する場合のパフォーマンスが期待どおりに実現されない可能性があります。たとえば、その点では Apache Derby に課題があります。 JDBC キューのスループットを向上させ、異なるスレッドがキューから同じ
|
優先チャンネル
バージョン 4.0 以降、JdbcChannelMessageStore
は PriorityCapableChannelMessageStore
を実装し、priorityEnabled
オプションを提供し、priority-queue
インスタンスの message-store
参照として使用できるようにします。この目的のために、INT_CHANNEL_MESSAGE
テーブルには PRIORITY
メッセージヘッダーの値を格納する MESSAGE_PRIORITY
列があります。さらに、新しい MESSAGE_SEQUENCE
列により、同じ優先度で同じミリ秒に複数のメッセージが保存されている場合でも、堅牢な先入れ先出し(FIFO)ポーリングメカニズムを実現できます。メッセージは、order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
を使用してデータベースからポーリング(選択)されます。
priorityEnabled オプションはストア全体に適用され、適切な FIFO キューセマンティクスはキューチャネルに対して保持されないため、同じ JdbcChannelMessageStore Bean を優先キューチャネルと非優先キューチャネルに使用することはお勧めしません。ただし、同じ INT_CHANNEL_MESSAGE テーブル(および region も)を両方の JdbcChannelMessageStore 型に使用できます。そのシナリオを構成するには、次の例に示すように、一方のメッセージストア Bean を他方から拡張できます。 |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
メッセージストアのパーティション分割
JdbcMessageStore
は、同じアプリケーション内のアプリケーションまたはノードのグループのグローバルストアとして使用するのが一般的です。名前の衝突に対する保護を提供し、データベースのメタデータ構成を制御できるようにするために、メッセージストアでは、2 つの方法でテーブルをパーティション分割できます。1 つの方法は、プレフィックスを変更することにより、個別のテーブル名を使用することです(前述のとおり)。もう 1 つの方法は、単一のテーブル内でデータをパーティション分割するために region
名を指定することです。2 番目のアプローチの重要な使用例は、MessageStore
が Spring Integration メッセージチャネルをサポートする永続キューを管理している場合です。永続チャネルのメッセージデータは、チャネル名のストアにキー入力されます。その結果、チャネル名がグローバルに一意でない場合、チャネルはそれらに向けられていないデータを取得できます。この危険を回避するために、メッセージストア region
を使用して、同じ論理名を持つ異なる物理チャネルのデータを別々に保つことができます。
PostgreSQL: プッシュ通知の受信
PostgreSQL は、データベーステーブル操作時にプッシュ通知を受信するためのリッスンおよび通知フレームワークを提供します。Spring Integration はこのメカニズム (バージョン 6.0 以降) を利用して、新しいメッセージが JdbcChannelMessageStore
に追加されたときにプッシュ通知を受信できるようにします。この機能を使用する場合、データベーストリガーを定義する必要があります。これは、Spring Integration の JDBC モジュールに含まれる schema-postgresql.sql
ファイルのコメントの一部として見つかります。
プッシュ通知は PostgresChannelMessageTableSubscriber
クラスを介して受信されます。これにより、サブスクライバーは、特定の region
および groupId
の新しいメッセージの到着時にコールバックを受信できます。これらの通知は、メッセージが別の JVM に追加されましたが、同じデータベースに追加された場合でも受信されます。PostgresSubscribableChannel
実装では、PostgresChannelMessageTableSubscriber.Subscription
契約を使用して、前述の PostgresChannelMessageTableSubscriber
通知からの通知に対する反応として、ストアからメッセージをプルします。
例: some group
のプッシュ通知は次のように受信できます。
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
取引サポート
バージョン 6.0.5 以降、PostgresSubscribableChannel
で PlatformTransactionManager
を指定すると、トランザクションでサブスクライバーに通知されます。サブスクライバーで例外が発生すると、トランザクションがロールバックされ、メッセージがメッセージストアに戻されます。デフォルトでは、トランザクションサポートはアクティブ化されていません。
再試行
バージョン 6.0.5 以降、RetryTemplate
を PostgresSubscribableChannel
に提供することで、再試行ポリシーを指定できます。デフォルトでは、再試行は実行されません。
アクティブな このように排他的な接続が必要になるため、JVM は任意の数のサブスクリプションを登録するために使用できる単一の |