Table of Contents
  • Spring Batch: Custom ItemReader
    • Creating a Custom ItemReader
    • Configuring the ItemReader Bean
  • Spring Batch ItemReaders
    • JdbcPagingItemReader
    • FlatFileItemReader
    • StaxEventItemReader
    • MultiResourceItemReader

Spring Batch: Custom ItemReader

Spring Batch supports a variety of input sources out of the box, such as flat files and databases. Sometimes, however, you need to read from a data source that is not supported by default, which means implementing your own data source: a custom ItemReader. This article shows how to write one, using an example.

Creating a Custom ItemReader

Creating a custom ItemReader takes two steps:

  • Create a class that implements the ItemReader interface and supply the type of the returned objects as the type parameter T.
  • Implement the T read() method of the ItemReader interface according to the following rule:

read() returns the next object if there is one; otherwise it returns null.
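
For reference, the ItemReader interface from Spring Batch (org.springframework.batch.item) declares exactly one method:

public interface ItemReader<T> {
    // Returns the next item, or null when the input is exhausted.
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}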

The custom ItemReader below returns StudTo objects describing students who signed up for an online course. To keep things simple, the data is held in memory. StudTo is a plain data transfer object:

@Data
public class StudTo {
    private String emailAddress;
    private String name;
    private String purchasedPackage;
}

Follow these steps to create the ItemReader:

  • Create the InMemoryStudentReader class.
  • Implement the ItemReader interface with StudTo as the type parameter.
  • Add a List<StudTo> studentData field that holds the students enrolled in the course.
  • Add a nextStudentIndex field that holds the index of the next StudTo object.
  • Add a private initialize() method that populates the student data and sets the index to 0.
  • Add a constructor that calls initialize().
  • Implement read() according to the rule above: if there is a next student, return that StudTo object and increment the index; otherwise return null.

The InMemoryStudentReader code:

public class InMemoryStudentReader implements ItemReader<StudTo> {
    private int nextStudentIndex;
    private List<StudTo> studentData;

    InMemoryStudentReader() {
        initialize();
    }

    private void initialize() {
        StudTo tony = new StudTo();
        tony.setEmailAddress("tony.tester@gmail.com");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");

        StudTo nick = new StudTo();
        nick.setEmailAddress("nick.newbie@gmail.com");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");

        StudTo ian = new StudTo();
        ian.setEmailAddress("ian.intermediate@gmail.com");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");

        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }

    @Override
    public StudTo read() throws Exception {
        StudTo nextStudent = null;

        if (nextStudentIndex < studentData.size()) {
            nextStudent = studentData.get(nextStudentIndex);
            nextStudentIndex++;
        }
        return nextStudent;
    }
}

Once the custom ItemReader is written, it has to be registered as a bean so that a Spring Batch job can use it. The configuration is shown below.

Configuring the ItemReader Bean

The configuration class:

@Configuration
public class InMemoryStudentJobConfig {
    @Bean
    ItemReader<StudTo> inMemoryStudentReader() {
        return new InMemoryStudentReader();
    }
}

The class is annotated with @Configuration to mark it as a configuration class. Add a method that returns the ItemReader type, annotate it with @Bean, and have its body return an InMemoryStudentReader instance.
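
As a minimal sketch of how this bean could then be consumed, assuming jobBuilderFactory and stepBuilderFactory are autowired into the configuration class and that a hypothetical ItemWriter<StudTo> bean named studentWriter exists (the step/job names and chunk size are likewise only illustrative):

@Bean
public Step inMemoryStudentStep(ItemWriter<StudTo> studentWriter) {
    return stepBuilderFactory.get("inMemoryStudentStep")
            .<StudTo, StudTo>chunk(10)          // process the students in chunks of 10
            .reader(inMemoryStudentReader())    // the custom reader defined above
            .writer(studentWriter)              // hypothetical writer bean
            .build();
}

@Bean
public Job inMemoryStudentJob(Step inMemoryStudentStep) {
    return jobBuilderFactory.get("inMemoryStudentJob")
            .start(inMemoryStudentStep)
            .build();
}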

Summary

This article showed how to create a custom ItemReader. The main points:

  • A custom ItemReader must implement the ItemReader interface.
  • When implementing the interface, specify the type of the returned objects as the type parameter (T).
  • Implement the read() method so that it returns the next object if one exists, and null otherwise.

Spring Batch ItemReaders

This section focuses on ItemReaders: how to read data from different data sources, and how exception handling and restart work.

JdbcPagingItemReader

Reads data from a database, page by page.

@Configuration
public class DbJdbcDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("dbJdbcDemoWriter")
    private ItemWriter<? super Customer> dbJdbcDemoWriter;

    @Autowired
    private DataSource dataSource;

    @Bean
    public Job dbJdbcDemoJob() {
        return jobBuilderFactory.get("dbJdbcDemoJob")
                .start(dbJdbcDemoStep())
                .build();
    }

    @Bean
    public Step dbJdbcDemoStep() {
        return stepBuilderFactory.get("dbJdbcDemoStep")
                .<Customer, Customer>chunk(100)
                .reader(dbJdbcDemoReader())
                .writer(dbJdbcDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100); // fetch rows in batches of 100
        reader.setRowMapper((rs, rowNum) -> {
            return Customer.builder().id(rs.getLong("id"))
                    .firstName(rs.getString("firstName"))
                    .lastName(rs.getString("lastName"))
                    .birthdate(rs.getString("birthdate"))
                    .build();
        });

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);
        return reader;
    }
}

The Job and ItemWriter are not the focus of this article; the writer below is just an example, and the later examples use the same approach.

@component("dbjdbcdemowriter")
public class dbjdbcdemowriter implements itemwriter<customer> {
    @override
    public void write(list<? extends customer> items) throws exception {
        for (customer customer:items)
            system.out.println(customer); 
    }
}

FlatFileItemReader

Reads data from a CSV file.

 
@Configuration
public class FlatFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Bean
    public Job flatFileDemoJob() {
        return jobBuilderFactory.get("flatFileDemoJob")
                .start(flatFileDemoStep())
                .build();
    }

    @Bean
    public Step flatFileDemoStep() {
        return stepBuilderFactory.get("flatFileDemoStep")
                .<Customer, Customer>chunk(100)
                .reader(flatFileDemoReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> flatFileDemoReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        reader.setLinesToSkip(1);

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }
}
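
For reference, a hypothetical customer.csv matching this configuration could look like the following (made-up rows); the first line is the header, which setLinesToSkip(1) tells the reader to skip, and the column order matches the tokenizer names:

id,firstName,lastName,birthdate
1,Tom,Smith,1990-01-01
2,Jane,Doe,1991-02-02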

StaxEventItemReader

Reads data from an XML file.

@Configuration
public class XmlFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("xmlFileDemoWriter")
    private ItemWriter<? super Customer> xmlFileDemoWriter;

    @Bean
    public Job xmlFileDemoJob() {
        return jobBuilderFactory.get("xmlFileDemoJob")
                .start(xmlFileDemoStep())
                .build();
    }

    @Bean
    public Step xmlFileDemoStep() {
        return stepBuilderFactory.get("xmlFileDemoStep")
                .<Customer, Customer>chunk(10)
                .reader(xmlFileDemoReader())
                .writer(xmlFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public StaxEventItemReader<Customer> xmlFileDemoReader() {
        StaxEventItemReader<Customer> reader = new StaxEventItemReader<>();
        reader.setResource(new ClassPathResource("customer.xml"));
        reader.setFragmentRootElementName("customer");

        XStreamMarshaller unmarshaller = new XStreamMarshaller();
        Map<String, Class> map = new HashMap<>();
        map.put("customer", Customer.class);
        unmarshaller.setAliases(map);
        reader.setUnmarshaller(unmarshaller);
        return reader;
    }
}
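
A hypothetical customer.xml for this reader could look like the following. setFragmentRootElementName("customer") makes the reader hand each <customer> fragment to the XStream unmarshaller, and the alias maps that element name to the Customer class (the child element names are assumed to match the Customer properties):

<customers>
    <customer>
        <id>1</id>
        <firstName>Tom</firstName>
        <lastName>Smith</lastName>
        <birthdate>1990-01-01</birthdate>
    </customer>
    <customer>
        <id>2</id>
        <firstName>Jane</firstName>
        <lastName>Doe</lastName>
        <birthdate>1991-02-02</birthdate>
    </customer>
</customers>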

MultiResourceItemReader

Reads data from multiple files.

@Configuration
public class MultipleFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Value("classpath*:/file*.csv")
    private Resource[] inputFiles;

    @Bean
    public Job multipleFileDemoJob() {
        return jobBuilderFactory.get("multipleFileDemoJob")
                .start(multipleFileDemoStep())
                .build();
    }

    @Bean
    public Step multipleFileDemoStep() {
        return stepBuilderFactory.get("multipleFileDemoStep")
                .<Customer, Customer>chunk(50)
                .reader(multipleResourceItemReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    private MultiResourceItemReader<Customer> multipleResourceItemReader() {
        MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
        reader.setDelegate(flatFileReader());
        reader.setResources(inputFiles);
        return reader;
    }

    @Bean
    public FlatFileItemReader<Customer> flatFileReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        // reader.setLinesToSkip(1);

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }
}

Exception Handling and Restart

For chunk-oriented steps, Spring Batch provides tooling for managing state. State within a step is managed through the ItemStream interface, which gives developers access to the component that actually holds the state: the ExecutionContext, essentially a map of key/value pairs. The map stores the state of a particular step, and because that state is persisted in the JobRepository, the ExecutionContext is what makes restarting a step possible.

When an error occurs during execution, the last known state is written to the JobRepository. The next time the job runs, that state is used to populate the ExecutionContext, and processing can continue from where it left off.

The ItemStream interface works as follows:

  • open() is called at the start of the step, receives the ExecutionContext, and populates it with the values loaded from the database;
  • update() is called at the end of each step or transaction and updates the ExecutionContext;
  • close() is called after all chunks have been processed.
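
For reference, the interface itself (org.springframework.batch.item.ItemStream) declares exactly these three callbacks; ItemStreamReader, used in the example below, simply combines it with ItemReader:

public interface ItemStream {
    void open(ExecutionContext executionContext) throws ItemStreamException;
    void update(ExecutionContext executionContext) throws ItemStreamException;
    void close() throws ItemStreamException;
}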

Let's build an example.

Prepare a CSV file and put a record with a bad name at record 33; when that record is read, an exception is thrown and the job terminates.
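
The file could look like the following (hypothetical sample rows; there is no header line because the reader below does not skip any lines on a normal run, and the marker value in the firstName column is what triggers the exception):

1,Tom,Smith,1990-01-01
2,Jane,Doe,1991-02-02
...
33,wrongName,Error,1993-03-03
34,Bob,Brown,1994-04-04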

The ItemReader test code:

 
@component("restartdemoreader")
public class restartdemoreader implements itemstreamreader<customer> {  
    private long curline = 0l;
    private boolean restart = false; 
    private flatfileitemreader<customer> reader = new flatfileitemreader<>(); 
    private executioncontext executioncontext;
    restartdemoreader
    public () {
        
        reader.setresource(new classpathresource("restartdemo.csv")); 
        delimitedlinetokenizer tokenizer = new delimitedlinetokenizer();
        tokenizer.setnames(new string[]{"id", "firstname", "lastname", "birthdate"});
 
        defaultlinemapper<customer> linemapper = new defaultlinemapper<>();
        linemapper.setlinetokenizer(tokenizer);
        linemapper.setfieldsetmapper((fieldset -> {
            return customer.builder().id(fieldset.readlong("id"))
                    .firstname(fieldset.readstring("firstname"))
                    .lastname(fieldset.readstring("lastname"))
                    .birthdate(fieldset.readstring("birthdate"))
                    .build();
        }));
        linemapper.afterpropertiesset(); 
        reader.setlinemapper(linemapper);
    }
 
    @override
    public customer read() throws exception, unexpectedinputexception, parseexception,
            nontransientresourceexception { 
        customer customer = null; 
        this.curline++;
        //如果是重启,则从上一步读取的行数继续往下执行
        if (restart) {
            reader.setlinestoskip(this.curline.intvalue()-1);
            restart = false;
            system.out.println("start reading from line: " + this.curline);
        }
 
        reader.open(this.executioncontext); 
        customer = reader.read();
        //当匹配到wrongname时,显示抛出异常,终止程序
        if (customer != null) {
            if (customer.getfirstname().equals("wrongname"))
                throw new runtimeexception("something wrong. customer id: " + customer.getid());
        } else {
            curline--;
        }
        return customer;
    }
 
    /**
     * 判断是否是重启job
     * @param executioncontext
     * @throws itemstreamexception
     */
    @override
    public void open(executioncontext executioncontext) throws itemstreamexception {
        this.executioncontext = executioncontext;
        if (executioncontext.containskey("curline")) {
            this.curline = executioncontext.getlong("curline");
            this.restart = true;
        } else {
            this.curline = 0l;
            executioncontext.put("curline", this.curline.intvalue());
        } 
    }
 
    @override
    public void update(executioncontext executioncontext) throws itemstreamexception {
        system.out.println("update curline: " + this.curline);
        executioncontext.put("curline", this.curline); 
    }
 
    @override
    public void close() throws itemstreamexception { 
    }
}

Job configuration

Records are read in chunks of 10.

@Configuration
public class RestartDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;

    @Bean
    public Job restartDemoJob() {
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build();
    }

    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer, Customer>chunk(10)
                .reader(restartDemoReader)
                .writer(flatFileDemoWriter)
                .build();
    }
}

On the first run, the program throws an exception at record 33, at which point curline is 30.

We can then query the BATCH_STEP_EXECUTION_CONTEXT table and see that the curline value has been persisted to the database as a key/value pair (with a chunk size of 10, the last committed value of curline when record 33 fails is 30).

Next, we fix the wrongName record and run the program again.

The open() method runs first and checks whether the curline key exists in the step's ExecutionContext loaded from the database. If it does, this is a restart: the reader picks up curline and continues processing from that chunk onward.
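
Note that a restart only happens when the failed job instance is launched again with the same JobParameters; a new set of parameters would start a fresh job instance with an empty ExecutionContext. A minimal sketch of relaunching (the jobLauncher and restartDemoJob references and the parameter name are assumed to be wired elsewhere, for illustration only):

// Re-running the failed instance with identical JobParameters lets Spring Batch
// restore the step's ExecutionContext (including "curline") from the JobRepository.
JobParameters parameters = new JobParametersBuilder()
        .addString("inputFile", "restartdemo.csv") // hypothetical identifying parameter
        .toJobParameters();
jobLauncher.run(restartDemoJob, parameters);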
