您好,登錄后才能下訂單哦!
本篇內容介紹了“TARS C++客戶端是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
TARS是騰訊使用十年的微服務開發框架,目前支持C++、Java、PHP、Node.js、Go語言。該開源項目為用戶提供了涉及到開發、運維、以及測試的一整套微服務平臺PaaS解決方案,幫助一個產品或者服務快速開發、部署、測試、上線。目前該框架應用在騰訊各大核心業務,基于該框架部署運行的服務節點規模達到數十萬。 TARS的通信模型中包含客戶端和服務端。客戶端服務端之間主要是利用RPC進行通信。本系列文章分上下兩篇,對RPC調用部分進行源碼解析。本文是上篇,我們將以C++語言為載體,帶大家了解一下TARS的客戶端。
TARS的客戶端最重要的類是Communicator,一個客戶端只能聲明出一個Communicator類實例,用戶可以通過CommunicatorPtr& Application::getCommunicator()獲取線程安全的Communicator類單例。Communicator類聚合了兩個比較重要的類,一個是CommunicatorEpoll,負責網絡線程的建立與通過ObjectProxyFactory生成ObjectProxy;另一個是ServantProxyFactory,生成不同的RPC服務句柄,即ServantProxy,用戶通過ServantProxy調用RPC服務。下面簡單介紹幾個類的作用。
一個Communicator實例就是一個客戶端,負責與服務端建立連接,生成RPC服務句柄,可以通過CommunicatorPtr& Application::getCommunicator()獲取Communicator實例,用戶最后不要自己聲明定義新的Communicator實例。
ServantProxy就是一個服務代理,ServantProxy可以通過ServantProxyFactory工廠類生成,用戶往往通過Communicator的template<class T> void stringToProxy()接口間接調用ServantProxyFactory的ServantPrx::element_type* getServantProxy()接口以獲取服務代理,通過服務代理ServantProxy,用戶就可以進行RPC調用了。ServantProxy內含多個服務實體ObjectProxy(詳見下文第4小點),能夠幫助用戶在同一個服務代理內進行負載均衡。
CommunicatorEpoll類代表客戶端的網絡模塊,內含TC_Epoller作為IO復用,能夠同時處理不同主調線程(caller線程)的多個請求。CommunicatorEpoll內含服務實體工廠類ObjectProxyFactory(詳見下文第4小點),意味著在同一網絡線程中,能夠產生不同服務的實體,能夠完成不同的RPC服務調用。CommunicatorEpoll還聚合了異步調用處理線程AsyncProcThread,負責接收到異步的響應包之后,將響應包交給該線程處理。
ObjectProxy類是一個服務實體,注意與ServantProxy類是一個服務代理相區別,前者表示一個網絡線程上的某個服務實體A,后者表示對所有網絡線程上的某服務實體A的總代理,更詳細的介紹可見下文。ObjectProxy通過ObjectProxyFactory生成,而ObjectProxyFactory類的實例是CommunicatorEpoll的成員變量,意味著一個網絡線程CommunicatorEpoll能夠產生各種各樣的服務實體ObjectProxy,發起不同的RPC服務。ObjectProxy通過AdapterProxy來管理對服務端的連接。 好了,介紹完所有的類之后,先通過類圖理一理他們之間的關系,這個類圖在之后的文章中將會再次出現。
TARS的客戶端最重要的類是Communicator,一個客戶端只能聲明出一個Communicator類實例,用戶可以通過CommunicatorPtr& Application::getCommunicator()獲取線程安全的Communicator類單例。Communicator類聚合了兩個比較重要的類,一個是CommunicatorEpoll,負責網絡線程的建立與通過ObjectProxyFactory生成ObjectProxy;另一個是ServantProxyFactory,生成不同的RPC服務句柄,即ServantProxy,用戶通過ServantProxy調用RPC服務。 根據用戶配置,Communicator擁有n個網絡線程,即n個CommunicatorEpoll。每個CommunicatorEpoll擁有一個ObjectProxyFactory類,每個ObjectProxyFactory可以生成一系列的不同服務的實體對象ObjectProxy,因此,假如Communicator擁有兩個CommunicatorEpoll,并有foo與bar這兩類不同的服務實體對象,那么如下圖(1-2)所示,每個CommunicatorEpoll可以通過 ObjectProxyFactory創建兩類ObjectProxy,這是TARS客戶端的第一層負載均衡,每個線程都可以分擔所有服務的RPC請求,因此,一個服務的阻塞可能會影響其他服務,因為網絡線程是多個服務實體ObjectProxy所共享的。
Communicator類下另一個比較重要的ServantProxyFactory類的作用是依據實際服務端的信息(如服務器的socket標志)與Communicator中客戶端的信息(如網絡線程數)而生成ServantProxy句柄,通過句柄調用RPC服務。舉個例子,如下圖(1-3)所示,Communicator實例通過ServantProxyFactory成員變量的getServantProxy()接口在構造fooServantProxy句柄的時候,會獲取Communicator實例下的所有CommunicatorEpoll(即CommunicatorEpoll-1與CommunicatorEpoll-2)中的fooObjectProxy(即fooObjectProxy-1與fooObjectProxy-2),并作為構造fooServantProxy的參數。Communicator通過ServantProxyFactory能夠獲取foo與bar這兩類ServantProxy,ServantProxy與相應的ObjectProxy存在相應的聚合關系:
另外,每個ObjectProxy都擁有一個EndpointManager,例如,fooObjectProxy 的EndpointManager管理fooObjectProxy 下面的所有fooAdapterProxy,每個AdapterProxy連接到一個提供相應foo服務的服務端物理機socket上。通過EndpointManager還可以以不同的負載均衡方式獲取連接AdapterProxy。假如foo服務有兩臺物理機,bar服務有一臺物理機,那么ObjectProxy,EndpointManager與AdapterProxy關系如下圖(1-4)所示。上面提到,不同的網絡線程CommunicatorEpoll均可以發起同一RPC請求,對于同一RPC服務,選取不同的ObjectProxy(或可認為選取不同的網絡線程CommunicatorEpoll)是第一層的負載均衡,而對于同一個被選中的ObjectProxy,通過EndpointManager選擇不同的socket連接AdapterProxy(假如ObjectProxy有大于1個的AdapterProxy,如圖(1-4)的fooObjectProxy)是第二層的負載均衡。
在客戶端進行初始化時,必須建立上面介紹的關系,因此相應的類圖如圖(1-5)所示,通過類圖可以看出各類的關系,以及初始化需要用到的函數。
現在,通過代碼跟蹤來看看,在客戶端初始化過程中,各個類是如何被初始化出來并建立上述的架構關系的。在簡述之前,可以先看看函數的調用流程圖,若看不清晰,可以將圖片保存下來,用看圖軟件放大查看,強烈建議結合文章的代碼解析以TARS源碼一起查看,文章后面的所有代碼流程圖均如此。 接下來,將會按照函數調用流程圖來一步一步分析客戶端代理是如何被初始化出來的:
在客戶端程序中,一開始會執行下面的代碼進行整個客戶端代理的初始化:
Communicator comm; HelloPrx prx; comm.stringToProxy("TestApp.HelloServer.HelloObj@tcp -h 1.1.1.1 -p 20001" , prx);
先聲明一個Communicator變量comm(其實不建議這么做)以及一個ServantProxy類的指針變量prx,在此處,服務為Hello,因此聲明一個HelloPrx prx。注意一個客戶端只能擁有一個Communicator。為了能夠獲得RPC的服務句柄,我們調用Communicator::stringToProxy(),并傳入服務端的信息與prx變量,函數返回后,prx就是RPC服務的句柄。 進入Communicator::stringToProxy()函數中,我們通過Communicator::getServantProxy()來依據objectName與setName獲取服務代理ServantProxy:
/** * 生成代理 * @param T * @param objectName * @param setName 指定set調用的setid * @param proxy */ template<class T> void stringToProxy(const string& objectName, T& proxy,const string& setName="") { ServantProxy * pServantProxy = getServantProxy(objectName,setName); proxy = (typename T::element_type*)(pServantProxy); }
進入Communicator::getServantProxy(),首先會執行Communicator::initialize()來初始化Communicator,需要注意一點,Communicator:: initialize()只會被執行一次,下一次執行Communicator::getServantProxy()將不會再次執行Communicator:: initialize()函數:
void Communicator::initialize() { TC_LockT<TC_ThreadRecMutex> lock(*this); if (_initialized) //已經被初始化則直接返回 return; ...... }
進入Communicator::initialize()函數中,在這里,將會new出上文介紹的與Communicator密切相關的類ServantProxyFactory與n個CommunicatorEpoll,n為客戶端的網絡線程數,最小為1,最大為MAX_CLIENT_THREAD_NUM:
void Communicator::initialize() { ...... _servantProxyFactory = new ServantProxyFactory(this); ...... for(size_t i = 0; i < _clientThreadNum; ++i) { _communicatorEpoll[i] = new CommunicatorEpoll(this, i); _communicatorEpoll[i]->start(); //啟動網絡線程 } ...... }
在CommunicatorEpoll的構造函數中,ObjectProxyFactory被創建出來,這是構造圖(1-2)關系的前提。除此之外,還可以看到獲取相應配置,創建并啟動若干個異步回調后的處理線程。創建完成后,調用CommunicatorEpoll::start()啟動網絡線程。至此,Communicator::initialize()順利執行。通過下圖回顧上面的過程:
代碼回到Communicator::getServantProxy()中 Communicator::getServantProxy()會執行ServantProxyFactory::getServantProxy()并返回相應的服務代理:
ServantProxy* Communicator::getServantProxy(const string& objectName, const string& setName) { …… return _servantProxyFactory->getServantProxy(objectName,setName); }
進入ServantProxyFactory::getServantProxy(),首先會加鎖,從map<string, ServantPrx> _servantProxy中查找目標,若查找成功直接返回。若查找失敗,TARS需要構造出相應的ServantProxy,ServantProxy的構造需要如圖(1-3)所示的相對應的ObjectProxy作為構造函數的參數,由此可見,在ServantProxyFactory::getServantProxy()中有如下獲取ObjectProxy指針數組的代碼:
ObjectProxy ** ppObjectProxy = new ObjectProxy * [_comm->getClientThreadNum()]; assert(ppObjectProxy != NULL); for(size_t i = 0; i < _comm->getClientThreadNum(); ++i) { ppObjectProxy[i] = _comm->getCommunicatorEpoll(i)->getObjectProxy(name, setName); }
代碼來到ObjectProxyFactory::getObjectProxy(),同樣,會首先加鎖,再從map<string,ObjectProxy*> _objectProxys中查找是否已經擁有目標ObjectProxy,若查找成功直接返回。若查找失敗,需要新建一個新的ObjectProxy,通過類圖可知,ObjectProxy需要一個CommunicatorEpoll對象進行初始化,由此關聯管理自己的CommunicatorEpoll,CommunicatorEpoll之后便可以通過getObjectProxy()接口獲取屬于自己的ObjectProxy。詳細過程可見下圖:
新建ObjectProxy的過程同樣非常值得關注,在ObjectProxy::ObjectProxy()中,關鍵代碼是:
_endpointManger.reset(new EndpointManager(this, _communicatorEpoll->getCommunicator(), sObjectProxyName, pCommunicatorEpoll->isFirstNetThread(), setName));
每個ObjectProxy都有屬于自己的EndpointManager負責管理到服務端的所有socket連接AdapterProxy,每個AdapterProxy連接到一個提供相應服務的服務端物理機socket上。通過EndpointManager還可以以不同的負載均衡方式獲取與服務器的socket連接AdapterProxy。 ObjectProxy:: ObjectProxy()是圖(1-6)或者圖(1-8)中的略1,具體的代碼流程如圖(1-9)所示。ObjectProxy創建一個EndpointManager對象,在EndpointManager的初始化過程中,依據客戶端提供的信息,直接創建連接到服務端物理機的TCP/UDP連接AdapterProxy或者從代理中獲取服務端物理機socket列表后再創建TCP/UDP連接AdapterProxy。
按照圖(1-9)的程序流程執行完成后,便會建立如圖(2-3)所示的一個ObjectProxy對多個AdapterProxy的關系。 新建ObjectProxy之后,就可以調用其ObjectProxy::initialize()函數進行ObjectProxy對象的初始化了,當然,需要將ObjectProxy對象插入ObjectProxyFactory的成員變量_objectProxys與_vObjectProxys中,方便下次直接返回ObjectProxy對象。
退出層層的函數調用棧,代碼再次回 ServantProxyFactory::getServantProxy(),此時,ServantProxyFactory已經獲得相應的ObjectProxy數組ObjectProxy** ppObjectProxy,接著便可以調用:
ServantPrx sp = new ServantProxy(_comm, ppObjectProxy, _comm->getClientThreadNum());
進行ServantProxy的構造。構造完成便可以呈現出如圖(2-1)的關系。在ServantProxy的構造函數中可以看到,ServantProxy在新建一個EndpointManagerThread變量,這是對外獲取路由請求的類,是TARS為調用邏輯而提供的多種解決跨地區調用等問題的方案。同時可以看到:
for(size_t i = 0;i < _objectProxyNum; ++i) { (*(_objectProxy + i))->setServantProxy(this); }
建立了ServantProxy與ObjectProxy的相互關聯關系。剩下的是讀取配置文件,獲取相應的信息。 構造ServantProxy變量完成后,ServantProxyFactory::getServantProxy()獲取一些超時參數,賦值給ServantProxy變量,同時將其放進map<string, ServantPrx> _servantProxy中,方便下次直接查找獲取。 ServantProxyFactory::getServantProxy()將剛剛構造的ServantProxy指針變量返回給調用他的Communicator::getServantProxy(),在Communicator::getServantProxy()中:
ServantProxy * Communicator::getServantProxy(const string& objectName,const string& setName) { …… return _servantProxyFactory->getServantProxy(objectName,setName); }
直接將返回值返回給調用起Communicator::getServantProxy()的Communicator::stringToProxy()。可以看到:
template<class T> void stringToProxy(const string& objectName, T& proxy,const string& setName="") { ServantProxy * pServantProxy = getServantProxy(objectName,setName); proxy = (typename T::element_type*)(pServantProxy); }
Communicator::stringToProxy()將返回值強制轉換為客戶端代碼中與HelloPrx prx同樣的類型HelloPrx。由于函數參數proxy就是prx的引用。那么實際就是將句柄prx成功初始化了,用戶可以利用句柄prx進行RPC調用了。
當我們獲得一個ServantProxy句柄后,便可以進行RPC調用了。Tars提供四種RPC調用方式,分別是同步調用,異步調用,promise調用與協程調用。其中最簡單最常見的RPC調用方式是同步調用,接下來,將簡單分析Tars的同步調用。
現假設有一個MyDemo.StringServer.StringServantObj的服務,提供一個RPC接口是append,傳入兩個string類型的變量,返回兩個string類型變量的拼接結果。而且假設有兩臺服務器,socket標識分別是192.168.106.129:34132與192.168.106.130:34132,設置客戶端的網絡線程數為3,那么執行如下代碼:
Communicator _comm; StringServantPrx _proxy; _comm.stringToProxy("MyDemo.StringServer.StringServantObj@tcp -h 192.168.106.129 -p 34132", _proxy); _comm.stringToProxy("MyDemo.StringServer.StringServantObj@tcp -h 192.168.106.130 -p 34132", _proxy);
經過上文關于客戶端初始化的分析介紹可知,可以得出如下圖所示的關系圖:
獲取StringServantPrx _proxy后,直接調用:
string str1(abc-), str2(defg), rStr; int retCode = _proxy->append(str1, str2, rStr);
成功進行RPC同步調用后,返回的結果是rStr = “abc-defg”。
同樣,我們先看看與同步調用相關的類圖,如下圖所示:
StringServantProxy是繼承自ServantProxy的,StringServantProxy提供了RPC同步調用的接口Int32 append(),當用戶發起同步調用_proxy->append(str1, str2, rStr)時,所進行的函數調用過程如下圖所示。
在函數StringServantProxy::append()中,程序會先構造ServantProxy::tars_invoke()所需要的參數,如請求包類型,RPC方法名,方法參數等,需要值得注意的是,傳遞參數中有一個ResponsePacket類型的變量,在同步調用中,最終的返回結果會放置在這個變量上。接下來便直接調用了ServantProxy::tars_invoke()方法:
tars_invoke(tars::TARSNORMAL, "append", _os.getByteBuffer(), context, _mStatus, rep);
在ServantProxy::tars_invoke()方法中,先創建一個ReqMessage變量msg,初始化msg變量,給變量賦值,如Tars版本號,數據包類型,服務名,RPC方法名,Tars的上下文容器,同步調用的超時時間(單位為毫秒)等。最后,調用ServantProxy::invoke()進行遠程方法調用。
無論同步調用還是各種異步調用,ServantProxy::invoke()都是RPC調用的必經之地。在ServantProxy::invoke()中,繼續填充傳遞進來的變量ReqMessage msg。此外,還需要獲取調用者caller線程的線程私有數據ServantProxyThreadData,用來指導RPC調用。客戶端的每個caller線程都有屬于自己的維護調用上下文的線程私有數據,如hash屬性,消息染色信息。最關鍵的還是每條caller線程與每條客戶端網絡線程CommunicatorEpoll進行信息交互的橋梁——通信隊列ReqInfoQueue數組,數組中的每個ReqInfoQueue元素負責與一條網絡線程進行交互,如圖(1-13)所示,圖中橙色陰影代表數組ReqInfoQueue[],陰影內的圓柱體代表數組元素ReqInfoQueue。假如客戶端create兩條線程(下稱caller線程)發起StringServant服務的RPC請求,且客戶端網絡線程數設置為2,那么兩條caller線程各自有屬于自己的線程私有數據請求隊列數組ReqInfoQueue[],數組里面的ReqInfoQueue元素便是該數組對應的caller線程與兩條網絡線程的通信橋梁,一條網絡線程對應著數組里面的一個元素,通過網絡線程ID進行數組索引。整個關系有點像生產者消費者模型,生產者Caller線程向自己的線程私有數據**ReqInfoQueue[]**中的第N個元素ReqInfoQueue[N] push請求包,消費者客戶端第N個網絡線程就會從這個隊列中pop請求包。
閱讀代碼可能會發現幾個常量值,如MAX_CLIENT_THREAD_NUM=64,這是最大網絡線程數,在圖(1-13)中為單個請求隊列數組ReqInfoQueue[]的最大size;MAX_CLIENT_NOTIFYEVENT_NUM=2048,在圖(1-13)中,可以看作caller線程的最大數量,或者請求隊列數組ReqInfoQueue[]的最大數量(反正兩者一一對應,每個caller線程都有自己的線程私有數據ReqInfoQueue[])。
接著依據caller線程的線程私有數據進行第一次的負載均衡——選取ObjectProxy(即選擇網絡線程CommunicatorEpoll)和與之相對應的ReqInfoQueue:
ObjectProxy * pObjProxy = NULL; ReqInfoQueue * pReqQ = NULL; //選擇網絡線程 selectNetThreadInfo(pSptd, pObjProxy, pReqQ);
在ServantProxy::selectNetThreadInfo()中,通過輪詢的形式來選取ObjectProxy與ReqInfoQueue。
退出ServantProxy::selectNetThreadInfo()后,便得到ObjectProxy_類型的pObjProxy及其對應的ReqInfoQueue_類型的ReqInfoQueue,稍后通過pObjectProxy來發送RPC請求,請求信息會暫存在ReqInfoQueue中。
由于是同步調用,需要新建一個條件變量去監聽RPC的完成,可見:
//同步調用 new 一個ReqMonitor assert(msg->pMonitor == NULL); if(msg->eType == ReqMessage::SYNC_CALL) { msg->bMonitorFin = false; if(pSptd->_sched) { msg->bCoroFlag = true; msg->sched = pSptd->_sched; msg->iCoroId = pSptd->_sched->getCoroutineId(); } else { msg->pMonitor = new ReqMonitor; } }
創建完條件變量,接下來往ReqInfoQueue中push_back()請求信息包msg,并通知pObjProxy所屬的CommunicatorEpoll進行數據發送:
if(!pReqQ->push_back(msg,bEmpty)) { TLOGERROR("[TARS][ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl); delete msg; msg = NULL; pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ); throw TarsClientQueueException("client queue full"); } pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
來到CommunicatorEpoll::notify()中,往請求事件通知數組NotifyInfo _notify[]中添加請求事件,通知CommunicatorEpoll進行請求包的發送。注意了,這個函數的作用僅僅是通知網絡線程準備發送數據,通過TC_Epoller::mod()或者TC_Epoller::add()觸發一個EPOLLIN事件,從而促使阻塞在TC_Epoller::wait()(在CommunicatorEpoll::run()中阻塞)的網絡線程CommunicatorEpoll被喚醒,并設置喚醒后的epoll_event中的聯合體epoll_data變量為&_notify[iSeq].stFDInfo:
void CommunicatorEpoll::notify(size_t iSeq,ReqInfoQueue * msgQueue) { assert(iSeq < MAX_CLIENT_NOTIFYEVENT_NUM); if(_notify[iSeq].bValid) { _ep.mod(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN); assert(_notify[iSeq].stFDInfo.p == (void*)msgQueue); } else { _notify[iSeq].stFDInfo.iType = FDInfo::ET_C_NOTIFY; _notify[iSeq].stFDInfo.p = (void*)msgQueue; _notify[iSeq].stFDInfo.fd = _notify[iSeq].eventFd; _notify[iSeq].stFDInfo.iSeq = iSeq; _notify[iSeq].notify.createSocket(); _notify[iSeq].bValid = true; _ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN); } }
就是經過這么一個操作,網絡線程就可以被喚醒,喚醒后通過epoll_event變量可獲得&_notify[iSeq].stFDInfo。接下來的請求發送與響應的接收會在后面會詳細介紹。
隨后,代碼再次回到ServantProxy::invoke(),阻塞于:
if(!msg->bMonitorFin) { TC_ThreadLock::Lock lock(*(msg->pMonitor)); //等待直到網絡線程通知過來 if(!msg->bMonitorFin) { msg->pMonitor->wait(); } }
等待網絡線程接收到數據后,對其進行喚醒。 接收到響應后,檢查是否成功獲取響應,是則直接退出函數即可,響應信息在傳入的參數msg中:
if(msg->eStatus == ReqMessage::REQ_RSP && msg->response.iRet == TARSSERVERSUCCESS) { snprintf(pSptd->_szHost, sizeof(pSptd->_szHost), "%s", msg->adapter->endpoint().desc().c_str()); //成功 return; }
若接收失敗,會拋出異常,并刪除msg:
TarsException::throwException(ret, os.str());
若接收成功,退出ServantProxy::invoke()后,回到ServantProxy::tars_invoke(),獲取ResponsePacket類型的響應信息,并刪除msg包:
rsp = msg->response; delete msg; msg = NULL;
代碼回到StringServantProxy::append(),此時經過同步調用,可以直接獲取RPC返回值并回到客戶端中。
上面提到,當在ServantProxy::invoke()中,調用CommunicatorEpoll::notify()通知網絡線程進行請求發送后,接下來,網絡線程的具體執行流程如下圖所示:
由于CommunicatorEpoll繼承自TC_Thread,在上文1.2.2節中的第2小點的初始化CommunicatorEpoll之后便調用其CommunicatorEpoll::start()函數啟動網絡線程,網絡線程在CommunicatorEpoll::run()中一直等待_ep.wait(iTimeout)。由于在上一節的描述中,在CommunicatorEpoll::notify(),caller線程發起了通知notify,網絡線程在CommunicatorEpoll::run()就會調用CommunicatorEpoll::handle()處理通知:
void CommunicatorEpoll::run() { ...... try { int iTimeout = ((_waitTimeout < _timeoutCheckInterval) ? _waitTimeout : _timeoutCheckInterval); int num = _ep.wait(iTimeout); if(_terminate) { break; } //先處理epoll的網絡事件 for (int i = 0; i < num; ++i) { //獲取epoll_event變量的data,就是1.3.1節中提過的&_notify[iSeq].stFDInfo const epoll_event& ev = _ep.get(i); uint64_t data = ev.data.u64; if(data == 0) { continue; //data非指針, 退出循環 } handle((FDInfo*)data, ev.events); } } ...... }
在CommunicatorEpoll::handle()中,通過傳遞進來的epoll_event中的data成員變量獲取前面被選中的ObjectProxy并調用其ObjectProxy::invoke()函數:
void CommunicatorEpoll::handle(FDInfo * pFDInfo, uint32_t events) { try { assert(pFDInfo != NULL); //隊列有消息通知過來 if(FDInfo::ET_C_NOTIFY == pFDInfo->iType) { ReqInfoQueue * pInfoQueue=(ReqInfoQueue*)pFDInfo->p; ReqMessage * msg = NULL; try { while(pInfoQueue->pop_front(msg)) { ...... try { msg->pObjectProxy->invoke(msg); } ...... } } ...... } ...... }
在ObjectProxy::invoke()中將進行第二次的負載均衡,像圖(1-4)所示,每個ObjectProxy通過EndpointManager可以以不同的負載均衡方式對AdapterProxy進行選取選擇:
void ObjectProxy::invoke(ReqMessage * msg) { ...... //選擇一個遠程服務的Adapter來調用 AdapterProxy * pAdapterProxy = NULL; bool bFirst = _endpointManger->selectAdapterProxy(msg, pAdapterProxy); ...... }
在EndpointManager:: selectAdapterProxy()中,有多種負載均衡的方式來選取AdapterProxy,如getHashProxy(),getWeightedProxy(),getNextValidProxy()等。
獲取AdapterProxy之后,便將選擇到的AdapterProxy賦值給EndpointManager:: selectAdapterProxy()函數中的引用參數pAdapterProxy,隨后執行:
void ObjectProxy::invoke(ReqMessage * msg) { ...... msg->adapter = pAdapterProxy; pAdapterProxy->invoke(msg); }
調用pAdapterProxy將請求信息發送出去。而在AdapterProxy::invoke()中,AdapterProxy將調用Transceiver::sendRequset()進行請求的發送。 至此,對應同步調用的網絡線程發送請求的工作就結束了,網絡線程會回到CommunicatorEpoll::run()中,繼續等待數據的收發。
當網絡線程CommunicatorEpoll接收到響應數據之后,如同之前發送請求那樣, 在CommunicatorEpoll::run()中,程序獲取活躍的epoll_event的變量,并將其中的epoll_data_t data傳遞給CommunicatorEpoll::handle():
//先處理epoll的網絡事件 for (int i = 0; i < num; ++i) { const epoll_event& ev = _ep.get(i); uint64_t data = ev.data.u64; if(data == 0) { continue; //data非指針, 退出循環 } handle((FDInfo*)data, ev.events); }
接下來的程序流程如下圖所示:
在CommunicatorEpoll::handle()中,從epoll_data::data中獲取Transceiver指針,并調用CommunicatorEpoll::handleInputImp():
Transceiver *pTransceiver = (Transceiver*)pFDInfo->p; //先收包 if (events & EPOLLIN) { try { handleInputImp(pTransceiver); } catch(exception & e) { TLOGERROR("[TARS]CommunicatorEpoll::handle exp:"<<e.what()<<" ,line:"<<__LINE__<<endl); } catch(...) { TLOGERROR("[TARS]CommunicatorEpoll::handle|"<<__LINE__<<endl); } }
在CommunicatorEpoll::handleInputImp()中,除了對連接的判斷外,主要做兩件事,調用Transceiver::doResponse()以及AdapterProxy::finishInvoke(ResponsePacket&),前者的工作是從socket連接中獲取響應數據并判斷接收的數據是否為一個完整的RPC響應包。后者的作用是將響應結果返回給客戶端,同步調用的會喚醒阻塞等待在條件變量中的caller線程,異步調用的會在異步回調處理線程中執行回調函數。 在AdapterProxy::finishInvoke(ResponsePacket&)中,需要注意一點,假如是同步調用的,需要獲取響應包rsp對應的ReqMessage信息,在Tars中,執行:
ReqMessage * msg = NULL; // 獲取響應包rsp對應的msg信息,并在超時隊列中剔除該msg bool retErase = _timeoutQueue->erase(rsp.iRequestId, msg);
在找回響應包對應的請求信息msg的同時,將其在超時隊列中剔除出來。接著執行:
msg->eStatus = ReqMessage::REQ_RSP; msg->response = rsp; finishInvoke(msg);
程序調用另一個重載函數AdapterProxy::finishInvoke(ReqMessage*),在AdapterProxy::finishInvoke(ReqMessage*)中,不同的RPC調用方式會執行不同的動作,例如同步調用會喚醒對應的caller線程:
//同步調用,喚醒ServantProxy線程 if(msg->eType == ReqMessage::SYNC_CALL) { if(!msg->bCoroFlag) { assert(msg->pMonitor); TC_ThreadLock::Lock sync(*(msg->pMonitor)); msg->pMonitor->notify(); msg->bMonitorFin = true; } else { msg->sched->put(msg->iCoroId); } return ; }
至此,對應同步調用的網絡線程接收響應的工作就結束了,網絡線程會回到CommunicatorEpoll::run()中,繼續等待數據的收發。 綜上,客戶端同步調用的過程如下圖所示。
在Tars中,除了最常見的同步調用之外,還可以進行異步調用,異步調用可分三種:普通的異步調用,promise異步調用與協程異步調用,這里簡單介紹普通的異步調用,看看其與上文介紹的同步調用有何異同。
異步調用不會阻塞整個客戶端程序,調用完成(請求發送)之后,用戶可以繼續處理其他事情,等接收到響應之后,Tars會在異步處理線程當中執行用戶實現好的回調函數。在這里,會用到《Effective C++》中條款35所介紹的“藉由Non-Virtual Interface手法實現Template Method模式”,用戶需要繼承一個XXXServantPrxCallback基類,并實現里面的虛函數,異步回調線程會在收到響應包之后回調這些虛函數,具體的異步調用客戶端示例這里不作詳細介紹,在Tars的Example中會找到相應的示例代碼。
本文第一章已經詳細介紹了客戶端的初始化,這里再簡單提一下,在第一章的“1.2.2初始化代碼跟蹤- 2.執行Communicator的初始化函數”中,已經提到說,在每一個網絡線程CommunicatorEpoll的初始化過程中,會創建_asyncThreadNum條異步線程,等待異步調用的時候處理響應數據:
CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThreadSeq) { ...... //異步線程數 _asyncThreadNum = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncthread", "3")); if(_asyncThreadNum == 0) { _asyncThreadNum = 3; } if(_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM) { _asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM; } ...... //異步隊列的大小 size_t iAsyncQueueCap = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncqueuecap", "10000")); if(iAsyncQueueCap < 10000) { iAsyncQueueCap = 10000; } ...... //創建異步線程 for(size_t i = 0; i < _asyncThreadNum; ++i) { _asyncThread[i] = new AsyncProcThread(iAsyncQueueCap); _asyncThread[i]->start(); } ...... }
在開始講述異步調用與接收響應之前,先看看大致的調用過程,與圖(1-16)的同步調用來個對比。
跟同步調用的示例一樣,現在有一MyDemo.StringServer.StringServantObj的服務,提供一個RPC接口是append,傳入兩個string類型的變量,返回兩個string類型變量的拼接結果。在執行tars2cpp而生成的文件中,定義了回調函數基類StringServantPrxCallback,用戶需要public繼承這個基類并實現自己的方法,例如:
class asyncClientCallback : public StringServantPrxCallback { public: void callback_append(Int32 ret, const string& rStr) { cout << "append: async callback success and retCode is " << ret << " ,rStr is " << rStr << "\n"; } void callback_append_exception(Int32 ret) { cout << "append: async callback fail and retCode is " << ret << "\n"; } };
然后,用戶就可以通過proxy->async_append(new asyncClientCallback(), str1, str2)進行異步調用了,調用過程與上文的同步調用差不多,函數調用流程如下圖所示,可以與圖(1-12)進行比較,看看同步調用與異步調用的異同。
在異步調用中,客戶端發起異步調用_proxy->async_append(new asyncClientCallback(), str1, str2)后,在函數StringServantProxy::async_append()中,程序同樣會先構造ServantProxy::tars_invoke_async()所需要的參數,如請求包類型,RPC方法名,方法參數等,與同步調用的一個區別是,還傳遞了承載回調函數的派生類實例。接下來便直接調用了ServantProxy::tars_invoke_async()方法:
tars_invoke_async(tars::TARSNORMAL,"append", _os.getByteBuffer(), context, _mStatus, callback)
在ServantProxy::tars_invoke_async()方法中,先創建一個ReqMessage變量msg,初始化msg變量,給變量賦值,如Tars版本號,數據包類型,服務名,RPC方法名,Tars的上下文容器,異步調用的超時時間(單位為毫秒)以及異步調用后的回調函數ServantProxyCallbackPtr callback(等待異步調用返回響應后回調里面的函數)等。最后,與同步調用一樣,調用ServantProxy::invoke()進行遠程方法調用。
在ServantProxy::invoke()中,繼續填充傳遞進來的變量ReqMessage msg。此外,還需要獲取調用者caller線程的線程私有數據ServantProxyThreadData,用來指導RPC調用。與同步調用一樣,利用ServantProxy::selectNetThreadInfo()來輪詢選取ObjectProxy(網絡線程CommunicatorEpoll)與對應的ReqInfoQueue,詳細可看同步調用中的介紹,注意區分客戶端中的調用者caller線程與網絡線程,以及之間的通信橋梁。
退出ServantProxy::selectNetThreadInfo()后,便得到ObjectProxy_類型的pObjProxy及其對應的ReqInfoQueue_類型的ReqInfoQueue,在異步調用中,不需要建立條件變量來阻塞進程,直接通過pObjectProxy來發送RPC請求,請求信息會暫存在ReqInfoQueue中:
if(!pReqQ->push_back(msg,bEmpty)) { TLOGERROR("[TARS][ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl); delete msg; msg = NULL; pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ); throw TarsClientQueueException("client queue full"); } pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
在之后,就不需要做任何的工作,退出層層函數調用,回到客戶端中,程序可以繼續執行其他動作。
異步調用的請求發送過程與同步調用的一致,都是在網絡線程中通過ObjectProxy去調用AdapterProxy來發送數據。但是在接收到響應之后,通過圖(1-15)可以看到,在函數AdapterProxy::finishInvoke(ReqMessage*)中,同步調用會通過msg->pMonitor->notify()喚醒客戶端的caller線程來接收響應包,而在異步調用中,則是如圖(1-19)所示,CommunicatorEpoll與AsyncProcThread的關系如圖(1-20)所示。
在函數AdapterProxy::finishInvoke(ReqMessage*)中,程序通過:
//異步回調,放入回調處理線程中 _objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);
將信息包msg(帶響應信息)放到異步回調處理線程中,在CommunicatorEpoll::pushAsyncThreadQueue()中,通過輪詢的方式選擇異步回調處理線程處理接收到的響應包,異步處理線程數默認是3,最大是1024。
void CommunicatorEpoll::pushAsyncThreadQueue(ReqMessage * msg) { //先不考慮每個線程隊列數目不一致的情況 _asyncThread[_asyncSeq]->push_back(msg); _asyncSeq ++; if(_asyncSeq == _asyncThreadNum) { _asyncSeq = 0; } }
選取之后,通過AsyncProcThread::push_back(),將msg包放在響應包隊列AsyncProcThread::_msgQueue中,然后通過AsyncProcThread:: notify()函數通知本異步回調處理線程進行處理,AsyncProcThread:: notify()函數可以令阻塞在AsyncProcThread:: run()中的AsyncProcThread::timedWait()的異步處理線程被喚醒。
在AsyncProcThread::run()中,主要執行下面的程序進行函數回調:
if (_msgQueue->pop_front(msg)) { ...... try { ReqMessagePtr msgPtr = msg; msg->callback->onDispatch(msgPtr); } catch (exception& e) { TLOGERROR("[TARS][AsyncProcThread exception]:" << e.what() << endl); } catch (...) { TLOGERROR("[TARS][AsyncProcThread exception.]" << endl); } }
通過msg->callback,程序可以調用回調函數基類StringServantPrxCallback里面的onDispatch()函數。在StringServantPrxCallback:: onDispatch()中,分析此次響應所對應的RPC方法名,獲取響應結果,并通過動態多態,執行用戶所定義好的派生類的虛函數。通過ReqMessagePtr的引用計數,還可以將ReqNessage* msg刪除掉,與同步調用不同,同步調用的msg的新建與刪除都在caller線程中,而異步調用的msg在caller線程上構造,在異步回調處理線程中析構。
“TARS C++客戶端是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。