Apache Cassandra サポート

Spring Integration は、Apache Cassandra クラスターに対してデータベース操作を実行するためのチャネルアダプター (バージョン 6.0 以降) を提供します。これは完全に Spring Data for Apache Cassandra プロジェクトに基づいています。

この依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-cassandra</artifactId>
    <version>6.4.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-cassandra:6.4.2"

Cassandra 送信コンポーネント

CassandraMessageHandler は AbstractReplyProducingMessageHandler 実装であり、一方向 (デフォルト) モードとリクエスト / 応答モード (producesReply オプション) の両方で動作します。これはデフォルトで非同期 (リセットする setAsync(false) ) であり、提供された ReactiveCassandraOperations に対してリアクティブな INSERTUPDATEDELETE または STATEMENT 操作を実行します。動作の型は、CassandraMessageHandler.Type オプションを介して構成できます。ingestQuery はモードを INSERT に設定します。query または statementExpression、または statementProcessor は、モードを STATEMENT に設定します。

次のコードスニペットは、このチャネルアダプターまたはゲートウェイのさまざまな構成を示しています。

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
    return flow -> flow
            .handle(Cassandra.outboundGateway(cassandraOperations)
                    .query("SELECT * FROM book WHERE author = :author limit :size")
                    .parameter("author", "payload")
                    .parameter("size", m -> m.getHeaders().get("limit")))
            .channel(c -> c.flux("resultChannel"));
}
@Bean
fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) =
    integrationFlow {
        handle(
            Cassandra.outboundChannelAdapter(cassandraOperations)
                              .statementExpression("T(QueryBuilder).truncate('book').build()")
        ) { async(false) }
    }
@ServiceActivator(inputChannel = "cassandraSelectChannel")
@Bean
public MessageHandler cassandraMessageHandler() {
    CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
    cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");

    Map<String, Expression> params = new HashMap<>();
    params.put("author", PARSER.parseExpression("payload"));
    params.put("size", PARSER.parseExpression("headers.limit"));

    cassandraMessageHandler.setParameterExpressions(params);

    cassandraMessageHandler.setOutputChannel(resultChannel());
    cassandraMessageHandler.setProducesReply(true);
    return cassandraMessageHandler;
}
<int-cassandra:outbound-channel-adapter id="outboundAdapter"
                                        cassandra-template="cassandraTemplate"
                                        write-options="writeOptions"
                                        auto-startup="false"
                                        async="false"/>

<int-cassandra:outbound-gateway id="outgateway"
                                request-channel="input"
                                cassandra-template="cassandraTemplate"
                                mode="STATEMENT"
                                write-options="writeOptions"
                                query="SELECT * FROM book limit :size"
                                reply-channel="resultChannel"
                                auto-startup="true">
    <int-cassandra:parameter-expression name="author" expression="payload"/>
    <int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>

デフォルトの非同期モードで CassandraMessageHandler がゲートウェイとして使用される場合、Mono<WriteResult> が生成され、提供された MessageChannel 実装に従って処理されます。真のリアクティブ処理を行うには、出力チャネル構成に FluxMessageChannel をお勧めします。同期モードでは、応答値を取得するために Mono.block() が呼び出されます。

INSERTUPDATE、または DELETE 操作が実行される場合、エンティティ ( org.springframework.data.cassandra.core.mapping.Table とマークされている) がリクエストメッセージペイロードで予期されます。ペイロードがエンティティのリストである場合、それぞれのバッチ操作が実行されます。

ingestQuery モードでは、ペイロードが挿入する値のマトリックス (List<List<?>>) として存在することが想定されます。例: エンティティが次のような場合:

@Table("book")
public record Book(@PrimaryKey String isbn,
                   String title,
                   @Indexed String author,
                   int pages,
                   LocalDate saleDate,
                   boolean isInStock) {

}

また、チャネルアダプターには次の構成があります。

@Bean
public MessageHandler cassandraMessageHandler3() {
    CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
    String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
    cassandraMessageHandler.setIngestQuery(cqlIngest);
    cassandraMessageHandler.setAsync(false);
    return cassandraMessageHandler;
}

リクエストメッセージのペイロードは、次のように変換する必要があります。

List<List<Object>> ingestBooks =
    payload.stream()
            .map(book ->
                    List.<Object>of(
                            book.isbn(),
                            book.title(),
                            book.author(),
                            book.pages(),
                            book.saleDate(),
                            book.isInStock()))
            .toList();

より高度なユースケースでは、ペイロードを com.datastax.oss.driver.api.core.cql.Statement のインスタンスとして使用できます。Apache Cassandra に対して実行するさまざまなステートメントを構築するには、com.datastax.oss.driver.api.querybuilder.QueryBuilder API を使用することをお勧めします。例: Book テーブルからすべてのデータを削除するには、次のようなペイロードを持つメッセージを CassandraMessageHandler に送信できます: QueryBuilder.truncate("book").build()。または、リクエストメッセージに基づくロジックの場合は、CassandraMessageHandler に statementExpression または statementProcessor を提供して、そのメッセージに基づいて Statement を構築できます。便宜上、com.datastax.oss.driver.api.querybuilder は SpEL 評価コンテキストに import として登録されるため、ターゲット式は次のように単純になります。

statement-expression="T(QueryBuilder).selectFrom("book").all()"

setParameterExpressions(Map<String, Expression> parameterExpressions) は、バインド可能な名前付きクエリパラメーターを表し、setQuery(String query) オプションでのみ使用されます。上記の Java および XML のサンプルを参照してください。