欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

通过 Channel 实现 Goroutine Pool

程序员文章站 2022-06-20 23:03:20
最近用到了 Go 从 Excel 导数据到服务器内部 用的是 http 请求 但是发现一个问题 从文件读取之后 新开 Goroutine 会无限制新增 导致全部卡在初始化请求 于是乎就卡死了 问题模拟 模拟代码 go func main() { pool := sync.WaitGroup{} fo ......

最近用到了 go 从 excel 导数据到服务器内部 用的是 http 请求
但是发现一个问题 从文件读取之后 新开 goroutine 会无限制新增
导致全部卡在初始化请求 于是乎就卡死了

问题模拟

  • 模拟代码
func main() {
    pool := sync.waitgroup{}
    for i := 0; i < 500; i++ {
        pool.add(1)
        go func(i int) {
            resp, err := http.get("http://ip.3322.org")
            if err != nil {
                fmt.println(i, err)
            } else {
                defer resp.body.close()
                result, _ := ioutil.readall(resp.body)
                fmt.println(i, string(result))
            }
            pool.done()
        }(i)
    }
    pool.wait()
}
  • 数量小的情况下 没有问题 但是数量比较大的情况 就会发现程序直接卡死 一段时间之后报错 并且没有发出任何请求

问题解决

  • 实际上看的出来 是应为同时发起了太多的http请求 导致系统卡死 数据没有发送
  • 想到我在java中用thread提交请求 我就考虑 可不可限制 goroutine 的数量
  • 使用强大的百度 果然找到了大佬已经写好的协程池
  • 代码如下 我加上了注释
package gopool

import (
    "sync"
)

// pool goroutine pool
type pool struct {
    queue chan int
    wg    *sync.waitgroup
}

// new 新建一个协程池
func new(size int) *pool {
    if size <= 0 {
        size = 1
    }
    return &pool{
        queue: make(chan int, size),
        wg:    &sync.waitgroup{},
    }
}

// add 新增一个执行
func (p *pool) add(delta int) {
    // delta为正数就添加
    for i := 0; i < delta; i++ {
        p.queue <- 1
    }
    // delta为负数就减少
    for i := 0; i > delta; i-- {
        <-p.queue
    }
    p.wg.add(delta)
}

// done 执行完成减一
func (p *pool) done() {
    <-p.queue
    p.wg.done()
}

// wait 等待goroutine执行完毕
func (p *pool) wait() {
    p.wg.wait()
}
  • 然后修改刚才的测试方法
package main

import (
    "io/ioutil"
    "log"
    "net/http"
    "yumc.pw/cloud/lib/gopool"
)

func main() {
    // 这里限制5个并发
    pool := gopool.new(5)// sync.waitgroup{}
    for i := 0; i < 500; i++ {
        pool.add(1)
        go func(i int) {
            resp, err := http.get("http://ip.3322.org")
            if err != nil {
                fmt.println(i, err)
            } else {
                defer resp.body.close()
                result, _ := ioutil.readall(resp.body)
                fmt.println(i, string(result))
            }
            pool.done()
        }(i)
    }
    pool.wait()
}
  • 完美解决