オフセットのリセット

アプリケーションが起動すると、割り当てられた各パーティションの初期位置は、2 つのプロパティ startOffset と resetOffsets に依存します。resetOffsets が false の場合、通常の Kafka コンシューマー auto.offset.reset [Apache] (英語) セマンティクスが適用されます。つまり、バインディングのコンシューマーグループのパーティションにコミットされたオフセットがない場合、位置は earliest または latest です。デフォルトでは、明示的な group を使用したバインディングは earliest を使用し、匿名のバインディング(group を使用しない)は latest を使用します。これらのデフォルトは、startOffset バインディングプロパティを設定することで上書きできます。特定の group で初めてバインディングが開始されたとき、コミットされたオフセットはありません。コミットされたオフセットが存在しないもう 1 つの条件は、オフセットの有効期限が切れているかどうかです。最新のブローカー(2.1 以降)およびデフォルトのブローカープロパティでは、最後のメンバーがグループを離れてから 7 日後にオフセットの有効期限が切れます。詳細については、offsets.retention.minutes [Apache] (英語) ブローカーのプロパティを参照してください。

resetOffsets が true の場合、バインダーは、このバインディングがトピックから消費されたことがないかのように、ブローカーにコミットされたオフセットがない場合に適用されるセマンティクスと同様のセマンティクスを適用します。つまり、現在コミットされているオフセットは無視されます。

以下は、これが使用される可能性のある 2 つのユースケースです。

  1. キーと値のペアを含む圧縮されたトピックからの消費。resetOffsets を true に、startOffset を earliest に設定します。バインディングは、新しく割り当てられたすべてのパーティションで seekToBeginning を実行します。

  2. イベントを含むトピックから消費します。このトピックでは、このバインディングの実行中に発生するイベントのみに関心があります。resetOffsets を true に、startOffset を latest に設定します。バインディングは、新しく割り当てられたすべてのパーティションで seekToEnd を実行します。

最初の割り当て後にリバランスが発生した場合、シークは、最初の割り当て中に割り当てられなかった、新しく割り当てられたパーティションに対してのみ実行されます。

トピックオフセットの詳細な制御については、リスナーのリバランスを参照してください。リスナーが提供されている場合、resetOffsets を true に設定しないでください。そうしないと、エラーが発生します。