Pulsar Functions
Spring for Apache Pulsar は、Pulsar IO [Apache] (英語) (コネクター) および Pulsar Functions [Apache] (英語) の基本的なサポートを提供し、ユーザーが sources
、processors
、sinks
で構成されるストリーム処理パイプラインを定義できるようにします。sources
および sinks
は Pulsar IO (コネクター) によってモデル化され、processors
は Pulsar Functions によって表されます。
コネクターは単なる特殊な機能であるため、簡単にするために、ソース、シンク、機能を総称して "Pulsar Functions" と呼びます。 |
1. Pulsar Function 管理
このフレームワークは、Pulsar functions を管理するための PulsarFunctionAdministration
コンポーネントを提供します。Pulsar Spring Boot スターターを使用すると、PulsarFunctionAdministration
が自動構成されます。
デフォルトでは、アプリケーションは localhost:8080
にあるローカル Pulsar インスタンスへの接続を試行します。ただし、すでに構成されている PulsarAdministration
を利用するため、利用可能なクライアントオプション (認証を含む) については Pulsar 管理クライアントを参照してください。追加の構成オプションは、spring.pulsar.function.*
(英語) アプリケーションのプロパティで使用できます。
2. 自動関数管理
アプリケーションの起動時に、フレームワークはアプリケーションコンテキスト内のすべての PulsarFunction
、PulsarSink
、PulsarSource
Bean を検索します。Bean ごとに、対応する Pulsar function が作成または更新されます。関数型、関数構成、関数がすでに存在するかどうかに基づいて、適切な API が呼び出されます。
PulsarFunction 、PulsarSink 、PulsarSource Bean は、それぞれ Apache Pulsar 構成オブジェクト FunctionConfig 、SinkConfig 、SourceConfig の単純なラッパーです。サポートされているコネクター (およびそのさまざまな構成) が多数あるため、フレームワークはさまざまな Apache Pulsar コネクターをミラーリングするための構成プロパティ階層を作成しようとしません。代わりに、完全な構成オブジェクトを提供するのはユーザーの負担となり、フレームワークは提供された構成を使用して管理 (作成 / 更新) を処理します。 |
アプリケーションのシャットダウン時には、アプリケーションの起動中に処理されたすべての機能に停止ポリシーが適用され、放置されるか、停止されるか、Pulsar サーバーから削除されます。
3. 制限
3.1. ノーマジック Pulsar Functions
Pulsar functions およびカスタムコネクターは、カスタムアプリケーションコード (例: java.util.Function
) によって表されます。カスタムコードを自動的に登録するための魔法のサポートはありません。これは素晴らしいことですが、いくつかの技術的な課題があり、まだ実装されていません。関数 (またはカスタムコネクター) が関数構成で指定された場所で利用可能であることを確認するのはユーザーの責任です。例: 関数 config の jar
値が ./some/path/MyFunction.jar
である場合、指定されたパスに関数 jar ファイルが存在する必要があります。
4. 構成
4.1. Pulsar Function アーカイブ
各 Pulsar function は、実際のアーカイブ (jar ファイルなど) によって表されます。アーカイブへのパスは、ソースとシンクの場合は archive
プロパティ、関数の場合は jar
プロパティを介して指定されます。
次のルールによってパスの「型」が決まります。
(file|http|https|function|sink|source)://
で開始する場合、パスは URL になります。builtin://
で起動するとパスが組み込まれます (提供されているすぐに使用できるコネクターの 1 つを指します)それ以外の場合、パスはローカルです。
作成 / 更新操作中に発生するアクションは、次のようにパスの「型」に依存します。
パスが URL の場合、コンテンツはサーバーによってダウンロードされます
パスが組み込まれている場合、コンテンツはサーバー上ですでに利用可能です
パスがローカルの場合、コンテンツはサーバーにアップロードされます
5. カスタム関数
カスタム関数を開発およびパッケージ化する方法の詳細については、Pulsar ドキュメント [Apache] (英語) を参照してください。ただし、大まかな要件は次のとおりです。
コードは Java8 を使用します
コードは
java.util.Function
またはorg.apache.pulsar.functions.api.Function
のいずれかを実装しますuber jar としてパッケージ化
関数がビルドされパッケージ化されたら、関数を登録できるようにする方法がいくつかあります。
6. サンプル
以下に、PulsarFunctionAdministration
がバッキング Pulsar ソースコネクターを自動作成する PulsarSource
Bean を構成する方法を示す例をいくつか示します。
@Bean
PulsarSource rabbitSource() {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "my.rabbit.host");
configs.put("port", 5672);
configs.put("virtualHost", "/");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
次の例は、構成の負担を軽減するために Spring Boot 自動構成 RabbitProperties
を使用することを除いて、前の例と同じです。もちろん、これにはアプリケーションが Rabbit 自動構成を有効にして Spring Boot を使用している必要があります。
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
Map<String, Object> configs = new HashMap<>();
configs.put("host", props.determineHost());
configs.put("port", props.determinePort());
configs.put("virtualHost", props.determineVirtualHost());
configs.put("username", props.determineUsername());
configs.put("password", props.determinePassword());
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
より複雑な例については、Pulsar Functions を使用したサンプルストリームパイプライン [GitHub] (英語) サンプルアプリを参照してください。 |