プログラミングモデル

関数カタログと柔軟な関数シグネチャー

Spring Cloud Function の主な機能の 1 つは、一貫した実行モデルを提供しながら、ユーザー定義関数のさまざまな型シグネチャーを適応およびサポートすることです。そのため、すべてのユーザー定義関数は FunctionCatalog によって標準表現に変換されます。

通常、ユーザーは FunctionCatalog をまったく気にする必要はありませんが、ユーザーコードでサポートされている機能の種類を知っておくと便利です。

また、Spring Cloud Function は、プロジェクト Reactor (英語) によって提供されるリアクティブ API に対してファーストクラスのサポートを提供していることも理解しておくことが重要です。これにより、Mono や Flux などのリアクティブプリミティブをユーザー定義関数の型として使用できるようになり、関数実装のプログラミングモデルを選択する際の柔軟性が向上します。リアクティブプログラミングモデルでは、命令型プログラミングスタイルでは実装が困難または不可能な機能も関数にサポートされます。詳細については、関数アリティのセクションを参照してください。

Java 8 関数のサポート

Spring Cloud Function は、Java 8 以降 Java で定義された 3 つのコア関数インターフェースを採用し、そ上に構築されています。

  • Supplier<O>

  • Function<I, O>

  • Consumer<I>

SupplierFunctionConsumer について頻繁にメンションすることを避けるため、このマニュアルの残りの部分では、適切な場合はこれを Functional Bean と呼ぶことにします。

簡単に言えば、ApplicationContext 内の 関数 Bean である Bean は、FunctionCatalog に遅延登録されます。つまり、このリファレンスマニュアルで説明されているすべての追加機能の恩恵を受けることができます。

最も単純なアプリケーションでは、アプリケーション構成で SupplierFunction、または Consumer 型の @Bean を宣言するだけです。その後、FunctionCatalog を使用して、名前に基づいて特定の関数を検索できます。

例:

@Bean
public Function<String, String> uppercase() {
    return value -> value.toUpperCase();
}

// . . .

FunctionCatalog catalog = applicationContext.getBean(FunctionCatalog.class);
Function uppercase =  catalog.lookup(“uppercase”);

uppercase が Bean である場合、ApplicationContext から直接取得することは確かに可能ですが、取得できるのは SCF によって提供される追加機能なしで宣言したとおりの Bean だけであることを理解することが重要です。FunctionCatalog を介して関数を検索すると、受け取るインスタンスは、このマニュアルで説明されている追加機能 (つまり、型変換、合成など) でラップ (インストルメント化) されます。

また、一般的なユーザーは Spring Cloud Function を直接使用しないことを理解することが重要です。代わりに、一般的なユーザーは、追加作業なしでさまざまな実行コンテキストで使用することを想定して、Java FunctionSupplier、または Consumer を実装します。

たとえば、同じ Java 関数は、Spring Cloud Function が提供するアダプターや、コアプログラミングモデルとして Spring Cloud Function を使用する他のフレームワーク (Spring Cloud Stream など) を介して、REST エンドポイントストリーミングメッセージハンドラーAWS Lambda などとして表現できます。

要約すると、Spring Cloud Function は、さまざまな実行コンテキストで利用できる追加機能を備えた Java 関数をインストルメント化します。

関数定義

前の例では、FunctionCatalog の関数をプログラムで検索する方法を示しましたが、Spring Cloud Function が別のフレームワーク (例: Spring Cloud Stream ) によってプログラミングモデルとして使用される一般的な統合ケースでは、spring.cloud.function.definition プロパティを介して使用する関数を宣言できます。FunctionCatalog の関数を検出する場合は、デフォルトの動作を知って理解することが重要です。

たとえば、ApplicationContext に 関数 Bean が 1 つしかない場合は、FunctionCatalog の単一の関数を空の名前または任意の名前で検索できるため、通常、spring.cloud.function.definition プロパティは必要ありません。例: カタログ内の関数が uppercase のみである場合、catalog.lookup(null)catalog.lookup(“”)catalog.lookup(“foo”) として検索できます。

ただし、spring.cloud.function.definition を使用する Spring Cloud Stream などのフレームワークを使用している場合は、常に spring.cloud.function.definition プロパティを使用することをお勧めします。

以下に例を示します。

spring.cloud.function.definition=uppercase

不適格な関数のフィルタリング

一般的な ApplicationContext には、有効な Java 関数であるが、FunctionCatalog に登録する候補として意図されていない Bean が含まれる場合があります。このような Bean は、他のプロジェクトからの自動構成、または Java 関数として適格な他の Bean である可能性があります。

