golang 实现一个通用协程池

golang 是一门很优秀的语言,语法简单,功能强大 ,支持的 channal、goroutine 等都是非常优秀的特性。由于之前用golang 重构一个项目,对golang不是太了解,栽了不少坑,其中主要问题为:

1. go 直接协程运行函数方法,大并发的时候不太可控会导致协程数量急剧增加

2.协程池方式运行有不想每一个结构体都启动一个协程池

所以就萌生出搞一个通用协程池的想法,主要思想为,启动多个协程 从 channal 队列读取数据,由业务将需要执行的方法与参数放入channal,协程池仅仅负责维护协程,自动扩容,缩减,启用备用队列等策略,至于返回结果之类的都有业务方实现。

关键实现:

1. 定义一个task 的结构体 标示具体要执行的任务格式

type Job func([]interface{})
type taskWork struct {
	Run       Job
	startBool bool
	params    []interface{}
}

2.定义一个worker 池,控制协程相关信息

type WorkPool struct {
	taskPool  chan taskWork
	workNum   int
	maxNum    int
	stopTopic bool
	//考虑后期 作为冗余队列使用
	taskQue chan taskWork
}

3.实现协程池相关启动,停止,扩容策略,缩减策略,备用队列启用等 逻辑

//得到一个线程池并返回 句柄
func (p *WorkPool) InitPool() {
	*p = WorkPool{workNum: workerNumDefault,maxNum: workerNumMax,stopTopic: false,taskPool: make(chan taskWork,workerNumDefault*2),taskQue: nil}

	(p).start()
	go (p).workerRemoveConf()
}

//开始work
func (p *WorkPool) start() {
	for i := 0; i < workerNumDefault; i++ {
		p.workInit(i)
		fmt.Println("start pool task:",i)
	}
}

//初始化 work池 后期应该考虑如何 自动 增减协程数,以达到最优
func (p *WorkPool) workInit(id int) {
	go func(idNum int) {
		//var i int = 0
		for {
			select {
			case task := <-p.taskPool:
				if task.startBool == true && task.Run != nil {
					//fmt.Print("this is pool ",idNum,"---")
					task.Run(task.params)
				}
				//单个结束任务
				if task.startBool == false {
					//fmt.Print("this is pool -- ","---")
					return
				}
				//防止从channal 中读取数据超时
			case <-time.After(time.Millisecond * 1000):
				//fmt.Println("time out init")
				if p.stopTopic == true && len(p.taskPool) == 0 {
					fmt.Println("topic=",p.stopTopic)
					//work数递减
					p.workNum--
					return
				}
				//从备用队列读取数据
			case queTask := <-p.taskQue:
				if queTask.startBool == true && queTask.Run != nil {
					//fmt.Print("this is que ","---")
					queTask.Run(queTask.params)
				}
			}

		}
	}(id)

}

//停止一个workPool
func (p *WorkPool) Stop() {
	p.stopTopic = true
}

//普通运行实例,非自动扩充
func (p *WorkPool) Run(funcJob Job,params ...interface{}) {
	p.taskPool <- taskWork{funcJob,true,params}
}

//用select 去做 实现 自动扩充 协程个数 启用备用队列等特性
func (p *WorkPool) RunAuto(funcJob Job,params ...interface{}) {
	task := taskWork{funcJob,params}
	select {
	//正常写入
	case p.taskPool <- task:
		//写入超时 说明队列满了 写入备用队列
	case <-time.After(time.Millisecond * 1000):
		p.taskQueInit()
		p.workerAddConf()
		//task 入备用队列
		p.taskQue <- task
	}
}

//自动初始化备用队列
func (p *WorkPool) taskQueInit() {
	//扩充队列
	if p.taskQue == nil {
		p.taskQue = make(chan taskWork,p.maxNum*2)
	}
}

