欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

golang sql连接池的实现方法详解

程序员文章站 2022-03-07 08:24:23
前言 golang的”database/sql”是操作数据库时常用的包,这个包定义了一些sql操作的接口,具体的实现还需要不同数据库的实现,mysql比较优秀的一个驱动是...

前言

golang的”database/sql”是操作数据库时常用的包,这个包定义了一些sql操作的接口,具体的实现还需要不同数据库的实现,mysql比较优秀的一个驱动是:github.com/go-sql-driver/mysql,在接口、驱动的设计上”database/sql”的实现非常优秀,对于类似设计有很多值得我们借鉴的地方,比如beego框架cache的实现模式就是借鉴了这个包的实现;”database/sql”除了定义接口外还有一个重要的功能:连接池,我们在实现其他网络通信时也可以借鉴其实现。

连接池的作用这里就不再多说了,我们先从一个简单的示例看下”database/sql”怎么用:

package main

import(
 "fmt"
 "database/sql"
 _ "github.com/go-sql-driver/mysql"
)

func main(){

 db, err := sql.open("mysql", "username:password@tcp(host)/db_name?charset=utf8&allowoldpasswords=1")
 if err != nil {
  fmt.println(err)
  return
 }
 defer db.close()

 rows,err := db.query("select * from test")

 for rows.next(){
  //row.scan(...)
 }
 rows.close()
}

用法很简单,首先open打开一个数据库,然后调用query、exec执行数据库操作,github.com/go-sql-driver/mysql具体实现了database/sql/driver的接口,所以最终具体的数据库操作都是调用github.com/go-sql-driver/mysql实现的方法,同一个数据库只需要调用一次open即可,下面根据具体的操作分析下”database/sql”都干了哪些事。

1.驱动注册

import _ "github.com/go-sql-driver/mysql"前面的”_”作用时不需要把该包都导进来,只执行包的init()方法,mysql驱动正是通过这种方式注册到”database/sql”中的:

//github.com/go-sql-driver/mysql/driver.go
func init() {
 sql.register("mysql", &mysqldriver{})
}

type mysqldriver struct{}

func (d mysqldriver) open(dsn string) (driver.conn, error) {
 ...
}

init()通过register()方法将mysql驱动添加到sql.drivers(类型:make(map[string]driver.driver))中,mysqldriver实现了driver.driver接口:

//database/sql/sql.go
func register(name string, driver driver.driver) {
 driversmu.lock()
 defer driversmu.unlock()
 if driver == nil {
  panic("sql: register driver is nil")
 }
 if _, dup := drivers[name]; dup {
  panic("sql: register called twice for driver " + name)
 }
 drivers[name] = driver
}

//database/sql/driver/driver.go
type driver interface {
 // open returns a new connection to the database.
 // the name is a string in a driver-specific format.
 //
 // open may return a cached connection (one previously
 // closed), but doing so is unnecessary; the sql package
 // maintains a pool of idle connections for efficient re-use.
 //
 // the returned connection is only used by one goroutine at a
 // time.
 open(name string) (conn, error)
}

假如我们同时用到多种数据库,就可以通过调用sql.register将不同数据库的实现注册到sql.drivers中去,用的时候再根据注册的name将对应的driver取出。

2.连接池实现

先看下连接池整体处理流程:

golang sql连接池的实现方法详解

2.1 初始化db

db, err := sql.open("mysql", "username:password@tcp(host)/db_name?charset=utf8&allowoldpasswords=1")

sql.open()是取出对应的db,这时mysql还没有建立连接,只是初始化了一个sql.db结构,这是非常重要的一个结构,所有相关的数据都保存在此结构中;open同时启动了一个connectionopener协程,后面再具体分析其作用。

type db struct {
 driver driver.driver //数据库实现驱动
 dsn string //数据库连接、配置参数信息,比如username、host、password等
 numclosed uint64

 mu   sync.mutex   //锁,操作db各成员时用到
 freeconn  []*driverconn  //空闲连接
 connrequests []chan connrequest //阻塞请求队列,等连接数达到最大限制时,后续请求将插入此队列等待可用连接
 numopen  int     //已建立连接或等待建立连接数
 openerch chan struct{}  //用于connectionopener
 closed  bool
 dep   map[finalcloser]depset
 lastput  map[*driverconn]string // stacktrace of last conn's put; debug only
 maxidle  int     //最大空闲连接数
 maxopen  int     //数据库最大连接数
 maxlifetime time.duration   //连接最长存活期,超过这个时间连接将不再被复用
 cleanerch chan struct{}
}

