アイテム処理

ItemReader および ItemWriter インターフェースはどちらも特定のタスクに非常に役立ちますが、作成する前にビジネスロジックを挿入する場合はどうでしょうか。読み取りと書き込みの両方の 1 つのオプションは、複合パターンを使用することです。別の ItemWriter を含む ItemWriter、または別の ItemReader を含む ItemReader を作成します。次のコードは例を示しています。

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(Chunk<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

前のクラスには、いくつかのビジネスロジックを提供した後に委譲する別の ItemWriter が含まれています。このパターンは、おそらくメインの ItemReader によって提供された入力に基づいて、より多くの参照データを取得するために、ItemReader にも簡単に使用できます。write への呼び出しを自分で制御する必要がある場合にも役立ちます。ただし、実際に書き込む前に、書き込み用に渡されたアイテムを「変換」するだけの場合は、自分で write する必要はありません。アイテムを変更するだけです。このシナリオでは、次のインターフェース定義が示すように、Spring Batch は ItemProcessor インターフェースを提供します。

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

ItemProcessor は簡単です。1 つのオブジェクトが与えられたら、それを変換して別のオブジェクトを返します。指定されたオブジェクトは、同じ型である場合とそうでない場合があります。ポイントは、ビジネスロジックをプロセス内で適用できることであり、そのロジックの作成は完全に開発者の責任です。ItemProcessor は、ステップに直接接続できます。例: ItemReader が型 Foo のクラスを提供し、書き出す前に型 Bar に変換する必要があると仮定します。次の例は、変換を実行する ItemProcessor を示しています。

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar> {
    public void write(Chunk<? extends Bar> bars) throws Exception {
        //write bars
    }
}

前の例では、Foo という名前のクラス、Bar という名前のクラス、および ItemProcessor インターフェースに準拠する FooProcessor という名前のクラスがあります。変換は単純ですが、ここではあらゆる型の変換を行うことができます。BarWriter は Bar オブジェクトを書き込み、他の型が提供されている場合は例外をスローします。同様に、Foo 以外が提供された場合、FooProcessor は例外をスローします。次に、次の例に示すように、FooProcessor を Step に注入できます。

  • Java

  • XML

Java 構成
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<Foo, Bar>chunk(2, transactionManager)
				.reader(fooReader())
				.processor(fooProcessor())
				.writer(barWriter())
				.build();
}
XML 構成
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

ItemProcessor と ItemReader または ItemWriter の違いは、ItemProcessor は Step ではオプションであるということです。

ItemProcessors のチェーン

単一の変換を実行すると、多くのシナリオで役立ちますが、複数の ItemProcessor 実装をまとめて「チェーン」したい場合はどうすればよいでしょうか ? これは、前述の複合パターンを使用して行うことができます。前の単一の変換を更新するには、たとえば、次の例に示すように、Foo が Bar に変換され、これが Foobar に変換されて書き出されます。

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar, Foobar> {
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(Chunk<? extends Foobar> items) throws Exception {
        //write items
    }
}

次の例に示すように、FooProcessor と BarProcessor を「連鎖」させて、結果の Foobar を生成できます。

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);

前の例と同様に、複合プロセッサーを Step に構成できます。

  • Java

  • XML

Java 構成
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<Foo, Foobar>chunk(2, transactionManager)
				.reader(fooReader())
				.processor(compositeProcessor())
				.writer(foobarWriter())
				.build();
}

@Bean
public CompositeItemProcessor compositeProcessor() {
	List<ItemProcessor> delegates = new ArrayList<>(2);
	delegates.add(new FooProcessor());
	delegates.add(new BarProcessor());

	CompositeItemProcessor processor = new CompositeItemProcessor();

	processor.setDelegates(delegates);

	return processor;
}
XML 構成
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

<bean id="compositeItemProcessor"
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <bean class="..FooProcessor" />
            <bean class="..BarProcessor" />
        </list>
    </property>
</bean>

レコードのフィルタリング