//自动扩充协程 简单的自动扩充策略
func (p *WorkPool) workerAddConf() {
	//说明需要扩充进程  协程数量小于 1000 协程数量成倍增长
	if p.workNum < 1000 {
		p.workerAdd(p.workNum)
	} else if p.workNum < p.maxNum {
		tmpNum := p.maxNum - p.workNum
		tmpNum = tmpNum / 10
		if tmpNum == 0 {
			tmpNum = 1
		}
		p.workerAdd(1)
	}
}

//自动缩减协程 实现比较粗糙,可以考虑后续精细实现一些策略
func (p *WorkPool) workerRemoveConf() {
	for {
		select {
		case <-time.After(time.Millisecond * 1000 * 600):
			if p.workNum > workerNumDefault && len(p.taskPool) == 0 && len(p.taskQue) == 0 {
				rmNum := (p.workNum - workerNumDefault) / 5
				if rmNum == 0 {
					rmNum = 1
				}
				p.workerRemove(rmNum)
			}
		}
	}

}
func (p *WorkPool) workerAdd(num int) {
	for i := 0; i < num; i++ {
		p.workNum++
		p.workInit(p.workNum)
	}
}
func (p *WorkPool) workerRemove(num int) {
	for i := 0; i < num; i++ {
		task := taskWork{startBool: false}
		p.taskPool <- task
		p.workNum--
	}
}

4.我们来看一下使用的demo,可以很方便的把一个已有业务作为task 交给协程池执行了

package main

import (
	"fmt"
	"github.com/wangyaofenghist/go-Call/call"
	"github.com/wangyaofenghist/go-Call/test"
	"github.com/wangyaofenghist/go-worker-base/worker"
	"runtime"
	"time"
)

//声明一号池子
var poolOne worker.WorkPool

//声明回调变量
var funcs call.CallMap

//以结构体方式调用
type runWorker struct{}

//初始化协程池 和回调参数
func init() {
	poolOne.InitPool()
	funcs = call.CreateCall()

}

//通用回调
func (f *runWorker) Run(param []interface{}) {
	name := param[0].(string)
	//调用回调并拿回结果
	result,err := funcs.Call(name,param[1:]...)
	fmt.Println(result,err)
}
func runtimeNum() {
	for {
		fmt.Println("runtime num:",runtime.NumGoroutine())
		time.Sleep(time.Millisecond * 1000)
	}

}

//主函数
func main() {
	tmp := make(chan int)
	go runtimeNum()
	var runFunc runWorker = runWorker{}
	funcs.AddCall("test4",test.Test4)
	var startTime = time.Now().UnixNano()
	for i := 0; i < 10000; i++ {
		poolOne.RunAuto(runFunc.Run,"test4"," ee "," ff")
	}
	<-tmp
	return
	
}

以上通用协程池实现略微粗糙,并没有考虑太精细化的自动扩充协程策略或缩减策略,demo中可以将需要回调的函数加上sleep 会看到协程自动扩充与销毁的过程,中间涉及到一个通用回调是通过golang 的反射机制实现的一段通用代码,可以从gitHub 中拉取。

通用协程池 go-worker-base :https://github.com/wangyaofenghist/go-worker-base

通用回调函数 go-Call : https://github.com/wangyaofenghist/go-Call

相关文章

程序目录结构 简单实现,用户登录后返回一个jwt的token,下次请求带上token请求用户信息接口并返回信息...
本篇博客的主要内容是用go写一个简单的Proof-of-Work共识机制,不涉及到网络通信环节,只是一个本地的简...
简介 默克尔树(MerkleTree)是一种典型的二叉树结构,其主要特点为: 最下面的叶节点包含存储数据或其...
接下来学习并发编程, 并发编程是go语言最有特色的地方, go对并发编程是原生支持. goroutine是go中最近本...
先普及一下, 什么是广度优先搜索 广度优先搜索类似于树的层次遍历。从图中的某一顶点出发,遍历每一个顶...
第一天: 接口的定义和实现 第二天: 一. go语言是面向接口编程. 在学习继承的时候说过, go语言只有封装,...