Apache Cassandra サポート
Spring Integration は、Apache Cassandra クラスターに対してデータベース操作を実行するためのチャネルアダプター (バージョン 6.0 以降) を提供します。これは完全に Spring Data for Apache Cassandra プロジェクトに基づいています。
この依存関係をプロジェクトに含める必要があります。
Cassandra 送信コンポーネント
CassandraMessageHandler
は AbstractReplyProducingMessageHandler
実装であり、一方向 (デフォルト) モードとリクエスト / 応答モード (producesReply
オプション) の両方で動作します。これはデフォルトで非同期 (リセットする setAsync(false)
) であり、提供された ReactiveCassandraOperations
に対してリアクティブな INSERT
、UPDATE
、DELETE
または STATEMENT
操作を実行します。動作の型は、CassandraMessageHandler.Type
オプションを介して構成できます。ingestQuery
はモードを INSERT
に設定します。query
または statementExpression
、または statementProcessor
は、モードを STATEMENT
に設定します。
次のコードスニペットは、このチャネルアダプターまたはゲートウェイのさまざまな構成を示しています。
Java DSL
Kotlin DSL
Java
XML
@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
return flow -> flow
.handle(Cassandra.outboundGateway(cassandraOperations)
.query("SELECT * FROM book WHERE author = :author limit :size")
.parameter("author", "payload")
.parameter("size", m -> m.getHeaders().get("limit")))
.channel(c -> c.flux("resultChannel"));
}
@Bean
fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) =
integrationFlow {
handle(
Cassandra.outboundChannelAdapter(cassandraOperations)
.statementExpression("T(QueryBuilder).truncate('book').build()")
) { async(false) }
}
@ServiceActivator(inputChannel = "cassandraSelectChannel")
@Bean
public MessageHandler cassandraMessageHandler() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");
Map<String, Expression> params = new HashMap<>();
params.put("author", PARSER.parseExpression("payload"));
params.put("size", PARSER.parseExpression("headers.limit"));
cassandraMessageHandler.setParameterExpressions(params);
cassandraMessageHandler.setOutputChannel(resultChannel());
cassandraMessageHandler.setProducesReply(true);
return cassandraMessageHandler;
}
<int-cassandra:outbound-channel-adapter id="outboundAdapter"
cassandra-template="cassandraTemplate"
write-options="writeOptions"
auto-startup="false"
async="false"/>
<int-cassandra:outbound-gateway id="outgateway"
request-channel="input"
cassandra-template="cassandraTemplate"
mode="STATEMENT"
write-options="writeOptions"
query="SELECT * FROM book limit :size"
reply-channel="resultChannel"
auto-startup="true">
<int-cassandra:parameter-expression name="author" expression="payload"/>
<int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>
デフォルトの非同期モードで CassandraMessageHandler
がゲートウェイとして使用される場合、Mono<WriteResult>
が生成され、提供された MessageChannel
実装に従って処理されます。真のリアクティブ処理を行うには、出力チャネル構成に FluxMessageChannel
をお勧めします。同期モードでは、応答値を取得するために Mono.block()
が呼び出されます。
INSERT
、UPDATE
、または DELETE
操作が実行される場合、エンティティ ( org.springframework.data.cassandra.core.mapping.Table
とマークされている) がリクエストメッセージペイロードで予期されます。ペイロードがエンティティのリストである場合、それぞれのバッチ操作が実行されます。
ingestQuery
モードでは、ペイロードが挿入する値のマトリックス (List<List<?>>
) として存在することが想定されます。例: エンティティが次のような場合:
@Table("book")
public record Book(@PrimaryKey String isbn,
String title,
@Indexed String author,
int pages,
LocalDate saleDate,
boolean isInStock) {
}
また、チャネルアダプターには次の構成があります。
@Bean
public MessageHandler cassandraMessageHandler3() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
cassandraMessageHandler.setIngestQuery(cqlIngest);
cassandraMessageHandler.setAsync(false);
return cassandraMessageHandler;
}
リクエストメッセージのペイロードは、次のように変換する必要があります。
List<List<Object>> ingestBooks =
payload.stream()
.map(book ->
List.<Object>of(
book.isbn(),
book.title(),
book.author(),
book.pages(),
book.saleDate(),
book.isInStock()))
.toList();
より高度なユースケースでは、ペイロードを com.datastax.oss.driver.api.core.cql.Statement
のインスタンスとして使用できます。Apache Cassandra に対して実行するさまざまなステートメントを構築するには、com.datastax.oss.driver.api.querybuilder.QueryBuilder
API を使用することをお勧めします。例: Book
テーブルからすべてのデータを削除するには、次のようなペイロードを持つメッセージを CassandraMessageHandler
に送信できます: QueryBuilder.truncate("book").build()
。または、リクエストメッセージに基づくロジックの場合は、CassandraMessageHandler
に statementExpression
または statementProcessor
を提供して、そのメッセージに基づいて Statement
を構築できます。便宜上、com.datastax.oss.driver.api.querybuilder
は SpEL 評価コンテキストに import
として登録されるため、ターゲット式は次のように単純になります。
statement-expression="T(QueryBuilder).selectFrom("book").all()"
setParameterExpressions(Map<String, Expression> parameterExpressions)
は、バインド可能な名前付きクエリパラメーターを表し、setQuery(String query)
オプションでのみ使用されます。上記の Java および XML のサンプルを参照してください。