JDBC チャネルメッセージストア JSON 直列化

バージョン 7.0 では、JdbcChannelMessageStore に JSON 直列化のサポートが導入されました。Spring Integration はデフォルトで Java 直列化を使用してメッセージをデータベースに保存します。新しい JSON 直列化オプションは、代替の直列化メカニズムを提供します。

セキュリティに関する考慮事項 : JSON 直列化では、メッセージのコンテンツがデータベースにテキストとして保存されるため、機密データが漏洩する可能性があります。本番環境で JSON 直列化を使用する前に、適切なデータベースアクセス制御と保存データの暗号化を確保し、組織のデータ保護要件を考慮してください。

構成

JSON の (デ) 直列化には 2 つのコンポーネントが利用可能です。

  • JsonChannelMessageStorePreparedStatementSetter - メッセージを JSON に直列化する

  • JsonMessageRowMapper - JSON からメッセージをデシリアライズする

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
    store.setChannelMessageStoreQueryProvider(
        new PostgresChannelMessageStoreQueryProvider());

    // Enable JSON serialization
    store.setPreparedStatementSetter(
        new JsonChannelMessageStorePreparedStatementSetter());
    store.setMessageRowMapper(
        new JsonMessageRowMapper("com.example"));

    return store;
}

文字列パラメーター("com.example")は、デシリアライズの対象となる追加の信頼済みパッケージを指定します。これらのパッケージは、デフォルトの信頼済みパッケージ(信頼できるパッケージセクションを参照)に追加されます。セキュリティ上の理由から、信頼済みパッケージのクラスのみがデシリアライズ可能です。

データベーススキーマの変更

JSON 直列化にはデータベーススキーマの変更が必要ですBLOB/BYTEA 列型を含むデフォルトスキーマは JSON 直列化には使用できません。

MESSAGE_CONTENT 列は、JSON を保存できるテキストベースの型に変更する必要があります。

  • PostgreSQL

  • MySQL

  • H2

  • その他のデータベース

PostgreSQL の場合、JSONB 型を使用できます。

-- JSONB (enables JSON queries)
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT TYPE JSONB
USING MESSAGE_CONTENT::text::jsonb;

MySQL の場合、JSON 型を使用できます。

-- JSON type (enables JSON functions)
ALTER TABLE INT_CHANNEL_MESSAGE
MODIFY COLUMN MESSAGE_CONTENT JSON;

H2 データベースの場合、CLOB 型を使用できます。

ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT CLOB;

大きなテキスト列 (CLOB、TEXT など) をサポートするデータベースでは、MESSAGE_CONTENT 列を適切なテキスト型に変更できます。

JSON 直列化のサンプルスキーマ

次の例は、JSON ベースのメッセージストレージ専用のテーブルを作成する方法を示しています。

  • PostgreSQL

  • MySQL

  • H2

CREATE TABLE JSON_CHANNEL_MESSAGE (
   MESSAGE_ID CHAR(36) NOT NULL,
   GROUP_KEY CHAR(36) NOT NULL,
   CREATED_DATE BIGINT NOT NULL,
   MESSAGE_PRIORITY BIGINT,
   MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('JSON_MESSAGE_SEQ'),
   MESSAGE_CONTENT JSONB, -- JSON message content
   REGION VARCHAR(100) NOT NULL,
   CONSTRAINT JSON_CHANNEL_MESSAGE_PK
      PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
   MESSAGE_ID CHAR(36) NOT NULL,
   GROUP_KEY CHAR(36) NOT NULL,
   CREATED_DATE BIGINT NOT NULL,
   MESSAGE_PRIORITY BIGINT,
   MESSAGE_SEQUENCE BIGINT NOT NULL AUTO_INCREMENT UNIQUE,
   MESSAGE_CONTENT JSON, -- JSON message content
   REGION VARCHAR(100) NOT NULL,
   CONSTRAINT JSON_CHANNEL_MESSAGE_PK
      PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
   MESSAGE_ID CHAR(36) NOT NULL,
   GROUP_KEY CHAR(36) NOT NULL,
   CREATED_DATE BIGINT NOT NULL,
   MESSAGE_PRIORITY BIGINT,
   MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT NEXT VALUE FOR JSON_MESSAGE_SEQ,
   MESSAGE_CONTENT CLOB, -- JSON message content
   REGION VARCHAR(100) NOT NULL,
   CONSTRAINT JSON_CHANNEL_MESSAGE_PK
      PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);

