欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rpcx-Server端实现

程序员文章站 2022-07-08 23:28:14
...

一.代码介绍

1.1 server结构

rpcx-Server端实现

// Server is rpcx server that use TCP or UDP.
type Server struct {
	ln                 net.Listener   //监听
	readTimeout        time.Duration  //读取client数据的超时时间
	writeTimeout       time.Duration  //写入client数据的超时时间
	gatewayHTTPServer  *http.Server   
	DisableHTTPGateway bool //使用HTTP网关
	DisableJSONRPC     bool //使用json-rpc

	serviceMapMu sync.RWMutex
	serviceMap   map[string]*service   //server端提供的service的记录表

	mu         sync.RWMutex            
	activeConn map[net.Conn]struct{}   // 存活的连接
	doneChan   chan struct{}           // server完成管道
	seq        uint64                  // server端编号

	inShutdown int32
	onShutdown []func(s *Server)      //禁止一个套接字的IO

	// TLSConfig for creating tls tcp connection.
	tlsConfig *tls.Config            // tcp连接的配置
	// BlockCrypt for kcp.BlockCrypt
	options map[string]interface{}  //kip协议时提供的一些限制

	// CORS options
	corsOptions *CORSOptions

	Plugins PluginContainer  //插件管理,通过实现插件注册插件,增加server的特性

	// AuthFunc can be used to auth.
	AuthFunc func(ctx context.Context, req *protocol.Message, token string) error //认证

	handlerMsgNum int32     //处理消息量
}

1.2 server 启动

func (s *Server) Serve(network, address string) (err error) {
	s.startShutdownListener()
	var ln net.Listener
	ln, err = s.makeListener(network, address)
	if err != nil {
		return
	}

	if network == "http" {
		s.serveByHTTP(ln, "")
		return nil
	}

	// try to start gateway
	ln = s.startGateway(network, ln)

	return s.serveListener(ln)
}
  • 创建监听。
  • 根据协议启动serve, 如果是HTTP协议走serveByHTTP函数;如果是TCP协议开启网关,走serveListener函数。
  • 注意:这个过程是阻塞的。

1.首先看一下serveByHTTP函数:

func (s *Server) serveByHTTP(ln net.Listener, rpcPath string) {
	s.ln = ln

	if rpcPath == "" {
		rpcPath = share.DefaultRPCPath //rpc为空,给一个默认路径
	}
	http.Handle(rpcPath, s)
	srv := &http.Server{Handler: nil} //构建server
	srv.Serve(ln)
}
  • 判断rpcPath, 为空赋值"/_rpcx_"。
  • 根据rpcPath,注册handler。
  • 构建server,启动http server。

2.再看一下serveListener函数:

func (s *Server) serveListener(ln net.Listener) error {

	var tempDelay time.Duration

	s.mu.Lock()
	s.ln = ln
	s.mu.Unlock()

	for {
		conn, e := ln.Accept()   //获取socket连接,epoll
		if e != nil {
			select {
			case <-s.getDoneChan():
				return ErrServerClosed //收到关闭信号,退出关闭
			default:
			}

			if ne, ok := e.(net.Error); ok && ne.Temporary() { //网络错误,延迟重新建立连接
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond  //5ms
				} else {
					tempDelay *= 2 //设置延迟时间,开始递增,避免无意义的重试
				}

				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				} //限制了最大重试时间在1s

				log.Errorf("rpcx: Accept error: %v; retrying in %v", e, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return e
		}
        //如果没有网络错误,下次网络错误的重试时间重新开始
		tempDelay = 0

		if tc, ok := conn.(*net.TCPConn); ok { //取出TCP连接
			tc.SetKeepAlive(true)//设置TCP保持长连接
			tc.SetKeepAlivePeriod(3 * time.Minute)设置TCP探测时间间隔时间为3分钟,如果客户端3分钟没有和服务端通信,则开始探测
			tc.SetLinger(10)
		}

		conn, ok := s.Plugins.DoPostConnAccept(conn)
		if !ok {
			closeChannel(s, conn)
			continue
		}

		s.mu.Lock()
		s.activeConn[conn] = struct{}{} //空struct{}占位,连接不可用
		s.mu.Unlock()

		go s.serveConn(conn)//协程处理连接
	}
}

二.类图

rpcx-Server端实现

 

相关标签: 分布式与微服务