欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

grpc 客户端连接请求源码

程序员文章站 2022-05-18 16:34:26
...
func main() {
	conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
	...
	defer conn.Close()

	client := pb.NewSearchServiceClient(conn)
	resp, err := client.Search(context.Background(), &pb.SearchRequest{
		Request: "gRPC",
	})
	...
}

创建拨号连接

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	...
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

grpc.Dial 方法实际上是对于 grpc.DialContext 的封装,区别在于 ctx 是直接传入 context.Background。其主要功能是创建与给定目标的客户端连接,其承担了以下职责

  1. 初始化 ClientConn
  2. 初始化(基于进程 LB)负载均衡配置
  3. 初始化 channelz
  4. 初始化重试规则和客户端一元/流式拦截器
  5. 初始化协议栈上的基础信息
  6. 相关 context 的超时控制
  7. 初始化并解析地址信息
  8. 创建与服务端之间的连接

grpc.Dial后到底连接了没

之前听到有的人说调用 grpc.Dial 后客户端就已经与服务端建立起了连接,但这对不对呢?我们先鸟瞰全貌,看看正在跑的 goroutine。如下:
grpc 客户端连接请求源码
我们可以有几个核心方法一直在等待/处理信号,通过分析底层源码可得知。涉及如下:

func (ac *addrConn) connect()
func (ac *addrConn) resetTransport()
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time)
func (ac *addrConn) getReadyTransport()

在这里主要分析 goroutine 提示的 resetTransport 方法,看看都做了啥。核心代码如下:

func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		if ac.state == connectivity.Shutdown {
			return
		}
		...
		connectDeadline := time.Now().Add(dialDuration)
		ac.updateConnectivityState(connectivity.Connecting)
		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			if ac.state == connectivity.Shutdown {
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure)
			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				...
			}
			continue
		}

		if ac.state == connectivity.Shutdown {
			newTr.Close()
			return
		}
		...
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready)
		}
		...

		if ac.state == connectivity.Shutdown {
			return
		}
		ac.updateConnectivityState(connectivity.TransientFailure)
	}
}

在该方法中会不断地去尝试创建连接,若成功则结束。否则不断地根据 Backoff 算法的重试机制去尝试创建连接,直到成功为止。从结论上来讲,单纯调用 DialContext 是异步建立连接的,也就是并不是马上生效,处于 Connecting 状态,而正式下要到达 Ready 状态才可用。

真的连接了吗

grpc 客户端连接请求源码
在抓包工具上提示一个包都没有,那么这算真正连接了吗?我认为这是一个表述问题,我们应该尽可能的严谨。如果你真的想通过 DialContext 方法就打通与服务端的连接,则需要调用 WithBlock 方法,虽然会导致阻塞等待,但最终连接会到达 Ready 状态(握手成功)。如下图:
grpc 客户端连接请求源码

实例化 Service API

type SearchServiceClient interface {
	Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error)
}

type searchServiceClient struct {
	cc *grpc.ClientConn
}

func NewSearchServiceClient(cc *grpc.ClientConn) SearchServiceClient {
	return &searchServiceClient{cc}
}

调用

// search.pb.go
func (c *searchServiceClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) {
	out := new(SearchResponse)
	err := c.cc.Invoke(ctx, "/proto.SearchService/Search", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

proto 生成的 RPC 方法更像是一个包装盒,把需要的东西放进去,而实际上调用的还是 grpc.invoke 方法。如下:

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
	cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}
	if err := cs.SendMsg(req); err != nil {
		return err
	}
	return cs.RecvMsg(reply)
}

通过概览,可以关注到三块调用。如下:

  1. newClientStream:获取传输层 Trasport 并组合封装到 ClientStream 中返回,在这块会涉及负载均衡、超时控制、 Encoding、 Stream 的动作,与服务端基本一致的行为。
  2. cs.SendMsg:发送 RPC 请求出去,但其并不承担等待响应的功能。
  3. cs.RecvMsg:阻塞等待接受到的 RPC 方法响应结果。

连接

// clientconn.go
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
		FullMethodName: method,
	})
	if err != nil {
		return nil, nil, toRPCErr(err)
	}
	return t, done, nil
}

在 newClientStream 方法中,我们通过 getTransport 方法获取了 Transport 层中抽象出来的 ClientTransport 和 ServerTransport,实际上就是获取一个连接给后续 RPC 调用传输使用。

关闭连接

func (cc *ClientConn) Close() error {
	defer cc.cancel()
    ...
	cc.csMgr.updateState(connectivity.Shutdown)
    ...
	cc.blockingpicker.close()
	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		...
		channelz.AddTraceEvent(cc.channelzID, ted)
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}

该方法会取消 ClientConn 上下文,同时关闭所有底层传输。涉及如下:

  1. Context Cancel
  2. 清空并关闭客户端连接
  3. 清空并关闭解析器连接
  4. 清空并关闭负载均衡连接
  5. 添加跟踪引用
  6. 移除当前通道信息

上一篇: 运算符重载

下一篇: 运算符重载