中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用Spring Batch批處理框架

發布時間:2021-10-25 15:30:53 來源:億速云 閱讀:196 作者:iii 欄目:編程語言

這篇文章主要講解了“如何使用Spring Batch批處理框架”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何使用Spring Batch批處理框架”吧!

1 前言

Spring Batch是一個輕量級的、完善的批處理框架,作為Spring體系中的一員,它擁有靈活、方便、生產可用的特點。在應對高效處理大量信息、定時處理大量數據等場景十分簡便。

結合調度框架能更大地發揮Spring Batch的作用。

2 Spring Batch的概念知識

2.1 分層架構

Spring Batch的分層架構圖如下:

如何使用Spring Batch批處理框架

通過例子講解Spring Batch入門,優秀的批處理框架

可以看到它分為三層,分別是:

  •  Application應用層:包含了所有任務batch jobs和開發人員自定義的代碼,主要是根據項目需要開發的業務流程等。

  •  Batch Core核心層:包含啟動和管理任務的運行環境類,如JobLauncher等。

  •  Batch Infrastructure基礎層:上面兩層是建立在基礎層之上的,包含基礎的讀入reader和寫出writer、重試框架等。

2.2 關鍵概念

理解下圖所涉及的概念至關重要,不然很難進行后續開發和問題分析。

如何使用Spring Batch批處理框架

通過例子講解Spring Batch入門,優秀的批處理框架

2.2.1 JobRepository

專門負責與數據庫打交道,對整個批處理的新增、更新、執行進行記錄。所以Spring Batch是需要依賴數據庫來管理的。

2.2.2 任務啟動器JobLauncher

負責啟動任務Job。

2.2.3 任務Job

Job是封裝整個批處理過程的單位,跑一個批處理任務,就是跑一個Job所定義的內容。

如何使用Spring Batch批處理框架

通過例子講解Spring Batch入門,優秀的批處理框架

上圖介紹了Job的一些相關概念:

  •  Job:封裝處理實體,定義過程邏輯。

  •  JobInstance:Job的運行實例,不同的實例,參數不同,所以定義好一個Job后可以通過不同參數運行多次。

  •  JobParameters:與JobInstance相關聯的參數。

  •  JobExecution:代表Job的一次實際執行,可能成功、可能失敗。

所以,開發人員要做的事情,就是定義Job。

2.2.4 步驟Step

Step是對Job某個過程的封裝,一個Job可以包含一個或多個Step,一步步的Step按特定邏輯執行,才代表Job執行完成。

如何使用Spring Batch批處理框架

通過例子講解Spring Batch入門,優秀的批處理框架

通過定義Step來組裝Job可以更靈活地實現復雜的業務邏輯。

2.2.5 輸入——處理——輸出

所以,定義一個Job關鍵是定義好一個或多個Step,然后把它們組裝好即可。而定義Step有多種方法,但有一種常用的模型就是輸入——處理——輸出,即Item Reader、Item Processor和Item Writer。比如通過Item Reader從文件輸入數據,然后通過Item Processor進行業務處理和數據轉換,最后通過Item Writer寫到數據庫中去。

Spring Batch為我們提供了許多開箱即用的Reader和Writer,非常方便。

3 代碼實例

理解了基本概念后,就直接通過代碼來感受一下吧。整個項目的功能是從多個csv文件中讀數據,處理后輸出到一個csv文件。

3.1 基本框架

添加依賴:

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-batch</artifactId>  </dependency>  <dependency>    <groupId>com.h3database</groupId>    <artifactId>h3</artifactId>    <scope>runtime</scope>  </dependency>

需要添加Spring Batch的依賴,同時使用H2作為內存數據庫比較方便,實際生產肯定是要使用外部的數據庫,如Oracle、PostgreSQL。

入口主類:

@SpringBootApplication  @EnableBatchProcessing  public class PkslowBatchJobMain {      public static void main(String[] args) {          SpringApplication.run(PkslowBatchJobMain.class, args);      }  }

也很簡單,只是在Springboot的基礎上添加注解@EnableBatchProcessing。

領域實體類Employee:

package com.pkslow.batch.entity;  public class Employee {      String id;      String firstName;      String lastName;  }

對應的csv文件內容如下:

id,firstName,lastName  1,Lokesh,Gupta  2,Amit,Mishra  3,Pankaj,Kumar  4,David,Miller

3.2 輸入&mdash;&mdash;處理&mdash;&mdash;輸出

3.2.1 讀取ItemReader

因為有多個輸入文件,所以定義如下:

@Value("input/inputData*.csv")  private Resource[] inputResources;  @Bean  public MultiResourceItemReader<Employee> multiResourceItemReader()  {   MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>();    resourceItemReader.setResources(inputResources);    resourceItemReader.setDelegate(reader());   return resourceItemReader;  }  @Bean public FlatFileItemReader<Employee> reader()  {    FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();    //跳過csv文件第一行,為表頭    reader.setLinesToSkip(1);    reader.setLineMapper(new DefaultLineMapper() {      {        setLineTokenizer(new DelimitedLineTokenizer() {          {            //字段名            setNames(new String[] { "id", "firstName", "lastName" });          }        });        setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {          {            //轉換化后的目標類            setTargetType(Employee.class);          }        });      }    });    return reader;  }

這里使用了FlatFileItemReader,方便我們從文件讀取數據。

3.2.2 處理ItemProcessor

為了簡單演示,處理很簡單,就是把最后一列轉為大寫:

public ItemProcessor<Employee, Employee> itemProcessor() {    return employee -> {      employee.setLastName(employee.getLastName().toUpperCase());      return employee;    };  }

3.2.3 輸出ItremWriter

比較簡單,代碼及注釋如下:

private Resource outputResource = new FileSystemResource("output/outputData.csv");  @Bean  public FlatFileItemWriter<Employee> writer()  {    FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();    writer.setResource(outputResource);    //是否為追加模式    writer.setAppendAllowed(true);    writer.setLineAggregator(new DelimitedLineAggregator<Employee>() {      {        //設置分割符        setDelimiter(",");        setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() {          {            //設置字段            setNames(new String[] { "id", "firstName", "lastName" });          }        });      }    });    return writer;  }

3.3 Step

有了Reader-Processor-Writer后,就可以定義Step了:

@Bean  public Step csvStep() {    return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)      .reader(multiResourceItemReader())      .processor(itemProcessor())      .writer(writer())      .build();  }

這里有一個chunk的設置,值為5,意思是5條記錄后再提交輸出,可以根據自己需求定義。

3.4 Job

完成了Step的編碼,定義Job就容易了:

@Bean  public Job pkslowCsvJob() {    return jobBuilderFactory      .get("pkslowCsvJob")      .incrementer(new RunIdIncrementer())      .start(csvStep())      .build();  }

3.5 運行

完成以上編碼后,執行程序,結果如下:

如何使用Spring Batch批處理框架

通過例子講解Spring Batch入門,優秀的批處理框架

成功讀取數據,并將最后字段轉為大寫,并輸出到outputData.csv文件。

4 監聽Listener

可以通過Listener接口對特定事件進行監聽,以實現更多業務功能。比如如果處理失敗,就記錄一條失敗日志;處理完成,就通知下游拿數據等。

我們分別對Read、Process和Write事件進行監聽,對應分別要實現ItemReadListener接口、ItemProcessListener接口和ItemWriteListener接口。因為代碼比較簡單,就是打印一下日志,這里只貼出ItemWriteListener的實現代碼:

public class PkslowWriteListener implements ItemWriteListener<Employee> {      private static final Log logger = LogFactory.getLog(PkslowWriteListener.class);      @Override      public void beforeWrite(List<? extends Employee> list) {          logger.info("beforeWrite: " + list);      }      @Override      public void afterWrite(List<? extends Employee> list) {          logger.info("afterWrite: " + list);      }      @Override      public void onWriteError(Exception e, List<? extends Employee> list) {          logger.info("onWriteError: " + list);      }  }

把實現的監聽器listener整合到Step中去:

@Bean  public Step csvStep() {    return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)      .reader(multiResourceItemReader())      .listener(new PkslowReadListener())      .processor(itemProcessor())      .listener(new PkslowProcessListener())      .writer(writer())      .listener(new PkslowWriteListener())      .build();  }

執行后看一下日志:

如何使用Spring Batch批處理框架

通過例子講解Spring Batch入門,優秀的批處理框架

這里就能明顯看到之前設置的chunk的作用了。Writer每次是處理5條記錄,如果一條輸出一次,會對IO造成壓力。

感謝各位的閱讀,以上就是“如何使用Spring Batch批處理框架”的內容了,經過本文的學習后,相信大家對如何使用Spring Batch批處理框架這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

琼中| 汝阳县| 库尔勒市| 盈江县| 酉阳| 大渡口区| 合江县| 定陶县| 汤原县| 金湖县| 兴宁市| 麦盖提县| 曲麻莱县| 永州市| 福海县| 纳雍县| 石屏县| 寿阳县| 庆阳市| 仁寿县| 贺州市| 土默特左旗| 靖州| 曲沃县| 信阳市| 扎兰屯市| 西华县| 松原市| 宝应县| 滕州市| 梁平县| 常熟市| 连州市| 基隆市| 开平市| 太仆寺旗| 汕尾市| 阿坝| 龙门县| 开平市| 黄浦区|