您好,登錄后才能下訂單哦!
本篇內容主要講解“以太坊怎么實現rpc功能”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“以太坊怎么實現rpc功能”吧!
JSON-RPC是區塊鏈外部調用的標配。以太坊同樣也實現了這個功能。底層支持四種協議:InProc,IPC,HTTP,WebSocket。上層除了常規的方法調用之外還實現了Pub/Sub功能。
api接口分布在各個模塊,主要分為兩種
1:直接寫死在Node中的幾個service(admin,web3,debug 等)
2: 實現了Service接口的服務結構,已經注冊的服務會調用APIs()方法獲得其中的api。
// file: go-ethereum/node/node.go

// startRPC collects the RPC APIs to expose: it starts from the list returned
// by n.apis() and appends the result of APIs() for every entry in services.
//
// NOTE(review): this is an excerpt. The snippet ends right after the
// collection loop and shows no return statement, so the rest of the function
// body is not visible here.
func (n *Node) startRPC(services map[reflect.Type]Service) error {
	apis := n.apis()
	for _, service := range services {
		apis = append(apis, service.APIs()...)
	}
}
node中寫死的接口
// node中寫死的接口 func (n *Node) apis() []rpc.API { return []rpc.API{ { Namespace: "admin", Version: "1.0", Service: NewPrivateAdminAPI(n), }, { Namespace: "admin", Version: "1.0", Service: NewPublicAdminAPI(n), Public: true, }, { Namespace: "debug", Version: "1.0", Service: debug.Handler, }, { Namespace: "debug", Version: "1.0", Service: NewPublicDebugAPI(n), Public: true, }, { Namespace: "web3", Version: "1.0", Service: NewPublicWeb3API(n), Public: true, }, } }
Ethereum 服務實現的APIs()接口 類似的還有其他的服務(dashboard,ethstats)
//Ethereum 服務實現的APIs()接口 func (s *Ethereum) APIs() []rpc.API { apis := ethapi.GetAPIs(s.ApiBackend) // Append any APIs exposed explicitly by the consensus engine apis = append(apis, s.engine.APIs(s.BlockChain())...) // Append all the local APIs and return return append(apis, []rpc.API{ { Namespace: "eth", Version: "1.0", Service: NewPublicEthereumAPI(s), Public: true, }, { Namespace: "eth", Version: "1.0", Service: NewPublicMinerAPI(s), Public: true, }, { Namespace: "eth", Version: "1.0", Service: downloader.NewPublicDownloaderAPI(s.protocolManager.downloader, s.eventMux), Public: true, }, { Namespace: "miner", Version: "1.0", Service: NewPrivateMinerAPI(s), Public: false, }, { Namespace: "eth", Version: "1.0", Service: filters.NewPublicFilterAPI(s.ApiBackend, false), Public: true, }, { Namespace: "admin", Version: "1.0", Service: NewPrivateAdminAPI(s), }, { Namespace: "debug", Version: "1.0", Service: NewPublicDebugAPI(s), Public: true, }, { Namespace: "debug", Version: "1.0", Service: NewPrivateDebugAPI(s.chainConfig, s), }, { Namespace: "net", Version: "1.0", Service: s.netRPCService, Public: true, }, }...) }
這里的Service只是類型,還要注冊到Server里面,原理就是反射出結構體里的類型,解析出函數方法名稱(轉小寫),參數類型,返回類型等信息,最終每個合格的方法都會登記為service實例中的一個callback或subscription
// service groups the reflected information kept for one RPC receiver: its
// name, the receiver's type, and the handlers and subscriptions found on it.
type service struct {
	name          string        // name for service
	typ           reflect.Type  // receiver type
	callbacks     callbacks     // registered handlers
	subscriptions subscriptions // available subscriptions/notifications
}
// Reflect out the methods of a service API struct.
// file: go-ethereum/rpc/utils.go

