您好,登錄后才能下訂單哦!
本篇內容主要講解“Storm DRPC怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Storm DRPC怎么使用”吧!
Storm里面引入DRPC主要是利用storm的實時計算能力來并行化CPU密集型(CPU intensive)的計算任務。DRPC的stormtopology以函數的參數流作為輸入,而把這些函數調用的返回值作為topology的輸出流。
DRPC其實不能算是storm本身的一個特性, 它是通過組合storm的原語stream、spout、bolt、 topology而成的一種模式(pattern)。本來應該把DRPC單獨打成一個包的, 但是DRPC實在是太有用了,所以我們我們把它和storm捆綁在一起。
概覽
Distributed RPC是由一個”DPRC服務器”協調(storm自帶了一個實現)。DRPC服務器協調:① 接收一個RPC請求 ② 發送請求到storm topology ③ 從storm topology接收結果 ④ 把結果發回給等待的客戶端。從客戶端的角度來看一個DRPC調用跟一個普通的RPC調用沒有任何區別。比如下面是客戶端如何調用RPC計算“reach”功能(function)的結果
DRPCClient client = new DRPCClient("drpc-host", 3772); String result = client.execute("reach", "http://twitter.com"); |
DRPC的工作流大致是這樣的(重要☆):
客戶端給DRPC服務器發送要執行的函數(function)的名字,以及這個函數的參數。實現了這個函數的topology使用DRPCSpout從DRPC服務器接收函數調用流,每個函數調用被DRPC服務器標記了一個唯一的id。 這個topology然后計算結果,在topology的最后,一個叫做ReturnResults的bolt會連接到DRPC服務器,并且把這個調用的結果發送給DRPC服務器(通過那個唯一的id標識)。DRPC服務器用那個唯一id來跟等待的客戶端匹配上,喚醒這個客戶端并且把結果發送給它。
LinearDRPCTopologyBuilder
Storm自帶了一個稱作LinearDRPCTopologyBuilder的topology builder,它把實現DRPC的幾乎所有步驟都自動化了。這些步驟包括:
1、設置spout
2、把結果返回給DRPC服務器
3、給bolt提供有限聚合幾組tuples的能力
來看一個簡單的例子,下面是一個把輸入參數后面添加一個”!”的DRPC topology的實現:
public static class ExclaimBolt extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); collector.emit(new Values(tuple.getValue(0), input + "!")); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); } } public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); builder.addBolt(new ExclaimBolt(), 3); // ... } |
可以看出來,我們需要做的事情非常的少。創建LinearDRPCTopologyBuilder的時候,你需要告訴它你要實現的DRPC函數(DRPC function)的名字。一個DRPC服務器可以協調很多函數,函數與函數之間靠函數名字來區分。你聲明的第一個bolt會接收一個兩維tuple,tuple的第一個字段是request-id,第二個字段是這個請求的參數。LinearDRPCTopologyBuilder同時要求我們topology的最后一個bolt發送一個形如[id, result]的二維tuple:第一個field是request-id,第二個field是這個函數的結果。最后所有中間tuple的第一個field必須是request-id。
在這里例子里面ExclaimBolt 簡單地在輸入tuple的第二個field后面再添加一個”!”,其余的事情都由LinearDRPCTopologyBuilder幫我們搞定:連接到DRPC服務器,并且把結果發回。
本地模式DRPC
DRPC可以以本地模式運行,下面就是以本地模式運行上面例子的代碼:
LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello")); cluster.shutdown(); drpc.shutdown(); |
首先你創建一個LocalDRPC對象,這個對象在進程內模擬一個DRPC服務器(這很類似于LocalCluster在進程內模擬一個Storm集群),然后創建LocalCluster對象在本地模式運行topology。LinearTopologyBuilder有單獨的方法來創建本地的topology和遠程的topology。在本地模式里面LocalDRPC對象不和任何端口綁定,所以我們的topology對象需要知道和誰交互,這就是為什么createLocalTopology方法接受一個LocalDRPC對象作為輸入的原因。
把topology啟動了之后,你就可以通過調用LocalDRPC對象的execute來調用RPC方法了。
遠程模式DRPC
在一個真實集群上面DRPC也是非常簡單的,有三個步驟:
1、啟動DRPC服務器
2、配置DRPC服務器的地址
3、提交DRPCtopology到storm集群里面去。
我們可以通過“bin/storm drpc”命令來啟動DRPC服務器。
接著,你需要讓你的storm集群知道你的DRPC服務器的地址。DRPCSpout需要這個地址從而可以從DRPC服務器來接收函數調用。這個可以配置在storm.yaml或者通過代碼的方式配置在topology里面。通過storm.yaml配置是這樣的:
drpc.servers: - "drpc1.foo.com" - "drpc2.foo.com" |
最后,你通過StormSubmitter對象來提交DRPC topology(這個跟你提交其它topology沒有區別)。如果要以遠程的方式運行上面的例子,用下面的代碼:
StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology()); |
我們用createRemoteTopology方法來創建運行在真實集群上的DRPC topology。
到此,相信大家對“Storm DRPC怎么使用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。