您好,登錄后才能下訂單哦!
小編給大家分享一下Hive Metastore客戶端自動重連機制的示例分析,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
本文基于Hive2.1.0的Apache社區版,目的是為了探究Metastore和底層RDBMS和底層服務變更(例如版本升級、服務遷移等運維操作)對客戶端和用戶的影響。Hive提供了在客戶端對Metastore連接超時自動重連的容錯機制,允許我們通過調整參數配置調整停服時間限制,在規定時間內重啟服務對用戶無顯著影響。由于Metastore底層RDBMS我們采用的是業內通用的Mysql,因此后面以Mysql來替代RDBMS進行描述和驗證
參數 | 默認值 | 說明 | 配置范圍 |
---|---|---|---|
hive.metastore.connect.retries | 3 | 客戶端建立與metastore連接時的重試次數 | Metastore客戶端,如CLI、Hiveserver2等 |
hive.metastore.failure.retries | 1 | 客戶端訪問metastore的失敗重試次數 | Metastore客戶端,如CLI、Hiveserver2等 |
hive.metastore.client.connect.retry.delay | 1s | Metastore客戶端重連/重試等待的時間 | Metastore客戶端,如CLI、Hiveserver2等 |
hive.metastore.client.socket.timeout | 600s | Metastore客戶端socket超時時間,傳遞給底層Socket,超時之后底層Socket會自動斷開 | Metastore客戶端,如CLI、Hiveserver2等 |
hive.metastore.client.socket.lifetime | 0 | socket存活時間,超時之后客戶端在下一次訪問Metastore時會主動斷開現有連接并重新建立連接,0表示不主動斷開 | Metastore客戶端,如CLI、Hiveserver2等 |
hive.hmshandler.retry.attempts | 10 | 在JDO數據存儲出現錯誤后嘗試連接的次數 | Metastore |
hive.hmshandler.retry.interval | 2000ms | JDO連接嘗試間隔,單位:ms | Metastore |
hive.server2.thrift.client.connect.retry.limit | 1 | 客戶端建立與Hiveserver2連接的重試次數 | Hiveserver2的客戶端,如Beeline等 |
hive.server2.thrift.client.retry.limit | 1 | 客戶端訪問Hiveserver2的失敗重試次數 | Hiveserver2的客戶端,如Beeline等 |
hive.server2.thrift.client.retry.delay.seconds | 1s | Hiveserver2客戶端重連/重試等待的時間 | Hiveserver2的客戶端,如Beeline等 |
為了弄清這兩個參數的區別,讓我們通過源碼來確認一下,ps:為了方便閱讀后面會用......省略掉無關的代碼邏輯
CLI和Hiveserver2都是通過org.apache.hadoop.hive.ql.metadata.Hive類與Metastore的交互的。首先讓我們以createDatabase(Database, boolean)方法為例來看看具體的交互過程
/** * Create a database * @param db * @param ifNotExist if true, will ignore AlreadyExistsException exception * @throws AlreadyExistsException * @throws HiveException */ public void createDatabase(Database db, boolean ifNotExist) throws AlreadyExistsException, HiveException { try { getMSC().createDatabase(db); } catch (AlreadyExistsException e) { if (!ifNotExist) { throw e; } } catch (Exception e) { throw new HiveException(e); } } /** * @return the metastore client for the current thread * @throws MetaException */ @LimitedPrivate(value = {"Hive"}) @Unstable public synchronized IMetaStoreClient getMSC( boolean allowEmbedded, boolean forceCreate) throws MetaException { if (metaStoreClient == null || forceCreate) { ...... try { metaStoreClient = createMetaStoreClient(allowEmbedded); } catch (RuntimeException ex) { ...... } ...... } return metaStoreClient; }
Hive類維護了一個IMetaStoreClient對象,通過getMSC()方法獲取,getMSC()方法在這里采用了懶漢模式去創建,接下來看下Hive是如何創建一個IMetaStoreClient對象的
// org.apache.hadoop.hive.ql.metadata.Hive.java private IMetaStoreClient createMetaStoreClient(boolean allowEmbedded) throws MetaException { ...... if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) { return new SessionHiveMetaStoreClient(conf, hookLoader, allowEmbedded); } else { return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap, SessionHiveMetaStoreClient.class.getName(), allowEmbedded); } }
if后面的分支用于創建客戶端內置的本地Metastore,這主要用于開發調試階段,因此我們只關注else后面的邏輯,即通過RetryingMetaStoreClient.getProxy方法創建一個IMetaStoreClient對象。RetryingMetaStoreClient.getProxy方法通過幾次簡單地調用重載函數,最終來到下面的方法
// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java public static IMetaStoreClient getProxy(HiveConf hiveConf, Class<?>[] constructorArgTypes, Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap, String mscClassName) throws MetaException { @SuppressWarnings("unchecked") Class<? extends IMetaStoreClient> baseClass = (Class<? extends IMetaStoreClient>)MetaStoreUtils.getClass(mscClassName); RetryingMetaStoreClient handler = new RetryingMetaStoreClient(hiveConf, constructorArgTypes, constructorArgs, metaCallTimeMap, baseClass); return (IMetaStoreClient) Proxy.newProxyInstance( RetryingMetaStoreClient.class.getClassLoader(), baseClass.getInterfaces(), handler); }
可以看到,這里利用Java代理機制創建并返回了一個IMetaStoreClient的代理——RetryingMetaStoreClient,此后對IMetaStoreClient對象的調用都委托給RetryingMetaStoreClient.invoke 處理,接下來讓我們看下RetryingMetaStoreClient.invoke方法是如何處理用戶對IMetastoreClient對象的操作的
// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object ret = null; int retriesMade = 0; TException caughtException = null; while (true) { try { reloginExpiringKeytabUser(); // 1. 檢查是否重連,重連的場景包括: // a) 上一次循環訪問Metastore異常,且異常類型支持自動重試訪問 // b) 底層socket超時,超時參數:hive.metastore.client.socket.lifetime if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) { base.reconnect(); lastConnectionTime = System.currentTimeMillis(); } if (metaCallTimeMap == null) { ret = method.invoke(base, args); } else { // need to capture the timing long startTime = System.currentTimeMillis(); ret = method.invoke(base, args); long timeTaken = System.currentTimeMillis() - startTime; addMethodTime(method, timeTaken); } // 2. 訪問Metastore正常,返回結果給上層調用并結束循環,用戶不主動結束的情況下底層與Metastore的連接持續保持著 break; // 3. 處理訪問Metastore過程中出現的異常,異常主要分三類: // a) 用戶操作異常或元數據異常,將異常拋給用戶處理并結束循環 // b) 底層連接異常,例如網絡問題、Metastore服務異常(停服、連接超限等)等支持自動重連,進入步驟4 // c) 其他未知異常,拋給用戶處理并結束循環 } catch (UndeclaredThrowableException e) { throw e.getCause(); } catch (InvocationTargetException e) { Throwable t = e.getCause(); if (t instanceof TApplicationException) { TApplicationException tae = (TApplicationException)t; switch (tae.getType()) { case TApplicationException.UNSUPPORTED_CLIENT_TYPE: case TApplicationException.UNKNOWN_METHOD: case TApplicationException.WRONG_METHOD_NAME: case TApplicationException.INVALID_PROTOCOL: throw t; default: caughtException = tae; } } else if ((t instanceof TProtocolException) || (t instanceof TTransportException)) { caughtException = (TException)t; } else if ((t instanceof MetaException) && t.getMessage().matches( "(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*") && !t.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) { caughtException = (MetaException)t; } else { throw t; } } catch (MetaException e) { if (e.getMessage().matches("(?s).*(IO|TTransport)Exception.*") && !e.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) { caughtException = e; } else { throw e; } } // 4. 對于支持自動重試的異常,會記錄重試次數并驗證次數是否超限,是則返回異常并結束循環,否則將以warn形式輸出異常日志提醒并等等一段時間后開始下一次循環自動重試訪問Metastore。這里用到的重試次數參數和等待時間參數分別是 hive.metastore.failure.retries,hive.metastore.client.connect.retry.delay if (retriesMade >= retryLimit) { throw caughtException; } retriesMade++; Thread.sleep(retryDelaySeconds * 1000); } return ret; } protected RetryingMetaStoreClient(HiveConf hiveConf, Class<?>[] constructorArgTypes, Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap, Class<? extends IMetaStoreClient> msClientClass) throws MetaException { this.retryLimit = hiveConf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES); this.retryDelaySeconds = hiveConf.getTimeVar( HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); this.metaCallTimeMap = metaCallTimeMap; this.connectionLifeTimeInMillis = hiveConf.getTimeVar( HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME, TimeUnit.MILLISECONDS); ...... this.base = (IMetaStoreClient) MetaStoreUtils.newInstance( msClientClass, constructorArgTypes, constructorArgs); }
從 RetryingMetaStoreClient 的構造函數中可以發現,RetryingMetaStoreClient 維護了一個 HiveMetaStoreClient 對象,用戶在上層調用一次 RetryingMetaStoreClient 對象操作,例如第一步的 createDatabase 方法,會經過 RetryingMetaStoreClient.invoke 的封裝最終調用HiveMetaStoreClient類中的同名方法進行操作。在 RetryingMetaStoreClient.invoke 中封裝了自動重試的邏輯,在底層與Metastore的連接過程中出現異常的情況下會自動重試而不影響上層用戶的操作。
這里我們在注釋中標注了 invoke 方法中主要的操作步驟,可以看到,重試次數由參數hive.metastore.failure.retries控制,兩次重試之間的等待時間由hive.metastore.client.connect.retry.delay控制。
注意,這里我們說的是“重試”,而不是“重連”,一次重試中與Metastore的交互有兩步:1. 建立與Metastore的會話 2. 執行用戶請求。我們繼續看下客戶端是怎么建立與Metastore的會話的
// org.apache.hadoop.hive.metastore.HiveMetaStoreClient.java @Override public void reconnect() throws MetaException { ...... close(); // 當配置了多個Metastore時,會隨機調整Metastore順序 promoteRandomMetaStoreURI(); open(); } private void open() throws MetaException { isConnected = false; ...... // hive.metastore.client.socket.timeout int clientSocketTimeout = (int) conf.getTimeVar( ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS); for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { for (URI store : metastoreUris) { try { transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout); ...... try { transport.open(); isConnected = true; } catch (TTransportException e) { ...... } ...... } catch (MetaException e) { ...... } if (isConnected) { break; } } // Wait before launching the next round of connection retries. if (!isConnected && retryDelaySeconds > 0) { try { Thread.sleep(retryDelaySeconds * 1000); } catch (InterruptedException ignore) {} } } if (!isConnected) { throw new MetaException("Could not connect to meta store using any of the URIs provided." + " Most recent failure: " + StringUtils.stringifyException(tte)); } ...... } public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded) throws MetaException { ...... // hive.metastore.connect.retries retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES); // hive.metastore.client.connect.retry.delay retryDelaySeconds = conf.getTimeVar( ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); ...... // 初始化一個HiveMetaStoreClient對象時會嘗試建立與Metastore的長會話 open(); }
同上一步的重試邏輯類似,與Metastore的連接支持自動重連,由 hive.metastore.connect.retries 控制重連次數,hive.metastore.client.connect.retry.delay 控制重連等待時間,底層利用Thrift提供的RPC通信服務。
如果配置了多個Metastore地址,每一次重連的時候會按順序遍歷所有的Metastore并嘗試與之建立會話,直到有一個會話建立成功為止。
此外,初始化一個HiveMetaStoreClient對象時會調用open()方法嘗試建立一個與Metastore的長會話,供后面的用戶請求使用
HiveMetaStoreClient.open() 方法建立一個與Metastore的會話,該方法中會在連接失敗的情況下自動重連,重連次數、重連等待時間分別由參數 hive.metastore.connect.retries 、 hive.metastore.client.connect.retry.delay 控制。且每次重連時會遍歷用戶配置的所有的Metastore直到成功建立一個會話
用戶新建一個Metastore客戶端(例如啟動一個CLI、Hiveserver2進程)時,會初始化并維護一個IMetaStoreClient對象,在初始化時調用 *HiveMetaStoreClient.open()*方法建立一個與Metastore的長會話
用戶每次調用IMetaStoreClient中的方法進行業務操作,實際上委托給 RetryingMetaStoreClient.invoke 方法操作,在遇到與Metastore連接等異常時會進行自動重試,重試次數、重試等待時間分別由參數 hive.metastore.failure.retries 、 hive.metastore.client.connect.retry.delay 控制
RetryingMetaStoreClient.invoke 中每次重試會嘗試調用 HiveMetaStoreClient.reconnect() 方法重連Metastore,HiveMetaStoreClient.reconnect() 方法內會調用 HiveMetaStoreClient.open() 去連接Metastore。因此,invoke方法實際上在重試循環中嵌套了循環重連Metastore的操作
所以 hive.metastore.failure.retries 參數實際上僅用于在已經建立了Metastore的會話的基礎上進行正常的業務訪問過程中遇到連接異常等問題時的重試次數限制,而 hive.metastore.connect.retries 則是更底層自動重連Metastore的次數限制
此外,hive.server2.thrift.client.connect.retry.limit 同 hive.server2.thrift.client.retry.limit 的區別也與hive.metastore.connect.retries 和 hive.metastore.failure.retries的區別類似,這里就不再贅述,有興趣的同學可以參照本篇文檔去研究下源碼
看完了這篇文章,相信你對“Hive Metastore客戶端自動重連機制的示例分析”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。