您好,登錄后才能下訂單哦!
怎么在Java中利用多線程批量導入數據?相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
Java的基本數據類型分為:1、整數類型,用來表示整數的數據類型。2、浮點類型,用來表示小數的數據類型。3、字符類型,字符類型的關鍵字是“char”。4、布爾類型,是表示邏輯值的基本數據類型。
前言:
當遇到大量數據導入時,為了提高處理的速度,可以選擇使用多線程來批量處理這些處理。常見的場景有:
大文件導入數據庫(這個文件不一定是標準的CSV可導入文件或者需要在內存中經過一定的處理)
數據同步(從第三方接口拉取數據處理后寫入自己的數據庫)
以上的場景有一個共性,這類數據導入的場景簡單來說就是將數據從一個數據源移動到另外一個數據源,而其中必定可以分為兩步
數據讀取:從數據源讀取數據到內存
數據寫入:將內存中的數據寫入到另外一個數據源,可能存在數據處理
而且根據讀取的速度一般會比數據寫入的速度快很多,即讀取快,寫入慢。
設計思路
由于場景的特點是讀取快,寫入慢,如果是使用多線程處理,建議是數據寫入部分改造為多線程。而數據讀取可以改造成批量讀取數據。簡單來說就是兩個要點:
批量讀取數據
多線程寫入數據
示例
多線程批量處理最簡單的方案是使用線程池來進行處理,下面會通過一個模擬批量讀取和寫入的服務,以及對這個服務的多線程寫入調用作為示例,展示如何多線程批量數據導入。
模擬服務
import java.util.concurrent.atomic.AtomicLong; /** * 數據批量寫入用的模擬服務 * * @author RJH * create at 2019-04-01 */ public class MockService { /** * 可讀取總數 */ private long canReadTotal; /** * 寫入總數 */ private AtomicLong writeTotal=new AtomicLong(0); /** * 寫入休眠時間(單位:毫秒) */ private final long sleepTime; /** * 構造方法 * * @param canReadTotal * @param sleepTime */ public MockService(long canReadTotal, long sleepTime) { this.canReadTotal = canReadTotal; this.sleepTime = sleepTime; } /** * 批量讀取數據接口 * * @param num * @return */ public synchronized long readData(int num) { long readNum; if (canReadTotal >= num) { canReadTotal -= num; readNum = num; } else { readNum = canReadTotal; canReadTotal = 0; } //System.out.println("read data size:" + readNum); return readNum; } /** * 寫入數據接口 */ public void writeData() { try { // 休眠一定時間模擬寫入速度慢 Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } // 寫入總數自增 System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet()); } /** * 獲取寫入的總數 * * @return */ public long getWriteTotal() { return writeTotal.get(); } }
批量數據處理器
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 基于線程池的多線程批量寫入處理器 * @author RJH * create at 2019-04-01 */ public class SimpleBatchHandler { private ExecutorService executorService; private MockService service; /** * 每次批量讀取的數據量 */ private int batch; /** * 線程個數 */ private int threadNum; public SimpleBatchHandler(MockService service, int batch,int threadNum) { this.service = service; this.batch = batch; //使用固定數目的線程池 this.executorService = Executors.newFixedThreadPool(threadNum); } /** * 開始處理 */ public void startHandle() { // 開始處理的時間 long startTime = System.currentTimeMillis(); System.out.println("start handle time:" + startTime); long readData; while ((readData = service.readData(batch)) != 0) {// 批量讀取數據,知道讀取不到數據才停止 for (long i = 0; i < readData; i++) { executorService.execute(() -> service.writeData()); } } // 關閉線程池 executorService.shutdown(); while (!executorService.isTerminated()) {//等待線程池中的線程執行完 } // 結束時間 long endTime = System.currentTimeMillis(); System.out.println("end handle time:" + endTime); // 總耗時 System.out.println("total handle time:" + (endTime - startTime) + "ms"); // 寫入總數 System.out.println("total write num:" + service.getWriteTotal()); } }
測試類
public class SimpleBatchHandlerTest { public static void main(String[] args) { // 總數 long total=100000; // 休眠時間 long sleepTime=100; // 每次拉取的數量 int batch=100; // 線程個數 int threadNum=16; MockService mockService=new MockService(total,sleepTime); SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum); handler.startHandle(); } }
運行結果
start handle time:1554298681755 thread:Thread[pool-1-thread-2,5,main] write data:1 thread:Thread[pool-1-thread-1,5,main] write data:2 ...省略部分輸出 thread:Thread[pool-1-thread-4,5,main] write data:100000 end handle time:1554299330202 total handle time:648447ms total write num:100000
看完上述內容,你們掌握怎么在Java中利用多線程批量導入數據的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。