フレームワークは、FunctionCatalog への登録の候補にならない既知の Bean のデフォルトのフィルタリングを提供します。また、spring.cloud.function.ineligible-definitions プロパティを使用して、Bean 定義名のコンマ区切りリストを提供することで、このリストに Bean を追加することもできます。

以下に例を示します。

spring.cloud.function.ineligible-definitions=foo,bar

サプライヤー

サプライヤーは、リアクティブSupplier<Flux<T>>)または命令型Supplier<T>)にすることができます。呼び出しの観点からは、このような Supplier の実装者にとって、これは何ら違いを生じないはずです。

ただし、フレームワーク内 (例: Spring Cloud Stream ) で使用される場合、サプライヤー、特にリアクティブは、ストリームのソースを表すために使用されることがよくあります。サプライヤーは、コンシューマーがサブスクライブできるストリーム (例: Flux) を取得するために 1 回呼び出されます。言い換えると、このようなサプライヤーは、無限ストリームと同等のものを表します。

ただし、同じリアクティブサプライヤーが有限のストリームを表すこともできます (例: ポーリングされた JDBC データの結果セット)。このような場合、このようなリアクティブサプライヤーは、基盤となるフレームワークのポーリングメカニズムに接続する必要があります。

それを補助するために、Spring Cloud Function は、そのようなサプライヤーが有限のストリームを生成し、再度ポーリングする必要がある可能性があることを通知するマーカーアノテーション org.springframework.cloud.function.context.PollableBean を提供します。ただし、Spring Cloud Function 自体はこのアノテーションの動作を提供しないことを理解することが重要です。

さらに、PollableBean アノテーションは、生成されたストリームを分割する必要があることを示す分割可能属性を公開します。( スプリッター EIP (英語) を参照してください)

次に例を示します。

@PollableBean(splittable = true)
public Supplier<Flux<String>> someSupplier() {
	return () -> {
		String v1 = String.valueOf(System.nanoTime());
		String v2 = String.valueOf(System.nanoTime());
		String v3 = String.valueOf(System.nanoTime());
		return Flux.just(v1, v2, v3);
	};
}

関数

関数は、命令型またはリアクティブ型の方法で記述することもできます。ただし、Supplier や Consumer とは異なり、実装者は、Spring Cloud Stream などのフレームワーク内で使用する場合、リアクティブ関数はストリームへの参照を渡すために 1 回だけ呼び出される (つまり、Flux または Mono) のに対し、命令型関数はイベントごとに 1 回呼び出されること以外に特別な考慮事項はありません。

public Function<String, String> uppercase() {
    . . . .
}

BiFunction

ペイロードとともに追加のデータ (メタデータ) を受け取る必要がある場合は、いつでも関数シグネチャーを宣言して、追加情報を含むヘッダーのマップを含む Message を受け取ることができます。

public Function<Message<String>, String> uppercase() {
    . . . .
}

関数シグネチャーをもう少し軽くし、より POJO に近づけるには、別の方法があります。BiFunction を使用できます。

public BiFunction<String, Map, String> uppercase() {
    . . . .
}

Message には 2 つの属性 (ペイロードとヘッダー) しか含まれず、BiFunction には 2 つの入力パラメーターが必要であることを考えると、フレームワークはこの署名を自動的に認識し、Message からペイロードを抽出して、それを最初の引数として渡し、ヘッダーの Map を 2 番目の引数として渡します。その結果、関数は Spring のメッセージング API に結合されません。BiFunction には厳密な署名が必要であり、2 番目の引数は Map でなければならないことに注意してください。同じルールが BiConsumer にも適用されます。

コンシューマー

Consumer は、少なくとも潜在的にはブロッキングを意味する void 戻り型を持っているため、少し特殊です。ほとんどの場合、Consumer<Flux<?>> を記述する必要はありませんが、そうする必要がある場合は、入力 Flux をサブスクライブすることを忘れないでください。

関数合成

関数合成は、複数の関数を 1 つに合成できる機能です。コアサポートは、Java 8 以降で利用可能な Function.andThen(..) (標準 Javadoc) によって提供される関数合成機能に基づいています。ただし、Spring Cloud Function では、これに加えていくつかの追加機能が提供されています。

宣言型関数の合成

この機能を使用すると、spring.cloud.function.definition プロパティを設定するときに、| (パイプ) または , (カンマ) 区切り文字を使用して宣言的に構成指示を提供できます。

例:

--spring.cloud.function.definition=uppercase|reverse

ここでは、関数 uppercase と関数 reverse を組み合わせた単一の関数の定義を効果的に提供しました。実際、関数の定義は複数の名前付き関数の組み合わせになる可能性があるため、プロパティ名が name ではなく definition である理由の 1 つはこれです。前述のように、| の代わりに …​definition=uppercase,reverse のように , を使用できます。

