欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

[Go] golang缓冲通道实现资源池

程序员文章站 2022-05-14 09:51:50
go的pool资源池:1.当有多个并发请求的时候,比如需要查询数据库2.先创建一个2个容量的数据库连接资源池3.当一个请求过来的时候,去资源池里请求连接资源,肯定是空的就创建一个连接,执行查询,结束后放入了资源池里4.当第二个请求过来的时候,也是去资源池请求连接资源,就直接在池中拿过来一个连接进行查 ......

go的pool资源池:
1.当有多个并发请求的时候,比如需要查询数据库
2.先创建一个2个容量的数据库连接资源池
3.当一个请求过来的时候,去资源池里请求连接资源,肯定是空的就创建一个连接,执行查询,结束后放入了资源池里
4.当第二个请求过来的时候,也是去资源池请求连接资源,就直接在池中拿过来一个连接进行查询
5.当并发大的时候,资源池里面没有足够连接资源,就会不停创建新资源,放入池里面的时候,也会放不进去,就主动关闭掉这个资源
6.这里的资源池实质上是一个缓冲通道,里面放着连接资源

 

package main

import (
	"errors"
	"io"
	"log"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

//定义一个结构体,这个实体类型可以作为整体单元被复制,可以作为参数或返回值,或被存储到数组
type pool struct {
	//定义成员,互斥锁类型
	m sync.mutex
	//定义成员,通道类型,通道传递的是io.closer类型
	resources chan io.closer
	//定义工厂成员,类型是func()(io.closer,error)
	//error是预定义类型,实际上是个interface接口类型
	factory func() (io.closer, error)
	closed  bool
}

//定义变量,函数返回的是error类型
var errpoolclosed = errors.new("池已经关闭了")

//定义new方法,创建一个池,返回的是pool类型的指针
//传入的参数是个函数类型func(io.closer,error)和池的大小
func new(fn func() (io.closer, error), size uint) (*pool, error) {
	//使用结构体字面值给结构体成员赋值
	mypool := pool{
		factory:   fn,
		resources: make(chan io.closer, size),
	}
	//返回两个返回值
	return &mypool, nil
}

//从池中请求获取一个资源,给pool类型定义的方法
//返回的值是io.closer类型
func (p *pool) acquire() (io.closer, error) {
	//基于select的多路复用
	//select会等待case中有能够执行的,才会去执行,等待其中一个能执行就执行
	//default分支会在所有case没法执行时,默认执行,也叫轮询channel
	select {
	case r, _ := <-p.resources:
		log.printf("请求资源:来自通道 %d", r.(*dbconn).id)
		return r, nil
	//如果缓冲通道中没有了,就会执行这里
	default:
		log.printf("请求资源:创建新资源")
		return p.factory()
	}
}

//将一个使用后的资源放回池
//传入的参数是io.closer类型
func (p *pool) release(r io.closer) {
	//使用mutex互斥锁
	p.m.lock()
	//解锁
	defer p.m.unlock()
	//如果池都关闭了
	if p.closed {
		//关掉资源
		r.close()
		return
	}
	//select多路选择
	//如果放回通道的时候满了,就关闭这个资源
	select {
	case p.resources <- r:
		log.printf("释放资源:放入通道 %d", r.(*dbconn).id)
	default:
		log.printf("释放资源:关闭资源%d", r.(*dbconn).id)
		r.close()
	}
}

//关闭资源池,关闭通道,将通道中的资源关掉
func (p *pool) close() {
	p.m.lock()
	defer p.m.unlock()
	p.closed = true
	//先关闭通道再清空资源
	close(p.resources)
	//清空并关闭资源
	for r := range p.resources {
		r.close()
	}
}

//定义全局常量
const (
	maxgoroutines = 20 //使用25个goroutine模拟同时的连接请求
	poolsize      = 2  //资源池中的大小
)

//定义结构体,模拟要共享的资源
type dbconn struct {
	//定义成员
	id int32
}

//dbconn实现io.closer接口
func (db *dbconn) close() error {
	return nil
}

var idcounter int32 //定义一个全局的共享的变量,更新时用原子函数锁住
//定义方法,创建dbconn实例
//返回的是io.closer类型和error类型
func createconn() (io.closer, error) {
	//原子函数锁住,更新加1
	id := atomic.addint32(&idcounter, 1)
	log.printf("创建新资源: %d", id)
	return &dbconn{id}, nil
}
func main() {
	//计数信号量
	var wg sync.waitgroup
	//同时并发的数量
	wg.add(maxgoroutines)
	mypool, _ := new(createconn, poolsize)
	//开25个goroutine同时查询
	for i := 0; i < maxgoroutines; i++ {
		//模拟请求
		time.sleep(time.duration(rand.intn(2)) * time.second)
		go func(gid int) {
			execquery(gid, mypool)
			wg.done()
		}(i)
	}
	//等待上面开的goroutine结束
	wg.wait()
	mypool.close()
}

//定义一个查询方法,参数是当前gorotineid和资源池
func execquery(goroutineid int, pool *pool) {
	//从池里请求资源,第一次肯定是没有的,就会创建一个dbconn实例
	conn, _ := pool.acquire()
	//将创建的dbconn实例放入了资源池的缓冲通道里
	defer pool.release(conn)
	//睡眠一下,模拟查询过程
	time.sleep(time.duration(rand.intn(10)) * time.second)
	log.printf("执行查询...协程id [%d] 资源id [%d]", goroutineid, conn.(*dbconn).id)
}

[Go] golang缓冲通道实现资源池