rpcx-Server端实现
程序员文章站
2022-07-08 23:28:14
...
一.代码介绍
1.1 server结构
// Server is rpcx server that use TCP or UDP.
type Server struct {
ln net.Listener //监听
readTimeout time.Duration //读取client数据的超时时间
writeTimeout time.Duration //写入client数据的超时时间
gatewayHTTPServer *http.Server
DisableHTTPGateway bool //使用HTTP网关
DisableJSONRPC bool //使用json-rpc
serviceMapMu sync.RWMutex
serviceMap map[string]*service //server端提供的service的记录表
mu sync.RWMutex
activeConn map[net.Conn]struct{} // 存活的连接
doneChan chan struct{} // server完成管道
seq uint64 // server端编号
inShutdown int32
onShutdown []func(s *Server) //禁止一个套接字的IO
// TLSConfig for creating tls tcp connection.
tlsConfig *tls.Config // tcp连接的配置
// BlockCrypt for kcp.BlockCrypt
options map[string]interface{} //kip协议时提供的一些限制
// CORS options
corsOptions *CORSOptions
Plugins PluginContainer //插件管理,通过实现插件注册插件,增加server的特性
// AuthFunc can be used to auth.
AuthFunc func(ctx context.Context, req *protocol.Message, token string) error //认证
handlerMsgNum int32 //处理消息量
}
1.2 server 启动
func (s *Server) Serve(network, address string) (err error) {
s.startShutdownListener()
var ln net.Listener
ln, err = s.makeListener(network, address)
if err != nil {
return
}
if network == "http" {
s.serveByHTTP(ln, "")
return nil
}
// try to start gateway
ln = s.startGateway(network, ln)
return s.serveListener(ln)
}
- 创建监听。
- 根据协议启动serve, 如果是HTTP协议走serveByHTTP函数;如果是TCP协议开启网关,走serveListener函数。
- 注意:这个过程是阻塞的。
1.首先看一下serveByHTTP函数:
func (s *Server) serveByHTTP(ln net.Listener, rpcPath string) {
s.ln = ln
if rpcPath == "" {
rpcPath = share.DefaultRPCPath //rpc为空,给一个默认路径
}
http.Handle(rpcPath, s)
srv := &http.Server{Handler: nil} //构建server
srv.Serve(ln)
}
- 判断rpcPath, 为空赋值"/_rpcx_"。
- 根据rpcPath,注册handler。
- 构建server,启动http server。
2.再看一下serveListener函数:
func (s *Server) serveListener(ln net.Listener) error {
var tempDelay time.Duration
s.mu.Lock()
s.ln = ln
s.mu.Unlock()
for {
conn, e := ln.Accept() //获取socket连接,epoll
if e != nil {
select {
case <-s.getDoneChan():
return ErrServerClosed //收到关闭信号,退出关闭
default:
}
if ne, ok := e.(net.Error); ok && ne.Temporary() { //网络错误,延迟重新建立连接
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond //5ms
} else {
tempDelay *= 2 //设置延迟时间,开始递增,避免无意义的重试
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
} //限制了最大重试时间在1s
log.Errorf("rpcx: Accept error: %v; retrying in %v", e, tempDelay)
time.Sleep(tempDelay)
continue
}
return e
}
//如果没有网络错误,下次网络错误的重试时间重新开始
tempDelay = 0
if tc, ok := conn.(*net.TCPConn); ok { //取出TCP连接
tc.SetKeepAlive(true)//设置TCP保持长连接
tc.SetKeepAlivePeriod(3 * time.Minute)设置TCP探测时间间隔时间为3分钟,如果客户端3分钟没有和服务端通信,则开始探测
tc.SetLinger(10)
}
conn, ok := s.Plugins.DoPostConnAccept(conn)
if !ok {
closeChannel(s, conn)
continue
}
s.mu.Lock()
s.activeConn[conn] = struct{}{} //空struct{}占位,连接不可用
s.mu.Unlock()
go s.serveConn(conn)//协程处理连接
}
}
二.类图