受信チャネルアダプター

受信チャネルアダプターの主な機能は、SQL SELECT クエリを実行し、結果セットをメッセージに変換することです。メッセージペイロードは結果セット全体(List として表される)であり、リスト内のアイテムの型は行マッピング戦略に依存します。デフォルトの戦略は、クエリ結果の各行に対して Map を返す汎用マッパーです。オプションで、RowMapper インスタンスへの参照を追加することでこれを変更できます(行マッピングの詳細については、Spring JDBC の資料を参照してください)。

SELECT クエリ結果の行を個々のメッセージに変換する場合は、ダウンストリームスプリッターを使用できます。

受信アダプターには、JdbcTemplate インスタンスまたは DataSource への参照も必要です。

メッセージを生成する SELECT ステートメントの他に、アダプターには、次のポーリングで表示されないようにレコードを処理済みとしてマークする UPDATE ステートメントもあります。更新は、元の選択からの ID のリストによってパラメーター化できます。デフォルトでは、これは命名規則によって行われます(id と呼ばれる入力結果セットの列は、id と呼ばれる更新のパラメーターマップのリストに変換されます)。次の例では、更新クエリと DataSource 参照を使用して受信チャネルアダプターを定義しています。

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />
更新クエリのパラメーターは、パラメーター名の前にコロン(:)を付けて指定します(前述の例では、ポーリングされた結果セットの各行に適用される式です)。これは、Spring JDBC の名前付きパラメーター JDBC サポートの標準機能であり、Spring Integration で採用された規約(ポーリングされた結果リストへの射影)と組み合わせたものです。基本的な Spring JDBC の機能は、利用可能な表現を制限しますが(たとえば、ピリオド以外のほとんどの特殊文字は許可されません)、ターゲットは通常、Bean のパスでアドレス指定可能なオブジェクトのリスト(1 つのリストの場合もあります)であるため、これは過度に制限されるものではありません。

パラメーター生成戦略を変更するには、SqlParameterSourceFactory をアダプターに挿入して、デフォルトの動作をオーバーライドできます(アダプターには sql-parameter-source-factory 属性があります)。Spring Integration は ExpressionEvaluatingSqlParameterSourceFactory を提供します。ExpressionEvaluatingSqlParameterSourceFactory は、クエリの結果を #root オブジェクトとして SpEL ベースのパラメーターソースを作成します。(update-per-row が true の場合、ルートオブジェクトは行です)。更新クエリに同じパラメーター名が複数回出現する場合、1 回だけ評価され、その結果がキャッシュされます。

選択クエリにパラメーターソースを使用することもできます。この場合、評価の対象となる「結果」オブジェクトがないため、(パラメーターソースファクトリを使用するのではなく)毎回単一のパラメーターソースが使用されます。バージョン 4.0 以降では、Spring を使用して、SpEL ベースのパラメーターソースを作成できます。以下に例を示します。

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
		class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

各パラメーター式の value は、任意の有効な SpEL 式にすることができます。式評価の #root オブジェクトは、parameterSource Bean で定義されたコンストラクター引数です。すべての評価に対して静的です(前の例では、空の String)。

バージョン 5.0 以降では、ExpressionEvaluatingSqlParameterSourceFactory と sqlParameterTypes を指定して、特定のパラメーターのターゲット SQL 型を指定できます。

次の例は、クエリで使用されているパラメーターの SQL 型を示しています。

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
        class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>
createParameterSourceNoCache ファクトリメソッドを使用します。それ以外の場合、パラメーターソースは評価の結果をキャッシュします。また、キャッシュが無効になっているため、選択クエリに同じパラメーター名が複数回表示される場合、出現するたびに再評価されることに注意してください。

ポーリングとトランザクション

受信アダプターは、通常の Spring Integration ポーラーを子要素として受け入れます。その結果、(他の用途の中でも)ポーリングの頻度を制御できます。JDBC を使用するためのポーラーの重要な機能は、次の例に示すように、トランザクションでポーリング操作をラップするオプションです。

<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>
ポーラーを明示的に指定しない場合、デフォルト値が使用されます。Spring Integration で通常のように、トップレベル Bean として定義できます。

上記の例では、データベースは 1000 ミリ秒ごと(または 1 秒間に 1 回)ポーリングされ、更新クエリと選択クエリは両方とも同じトランザクションで実行されます。トランザクションマネージャーの構成は表示されません。ただし、データソースを認識している限り、ポーリングはトランザクションです。一般的な使用例では、ダウンストリームチャネルが直接チャネル(デフォルト)であるため、エンドポイントは同じスレッドで呼び出されるため、同じトランザクションで呼び出されます。そのようにして、それらのいずれかが失敗した場合、トランザクションはロールバックし、入力データは元の状態に戻ります。

max-rows 対 max-messages-per-poll

JDBC 受信チャネルアダプターは、max-rows という属性を定義します。アダプターのポーラーを指定するときに、max-messages-per-poll というプロパティを定義することもできます。これら 2 つの属性は似ていますが、その意味はまったく異なります。

max-messages-per-poll はポーリング間隔ごとにクエリが実行される回数を指定しますが、max-rows は各実行で返される行の数を指定します。

通常の状況では、JDBC 受信チャネルアダプターを使用する際に、ポーラーの max-messages-per-poll プロパティを設定したくないでしょう。デフォルト値は 1 です。これは、JDBC 受信チャネルアダプターの receive() (Javadoc) メソッドがポーリング間隔ごとに 1 回だけ実行されることを意味します。

max-messages-per-poll 属性をより大きな値に設定すると、クエリが何度も連続して実行されます。max-messages-per-poll 属性の詳細については、受信チャネルアダプターの構成を参照してください。

対照的に、max-rows 属性は、0 より大きい場合、receive() メソッドによって作成されたクエリ結果セットから使用される最大行数を指定します。属性が 0 に設定されている場合、結果のメッセージにはすべての行が含まれます。属性のデフォルトは 0 です。

MySQL LIMIT または SQL Server TOP または Oracle の ROWNUM など、ベンダー固有のクエリオプションを介して結果セット制限を使用することをお勧めします。詳細については、特定のベンダーのドキュメントを参照してください。