Java DSL
Spring Integration Java 構成および DSL は、Spring @Configuration
クラスから Spring Integration メッセージフローを構成できる便利なビルダーと流れるような API のセットを提供します。
(Kotlin DSL も参照してください)
Spring Integration の Java DSL は、本質的に Spring Integration のファサードです。DSL は、Spring Framework および Spring Integration からの既存の Java 構成とともに、流れるような Builder
パターンを使用することにより、Spring Integration メッセージフローをアプリケーションに埋め込む簡単な方法を提供します。また、ラムダ(Java 8 で使用可能)を使用およびサポートして、Java 構成をさらに簡素化します。
カフェ [GitHub] (英語) は DSL を使用する良い例です。
DSL は、IntegrationFlowBuilder
の IntegrationFlows
ファクトリによって提示されます。これにより、IntegrationFlow
コンポーネントが生成され、Spring Bean として登録する必要があります(@Bean
アノテーションを使用)。ビルダーパターンは、任意の複雑な構造を、ラムダを引数として受け入れることができるメソッドの階層として表現するために使用されます。
IntegrationFlowBuilder
は、IntegrationFlowBeanPostProcessor
によるアプリケーションコンテキストでの具象 Bean のさらなる解析と登録のために、IntegrationFlow
Bean の統合コンポーネント(MessageChannel
インスタンス、AbstractEndpoint
インスタンスなど)のみを収集します。
Java DSL は Spring Integration クラスを直接使用し、XML の生成と解析をバイパスします。ただし、DSL は XML に加えて構文上の砂糖以上のものを提供します。その最も魅力的な機能の 1 つは、インラインラムダを定義してエンドポイントロジックを実装する機能であり、カスタムロジックを実装するための外部クラスは不要です。ある意味では、Spring Integration の Spring Expression Language(SpEL)およびインラインスクリプトのサポートがこれに対処していますが、ラムダはより簡単で強力です。
次の例は、Spring Integration の Java 構成の使用方法を示しています。
@Configuration
@EnableIntegration
public class MyConfiguration {
@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlows.fromSupplier(integerSource()::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.channel("inputChannel")
.filter((Integer p) -> p > 0)
.transform(Object::toString)
.channel(MessageChannels.queue())
.get();
}
}
上記の構成例の結果、ApplicationContext
の起動後に Spring Integration エンドポイントとメッセージチャネルが作成されます。Java 構成は、XML 構成の置き換えと拡張の両方に使用できます。Java 構成を使用するために、既存の XML 構成をすべて置き換える必要はありません。
DSL の基本
org.springframework.integration.dsl
パッケージには、前述の IntegrationFlowBuilder
API と多くの IntegrationComponentSpec
実装が含まれています。これらは、ビルダーでもあり、具体的なエンドポイントを構成するための流れるような API を提供します。IntegrationFlowBuilder
インフラストラクチャは、チャネル、エンドポイント、ポーラー、チャネルインターセプターなどのメッセージベースのアプリケーションに共通のエンタープライズ統合パターン (英語) (EIP)を提供します。
エンドポイントは、読みやすくするために DSL で動詞として表されます。次のリストには、一般的な DSL メソッド名と関連する EIP エンドポイントが含まれています。
変換→
Transformer
フィルター→
Filter
ハンドル→
ServiceActivator
分割→
Splitter
集約→
Aggregator
ルート→
Router
ブリッジ→
Bridge
概念的には、統合プロセスは、これらのエンドポイントを 1 つ以上のメッセージフローに構成することによって構築されます。EIP は「メッセージフロー」という用語を正式に定義していませんが、よく知られているメッセージングパターンを使用する作業単位として考えると便利です。DSL は、チャネルとそれらの間のエンドポイントの構成を定義する IntegrationFlow
コンポーネントを提供しますが、現在、IntegrationFlow
は、アプリケーションコンテキストで実際の Bean を生成するための構成のロールのみを果たし、実行時には使用されません。ただし、IntegrationFlow
の Bean を Lifecycle
としてオートワイヤーして、この IntegrationFlow
に関連付けられたすべての Spring Integration コンポーネントに委譲されたフロー全体の start()
および stop()
を制御できます。次の例では、IntegrationFlows
ファクトリを使用して、IntegrationFlowBuilder
の EIP メソッドを使用して IntegrationFlow
Bean を定義しています。
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlows.from("input")
.<String, Integer>transform(Integer::parseInt)
.get();
}
transform
メソッドは、メッセージペイロードを操作するためのエンドポイント引数としてラムダを受け入れます。このメソッドの本当の引数は GenericTransformer<S, T>
です。ここでは、提供されているトランスフォーマー(ObjectToJsonTransformer
、FileToStringTransformer
など)のいずれかを使用できます。
内部では、IntegrationFlowBuilder
は MessageHandler
とそのエンドポイントを、それぞれ MessageTransformingHandler
と ConsumerEndpointFactoryBean
で認識します。別の例を考えてみましょう:
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlows.from("input")
.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println)
.get();
}
上記の例は、Filter → Transformer → Service Activator
のシーケンスを構成します。フローは " 'one way'" です。つまり、応答メッセージは提供せず、ペイロードを STDOUT に出力するだけです。エンドポイントは、直接チャネルを使用して自動的に相互接続されます。
ラムダと Message<?> 引数 EIP メソッドでラムダを使用する場合、「入力」引数は通常、メッセージペイロードです。メッセージ全体にアクセスする場合は、
ラムダは引数型を保持せず、フレームワークはペイロードを 代わりに、次を使用します。
|
Bean 定義のオーバーライド Java DSL は、フロー定義でインラインで定義されたオブジェクトの Bean を登録でき、既存の注入された Bean を再利用できます。インラインオブジェクトと既存の Bean 定義に同じ Bean 名が定義されている場合、そのような構成が間違っていることを示す |
メッセージチャンネル
EIP メソッドを備えた IntegrationFlowBuilder
に加えて、Java DSL は MessageChannel
インスタンスを構成するための流れるような API を提供します。この目的のために、MessageChannels
ビルダーファクトリが提供されています。次の例は、その使用方法を示しています。
@Bean
public MessageChannel priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap())
.get();
}
XML 構成で input-channel
/output-channel
ペアを接続するのと同様に、同じ MessageChannels
ビルダーファクトリを IntegrationFlowBuilder
から接続エンドポイントへの channel()
EIP メソッドで使用できます。デフォルトでは、エンドポイントは DirectChannel
インスタンスに接続されており、Bean 名は次のパターンに基づいています: [IntegrationFlow.beanName].channel#[channelNameIndex]
。このルールは、インライン MessageChannels
ビルダーファクトリの使用によって生成された名前のないチャネルにも適用されます。ただし、すべての MessageChannels
メソッドには、MessageChannel
インスタンスの Bean 名を設定するために使用できる channelId
を認識するバリアントがあります。MessageChannel
参照と beanName
は、Bean メソッドの呼び出しとして使用できます。次の例は、channel()
EIP メソッドを使用するための可能な方法を示しています。
@Bean
public MessageChannel queueChannel() {
return MessageChannels.queue().get();
}
@Bean
public MessageChannel publishSubscribe() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlows.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
from("input")
は、「「「入力」ID でMessageChannel
を見つけて使用するか、作成します」」という意味です。fixedSubscriberChannel()
はFixedSubscriberChannel
のインスタンスを生成し、channelFlow.channel#0
という名前で登録します。channel("queueChannel")
も同じように機能しますが、既存のqueueChannel
Bean を使用します。channel(publishSubscribe())
は Bean メソッドのリファレンスです。channel(MessageChannels.executor("executorChannel", this.taskExecutor))
は、IntegrationComponentSpec
をExecutorChannel
に公開し、それをexecutorChannel
として登録するIntegrationFlowBuilder
です。channel("output")
は、この名前の Bean がすでに存在しない限り、DirectChannel
Bean を名前としてoutput
に登録します。
メモ: 上記の IntegrationFlow
定義は有効であり、そのチャネルはすべて BridgeHandler
インスタンスを持つエンドポイントに適用されます。
異なる IntegrationFlow インスタンスから MessageChannels ファクトリを介して同じインラインチャネル定義を使用するよう注意してください。DSL パーサーが存在しないオブジェクトを Bean として登録しても、異なる IntegrationFlow コンテナーから同じオブジェクト(MessageChannel )を判別できません。次の例は間違っています。 |
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlows.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlows.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
その悪い例の結果は、次の例外です。
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
動作させるには、そのチャネルに対して @Bean
を宣言し、異なる IntegrationFlow
インスタンスから Bean メソッドを使用する必要があります。
ポーラー
Spring Integration は、AbstractPollingEndpoint
実装用に PollerMetadata
を構成できる Fluent API も提供します。次の例に示すように、Pollers
ビルダーファクトリを使用して、一般的な Bean 定義または IntegrationFlowBuilder
EIP メソッドから作成された定義を構成できます。
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(500)
.errorChannel("myErrors");
}
詳細については、Javadoc の Pollers
(Javadoc) および PollerSpec
(Javadoc) を参照してください。
DSL を使用して PollerSpec を @Bean として作成する場合、Bean 定義の get() メソッドを呼び出さないでください。PollerSpec は、仕様から PollerMetadata オブジェクトを生成し、そのすべてのプロパティを初期化する FactoryBean です。 |
reactive()
エンドポイント
バージョン 5.5 以降、ConsumerEndpointSpec
は、オプションのカスタマイザー Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
を備えた reactive()
構成プロパティを提供します。このオプションは、IntegrationReactiveUtils.messageChannelToFlux()
を介して Flux
に変換される入力チャネル型とは関係なく、ターゲットエンドポイントを ReactiveStreamsConsumer
インスタンスとして構成します。提供された関数は、Flux.transform()
オペレーターから使用され、入力チャネルからのリアクティブストリームソースをカスタマイズします(publishOn()
、log()
、doOnNext()
など)。
次の例は、最終的なサブスクライバーとプロデューサーに関係なく、公開スレッドを入力チャネルからその DirectChannel
に変更する方法を示しています。
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlows
.from("inputChannel")
.<String, Integer>transform(Integer::parseInt,
e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
.get();
}
詳細については、Reactive Streams サポートを参照してください。
DSL およびエンドポイントの構成
すべての IntegrationFlowBuilder
EIP メソッドには、AbstractEndpoint
インスタンスのオプションを提供するためにラムダパラメーターを適用するバリアントがあります: SmartLifecycle
、PollerMetadata
、request-handler-advice-chain
など。それぞれに汎用引数があるため、次の例に示すように、コンテキストでエンドポイントとその MessageHandler
を構成できます。
@Bean
public IntegrationFlow flow2() {
return IntegrationFlows.from(this.inputChannel)
.transform(new PayloadSerializingTransformer(),
c -> c.autoStartup(false).id("payloadSerializingTransformer"))
.transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
.get();
}
さらに、EndpointSpec
は id()
メソッドを提供し、エンドポイント Bean を、生成された名前ではなく、指定された Bean 名で登録できるようにします。
MessageHandler
が Bean として参照されている場合、.advice()
メソッドが DSL 定義に存在する場合、既存の adviceChain
構成はオーバーライドされます。
@Bean
public TcpOutboundGateway tcpOut() {
TcpOutboundGateway gateway = new TcpOutboundGateway();
gateway.setConnectionFactory(cf());
gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
return gateway;
}
@Bean
public IntegrationFlow clientTcpFlow() {
return f -> f
.handle(tcpOut(), e -> e.advice(testAdvice()))
.transform(Transformers.objectToString());
}
つまり、それらはマージされず、testAdvice()
Bean のみがこの場合に使用されます。
Transformers
DSL API は、.transform()
EIP メソッド内でインラインターゲットオブジェクト定義として使用される便利で流れるような Transformers
ファクトリを提供します。次の例は、その使用方法を示しています。
@Bean
public IntegrationFlow transformFlow() {
return IntegrationFlows.from("input")
.transform(Transformers.fromJson(MyPojo.class))
.transform(Transformers.serializer())
.get();
}
setter を使用した不便なコーディングを回避し、フロー定義をより簡単にします。Transformers
を使用してターゲット Transformer
インスタンスを @Bean
インスタンスとして宣言し、再び IntegrationFlow
定義から Bean メソッドとして使用できることに注意してください。それでも、DSL オブジェクトがまだ Bean として定義されていない場合、DSL パーサーはインラインオブジェクトの Bean 宣言を処理します。
詳細およびサポートされるファクトリメソッドについては、Javadoc の Transformers (Javadoc) を参照してください。
ラムダと Message<?>
引数も参照してください。
受信チャネルアダプター
通常、メッセージフローは受信チャネルアダプター(<int-jdbc:inbound-channel-adapter>
など)から始まります。アダプターは <poller>
で構成されており、MessageSource<?>
に定期的にメッセージを生成するように要求します。Java DSL では、MessageSource<?>
から IntegrationFlow
を開始することもできます。このために、IntegrationFlows
ビルダーファクトリは、オーバーロードされた IntegrationFlows.from(MessageSource<?> messageSource)
メソッドを提供します。MessageSource<?>
を Bean として構成し、そのメソッドの引数として提供できます。IntegrationFlows.from()
の 2 番目のパラメーターは、SourcePollingChannelAdapter
のオプション(PollerMetadata
や SmartLifecycle
など)を提供できる Consumer<SourcePollingChannelAdapterSpec>
ラムダです。次の例は、流れるような API とラムダを使用して IntegrationFlow
を作成する方法を示しています。
@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}
@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlows.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
.transform(Transformers.toJson())
.channel("furtherProcessChannel")
.get();
}
Message
オブジェクトを直接構築する必要がない場合は、java.util.function.Supplier
に基づく IntegrationFlows.fromSupplier()
バリアントを使用できます。Supplier.get()
の結果は、自動的に Message
にラップされます(まだ Message
でない場合)。
メッセージルーター
Spring Integration は、次のような特殊なルーター型をネイティブで提供します。
HeaderValueRouter
PayloadTypeRouter
ExceptionTypeRouter
RecipientListRouter
XPathRouter
他の多くの DSL IntegrationFlowBuilder
EIP メソッドと同様に、route()
メソッドは、任意の AbstractMessageRouter
実装、または便宜上、SpEL 式としての String
、ref
-method
ペアを適用できます。さらに、ラムダを使用して route()
を構成し、Consumer<RouterSpec<MethodInvokingRouter>>
にラムダを使用することができます。次の例に示すように、Fluent API は、channelMapping(String key, String channelName)
ペアなどの AbstractMappingMessageRouter
オプションも提供します。
@Bean
public IntegrationFlow routeFlowByLambda() {
return IntegrationFlows.from("routerInput")
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.suffix("Channel")
.channelMapping(true, "even")
.channelMapping(false, "odd")
)
.get();
}
次の例は、単純な式ベースのルーターを示しています。
@Bean
public IntegrationFlow routeFlowByExpression() {
return IntegrationFlows.from("routerInput")
.route("headers['destChannel']")
.get();
}
次の例に示すように、routeToRecipients()
メソッドは Consumer<RecipientListRouterSpec>
を取ります。
@Bean
public IntegrationFlow recipientListFlow() {
return IntegrationFlows.from("recipientListInput")
.<String, String>transform(p -> p.replaceFirst("Payload", ""))
.routeToRecipients(r -> r
.recipient("thing1-channel", "'thing1' == payload")
.recipientMessageSelector("thing2-channel", m ->
m.getHeaders().containsKey("recipient")
&& (boolean) m.getHeaders().get("recipient"))
.recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
f -> f.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("recipientListSubFlow1Result")))
.recipientFlow((String p) -> p.startsWith("thing3"),
f -> f.transform("Hello "::concat)
.channel(c -> c.queue("recipientListSubFlow2Result")))
.recipientFlow(new FunctionExpression<Message<?>>(m ->
"thing3".equals(m.getPayload())),
f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
.defaultOutputToParentFlow())
.get();
}
.routeToRecipients()
定義の .defaultOutputToParentFlow()
を使用すると、ルーターの defaultOutput
をゲートウェイとして設定して、メインフロー内の一致しないメッセージのプロセスを続行できます。
ラムダと Message<?>
引数も参照してください。
スプリッター
スプリッターを作成するには、split()
EIP メソッドを使用します。デフォルトでは、ペイロードが Iterable
、Iterator
、Array
、Stream
、リアクティブ Publisher
の場合、split()
メソッドは各アイテムを個別のメッセージとして出力します。ラムダ、SpEL 式、AbstractMessageSplitter
実装を受け入れます。または、パラメーターなしで使用して DefaultMessageSplitter
を提供できます。次の例は、ラムダを提供して split()
メソッドを使用する方法を示しています。
@Bean
public IntegrationFlow splitFlow() {
return IntegrationFlows.from("splitInput")
.split(s -> s.applySequence(false).delimiters(","))
.channel(MessageChannels.executor(taskExecutor()))
.get();
}
上記の例では、コンマ区切りの String
を含むメッセージを分割するスプリッターを作成します。
ラムダと Message<?>
引数も参照してください。
アグリゲーターとリシーケンサー
Aggregator
は概念的に Splitter
の反対です。個々のメッセージのシーケンスを単一のメッセージに集約し、必然的に複雑になります。デフォルトでは、アグリゲーターは受信メッセージからのペイロードのコレクションを含むメッセージを返します。同じルールが Resequencer
に適用されます。次の例は、スプリッターアグリゲーターパターンの標準的な例を示しています。
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlows.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
split()
メソッドは、リストを個々のメッセージに分割し、ExecutorChannel
に送信します。resequence()
メソッドは、メッセージヘッダーにあるシーケンスの詳細でメッセージを並べ替えます。aggregate()
メソッドは、これらのメッセージを収集します。
ただし、リリース戦略と相関戦略などを指定することにより、デフォルトの動作を変更できます。次の例を考えてみましょう。
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
上記の例は、myCorrelationKey
ヘッダーを持つメッセージを関連付け、少なくとも 10 が蓄積されるとメッセージを解放します。
同様のラムダ構成が resequence()
EIP メソッドに提供されています。
サービスアクティベーターと .handle()
メソッド
.handle()
EIP メソッドのゴールは、MessageHandler
実装または POJO 上のメソッドを呼び出すことです。別のオプションは、ラムダ式を使用して「アクティビティ」を定義することです。その結果、一般的な GenericHandler<P>
関数インターフェースを導入しました。handle
メソッドには、P payload
と MessageHeaders headers
(バージョン 5.1 以降)の 2 つの引数が必要です。それができたら、次のようにフローを定義できます。
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlows.from("flow3Input")
.<Integer>handle((p, h) -> p * 2)
.get();
}
上記の例は、受け取った整数を 2 倍にします。
ただし、Spring Integration の主なゴールの 1 つは、メッセージペイロードからメッセージハンドラーのターゲット引数へのランタイム型変換を介した loose coupling
です。Java はラムダクラスのジェネリクス型解決をサポートしていないため、ほとんどの EIP メソッドと LambdaMessageProcessor
に追加の payloadType
引数を使用した回避策を導入しました。そうすることで、ハード変換作業を Spring の ConversionService
に委譲します。payloadType
は、提供された type
とリクエストされたメッセージを使用してメソッド引数をターゲットにします。次の例は、結果の IntegrationFlow
がどのようになるかを示しています。
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlows.from("input")
.<byte[], String>transform(p - > new String(p, "UTF-8"))
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
ConversionService
内に BytesToIntegerConverter
を登録して、追加の .transform()
を削除することもできます。
@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
return new BytesToIntegerConverter();
}
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlows.from("input")
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
ラムダと Message<?>
引数も参照してください。
オペレーター gateway()
IntegrationFlow
定義の gateway()
オペレーターは、特別なサービスアクティベーターの実装であり、入力チャネルを介して他のエンドポイントまたは統合フローを呼び出し、応答を待ちます。技術的には、<chain>
定義(チェーン内からチェーンを呼び出すを参照)のネストされた <gateway>
コンポーネントと同じロールを果たし、フローをよりクリーンでわかりやすくします。論理的に、ビジネスの観点から、ターゲット統合ソリューションの異なる部分の間で機能の分散と再利用を可能にするメッセージングゲートウェイです(メッセージングゲートウェイを参照)。この演算子には、さまざまなゴールに対していくつかのオーバーロードがあります。
gateway(String requestChannel)
は、名前でエンドポイントの入力チャネルにメッセージを送信します。gateway(MessageChannel requestChannel)
は、直接注入によってエンドポイントの入力チャネルにメッセージを送信します。gateway(IntegrationFlow flow)
は、提供されたIntegrationFlow
の入力チャネルにメッセージを送信します。
これらはすべて、ターゲット GatewayMessageHandler
とそれぞれの AbstractEndpoint
を構成するための 2 番目の Consumer<GatewayEndpointSpec>
引数を持つバリアントを持っています。また、IntegrationFlow
ベースのメソッドを使用すると、既存の IntegrationFlow
Bean を呼び出すか、IntegrationFlow
関数インターフェースのインプレースラムダを介してフローをサブフローとして宣言するか、private
メソッドクリーナーコードスタイルで抽出することができます。
@Bean
IntegrationFlow someFlow() {
return IntegrationFlows
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}
private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
}
ダウンストリームフローが常に応答を返すとは限らない場合は、requestTimeout を 0 に設定して、呼び出し元のスレッドが無期限にハングしないようにする必要があります。その場合、フローはその時点で終了し、スレッドはさらなる作業のために解放されます。 |
オペレーター log()
便宜上、Spring Integration フロー(<logging-channel-adapter>
)を介したメッセージの移動をログに記録するために、log()
オペレーターが示されています。内部的には、LoggingHandler
をサブスクライバーとする WireTap
ChannelInterceptor
で表されます。受信メッセージを次のエンドポイントまたは現在のチャネルに記録するロールを果たします。次の例は、LoggingHandler
の使用方法を示しています。
.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)
上記の例では、id
ヘッダーは、フィルターを通過してルーティングされる前のメッセージについてのみ、ERROR
レベルで test.category
に記録されます。
この演算子がフローの最後で使用される場合、これは一方向ハンドラーであり、フローは終了します。応答生成フローとして作成するには、log()
の後に単純な bridge()
を使用するか、バージョン 5.1 から開始して、代わりに logAndReply()
演算子を使用できます。logAndReply
は、フローの最後でのみ使用できます。
オペレーター intercept()
バージョン 5.3 以降、intercept()
オペレーターを使用すると、フロー内の現在の MessageChannel
に 1 つ以上の ChannelInterceptor
インスタンスを登録できます。これは、MessageChannels
API を介して明示的な MessageChannel
を作成する代わりになります。次の例では、MessageSelectingInterceptor
を使用して、例外を含む特定のメッセージを拒否します。
.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)
MessageChannelSpec.wireTap()
Spring Integration には、.wireTap()
fluent API MessageChannelSpec
ビルダーが含まれています。次の例は、wireTap
メソッドを使用して入力を記録する方法を示しています。
@Bean
public QueueChannelSpec myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input");
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
|
現在の MessageChannel
が InterceptableChannel
を実装しない場合、暗黙の DirectChannel
と BridgeHandler
が IntegrationFlow
に注入され、WireTap
がこの新しい DirectChannel
に追加されます。次の例には、チャネル宣言がありません。
.handle(...)
.log()
}
上記の例(およびチャネルが宣言されていない場合)では、暗黙の DirectChannel
が IntegrationFlow
の現在の位置に挿入され、現在構成されている ServiceActivatingHandler
の出力チャネルとして使用されます(前述の .handle()
から)。
メッセージフローの操作
IntegrationFlowBuilder
は、メッセージフローに接続された統合コンポーネントを生成するためのトップレベル API を提供します。単一のフローで統合を達成できる場合(多くの場合)、これは便利です。または、MessageChannel
インスタンスを介して IntegrationFlow
インスタンスを結合できます。
デフォルトでは、MessageFlow
は Spring Integration の用語では「チェーン」として動作します。つまり、エンドポイントは、DirectChannel
インスタンスによって自動的かつ暗黙的に接続されます。メッセージフローは実際にはチェーンとして構築されておらず、はるかに柔軟性があります。例: inputChannel
名がわかっている場合(つまり、明示的に定義している場合)、フロー内の任意のコンポーネントにメッセージを送信できます。フロー内で外部定義されたチャネルを参照して、直接チャネルの代わりにチャネルアダプターの使用を許可することもできます(リモートトランスポートプロトコル、ファイル I/O などを有効にするため)。そのため、DSL は Spring Integration chain
要素をサポートしません。この場合、多くの価値を追加しないからです。
Spring Integration Java DSL は、他の構成オプションと同じ Bean 定義モデルを生成し、既存の Spring Framework @Configuration
インフラストラクチャに基づいているため、XML 定義と併用し、Spring Integration メッセージングアノテーション構成と接続できます。
ラムダを使用して、直接 IntegrationFlow
インスタンスを定義することもできます。次の例は、その方法を示しています。
@Bean
public IntegrationFlow lambdaFlow() {
return f -> f.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println);
}
この定義の結果は、暗黙的な直接チャネルで接続された統合コンポーネントの同じセットです。ここでの唯一の制限は、このフローが名前付き直接チャネル - lambdaFlow.input
で開始されることです。また、Lambda フローは MessageSource
または MessageProducer
から開始できません。
バージョン 5.1 以降、この種類の IntegrationFlow
はプロキシにラップされ、ライフサイクル制御を公開し、内部で関連付けられた StandardIntegrationFlow
の inputChannel
へのアクセスを提供します。
バージョン 5.0.6 以降、IntegrationFlow
のコンポーネントに対して生成された Bean 名には、フロー Bean の後に接頭辞としてドット(.
)が続きます。例: 前のサンプルの .transform("Hello "::concat)
の ConsumerEndpointFactoryBean
は、Bean 名が lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0
になります。(o.s.i
は、ページに収まるように org.springframework.integration
から短縮されています)そのエンドポイントの Transformer
実装 Bean は、lambdaFlow.transformer#0
(バージョン 5.1 で始まる)の Bean 名を持ち、MethodInvokingTransformer
クラスの完全修飾名の代わりに、そのコンポーネント型使用されています。Bean 名をフロー内で生成する必要がある場合、すべての NamedComponent
に同じパターンが適用されます。これらの生成された Bean 名には、ログの解析や分析ツールでのコンポーネントのグループ化などの目的で、また実行時に統合フローを同時に登録する際の競合状態を回避するために、フロー ID が付加されます。詳細については、動的およびランタイム統合フローを参照してください。
FunctionExpression
ラムダと generics
を使用できるようにするため、FunctionExpression
クラス(SpEL の Expression
インターフェースの実装)を導入しました。Function<T, R>
オプションは、Core Spring Integration から暗黙的な Strategy
バリアントが存在する場合、expression
オプションとともに DSL コンポーネントに提供されます。次の例は、関数式の使用方法を示しています。
.enrich(e -> e.requestChannel("enrichChannel")
.requestPayload(Message::getPayload)
.propertyFunction("date", m -> new Date()))
FunctionExpression
は、SpelExpression
で行われているように、ランタイム型変換もサポートしています。
サブフローのサポート
if…else
および publish-subscribe
コンポーネントの一部には、サブフローを使用してロジックまたはマッピングを指定する機能があります。次の例が示すように、最も単純なサンプルは .publishSubscribeChannel()
です。
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
個別の IntegrationFlow
@Bean
定義を使用しても同じ結果を得ることができますが、論理構成のサブフロースタイルが役立つことを願っています。その結果、コードが短く(そして読みやすく)なることがわかりました。
バージョン 5.3 以降、BroadcastCapableChannel
ベースの publishSubscribeChannel()
実装が提供され、ブローカーが支援するメッセージチャネルでサブフローサブスクライバーを構成します。たとえば、Jms.publishSubscribeChannel()
のサブフローとして複数のサブスクライバーを構成できるようになりました。
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(jmsConnectionFactory())
.destination("pubsub")
.get();
}
@Bean
public IntegrationFlow pubSubFlow() {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel(),
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
.destination("pubsub")
.get();
}
同様の publish-subscribe
サブフロー構成は、.routeToRecipients()
メソッドを提供します。
別の例は、.filter()
メソッドで .discardChannel()
の代わりに .discardFlow()
を使用しています。
.route()
には特別な注意が必要です。次の例を考えてみましょう。
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
.channelMapping()
は、通常の Router
マッピングと同じように機能し続けますが、.subFlowMapping()
はそのサブフローをメインフローに結び付けました。つまり、ルーターのサブフローは .route()
の後にメインフローに戻ります。
場合によっては、
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow. サブフローをラムダとして構成すると、フレームワークがサブフローとのリクエストと応答の対話を処理し、ゲートウェイは必要ありません。 |
サブフローは任意の深さにネストできますが、ネストすることはお勧めしません。実際、ルーターの場合でも、フロー内に複雑なサブフローを追加すると、すぐにスパゲッティのプレートのように見え始め、人間が解析するのが難しくなります。
DSL がサブフロー構成をサポートする場合、構成するコンポーネントに通常チャネルが必要であり、そのサブフローが
フレームワークは、 |
プロトコルアダプターの使用
これまでに示したすべての例は、Spring Integration プログラミングモデルを使用して、DSL がメッセージングアーキテクチャをサポートする方法を示しています。ただし、実際の統合はまだ行われていません。これを行うには、HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP などを介してリモートリソースにアクセスするか、ローカルファイルシステムにアクセスする必要があります。Spring Integration は、これらすべてをサポートしています。理想的には、DSL はそれらすべてに対してファーストクラスのサポートを提供する必要がありますが、これらすべてを実装し、Spring Integration に新しいアダプターが追加されるのに遅れを取らないようにするのは困難な作業です。DSL が Spring Integration に継続的に追いつくことが期待されます。
そのため、プロトコル固有のメッセージングをシームレスに定義するための高レベル API を提供します。これは、ファクトリパターンとビルダーパターン、およびラムダを使用して行います。ファクトリクラスは、具体的なプロトコル固有の Spring Integration モジュールのコンポーネントの XML 名前空間と同じロールを果たすため、「名前空間ファクトリ」と考えることができます。現在、Spring Integration JavaDSL は Amqp
、Feed
、Jms
、Files
、(S)Ftp
、Http
、JPA
、MongoDb
、TCP/UDP
、Mail
、WebFlux
、Scripts
名前空間ファクトリをサポートしています。次の例は、そのうちの 3 つ(Amqp
、Jms
、Mail
)の使用方法を示しています。
@Bean
public IntegrationFlow amqpFlow() {
return IntegrationFlows.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
.transform("hello "::concat)
.transform(String.class, String::toUpperCase)
.get();
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlows.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer(c ->
c.concurrentConsumers(3)
.sessionTransacted(true))
.requestDestination("jmsPipelineTest"))
.get();
}
@Bean
public IntegrationFlow sendMailFlow() {
return IntegrationFlows.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.id("sendMailEndpoint"))
.get();
}
上記の例は、「名前空間ファクトリ」をインラインアダプター宣言として使用する方法を示しています。ただし、@Bean
定義からこれらを使用して、IntegrationFlow
メソッドチェーンをより読みやすくすることができます。
他の人に努力する前に、これらの名前空間ファクトリに関するコミュニティのフィードバックを募集しています。また、次にサポートする必要のあるアダプターとゲートウェイの優先順位付けに関する意見も歓迎します。 |
このリファレンスマニュアル全体のプロトコル固有の章で、さらに多くの Java DSL サンプルを見つけることができます。
他のすべてのプロトコルチャネルアダプターは、次の例に示すように、汎用 Bean として構成され、IntegrationFlow
に接続されます。
@Bean
public QueueChannelSpec wrongMessagesChannel() {
return MessageChannels
.queue()
.wireTap("wrongMessagesWireTapChannel");
}
@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
return IntegrationFlows.from("inputChannel")
.filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
e -> e.discardChannel(wrongMessagesChannel))
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(xpathRouter(wrongMessagesChannel))
.get();
}
@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
XPathRouter router = new XPathRouter("local-name(/*)");
router.setEvaluateAsString(true);
router.setResolutionRequired(false);
router.setDefaultOutputChannel(wrongMessagesChannel);
router.setChannelMapping("Tags", "splittingChannel");
router.setChannelMapping("Tag", "receivedChannel");
return router;
}
IntegrationFlowAdapter
次の例に示すように、IntegrationFlow
インターフェースを直接実装し、スキャン用のコンポーネントとして指定できます。
@Component
public class MyFlow implements IntegrationFlow {
@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}
}
IntegrationFlowBeanPostProcessor
によってピックアップされ、アプリケーションコンテキストで正しく解析および登録されます。
便宜上、疎結合アーキテクチャの利点を得るために、IntegrationFlowAdapter
基本クラスの実装を提供しています。次の例に示すように、from()
メソッドのいずれかを使用して IntegrationFlowDefinition
を生成するには、buildFlow()
メソッドの実装が必要です。
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {
private final AtomicBoolean invoked = new AtomicBoolean();
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(this::messageSource,
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(a -> a.processor(this, null), null)
.enrichHeaders(Collections.singletonMap("thing1", "THING1"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}
public String messageSource() {
return "T,H,I,N,G,2";
}
@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}
@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}
@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}
@Filter
public boolean filter(@Header Optional<String> thing1) {
return thing1.isPresent();
}
@ServiceActivator
public String handle(String payload, @Header String thing1) {
return payload + ":" + thing1;
}
}
動的およびランタイム統合フロー
IntegrationFlow
およびそのすべての依存コンポーネントは、実行時に登録できます。バージョン 5.0 より前は、BeanFactory.registerSingleton()
フックを使用していました。Spring Framework 5.0
から開始して、プログラムによる BeanDefinition
登録に instanceSupplier
フックを使用します。次の例は、Bean をプログラムで登録する方法を示しています。
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
前の例では、instanceSupplier
フックが genericBeanDefinition
メソッドの最後のパラメーターであり、この場合はラムダによって提供されることに注意してください。
必要な Bean の初期化とライフサイクルはすべて、標準のコンテキスト構成 Bean 定義と同様に自動的に行われます。
開発エクスペリエンスを簡素化するために、Spring Integration は IntegrationFlowContext
を導入して、次の例に示すように、実行時に IntegrationFlow
インスタンスを登録および管理します。
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
これは、複数の構成オプションがあり、同様のフローの複数のインスタンスを作成する必要がある場合に役立ちます。そのために、オプションを繰り返して、IntegrationFlow
インスタンスをループ内で作成および登録できます。もう 1 つの変形は、データのソースが Spring ベースではなく、その場で作成する必要がある場合です。このようなサンプルは、次の例が示すように、Reactive Streams イベントソースです。
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlows.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
(IntegrationFlowContext.registration()
の結果として) IntegrationFlowRegistrationBuilder
を使用して、IntegrationFlow
の Bean 名を指定し、登録、autoStartup
の制御、非 Spring Integration Bean の登録を行うことができます。通常、これらの追加 Bean は、接続ファクトリ(AMQP、JMS、(S)FTP、TCP/UDP など)、シリアライザーとデシリアライザー、その他の必要なサポートコンポーネントです。
IntegrationFlowRegistration.destroy()
コールバックを使用して、動的に登録された IntegrationFlow
およびその依存 Bean が不要になったときに削除できます。詳細については、IntegrationFlowContext
Javadoc を参照してください。
バージョン 5.0.6 以降、IntegrationFlow 定義で生成されたすべての Bean 名の先頭にフロー ID が付加されます。常に明示的なフロー ID を指定することをお勧めします。それ以外の場合、IntegrationFlowContext で同期バリアが開始され、IntegrationFlow の Bean 名が生成され、その Bean が登録されます。生成された同じ Bean 名が異なる IntegrationFlow インスタンスに使用される可能性がある場合、競合状態を回避するために、これら 2 つの操作を同期します。 |
また、バージョン 5.0.6 以降、登録ビルダー API には新しいメソッド useFlowIdAsPrefix()
があります。これは、次の例に示すように、同じフローの複数のインスタンスを宣言し、フロー内のコンポーネントの ID が同じ場合に Bean 名の衝突を回避する場合に役立ちます。
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
この場合、最初のフローのメッセージハンドラーは、tcp1.client.handler
という名前の Bean で参照できます。
useFlowIdAsPrefix() を使用する場合は、id 属性が必要です。 |
ゲートウェイとしての IntegrationFlow
IntegrationFlow
は、次の例に示すように、GatewayProxyFactoryBean
コンポーネントを提供するサービスインターフェースから開始できます。
public interface ControlBusGateway {
void send(String command);
}
...
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from(ControlBusGateway.class)
.controlBus()
.get();
}
インターフェースメソッドのすべてのプロキシには、IntegrationFlow
の次の統合コンポーネントにメッセージを送信するためのチャネルが提供されます。@MessagingGateway
アノテーションを使用してサービスインターフェースをマークし、@Gateway
アノテーションを使用してメソッドをマークできます。それでも、requestChannel
は無視され、IntegrationFlow
の次のコンポーネントの内部チャネルでオーバーライドされます。そうでない場合、IntegrationFlow
を使用してこのような構成を作成することは意味がありません。
デフォルトでは、GatewayProxyFactoryBean
は [FLOW_BEAN_NAME.gateway]
などの従来の Bean 名を取得します。その ID は、@MessagingGateway.name()
属性またはオーバーロードされた IntegrationFlows.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer)
ファクトリメソッドを使用して変更できます。また、インターフェースの @MessagingGateway
アノテーションのすべての属性がターゲット GatewayProxyFactoryBean
に適用されます。アノテーション構成が適用されない場合、Consumer<GatewayProxySpec>
バリアントを使用して、ターゲットプロキシに適切なオプションを提供できます。この DSL メソッドは、バージョン 5.2 以降で使用できます。
Java 8 では、次の例に示すように、java.util.function
インターフェースを使用して統合ゲートウェイを作成することもできます。
@Bean
public IntegrationFlow errorRecovererFlow() {
return IntegrationFlows.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
.handle((GenericHandler<?>) (p, h) -> {
throw new RuntimeException("intentional");
}, e -> e.advice(retryAdvice()))
.get();
}
その errorRecovererFlow
は次のように使用できます。
@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;
DSL 拡張
バージョン 5.3 以降、IntegrationFlowExtension
が導入され、カスタムまたは構成された EIP オペレーターで既存の Java DSL を拡張できるようになりました。必要なのは、IntegrationFlow
Bean 定義で使用できるメソッドを提供するこのクラスの拡張だけです。拡張クラスは、カスタム IntegrationComponentSpec
構成にも使用できます。たとえば、既存の IntegrationComponentSpec
拡張機能で、欠落またはデフォルトのオプションを実装できます。以下のサンプルは、複合カスタムオペレーターと、デフォルトのカスタム outputProcessor
に対する AggregatorSpec
拡張の使用箇所を示しています。
public class CustomIntegrationFlowDefinition
extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {
public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
return split()
.transform("payload.toUpperCase()");
}
public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
return register(new CustomAggregatorSpec(), aggregator);
}
}
public class CustomAggregatorSpec extends AggregatorSpec {
CustomAggregatorSpec() {
outputProcessor(group ->
group.getMessages()
.stream()
.map(Message::getPayload)
.map(String.class::cast)
.collect(Collectors.joining(", ")));
}
}
メソッドチェーンフローの場合、これらの拡張機能の新しい DSL オペレーターは拡張機能クラスを返す必要があります。このようにして、ターゲット IntegrationFlow
定義は、新規および既存の DSL オペレーターで機能します。
@Bean
public IntegrationFlow customFlowDefinition() {
return
new CustomIntegrationFlowDefinition()
.log()
.upperCaseAfterSplit()
.channel("innerChannel")
.customAggregate(customAggregatorSpec ->
customAggregatorSpec.expireGroupsUponCompletion(true))
.logAndReply();
}
統合フローの構成
Spring Integration の第一級オブジェクトとしての MessageChannel
抽象化では、統合フローの構成が常に想定されていました。フロー内の任意のエンドポイントの入力チャネルを使用して、このチャネルを出力として持つエンドポイントからだけでなく、他の任意のエンドポイントからメッセージを送信できます。さらに、@MessagingGateway
契約、Content Enricher コンポーネント、<chain>
などの複合エンドポイント、IntegrationFlow
Bean(IntegrationFlowAdapter
など)を使用すると、ビジネスロジックをより短く再利用可能なパーツに分散するのに十分簡単です。最終的な構成に必要なのは、送信または受信する MessageChannel
に関する知識だけです。
バージョン 5.5.4
以降、MessageChannel
からさらに抽象化し、実装の詳細をエンドユーザーから隠すために、IntegrationFlows
は from(IntegrationFlow)
ファクトリメソッドを導入して、既存のフローの出力から現在の IntegrationFlow
を開始できるようにします。
@Bean
IntegrationFlow templateSourceFlow() {
return IntegrationFlows.fromSupplier(() -> "test data")
.channel("sourceChannel")
.get();
}
@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
return IntegrationFlows.from(templateSourceFlow)
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("compositionMainFlowResult"))
.get();
}
一方、IntegrationFlowDefinition
には、他のフローの入力チャネルでカレントフローを継続するための to(IntegrationFlow)
ターミナルオペレーターが追加されています。
@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
return f -> f
.<String, String>transform(String::toUpperCase)
.to(otherFlow);
}
@Bean
IntegrationFlow otherFlow() {
return f -> f
.<String, String>transform(p -> p + " from other flow")
.channel(c -> c.queue("otherFlowResultChannel"));
}
フローの途中での構成は、既存の gateway(IntegrationFlow)
EIP メソッドを使用して簡単に実現できます。このように、より単純で再利用可能な論理ブロックからフローを構成することにより、複雑なフローを構築できます。例: IntegrationFlow
Bean のライブラリを依存関係として追加することができ、それらの構成クラスを最終プロジェクトにインポートして、IntegrationFlow
定義用にオートワイヤーするだけで十分です。