欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Go常见并发模式

程序员文章站 2022-06-19 09:56:37
文章目录一、Go语言并发的基础元素1.goroutine2.channel3.sync 包的同步原语4.多并发控制神器:Context二、常见并发模式Go语言实现1.for select 循环模式无限循环(监控狗)有限循环(for range select )2.select timeout 模式3.流水线模式(Pipeline)4.扇出和扇入模式5.未来模式(Futures)一、Go语言并发的基础元素goroutine、channel、sync 是并发编程中必不可少的元素,context标准包更是为...

一、Go语言并发的基础元素

goroutine、channel、sync 是并发编程中必不可少的元素,
context标准包更是为我们并发编程提供了更好的支持。

1.goroutine

2.channel

常用结构:channel+select
select { 
case i1 = <-c1: 
     //todo 
case c2 <- i2: 
        //todo 
default: 
        // default todo 
} 

3.sync 包的同步原语

sync.Mutex 互斥锁
sync.RWMutex 读写锁
sync.WaitGroup
sync.Once
sync.Cond

4.多并发控制神器:Context

二、常见并发模式Go语言实现

并发模式和设计模式很相似,都是对现实场景的抽象封装,以便提供一个统一的解决方案。
但和设计模式不同的是,并发模式更专注于异步和并发。
我们会在很多项目的源代码中一遍遍的看到下面提到的并发模式,虽然解决的问题不一样,但它们的思路是相似的,所以我们也可以把它们进一步抽象,这样在项目开发中就可以直接复用。
并发模式不限于下面列举的这些,在项目中和并发、异步有关并且可以被抽象复用的解决方案都可以总结为并发模式。

1.for select 循环模式

for select 循环模式非常常见,它一般和 channel 组合完成任务,代码格式如下:
for { //for无限循环,或者for range循环 
  select { 
    //通过一个channel控制 
  } 
} 

For select有两种循环模式:有限循环和无限循环。

无限循环(监控狗)

//default 语句中执行任务,
//done channel 接收关闭通知。
for  { 
   select { 
   case <-done: 
      return 
   default: 
      //执行具体的任务 
   } 
 } 

有限循环(for range select )

一般用于把可以迭代的内容发送到 channel 上:

//done channel接收关闭通知,
//resultCh channel 用于接收 for range 循环的值,这些值通过 resultCh 可以传送给其他的调用者。
for _,s:=range []int{}{ 
   select { 
   case <-done: 
      return 
   case resultCh <- s: 
   } 
} 

2.select timeout 模式

获取数据遇到超时时,我们不可能一直等待,所以需要设置一个超时时间,如下所示

func main() { 
   result := make(chan string) 
   go func() { 
      //模拟网络访问 
      time.Sleep(8 * time.Second) 
      result <- "服务端结果" 
   }() 
   select { 
   case v := <-result: 
      fmt.Println(v) 
   case <-time.After(5 * time.Second): 
      fmt.Println("网络访问超时了") 
   } 
} 

select timeout 模式的核心在于通过 time.After 函数设置一个超时时间,防止因为异常造成 select 语句的无限等待。
小提示:如果可以使用 Context 的 WithTimeout 函数超时取消,要优先使用。

3.流水线模式(Pipeline)

以组装手机为例,假设一条组装手机的流水线有 3 道工序,分别是配件采购、配件组装、打包成品,如图所示:
Go常见并发模式

从以上示意图中可以看到,采购的配件通过 channel 传递给工序 2 进行组装,然后再通过 channel 传递给工序 3 打包成品。相对工序 2 来说,工序 1 是生产者,工序 3 是消费者。相对工序 1 来说,工序 2 是消费者。相对工序 3 来说,工序 2 是生产者。
我用下面的几组代码进行演示:

//工序1采购 
func buy(n int) <-chan string { 
   out := make(chan string) 
   go func() { 
      defer close(out) 
      for i := 1; i <= n; i++ { 
         out <- fmt.Sprint("配件", i) 
      } 
   }() 
   return out 
} 

