中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Storm中Thrift如何使用

發布時間:2021-08-05 17:29:19 來源:億速云 閱讀:158 作者:Leah 欄目:云計算

這期內容當中小編將會給大家帶來有關Storm中Thrift如何使用,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

1 IDL

首先是storm.thrift, 作為IDL里面定義了用到的數據結構和service 
然后backtype.storm.generated, 存放從IDL通過Thrift自動轉化成的Java代碼

比如對于nimbus service 
在IDL的定義為,

service Nimbus {

  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: stringjsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2:InvalidTopologyException ite);

  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3:string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyExceptionite);

  void killTopology(1: string name) throws (1: NotAliveException e);

  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);

  void activate(1: string name) throws (1: NotAliveException e);

  void deactivate(1: string name) throws (1: NotAliveException e);

  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyExceptionite);

  // need to add functions for asking aboutstatus of storms, what nodes they're running on, looking at task logs

string beginFileUpload();

  void uploadChunk(1: string location, 2: binary chunk);

  void finishFileUpload(1: string location);

string beginFileDownload(1: string file);

  //can stop downloading chunks when receive0-length byte array back

binary downloadChunk(1: string id);

  // returns json

string getNimbusConf();

  // stats functions

ClusterSummary getClusterInfo();

TopologyInfo getTopologyInfo(1: string id) throws (1:NotAliveException e);

  //returns json

string getTopologyConf(1: string id) throws (1:NotAliveException e);

  StormTopologygetTopology(1: string id) throws (1: NotAliveException e);

StormTopology getUserTopology(1: string id) throws (1:NotAliveException e);

}

而對應在Nimbus.java的Java代碼如下,

public class Nimbus {

  public interface Iface {

    public void submitTopology(String name, StringuploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException,org.apache.thrift7.TException;

    public void submitTopologyWithOpts(String name, StringuploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptionsoptions) throws AlreadyAliveException,InvalidTopologyException, org.apache.thrift7.TException;

    public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;

    public void killTopologyWithOpts(String name,KillOptions options) throws NotAliveException,org.apache.thrift7.TException;

    public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;

    public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;

    public void rebalance(String name, RebalanceOptionsoptions) throws NotAliveException, InvalidTopologyException,org.apache.thrift7.TException;

    public String beginFileUpload() throwsorg.apache.thrift7.TException;

    public void uploadChunk(String location, ByteBufferchunk) throws org.apache.thrift7.TException;

    public void finishFileUpload(String location) throws org.apache.thrift7.TException;

    public String beginFileDownload(String file) throws org.apache.thrift7.TException;

    public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;

    public String getNimbusConf() throwsorg.apache.thrift7.TException;

    public ClusterSummary getClusterInfo() throwsorg.apache.thrift7.TException;

    public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;

    public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;

    public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;

    public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;

  }

2 Client

1. 首先Get Client,

NimbusClient client =NimbusClient.getConfiguredClient(conf);

看看backtype.storm.utils下面的client.getConfiguredClient的邏輯, 
只是從配置中取出nimbus的host:port, 并new NimbusClient

    public static NimbusClient getConfiguredClient(Map conf) {

       try {

           String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);

           int nimbusPort =Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));

           return new NimbusClient(conf, nimbusHost, nimbusPort);

       } catch (TTransportException ex) {

           throw new RuntimeException(ex);

       }

    }

NimbusClient 繼承自ThriftClient, public class NimbusClient extends ThriftClient 
ThriftClient又做了什么? 關鍵是怎么進行數據序列化和怎么將數據傳輸到remote 
這里看出Thrift對Transport和Protocol的封裝 
對于Transport, 其實就是對Socket的封裝, 使用TSocket(host, port) 
然后對于protocol, 默認使用TBinaryProtocol, 如果你不指定的話

    public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {

       try {

           //locate loginconfiguration

           Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);

           //construct atransport plugin

           ITransportPlugin  transportPlugin= AuthUtils.GetTransportPlugin(storm_conf, login_conf);

           //create a socketwith server

           if(host==null) {

                throw new IllegalArgumentException("host is not set");

           }

           if(port<=0) {

                throw new IllegalArgumentException("invalid port: "+port);

           }            

           TSocket socket = new TSocket(host, port);

           if(timeout!=null) {

                socket.setTimeout(timeout);

           }

           final TTransport underlyingTransport = socket;

           //establishclient-server transport via plugin

           _transport = transportPlugin.connect(underlyingTransport, host);

       } catch (IOException ex) {

           throw new RuntimeException(ex);

       }

       _protocol = null;

        if (_transport != null)

           _protocol = new TBinaryProtocol(_transport);

    }

