カスタム ItemReader および ItemWriter の作成

これまでのところ、この章では、Spring Batch での読み取りと書き込みの基本的な契約と、そのための一般的な実装について説明しました。ただし、これらはすべてかなり一般的であり、すぐに使用可能な実装ではカバーされない可能性のある多くの潜在的なシナリオがあります。このセクションでは、簡単な例を使用して、カスタム ItemReader および ItemWriter 実装を作成し、それらの契約を正しく実装する方法を示します。ItemReader は、リーダーまたはライターを再起動可能にする方法を示すために、ItemStream も実装しています。

カスタム ItemReader の例

この例の目的のために、提供されたリストから読み取る単純な ItemReader 実装を作成します。次のコードに示すように、ItemReader の最も基本的な契約である read メソッドを実装することから始めます。

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

上記のクラスは項目のリストを取得し、一度に 1 つずつ返し、リストからそれぞれを削除します。リストが空の場合、次のテストコードに示すように、null を返し、ItemReader の最も基本的な要件を満たします。

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

ItemReader を再起動可能にする

最後の課題は、ItemReader を再起動可能にすることです。現在、処理が中断されて再び開始される場合、ItemReader は最初から開始する必要があります。これは実際には多くのシナリオで有効ですが、バッチジョブを中断したところから再開することが望ましい場合があります。多くの場合、重要な判別基準は、リーダーがステートフルかステートレスかです。ステートレスリーダーは再起動の可能性について心配する必要はありませんが、ステートフルリーダーは再起動時に最後の既知の状態を再構成する必要があります。このため、可能な場合はカスタムリーダーをステートレスにしておくことをお勧めします。そのため、再起動性について心配する必要はありません。

状態を保存する必要がある場合は、ItemStream インターフェースを使用する必要があります。

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

ItemStreamupdate メソッドを呼び出すたびに、ItemReader の現在のインデックスが、提供された ExecutionContext に "current.index" のキーで格納されます。ItemStreamopen メソッドが呼び出されると、ExecutionContext がチェックされ、そのキーを持つエントリが含まれているかどうかが確認されます。キーが見つかった場合、現在のインデックスはその場所に移動されます。これはかなり些細な例ですが、それでも一般的な契約を満たしています。

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

ほとんどの ItemReaders には、はるかに洗練された再起動ロジックがあります。たとえば、JdbcCursorItemReader は、カーソル内で最後に処理された行の行 ID を格納します。

また、ExecutionContext 内で使用されるキーは些細なものであってはならないことに注意する価値があります。これは、同じ ExecutionContext が Step 内のすべての ItemStreams に使用されるためです。ほとんどの場合、一意性を保証するには、単にクラス名をキーに追加するだけで十分です。ただし、まれに同じステップで同じ型の ItemStream が 2 つ使用される場合(出力に 2 つのファイルが必要な場合に発生する可能性があります)、より一意の名前が必要です。このため、Spring Batch ItemReader および ItemWriter 実装の多くには、このキー名をオーバーライドできる setName() プロパティがあります。

カスタム ItemWriter の例

カスタム ItemWriter の実装は、上記の ItemReader の例と多くの点で似ていますが、独自の例を保証するのに十分な点で異なります。ただし、再起動可能性の追加は基本的に同じであるため、この例では説明しません。ItemReader の例と同様に、できるだけ簡単に例を保つために List が使用されます。

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

ItemWriter を再起動可能にする

ItemWriter を再起動可能にするには、ItemReader と同じプロセスに従い、実行コンテキストを同期する ItemStream インターフェースを追加および実装します。この例では、処理されたアイテムの数をカウントし、それをフッターレコードとして追加する必要があります。それを行う必要がある場合は、ItemWriter に ItemStream を実装して、ストリームが再び開かれた場合にカウンターが実行コンテキストから再構成されるようにすることができます。

多くの現実的なケースでは、カスタム ItemWriters は、それ自体が再起動可能な別のライター(たとえば、ファイルへの書き込み時)に委譲するか、トランザクションリソースに書き込むため、ステートレスなので再起動する必要はありません。ステートフルライターを使用している場合は、ItemWriter と同様に ItemStream も実装する必要があります。また、ライターのクライアントは ItemStream を認識する必要があるため、構成でストリームとして登録する必要があることも覚えておいてください。