maxidle(默认值2)、maxopen(默认值0,无限制)、maxlifetime(默认值0,永不过期)可以分别通过setmaxidleconns、setmaxopenconns、setconnmaxlifetime设定。

2.2 获取连接

上面说了open时是没有建立数据库连接的,只有等用的时候才会实际建立连接,获取可用连接的操作有两种策略:cachedornewconn(有可用空闲连接则优先使用,没有则创建)、alwaysnewconn(不管有没有空闲连接都重新创建),下面以一个query的例子看下具体的操作:

rows, err := db.query("select * from test")

database/sql/sql.go:

func (db *db) query(query string, args ...interface{}) (*rows, error) {
 var rows *rows
 var err error
 //maxbadconnretries = 2
 for i := 0; i < maxbadconnretries; i++ {
  rows, err = db.query(query, args, cachedornewconn)
  if err != driver.errbadconn {
   break
  }
 }
 if err == driver.errbadconn {
  return db.query(query, args, alwaysnewconn)
 }
 return rows, err
}

func (db *db) query(query string, args []interface{}, strategy connreusestrategy) (*rows, error) {
 ci, err := db.conn(strategy)
 if err != nil {
  return nil, err
 }

 //到这已经获取到了可用连接,下面进行具体的数据库操作
 return db.queryconn(ci, ci.releaseconn, query, args)
}

数据库连接由db.query()获取:

func (db *db) conn(strategy connreusestrategy) (*driverconn, error) {
 db.mu.lock()
 if db.closed {
  db.mu.unlock()
  return nil, errdbclosed
 }
 lifetime := db.maxlifetime

 //从freeconn取一个空闲连接
 numfree := len(db.freeconn)
 if strategy == cachedornewconn && numfree > 0 {
  conn := db.freeconn[0]
  copy(db.freeconn, db.freeconn[1:])
  db.freeconn = db.freeconn[:numfree-1]
  conn.inuse = true
  db.mu.unlock()
  if conn.expired(lifetime) {
   conn.close()
   return nil, driver.errbadconn
  }
  return conn, nil
 }

 //如果没有空闲连接,而且当前建立的连接数已经达到最大限制则将请求加入connrequests队列,
 //并阻塞在这里,直到其它协程将占用的连接释放或connectionopenner创建
 if db.maxopen > 0 && db.numopen >= db.maxopen {
  // make the connrequest channel. it's buffered so that the
  // connectionopener doesn't block while waiting for the req to be read.
  req := make(chan connrequest, 1)
  db.connrequests = append(db.connrequests, req)
  db.mu.unlock()
  ret, ok := <-req //阻塞
  if !ok {
   return nil, errdbclosed
  }
  if ret.err == nil && ret.conn.expired(lifetime) { //连接过期了
   ret.conn.close()
   return nil, driver.errbadconn
  }
  return ret.conn, ret.err
 }

 db.numopen++ //上面说了numopen是已经建立或即将建立连接数,这里还没有建立连接,只是乐观的认为后面会成功,失败的时候再将此值减1
 db.mu.unlock()
 ci, err := db.driver.open(db.dsn) //调用driver的open方法建立连接
 if err != nil { //创建连接失败
  db.mu.lock()
  db.numopen-- // correct for earlier optimism
  db.maybeopennewconnections() //通知connectionopener协程尝试重新建立连接,否则在db.connrequests中等待的请求将一直阻塞,知道下次有连接建立
  db.mu.unlock()
  return nil, err
 }
 db.mu.lock()
 dc := &driverconn{
  db:  db,
  createdat: nowfunc(),
  ci:  ci,
 }
 db.adddeplocked(dc, dc)
 dc.inuse = true
 db.mu.unlock()
 return dc, nil
}

总结一下上面获取连接的过程:

* step1:首先检查下freeconn里是否有空闲连接,如果有且未超时则直接复用,返回连接,如果没有或连接已经过期则进入下一步;

* step2:检查当前已经建立及准备建立的连接数是否已经达到最大值,如果达到最大值也就意味着无法再创建新的连接了,当前请求需要在这等着连接释放,这时当前协程将创建一个channel:chan connrequest,并将其插入db.connrequests队列,然后阻塞在接收chan connrequest上,等到有连接可用时这里将拿到释放的连接,检查可用后返回;如果还未达到最大值则进入下一步;

* step3:创建一个连接,首先将numopen加1,然后再创建连接,如果等到创建完连接再把numopen加1会导致多个协程同时创建连接时一部分会浪费,所以提前将numopen占住,创建失败再将其减掉;如果创建连接成功则返回连接,失败则进入下一步

