欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

多任务的使用模式

程序员文章站 2022-07-02 12:42:24
仅执行单次的任务 全局资源的加载(初始化), 多任务中公共资源销毁 编写异步方法 假如任务耗时比较长可以考虑把方法编写成异步的(类似前端知识中的ajax请求),例如任务中有需要下载的任务,这时候下载任务可以考虑编写成异步,例如数据存储任务等 编写异步任务方法要点: 任务内部启动goroutine处理 ......

仅执行单次的任务

全局资源的加载(初始化), 多任务中公共资源销毁

// 初始化工作
// 1. 通过init 方法,仅执行一次,但是不能重复调用
// 模拟全局配置
type appconfig struct {
	name string
	ip string
	version string
}
 var appconfig *appconfig

func init()  {
	appconfig=new(appconfig)	
	appconfig.name = "myapp"
	appconfig.ip = "192.168.66.88"
	appconfig.version="v1.0"
}

// 2. 使用sync.once方法,保证我们代码执行一次
// 销毁全局资源
var once sync.once
func delapp()  {
	once.do(func(){
		appconfig = nil
		log.println("delete app!")
	})
}

编写异步方法

假如任务耗时比较长可以考虑把方法编写成异步的(类似前端知识中的ajax请求),例如任务中有需要下载的任务,这时候下载任务可以考虑编写成异步,例如数据存储任务等

编写异步任务方法要点: 任务内部启动goroutine处理任务,并立即返回>

// 编写一个数据库异步处理任务(模拟)
func storagedata(data string){
    // 预处理数据
    log.println("入库前预处理: ",data)
    go func(data){
        time.sleep(time.sencond*3)
        log.println("数据库存储完成")
    }
}

编写异步带回调方法

异步方法的基础上,如果想处理异步的结果,需要传递一个回调方法

注意: 编写异步的基础上,加上回调方法(处理结果)

// 编写一个异步爬取web的方法
package main

import (
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"time"
)

func crawlurl(url string,fn func(response *http.response))  {
	go func() {
		response,err:=http.get(url)
		if err!=nil{
			log.println(err)
			return
		}
		fn(response)
	}()
}

func main() {

	crawlurl("http://www.baidu.com", func(response *http.response) {
		data,err:=ioutil.readall(response.body)
		if err!=nil{
			return
		}
		log.println("content length: ",len(data))
	})
	log.println("main func do other thing")

	crawlurl("http://i2.hdslb.com/bfs/archive/bc8adff1dafe7494c6b2155ec82725af0034c31b.png", func(response *http.response) {
		data,err:=ioutil.readall(response.body)
		if err!=nil{
			log.println(err)
			return
		}
		f,_:=os.create("1.png")
		defer f.close()
		_, _ = f.write(data)
	})

	time.sleep(time.second*5)
}

 

等待所有任务结束

启动多个子任务,等待子任务结束

不需要结果

需要结果的

// 1. 需要返回结果,仅需要完成任务即可
func calltasknorsult(){
    wg := sync.waitgroup{}
    wg.add(5)
    for i:=0;i<5;i++{
        go func(num int) {
            defer wg.done()
            log.println("任务",num,"完成")
        }(i)
    }
    wg.wait()
}


// 2. 需要完成任务还需要返回结果,可能需要做后续处理
func calltaskresults()  {
    // 创建一个channel 用于接收任务处理结果
    results:=make(chan string,10)

    defer close(results)
    // 随机种子
    rand.seed(time.now().unixnano())

    for i:=0;i<10;i++{
        go func(num int) {
            // 随机生成一个结果,并把结果添加到结果队列
            results<-fmt.sprintf("task %d# result: %d",num,rand.intn(20)+10)
        }(i)
    }

    //等待输出结果
    for i:=0;i<10;i++{
        log.println(<-results)
    }
}

 

等待任意一个任务完成

执行等待多个子任务,只要任意一个任务完成即可,例如下载文件,假如有多个下载源,不确定哪个源网络情况好,可以考虑使用这个方式

// 模拟多源下载,任意一个任务完成就结束
func download(){
	// 定义一个channel,用于接收结果
	result:=make(chan string)

	defer close(result)

	// 种子
	rand.seed(time.now().unixnano())

	// 异步获取子任务下载 数据
	for i:=0;i<10;i++{
		go func(num int) {
			// 随机休眠一段时间
			time.sleep(time.second*time.duration(rand.intn(5)+1))
			result<-fmt.sprintf("task %d# download ok!",num)
		}(i)
	}

	// 等待子任务完成
	log.println(<-result)

}

 

协作等待其他任务结果

稍微复杂的一些业务,其中的一个子任务需要另一个子任务的结果

 

多任务的使用模式

 

 