首先我们定义一个采购函数 buy,它有一个参数 n,可以设置要采购多少套配件。采购代码的实现逻辑是通过 for 循环产生配件,然后放到 channel 类型的变量 out 里,最后返回这个 out,调用者就可以从 out 中获得配件。

//工序2组装 
func build(in <-chan string) <-chan string { 
   out := make(chan string) 
   go func() { 
      defer close(out) 
      for c := range in { 
         out <- "组装(" + c + ")" 
      } 
   }() 
   return out 
} 

组装函数 build 有一个 channel 类型的参数 in,用于接收配件进行组装,组装后的手机放到 channel 类型的变量 out 中返回。
有了组装好的手机,就可以放在精美的包装盒中售卖了,而包装的操作是工序 3 完成的,对应的函数是 pack,如下所示:

//工序3打包 
func pack(in <-chan string) <-chan string { 
   out := make(chan string) 
   go func() { 
      defer close(out) 
      for c := range in { 
         out <- "打包(" + c + ")" 
      } 
   }() 
   return out 
} 

函数 pack 的代码实现和组装函数 build 基本相同
流水线上的三道工序都完成后,就可以通过一个组织者把三道工序组织在一起,形成一条完整的手机组装流水线,这个组织者可以是我们常用的 main 函数,如下面的代码所示:

func main() { 
   coms := buy(10)    //采购10套配件 
   phones := build(coms) //组装10部手机 
   packs := pack(phones) //打包它们以便售卖 
   //输出测试,看看效果 
   for p := range packs { 
      fmt.Println(p) 
   } 
} 

从上述例子中,我们可以总结出一个流水线模式的构成:

  1. 流水线由一道道工序构成,每道工序通过 channel 把数据传递到下一个工序;
  2. 每道工序一般会对应一个函数,函数里有协程和 channel,协程一般用于处理数据并把它放入 channel 中,整个函数会返回这个 channel 以供下一道工序使用;
  3. 最终要有一个组织者(示例中的 main 函数)把这些工序串起来,这样就形成了一个完整的流水线,对于数据来说就是数据流。

4.扇出和扇入模式

假如上面的流水线出现了问题,其中的工序2特别慢,
为了提升产能,需要对工序 2 增加两班人手。如图便形成了扇入扇出:
Go常见并发模式

什么是扇入扇出:
1.以工序 1 为中点,三条传递数据的线发散出去,就像一把打开的扇子一样,所以叫扇出;
2.以 merge 组件为中点,三条传递数据的线汇聚到 merge 组件,也像一把打开的扇子一样,所以叫扇入
小提示:扇出和扇入都像一把打开的扇子,因为数据传递的方向不同,所以叫法也不一样,扇出的数据流向是发散传递出去,是输出流;扇入的数据流向是汇聚进来,是输入流。
三道工序的实现函数 buy、build、pack 都保持不变,只需要增加一个 merge 函数即可:

//扇入函数(组件),把多个chanel中的数据发送到一个channel中 
func merge(ins ...<-chan string) <-chan string { 
   var wg sync.WaitGroup 
   out := make(chan string) 
   //把一个channel中的数据发送到out中 
   p:=func(in <-chan string) { 
      defer wg.Done() 
      for c := range in { 
         out <- c 
      } 
   } 
   wg.Add(len(ins)) 
   //扇入,需要启动多个goroutine用于处于多个channel中的数据 
   for _,cs:=range ins{ 
      go p(cs) 
   } 
   //等待所有输入的数据ins处理完,再关闭输出out 
   go func() { 
      wg.Wait() 
      close(out) 
   }() 
   return out 
} 

