JDBC メッセージストア

Spring Integration は、2 つの JDBC 固有のメッセージストア実装を提供します。JdbcMessageStore は、アグリゲーターおよびクレームチェックパターンでの使用に適しています。JdbcChannelMessageStore 実装は、メッセージチャネル専用の、よりターゲットを絞ったスケーラブルな実装を提供します。

JdbcMessageStore を使用してメッセージチャネルをバックアップできることに注意してください。JdbcChannelMessageStore はその目的に最適化されています。

バージョン 5.0.11、5.1.2 以降、JdbcChannelMessageStore のインデックスが最適化されました。このようなストアに大きなメッセージグループがある場合は、インデックスを変更することができます。さらに、PriorityChannel のインデックスはコメント化されています。これは、JDBC によってサポートされているチャネルを使用しない限り、インデックスが必要ないためです。
OracleChannelMessageStoreQueryProvider を使用する場合、優先チャネルインデックスはクエリのヒントに含まれているため、追加する必要があります。

データベースの初期化

JDBC メッセージストアコンポーネントの使用を開始する前に、適切なオブジェクトでターゲットデータベースをプロビジョニングする必要があります。

Spring Integration には、データベースの初期化に使用できるサンプルスクリプトがいくつか付属しています。spring-integration-jdbc JAR ファイルには、org.springframework.integration.jdbc パッケージ内のスクリプトが含まれています。さまざまな一般的なデータベースプラットフォームの作成例と削除スクリプトの例を提供します。これらのスクリプトを使用する一般的な方法は、Spring JDBC データソース初期化子で参照することです。スクリプトは、サンプルとして、必要なテーブル名と列名の仕様として提供されていることに注意してください。本番環境で使用するために拡張する必要がある場合があります(たとえば、インデックス宣言を追加することによって)。

バージョン 6.2 以降、JdbcMessageStoreJdbcChannelMessageStoreJdbcMetadataStoreDefaultLockRepository は SmartLifecycle を実装し、start() メソッドでそれぞれのテーブルに対して `SELECT COUNT` クエリを実行して、必要なテーブル (指定されたプレフィックスに従って) がターゲットデータベースに存在することを確認します。必要なテーブルが存在しない場合、アプリケーションコンテキストは起動に失敗します。このチェックは setCheckDatabaseOnStart(false) によって無効にできます。

汎用 JDBC メッセージストア

JDBC モジュールは、データベースに裏付けされた Spring Integration MessageStore (クレームチェックパターンで重要)および MessageGroupStore (アグリゲーターなどのステートフルパターンで重要)の実装を提供します。両方のインターフェースは JdbcMessageStore によって実装されており、次の例に示すように、XML でストアインスタンスを構成するためのサポートがあります。

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

DataSource の代わりに JdbcTemplate を指定できます。

次の例は、他のいくつかのオプション属性を示しています。

<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/>

前の例では、ストアによって生成されたクエリ内のテーブル名のプレフィックスを指定しました。テーブル名のプレフィックスのデフォルトは INT_ です。

バッキングメッセージチャネル

JDBC でメッセージチャネルをサポートする場合は、JdbcChannelMessageStore 実装を使用することをお勧めします。メッセージチャネルと組み合わせてのみ機能します。

サポートされているデータベース

JdbcChannelMessageStore は、データベース固有の SQL クエリを使用して、データベースからメッセージを取得します。JdbcChannelMessageStore で ChannelMessageStoreQueryProvider プロパティを設定する必要があります。この channelMessageStoreQueryProvider は、指定した特定のデータベースの SQL クエリを提供します。Spring Integration は、次のリレーショナルデータベースのサポートを提供します。

  • PostgreSQL

  • HSQLDB

  • MySQL

  • Oracle

  • Derby

  • H2

  • SqlServer

  • Sybase

  • DB2

データベースがリストにない場合は、ChannelMessageStoreQueryProvider インターフェースを実装し、独自のカスタムクエリを提供できます。

バージョン 4.0 は、メッセージが同じミリ秒で保存されている場合でも先入れ先出し(FIFO)キューを確保するために、MESSAGE_SEQUENCE 列をテーブルに追加しました。

バージョン 6.2 以降、ChannelMessageStoreQueryProvider は isSingleStatementForPoll フラグを公開し、PostgresChannelMessageStoreQueryProvider は true を返し、ポーリングのクエリは単一の DELETE…​RETURNING ステートメントに基づくようになりました。単一のポーリングステートメントのみがサポートされている場合、JdbcChannelMessageStore は isSingleStatementForPoll オプションを参照し、別の DELETE ステートメントをスキップします。

カスタムメッセージ挿入

バージョン 5.0 以降、ChannelMessageStorePreparedStatementSetter クラスをオーバーロードすることにより、JdbcChannelMessageStore へのメッセージ挿入のカスタム実装を提供できます。これを使用して、異なる列を設定したり、テーブル構造や直列化戦略を変更したりできます。例: byte[] へのデフォルトの直列化の代わりに、その構造を JSON 文字列として保存できます。

次の例では、setValues のデフォルト実装を使用して共通の列を格納し、メッセージペイロードを varchar として格納する動作をオーバーライドします。

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

通常、キューイングにリレーショナルデータベースを使用することはお勧めしません。代わりに、可能であれば、代わりに JMS または AMQP でバックアップされたチャネルの使用を検討してください。詳細については、次のリソースを参照してください。

