您好,登錄后才能下訂單哦!
jstorm中bolt是如何處理異常的?相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
BasicBoltExecutor源碼:
public void execute(Tuple input) { _collector.setContext(input); try { _bolt.execute(input, _collector); _collector.getOutputter().ack(input); } catch (FailedException e) { if (e instanceof ReportedFailedException) { _collector.reportError(e); } _collector.getOutputter().fail(input); } }
_bolt.execute(input, _collector) 就是執行我們自己編寫的bolt里的excute方法。可以看到,在這里,只會catch storm自己定義的FailedException,并且發送fail消息,標記tuple處理失敗, 其余異常則會被放過。
再外層是BoltExecutors的processTupleEvent方法:
try { if (!isSystemBolt && tuple.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) { backpressureTrigger.handle(tuple); } else { bolt.execute(tuple); } } catch (Throwable e) { error = e; LOG.error("bolt execute error ", e); report_error.report(e); }
在這里,所有異常都會被catch住,但是只會進行report_error,并不會發fail消息,相關tuple只能等超時才能被標記為失敗。
再來看report_error.report(e) 的具體實現,通過看構造函數,可以看到report_error是一個TaskReportErrorAndDie類,
@Override public void report(Throwable error) { this.reporterror.report(error); this.haltfn.run(); }
在這里,reporterror是一個AsyncLoopDefaultKill類
@Override public void run() { JStormUtils.halt_process(1, "Async loop died!"); }
這里就是整個過程的最終步驟了, JStormUtils.halt_process()方法會打印一條"Async loop died!"的日志后將worker進程殺死。
思考
通過代碼可以出來,對于jstorm,“異常后worker退出”是一個故意設計出的特性,并非程序不健壯。猜測這一塊的設計理念就是對于已知異常,開發人員自己捕獲并重新拋出FailedException,使相應消息失敗;未知異常則強制使進程直接失敗退出,避免過度的catch導致問題被掩蓋。
不過雖然話是這么說,對這個設計還是持保留意見,畢竟storm和普通的java程序不一樣,storm的worker進程在退出后是會自動被重啟的,所以這種異常處理方式并不能起到failfast的效果。
相反,worker的持續重啟,還會帶來一些其他問題。再一個,不主動將消息標為失敗,而是等超時,如果設置的超時時間過長(當然超時時間太長也不合理),也會引入一些問題。比如說kafkaSpout, 一條消息沒被ack之前是不會繼續取后邊的數據的,這樣如果有一條數據需要等超時,同分區下的數據在這一個超時周期內,就都無法被處理了。
從另一方面來說,如果像FailedException一樣處理其他所有異常,由于異常之后可以看到有數據fail,也并不會掩蓋問題。
看完上述內容,你們掌握jstorm中bolt是如何處理異常的的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。