非関数の作成

Spring Cloud Function は、Supplier と Consumer または Function の合成、および Function と Consumer の合成もサポートします。理解しておくべき重要なことは、このような定義の最終結果です。Supplier と Function の合成は依然として Supplier になりますが、Supplier と Consumer の合成は実質的に Runnable になります。同じロジックに従って、Function と Consumer の合成は Consumer になります。

そしてもちろん、Consumer や FunctionConsumerSupplier などの構成不可能なオブジェクトを構成することはできません。

関数のルーティングとフィルタリング

バージョン 2.2 以降、Spring Cloud Function はルーティング機能を提供しており、これにより単一の関数を呼び出すことができます。この機能は、呼び出したい実際の関数へのルーターとして機能します。この機能は、複数の関数の構成を維持するのが面倒な場合や、複数の関数を公開できない特定の FAAS 環境で非常に役立ちます。

RoutingFunction は、FunctionCatalog に functionRouter という名前で登録されています。簡潔さと一貫性を保つために、RoutingFunction.FUNCTION_NAME 定数を参照することもできます。

この関数には次のシグネチャーがあります。

public class RoutingFunction implements Function<Object, Object> {
// . . .
}

ルーティング指示は、いくつかの方法で伝達できます。メッセージヘッダー、システムプロパティ、プラグ可能な戦略を介した指示の提供をサポートしています。詳細をいくつか見てみましょう。

MessageRoutingCallback

MessageRoutingCallback は、ルート先関数定義の名前の決定を支援する戦略です。

public interface MessageRoutingCallback {
    default String routingResult(Message<?> message) {
	    return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
    }
}

必要なのは、MessageRoutingCallback を Bean として実装し、RoutingFunction によって取得されるように登録することだけです。例:

@Bean
public MessageRoutingCallback customRouter() {
	return new MessageRoutingCallback() {
		@Override
		public String routingResult(Message<?> message) {
			return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
		}
	};
}

前の例では、非常に単純な MessageRoutingCallback の実装が示されています。これは、受信 Message の FunctionProperties.FUNCTION_DEFINITION Message ヘッダーから関数定義を決定し、呼び出す関数の定義を表す String のインスタンスを返します。

メッセージヘッダー

入力引数が Message<?> 型の場合、spring.cloud.function.definition または spring.cloud.function.routing-expression Message ヘッダーのいずれかを設定することでルーティング命令を伝達できます。プロパティの名前が示すように、spring.cloud.function.routing-expression は Spring 式言語 (SpEL) に依存しています。より静的なケースでは、spring.cloud.function.definition ヘッダーを使用できます。これにより、単一の関数の名前 (例: …​definition=foo) または合成命令 (例: …​definition=foo|bar|baz) を指定できます。より動的なケースでは、spring.cloud.function.routing-expression ヘッダーを使用して、関数の定義に解決される SpEL 式を指定できます (上記のとおり)。

SpEL 評価コンテキストのルートオブジェクトは実際の入力引数であるため、Message<?> の場合は、payload と headers の両方にアクセスできる式を構築できます (例: spring.cloud.function.routing-expression=headers.function_name)。
SpEL では、実行する Java コードの文字列表現をユーザーが提供できます。spring.cloud.function.routing-expression が Message ヘッダー経由で提供される可能性があるということは、そのような式を設定する機能がエンドユーザーに公開される可能性があることを意味します (つまり、Web モジュールを使用する場合は HTTP ヘッダー)。これにより、いくつかの問題 (悪意のあるコードなど) が発生する可能性があります。これを管理するために、Message ヘッダー経由で提供されるすべての式は、機能が制限されており、コンテキストオブジェクト (この場合は Message) のみを評価するように設計された SimpleEvaluationContext に対してのみ評価されます。一方、プロパティまたはシステム環境変数経由で設定されるすべての式は、Java 言語の完全な柔軟性を可能にする StandardEvaluationContext に対して評価されます。システム / アプリケーションのプロパティまたは環境変数経由で式を設定することは、通常の場合エンドユーザーに公開されないため、一般的に安全であると考えられていますが、他の Spring プロジェクト、サードパーティ、エンドユーザーが作成したカスタム実装によって提供される Spring Boot Actuator エンドポイント経由で、システム、アプリケーション、環境変数を更新する可視性と機能が実際にエンドユーザーに公開される場合があります。このようなエンドポイントは、業界標準の Web セキュリティ慣行を使用して保護する必要があります。Spring Cloud Function は、このようなエンドポイントを公開しません。

