分散ロック

多くの状況では、あるコンテキスト (または単一のメッセージ) に対するアクションは排他的な方法で実行する必要があります。1 つの例は、現在のメッセージのメッセージグループの状態をチェックして、グループを解放できるか、それとも将来の検討のためにそのメッセージを追加できるかを判断する必要があるアグリゲーターコンポーネントです。この目的のために、Java は java.util.concurrent.locks.Lock 実装を備えた API を提供します。ただし、アプリケーションがクラスター内で分散および / または実行される場合、問題はさらに複雑になります。この場合のロックは困難であり、排他性要件を達成するには、何らかの共有状態とその特定のアプローチが必要です。

Spring Integration は、ReentrantLock API に基づくメモリ内 DefaultLockRegistry 実装を備えた LockRegistry 抽象化を提供します。LockRegistry の obtain(Object) メソッドは、特定のコンテキストで lock key を必要とします。たとえば、アグリゲータは correlationKey を使用してグループ全体の操作をロックします。これにより、複数のロックを同時に使用できます。この obtain(Object) メソッドは、LockRegistry 実装に応じて java.util.concurrent.locks.Lock インスタンスを返します。残りのロジックは標準的な Java 並行処理アルゴリズムと同じです。

バージョン 6.2 以降、LockRegistry は、ロック中にいくつかのタスクを実行するための executeLocked() API (このインターフェースの default メソッド) を提供します。この API の動作は、よく知られている JdbcTemplateJmsTemplate、または RestTemplate と似ています。次の例は、この API の使用箇所を示しています。

LockRegistry registry = new DefaultLockRegistry();
...
registry.executeLocked("someLockKey", () -> someExclusiveResourceCall());

このメソッドはタスク呼び出しから例外を再スローし、Lock が中断された場合は InterruptedException をスローします。さらに、Duration を使用するバリアントは、lock.tryLock() が false を返すときに java.util.concurrent.TimeoutException をスローします。

Spring Integration は、分散ロック用に次の LockRegistry 実装を提供します。

Spring Cloud AWS (英語) には DynamoDbLockRegistry も備わっています。

バージョン 7.0 以降、DistributedLock インターフェースが導入され、カスタム TTL(time-to-live)でロックを取得するための新しいメソッド(lock(Duration ttl)と tryLock(long time, TimeUnit unit, Duration ttl) が提供されるようになりました。JdbcLock と RedisLock はどちらも、カスタマイズされた TTL(time-to-live)機能をサポートするために DistributedLock インターフェースを実装しています。LockRegistry<L extends Lock> は、Lock を継承する型の汎用インターフェースになりました。RenewableLockRegistry インターフェースは、新しい renewLock(Object lockKey, Duration ttl) メソッドを提供し、カスタム TTL 値でロックを更新できるようになりました。JdbcLockRegistry と RedisLockRegistry はどちらも、型パラメーター DistributedLock を持つ LockRegistry と RenewableLockRegistry インターフェースを実装しています。

以下は、レジストリから DistributedLock を取得し、特定の有効期間値で取得する方法の例です。

DistributedLock lock = registry.obtain("foo");
Duration timeToLive = Duration.ofMillis(500);

if(lock.tryLock(100, TimeUnit.MILLISECONDS, timeToLive)){
    try {
        // do something
    } catch (Exception e) {
        // handle exception
    } finally{
        lock. unlock();
    }
}