上一篇文章我们讲了net/rpc中client部分的实现,我本机源码安装路径在/usr/local/go,这net/rpc(golang 1.4版本)涉及到的相关代码主要有:
server.go
因为从client我们知道是复用的socket来实现并发调用rpc方法,我们先从方法注册来看源码部分:
// Server对象大都是保存方法存根,保证对象互斥的 type Server struct { mu sync.RWMutex // protects the serviceMap serviceMap map[string]*service reqLock syncMutex// protects freeReq freeReq *Request respLock sync// protects freeResp freeResp Response} func NewServer()return&ServerserviceMap: make(mapservice)}// rpc.Register默认使用了一个Server,只对serviceMap进行了初始化 var DefaultServer=// rpc的service包括方法名、方法反射,类型等 type service name string// name of service rcvr reflectValue// receiver of methods for the service typ reflectType// type of the receiver method mapmethodType // registered methods // 无论是RegisterName、Register最终都调用了register的内部方法 func server )registerrcvr interface{},,0);"> useName bool error // 保证注册服务安全,先加锁 server.muLock defer serverUnlock// 如果服务为空,默认注册一个 ifserviceMap ==nil// 获取注册服务的反射信息 s :=new styp reflectTypeOfrcvrrcvr ValueOf// 可以使用自定义名称 sname Indirects).Type().Name useName name """rpc.Register: no service name for type "+typString logPrint errorsNew// 方法必须是暴露的,既服务名首字符大写 !isExportedsname&&useName "rpc.Register: type "" is not exported"// 不允许重复注册 _ present [];("rpc: service already defined: " snamename sname // 开始注册rpc struct内部的方法存根 method suitableMethodstrue// 如果struct内部一个方法也没,那么直接报错,错误信息还非常详细 lenmethod 0 str // To help the user,see if a pointer receiver would work. method reflectPtrTo),136);">false!=" has no exported methods of suitable type (hint: pass a pointer to value of that type)"else" has no exported methods of suitable type"str// 保存在server的serviceMap中 name] s // 上文提到了服务还需要方法存根的注册 func suitableMethodstyp reflect reportErr mapmethodType // 根据方法名创建保存内部方法map methods methodType// 获取rpc struct内部的方法 for m 0;< typNumMethod(); m++Methodm mtype method mname Name// 之前对这行代码觉得比较奇葩,方法是否是暴露,是看是否有PkgPath的,如果是私有方法,PkgPath显示包名 PkgPathcontinue// 判断是否是三个参数:第一个是结构本身,第二个是参数,第三个是返回值 // Method needs three ins: receiver,*args,*reply. mtypeNumIn3 reportErr Println"method" mname"has wrong number of ins:"())// args是指针类型 // First arg need not be a pointer. argType In(1isExportedOrBuiltinTypeargTypemname"argument type not exported:" argType// reply是指针类型 // Second arg must be a pointer. replyType 2 replyTypeKindPtr"reply type not a pointer:"// Reply type must be exported. // reply必须是可暴露的 replyType"reply type not exported:"// Method needs one out. // 必须有一个返回值,而且要是error NumOut1"has wrong number of outs:"// The return type of the method must be error. returnType Out); typeOfError "returns" returnType(),0);">"not error" methods&ArgTypeReplyType}
请求调用:
方法已经被注册成功,接下来我们看看是如何客户端发送请求调用的:
func Acceptlis netListener conn err lisFatal"rpc.Serve: accept:" errError// TODO(r): exit? // accept连接以后,打开一个goroutine处理请求 go serverServeConnconnconn ioReadWriteCloser buf bufioNewWriter srv gobServerCodec rwc dec gobNewDecoder encNewEncoderbuf encBuf buf// 根据指定的codec进行协议解析 ServeCodecsrvcodec ServerCodec sending syncMutex// 解析请求 service req argv replyv keepReadingreadRequestcodec debugLog ioEOF "rpc:"keepReading break// send a response if we actually managed to read a header. // 如果当前请求错误了,我们应该返回信息,然后继续处理 req sendResponsesending invalidRequest codecfreeRequestreq// 因为需要继续处理后续请求,所以开一个gorutine处理rpc方法 go servicecallserver sending// 如果连接关闭了需要释放资源 Close readRequestHeaderservice *Request keepReading err error// 解析头部,如果失败,直接返回了 getRequestReadRequestHeader||ErrUnexpectedEOF"rpc: server cannot decode request: "Printf"rpc: [trace:%v]\n"Tracer// We read the header successfully. If we see an error now,0);">// we can still recover and move on to the next request. keepReading true// 获取请求中xxx.xxx中.的位置 dot stringsLastIndexServiceMethod".""rpc: service/method request ill-formed: "// 拿到struct名字和方法名字 serviceName [:dot methodName +:]// Look up the request.// 加读锁,获取对象 RLock service serviceNameRUnlock"rpc: can't find service "// 获取反射类型,看见rpc中的发射其实是预先放入map中的 methodName"rpc: can't find method " readRequest replyv reflectValuereadRequestHeader// discard body ReadRequestBody(nil// 解析请求中的args argIsValue false// if true,need to indirect before calling. argv mtypeElem// argv guaranteed to be a pointer now. argvInterface());// 初始化reply类型 replyv s call codec numCallsfunctionFunc// Invoke the method,providing a new value for the reply. // 这里是真正调用rpc方法的地方 returnValues functionCall([]})// The return value for the method is an error. errInter returnValues[]. errmsg errInter.(error// 处理返回请求了 errmsg sendResponsesending reply errmsg resp getResponse// Encode the response header respServiceMethodError errmsg reply invalidRequest // 上一文提到,客户端是根据序号来定位请求的,所以需要原样返回 SeqWriteResponseresp reply"rpc: writing response:"freeResponse
资源重用:
上面把大致的rpc请求都说明了,server有一个技巧是重用对象,这里使用的是链表方式处理的:
// 可以看出使用一个free list链表,来避免Request以及Response对象频繁创建,导致GC压力 getRequestreqLockfreeReq (freeReq .nextreq {} freeRequestfreeReq server req server getResponserespLockfreeResp ResponsefreeResp resp freeResponsefreeResp server resp server 最后,sending这把锁的目的是避免同一个套接字快速请求中避免返回包写入乱序,因此避免一个包完整写入完毕才允许下一个返回写入套接字。通过rpc包源码解析,可以看到标准库中的核心思想还是channel+mutex实现复用对象,以及各种方式的复用,避免GC压力,在我们以后写高性能服务端可以借鉴的地方。###########################
本文来自:猎豹移动技术博客
感谢作者:毛,剑