特定の実行環境 / モデルでは、アダプターは Message ヘッダーを介して spring.cloud.function.definition および / または spring.cloud.function.routing-expression を変換して通信するロールを担います。例: spring-cloud-function-web を使用する場合、spring.cloud.function.definition を HTTP ヘッダーとして提供でき、フレームワークはそれを他の HTTP ヘッダーとともにメッセージヘッダーとして伝播します。

アプリケーションのプロパティ

ルーティング指示は、アプリケーションプロパティとして spring.cloud.function.definition または spring.cloud.function.routing-expression を介して通信することもできます。前のセクションで説明したルールはここでも適用されます。唯一の違いは、これらの指示をアプリケーションプロパティ (例: --spring.cloud.function.definition=foo) として提供することです。

メッセージヘッダーとして spring.cloud.function.definition または spring.cloud.function.routing-expression を提供するのは、命令型関数 (例: Function<Foo, Bar>) でのみ機能することを理解することが重要です。つまり、命令型関数ではメッセージごとにのみルーティングできます。リアクティブ関数では メッセージごとにルーティングできません。ルーティング指示はアプリケーションプロパティとしてのみ提供できます。すべては作業単位に関することです。命令型関数では、作業単位はメッセージであるため、そのような作業単位に基づいてルーティングできます。リアクティブ関数では、作業単位はストリーム全体であるため、アプリケーションプロパティを介して提供された指示のみに基づいて、ストリーム全体をルーティングします。

ルーティング命令の優先順位

ルーティング指示を提供するメカニズムが複数あることを考えると、複数のメカニズムが同時に使用される場合の競合解決の優先順位を理解することが重要です。順序は次のとおりです。

  1. MessageRoutingCallback (関数が必須の場合、他に何かが定義されているかどうかに関係なく優先されます。)

  2. メッセージヘッダー (機能が必須で、MessageRoutingCallback が提供されていない場合)

  3. アプリケーションのプロパティ (任意の関数)

ルーティング不可能なメッセージ

カタログにルート先関数がない場合は、そのことを示す例外が発生します。

このような動作が望ましくない場合があり、そのようなメッセージを処理できる「キャッチオール」型の関数が必要になることがあります。これを実現するために、フレームワークは org.springframework.cloud.function.context.DefaultMessageRoutingHandler 戦略を提供します。必要なのは、これを Bean として登録することだけです。デフォルトの実装では、メッセージがルーティングできないという事実がログに記録されるだけで、例外なしでメッセージフローが続行され、ルーティングできないメッセージが実質的に削除されます。より高度なものが必要な場合は、この戦略の独自の実装を提供し、これを Bean として登録するだけです。

@Bean
public DefaultMessageRoutingHandler defaultRoutingHandler() {
	return new DefaultMessageRoutingHandler() {
		@Override
		public void accept(Message<?> message) {
			// do something really cool
		}
	};
}

関数フィルタリング

フィルタリングは、「実行」または「破棄」の 2 つのパスしかないルーティングの型です。関数の観点で言えば、ある条件が "true" を返す場合にのみ特定の関数を呼び出し、それ以外の場合は入力を破棄することを意味します。

ただし、入力を破棄する場合、アプリケーションのコンテキストでそれが何を意味するかについてはさまざまな解釈があります。たとえば、ログに記録したり、破棄されたメッセージのカウンターを維持したりする場合があります。また、何もしないという場合もあります。

これらの異なるパスがあるため、破棄されたメッセージの処理方法に関する一般的な設定オプションは提供されていません。代わりに、単に「破棄」パスを示す単純な Consumer を定義することをお勧めします。

@Bean
public Consumer<?> devNull() {
   // log, count, or whatever
}

これで、実質的にフィルターとなる 2 つのパスのみを持つルーティング式を作成できます。例:

--spring.cloud.function.routing-expression=headers.contentType.toString().equals('text/plain') ? 'echo' : 'devNull'

'echo' 関数に送られる条件に合わないすべてのメッセージは 'devNull' に送られ、そこでは何もできません。署名 Consumer<?> により、型変換が試行されないため、実行オーバーヘッドがほとんど発生しません。

リアクティブ入力 (Publisher など) を扱う場合、ルーティング指示は Function プロパティを介してのみ提供する必要があります。これは、Publisher を渡すために 1 回だけ呼び出され、残りはリアクターによって処理されるリアクティブ関数の性質によるもので、個々の値 (Message など) を介して伝達されるルーティング指示にアクセスしたり、それに依存したりすることはできません。

複数のルーター

