目录
  • 一、spring batch的概念知识
      • 1.2.1、jobrepository
      • 1.2.2、任务启动器joblauncher
      • 1.2.3、任务job
      • 1.2.4、步骤step
        • 2.2.1、读取itemreader
        • 2.2.2、处理itemprocessor
        • 2.2.3、输出itremwriter
      • 2.3、step
        • 2.4、job
          • 三、监听listener

            一、spring batch的概念知识

            1.1、分层架构

            spring batch的分层架构图如下:

            可以看到它分为三层,分别是:

            • application应用层:包含了所有任务batch jobs和开发人员自定义的代码,主要是根据项目需要开发的业务流程等。
            • batch core核心层:包含启动和管理任务的运行环境类,如joblauncher等。
            • batch infrastructure基础层:上面两层是建立在基础层之上的,包含基础的读入reader写出writer、重试框架等。

            1.2、关键概念

            理解下图所涉及的概念至关重要,不然很难进行后续开发和问题分析。

            1.2.1、jobrepository

            专门负责与数据库打交道,对整个批处理的新增、更新、执行进行记录。所以spring batch是需要依赖数据库来管理的。

            1.2.2、任务启动器joblauncher

            负责启动任务job

            1.2.3、任务job

            job是封装整个批处理过程的单位,跑一个批处理任务,就是跑一个job所定义的内容。

            上图介绍了job的一些相关概念:

            • job:封装处理实体,定义过程逻辑。
            • jobinstancejob的运行实例,不同的实例,参数不同,所以定义好一个job后可以通过不同参数运行多次。
            • jobparameters:与jobinstance相关联的参数。
            • jobexecution:代表job的一次实际执行,可能成功、可能失败。

            所以,开发人员要做的事情,就是定义job

            1.2.4、步骤step

            step是对job某个过程的封装,一个job可以包含一个或多个step,一步步的step按特定逻辑执行,才代表job执行完成。

            通过定义step来组装job可以更灵活地实现复杂的业务逻辑。

            1.2.5、输入——处理——输出

            所以,定义一个job关键是定义好一个或多个step,然后把它们组装好即可。而定义step有多种方法,但有一种常用的模型就是输入——处理——输出,即item readeritem processoritem writer。比如通过item reader从文件输入数据,然后通过item processor进行业务处理和数据转换,最后通过item writer写到数据库中去。

            spring batch为我们提供了许多开箱即用的readerwriter,非常方便。

            二、代码实例

            理解了基本概念后,就直接通过代码来感受一下吧。整个项目的功能是从多个csv文件中读数据,处理后输出到一个csv文件。

            2.1、基本框架

            添加依赖:

            <dependency>
              <groupid>org.springframework.boot</groupid>
              <artifactid>spring-boot-starter-batch</artifactid>
            </dependency>
            <dependency>
              <groupid>com.h2database</groupid>
              <artifactid>h2</artifactid>
              <scope>runtime</scope>
            </dependency>

            需要添加spring batch的依赖,同时使用h2作为内存数据库比较方便,实际生产肯定是要使用外部的数据库,如oraclepostgresql

            入口主类:

            @springbootapplication
            @enablebatchprocessing
            public class pkslowbatchjobmain {
                public static void main(string[] args) {
                    springapplication.run(pkslowbatchjobmain.class, args);
                }
            }

            也很简单,只是在springboot的基础上添加注解@enablebatchprocessing

            领域实体类employee

            package com.pkslow.batch.entity;
            public class employee {
                string id;
                string firstname;
                string lastname;
            }

            对应的csv文件内容如下:

            id,firstname,lastname

            1,lokesh,gupta

            2,amit,mishra

            3,pankaj,kumar

            4,david,miller

            2.2、输入——处理——输出

            2.2.1、读取itemreader

            因为有多个输入文件,所以定义如下:

            @value("input/inputdata*.csv")
            private resource[] inputresources;
            
            @bean
            public multiresourceitemreader<employee> multiresourceitemreader()
            {
              multiresourceitemreader<employee> resourceitemreader = new multiresourceitemreader<employee>();
              resourceitemreader.setresources(inputresources);
              resourceitemreader.setdelegate(reader());
              return resourceitemreader;
            }
            
            @bean
            public flatfileitemreader<employee> reader()
            {
              flatfileitemreader<employee> reader = new flatfileitemreader<employee>();
              //跳过csv文件第一行,为表头
              reader.setlinestoskip(1);
              reader.setlinemapper(new defaultlinemapper() {
                {
                  setlinetokenizer(new delimitedlinetokenizer() {
                    {
                      //字段名
                      setnames(new string[] { "id", "firstname", "lastname" });
                    }
                  });
                  setfieldsetmapper(new beanwrapperfieldsetmapper<employee>() {
                    {
                      //转换化后的目标类
                      settargettype(employee.class);
                    }
                  });
                }
              });
              return reader;
            }

            这里使用了flatfileitemreader,方便我们从文件读取数据。

            2.2.2、处理itemprocessor

            为了简单演示,处理很简单,就是把最后一列转为大写:

            public itemprocessor<employee, employee> itemprocessor() {
              return employee -> {
                employee.setlastname(employee.getlastname().touppercase());
                return employee;
              };
            }

            2.2.3、输出itremwriter

            比较简单,代码及注释如下:

            private resource outputresource = new filesystemresource("output/outputdata.csv");
            
            @bean
            public flatfileitemwriter<employee> writer()
            {
              flatfileitemwriter<employee> writer = new flatfileitemwriter<>();
              writer.setresource(outputresource);
              //是否为追加模式
              writer.setappendallowed(true);
              writer.setlineaggregator(new delimitedlineaggregator<employee>() {
                {
                  //设置分割符
                  setdelimiter(",");
                  setfieldextractor(new beanwrapperfieldextractor<employee>() {
                    {
                      //设置字段
                      setnames(new string[] { "id", "firstname", "lastname" });
                    }
                  });
                }
              });
              return writer;
            }

            2.3、step

            有了reader-processor-writer后,就可以定义step了:

            @bean
            public step csvstep() {
              return stepbuilderfactory.get("csvstep").<employee, employee>chunk(5)
                .reader(multiresourceitemreader())
                .processor(itemprocessor())
                .writer(writer())
                .build();
            }

            这里有一个chunk的设置,值为5,意思是5条记录后再提交输出,可以根据自己需求定义。

            2.4、job

            完成了step的编码,定义job就容易了:

            @bean
            public job pkslowcsvjob() {
              return jobbuilderfactory
                .get("pkslowcsvjob")
                .incrementer(new runidincrementer())
                .start(csvstep())
                .build();
            }

            2.5、运行

            完成以上编码后,执行程序,结果如下:

            成功读取数据,并将最后字段转为大写,并输出到outputdata.csv文件。

            三、监听listener

            可以通过listener接口对特定事件进行监听,以实现更多业务功能。比如如果处理失败,就记录一条失败日志;处理完成,就通知下游拿数据等。

            我们分别对readprocesswrite事件进行监听,对应分别要实现itemreadlistener接口、itemprocesslistener接口和itemwritelistener接口。因为代码比较简单,就是打印一下日志,这里只贴出itemwritelistener的实现代码:

            public class pkslowwritelistener implements itemwritelistener<employee> {
                private static final log logger = logfactory.getlog(pkslowwritelistener.class);
                @override
                public void beforewrite(list<? extends employee> list) {
                    logger.info("beforewrite: " + list);
                }
            
                @override
                public void afterwrite(list<? extends employee> list) {
                    logger.info("afterwrite: " + list);
                }
            
                @override
                public void onwriteerror(exception e, list<? extends employee> list) {
                    logger.info("onwriteerror: " + list);
                }
            }

            把实现的监听器listener整合到step中去:

            @bean
            public step csvstep() {
              return stepbuilderfactory.get("csvstep").<employee, employee>chunk(5)
                .reader(multiresourceitemreader())
                .listener(new pkslowreadlistener())
                .processor(itemprocessor())
                .listener(new pkslowprocesslistener())
                .writer(writer())
                .listener(new pkslowwritelistener())
                .build();
            }

            执行后看一下日志:

            这里就能明显看到之前设置的chunk的作用了。writer每次是处理5条记录,如果一条输出一次,会对io造成压力。

            以上就是详解spring batch入门之优秀的批处理框架的详细内容,更多关于spring batch 批处理框架的资料请关注www.887551.com其它相关文章!