パブリッシャーが確認
メッセージの公開結果を取得するには、2 つのメカニズムがあります。いずれの場合も、接続ファクトリには publisherConfirmType
セット ConfirmType.CORRELATED
が必要です。「レガシー」メカニズムは、confirmAckChannel
をメッセージチャネルの Bean 名に設定し、そこから非同期で確認を取得できるようにすることです。負の ack がエラーチャネルに送信されます(有効になっている場合)- エラーチャネルを参照してください。
バージョン 3.1 で追加された推奨メカニズムは、相関データヘッダーを使用し、その Future<Confirm>
プロパティを介して結果を待機することです。これは、結果を待つ前に複数のメッセージを送信できるため、バッチリスナーで特に役立ちます。この手法を使用するには、useConfirmHeader
プロパティを true に設定します。次の簡単なアプリケーションは、この手法の使用例です。
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.source=output
spring.cloud.stream.bindings.output-out-0.producer.error-channel-enabled=true
spring.cloud.stream.rabbit.bindings.output-out-0.producer.useConfirmHeader=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Autowired
private StreamBridge bridge;
@Bean
Consumer<List<String>> input() {
return list -> {
List<MyCorrelationData> results = new ArrayList<>();
list.forEach(str -> {
log.info("Received: " + str);
MyCorrelationData corr = new MyCorrelationData(UUID.randomUUID().toString(), str);
results.add(corr);
this.bridge.send("output-out-0", MessageBuilder.withPayload(str.toUpperCase())
.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
.build());
});
results.forEach(correlation -> {
try {
Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS);
log.info(confirm + " for " + correlation.getPayload());
if (correlation.getReturnedMessage() != null) {
log.error("Message for " + correlation.getPayload() + " was returned ");
// throw some exception to invoke binder retry/error handling
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
catch (ExecutionException | TimeoutException e) {
throw new IllegalStateException(e);
}
});
};
}
@Bean
public ApplicationRunner runner(BatchingRabbitTemplate template) {
return args -> IntStream.range(0, 10).forEach(i ->
template.convertAndSend("input-in-0", "input-in-0.rbgh303", "foo" + i));
}
@Bean
public BatchingRabbitTemplate template(CachingConnectionFactory cf, TaskScheduler taskScheduler) {
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, 1000000, 1000);
return new BatchingRabbitTemplate(cf, batchingStrategy, taskScheduler);
}
}
class MyCorrelationData extends CorrelationData {
private final String payload;
MyCorrelationData(String id, String payload) {
super(id);
this.payload = payload;
}
public String getPayload() {
return this.payload;
}
}
ご覧のとおり、各メッセージを送信してから、公開結果を待ちます。メッセージをルーティングできない場合は、将来が完了する前に、返されたメッセージが相関データに入力されます。
フレームワークが相関を実行できるように、相関データには一意の id を提供する必要があります。 |
useConfirmHeader
と confirmAckChannel
の両方を設定することはできませんが、useConfirmHeader
が true の場合でも、エラーチャネルで返されたメッセージを受信できますが、相関ヘッダーを使用する方が便利です。