デフォルトでは、フレームワークには、前のセクションで説明したように、常に 1 つのルーティング関数が構成されます。ただし、複数のルーティング関数が必要になる場合があります。その場合は、functionRouter 以外の名前を付ける限り、既存のインスタンスに加えて、RoutingFunction Bean の独自のインスタンスを作成できます。

マップ内のキー / 値のペアとして、spring.cloud.function.routing-expression または spring.cloud.function.definition を RoutingFunction に渡すことができます。

以下に簡単な例を示します。

@Configuration
protected static class MultipleRouterConfiguration {

	@Bean
	RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
		Map<String, String> propertiesMap = new HashMap<>();
		propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'");
		return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback);
	}

	@Bean
	public Function<String, String> reverse() {
		return v -> new StringBuilder(v).reverse().toString();
	}

	@Bean
	public Function<String, String> uppercase() {
		return String::toUpperCase;
	}
}

これがどのように動作するかを示すテストを以下に示します。

@Test
public void testMultipleRouters() {
	System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'");
	FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class);
	Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
	assertThat(function).isNotNull();
	Message<String> message = MessageBuilder.withPayload("hello").build();
	assertThat(function.apply(message)).isEqualTo("HELLO");

	function = functionCatalog.lookup("mySpecialRouter");
	assertThat(function).isNotNull();
	message = MessageBuilder.withPayload("hello").build();
	assertThat(function.apply(message)).isEqualTo("olleh");
}

入力 / 出力エンリッチメント

受信メッセージまたは発信メッセージを変更または改善し、機能以外の問題からコードをクリーンに保つ必要がある場合がよくあります。ビジネスロジック内でそれを実行したくありません。

関数合成を使えばいつでも実現できます。このようなアプローチには、いくつかの利点があります。

  • これにより、この非機能関心事を別の関数に分離し、ビジネス関数を関数定義として構成できるようになります。

  • 受信メッセージが実際のビジネス機能に到達する前に変更できる内容に関して、完全な自由(および危険)が提供されます。

@Bean
public Function<Message<?>, Message<?>> enrich() {
    return message -> MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
}

@Bean
public Function<Message<?>, Message<?>> myBusinessFunction() {
    // do whatever
}

次に、次の関数定義を指定して関数を作成します: enrich|myBusinessFunction

説明したアプローチは最も柔軟ですが、最も複雑でもあります。前の例からわかるように、ビジネス関数と組み合わせる前に、コードを記述して Bean にするか、関数として手動で登録する必要があります。

しかし、前の例のように、実行しようとしている変更(エンリッチメント)が簡単な場合はどうでしょうか。同じことを達成するための、より単純でより動的で構成可能なメカニズムはありますか?

バージョン 3.1.3 以降、フレームワークでは、関数への入力と関数からの出力の両方に対して個別のメッセージヘッダーを拡充する SpEL 式を提供できるようになりました。例として、テストの 1 つを見てみましょう。

@Test
public void testMixedInputOutputHeaderMapping() throws Exception {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
					"--logging.level.org.springframework.cloud.function=DEBUG",
					"--spring.main.lazy-initialization=true",
					"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut1='hello1'",
					"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut2=headers.contentType",
					"--spring.cloud.function.configuration.split.input-header-mapping-expression.key1=headers.path.split('/')[0]",
					"--spring.cloud.function.configuration.split.input-header-mapping-expression.key2=headers.path.split('/')[1]",
					"--spring.cloud.function.configuration.split.input-header-mapping-expression.key3=headers.path")) {

		FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
		FunctionInvocationWrapper function = functionCatalog.lookup("split");
		Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("hello")
				.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
				.setHeader("path", "foo/bar/baz")
				.build());
		assertThat(result.getHeaders()).containsKey("keyOut1"));
		assertThat(result.getHeaders().get("keyOut1")).isEqualTo("hello1");
		assertThat(result.getHeaders()).containsKey("keyOut2"));
		assertThat(result.getHeaders().get("keyOut2")).isEqualTo("application/json");
	}
}

ここでは、関数名 (つまり split) に続いて設定するメッセージヘッダーキーの名前と SpEL 式の値が続く、input-header-mapping-expression および output-header-mapping-expression というプロパティが表示されます。最初の式 ('keyOut1' 用) は、一重引用符で囲まれたリテラル SpEL 式で、実質的に 'keyOut1' を値 hello1 に設定します。keyOut2 は、既存の 'contentType' ヘッダーの値に設定されます。

また、入力ヘッダーマッピングでは、実際に既存のヘッダー 'path' の値を分割し、インデックスに基づいて分割された要素の値に key1 と key2 の個々の値を設定している興味深い機能も確認できます。