アイテムプロセッサーの一般的な用途の 1 つは、レコードが ItemWriter に渡される前にフィルターで除外することです。フィルタリングは、スキップとは異なるアクションです。スキップはレコードが無効であることを示し、フィルタリングはレコードを書き込むべきではないことを示します。

例: 3 つの異なる型のレコード (挿入するレコード、更新するレコード、削除するレコード) を含むファイルを読み取るバッチジョブを考えてみましょう。レコードの削除がシステムでサポートされていない場合、削除可能なレコードを ItemWriter に送信したくありません。ただし、これらのレコードは実際には不良レコードではないため、スキップするのではなく、除外する必要があります。その結果、ItemWriter は挿入可能で更新可能なレコードのみを受け取ります。

レコードをフィルタリングするには、ItemProcessor から null を返すことができます。フレームワークは、結果が null であることを検出し、そのアイテムを ItemWriter に配信されるレコードのリストに追加しないようにします。ItemProcessor から例外がスローされると、スキップされます。

入力の検証

ItemReader および ItemWriter の章では、入力を解析するための複数のアプローチについて説明しています。「整形式」でない場合、各主要な実装は例外をスローします。データの範囲が欠落している場合、FixedLengthTokenizer は例外をスローします。同様に、存在しない RowMapper または FieldSetMapper のインデックスにアクセスしようとしたり、予期したものとは異なる形式になっていると、例外がスローされます。これらの型の例外はすべて、read が戻る前にスローされます。ただし、返されたアイテムが有効かどうかの課題には対処していません。例: フィールドの 1 つが年齢である場合、それを負にすることはできません。存在し、数値であるため、正しく解析される可能性がありますが、例外は発生しません。すでに多数の検証フレームワークがあるため、Spring Batch はさらに別の検証フレームワークを提供しようとはしていません。代わりに、次のインターフェース定義が示すように、任意の数のフレームワークで実装できる Validator と呼ばれる単純なインターフェースを提供します。

public interface Validator<T> {

    void validate(T value) throws ValidationException;

}

オブジェクトが無効な場合は validate メソッドが例外をスローし、有効な場合は正常に戻るという契約があります。次の Bean 定義が示すように、Spring Batch は ValidatingItemProcessor を提供します。

  • Java

  • XML

Java 構成
@Bean
public ValidatingItemProcessor itemProcessor() {
	ValidatingItemProcessor processor = new ValidatingItemProcessor();

	processor.setValidator(validator());

	return processor;
}

@Bean
public SpringValidator validator() {
	SpringValidator validator = new SpringValidator();

	validator.setValidator(new TradeValidator());

	return validator;
}
XML 構成
<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
    <property name="validator" ref="validator" />
</bean>

<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator">
	<property name="validator">
		<bean class="org.springframework.batch.samples.domain.trade.internal.validator.TradeValidator"/>
	</property>
</bean>

BeanValidatingItemProcessor を使用して、Bean 検証 API (JSR-303) アノテーションでアノテーションが付けられたアイテムを検証することもできます。例: 次の型 Person を検討してください:

class Person {

    @NotEmpty
    private String name;

    public Person(String name) {
     this.name = name;
    }

    public String getName() {
     return name;
    }

    public void setName(String name) {
     this.name = name;
    }

}

アプリケーションコンテキストで BeanValidatingItemProcessor Bean を宣言することで項目を検証し、チャンク指向のステップでプロセッサーとして登録できます。

@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
    BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    beanValidatingItemProcessor.setFilter(true);

    return beanValidatingItemProcessor;
}

フォールトトレランス

チャンクがロールバックされると、読み取り中にキャッシュされたアイテムが再処理される場合があります。ステップが耐障害性を持つように構成されている場合 (通常、スキップまたは再試行処理を使用することによって)、使用される ItemProcessor はべき等になるように実装する必要があります。通常、これは、ItemProcessor の入力項目に対して変更を実行せず、結果であるインスタンスのみを更新することで構成されます。