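Preface
I was recently asked to rework MTE (mysqlToEs), an existing small tool that imports data from MySQL into Elasticsearch. The previous version imported with a single thread, so loading data on the order of 100 billion records took about two weeks, which was far too slow. I spent three days rewriting the MTE import tool on top of a FixedThreadPool created through the Executors factory of the Java thread pool framework. Import throughput on a single server improved more than tenfold, and tuning the thread count sensibly improves it further.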
Key technology stack
Tool description
Maven dependencies
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>${fastjson.version}</version>
</dependency>
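Java thread pool settings
Each pool defaults to 21 threads, and the size is adjustable. POR is the pool that processes completed-item workflow data, and ROR is the pool that processes read-item workflow data.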
private static int THREADS = 21;
public static ExecutorService POR = Executors.newFixedThreadPool(THREADS);
public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS);
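One thing worth knowing about the setup above: Executors.newFixedThreadPool backs the pool with an unbounded queue, so a fast producer can pile up batches in memory. The following is a minimal sketch of an alternative, not what MTE itself does: a fixed-size ThreadPoolExecutor with a bounded queue and CallerRunsPolicy, so that a full queue slows the submitting thread down. The class name BoundedPools and the sizes used are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of the original MTE tool.
class BoundedPools {

    /** Fixed-size pool whose queue holds at most queueCapacity waiting tasks. */
    static ExecutorService newBoundedFixedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,               // core == max, like a fixed pool
                0L, TimeUnit.MILLISECONDS,      // keep-alive is irrelevant when core == max
                new ArrayBlockingQueue<>(queueCapacity),
                // When the queue is full, the submitting thread runs the task itself,
                // which throttles the producer instead of growing the queue.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Submits `tasks` trivial jobs and returns how many of them completed. */
    static long runDemo(int threads, int queueCapacity, int tasks) {
        ExecutorService pool = newBoundedFixedPool(threads, queueCapacity);
        AtomicLong done = new AtomicLong();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();                        // stop accepting work, finish what is queued
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runDemo(4, 8, 1000));
    }
}
```

With a pool like this, the fixed sleep between batches in the producer becomes less important, because submission itself blocks the producer on real work whenever the consumers fall behind.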
Define the completed-item producer thread / read-item producer thread: ZlPendProducer/ZlReadProducer
public class ZlPendProducer implements Runnable {
    ...
    @Override
    public void run() {
        System.out.println(threadName + "::started...");
        // Walk through every sharded completed-item table
        for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++) {
            try {
                ....
                int size = 1000;
                for (int i = 0; i < count; i += size) {
                    if (i + size > count) {
                        // Last page: fewer than 1000 rows remain, so fetch only what is left
                        size = count - i;
                    }
                    String sql = "select * from " + tableName + " limit " + i + ", " + size;
                    System.out.println(tableName + "::sql::" + sql);
                    rs = statement.executeQuery(sql);
                    List<HistPendingEntity> lst = new ArrayList<>();
                    while (rs.next()) {
                        HistPendingEntity p = PendUtils.getHistPendingEntity(rs);
                        lst.add(p);
                    }
                    // Hand the page over to the completed-item consumer pool
                    MteExecutor.POR.submit(new ZlPendConsumer(lst));
                    // Pause for two seconds before reading the next page
                    Thread.sleep(2000);
                }
                ....
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

public class ZlReadProducer implements Runnable {
    ... // the read-item producer follows the same logic as the completed-item producer
}
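The paging arithmetic in the producer (pages of 1000 rows, with a shorter final page) can be isolated and checked on its own. This is a minimal sketch under assumptions: the class PageWindows and the table name t_pend_0 are hypothetical, and only the SQL shape is taken from the producer above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the producer's "limit offset, size" paging.
class PageWindows {

    /** Returns {offset, size} pairs that cover rows [0, count) in steps of pageSize. */
    static List<int[]> windows(int count, int pageSize) {
        List<int[]> result = new ArrayList<>();
        for (int offset = 0; offset < count; offset += pageSize) {
            // The last page may hold fewer than pageSize rows.
            int size = Math.min(pageSize, count - offset);
            result.add(new int[] {offset, size});
        }
        return result;
    }

    /** Builds the same query shape the producer uses for one page. */
    static String toSql(String tableName, int[] window) {
        return "select * from " + tableName + " limit " + window[0] + ", " + window[1];
    }

    public static void main(String[] args) {
        // 2500 rows in pages of 1000 gives two full pages and one page of 500.
        for (int[] w : windows(2500, 1000)) {
            System.out.println(toSql("t_pend_0", w));
        }
    }
}
```

Two caveats apply to this style of paging in general. Without an ORDER BY clause MySQL does not guarantee a stable row order across pages, and a large offset forces the server to scan and discard the skipped rows, so later pages become slower. Paging on an indexed key (for example `where id > lastSeenId order by id limit 1000`, assuming the tables have such a key) is a common alternative.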
Define the completed-item consumer thread / read-item consumer thread: ZlPendConsumer/ZlReadConsumer
public class ZlPendConsumer implements Runnable {
    private String threadName;
    private List<HistPendingEntity> lst;

    public ZlPendConsumer(List<HistPendingEntity> lst) {
        this.lst = lst;
    }

    @Override
    public void run() {
        ...
        lst.forEach(v -> {
            try {
                // Serialize the entity and index it under its pending id
                String json = new Gson().toJson(v);
                EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null);
                // Count the record as imported
                Const.COUNTER.LD_P.incrementAndGet();
            } catch (Exception e) {
                // A failed record is logged and skipped; the rest of the batch continues
                e.printStackTrace();
                System.out.println("err::PendingId::" + v.getPendingId());
            }
        });
        ...
    }
}

public class ZlReadConsumer implements Runnable {
    // the read-item consumer follows the same logic as the completed-item consumer
}
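The consumer's key property is that one bad record does not abort the rest of its batch, while a shared atomic counter tracks progress across threads. Here is a minimal, library-free sketch of that pattern. BatchConsumer and its sink parameter are hypothetical stand-ins: the sink plays the role of the Elasticsearch write, and the separate failure counter is an addition that the original consumer does not have.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical generic version of the consumer; not the original ZlPendConsumer.
class BatchConsumer<T> implements Runnable {
    private final List<T> batch;
    private final Consumer<T> sink;      // stand-in for the call that writes to Elasticsearch
    private final AtomicLong succeeded;  // shared across consumers, like Const.COUNTER.LD_P
    private final AtomicLong failed;     // assumption: the original only logs failures

    BatchConsumer(List<T> batch, Consumer<T> sink, AtomicLong succeeded, AtomicLong failed) {
        this.batch = batch;
        this.sink = sink;
        this.succeeded = succeeded;
        this.failed = failed;
    }

    @Override
    public void run() {
        for (T item : batch) {
            try {
                sink.accept(item);
                succeeded.incrementAndGet();
            } catch (RuntimeException e) {
                // Isolate the failure to this item and keep going.
                failed.incrementAndGet();
                System.err.println("err::item::" + item + "::" + e.getMessage());
            }
        }
    }

    /** Runs one batch with a sink that rejects empty strings; returns {succeeded, failed}. */
    static long[] demo() {
        AtomicLong ok = new AtomicLong();
        AtomicLong bad = new AtomicLong();
        Consumer<String> sink = s -> {
            if (s.isEmpty()) {
                throw new IllegalArgumentException("empty document");
            }
        };
        new BatchConsumer<>(Arrays.asList("a", "", "c"), sink, ok, bad).run();
        return new long[] {ok.get(), bad.get()};
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println("succeeded=" + r[0] + ", failed=" + r[1]);
    }
}
```

Keeping a failure count next to the success count makes it possible to compare totals against the source tables after the import finishes.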
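Define the thread that monitors the Elasticsearch import: Monitor
The Monitor thread computes how many records are imported into Elasticsearch per minute. With that figure you can adjust the number of threads in the pools so that the multithreaded import runs faster.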
public void monitorToES() {
    new Thread(() -> {
        while (true) {
            StringBuilder sb = new StringBuilder();
            sb.append("completed tables::").append(Const.TBL.TBL_PEND_COUNT)
              .append("::completed total::").append(Const.COUNTER.LD_P_TOTAL)
              .append("::completed imported::").append(Const.COUNTER.LD_P);
            sb.append("~~~~read tables::").append(Const.TBL.TBL_READ_COUNT);
            sb.append("::read total::").append(Const.COUNTER.LD_R_TOTAL)
              .append("::read imported::").append(Const.COUNTER.LD_R);
            if (ldPrevPendCount == 0 && ldPrevReadCount == 0) {
                // First samples: remember the current counts and start the one-minute window
                ldPrevPendCount = Const.COUNTER.LD_P.get();
                ldPrevReadCount = Const.COUNTER.LD_R.get();
                start = System.currentTimeMillis();
            } else {
                long end = System.currentTimeMillis();
                if ((end - start) / 1000 >= 60) {
                    // A minute has passed: report the difference since the previous window
                    start = end;
                    sb.append("\n#########################################\n");
                    sb.append("completed per minute::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + " records");
                    sb.append("::read per minute::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + " records");
                    ldPrevPendCount = Const.COUNTER.LD_P.get();
                    ldPrevReadCount = Const.COUNTER.LD_R.get();
                }
            }
            System.out.println(sb.toString());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
}
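The per-minute calculation inside the monitor can be separated from the logging loop, which makes it easy to check with fixed timestamps. The sketch below is an illustration, not the original code: RateTracker is a hypothetical class, and it uses an explicit started flag where the monitor treats a previous count of zero as "not started yet".

```java
// Hypothetical helper that isolates the monitor's windowed throughput arithmetic.
class RateTracker {
    private final long windowMillis;
    private long windowStart;
    private long countAtWindowStart;
    private boolean started;

    RateTracker(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /**
     * Records one sample of the running total.
     * Returns the number of records imported during the window that just closed,
     * or -1 while the current window is still open.
     */
    long sample(long nowMillis, long totalSoFar) {
        if (!started) {
            // First sample only opens the window.
            started = true;
            windowStart = nowMillis;
            countAtWindowStart = totalSoFar;
            return -1;
        }
        if (nowMillis - windowStart < windowMillis) {
            return -1;
        }
        long delta = totalSoFar - countAtWindowStart;
        windowStart = nowMillis;          // open the next window
        countAtWindowStart = totalSoFar;
        return delta;
    }

    public static void main(String[] args) {
        RateTracker tracker = new RateTracker(60_000);
        tracker.sample(0, 0);
        tracker.sample(30_000, 500);
        System.out.println("records in first minute: " + tracker.sample(60_000, 1200));
    }
}
```

In a monitor loop, the caller would pass System.currentTimeMillis() and the current counter value every few seconds, and print a rate line only when the return value is not -1.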
Initialize Elasticsearch: EsClient
String cName = meta.get("cName");       // Elasticsearch cluster name
String esNodes = meta.get("esNodes");   // Elasticsearch node addresses
Settings esSetting = Settings.builder()
        .put("cluster.name", cName)
        .put("client.transport.sniff", true)   // enable sniffing to discover the cluster nodes
        .put("thread_pool.search.size", 5)     // search thread pool size, set to 5 for now
        .build();
String[] nodes = esNodes.split(",");
client = new PreBuiltTransportClient(esSetting);
for (String node : nodes) {
    if (node.length() > 0) {
        // Each entry has the form host:port
        String[] hostPort = node.split(":");
        client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
    }
}
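The node list is split on commas and then on colons, which fails with an unhelpful ArrayIndexOutOfBoundsException if an entry is missing its port. As a sketch only, the parsing step could be validated separately before any client is created. EsNodeParser is a hypothetical helper, and it deliberately uses just the JDK (no Elasticsearch classes) so it can run on its own.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the original code parses the node list inline.
class EsNodeParser {

    /** Parses "host1:port1,host2:port2" into unresolved socket addresses, skipping empty entries. */
    static List<InetSocketAddress> parse(String esNodes) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String node : esNodes.split(",")) {
            String trimmed = node.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup at parse time.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress a : parse("node1:9300,node2:9300,node3:9300")) {
            System.out.println(a.getHostString() + " -> " + a.getPort());
        }
    }
}
```

Each parsed address could then be resolved and passed to the transport client in the same way as the loop above does with hostPort[0] and hostPort[1].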
Initialize the database connection
conn = DriverManager.getConnection(url, user, password);
Startup parameters
nohup java -jar mte.jar ES-Cluster2019 node1:9300,node2:9300,node3:9300 root 123456! jdbc:mysql://ip:3306/mte 130 130 >> ./mte.log 2>&1 &
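Parameter description
ES-Cluster2019 is the name of the Elasticsearch cluster
node1:9300,node2:9300,node3:9300 are the addresses of the Elasticsearch nodes
130 130 are the numbers of sharded tables for completed items and read items, respectively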
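For illustration, the positional arguments of the startup command could be mapped to named fields as follows. This is a sketch under assumptions: the class MteArgs is hypothetical, and reading the third, fourth and fifth arguments as the MySQL user, password and JDBC URL is inferred from the command line and from the DriverManager.getConnection(url, user, password) call, not stated by the article.

```java
// Hypothetical argument holder; the original tool reads its settings from a map named meta.
class MteArgs {
    final String clusterName;  // e.g. ES-Cluster2019
    final String esNodes;      // e.g. node1:9300,node2:9300,node3:9300
    final String dbUser;       // assumed: third argument
    final String dbPassword;   // assumed: fourth argument
    final String jdbcUrl;      // assumed: fifth argument
    final int pendTables;      // number of sharded completed-item tables
    final int readTables;      // number of sharded read-item tables

    private MteArgs(String[] a) {
        clusterName = a[0];
        esNodes = a[1];
        dbUser = a[2];
        dbPassword = a[3];
        jdbcUrl = a[4];
        pendTables = Integer.parseInt(a[5]);
        readTables = Integer.parseInt(a[6]);
    }

    /** Validates the argument count before reading any position. */
    static MteArgs parse(String[] args) {
        if (args.length != 7) {
            throw new IllegalArgumentException(
                    "usage: <cluster> <esNodes> <dbUser> <dbPassword> <jdbcUrl> <pendTables> <readTables>");
        }
        return new MteArgs(args);
    }

    public static void main(String[] args) {
        MteArgs parsed = parse(new String[] {
                "ES-Cluster2019", "node1:9300,node2:9300,node3:9300",
                "root", "123456!", "jdbc:mysql://ip:3306/mte", "130", "130"});
        System.out.println(parsed.clusterName + " / " + parsed.pendTables + " / " + parsed.readTables);
    }
}
```

Failing fast on a wrong argument count gives a clearer message than an ArrayIndexOutOfBoundsException thrown deep inside the startup code.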
Program entry point: MteMain
// Monitor thread
Monitor monitorService = new Monitor();
monitorService.monitorToES();

// Completed-item producer thread
Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer"));
pendProducerThread.start();

// Read-item producer thread
Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer"));
readProducerThread.start();
That is all for this article. I hope it is helpful for your own work.