新增的 merge 函数的核心逻辑就是对输入的每个 channel 使用单独的协程处理,并将每个协程处理的结果都发送到变量 out 中,达到扇入的目的。总结起来就是通过多个协程并发,把多个 channel 合成一个。
在整条手机组装流水线中,merge 函数非常小,而且和业务无关,不能当作一道工序,所以我把它叫作组件。该 merge 组件是可以复用的,流水线中的任何工序需要扇入的时候,都可以使用 merge 组件。
小提示:这次的改造新增了 merge 函数,其他函数保持不变,符合开闭原则。开闭原则规定“软件中的对象(类,模块,函数等等)应该对于扩展是开放的,但是对于修改是封闭的”。
有了可以复用的 merge 组件,现在来看流水线的组织者 main 函数是如何使用扇出和扇入并发模式的,如下所示:

func main() { 
   coms := buy(100)    //采购100套配件 
   //三班人同时组装100部手机 
   phones1 := build(coms) 
   phones2 := build(coms) 
   phones3 := build(coms) 
   //汇聚三个channel成一个 
   phones := merge(phones1,phones2,phones3) 
   packs := pack(phones) //打包它们以便售卖 
   //输出测试,看看效果 
   for p := range packs { 
      fmt.Println(p) 
   } 
} 

这个示例采购了 100 套配件,也就是开始增加产能了。于是同时调用三次 build 函数,也就是为工序 2 增加人手,这里是三班人手同时组装配件,然后通过 merge 函数这个可复用的组件将三个 channel 汇聚为一个,然后传给 pack 函数打包。
这样通过扇出和扇入模式,整条流水线就被扩充好了,大大提升了生产效率。因为已经有了通用的扇入组件 merge,所以整条流水中任何需要扇出、扇入提高性能的工序,都可以复用 merge 组件做扇入,并且不用做任何修改。

5.未来模式(Futures)

流水线模式中的工序是相互依赖的,上一道工序做完,下一道工序才能开始。但是在我们的实际需求中,也有大量的任务之间相互独立、没有依赖,所以为了提高性能,这些独立的任务就可以并发执行。
主协程不用等待子协程返回的结果,可以先去做其他事情,等未来需要子协程结果的时候再来取,如果子协程还没有返回结果,就一直等待。

//洗菜 
 
func washVegetables() <-chan string { 
   vegetables := make(chan string) 
   go func() { 
      time.Sleep(5 * time.Second) 
      vegetables <- "洗好的菜" 
   }() 
   return vegetables 
} 
//烧水 
func boilWater() <-chan string { 
   water := make(chan string) 
   go func() { 
      time.Sleep(5 * time.Second) 
      water <- "烧开的水" 
   }() 
   return water 
} 

洗菜和烧水这两个相互独立的任务可以一起做,所以示例中通过开启协程的方式,实现同时做的功能。当任务完成后,结果会通过 channel 返回。
小提示:示例中的等待 5 秒用来描述洗菜和烧火的耗时。
在启动两个子协程同时去洗菜和烧水的时候,主协程就可以去干点其他事情(示例中是眯一会),等睡醒了,要做火锅的时候,就需要洗好的菜和烧好的水这两个结果了。我用下面的代码进行演示:

func main() { 
   vegetablesCh := washVegetables() //洗菜 
   waterCh := boilWater()           //烧水 
   fmt.Println("已经安排洗菜和烧水了,我先眯一会") 
   time.Sleep(2 * time.Second) 
   fmt.Println("要做火锅了,看看菜和水好了吗") 
   vegetables := <-vegetablesCh 
   water := <-waterCh 
   fmt.Println("准备好了,可以做火锅了:",vegetables,water) 
} 

Futures 模式下的协程和普通协程最大的区别是可以返回结果,而这个结果会在未来的某个时间点使用。所以在未来获取这个结果的操作必须是一个阻塞的操作,要一直等到获取结果为止。
如果你的大任务可以拆解为一个个独立并发执行的小任务,并且可以通过这些小任务的结果得出最终大任务的结果,就可以使用 Futures 模式。

本文地址:https://blog.csdn.net/Edu_enth/article/details/112601550