2. 調用任意RPC 
那么就看看submitTopologyWithOpts

client.getClient().submitTopologyWithOpts(name,submittedJar, serConf, topology, opts);

可以看出上面的Nimbus的interface里面有這個方法的定義, 而且Thrift不僅僅自動產生java interface, 而且還提供整個RPC client端的實現

    public void submitTopologyWithOpts(String name, StringuploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptionsoptions) throws AlreadyAliveException,InvalidTopologyException, org.apache.thrift7.TException

    {

     send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf,topology, options);

     recv_submitTopologyWithOpts();

    }

分兩步, 
首先send_submitTopologyWithOpts, 調用sendBase 
接著, recv_submitTopologyWithOpts, 調用receiveBase

  protected void sendBase(String methodName, TBase args) throws TException {

   oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL,++seqid_));

   args.write(oprot_);

   oprot_.writeMessageEnd();

   oprot_.getTransport().flush();

  }

  protected void receiveBase(TBase result, String methodName)throws TException {

   TMessage msg = iprot_.readMessageBegin();

    if (msg.type == TMessageType.EXCEPTION) {

     TApplicationException x = TApplicationException.read(iprot_);

      iprot_.readMessageEnd();

      throw x;

    }

    if (msg.seqid != seqid_) {

      throw newTApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName +" failed: out ofsequence response");

    }

   result.read(iprot_);

   iprot_.readMessageEnd();

  }

可以看出Thrift對protocol的封裝, 不需要自己處理序列化, 調用protocol的接口搞定 

3 Server

Thrift強大的地方是, 實現了整個協議棧而不光只是IDL的轉化, 對于server也給出多種實現 
下面看看在nimbus server端, 是用clojure來寫的 
可見其中使用Thrift封裝的NonblockingServerSocket, THsHaServer,TBinaryProtocol, Proccessor, 非常簡單 
其中processor會使用service-handle來處理recv到的數據, 所以作為使用者只需要在service-handle中實現Nimbus$Iface, 其他和server相關的, Thrift都已經幫你封裝好了, 這里使用的IDL也在backtype.storm.generated, 因為clojure基于JVM所以IDL只需要轉化成Java即可.

(defn launch-server! [conf nimbus]

(validate-distributed-mode! conf)

  (let[service-handler (service-handler conf nimbus)

       options (-> (TNonblockingServerSocket. (int (confNIMBUS-THRIFT-PORT)))

                    (THsHaServer$Args.)

                    (.workerThreads 64)

                    (.protocolFactory (TBinaryProtocol$Factory.))

                    (.processor(Nimbus$Processor. service-handler))

                    )

    (.addShutdownHook(Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stopserver))))

   (log-message "StartingNimbus server...")

   (.serve server)))

上述就是小編為大家分享的Storm中Thrift如何使用了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

德保县| 武义县| 论坛| 禹城市| 兴和县| 芦溪县| 应用必备| 岗巴县| 四会市| 万盛区| 新绛县| 志丹县| 东宁县| 巫山县| 科技| 淮南市| 海南省| 外汇| 句容市| 泾阳县| 静海县| 宁城县| 舒城县| 汝城县| 深圳市| 石城县| 慈利县| 连平县| 来安县| 宿迁市| 金秀| 北海市| 安福县| 石柱| 陆川县| 灯塔市| 萨迦县| 灵丘县| 佛冈县| 图片| 陈巴尔虎旗|