最新の安定バージョンについては、spring-cloud-stream 4.2.1 を使用してください。

カスタム Kafka バインダーヘルスインジケーター

デフォルトの Kafka バインダーヘルスインジケーターのオーバーライド

Spring Boot アクチュエーターがクラスパス上にある場合、Kafka バインダーはデフォルトのヘルスインジケーターをアクティブにします。このヘルスインジケーターは、バインダーのヘルスと Kafka ブローカーとの通信の課題をチェックします。アプリケーションがこのデフォルトのヘルスチェック実装を無効にし、カスタム実装を含めたい場合は、KafkaBinderHealth インターフェースの実装を提供できます。KafkaBinderHealth は、HealthIndicator から拡張されたマーカーインターフェースです。カスタム実装では、health() メソッドの実装を提供する必要があります。カスタム実装は、Bean としてアプリケーション構成に存在する必要があります。バインダーがカスタム実装を検出すると、デフォルトの実装の代わりにそれを使用します。これは、アプリケーションでのそのようなカスタム実装 Bean の例です。

@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
    return new KafkaBinderHealth() {
        @Override
        public Health health() {
            // custom implementation details.
        }
    };
}

カスタム kafka バインダーのヘルスインジケーターの例

カスタム Kafka バインダー HealthIndicator を作成するための疑似コードを次に示します。この例では、最初にクラスター接続を具体的にチェックし、次にトピック関連の課題をチェックすることによって、バインダーが提供する Kafka HealthIndicator をオーバーライドしようとします。

まず、KafkaBinderHealth インターフェースのカスタム実装を作成する必要があります。

public class KafkaBinderHealthImplementation implements KafkaBinderHealth {
    @Value("${spring.cloud.bus.destination}")
    private String topic;
    private final AdminClient client;

    public KafkaBinderHealthImplementation(final KafkaAdmin admin) {
		// More about configuring Kafka
		// https://docs.spring.io/spring-kafka/reference/html/#configuring-topics
        this.client = AdminClient.create(admin.getConfigurationProperties());
    }

    @Override
    public Health health() {
        if (!checkBrokersConnection()) {
            logger.error("Error when connect brokers");
			return Health.down().withDetail("BrokersConnectionError", "Error message").build();
        }
		if (!checkTopicConnection()) {
			logger.error("Error when trying to connect with specific topic");
			return Health.down().withDetail("TopicError", "Error message with topic name").build();
		}
        return Health.up().build();
    }

    public boolean checkBrokersConnection() {
        // Your implementation
    }

    public boolean checkTopicConnection() {
		// Your implementation
    }
}

次に、カスタム実装用の Bean を作成する必要があります。

@Configuration
public class KafkaBinderHealthIndicatorConfiguration {
	@Bean
	public KafkaBinderHealth kafkaBinderHealthIndicator(final KafkaAdmin admin) {
		return new KafkaBinderHealthImplementation(admin);
	}
}