go语言通过使用goroutine和channel,可以非常方便的执行异步处理操作。
[原博文](https://www.opsdash.com/blog/job-queues-in-go.html)
THE “NO-JOB-QUEUE” JOB QUEUE
如果仅仅只需要执行一个异步操作,而不需要job queue,那么可以用下面代码
- go process(job)
但是如果我们需要控制同时工作的job的数量或者对生产者的生产进行限制,就需要使用job queue.
THE SIMPLEST JOB QUEUE
下面是一个简单的job queue实现,worker从job queue中获取一个job来处理。
- func worker(jobChan <-chan Job) {
- for job := range jobChan {
- process(job)
- }
- }
-
- // make a channel with a capacity of 100.
- jobChan := make(chan Job, 100)
-
- // start the worker
- go worker(jobChan)
-
- // enqueue a job
- jobChan <- job
上面代码,创建一个Job对象的channel用来存放和传递job,该channel在创建时设置容量为100.然后启动一个worker goroutine从channel中抽取一个job进行处理,worker一次处理一个job。通过<- job往channel中加入新的job。channerl中数据的in和out是线程安全的,开发人员无需担心互斥。
PRODUCER THROTTLING
jobChan创建时拥有100的容量,那么如果这个channel已经有了100个job,再执行
- // enqueue a job
- jobChan <- job
操作时,这个操作就会阻塞。这个模式可以帮助我们限制生产者生产数据的数量,避免生产的数据过多。
如果channerl满了之后,我们并不希望当前生产者阻塞而是要返回一个错误消息给上层的调用者,我们可以使用下面的方法。
ENQUEUEING WITHOUT BLOCKING
实现非阻塞的生产者模式,我们可以使用select
- // TryEnqueue tries to enqueue a job to the given job channel. Returns true if
- // the operation was successful,and false if enqueuing would not have been
- // possible without blocking. Job is not enqueued in the latter case.
- func TryEnqueue(job Job,jobChan <-chan Job) bool {
- select {
- case jobChan <- job:
- return true
- default:
- return false
- }
- }
当通道jobchan已经满的时候,jobChan <- job: 阻塞,程序跳到default中执行。
STOPPING THE WORKER
如果job已经完了,如果优雅的告知worker停止,而不是阻塞的等待呢?我们可以使用close去关闭channel。
- close(jobChan)
然后woker的代码就可以变为
- for job := range jobChan {...}
在channerl关闭之前进入到jobChan的job会被woker读取出来进行处理,最后这个循环会自动退出。
(在golang中,读取已经关闭的channel是合法的,不过返回的第二结果是false)
WAITING FOR THE WORKER
使用close函数,是主动让worker去停止,但是如果想要等待woker处理完,我们就要使用sync.WaitGroup
- // use a WaitGroup
- var wg sync.WaitGroup
-
- func worker(jobChan <-chan Job) {
- defer wg.Done()
-
- for job := range jobChan {
- process(job)
- }
- }
-
- // increment the WaitGroup before starting the worker
- wg.Add(1)
- go worker(jobChan)
-
- // to stop the worker,first close the job channel
- close(jobChan)
-
- // then wait using the WaitGroup
- wg.Wait()
wg.Add(1)让WaitGroup增加1,wg.Done()让WaitGroup减1,wg.Wait()会一直阻塞除非变为0 。
WAITING WITH A TIMEOUT
如果不想要WaitGroup一直等,而是有个超时时间,我们可以用select实现
- // WaitTimeout does a Wait on a sync.WaitGroup object but with a specified
- // timeout. Returns true if the wait completed without timing out,false
- // otherwise.
- func WaitTimeout(wg *sync.WaitGroup,timeout time.Duration) bool {
- ch := make(chan struct{})
- go func() {
- wg.Wait()
- close(ch)
- }()
- select {
- case <-ch:
- return true
- case <-time.After(timeout):
- return false
- }
- }
-
- // now use the WaitTimeout instead of wg.Wait()
- WaitTimeout(&wg, 5 * time.Second)
如果WaitGroup先返回,那么close(ch)执行后,case<- ch:有效就会执行,否则当timeout到达后,case <-time.After(timeout):
就会执行。
CANCELLING WORKERS
如果想要worker立刻停止当前工作,而不是之前那样worker还会处理剩下的job,我们可以利用context
- // create a context that can be cancelled
- ctx,cancel := context.WithCancel(context.Background())
-
- // start the goroutine passing it the context
- go worker(ctx,jobChan)
-
- func worker(ctx context.Context,jobChan <-chan Job) {
- for {
- select {
- case <-ctx.Done():
- return
-
- case job := <-jobChan:
- process(job)
- }
- }
- }
-
- // Invoke cancel when the worker needs to be stopped. This *does not* wait
- // for the worker to exit.
- cancel()
首先
- ctx,cancel := context.WithCancel(context.Background())
创建一个context对象以及相关的cancel,当cancel被调用后,ctx.Done()变为可读,worker就会返回
这个方法有一个小问题,就是如果某时刻jobChan中有job,同时cancel也被调用了,那么<-ctx.Done():和job := <-jobChan:
同时都不阻塞了,那么select就会随机选一个,这个作为开发者就无法决定了。
CANCELLING WORKERS WITHOUT CONTEXT
- // create a cancel channel
- cancelChan := make(chan struct{})
-
- // start the goroutine passing it the cancel channel
- go worker(jobChan,cancelChan)
-
- func worker(jobChan <-chan Job,cancelChan <-chan struct{}) {
- for {
- select {
- case <-cancelChan:
- return
-
- case job := <-jobChan:
- process(job)
- }
- }
- }
-
- // to cancel the worker,close the cancel channel
- close(cancelChan)
A POOL OF WORKERS
使用多个worker可以提高程序的并发行,最简单的做法如下
- for i:=0; i<workerCount; i++ {
- go worker(jobChan)
- }
然后多个worker会尝试从同一个channel中获取job,这个操作是安全的,作为开发者可以放心。
要等待这些worker完成工作,仍然可以用wait group如下:
- for i:=0; i<workerCount; i++ {
- wg.Add(1)
- go worker(jobChan)
- }
-
- // wait for all workers to exit
- wg.Wait()
如果要cancel这些worker,可以再单独使用一个channel,用来给这些worker发通知,类似CANCELLING WORKERS WITHOUT CONTEXT那样
- // create cancel channel
- cancelChan := make(chan struct{})
-
- // pass the channel to the workers,let them wait on it
- for i:=0; i<workerCount; i++ {
- go worker(jobChan,cancelChan)
- }
-
- // close the channel to signal the workers
- close(cancelChan)