Spring のデータ統合への道のりの簡単な歴史

データ統合に関する Spring の旅は、Spring Integration から始まりました。そのプログラミングモデルにより、エンタープライズ統合パターン (英語) を採用して、データベース、メッセージブローカーなどの外部システムに接続できるアプリケーションを構築するための一貫した開発者エクスペリエンスを提供しました。

エンタープライズ環境でマイクロサービスが目立つようになったクラウド時代に早送りします。Spring Boot は、開発者がアプリケーションを構築する方法を変革しました。Spring のプログラミングモデルと Spring Boot によって処理されるランタイムの責任により、スタンドアロンの本番グレードの Spring ベースのマイクロサービスの開発がシームレスになりました。

これをデータ統合ワークロードに拡張するために、Spring Integration と Spring Boot が新しいプロジェクトにまとめられました。Spring Cloud Stream が誕生しました。

Spring Cloud Stream を使用すると、開発者は次のことができます。

  • データ中心のアプリケーションを個別に構築、テスト、デプロイします。

  • メッセージングによる構成を含む、最新のマイクロサービスアーキテクチャパターンを適用します。

  • アプリケーションの責任をイベント中心の考え方から切り離します。イベントは、時間内に発生した何かを表すことができ、ダウンストリームのコンシューマーアプリケーションは、それがどこで発生したか、プロデューサーの ID を知らなくても反応できます。

  • ビジネスロジックをメッセージブローカー (RabbitMQ、Apache Kafka、Amazon、Kinesis など) に移植します。

  • 一般的なユースケースについては、フレームワークの自動コンテンツ型サポートに依存してください。さまざまなデータ変換型に拡張することが可能です。

  • などなど . . .

クイックスタート

この 3 ステップのガイドに従うことで、詳細に飛び込む前でも、5 分以内に Spring Cloud Stream を試すことができます。

選択したメッセージングミドルウェアからのメッセージを受信し(これについては後で詳しく説明します)、受信したメッセージをコンソールに記録する Spring Cloud Stream アプリケーションを作成する方法を示します。これを LoggingConsumer と呼びます。あまり実用的ではありませんが、主要な概念と抽象化のいくつかを導入しているため、このユーザーガイドの残りの部分を簡単に理解できます。

3 つのステップは次のとおりです。

Spring Initializr を使用したサンプルアプリケーションの作成

開始するには、Spring Initializr にアクセスしてください。そこから、LoggingConsumer アプリケーションを生成できます。そうするには:

  1. 依存関係セクションで、stream の入力を開始します。「クラウドストリーム」オプションが表示されたら、それを選択します。

  2. "kafka" または "rabbit" のいずれかを入力し始めます。

  3. "Kafka" または "RabbitMQ" を選択します。

    基本的に、アプリケーションがバインドするメッセージングミドルウェアを選択します。すでにインストールされているものを使用するか、インストールと実行に慣れているものを使用することをお勧めします。また、Initilaizer 画面からわかるように、選択できる他のオプションがいくつかあります。例: Maven(デフォルト)の代わりに、ビルドツールとして Gradle を選択できます。

  4. 成果物フィールドに、"logging-consumer" と入力します。

    成果物フィールドの値がアプリケーション名になります。ミドルウェアに RabbitMQ を選択した場合、Spring Initializr は次のようになります。

spring initializr
  1. プロジェクトを生成するボタンをクリックします。

    そうすることで、生成されたプロジェクトの圧縮バージョンがハードドライブにダウンロードされます。

  2. プロジェクトディレクトリとして使用するフォルダーにファイルを解凍します。

Spring Initializr で利用可能な多くの可能性を探索することをお勧めします。さまざまな種類の Spring アプリケーションを作成できます。

プロジェクトを IDE にインポートする

これで、プロジェクトを IDE にインポートできます。IDE によっては、特定のインポート手順に従う必要がある場合があることに注意してください。例: プロジェクトの生成方法(Maven または Gradle)によっては、特定のインポート手順に従う必要がある場合があります(たとえば、Eclipse または STS では、ファイル→インポート→ Maven →既存の Maven プロジェクトを使用する必要があります)。

インポートしたプロジェクトには、いかなる種類のエラーがあってはなりません。また、src/main/java には com.example.loggingconsumer.LoggingConsumerApplication が含まれている必要があります。

技術的には、この時点で、アプリケーションのメインクラスを実行できます。すでに有効な Spring Boot アプリケーションです。ただし、何も実行されないため、コードを追加します。

メッセージハンドラーの追加、構築、実行