JSON 構造

Jackson ベースの直列化を使用する場合、メッセージは Jackson の多態的型処理を使用して次の JSON 構造で保存されます。

{
  "@class": "org.springframework.messaging.support.GenericMessage",
  "payload": {
    "@class": "com.example.OrderMessage",
    "orderId": "ORDER-12345",
    "amount": 1299.99
  },
  "headers": {
    "@class": "java.util.HashMap",
    "priority": ["java.lang.String", "HIGH"],
    "id": ["java.util.UUID", "a1b2c3d4-..."],
    "timestamp": ["java.lang.Long", 1234567890]
  }
}

@class プロパティは、多態型を適切に逆直列化するために必要な型情報を提供します。

JSON コンテンツのクエリ (オプション)

ネイティブ JSON 列型 (PostgreSQL JSONB または MySQL JSON) を使用すると、メッセージの内容を直接照会できます。

PostgreSQL JSONB クエリ

-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT @> '{"payload": {"orderId": "ORDER-12345"}}';

-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT -> 'headers' @> '{"priority": ["java.lang.String", "HIGH"]}';

MySQL JSON 関数

-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.payload.orderId') = 'ORDER-12345';

-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.headers.priority[1]') = 'HIGH';

TEXT または CLOB 列型を使用している場合、これらの JSON 固有のクエリは使用できません。ただし、Spring Integration を介した保存と取得では、JSON 直列化は引き続き機能します。

信頼できるパッケージ

JacksonMessagingUtils.messagingAwareMapper() は、セキュリティの脆弱性を防ぐために、すべてのデ直列化されたクラスを信頼できるパッケージリストに対して検証します。

デフォルトの信頼できるパッケージには以下が含まれます: - java.util - java.lang - org.springframework.messaging.support - org.springframework.integration.support - org.springframework.integration.message - org.springframework.integration.store - org.springframework.integration.history - org.springframework.integration.handler

追加のパッケージはコンストラクターで指定でき、このリストに追加されます。

// Trust additional packages for custom payload types
new JsonMessageRowMapper("com.example.orders", "com.example.payments")

カスタム JsonObjectMapper

高度なシナリオでは、カスタム JsonObjectMapper を提供できます。

import org.springframework.integration.support.json.JacksonJsonObjectMapper;
import org.springframework.integration.support.json.JacksonMessagingUtils;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.databind.SerializationFeature;

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    ObjectMapper customMapper = JacksonMessagingUtils.messagingAwareMapper("com.example");
    customMapper.enable(SerializationFeature.INDENT_OUTPUT);
    customMapper.registerModule(new CustomModule());

    JacksonJsonObjectMapper jsonObjectMapper = new JacksonJsonObjectMapper(customMapper);

    JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
    store.setPreparedStatementSetter(
        new JsonChannelMessageStorePreparedStatementSetter(jsonObjectMapper));
    store.setMessageRowMapper(
        new JsonMessageRowMapper(jsonObjectMapper));

    return store;
}

カスタム JsonObjectMapper は、Spring Integration メッセージの直列化に合わせて適切に設定する必要があります。まず JacksonMessagingUtils.messagingAwareMapper() から始めて、そこからカスタマイズすることをお勧めします。一貫性のある直列化とデ直列化を実現するには、JsonChannelMessageStorePreparedStatementSetter と JsonMessageRowMapper の両方で同じ設定を使用する必要があります。