最新の安定バージョンについては、spring-cloud-stream 5.0.0 を使用してください。

エラー処理

このセクションでは、フレームワークによって提供されるエラー処理メカニズムの背後にある一般的な考え方について説明します。例として Rabbit バインダーを使用します。これは、個々のバインダーが、基盤となるブローカー機能(Kafka バインダーなど)に固有のサポートされている特定のメカニズムに対して異なるプロパティのセットを定義するためです。

エラーが発生し、Spring Cloud Stream はそれらに対処するためのいくつかの柔軟なメカニズムを提供します。この手法は、バインダーの実装、基盤となるメッセージングミドルウェアの機能、プログラミングモデルに依存していることに注意してください(これについては後で詳しく説明します)。

メッセージハンドラー (関数) が例外をスローするたびに、バインダーに反映されます。その時点で、バインダーは、Spring Retry [GitHub] (英語) ライブラリによって提供される RetryTemplate を使用して、同じメッセージの再試行を数回 (デフォルトでは 3 回) 試みます。再試行が失敗した場合、メッセージをドロップするか、再処理のためにメッセージを再キューイングするか、失敗したメッセージを DLQ に送信するエラー処理メカニズム次第です。

Rabbit と Kafka の両方がこれらの概念 (特に DLQ) をサポートしています。ただし、他のバインダではそうでない場合もあるため、サポートされているエラー処理オプションの詳細については、個々のバインダのドキュメントを参照してください。

ただし、リアクティブ関数は個々のメッセージを処理せず、フレームワークによって提供されるストリーム (つまり、Flux) をユーザーによって提供されるストリームに接続する方法を提供するため、メッセージハンドラーとしての資格がないことに注意してください。何でこれが大切ですか? これは、再試行テンプレート、失敗したメッセージのドロップ、再試行、DLQ、これらすべてを支援する構成プロパティに関してこのセクションの後半で説明する内容は、メッセージハンドラー (つまり、命令型関数) にの適用されるためです。

Reactive API は、独自の演算子とメカニズムの非常に豊富なライブラリを提供し、単純なメッセージハンドラーの場合よりもはるかに複雑なさまざまなリアクティブなユースケースに固有のエラー処理を支援します。reactor.core.publisher.Flux にある public final Flux<T> retryWhen(Retry retrySpec); などを使用してください。

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
	return flux -> flux
			.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
			.map(v -> v.toUpperCase());
}

失敗したメッセージを削除する

デフォルトでは、システムはエラーハンドラーを提供します。最初のエラーハンドラーは、単にエラーメッセージをログに記録します。2 番目のエラーハンドラーはバインダー固有のエラーハンドラーで、特定のメッセージングシステム (DLQ への送信など) のコンテキストでエラーメッセージを処理します。ただし、(この現在のシナリオでは) 追加のエラー処理構成が提供されていないため、このハンドラーは何もしません。本質的にログに記録された後、メッセージはドロップされます。

許容できる場合もありますが、ほとんどの場合はそうではありません。メッセージの損失を回避するために何らかの回復メカニズムが必要です。

エラーメッセージの処理

前のセクションでは、エラーの原因となったメッセージはデフォルトでログに記録され、ドロップされることを説明しました。フレームワークは、カスタムエラーハンドラー (通知の送信やデータベースへの書き込みなど) を提供するためのメカニズムも公開しています。これは、エラーに関するすべての情報 (スタックトレースなど) とは別に、元のメッセージ (エラーを引き起こしたメッセージ) を含む ErrorMessage を受け入れるように特別に設計された Consumer を追加することで実現できます。

カスタムエラーハンドラーは、フレームワークが提供するエラーハンドラー (ログエラーハンドラーとバインダー固有のエラーハンドラーなど - 前のセクションを参照) と相互に排他的であるため、干渉することはありません。カスタムエラーハンドラーを提供すると、失敗したメッセージを DLQ に送信するように構成されていても、送信は機能しません。
@Bean
public Consumer<ErrorMessage> myErrorHandler() {
	return v -> {
		// send SMS notification code
	};
}

このようなコンシューマーをエラーハンドラーとして識別するために必要なのは、関数名 spring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandler を指す error-handler-definition プロパティを提供することだけです。

例: バインド名 uppercase-in-0 の場合、プロパティは次のようになります。

spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler

特別なマッピング命令を使用してバインディングをより読みやすい名前 spring.cloud.stream.function.bindings.uppercase-in-0=upper にマップすると、このプロパティは次のようになります。

spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler.
誤ってそのようなハンドラーを Function として宣言した場合でも、その出力に対して何も行われないことを除いて、ハンドラーは機能します。ただし、そのようなハンドラーがまだ Spring Cloud Function によって提供される機能に依存していることを考えると、ハンドラーに関数合成によって対処したい複雑性がある場合は、関数合成の恩恵を受けることもできます (ただし、可能性は低いです)。

デフォルトのエラーハンドラー

すべての関数 Bean に対して単一のエラーハンドラーが必要な場合は、標準の spring-cloud-stream メカニズムを使用して、既定のプロパティ spring.cloud.stream.default.error-handler-definition=myErrorHandler を定義できます。

DLQ - デッドレターキュー

おそらく最も一般的なメカニズムである DLQ を使用すると、失敗したメッセージを特別な宛先である Dead LetterQueue に送信できます。

構成すると、失敗したメッセージがこの宛先に送信され、その後の再処理または監査と調整が行われます。

次の例を考えてみましょう。

@SpringBootApplication
public class SimpleStreamApplication {

	public static void main(String[] args) throws Exception {
		SpringApplication.run(SimpleStreamApplication.class,
		  "--spring.cloud.function.definition=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
		  "--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
		);
	}

	@Bean
	public Function<Person, Person> uppercase() {
		return personIn -> {
		   throw new RuntimeException("intentional");
	      });
		};
	}
}

注意として、この例では、プロパティの uppercase-in-0 セグメントは、入力宛先バインディングの名前に対応しています。consumer セグメントは、それがコンシューマーアセットであることを示しています。

DLQ を使用する場合、DLQ 宛先の適切な命名のために、少なくとも group プロパティを提供する必要があります。ただし、この例のように、group は destination プロパティと一緒に使用されることがよくあります。

いくつかの標準プロパティとは別に、uppercase 宛先に対応する uppercase-in-0 バインディングの DLQ 宛先を作成および構成するようにバインダーに指示するように auto-bind-dlq を設定します(対応するプロパティを参照)。これにより、uppercase.myGroup.dlq という名前の追加の Rabbit キューが生成されます(Kafka 固有の Kafka ドキュメントを参照) DLQ プロパティ)。

構成が完了すると、失敗したすべてのメッセージがこの宛先にルーティングされ、元のメッセージが保持されて以降のアクションが実行されます。

また、次のように、エラーメッセージに元のエラーに関連する詳細情報が含まれていることがわかります。

. . . .
x-exception-stacktrace:	org.springframework.messaging.MessageHandlingException: nested exception is
      org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
      headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
      deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
      amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
      at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
      at. . . . .
Payload: blah

max-attempts を "1" に設定することにより、DLQ への即時ディスパッチ(再試行なし)を容易にすることもできます。例:

--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1

テンプレートを再試行

このセクションでは、再試行機能の構成に関連する構成プロパティについて説明します。

RetryTemplate は Spring Retry [GitHub] (英語) ライブラリの一部です。RetryTemplate のすべての機能を網羅することはこのドキュメントの範囲外ですが、RetryTemplate に特に関連する次のコンシューマープロパティについて説明します。

maxAttempts

メッセージの処理の試行回数。

デフォルト: 3.

backOffInitialInterval

再試行時のバックオフ初期間隔。

デフォルトは 1000 ミリ秒です。

backOffMaxInterval

最大バックオフ間隔。

デフォルトの 10000 ミリ秒。

backOffMultiplier

バックオフ乗数。

デフォルトの 2.0。

defaultRetryable

retryableExceptions にリストされていないリスナーによってスローされた例外が再試行可能かどうか。

デフォルト: true.

retryableExceptions

キーの Throwable クラス名と値のブール値のマップ。再試行される、または再試行されない例外(およびサブクラス)を指定します。defaultRetriable も参照してください。例: spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

デフォルト: 空。

上記の設定は、カスタマイズ要件の大部分には十分ですが、特定の複雑な要件を満たさない場合があります。その場合、RetryTemplate の独自のインスタンスを提供することをお勧めします。これを行うには、アプリケーション構成で Bean として構成します。アプリケーションが提供するインスタンスは、フレームワークが提供するインスタンスをオーバーライドします。また、競合を回避するには、バインダーで使用する RetryTemplate のインスタンスを @StreamRetryTemplate として修飾する必要があります。例:

@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    return new RetryTemplate();
}

上記の例からわかるように、@StreamRetryTemplate は修飾された @Bean であるため、@Bean でアノテーションを付ける必要はありません。

RetryTemplate をより正確にする必要がある場合は、ConsumerProperties で名前で Bean を指定して、バインディングごとに特定の再試行 Bean を関連付けることができます。

spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>