您好,登錄后才能下訂單哦!
這篇文章主要介紹了SpringBoot+SpringBatch+Quartz整合定時批量任務方式的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
需求內容如下:
PC網頁觸發一條設備升級記錄(下圖),后臺要定時批量設備更新。這里定時要用到Quartz,批量數據處理要用到SpringBatch,二者結合,可以完成該需求。
由于之前,沒有用過SpringBatch,于是上網查了下資料,發現可參考的不是很多,于是只能去慢慢的翻看官方文檔。
遇到不少問題,就記錄一下吧。
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> </dependencies>
spring: datasource: username: thinklink password: thinklink url: jdbc:postgresql://172.16.205.54:5432/thinklink driver-class-name: org.postgresql.Driver batch: job: enabled: false server: port: 8073 #upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/ upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/ # 每次批量處理的數據量,默認為5000 batch-size: 5000
觸發批處理任務的入口,執行一個job
@Service("batchService") public class BatchServiceImpl implements BatchService { // 框架自動注入 @Autowired private JobLauncher jobLauncher; @Autowired private Job updateDeviceJob; /** * 根據 taskId 創建一個Job * @param taskId * @throws Exception */ @Override public void createBatchJob(String taskId) throws Exception { JobParameters jobParameters = new JobParametersBuilder() .addString("taskId", taskId) .addString("uuid", UUID.randomUUID().toString().replace("-","")) .toJobParameters(); // 傳入一個Job任務和任務需要的參數 jobLauncher.run(updateDeviceJob, jobParameters); } }
此部分最重要(☆☆☆☆☆)
@Configuration public class BatchConfiguration { private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class); @Value("${batch-size:5000}") private int batchSize; // 框架自動注入 @Autowired public JobBuilderFactory jobBuilderFactory; // 框架自動注入 @Autowired public StepBuilderFactory stepBuilderFactory; // 數據過濾器,對從數據庫讀出來的數據,注意進行操作 @Autowired public TaskItemProcessor taskItemProcessor; // 接收job參數 public Map<String, JobParameter> parameters; public Object taskId; @Autowired private JdbcTemplate jdbcTemplate; // 讀取數據庫操作 @Bean @StepScope public JdbcCursorItemReader<DispatchRequest> itemReader(DataSource dataSource) { String querySql = " SELECT " + " e. ID AS taskId, " + " e.user_id AS userId, " + " e.timing_startup AS startTime, " + " u.device_id AS deviceId, " + " d.app_name AS appName, " + " d.compose_file AS composeFile, " + " e.failure_retry AS failureRetry, " + " e.tetry_times AS retryTimes, " + " e.device_managered AS deviceManagered " + " FROM " + " eiot_upgrade_task e " + " LEFT JOIN eiot_upgrade_device u ON e. ID = u.upgrade_task_id " + " LEFT JOIN eiot_app_detail d ON e.app_id = d. ID " + " WHERE " + " ( " + " u.device_upgrade_status = 0 " + " OR u.device_upgrade_status = 2" + " )" + " AND e.tetry_times > u.retry_times " + " AND e. ID = ?"; return new JdbcCursorItemReaderBuilder<DispatchRequest>() .name("itemReader") .sql(querySql) .dataSource(dataSource) .queryArguments(new Object[]{parameters.get("taskId").getValue()}) .rowMapper(new DispatchRequest.DispatchRequestRowMapper()) .build(); } // 將結果寫回數據庫 @Bean @StepScope public ItemWriter<ProcessResult> itemWriter() { return new ItemWriter<ProcessResult>() { private int updateTaskStatus(DispatchRequest dispatchRequest, int status) { log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status); Integer retryTimes = jdbcTemplate.queryForObject( "select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?", new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class ); retryTimes += 1; int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " + "where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()); if (updateCount <= 0) { log.warn("no task updated"); } else { log.info("count of {} task updated", updateCount); } // 最后一次重試 if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) { log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId()); return 1; } else { return 0; } } @Override @Transactional public void write(List<? extends ProcessResult> list) throws Exception { Map taskMap = jdbcTemplate.queryForMap( "select device_managered, device_count, task_status from eiot_upgrade_task where id = ?", list.get(0).getDispatchRequest().getTaskId() // 我們認定一個批量里面,taskId都是一樣的 ); int deviceManagered = (int)taskMap.get("device_managered"); Integer deviceCount = (Integer) taskMap.get("device_count"); if (deviceCount == null) { log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId()); } int taskStatus = (int)taskMap.get("task_status"); for (ProcessResult result: list) { deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus()); } if (deviceCount != null && deviceManagered == deviceCount) { taskStatus = 2; //任務狀態 0:待升級,1:升級中,2:已完成 } jdbcTemplate.update("update eiot_upgrade_task set device_managered = ?, task_status = ? " + "where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId()); } }; } /** * 定義一個下發更新的 job * @return */ @Bean public Job updateDeviceJob(Step updateDeviceStep) { return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", "")) .listener(new JobListener()) // 設置Job的監聽器 .flow(updateDeviceStep)// 執行下發更新的Step .end() .build(); } /** * 定義一個下發更新的 step * @return */ @Bean public Step updateDeviceStep(JdbcCursorItemReader<DispatchRequest> itemReader,ItemWriter<ProcessResult> itemWriter) { return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", "")) .<DispatchRequest, ProcessResult> chunk(batchSize) .reader(itemReader) //根據taskId從數據庫讀取更新設備信息 .processor(taskItemProcessor) // 每條更新信息,執行下發更新接口 .writer(itemWriter) .build(); } // job 監聽器 public class JobListener implements JobExecutionListener { @Override public void beforeJob(JobExecution jobExecution) { log.info(jobExecution.getJobInstance().getJobName() + " before... "); parameters = jobExecution.getJobParameters().getParameters(); taskId = parameters.get("taskId").getValue(); log.info("job param taskId : " + parameters.get("taskId")); } @Override public void afterJob(JobExecution jobExecution) { log.info(jobExecution.getJobInstance().getJobName() + " after... "); // 當所有job執行完之后,查詢設備更新狀態,如果有失敗,則要定時重新執行job String sql = " SELECT " + " count(*) " + " FROM " + " eiot_upgrade_device d " + " LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u. ID " + " WHERE " + " u. ID = ? " + " AND d.retry_times < u.tetry_times " + " AND ( " + " d.device_upgrade_status = 0 " + " OR d.device_upgrade_status = 2 " + " ) "; // 獲取更新失敗的設備個數 Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class); log.info("update device failure count : " + count); // 下面是使用Quartz觸發定時任務 // 獲取任務時間,單位秒 // String time = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class); // 此處方便測試,應該從數據庫中取taskId對應的重試間隔,單位秒 Integer millSecond = 10; if(count != null && count > 0){ String jobName = "UpgradeTask_" + taskId; String reTaskId = taskId.toString(); Map<String,Object> params = new HashMap<>(); params.put("jobName",jobName); params.put("taskId",reTaskId); if (QuartzManager.checkNameNotExist(jobName)) { QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond); } } } } }
可以在此對數據進行過濾操作
@Component("taskItemProcessor") public class TaskItemProcessor implements ItemProcessor<DispatchRequest, ProcessResult> { public static final int STATUS_DISPATCH_FAILED = 2; public static final int STATUS_DISPATCH_SUCC = 1; private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class); @Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}") private String dispatchUrl; @Autowired JdbcTemplate jdbcTemplate; /** * 在這里,執行 下發更新指令 的操作 * @param dispatchRequest * @return * @throws Exception */ @Override public ProcessResult process(final DispatchRequest dispatchRequest) { // 調用接口,下發指令 String url = dispatchUrl + dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId(); log.info("request url:" + url); RestTemplate restTemplate = new RestTemplate(); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON_UTF8); MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>(); JSONObject jsonOuter = new JSONObject(); JSONObject jsonInner = new JSONObject(); try { jsonInner.put("jobId",dispatchRequest.getTaskId()); jsonInner.put("name",dispatchRequest.getName()); jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile())); jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy())); jsonInner.put("timestamp",dispatchRequest.getTimestamp()); jsonOuter.put("method","updateApp"); jsonOuter.put("params",jsonInner); } catch (JSONException e) { log.info("JSON convert Exception :" + e); }catch (IOException e) { log.info("Base64Util bytesToBase64Str :" + e); } log.info("request body json :" + jsonOuter); HttpEntity<String> requestEntity = new HttpEntity<String>(jsonOuter.toString(),headers); int status; try { ResponseEntity<String> response = restTemplate.postForEntity(url,requestEntity,String.class); log.info("response :" + response); if (response.getStatusCode() == HttpStatus.OK) { status = STATUS_DISPATCH_SUCC; } else { status = STATUS_DISPATCH_FAILED; } }catch (Exception e){ status = STATUS_DISPATCH_FAILED; } return new ProcessResult(dispatchRequest, status); } }
注意靜態內部類
public class DispatchRequest { private String taskId; private String deviceId; private String userId; private String name; private byte[] composeFile; private String policy; private String timestamp; private String md5; private int failureRetry; private int retryTimes; private int deviceManagered; // 省略構造函數,setter/getter/tostring方法 //...... public static class DispatchRequestRowMapper implements RowMapper<DispatchRequest> { @Override public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException { DispatchRequest dispatchRequest = new DispatchRequest(); dispatchRequest.setTaskId(resultSet.getString("taskId")); dispatchRequest.setUserId(resultSet.getString("userId")); dispatchRequest.setPolicy(resultSet.getString("startTime")); dispatchRequest.setDeviceId(resultSet.getString("deviceId")); dispatchRequest.setName(resultSet.getString("appName")); dispatchRequest.setComposeFile(resultSet.getBytes("composeFile")); dispatchRequest.setTimestamp(DateUtil.DateToString(new Date())); dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes")); dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry")); dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered")); return dispatchRequest; } } }
@SpringBootApplication @EnableBatchProcessing public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
其實SpringBatch并沒有想象中那么好用,當從數據庫中每次取5000條數據后,進入processor中是逐條處理的,這個時候不能不行操作,等5000條數據處理完之后,再一次性執行ItemWriter方法。
在使用的過程中,最坑的地方是ItemReader和ItemWriter這兩個地方,如何執行自定義的Sql,參考文中代碼就行。至于Quartz定時功能,很簡單,只要定時創建SpringBatch里面的Job,讓這個job啟動就好了,此處就不在給出了,貼的代碼太多了。由于公司一些原因,代碼不能放到GitHub上。
啟動時報Exception
Driver's Blob representation is of an unsupported type: weblogic.jdbc.wrapper.Blob_oracle_sql_BLOB
quartz的driverDelegateClass配置的是OracleDelegate,應用運行在weblogic上
driverDelegateClass對應配置改為
org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate
感謝你能夠認真閱讀完這篇文章,希望小編分享的“SpringBoot+SpringBatch+Quartz整合定時批量任務方式的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。