1、多个独立任务
package main import ( "fmt" "runtime" ) /** 并发多个独立的任务 */ func main() { //处理任务的goroutine 数量(处理任务数量,和等待所有的goroutine处理完毕微循环数量一致) workers := runtime.Numcpu() //使用最大的cup数 runtime.GOMAXPROCS(workers) //任务通道,所有的任务发往此通道 jobs := make(chan int,workers) //完成chan,当任务处理完毕时,往此chan发送一个数据。用于监听所有的goroutine都执行完毕,数据类型没有要求 done := make(chan bool,workers) //创建一个gotoutine,用于生成任务 go generateJob(jobs) //处理任务 processJob(done,jobs,workers) //等待所有goroutine 执行完毕 waitUntil(done,workers) } /** 生成任务 jobs chan 为只接收chan */ func generateJob(jobs chan<- int) { for i := 1; i <= 10; i++ { jobs <- i } //所有数据发送完毕后关闭通道。此方法只是告诉使用此通道的地方,此通道中已经不再发送数据了,并没有真正的关闭。 close(jobs) } /** 处理任务 只向done chan 中发送数据 只从jobs chan 中接收数据 workers 创建goroutine 数量 */ func processJob(done chan<- bool,jobs <-chan int,workers int) { for i := 0; i < workers; i++ { go func() { //当前jobs chan 中没有数据时阻塞,直到调用close(jobs)方法,或者有数据 for job := range jobs { //处理任务 fmt.Println(job) } //此goroutine执行完毕,done 中存放标识 done <- true }() } } //等待所有的goroutine 执行完毕 func waitUntil(done <-chan bool,workers int) { for i := 0; i < workers; i++ { <-done } }说明:
1)、处理任务的goroutine 数量和等待所有goroutine执行完毕循环(waitUntil方法)次数一致,为:workers数量
2)、定义的jobs通道是有缓存的,如果想要按顺序执行,可以不使用缓存,这样使用只能一个一个处理。
3)、generateJob 方法中有关闭jobs通道方法,只是告诉使用此通道的地方,已经没有数据往里面发送了,没有真正的关闭。代码中并没有关闭done 通道的代码,因为没有在需要检查这个通道是否被关闭的地方使用此通道。waitUntil 方法中,通过done的阻塞,可以确保所有的处理工作在主的goroutine退出之前完成。
2、互斥量(锁)
package safe import "sync" /** 安全map */ type SafeMap struct { //map CountMap map[string]int //互斥量 mutex *sync.RWMutex } /** 创建 */ func NewSafeMap() *SafeMap { return *SafeMap{make(map[string]int),new(sync.RWMutex)} } /* 添加计数 */ func (sm *SafeMap) Increment(str string) { //获取锁 sm.mutex.Lock() //释放锁 defer sm.mutex.Unlock() //计数 sm.CountMap[str]++ } /* 读取值 */ func (sm *SafeMap) Read(str string) int { //获取读锁锁 sm.mutex.RLock() //释放锁 defer sm.mutex.RUnlock() //计数 v,e := sm.CountMap[str] if !e { return v } else { return 0 } } package safe import "sync" /** 安全map */ type SafeMap struct { //map CountMap map[string]int //互斥量 mutex *sync.RWMutex } /** 创建 */ func NewSafeMap() *SafeMap { return *SafeMap{make(map[string]int),e := sm.CountMap[str] if !e { return v } else { return 0 } }说明:
1)、SafeMap 是一个多线程安全的map,通过new(sync.RWMutex) 创建一个读写锁。在操作map的时候首先要锁定互斥量,通过defer 来保证释放互斥量。(这是只写了一个方法)
2)、在Read 方法中,获取的读锁,这样可以更加高效一点。
3)、可以通过把对map的操作发送到一个没有缓存的通道,再对map进行操作也可以实现安全的map。
3、多个任务结果的合并
示例:计算50的阶乘
package main import ( "fmt" "runtime" ) func main() { workers := runtime.Numcpu() //使用最大的cup数 runtime.GOMAXPROCS(workers) //任务chann jobs := make(chan section,workers) //创建一个gotoutine,用于生成任务 go generateJob(jobs) //存放每个处理任务的goroutine 运算结果 result := make(chan uint64,workers) //处理任务 processJob(result,workers) //计算所有结果的乘积 factorialResult := caclResult(result,workers) fmt.Println(factorialResult) } /** 生成任务 jobs chan 为只接收chan */ func generateJob(jobs chan<- section) { jobs <- section{1,10} jobs <- section{11,20} jobs <- section{21,30} jobs <- section{31,40} jobs <- section{41,50} //所有数据发送完毕后关闭通道。此方法只是告诉使用此通道的地方,此通道中已经不再发送数据了,并没有真正的关闭。 close(jobs) } func processJob(result chan<- uint64,jobs <-chan section,workers int) { for i := 0; i < workers; i++ { go func() { //阶乘 var factorial uint64 = 1 for job := range jobs { for i := job.min; i <= job.max; i++ { factorial *= uint64(i) } } //计算完毕后,把结果放到result chan中 result <- factorial }() } } /** 计算所有结果 */ func caclResult(result <-chan uint64,workers int) uint64 { var factorial uint64 = 1 for i := 0; i < workers; i++ { count := <-result factorial *= count } return factorial } /** 保存阶乘区间的最大值和最小值 */ type section struct { min int max int }
说明:
1)、就是把50的阶乘分计算,示例代码是计算1..10,11..20 ... 和乘积,然后再计算各个结果的乘积
2)、generateJob中,是把要计算区间的struct 放入jobs通道
3)、在processJob中,每个goroutine都创建一个计算结果,在计算完毕后发送到result 通道中。result 保存所有的计算结果。
4)、监听主goroutine的代码被替换成计算总的结果。
4、不定goroutine 数量