Golang 实现Thrift客户端连接池方式
1 前言
阅读文章之前,请先了解一下thrift相关知识。并没有提供客户端连接池的实现方案,而我们在实际使用时,thrift客户端必须复用,来保证较为可观的吞吐量,并避免在高qps调用情况下,不断的创建、释放客户端所带来的机器端口耗尽问题。
本文会详细讲解如何实现一个简单可靠的thrift客户端连接池,并通过对照实验来说明thrift客户端连接池所带来的好处。
由于篇幅的原因,本文只粘出关键代码,源代码请查看thrift client pool demo
1.1 运行环境
golang版本: go1.14.3 darwin/amd64
thrift golang库版本: 0.13.0
thrift idl编辑器版本: 0.13.0
1.2 .thrift文件
namespace java com.czl.api.thrift.model namespace cpp com.czl.api namespace php com.czl.api namespace py com.czl.api namespace js com.czl.apixianz namespace go com.czl.api struct apirequest { 1: required i16 id; } struct apiresponse{ 1:required string name; } // service1 service apiservice1{ apiresponse query(1:apirequest request) } // service2 service apiservice2{ apiresponse query(1:apirequest request) }
注:请通过安装thrift idl编译器,并生成客户端、服务端代码。
1.3 对照实验说明
通过脚本开启100个协程并发调用rpc服务10分钟,统计这段时间内,未使用thrift客户端连接池与使用客户端连接池服务的平均吞吐量、thrift api调用平均延迟、机器端口消耗等数据进行性能对比。
实验一: 未使用thrift客户端连接池
实验二: 使用thrift客户端连接池
2 thrift客户端连接池实现
2.1 连接池的功能
首先,我们要明确一下连接池的职责,这里我简单的总结一下,连接池主要功能是维护连接的创建、释放,通过缓存连接来复用连接,减少创建连接所带来的开销,提高系统的吞吐量,一般连接池还会有连接断开的重连机制、超时机制等。这里我们可以先定义出大部分连接池都会有的功能,只是定义,可以先不管每个功能的具体实现。每一个空闲thrift客户端其实底层都维护着一条空闲tcp连接,空闲thrift客户端与空闲连接在这里其实是同一个概念。
...... // thrift客户端创建方法,留给业务去实现 type thriftdial func(addr string, conntimeout time.duration) (*idleclient, error) // 关闭thrift客户端,留给业务实现 type thriftclientclose func(c *idleclient) error // thrift客户端连接池 type thriftpool struct { // thrift客户端创建逻辑,业务自己实现 dial thriftdial // thrift客户端关闭逻辑,业务自己实现 close thriftclientclose // 空闲客户端,用双端队列存储 idle list.list // 同步锁,确保count、status、idle等公共数据并发操作安全 lock *sync.mutex // 记录当前已经创建的thrift客户端,确保maxconn配置 count int32 // thrift客户端连接池状态,目前就open和stop两种 status uint32 // thrift客户端连接池相关配置 config *thriftpoolconfig } // 连接池配置 type thriftpoolconfig struct { // thrfit server端地址 addr string // 最大连接数 maxconn int32 // 创建连接超时时间 conntimeout time.duration // 空闲客户端超时时间,超时主动释放连接,关闭客户端 idletimeout time.duration // 获取thrift客户端超时时间 timeout time.duration // 获取thrift客户端失败重试间隔 interval time.duration } // thrift客户端 type idleclient struct { // thrift传输层,封装了底层连接建立、维护、关闭、数据读写等细节 transport thrift.ttransport // 真正的thrift客户端,业务创建传入 rawclient interface{} } // 封装了thrift客户端 type idleconn struct { // 空闲thrift客户端 c *idleclient // 最近一次放入空闲队列的时间 t time.time } // 获取thrift空闲客户端 func (p *thriftpool) get() (*idleclient, error) { // 1. 从空闲池中获取空闲客户端,获取到更新数据,返回,否则执行第2步 // 2. 创建新到thrift客户端,更新数据,返回thrift客户端 ...... } // 归还thrift客户端 func (p *thriftpool) put(client *idleclient) error { // 1. 如果客户端已经断开,更新数据,返回,否则执行第2步 // 2. 将thrift客户端丢进空闲连接池,更新数据,返回 ...... } // 超时管理,定期释放空闲太久的连接 func (p *thriftpool) checktimeout() { // 扫描空闲连接池,将空闲太久的连接主动释放掉,并更新数据 ...... } // 异常连接重连 func (p *thriftpool) reconnect(client *idleclient) (newclient *idleclient, err error) { // 1. 关闭旧客户端 // 2. 创建新的客户端,并返回 ...... } // 其他方法 ......
这里有两个关键的数据结构,thriftpool和idleclient,thriftpool负责实现整个连接池的功能,idleclient封装了真正的thrift客户端。
先看一下thriftpool的定义:
// thrift客户端创建方法,留给业务去实现 type thriftdial func(addr string, conntimeout time.duration) (*idleclient, error) // 关闭thrift客户端,留给业务实现 type thriftclientclose func(c *idleclient) error // thrift客户端连接池 type thriftpool struct { // thrift客户端创建逻辑,业务自己实现 dial thriftdial // thrift客户端关闭逻辑,业务自己实现 close thriftclientclose // 空闲客户端,用双端队列存储 idle list.list // 同步锁,确保count、status、idle等公共数据并发操作安全 lock *sync.mutex // 记录当前已经创建的thrift客户端,确保maxconn配置 count int32 // thrift客户端连接池状态,目前就open和stop两种 status uint32 // thrift客户端连接池相关配置 config *thriftpoolconfig } // 连接池配置 type thriftpoolconfig struct { // thrfit server端地址 addr string // 最大连接数 maxconn int32 // 创建连接超时时间 conntimeout time.duration // 空闲客户端超时时间,超时主动释放连接,关闭客户端 idletimeout time.duration // 获取thrift客户端超时时间 timeout time.duration // 获取thrift客户端失败重试间隔 interval time.duration }
thrift客户端创建与关闭,涉及到业务细节,这里抽离成dial方法和close方法。
连接池需要维护空闲客户端,这里用双端队列来存储。
一般的连接池,都应该支持最大连接数配置,maxconn可以配置连接池最大连接数,同时我们用count来记录连接池当前已经创建的连接。
为了实现连接池的超时管理,当然也得有相关超时配置。
连接池的状态、当前连接数等这些属性,是多协程并发操作的,这里用同步锁lock来确保并发操作安全。
在看一下idleclient实现:
// thrift客户端 type idleclient struct { // thrift传输层,封装了底层连接建立、维护、关闭、数据读写等细节 transport thrift.ttransport // 真正的thrift客户端,业务创建传入 rawclient interface{} } // 封装了thrift客户端 type idleconn struct { // 空闲thrift客户端 c *idleclient // 最近一次放入空闲队列的时间 t time.time }
rawclient是真正的thrift客户端,与实际逻辑相关。
transport thrift传输层,thrift传输层,封装了底层连接建立、维护、关闭、数据读写等细节。
idleconn封装了idleclient,用来实现空闲连接超时管理,idleconn记录一个时间,这个时间是thrift客户端最近一次被放入空闲队列的时间。
2.2 获取连接
...... var nowfunc = time.now ...... // 获取thrift空闲客户端 func (p *thriftpool) get() (*idleclient, error) { return p.get(nowfunc().add(p.config.timeout)) } // 获取连接的逻辑实现 // expire设定了一个超时时间点,当没有可用连接时,程序会休眠一小段时间后重试 // 如果一直获取不到连接,一旦到达超时时间点,则报errovermax错误 func (p *thriftpool) get(expire time.time) (*idleclient, error) { if atomic.loaduint32(&p.status) == poolstop { return nil, errpoolclosed } // 判断是否超额 p.lock.lock() if p.idle.len() == 0 && atomic.loadint32(&p.count) >= p.config.maxconn { p.lock.unlock() // 不采用递归的方式来实现重试机制,防止栈溢出,这里改用循环方式来实现重试 for { // 休眠一段时间再重试 time.sleep(p.config.interval) // 超时退出 if nowfunc().after(expire) { return nil, errovermax } p.lock.lock() if p.idle.len() == 0 && atomic.loadint32(&p.count) >= p.config.maxconn { p.lock.unlock() } else { // 有可用链接,退出for循环 break } } } if p.idle.len() == 0 { // 先加1,防止首次创建连接时,tcp握手太久,导致p.count未能及时+1,而新的请求已经到来 // 从而导致短暂性实际连接数大于p.count(大部分链接由于无法进入空闲链接队列,而被关闭,处于time_wati状态) atomic.addint32(&p.count, 1) p.lock.unlock() client, err := p.dial(p.config.addr, p.config.conntimeout) if err != nil { atomic.addint32(&p.count, -1) return nil, err } // 检查连接是否有效 if !client.check() { atomic.addint32(&p.count, -1) return nil, errsocketdisconnect } return client, nil } // 从队头中获取空闲连接 ele := p.idle.front() idlec := ele.value.(*idleconn) p.idle.remove(ele) p.lock.unlock() // 连接从空闲队列获取,可能已经关闭了,这里再重新检查一遍 if !idlec.c.check() { atomic.addint32(&p.count, -1) return nil, errsocketdisconnect } return idlec.c, nil }
p.get()的逻辑比较清晰:如果空闲队列没有连接,且当前连接已经到达p.config.maxconn,就休眠等待重试;当满足获取连接条件时p.idle.len() != 0 || atomic.loadint32(&p.count) < p.config.maxconn,有空闲连接,则返回空闲连接,减少创建连接的开销,没有的话,再重新创建一条新的连接。
这里有两个关键的地方需要注意:
等待重试的逻辑,不要用递归的方式来实现,防止运行栈溢出。
// 递归的方法实现等待重试逻辑 func (p *thriftpool) get(expire time.time) (*idleclient, error) { // 超时退出 if nowfunc().after(expire) { return nil, errovermax } if atomic.loaduint32(&p.status) == poolstop { return nil, errpoolclosed } // 判断是否超额 p.lock.lock() if p.idle.len() == 0 && atomic.loadint32(&p.count) >= p.config.maxconn { p.lock.unlock() // 休眠递归重试 time.sleep(p.config.interval) p.get(expire) } ....... }
注意p.lock.lock()的和p.lock.unlock()调用时机,确保公共数据并发操作安全。
2.3 释放连接
// 归还thrift客户端 func (p *thriftpool) put(client *idleclient) error { if client == nil { return nil } if atomic.loaduint32(&p.status) == poolstop { err := p.close(client) client = nil return err } if atomic.loadint32(&p.count) > p.config.maxconn || !client.check() { atomic.addint32(&p.count, -1) err := p.close(client) client = nil return err } p.lock.lock() p.idle.pushfront(&idleconn{ c: client, t: nowfunc(), }) p.lock.unlock() return nil }
p.put()逻辑也比较简单,如果连接已经失效,p.count需要-1,并进行连接关闭操作。否则丢到空闲队列里,这里还是丢到队头,没错,还是丢到队头,p.get()和p.put()都是从队头操作,有点像堆操作,为啥这么处理,等下面说到空闲连接超时管理就清楚了,这里先记住丢回空闲队列的时候,会更新空闲连接的时间。
2.4 超时管理
获取连接超时管理p.get()方法已经讲过了,创建连接超时管理由p.dial()去实现,下面说的是空闲连接的超时管理,空闲队列的连接,如果一直没有使用,超过一定时间,需要主动关闭掉,服务端的资源有限,不需要用的连接就主动关掉,而且连接放太久,服务端也会主动关掉。
// 超时管理,定期释放空闲太久的连接 func (p *thriftpool) checktimeout() { p.lock.lock() for p.idle.len() != 0 { ele := p.idle.back() if ele == nil { break } v := ele.value.(*idleconn) if v.t.add(p.config.idletimeout).after(nowfunc()) { break } //timeout && clear p.idle.remove(ele) p.lock.unlock() p.close(v.c) //close client connection atomic.addint32(&p.count, -1) p.lock.lock() } p.lock.unlock() return }
清理超时空闲连接的时候,是从队尾开始清理掉超时或者无效的连接,直到找到第一个可用的连接或者队列为空。p.get()和p.put()都从队头操作队列,保证了活跃的连接都在队头,如果一开始创建的连接太多,后面业务操作变少,不需要那么多连接的时候,那多余的连接就会沉到队尾,被超时管理所清理掉。另外,这样设计也可以优化操作的时间复杂度<o(n)。
2.5 重连机制
事实上,thrift的transport层并没有提供一个检查连接是否有效的方法,一开始实现连接池的时候,检测方法是调用thrift.ttransport.isopen()来判断
// 检测连接是否有效 func (c *idleclient) check() bool { if c.transport == nil || c.rawclient == nil { return false } return c.transport.isopen() }
可在测试阶段发现当底层当tcp连接被异常断开的时候(服务端重启、服务端宕机等),c.transport.isopen()并不能如期的返回false,如果我们查看thrift的源码,可以发现,其实c.transport.isopen()只和我们是否调用了c.transport.open()方法有关。为了能实现断开重连机制,我们只能在使用阶段发现异常连接时,重连连接。
这里我在thriftpool上封装了一层代理thriftpoolagent,来实现断开重连逻辑,具体请参考代码实现。
package pool import ( "fmt" "github.com/apache/thrift/lib/go/thrift" "log" "net" ) type thriftpoolagent struct { pool *thriftpool } func newthriftpoolagent() *thriftpoolagent { return &thriftpoolagent{} } func (a *thriftpoolagent) init(pool *thriftpool) { a.pool = pool } // 真正的业务逻辑放到do方法做,thriftpoolagent只要保证获取到可用的thrift客户端,然后传给do方法就行了 func (a *thriftpoolagent) do(do func(rawclient interface{}) error) error { var ( client *idleclient err error ) defer func() { if client != nil { if err == nil { if rerr := a.releaseclient(client); rerr != nil { log.println(fmt.sprintf("releaseclient error: %v", rerr)) } } else if _, ok := err.(net.error); ok { a.closeclient(client) } else if _, ok = err.(thrift.ttransportexception); ok { a.closeclient(client) } else { if rerr := a.releaseclient(client); rerr != nil { log.println(fmt.sprintf("releaseclient error: %v", rerr)) } } } }() // 从连接池里获取链接 client, err = a.getclient() if err != nil { return err } if err = do(client.rawclient); err != nil { if _, ok := err.(net.error); ok { log.println(fmt.sprintf("err: retry tcp, %t, %s", err, err.error())) // 网络错误,重建连接 client, err = a.reconnect(client) if err != nil { return err } return do(client.rawclient) } if _, ok := err.(thrift.ttransportexception); ok { log.println(fmt.sprintf("err: retry tcp, %t, %s", err, err.error())) // thrift传输层错误,也重建连接 client, err = a.reconnect(client) if err != nil { return err } return do(client.rawclient) } return err } return nil } // 获取连接 func (a *thriftpoolagent) getclient() (*idleclient, error) { return a.pool.get() } // 释放连接 func (a *thriftpoolagent) releaseclient(client *idleclient) error { return a.pool.put(client) } // 关闭有问题的连接,并重新创建一个新的连接 func (a *thriftpoolagent) reconnect(client *idleclient) (newclient *idleclient, err error) { return a.pool.reconnect(client) } // 关闭连接 func (a *thriftpoolagent) closeclient(client *idleclient) { a.pool.closeconn(client) } // 释放连接池 func (a *thriftpoolagent) release() { a.pool.release() } func (a *thriftpoolagent) getidlecount() uint32 { return a.pool.getidlecount() } func (a *thriftpoolagent) getconncount() int32 { return a.pool.getconncount() }
3 对照实验
启用100个协程,不断调用thrift服务端api 10分钟,对比服务平均吞吐量、thrift api调用平均延迟、机器端口消耗。
平均吞吐量(r/s) = 总成功数 / 600
api调用平均延迟(ms/r) = 总成功数 / api成功请求总耗时(微秒) / 1000
机器端口消耗计算:netstat -nt | grep 9444 -c
3.1 实验一:未使用连接池
机器端口消耗
平均吞吐量、平均延迟
从结果看,api的平均延迟在77ms左右,但是服务的平均吞吐量才到360,比理论值1000 / 77 * 1000 = 1299少了很多,而且有96409次错误,报错的主要原因是:connect can't assign request address,100个协程并发调用就已经消耗了1.6w个端口,如果并发数更高的场景,端口消耗的情况会更加严重,实际上,这1.6w条tcp连接,几乎都是time_wait状态,thrfit客户端用完就close掉,根据tcp三次握手可知主动断开连接的一方最终将会处于time_wait状态,并等待2msl时间。
3.2 实验二:使用连接池
机器端口消耗
平均吞吐量、平均延迟
可以看出,用了连接池后,平均吞吐量可达到1.8w,api调用平均延迟才0.5ms,你可能会问,理论吞吐量不是可以达到1000 / 0.5 * 100 = 20w?理论归理论,如果按照1.8w吞吐量算,一次处理过程总时间消耗是1000 / (18000 / 100) = 5.6ms,所以这里影响吞吐量的因素已经不是api调用的耗时了,1.8w的吞吐量其实已经挺不错了。
另外,消耗的端口数也才194/2 = 97(除余2是因为server端也在本地跑),而且都是establish状态,连接一直保持着,不断的在被复用。连接被复用,少了创建tcp连接的三次握手环节,这里也可以解释为啥api调用的平均延迟可以从77ms降到0.5ms,不过0.5ms确实有点低,线上环境server一般不会和client在同一台机器,而且业务逻辑也会比这里复杂,api调用的平均延迟会相对高一点。
4 总结
调用thrift api必须使用thrift客户端连接池,否则在高并发的情况下,会有大量的tcp连接处于time_wait状态,机器端口被大量消耗,可能会导致部分请求失败甚至服务不可用。每次请求都重新创建tcp连接,进行tcp三次握手环节,api调用的延迟会比较高,服务的吞吐量也不会很高。
使用thrift客户端连接池,可以提高系统的吞吐量,同时可以避免机器端口被耗尽的危险,提高服务的可靠性。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。