何らかの理由で指定された式の評価が失敗した場合、関数の実行は何も起こらなかったかのように続行されます。ただし、ログにはそのことを通知する WARN メッセージが表示されます。
o.s.c.f.context.catalog.InputEnricher    : Failed while evaluating expression "hello1"  on incoming message. . .

複数の入力を持つ関数を扱う場合(次のセクション)、input-header-mapping-expression の直後にインデックスを使用できます。

--spring.cloud.function.configuration.echo.input-header-mapping-expression[0].key1=‘hello1'
--spring.cloud.function.configuration.echo.input-header-mapping-expression[1].key2='hello2'

関数アリティ

データのストリームを分類して整理する必要がある場合があります。例: たとえば、「オーダー」と「請求書」を含む組織化されていないデータを処理する典型的なビッグデータのユースケースを考えてみましょう。それぞれを別々のデータストアに入れたいとします。ここで、関数アリティ(複数の入力と出力を持つ関数)のサポートが役立ちます。

このような関数の例を見てみましょう。MessageRoutingCallback

完全な実装の詳細については、こちら [GitHub] (英語) を参照してください。
@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
	return flux -> ...;
}

プロジェクト Reactor が SCF のコア依存関係であることを考えると、そのタプルライブラリを使用しています。タプルは、カーディナリティ情報の両方を伝達することにより、独自の利点を提供します。どちらも SCSt の文脈では非常に重要です。カーディナリティにより、作成して関数の対応する入力と出力にバインドする必要のある入力バインディングと出力バインディングの数を知ることができます。型情報を認識することで、適切な型変換が保証されます。

また、この関数では、2 つの出力バインディング名が organise-out-0 と organise-out-1 であるため、ここでバインディング名の命名規則の「インデックス」部分が機能します。

現時点では、関数のアリティは、 イベントの合流に関する評価と計算では通常、単一のイベントではなくイベントのストリームを参照する必要がある複雑なイベント処理を中心としたリアクティブ関数 (Function<TupleN<Flux<?>…​>, TupleN<Flux<?>…​>>) に対してのみサポートされています。

入力ヘッダーの伝播

典型的なシナリオでは、入力メッセージヘッダーは出力に伝播されず、当然のことながら、関数の出力は、それ自体のメッセージヘッダーのセットを必要とする他の何かへの入力である可能性があるためです。ただし、そのような伝播が必要になる場合があるため、Spring Cloud Function はこれを実現するためのいくつかのメカニズムを提供します。

まず、ヘッダーをいつでも手動でコピーできます。例: Message を受け取り、Message (つまり、Function<Message, Message>)を返すシグネチャーを持つ関数がある場合は、ヘッダーを自分で簡単かつ選択的にコピーできます。関数が Message を返す場合、フレームワークはペイロードを適切に変換する以外は何もしません。ただし、このようなアプローチは、特にすべてのヘッダーを単にコピーしたい場合は、少し面倒なことがわかる場合があります。このような場合を支援するために、入力ヘッダーを伝播する関数にブールフラグを設定できる単純なプロパティを提供します。プロパティは copy-input-headers です。

例: 次の構成があると仮定します。

@EnableAutoConfiguration
@Configuration
protected static class InputHeaderPropagationConfiguration {

	@Bean
	public Function<String, String> uppercase() {
		return x -> x.toUpperCase();
	}
}

ご存知のように、メッセージを送信することでこの関数を呼び出すことができます (フレームワークは型変換とペイロード抽出を処理します)

spring.cloud.function.configuration.uppercase.copy-input-headers を true に設定するだけで、次のアサーションも当てはまります。

Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json");
Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build());
assertThat(result.getHeaders()).containsKey("foo");

型変換 (コンテンツ型のネゴシエーション)

コンテンツ型ネゴシエーションは Spring Cloud Function のコア機能の 1 つです。これにより、受信データを関数シグネチャーで宣言された型に変換できるだけでなく、関数の合成中に同じ変換を実行して、他の方法では(型ごとに)合成できない関数にすることができます。構成可能。

コンテンツ型ネゴシエーションの背後にあるメカニズムと必要性をよりよく理解するために、例として次の関数を使用して、非常に単純なユースケースを見ていきます。

@Bean
public Function<Person, String> personFunction {..}

前の例で示した関数は、引数として Person オブジェクトを想定し、出力として String 型を生成します。このような関数が Person 型で呼び出された場合、すべて正常に動作します。ただし、通常、関数は、ほとんどの場合、byte[]JSON String などの生の形式で提供される受信データのハンドラーのロールを果たします。フレームワークが受信データをこの関数の引数として渡すために、何らかの方法で受信データを Person 型に変換する必要があります。