com.example.loggingconsumer.LoggingConsumerApplication クラスを次のように変更します。

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

前のリストからわかるように:

  • 関数型プログラミングモデル(【 Spring クラウド機能対応】を参照)を使用して、単一のメッセージハンドラーを Consumer として定義しています。

  • このようなハンドラーをバインダーによって公開された入力宛先バインディングにバインドするために、フレームワークの規則に依存しています。

そうすることで、フレームワークのコア機能の 1 つを確認することもできます。受信メッセージのペイロードを型 Person に自動的に変換しようとします。

これで、メッセージをリッスンする完全に機能する Spring Cloud Stream アプリケーションができました。ここから、簡単にするために、ステップ 1 で RabbitMQ を選択したと仮定します。RabbitMQ がインストールされて実行されていると仮定すると、IDE で main メソッドを実行することでアプリケーションを起動できます。

次の出力が表示されます。

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

RabbitMQ 管理コンソールまたはその他の RabbitMQ クライアントに移動し、input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg にメッセージを送信します。anonymous.CbMIwdkJSBO1ZoPDOtHtCg 部分はグループ名を表し、生成されるため、環境によって異なるものになるはずです。より予測可能なものについては、spring.cloud.stream.bindings.input.group=hello (または任意の名前)を設定することにより、明示的なグループ名を使用できます。

メッセージの内容は、次のように Person クラスの JSON 表現である必要があります。

{"name":"Sam Spade"}

次に、コンソールに次のように表示されます。

Received: Sam Spade

また、アプリケーションをビルドして Boot jar にパッケージ化し(./mvnw clean install を使用)、ビルドされた JAR を java -jar コマンドを使用して実行することもできます。

これで、(非常に基本的ではありますが)Spring Cloud Stream アプリケーションが機能します。

ストリーミングデータのコンテキストでの Spring 式言語(SpEL)

このリファレンスマニュアル全体を通して、Spring 式言語(SpEL)を利用できる多くの機能と例に出会うでしょう。それを使用することになると、特定の制限を理解することが重要です。

SpEL を使用すると、現在のメッセージだけでなく、実行しているアプリケーションコンテキストにもアクセスできます。ただし、特に受信メッセージのコンテキストにおいて SpEL が認識できるデータの種類を理解することが重要です。ブローカーから、メッセージは byte[] の形式で到着します。その後、バインダーによって Message<byte[]> に変換されますが、メッセージのペイロードは生の形式を維持していることがわかります。メッセージのヘッダーは <String, Object> で、通常、値は別のプリミティブまたはプリミティブのコレクション / 配列、つまり Object です。これは、バインダーはユーザーコード (関数) にアクセスできないため、必要な入力型を認識できないためです。バインダーは、郵便で配達される手紙と同じように、メッセージヘッダーの形式で、ペイロードといくつかの読み取り可能なメタデータを含む封筒を効果的に配信しました。これは、メッセージのペイロードへのアクセスは可能ですが、生データ (つまり、byte[]) としてのみアクセスできることを意味します。また、開発者が具象型 (Foo、Bar など) としてペイロードオブジェクトのフィールドに SpEL アクセスできるようにする機能を要求することは非常に一般的かもしれませんが、それを実現することがいかに難しいか不可能であるかがわかるでしょう。この問題を示す一例を次に示します。ペイロード型に基づいてさまざまな関数にルーティングするルーティング式があると想像してください。この要件は、byte[] から特定の型へのペイロード変換と SpEL の適用を意味します。ただし、そのような変換を実行するには、コンバーターに渡す実際の型を知る必要がありますが、関数のシグネチャーから来ており、どの型であるかはわかりません。この要件を解決するためのより良いアプローチは、型情報をメッセージヘッダー (例: application/json;type=foo.bar.Baz ) として渡すことです。1 年以内にアクセスして評価できる明確で読みやすい String 値と、読みやすい SpEL 式を取得できます。

さらに、ペイロードは特権データ、つまり最終受信者のみが読み取るデータであると見なされるため、ルーティングの決定にペイロードを使用することは非常に悪い習慣であると考えられます。繰り返しますが、郵便配達のたとえを使用すると、郵便配達員が配達を決定するために封筒を開けて手紙の内容を読んでほしくないでしょう。同じ概念が、特にメッセージの生成時にそのような情報を含めるのが比較的簡単な場合に当てはまります。これは、ネットワーク上で送信されるデータの設計、およびそのようなデータのどの部分が公開とみなされ、どの部分が特権的であるかに関する一定レベルの規律を強制します。