您好,登錄后才能下訂單哦!
這篇文章主要介紹“Druid連接創建及銷毀的方法是什么”,在日常操作中,相信很多人在Druid連接創建及銷毀的方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Druid連接創建及銷毀的方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
Druid是阿里開源的數據庫連接池,是阿里監控系統Dragoon的副產品,提供了強大的可監控性和基于Filter-Chain的可擴展性。
Druid數據庫連接池中可用的連接存放在一個數組connections中;
Druid數據庫連接池做并發控制,主要靠一把可重入鎖以及和這把鎖關聯的兩個Condition對象;
public DruidAbstractDataSource(boolean lockFair) { lock = new ReentrantLock(lockFair); notEmpty = lock.newCondition(); empty = lock.newCondition(); }
連接池沒有可用連接時,應用線程會在notEmpty上等待,連接池已滿時,生產連接的線程會在empty上等待;
對連接保活,就是每間隔一定時間,對達到了保活間隔周期的連接進行有效性校驗,可以將無效連接銷毀,也可以防止連接長時間不與數據庫服務端通信。
Druid版本:1.2.11
DruidDataSource連接的創建由CreateConnectionThread線程完成,其run() 方法如下所示。
public void run() { initedLatch.countDown(); long lastDiscardCount = 0; int errorCount = 0; for (; ; ) { try { lock.lockInterruptibly(); } catch (InterruptedException e2) { break; } long discardCount = DruidDataSource.this.discardCount; boolean discardChanged = discardCount - lastDiscardCount > 0; lastDiscardCount = discardCount; try { // emptyWait為true表示生產連接線程需要等待,無需生產連接 boolean emptyWait = true; // 發生了創建錯誤,且池中已無連接,且丟棄連接的統計沒有改變 // 此時生產連接線程需要生產連接 if (createError != null && poolingCount == 0 && !discardChanged) { emptyWait = false; } if (emptyWait && asyncInit && createCount < initialSize) { emptyWait = false; } if (emptyWait) { // 池中已有連接數大于等于正在等待連接的應用線程數 // 且當前是非keepAlive場景 // 且當前是非連續失敗 // 此時生產連接的線程在empty上等待 // keepAlive && activeCount + poolingCount < minIdle時會在shrink()方法中觸發emptySingal()來添加連接 // isFailContinuous()返回true表示連續失敗,即多次(默認2次)創建物理連接失敗 if (poolingCount >= notEmptyWaitThreadCount && (!(keepAlive && activeCount + poolingCount < minIdle)) && !isFailContinuous() ) { empty.await(); } // 防止創建超過maxActive數量的連接 if (activeCount + poolingCount >= maxActive) { empty.await(); continue; } } } catch (InterruptedException e) { // 省略 } finally { lock.unlock(); } PhysicalConnectionInfo connection = null; try { connection = createPhysicalConnection(); } catch (SQLException e) { LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode() + ", state " + e.getSQLState(), e); errorCount++; if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) { // 多次創建失敗 setFailContinuous(true); // 如果配置了快速失敗,就喚醒所有在notEmpty上等待的應用線程 if (failFast) { lock.lock(); try { notEmpty.signalAll(); } finally { lock.unlock(); } } if (breakAfterAcquireFailure) { break; } try { Thread.sleep(timeBetweenConnectErrorMillis); } catch (InterruptedException interruptEx) { break; } } } catch (RuntimeException e) { LOG.error("create connection RuntimeException", e); setFailContinuous(true); continue; } catch (Error e) { LOG.error("create connection Error", e); setFailContinuous(true); break; } if (connection == null) { continue; } // 把連接添加到連接池 boolean result = put(connection); if (!result) { JdbcUtils.close(connection.getPhysicalConnection()); LOG.info("put physical connection to pool failed."); } errorCount = 0; if (closing || closed) { break; } } }
CreateConnectionThread的run() 方法整體就是在一個死循環中不斷的等待,被喚醒,然后創建線程。當一個物理連接被創建出來后,會調用DruidDataSource#put方法將其放到連接池connections中,put() 方法源碼如下所示。
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) { DruidConnectionHolder holder = null; try { holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo); } catch (SQLException ex) { // 省略 return false; } return put(holder, physicalConnectionInfo.createTaskId, false); } private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) { // 涉及到連接池中連接數量改變的操作,都需要加鎖 lock.lock(); try { if (this.closing || this.closed) { return false; } // 池中已有連接數已經大于等于最大連接數,則不再把連接加到連接池并直接返回false if (poolingCount >= maxActive) { if (createScheduler != null) { clearCreateTask(createTaskId); } return false; } // 檢查重復添加 if (checkExists) { for (int i = 0; i < poolingCount; i++) { if (connections[i] == holder) { return false; } } } // 連接放入連接池 connections[poolingCount] = holder; // poolingCount++ incrementPoolingCount(); if (poolingCount > poolingPeak) { poolingPeak = poolingCount; poolingPeakTime = System.currentTimeMillis(); } // 喚醒在notEmpty上等待連接的應用線程 notEmpty.signal(); notEmptySignalCount++; if (createScheduler != null) { clearCreateTask(createTaskId); if (poolingCount + createTaskCount < notEmptyWaitThreadCount && activeCount + poolingCount + createTaskCount < maxActive) { emptySignal(); } } } finally { lock.unlock(); } return true; }
put() 方法會先將物理連接從PhysicalConnectionInfo中獲取出來并封裝成一個DruidConnectionHolder,DruidConnectionHolder就是Druid連接池中的連接。新添加的連接會存放在連接池數組connections的poolingCount位置,然后poolingCount會加1,也就是poolingCount代表著連接池中可以獲取的連接的數量。
DruidDataSource連接的銷毀由DestroyConnectionThread線程完成,其run() 方法如下所示。
public void run() { // run()方法只要執行了,就調用initedLatch#countDown initedLatch.countDown(); for (; ; ) { // 每間隔timeBetweenEvictionRunsMillis執行一次DestroyTask的run()方法 try { if (closed || closing) { break; } if (timeBetweenEvictionRunsMillis > 0) { Thread.sleep(timeBetweenEvictionRunsMillis); } else { Thread.sleep(1000); } if (Thread.interrupted()) { break; } // 執行DestroyTask的run()方法來銷毀需要銷毀的連接 destroyTask.run(); } catch (InterruptedException e) { break; } } }
DestroyConnectionThread的run() 方法就是在一個死循環中每間隔timeBetweenEvictionRunsMillis的時間就執行一次DestroyTask的run() 方法。DestroyTask#run方法實現如下所示。
public void run() { // 根據一系列條件判斷并銷毀連接 shrink(true, keepAlive); // RemoveAbandoned機制 if (isRemoveAbandoned()) { removeAbandoned(); } }
在DestroyTask#run方法中會調用DruidDataSource#shrink方法來根據設定的條件來判斷出需要銷毀和保活的連接。DruidDataSource#shrink方法如下所示。
// checkTime參數表示在將一個連接進行銷毀前,是否需要判斷一下空閑時間 public void shrink(boolean checkTime, boolean keepAlive) { // 加鎖 try { lock.lockInterruptibly(); } catch (InterruptedException e) { return; } // needFill = keepAlive && poolingCount + activeCount < minIdle // needFill為true時,會調用empty.signal()喚醒生產連接的線程來生產連接 boolean needFill = false; // evictCount記錄需要銷毀的連接數 // keepAliveCount記錄需要保活的連接數 int evictCount = 0; int keepAliveCount = 0; int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink; fatalErrorCountLastShrink = fatalErrorCount; try { if (!inited) { return; } // checkCount = 池中已有連接數 - 最小空閑連接數 // 正常情況下,最多能夠將前checkCount個連接進行銷毀 final int checkCount = poolingCount - minIdle; final long currentTimeMillis = System.currentTimeMillis(); // 正常情況下,需要遍歷池中所有連接 // 從前往后遍歷,i為數組索引 for (int i = 0; i < poolingCount; ++i) { DruidConnectionHolder connection = connections[i]; // 如果發生了致命錯誤(onFatalError == true)且致命錯誤發生時間(lastFatalErrorTimeMillis)在連接建立時間之后 // 把連接加入到保活連接數組中 if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) { keepAliveConnections[keepAliveCount++] = connection; continue; } if (checkTime) { // phyTimeoutMillis表示連接的物理存活超時時間,默認值是-1 if (phyTimeoutMillis > 0) { // phyConnectTimeMillis表示連接的物理存活時間 long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis; // 連接的物理存活時間大于phyTimeoutMillis,則將這個連接放入evictConnections數組 if (phyConnectTimeMillis > phyTimeoutMillis) { evictConnections[evictCount++] = connection; continue; } } // idleMillis表示連接的空閑時間 long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis; // minEvictableIdleTimeMillis表示連接允許的最小空閑時間,默認是30分鐘 // keepAliveBetweenTimeMillis表示保活間隔時間,默認是2分鐘 // 如果連接的空閑時間小于minEvictableIdleTimeMillis且還小于keepAliveBetweenTimeMillis // 則connections數組中當前連接之后的連接都會滿足空閑時間小于minEvictableIdleTimeMillis且還小于keepAliveBetweenTimeMillis // 此時跳出遍歷,不再檢查其余的連接 if (idleMillis < minEvictableIdleTimeMillis && idleMillis < keepAliveBetweenTimeMillis ) { break; } // 連接的空閑時間大于等于允許的最小空閑時間 if (idleMillis >= minEvictableIdleTimeMillis) { if (checkTime && i < checkCount) { // i < checkCount這個條件的理解如下: // 每次shrink()方法執行時,connections數組中只有索引0到checkCount-1的連接才允許被銷毀 // 這樣才能保證銷毀完連接后,connections數組中至少還有minIdle個連接 evictConnections[evictCount++] = connection; continue; } else if (idleMillis > maxEvictableIdleTimeMillis) { // 如果空閑時間過久,已經大于了允許的最大空閑時間(默認7小時) // 那么無論如何都要銷毀這個連接 evictConnections[evictCount++] = connection; continue; } } // 如果開啟了保活機制,且連接空閑時間大于等于了保活間隔時間 // 此時將連接加入到保活連接數組中 if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) { keepAliveConnections[keepAliveCount++] = connection; } } else { // checkTime為false,那么前checkCount個連接直接進行銷毀,不再判斷這些連接的空閑時間是否超過閾值 if (i < checkCount) { evictConnections[evictCount++] = connection; } else { break; } } } // removeCount = 銷毀連接數 + 保活連接數 // removeCount表示本次從connections數組中拿掉的連接數 // 注:一定是從前往后拿,正常情況下最后minIdle個連接是安全的 int removeCount = evictCount + keepAliveCount; if (removeCount > 0) { // [0, 1, 2, 3, 4, null, null, null] -> [3, 4, 2, 3, 4, null, null, null] System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount); // [3, 4, 2, 3, 4, null, null, null] -> [3, 4, null, null, null, null, null, null, null] Arrays.fill(connections, poolingCount - removeCount, poolingCount, null); // 更新池中連接數 poolingCount -= removeCount; } keepAliveCheckCount += keepAliveCount; // 如果池中連接數加上活躍連接數(借出去的連接)小于最小空閑連接數 // 則將needFill設為true,后續需要喚醒生產連接的線程來生產連接 if (keepAlive && poolingCount + activeCount < minIdle) { needFill = true; } } finally { lock.unlock(); } if (evictCount > 0) { // 遍歷evictConnections數組,銷毀其中的連接 for (int i = 0; i < evictCount; ++i) { DruidConnectionHolder item = evictConnections[i]; Connection connection = item.getConnection(); JdbcUtils.close(connection); destroyCountUpdater.incrementAndGet(this); } Arrays.fill(evictConnections, null); } if (keepAliveCount > 0) { // 遍歷keepAliveConnections數組,對其中的連接做可用性校驗 // 校驗通過連接就放入connections數組,沒通過連接就銷毀 for (int i = keepAliveCount - 1; i >= 0; --i) { DruidConnectionHolder holer = keepAliveConnections[i]; Connection connection = holer.getConnection(); holer.incrementKeepAliveCheckCount(); boolean validate = false; try { this.validateConnection(connection); validate = true; } catch (Throwable error) { if (LOG.isDebugEnabled()) { LOG.debug("keepAliveErr", error); } } boolean discard = !validate; if (validate) { holer.lastKeepTimeMillis = System.currentTimeMillis(); boolean putOk = put(holer, 0L, true); if (!putOk) { discard = true; } } if (discard) { try { connection.close(); } catch (Exception e) { } lock.lock(); try { discardCount++; if (activeCount + poolingCount <= minIdle) { emptySignal(); } } finally { lock.unlock(); } } } this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount); Arrays.fill(keepAliveConnections, null); } // 如果needFill為true則喚醒生產連接的線程來生產連接 if (needFill) { lock.lock(); try { // 計算需要生產連接的個數 int fillCount = minIdle - (activeCount + poolingCount + createTaskCount); for (int i = 0; i < fillCount; ++i) { emptySignal(); } } finally { lock.unlock(); } } else if (onFatalError || fatalErrorIncrement > 0) { lock.lock(); try { emptySignal(); } finally { lock.unlock(); } } }
在DruidDataSource#shrink方法中,核心邏輯是遍歷connections數組中的連接,并判斷這些連接是需要銷毀還是需要保活。通常情況下,connections數組中的前checkCount(checkCount = poolingCount - minIdle) 個連接是危險的,因為這些連接只要滿足了:空閑時間 >= minEvictableIdleTimeMillis(允許的最小空閑時間),那么就需要被銷毀,而connections數組中的最后minIdle個連接是相對安全的,因為這些連接只有在滿足:空閑時間 > maxEvictableIdleTimeMillis(允許的最大空閑時間) 時,才會被銷毀。這么判斷的原因,主要就是需要讓連接池里能夠保證至少有minIdle個空閑連接可以讓應用線程獲取。
當確定好了需要銷毀和需要保活的連接后,此時會先將connections數組清理,只保留安全的連接,這個過程示意圖如下。
最后,會遍歷evictConnections數組,銷毀數組中的連接,遍歷keepAliveConnections數組,對其中的每個連接做可用性校驗,如果校驗可用,那么就重新放回connections數組,否則銷毀。
到此,關于“Druid連接創建及銷毀的方法是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。