Spring Cloud Function は、それを実現するために Spring メカニズムに固有の 2 つのメカニズムに依存しています。

  1. MessageConverter- 受信メッセージデータから関数によって宣言された型に変換します。

  2. ConversionService- 受信非メッセージデータから関数によって宣言された型に変換します。

これは、生データの型(メッセージまたは非メッセージ)に応じて、Spring Cloud Function がいずれかのメカニズムを適用することを意味します。

他のリクエスト(HTTP、メッセージングなど)の一部として呼び出される関数を処理する場合、ほとんどの場合、フレームワークは MessageConverters に依存します。これは、そのようなリクエストがすでに Spring Message に変換されているためです。つまり、フレームワークは適切な MessageConverter を見つけて適用します。これを実現するには、フレームワークにユーザーからの指示が必要です。これらの命令の 1 つは、関数自体の署名(Person 型)によってすでに提供されています。理論的には、それで十分なはずです(場合によってはそれで十分です)。ただし、ほとんどのユースケースでは、適切な MessageConverter を選択するために、フレームワークに追加の情報が必要です。その欠落している部分は contentType ヘッダーです。

このようなヘッダーは通常、メッセージの一部として提供され、最初にそのようなメッセージを作成した対応するアダプターによって挿入されます。例: HTTP POST リクエストでは、コンテンツ型の HTTP ヘッダーがメッセージの contentType ヘッダーにコピーされます。

そのようなヘッダーが存在しない場合、フレームワークは application/json などのデフォルトのコンテンツ型に依存します。

コンテンツ型と引数型

前述のように、フレームワークが適切な MessageConverter を選択するには、引数型と、オプションでコンテンツ型情報が必要です。適切な MessageConverter を選択するためのロジックは、ユーザー定義関数の呼び出しの直前(実際の引数型がフレームワークに認識されている場合)にトリガーされる引数リゾルバーにあります。引数の型が現在のペイロードの型と一致しない場合、フレームワークは事前構成された MessageConverters のスタックに委譲して、それらのいずれかがペイロードを変換できるかどうかを確認します。

contentType と引数型の組み合わせは、フレームワークが適切な MessageConverter を見つけることによってメッセージをターゲット型に変換できるかどうかを判断するメカニズムです。適切な MessageConverter が見つからない場合は、例外がスローされます。これは、カスタム MessageConverter を追加することで処理できます(User-defined Message Converters を参照)。

Message が contentType のみに基づいて他の型に変換されることを期待しないでください。contentType はターゲット型を補完することを忘れないでください。これは、MessageConverter が考慮に入れる場合と考慮しない場合があるヒントです。

メッセージコンバーター

MessageConverters は、次の 2 つのメソッドを定義します。

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);

特に Spring Cloud Stream のコンテキストでは、これらのメソッドの契約とその使用箇所を理解することが重要です。

fromMessage メソッドは、受信 Message を引数型に変換します。Message のペイロードは任意の型にすることができ、複数の型をサポートするのは MessageConverter の実際の実装次第です。

提供された MessageConverters

前述のように、フレームワークは、最も一般的なユースケースを処理するための MessageConverters のスタックをすでに提供しています。次のリストは、提供された MessageConverters を優先順に説明しています(最初に機能する MessageConverter が使用されます)。

  1. JsonMessageConverter: Jackson (デフォルト) または Gson ライブラリを使用して contentType が application/json である場合に、Message のペイロードと POJO 間の変換をサポートします。このメッセージコンバーターは、type パラメーター (例: application/json;type=foo.bar.person ) も認識します。これは、関数の開発時に型が不明であるため、関数のシグネチャーが Function<?, ?>Function、または Function<Object, Object> のように見える場合に便利です。言い換えれば、型変換の場合、通常は関数シグネチャーから型を派生します。mime-type パラメーターを使用すると、より動的な方法で型を伝達できます。

  2. ByteArrayMessageConvertercontentType が application/octet-stream の場合に、Message のペイロードを byte[] から byte[] に変換することをサポートします。これは本質的にパススルーであり、主に下位互換性のために存在します。

  3. StringMessageConvertercontentType が text/plain の場合、任意の型の String への変換をサポートします。

適切なコンバーターが見つからない場合、フレームワークは例外をスローします。その場合は、コードと構成をチェックして、何も見逃していないことを確認する必要があります(つまり、バインディングまたはヘッダーを使用して contentType を提供したことを確認してください)。ただし、ほとんどの場合、いくつかのまれなケース(おそらく、カスタム contentType など)が見つかり、提供されている MessageConverters の現在のスタックは変換方法を認識していません。その場合は、カスタム MessageConverter を追加できます。ユーザー定義のメッセージコンバーターを参照してください。