データベースをキューとして使用する予定がある場合は、PostgreSQL とその通知メカニズムの使用を検討してください。これについては、後続のセクションで説明します。

同時ポーリング

メッセージチャネルをポーリングする場合、Poller を TaskExecutor 参照で構成するオプションがあります。

ただし、JDBC でバックアップされたメッセージチャネルを使用し、チャネルをポーリングして、複数のスレッドでメッセージストアのトランザクションを実行する予定の場合は、マルチバージョン同時実行制御 [Wikipedia] (英語) (MVCC) をサポートするリレーショナルデータベースを使用する必要があります。そうしないと、ロックが課題になる可能性があり、複数のスレッドを使用する場合のパフォーマンスが期待どおりに実現されない可能性があります。たとえば、その点では Apache Derby に課題があります。

JDBC キューのスループットを向上させ、異なるスレッドがキューから同じ Message をポーリングする場合の課題を回避するには、MVCC をサポートしないデータベースを使用する場合、JdbcChannelMessageStore の usingIdCache プロパティを true に設定することが重要です。次の例は、その方法を示しています。

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />

優先チャンネル

バージョン 4.0 以降、JdbcChannelMessageStore は PriorityCapableChannelMessageStore を実装し、priorityEnabled オプションを提供し、priority-queue インスタンスの message-store 参照として使用できるようにします。この目的のために、INT_CHANNEL_MESSAGE テーブルには PRIORITY メッセージヘッダーの値を格納する MESSAGE_PRIORITY 列があります。さらに、新しい MESSAGE_SEQUENCE 列により、同じ優先度で同じミリ秒に複数のメッセージが保存されている場合でも、堅牢な先入れ先出し(FIFO)ポーリングメカニズムを実現できます。メッセージは、order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE を使用してデータベースからポーリング(選択)されます。

priorityEnabled オプションはストア全体に適用され、適切な FIFO キューセマンティクスはキューチャネルに対して保持されないため、同じ JdbcChannelMessageStore Bean を優先キューチャネルと非優先キューチャネルに使用することはお勧めしません。ただし、同じ INT_CHANNEL_MESSAGE テーブル(および region も)を両方の JdbcChannelMessageStore 型に使用できます。そのシナリオを構成するには、次の例に示すように、一方のメッセージストア Bean を他方から拡張できます。
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

メッセージストアのパーティション分割

JdbcMessageStore は、同じアプリケーション内のアプリケーションまたはノードのグループのグローバルストアとして使用するのが一般的です。名前の衝突に対する保護を提供し、データベースのメタデータ構成を制御できるようにするために、メッセージストアでは、2 つの方法でテーブルをパーティション分割できます。1 つの方法は、プレフィックスを変更することにより、個別のテーブル名を使用することです(前述のとおり)。もう 1 つの方法は、単一のテーブル内でデータをパーティション分割するために region 名を指定することです。2 番目のアプローチの重要な使用例は、MessageStore が Spring Integration メッセージチャネルをサポートする永続キューを管理している場合です。永続チャネルのメッセージデータは、チャネル名のストアにキー入力されます。その結果、チャネル名がグローバルに一意でない場合、チャネルはそれらに向けられていないデータを取得できます。この危険を回避するために、メッセージストア region を使用して、同じ論理名を持つ異なる物理チャネルのデータを別々に保つことができます。

PostgreSQL: プッシュ通知の受信

PostgreSQL は、データベーステーブル操作時にプッシュ通知を受信するためのリッスンおよび通知フレームワークを提供します。Spring Integration はこのメカニズム (バージョン 6.0 以降) を利用して、新しいメッセージが JdbcChannelMessageStore に追加されたときにプッシュ通知を受信できるようにします。この機能を使用する場合、データベーストリガーを定義する必要があります。これは、Spring Integration の JDBC モジュールに含まれる schema-postgresql.sql ファイルのコメントの一部として見つかります。

プッシュ通知は PostgresChannelMessageTableSubscriber クラスを介して受信されます。これにより、サブスクライバーは、特定の region および groupId の新しいメッセージの到着時にコールバックを受信できます。これらの通知は、メッセージが別の JVM に追加されましたが、同じデータベースに追加された場合でも受信されます。PostgresSubscribableChannel 実装では、PostgresChannelMessageTableSubscriber.Subscription 契約を使用して、前述の PostgresChannelMessageTableSubscriber 通知からの通知に対する反応として、ストアからメッセージをプルします。

例: some group のプッシュ通知は次のように受信できます。

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

取引サポート

バージョン 6.0.5 以降、PostgresSubscribableChannel で PlatformTransactionManager を指定すると、トランザクションでサブスクライバーに通知されます。サブスクライバーで例外が発生すると、トランザクションがロールバックされ、メッセージがメッセージストアに戻されます。デフォルトでは、トランザクションサポートはアクティブ化されていません。

再試行

バージョン 6.0.5 以降、RetryTemplate を PostgresSubscribableChannel に提供することで、再試行ポリシーを指定できます。デフォルトでは、再試行は実行されません。

アクティブな PostgresChannelMessageTableSubscriber は、アクティブなライフサイクルの間、排他的な JDBC Connection を占有します。この接続がプーリング DataSource から発信されていないことが重要です。このような接続プールは、通常、発行された接続が定義済みのタイムアウトウィンドウ内で閉じられることを想定しています。

このように排他的な接続が必要になるため、JVM は任意の数のサブスクリプションを登録するために使用できる単一の PostgresChannelMessageTableSubscriber のみを実行することもお勧めします。