ストリームのサポート

多くの場合、アプリケーションデータはストリームから取得されます。ストリームへの参照をメッセージペイロードとしてコンシューマーに送信することはお勧めしません。代わりに、入力ストリームから読み取られたデータからメッセージが作成され、メッセージペイロードが出力ストリームに 1 つずつ書き込まれます。

この依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>6.3.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-stream:6.3.1"

ストリームからの読み取り

Spring Integration は、ストリーム用に 2 つのアダプターを提供します。ByteStreamReadingMessageSource と CharacterStreamReadingMessageSource はどちらも MessageSource を実装しています。channel-adapter 要素内でこれらのいずれかを構成することにより、ポーリング期間を構成でき、メッセージバスはそれらを自動的に検出してスケジュールできます。バイトストリームバージョンには InputStream が必要であり、文字ストリームバージョンには単一のコンストラクター引数として Reader が必要です。ByteStreamReadingMessageSource は、"bytesPerMessage" プロパティも受け入れて、各 Message に読み取ろうとするバイト数を決定します。デフォルト値は 1024 です。次の例では、それぞれが 2048 バイトを含むメッセージを作成する入力ストリームを作成します。

<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
  <constructor-arg ref="someInputStream"/>
  <property name="bytesPerMessage" value="2048"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
  <constructor-arg ref="someReader"/>
</bean>

CharacterStreamReadingMessageSource は、リーダーを BufferedReader でラップします(まだない場合)。2 番目のコンストラクター引数でバッファーリーダーが使用するバッファーサイズを設定できます。バージョン 5.0 以降、3 番目のコンストラクター引数(blockToDetectEOF)が CharacterStreamReadingMessageSource の動作を制御します。false (デフォルト)の場合、receive() メソッドはリーダーが ready() かどうかを確認し、そうでない場合は null を返します。この場合、EOF(ファイルの終わり)は検出されません。true の場合、receive() メソッドは、基礎となるストリームでデータが使用可能になるか EOF が検出されるまでブロックします。EOF が検出されると、StreamClosedEvent (アプリケーションイベント)が発行されます。ApplicationListener<StreamClosedEvent> を実装する Bean でこのイベントを使用できます。

EOF 検出を容易にするために、ポーラースレッドは、データが到着するか EOF が検出されるまで receive() メソッドでブロックします。
EOF が検出されると、ポーラーは各ポーリングでイベントを発行し続けます。これを防ぐために、アプリケーションリスナはアダプターを停止できます。イベントは、ポーラースレッドで公開されます。アダプターを停止すると、スレッドが中断されます。アダプターの停止後に割り込み可能なタスクを実行する場合は、stop() を別のスレッドで実行するか、それらのダウンストリームアクティビティに別のスレッドを使用する必要があります。QueueChannel への送信は割り込み可能であるため、リスナーからメッセージを送信する場合は、アダプターを停止する前に送信してください。

これにより、次の 2 つの例に示すように、stdin への「パイピング」またはデータのリダイレクトが容易になります。

cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt

このアプローチにより、パイプが閉じられたときにアプリケーションを停止できます。

4 つの便利なファクトリメソッドが利用可能です:

public static final CharacterStreamReadingMessageSource stdin() { ... }

public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }

public static final CharacterStreamReadingMessageSource stdinPipe() { ... }

public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }

ストリームへの書き込み

ターゲットストリームの場合、ByteStreamWritingMessageHandler または CharacterStreamWritingMessageHandler の 2 つの実装のいずれかを使用できます。それぞれには、単一のコンストラクター引数(バイトストリームの場合は OutputStream、文字ストリームの場合は Writer)が必要であり、オプションの 'bufferSize' を追加する 2 番目のコンストラクターを提供します。これらは両方とも最終的に MessageHandler インターフェースを実装するため、チャンネルアダプターに従って、channel-adapter 構成から参照できます。

<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler">
  <constructor-arg ref="someOutputStream"/>
  <constructor-arg value="1024"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler">
  <constructor-arg ref="someWriter"/>
</bean>

ストリーム名前空間のサポート

Spring Integration は、名前空間を定義して、ストリーム関連のチャネルアダプターに必要な構成を削減します。使用するには、次のスキーマの場所が必要です。

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/integration/stream
      https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

次のコードスニペットは、受信チャネルアダプターを構成するためにサポートされているさまざまな構成オプションを示しています。

<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>

<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>

バージョン 5.0 から、blockToDetectEOF プロパティを設定する detect-eof 属性を設定できます。詳細については、ストリームからの読み取りを参照してください。

送信チャネルアダプターを構成するには、名前空間サポートも使用できます。次の例は、送信チャネルアダプターのさまざまな構成を示しています。

<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
    channel="testChannel"/>

<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
    channel="testChannel"/>

<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>

<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
    channel="testChannel"/>