Pulsar Functions

Spring for Apache Pulsar は、Pulsar IO [Apache] (英語) (コネクター) および Pulsar Functions [Apache] (英語) の基本的なサポートを提供し、ユーザーが sourcesprocessorssinks で構成されるストリーム処理パイプラインを定義できるようにします。sources および sinks は Pulsar IO (コネクター) によってモデル化され、processors は Pulsar Functions によって表されます。

コネクターは単なる特殊な機能であるため、簡単にするために、ソース、シンク、機能を総称して "Pulsar Functions" と呼びます。
前提条件

親しみやすさ - 利用者は Pulsar IO および Pulsar Functions にある程度慣れていることが期待されます。そうでない場合は、入門ガイドを参照すると役立つ場合があります。

有効な機能 - これらの機能を使用するには、Apache Pulsar でサポートされている機能を有効にして構成する必要があります (デフォルトでは無効になっています)。組み込みコネクターも Pulsar クラスターにインストールする必要がある場合があります。

詳細については、Pulsar IO [Apache] (英語) および Pulsar Functions [Apache] (英語) のドキュメントを参照してください。

1. Pulsar Function 管理

このフレームワークは、Pulsar functions を管理するための PulsarFunctionAdministration コンポーネントを提供します。Pulsar Spring Boot スターターを使用すると、PulsarFunctionAdministration が自動構成されます。

デフォルトでは、アプリケーションは localhost:8080 にあるローカル Pulsar インスタンスへの接続を試行します。ただし、すでに構成されている PulsarAdministration を利用するため、利用可能なクライアントオプション (認証を含む) については Pulsar 管理クライアントを参照してください。追加の構成オプションは、spring.pulsar.function.* アプリケーションのプロパティで使用できます。

2. 自動関数管理

アプリケーションの起動時に、フレームワークはアプリケーションコンテキスト内のすべての PulsarFunctionPulsarSinkPulsarSource Bean を検索します。Bean ごとに、対応する Pulsar function が作成または更新されます。関数型、関数構成、関数がすでに存在するかどうかに基づいて、適切な API が呼び出されます。

PulsarFunctionPulsarSinkPulsarSource Bean は、それぞれ Apache Pulsar 構成オブジェクト FunctionConfigSinkConfigSourceConfig の単純なラッパーです。サポートされているコネクター (およびそのさまざまな構成) が多数あるため、フレームワークはさまざまな Apache Pulsar コネクターをミラーリングするための構成プロパティ階層を作成しようとしません。代わりに、完全な構成オブジェクトを提供するのはユーザーの負担となり、フレームワークは提供された構成を使用して管理 (作成 / 更新) を処理します。

アプリケーションのシャットダウン時には、アプリケーションの起動中に処理されたすべての機能に停止ポリシーが適用され、放置されるか、停止されるか、Pulsar サーバーから削除されます。

3. 制限

3.1. ノーマジック Pulsar Functions

Pulsar functions およびカスタムコネクターは、カスタムアプリケーションコード (例: java.util.Function) によって表されます。カスタムコードを自動的に登録するための魔法のサポートはありません。これは素晴らしいことですが、いくつかの技術的な課題があり、まだ実装されていません。関数 (またはカスタムコネクター) が関数構成で指定された場所で利用可能であることを確認するのはユーザーの責任です。例: 関数 config の jar 値が ./some/path/MyFunction.jar である場合、指定されたパスに関数 jar ファイルが存在する必要があります。

3.2. 名前の識別子

関数構成の name プロパティは、更新操作または作成操作を実行するかどうかを決定するために、関数がすでに存在するかどうかを判断するための識別子として使用されます。機能の更新が必要な場合は、名前を変更しないでください。

4. 構成

4.1. Pulsar Function アーカイブ

各 Pulsar function は、実際のアーカイブ (jar ファイルなど) によって表されます。アーカイブへのパスは、ソースとシンクの場合は archive プロパティ、関数の場合は jar プロパティを介して指定されます。

次のルールによってパスの「型」が決まります。

  • (file|http|https|function|sink|source):// で開始する場合、パスは URL になります。

  • builtin:// で起動するとパスが組み込まれます (提供されているすぐに使用できるコネクターの 1 つを指します)

  • それ以外の場合、パスはローカルです。

作成 / 更新操作中に発生するアクションは、次のようにパスの「型」に依存します。

  • パスが URL の場合、コンテンツはサーバーによってダウンロードされます

  • パスが組み込まれている場合、コンテンツはサーバー上ですでに利用可能です

  • パスがローカルの場合、コンテンツはサーバーにアップロードされます

4.2. 内蔵ソースとシンク

Apache Pulsar は、すぐに使用できる多くのソースコネクターとシンクコネクター (組み込みコネクター) を提供します。内蔵コネクターを使用するには、archive を builtin://<connector-type> (例: builtin://rabbit) に設定するだけです。

5. カスタム関数

カスタム関数を開発およびパッケージ化する方法の詳細については、Pulsar ドキュメント [Apache] (英語) を参照してください。ただし、大まかな要件は次のとおりです。

  • コードは Java8 を使用します

  • コードは java.util.Function または org.apache.pulsar.functions.api.Function のいずれかを実装します

  • uber jar としてパッケージ化

関数がビルドされパッケージ化されたら、関数を登録できるようにする方法がいくつかあります。

5.1. file://

jar ファイルをサーバーにアップロードし、関数構成の jar プロパティの file:// 経由で参照できます。

5.2. ローカル

jar ファイルはローカルに残し、関数構成の jar プロパティのローカルパスを介して参照できます。

5.3. http://

jar ファイルは、HTTP サーバー経由で利用可能にし、関数構成の jar プロパティの http(s):// 経由で参照できます。

5.4. function://

jar ファイルは Pulsar パッケージマネージャーにアップロードし、関数構成の jar プロパティの function:// 経由で参照できます。

6. サンプル

以下に、PulsarFunctionAdministration がバッキング Pulsar ソースコネクターを自動作成する PulsarSource Bean を構成する方法を示す例をいくつか示します。

PulsarSource 内蔵 Rabbit コネクターを使用
@Bean
PulsarSource rabbitSource() {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", "my.rabbit.host");
    configs.put("port", 5672);
    configs.put("virtualHost", "/");
    configs.put("username", "guest");
    configs.put("password", "guest");
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}

次の例は、構成の負担を軽減するために Spring Boot 自動構成 RabbitProperties を使用することを除いて、前の例と同じです。もちろん、これにはアプリケーションが Rabbit 自動構成を有効にして Spring Boot を使用している必要があります。

PulsarSource 内蔵 Rabbit コネクターおよび Spring Boot RabbitProperties を使用
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", props.determineHost());
    configs.put("port", props.determinePort());
    configs.put("virtualHost", props.determineVirtualHost());
    configs.put("username", props.determineUsername());
    configs.put("password", props.determinePassword());
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}
より複雑な例については、Pulsar Functions を使用したサンプルストリームパイプライン [GitHub] (英語) サンプルアプリを参照してください。