ユーザー定義 MessageConverters

Spring Cloud Function は、追加の MessageConverters を定義および登録するメカニズムを公開しています。これを使用するには、org.springframework.messaging.converter.MessageConverter を実装し、@Bean として構成します。次に、`MessageConverter` の既存のスタックに追加されます。

カスタム MessageConverter 実装が既存のスタックの先頭に追加されることを理解することが重要です。その結果、カスタム MessageConverter 実装は既存の実装よりも優先され、既存のコンバーターをオーバーライドしたり、追加したりすることができます。

次の例は、application/bar と呼ばれる新しいコンテンツ型をサポートするメッセージコンバーター Bean を作成する方法を示しています。

@SpringBootApplication
public static class SinkApplication {

    ...

    @Bean
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}

JSON オプションに関する注意

Spring Cloud Function では、JSON を処理するための Jackson および Gson メカニズムをサポートしています。そして、あなたの利益のために、それ自体が 2 つのメカニズムを認識し、選択したものを使用するか、デフォルトのルールに従う org.springframework.cloud.function.json.JsonMapper でそれを抽象化しました。デフォルトのルールは次のとおりです。

  • 使用されるメカニズムであるクラスパス上にあるライブラリ。クラスパスに com.fasterxml.jackson.* がある場合は、Jackson が使用され、com.google.code.gson がある場合は、Gson が使用されます。

  • 両方がある場合は、Gson がデフォルトになります。または、gson または jackson の 2 つの値のいずれかを使用して spring.cloud.function.preferred-json-mapper プロパティを設定できます。

とはいえ、型変換は通常、開発者にとって透過的です。ただし、org.springframework.cloud.function.json.JsonMapper は Bean としても登録されているため、必要に応じてコードに簡単に挿入できます。

Kotlin Lambda サポート

また、Kotlin ラムダのサポートも提供しています(v2.0 以降)。次のことを考慮してください。

@Bean
open fun kotlinSupplier(): () -> String {
    return  { "Hello from Kotlin" }
}

@Bean
open fun kotlinFunction(): (String) -> String {
    return  { it.toUpperCase() }
}

@Bean
open fun kotlinConsumer(): (String) -> Unit {
    return  { println(it) }
}

上記は、Spring Bean として構成された Kotlin ラムダを表しています。各署名は、Java で SupplierFunctionConsumer に相当するものにマップされるため、フレームワークによってサポート / 認識される署名になります。Kotlin から Java へのマッピングの仕組みはこのドキュメントの範囲外ですが、「Java 8 関数のサポート」セクションで概説されている署名変換の同じルールがここでも適用されることを理解することが重要です。

Kotlin サポートを有効にするには、クラスパスに Kotlin SDK ライブラリを追加するだけで、適切な自動構成とサポートクラスがトリガーされます。

関数コンポーネントスキャン

Spring Cloud Function は、functions と呼ばれるパッケージ内の FunctionConsumerSupplier の実装が存在する場合、それをスキャンします。この機能を使用すると、Spring に依存しない関数を記述できます。@Component アノテーションも必要ありません。別のパッケージを使用する場合は、spring.cloud.function.scan.packages を設定できます。spring.cloud.function.scan.enabled=false を使用して、スキャンを完全にオフにすることもできます。

データマスキング

一般的なアプリケーションには、複数のレベルのログ記録が付属しています。一部のクラウド / サーバーレスプラットフォームでは、誰でも見ることができるようにログに記録されるパケットに機密データが含まれている場合があります。ログ記録はフレームワーク自体から行われるため、ログに記録されるデータをインスペクションするのは個々の開発者の責任ですが、バージョン 4.1 の時点で、AWS Lambda ペイロード内の機密データのマスキングに最初に役立つ JsonMasker が導入されました。ただし、JsonMasker は汎用的で、どのモジュールでも使用できます。現時点では、JSON などの構造化データでのみ機能します。必要なのは、マスクするキーを指定することだけです。残りは META-INF/mask.keys ファイルでキーを指定する必要があります。ファイルの形式は非常にシンプルで、複数のキーをカンマ、改行、その両方で区切ることができます。

このようなファイルの内容の例を次に示します。

eventSourceARN
asdf1, SS

ここでは 3 つのキーが定義されています。このようなファイルが存在すると、JsonMasker はそれを使用して指定されたキーの値をマスクします。

使用方法を示すサンプルコードは次のとおりです。

private final static JsonMasker masker = JsonMasker.INSTANCE();
// . . .
logger.info("Received: " + masker.mask(new String(payload, StandardCharsets.UTF_8)));