// suitableCallbacks iterates over the methods of typ and sorts the usable ones
// into two maps keyed by the formatted method name: regular callbacks and
// subscriptions. A method is skipped when it is unexported, when any argument
// type fails isExportedOrBuiltinType, and (for regular callbacks) when a
// return type fails that check, when an error return is not in last position,
// or when it has two return values of which neither is an error.
func suitableCallbacks(rcvr reflect.Value, typ reflect.Type) (callbacks, subscriptions) {
	callbacks := make(callbacks)
	subscriptions := make(subscriptions)

METHODS:
	for m := 0; m < typ.NumMethod(); m++ {
		method := typ.Method(m)
		mtype := method.Type
		// Normalize the name used as map key (original note: "convert to lowercase").
		mname := formatName(method.Name)
		if method.PkgPath != "" { // method must be exported
			continue
		}

		var h callback
		// Decide whether this is a subscription method. Per the original note,
		// isPubSub looks mainly at the second input parameter and the first
		// return value of the signature (isPubSub itself is not shown here).
		h.isSubscribe = isPubSub(mtype)
		h.rcvr = rcvr
		h.method = method
		h.errPos = -1

		// Index 0 of the inputs is the receiver; a context in position 1 is
		// recorded and excluded from the argument types as well.
		firstArg := 1
		numIn := mtype.NumIn()
		if numIn >= 2 && mtype.In(1) == contextType {
			h.hasCtx = true
			firstArg = 2
		}

		if h.isSubscribe {
			// Subscription method: only the argument types are validated.
			h.argTypes = make([]reflect.Type, numIn-firstArg) // skip rcvr type
			for i := firstArg; i < numIn; i++ {
				argType := mtype.In(i)
				if isExportedOrBuiltinType(argType) {
					h.argTypes[i-firstArg] = argType
				} else {
					continue METHODS
				}
			}

			subscriptions[mname] = &h
			continue METHODS
		}

		// determine method arguments, ignore first arg since it's the receiver type
		// Arguments must be exported or builtin types
		h.argTypes = make([]reflect.Type, numIn-firstArg)
		for i := firstArg; i < numIn; i++ {
			argType := mtype.In(i)
			if !isExportedOrBuiltinType(argType) {
				continue METHODS
			}
			h.argTypes[i-firstArg] = argType
		}

		// check that all returned values are exported or builtin types
		for i := 0; i < mtype.NumOut(); i++ {
			if !isExportedOrBuiltinType(mtype.Out(i)) {
				continue METHODS
			}
		}

		// when a method returns an error it must be the last returned value
		h.errPos = -1
		for i := 0; i < mtype.NumOut(); i++ {
			if isErrorType(mtype.Out(i)) {
				h.errPos = i
				break
			}
		}

		if h.errPos >= 0 && h.errPos != mtype.NumOut()-1 {
			continue METHODS
		}

		// Methods with more than two return values match no case below and
		// are therefore not registered.
		switch mtype.NumOut() {
		case 0, 1, 2:
			if mtype.NumOut() == 2 && h.errPos == -1 { // with two return values, one of them must be an error
				continue METHODS
			}
			callbacks[mname] = &h
		}
	}

	return callbacks, subscriptions
}
底層支持了InProc,IPC,HTTP,WebSocket 四種傳輸協議
1 InProc 直接生成RPCService實例,掛在Node上面可以直接調用。
2 IPC 監聽管道,收到消息后解析成ServerCodec對象,扔給Server的ServeCodec方法使用
//file ipc.go func (srv *Server) ServeListener(l net.Listener) error { for { conn, err := l.Accept() if netutil.IsTemporaryError(err) { log.Warn("RPC accept error", "err", err) continue } else if err != nil { return err } log.Trace("Accepted connection", "addr", conn.RemoteAddr()) go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions) } }
3 HTTP 生成兩個中間件,第二個中間件接收消息生成ServerCodec,扔給Server的ServeSingleRequest方法
//file http.go func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Permit dumb empty requests for remote health-checks (AWS) if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" { return } if code, err := validateRequest(r); err != nil { http.Error(w, err.Error(), code) return } // All checks passed, create a codec that reads direct from the request body // untilEOF and writes the response to w and order the server to process a // single request. ctx := context.Background() ctx = context.WithValue(ctx, "remote", r.RemoteAddr) ctx = context.WithValue(ctx, "scheme", r.Proto) ctx = context.WithValue(ctx, "local", r.Host) body := io.LimitReader(r.Body, maxRequestContentLength) codec := NewJSONCodec(&httpReadWriteNopCloser{body, w}) defer codec.Close() w.Header().Set("content-type", contentType) srv.ServeSingleRequest(codec, OptionMethodInvocation, ctx) }
4 WebSocket 與HTTP類似,生成WebsocketHandler中間件,收到消息后解析成ServerCodec對象,扔給Server的ServeCodec方法使用
//websocked.go func (srv *Server) WebsocketHandler(allowedOrigins []string) http.Handler { return websocket.Server{ Handshake: wsHandshakeValidator(allowedOrigins), Handler: func(conn *websocket.Conn) { // Create a custom encode/decode pair to enforce payload size and number encoding conn.MaxPayloadBytes = maxRequestContentLength encoder := func(v interface{}) error { return websocketJSONCodec.Send(conn, v) } decoder := func(v interface{}) error { return websocketJSONCodec.Receive(conn, v) } srv.ServeCodec(NewCodec(conn, encoder, decoder), OptionMethodInvocation|OptionSubscriptions) }, } }
上面四種協議在拿到ServerCodec對象后,會把這個對象傳遞到service的響應函數里面去。最終都是調到handle函數里面,handle里面再根據不同的類型進行響應。
// handle processes one parsed request and returns the response value to send.
// The second return value is an optional callback: it is non-nil only for a
// successful subscribe request, where it activates the new subscription, and
// nil on every other path.
//
// Requests are dispatched in this order: a request that already carries an
// error, an unsubscribe request, a subscribe request, and finally a regular
// method call executed through reflection.
func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
	if req.err != nil {
		return codec.CreateErrorResponse(&req.id, req.err), nil
	}

	if req.isUnsubscribe {
		// Unsubscribe: the first argument must be the subscription id string.
		if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
			notifier, supported := NotifierFromContext(ctx) // fetch the notifier bound to the context
			if !supported {                                  // interface doesn't support subscriptions (e.g. http)
				return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
			}

			// Cancel the subscription.
			subid := ID(req.args[0].String())
			if err := notifier.unsubscribe(subid); err != nil {
				return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
			}

			return codec.CreateResponse(req.id, true), nil
		}
		return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil
	}

	if req.callb.isSubscribe {
		// Subscribe: create the subscription now, activate it later.
		subid, err := s.createSubscription(ctx, codec, req)
		if err != nil {
			return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
		}

		// activate the subscription after the sub id was successfully sent to the client
		activateSub := func() {
			notifier, _ := NotifierFromContext(ctx) // fetch the notifier bound to the context
			notifier.activate(subid, req.svcname)   // start delivering events for this subscription
		}

		return codec.CreateResponse(req.id, subid), activateSub
	}

	// regular RPC call, prepare arguments
	// The number of supplied arguments must match the callback's argument types.
	if len(req.args) != len(req.callb.argTypes) {
		rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d", req.svcname, serviceMethodSeparator, req.callb.method.Name, len(req.callb.argTypes), len(req.args))}
		return codec.CreateErrorResponse(&req.id, rpcErr), nil
	}

	// Call arguments are: receiver, optional context, then the request arguments.
	arguments := []reflect.Value{req.callb.rcvr}
	if req.callb.hasCtx {
		arguments = append(arguments, reflect.ValueOf(ctx))
	}
	if len(req.args) > 0 {
		arguments = append(arguments, req.args...)
	}

	// execute RPC method and return result
	reply := req.callb.method.Func.Call(arguments)
	if len(reply) == 0 {
		return codec.CreateResponse(req.id, nil), nil
	}

	// Check the result: a non-nil error return becomes an error response.
	if req.callb.errPos >= 0 { // test if method returned an error
		if !reply[req.callb.errPos].IsNil() {
			e := reply[req.callb.errPos].Interface().(error)
			res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})
			return res, nil
		}
	}
	return codec.CreateResponse(req.id, reply[0].Interface()), nil
}
Pub/sub 實現
底層在context綁定一個notifier對象
// When the OptionSubscriptions bits are set in options, store the result of
// newNotifier(codec) in the context under the key notifierKey{}.
if options&OptionSubscriptions == OptionSubscriptions {
	ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
}
sub/unsub的時候會通過context.Value中拿notifier對象,調用上面的方法注冊或者取消注冊
func NotifierFromContext(ctx context.Context) (*Notifier, bool) { n, ok := ctx.Value(notifierKey{}).(*Notifier) return n, ok }
注冊
func (n *Notifier) activate(id ID, namespace string) { n.subMu.Lock() defer n.subMu.Unlock() if sub, found := n.inactive[id]; found { sub.namespace = namespace n.active[id] = sub delete(n.inactive, id) } }
注銷
func (n *Notifier) unsubscribe(id ID) error { n.subMu.Lock() defer n.subMu.Unlock() if s, found := n.active[id]; found { close(s.err) delete(n.active, id) return nil } return ErrSubscriptionNotFound }
消息事件觸發
func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) { // Make sure the server is running, fail otherwise server := api.node.Server() if server == nil { return nil, ErrNodeStopped } // Create the subscription //獲取notifier對象 notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, rpc.ErrNotificationsUnsupported } //生成標識 rpcSub := notifier.CreateSubscription() go func() { events := make(chan *p2p.PeerEvent) sub := server.SubscribeEvents(events) defer sub.Unsubscribe() for { select { case event := <-events: //觸發事件,發送通知消息 notifier.Notify(rpcSub.ID, event) case <-sub.Err(): return case <-rpcSub.Err(): return case <-notifier.Closed(): return } } }() return rpcSub, nil }
到此,相信大家對“以太坊怎么實現rpc功能”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。