* step4:创建连接失败时有一个善后操作,当然并不仅仅是将最初占用的numopen数减掉,更重要的一个操作是通知connectionopener协程根据db.connrequests等待的长度创建连接,这个操作的原因是:

numopen在连接成功创建前就加了1,这时候如果numopen已经达到最大值再有获取conn的请求将阻塞在step2,这些请求会等着先前进来的请求释放连接,假设先前进来的这些请求创建连接全部失败,那么如果它们直接返回了那些等待的请求将一直阻塞在那,因为不可能有连接释放(极限值,如果部分创建成功则会有部分释放),直到新请求进来重新成功创建连接,显然这样是有问题的,所以maybeopennewconnections将通知connectionopener根据db.connrequests长度及可创建的最大连接数重新创建连接,然后将新创建的连接发给阻塞的请求。

注意:如果maxopen=0将不会有请求阻塞等待连接,所有请求只要从freeconn中取不到连接就会新创建。

另外query、exec有个重试机制,首先优先使用空闲连接,如果2次取到的连接都无效则尝试新创建连接。

获取到可用连接后将调用具体数据库的driver处理sql。

2.3 释放连接

数据库连接在被使用完成后需要归还给连接池以供其它请求复用,释放连接的操作是:putconn():

func (db *db) putconn(dc *driverconn, err error) {
 ...

 //如果连接已经无效,则不再放入连接池
 if err == driver.errbadconn {
  db.maybeopennewconnections()
  dc.close() //这里最终将numopen数减掉
  return
 }
 ...

 //正常归还
 added := db.putconndblocked(dc, nil)
 ...
}

func (db *db) putconndblocked(dc *driverconn, err error) bool {
 if db.maxopen > 0 && db.numopen > db.maxopen {
  return false
 }
 //有等待连接的请求则将连接发给它们,否则放入freeconn
 if c := len(db.connrequests); c > 0 {
  req := db.connrequests[0]
  // this copy is o(n) but in practice faster than a linked list.
  // todo: consider compacting it down less often and
  // moving the base instead?
  copy(db.connrequests, db.connrequests[1:])
  db.connrequests = db.connrequests[:c-1]
  if err == nil {
   dc.inuse = true
  }
  req <- connrequest{
   conn: dc,
   err: err,
  }
  return true
 } else if err == nil && !db.closed && db.maxidleconnslocked() > len(db.freeconn) {
  db.freeconn = append(db.freeconn, dc)
  db.startcleanerlocked()
  return true
 }
 return false
}

释放的过程:

* step1:首先检查下当前归还的连接在使用过程中是否发现已经无效,如果无效则不再放入连接池,然后检查下等待连接的请求数新建连接,类似获取连接时的异常处理,如果连接有效则进入下一步;

* step2:检查下当前是否有等待连接阻塞的请求,有的话将当前连接发给最早的那个请求,没有的话则再判断空闲连接数是否达到上限,没有则放入freeconn空闲连接池,达到上限则将连接关闭释放。

* step3:(只执行一次)启动connectioncleaner协程定时检查feeconn中是否有过期连接,有则剔除。

有个地方需要注意的是,query、exec操作用法有些差异:

a.exec(update、insert、delete等无结果集返回的操作)调用完后会自动释放连接;

b.query(返回sql.rows)则不会释放连接,调用完后仍然占有连接,它将连接的所属权转移给了sql.rows,所以需要手动调用close归还连接,即使不用rows也得调用rows.close(),否则可能导致后续使用出错,如下的用法是错误的:

//错误
db.setmaxopenconns(1)
db.query("select * from test")

row,err := db.query("select * from test") //此操作将一直阻塞

//正确
db.setmaxopenconns(1)
r,_ := db.query("select * from test")
r.close() //将连接的所属权归还,释放连接
row,err := db.query("select * from test")
//other op
row.close()

附:请求一个连接的函数有好几种,执行完毕处理连接的方式稍有差别,大致如下:

  • db.ping() 调用完毕后会马上把连接返回给连接池。
  • db.exec() 调用完毕后会马上把连接返回给连接池,但是它返回的result对象还保留这连接的引用,当后面的代码需要处理结果集的时候连接将会被重用。
  • db.query() 调用完毕后会将连接传递给sql.rows类型,当然后者迭代完毕或者显示的调用.clonse()方法后,连接将会被释放回到连接池。
  • db.queryrow()调用完毕后会将连接传递给sql.row类型,当.scan()方法调用之后把连接释放回到连接池。
  • db.begin() 调用完毕后将连接传递给sql.tx类型对象,当.commit()或.rollback()方法调用后释放连接。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。