JDBC メッセージストア
Spring Integration は、JDBC 固有のメッセージストア実装を 2 つ提供します。JdbcMessageStore は、アグリゲータやクレームチェックパターンでの使用に適しています。JdbcChannelMessageStore 実装は、メッセージチャネルに特化した、よりターゲットを絞ったスケーラブルな実装を提供します。
JdbcMessageStore を使用してメッセージチャネルをバックアップできることに注意してください。JdbcChannelMessageStore はその目的に最適化されています。
バージョン 5.0.11、5.1.2 以降、JdbcChannelMessageStore のインデックスが最適化されました。このようなストアに大きなメッセージグループがある場合は、インデックスを変更することができます。さらに、PriorityChannel のインデックスはコメント化されています。これは、JDBC によってサポートされているチャネルを使用しない限り、インデックスが必要ないためです。 |
OracleChannelMessageStoreQueryProvider を使用する場合、優先チャネルインデックスはクエリのヒントに含まれているため、追加する必要があります。 |
データベースの初期化
JDBC メッセージストアコンポーネントの使用を開始する前に、適切なオブジェクトでターゲットデータベースをプロビジョニングする必要があります。
Spring Integration には、データベースの初期化に使用できるサンプルスクリプトがいくつか付属しています。spring-integration-jdbc JAR ファイルには、org.springframework.integration.jdbc パッケージにスクリプトが含まれています。このパッケージには、一般的なデータベースプラットフォーム向けの作成スクリプトと削除スクリプトのサンプルが含まれています。これらのスクリプトを使用する一般的な方法は、Spring JDBC データソース初期化子で参照することです。これらのスクリプトはサンプルとして、また必要な表名と列名の仕様として提供されていることに注意してください。本番環境での使用にあたっては、インデックス宣言を追加するなど、スクリプトを拡張する必要がある場合があります。
バージョン 6.2 以降、JdbcMessageStore、JdbcChannelMessageStore、JdbcMetadataStore、DefaultLockRepository は SmartLifecycle を実装し、start() メソッドでそれぞれのテーブルに対して `SELECT COUNT` クエリを実行して、必要なテーブル (指定されたプレフィックスに従って) がターゲットデータベースに存在することを確認します。必要なテーブルが存在しない場合、アプリケーションコンテキストは起動に失敗します。このチェックは setCheckDatabaseOnStart(false) によって無効にできます。
汎用 JDBC メッセージストア
JDBC モジュールは、データベースに裏付けされた Spring Integration MessageStore (クレームチェックパターンで重要)および MessageGroupStore (アグリゲーターなどのステートフルパターンで重要)の実装を提供します。両方のインターフェースは JdbcMessageStore によって実装されており、次の例に示すように、XML でストアインスタンスを構成するためのサポートがあります。
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>DataSource の代わりに JdbcTemplate を指定できます。
次の例は、他のいくつかのオプション属性を示しています。
<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/> 前の例では、ストアによって生成されたクエリ内のテーブル名のプレフィックスを指定しました。テーブル名のプレフィックスのデフォルトは INT_ です。
バッキングメッセージチャネル
JDBC でメッセージチャネルをサポートする場合は、JdbcChannelMessageStore 実装を使用することをお勧めします。メッセージチャネルと組み合わせてのみ機能します。
サポートされているデータベース
JdbcChannelMessageStore は、データベース固有の SQL クエリを使用して、データベースからメッセージを取得します。JdbcChannelMessageStore で ChannelMessageStoreQueryProvider プロパティを設定する必要があります。この channelMessageStoreQueryProvider は、指定した特定のデータベースの SQL クエリを提供します。Spring Integration は、次のリレーショナルデータベースのサポートを提供します。
PostgreSQL
HSQLDB
MySQL
Oracle
Derby
H2
SqlServer
Sybase
DB2
データベースがリストにない場合は、ChannelMessageStoreQueryProvider インターフェースを実装し、独自のカスタムクエリを提供できます。
バージョン 4.0 は、メッセージが同じミリ秒で保存されている場合でも先入れ先出し(FIFO)キューを確保するために、MESSAGE_SEQUENCE 列をテーブルに追加しました。
バージョン 6.2 以降、ChannelMessageStoreQueryProvider は isSingleStatementForPoll フラグを公開します。PostgresChannelMessageStoreQueryProvider は true を返し、ポーリングのクエリは単一の DELETE…RETURNING ステートメントに基づいて実行されます。JdbcChannelMessageStore は isSingleStatementForPoll オプションを参照し、単一のポーリングステートメントのみがサポートされている場合は、別の DELETE ステートメントをスキップします。
カスタムメッセージ挿入
バージョン 5.0 以降、ChannelMessageStorePreparedStatementSetter クラスをオーバーロードすることにより、JdbcChannelMessageStore へのメッセージ挿入のカスタム実装を提供できます。これを使用して、異なる列を設定したり、テーブル構造や直列化戦略を変更したりできます。例: byte[] へのデフォルトの直列化の代わりに、その構造を JSON 文字列として保存できます。
次の例では、setValues のデフォルト実装を使用して共通の列を格納し、メッセージペイロードを varchar として格納する動作をオーバーライドします。
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}通常、キューイングにリレーショナルデータベースを使用することはお勧めしません。代わりに、可能であれば、代わりに JMS または AMQP でバックアップされたチャネルの使用を検討してください。詳細については、次のリソースを参照してください。 データベースをキューとして使用することをまだ計画している場合は、後続のセクションで説明する PostgreSQL とその通知メカニズムの使用を検討してください。 |
同時ポーリング
メッセージチャネルをポーリングする場合、Poller を TaskExecutor 参照で構成するオプションがあります。
ただし、JDBC でバックアップされたメッセージチャネルを使用し、チャネルをポーリングして、複数のスレッドでメッセージストアのトランザクションを実行する予定の場合は、マルチバージョン同時実行制御 [Wikipedia] (英語) (MVCC) をサポートするリレーショナルデータベースを使用する必要があります。そうしないと、ロックが課題になる可能性があり、複数のスレッドを使用する場合のパフォーマンスが期待どおりに実現されない可能性があります。たとえば、その点では Apache Derby に課題があります。 JDBC キューのスループットを向上させ、異なるスレッドがキューから同じ |
優先チャンネル
バージョン 4.0 以降、JdbcChannelMessageStore は PriorityCapableChannelMessageStore を実装し、priorityEnabled オプションを提供し、priority-queue インスタンスの message-store 参照として使用できるようにします。この目的のために、INT_CHANNEL_MESSAGE テーブルには PRIORITY メッセージヘッダーの値を格納する MESSAGE_PRIORITY 列があります。さらに、新しい MESSAGE_SEQUENCE 列により、同じ優先度で同じミリ秒に複数のメッセージが保存されている場合でも、堅牢な先入れ先出し(FIFO)ポーリングメカニズムを実現できます。メッセージは、order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE を使用してデータベースからポーリング(選択)されます。
priorityEnabled オプションはストア全体に適用され、適切な FIFO キューセマンティクスはキューチャネルに対して保持されないため、同じ JdbcChannelMessageStore Bean を優先キューチャネルと非優先キューチャネルに使用することはお勧めしません。ただし、同じ INT_CHANNEL_MESSAGE テーブル(および region も)を両方の JdbcChannelMessageStore 型に使用できます。そのシナリオを構成するには、次の例に示すように、一方のメッセージストア Bean を他方から拡張できます。 |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>メッセージストアのパーティション分割
一般的に、JdbcMessageStore は、アプリケーションのグループまたは同じアプリケーション内のノードのグローバルストアとして使用されます。名前の衝突に対する保護を提供し、データベースメタデータ構成を制御できるようにするために、メッセージストアでは、テーブルを 2 つの方法でパーティション分割できます。1 つの方法は、プレフィックスを変更して別のテーブル名を使用することです ( 前述のとおり)。もう 1 つの方法は、単一テーブル内のデータをパーティション分割するために region 名を指定することです。2 番目の方法の重要な使用例は、MessageStore が Spring Integration メッセージチャネルをバックアップする永続キューを管理している場合です。永続チャネルのメッセージデータは、ストア内でチャネル名に基づいてキー設定されます。チャネル名がグローバルに一意でない場合、チャネルは意図しないデータを取得する可能性があります。この危険を回避するために、メッセージストア region を使用して、同じ論理名を持つ異なる物理チャネルのデータを分離することができます。
PostgreSQL: プッシュ通知の受信
PostgreSQL は、データベーステーブル操作時にプッシュ通知を受信するためのリッスンおよび通知フレームワークを提供します。Spring Integration はこのメカニズム (バージョン 6.0 以降) を利用して、新しいメッセージが JdbcChannelMessageStore に追加されたときにプッシュ通知を受信できるようにします。この機能を使用する場合、データベーストリガーを定義する必要があります。これは、Spring Integration の JDBC モジュールに含まれる schema-postgresql.sql ファイルのコメントの一部として見つかります。
プッシュ通知は PostgresChannelMessageTableSubscriber クラスを介して受信されます。これにより、サブスクライバーは、特定の region および groupId の新しいメッセージの到着時にコールバックを受信できます。これらの通知は、メッセージが別の JVM に追加されましたが、同じデータベースに追加された場合でも受信されます。PostgresSubscribableChannel 実装では、PostgresChannelMessageTableSubscriber.Subscription 契約を使用して、前述の PostgresChannelMessageTableSubscriber 通知からの通知に対する反応として、ストアからメッセージをプルします。
例: some group のプッシュ通知は次のように受信できます。
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}取引サポート
バージョン 6.0.5 以降、PostgresSubscribableChannel で PlatformTransactionManager を指定すると、トランザクションでサブスクライバーに通知されます。サブスクライバーで例外が発生すると、トランザクションがロールバックされ、メッセージがメッセージストアに戻されます。デフォルトでは、トランザクションサポートはアクティブ化されていません。
再試行
バージョン 6.0.5 以降、RetryTemplate を PostgresSubscribableChannel に提供することで、再試行ポリシーを指定できます。デフォルトでは、再試行は実行されません。
アクティブな このように排他的な接続が必要になるため、JVM は任意の数のサブスクリプションを登録するために使用できる単一の |