// 异步协作,任务b需要任务的计算结果
func bwaita()  {
	// 创建channel 用于任务通讯
	ch:=make(chan string,1)  // 缓存为1
	defer close(ch)
	// 创建wait group
	wg:= sync.waitgroup{}
	wg.add(2)
	// 启动任务a
	go func() {
		defer wg.done()
		log.println("a do working...")
		time.sleep(time.second*3)
		log.println("a end calc...., send result to b")
		ch<-"data for b"
		log.println("a do send result to b, do other thing!")
	}()

	// 启动任务b
	go func() {
		defer wg.done()
		log.println("b do working...")
		time.sleep(time.second*2)
		log.println("b wait a...")
		log.println(<-ch)
		log.println("b user a result do other thing!")
	}()
	wg.wait()
}

任务取消

某些特殊的情景下,我们可能需要取消子任务的执行,例如主任务因为用户的原因,需要提前结束,通知所有的子任务结束

单层任务取消

func cancletask()  {
	// 创建一个cancele 的channel 用于通知子任务结束
	canncle:=make(chan struct{})

	// 封装一个方法用于检查 任务是否被取消了
	iscancle:= func() bool {
		select {
		case <-canncle:
			return true
		default:
			return false
		}
	}
	//模拟启动子任务
	for i:=0;i<5;i++{
		go func(num int) {
			for{
        // 监听任务是否被取消
				if iscancle(){
					// 如果被取消则 退出任务
					log.printf("task #%d is canceled!\n",num)
					return
				}
				log.printf("task #%d is working...\n",num)
                time.sleep(time.millisecond *50)
			}
		}(i)
	}
	// 模拟任务运行一段时间
	log.println("main working for an while...")
	time.sleep(time.millisecond *100)

	// 取消任务,利用关闭channel的广播特性
	close(canncle)

	time.sleep(time.second)
}

 

多层任务取消

当任务比较复杂的时候,更多情况可能是关联任务,例如下图这样的多层任务

context 初识

  • 根context节点,context.backgroud() 创建一个空的context(根节点),没有任何作用,为了给子contentx继承的

  • ctx,canncel:=context.withcancel(ctx),返回一个ctx子节点,返回一个canclefuc

  • 调用取消方法,给所有的子节点发送取消信号

  • 子任务中监听取消信号并退出任务

使用context取消任务

func cancelwithctx()  {
	// 创建空ctx,根节点
	root:=context.background()

	// 定义iscancel方法
	iscanncel:= func(ctx context.context) bool {
		select {
		case <-ctx.done():
			return true
		default:
			return false
		}
	}

	task:=func (ctx context.context, num int){
		// 启动子任务
		go func() {
			for{
				if iscanncel(ctx){
					// 如果被取消则 退出任务
					log.printf("task #%d sub is canceled!\n",num)
					return
				}
			}
		}()
		for{
			if iscanncel(ctx){
				// 如果被取消则 退出任务
				log.printf("task #%d is canceled!\n",num)
				return
			}
			log.printf("task #%d is working...\n",num)
			time.sleep(time.millisecond*200)
		}
	}
    // 创建子ctx,用于取消
	ctxone,cancelone:=context.withcancel(root)
	go task(ctxone,1)
    
    // 创建子ctx,用于取消
	ctxtwo,_:=context.withcancel(root)
	go task(ctxtwo,2)

	// 模拟保证所有的goroutine都运行起来
	time.sleep(time.second)
    // 取消 第一个任务
	cancelone()
	time.sleep(time.second)
}

output:

2019/06/27 11:55:45 task #1 is working...
2019/06/27 11:55:45 task #2 is working...
2019/06/27 11:55:46 task #1 is working...
2019/06/27 11:55:46 task #2 is working...
2019/06/27 11:55:46 task #1 is working...
2019/06/27 11:55:46 task #2 is working...
2019/06/27 11:55:46 task #1 is working...
2019/06/27 11:55:46 task #2 is working...
2019/06/27 11:55:46 task #1 is working...
2019/06/27 11:55:46 task #2 is working...
2019/06/27 11:55:46 task #1 sub is canceled!
2019/06/27 11:55:46 task #1 is canceled!
2019/06/27 11:55:46 task #2 is working...
2019/06/27 11:55:47 task #2 is working...
2019/06/27 11:55:47 task #2 is working...
2019/06/27 11:55:47 task #2 is working...
2019/06/27 11:55:47 task #2 is working...

 

context在net/http包中的演示

package main

import (
	"fmt"
	"net/http"
	"time"
)

func ctxtest(w http.responsewriter, r *http.request){
	// 获取 ctx
	ctx := r.context()

	fmt.println("processing request")

	select {
	case <-time.after(5 * time.second):
		// 模拟请求处理完
		w.write([]byte("request processed"))
	case <-ctx.done(): // 网页加载完毕 done 信号发出
		// 用户取消的时候,获取取消信号s
		fmt.println( "request cancelled")
	}
}

func main() {
	// 绑定路由处理方法
	http.handlefunc("/",ctxtest)
	// 启动服务
	http.listenandserve(":8000", nil)
}