首先科普一下常用时间换算:
1000 毫秒(ms) = 1秒 1,000,000 微秒(μs) = 1秒 1,000 纳秒(ns) = 1秒
这个应该不需要解释。
首先看一下定义数据结构:
type Bucket struct { startTime time.Time capacity int64 quantum int64 fillInterval time.Duration mu sync.Mutex avail int64 availTick int64 }
上面的avail就是可用的令牌个数,availTick是已经走过的时间tick,这个后面会讲到。限速,就是要控制每秒的速度,看一下速度是怎么设定的10的九次方(纳秒)*因子除以时间间隔,
func (tb *Bucket) Rate() float64 {
return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
}
测试一下“速度”
func isCloseTo(x,y,tolerance float64) bool {
return math.Abs(x-y)/y < tolerance
}
tb = ratelimit.NewBucketWithQuantum(100*time.Millisecond, 1, 5)
println(isCloseTo(tb.Rate(), 50, 0.00001))
func (tb *Bucket) available(now time.Time) int64 {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.adjust(now)
return tb.avail
}
这个方法通过adjust调整,
func (tb *Bucket) adjust(now time.Time) (currentTick int64) {
currentTick = int64(now.Sub(tb.startTime) / tb.fillInterval)
if tb.avail >= tb.capacity {
return
}
tb.avail += (currentTick - tb.availTick) * tb.quantum
if tb.avail > tb.capacity {
tb.avail = tb.capacity
}
tb.availTick = currentTick
return
}
我们看看是怎么调整的,拿当前时间减去初始时间除以时间间隔获取tick个数,那这个tick数减去之前已经分配tick个数,就是增量的时间Tick,再拿这个增量的Tick*因子就算出要增加的令牌个数,这样就调整了avail的个数了,并且更新availTick个数,当然不能超过容量,所以才有了tb.avail = tb.capacity。设计的简单可靠,不需要开启单独协程定时的往这个bucket里面加入令牌。
查到可用的令牌,当然你就可以获取令牌了,通过toke方法,
func (tb *Bucket) take(now time.Time,count int64,maxWait time.Duration) (time.Duration,bool) {
if count <= 0 {
return 0,true
}
tb.mu.Lock()
defer tb.mu.Unlock()
currentTick := tb.adjust(now)
avail := tb.avail - count
if avail >= 0 {
tb.avail = avail
return 0,true
}
// Round up the missing tokens to the nearest multiple
// of quantum - the tokens won't be available until
// that tick.
endTick := currentTick + (-avail+tb.quantum-1)/tb.quantum
endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
waitTime := endTime.Sub(now)
if waitTime > maxWait {
return 0,false
}
tb.avail = avail
return waitTime,true
}
这个里面不仅涉及到令牌获取,获取后tb.avail - count,而且还可以估算等待时间,如果超过现有令牌,可以预估等待时间waitTime,这样不仅可以获取,还可以通过Wait方法等待,如果成功返回0,true。
func (tb *Bucket) Wait(count int64) {
if d := tb.Take(count); d > 0 {
time.Sleep(d)
}
}
等待,直到令牌数符合要求。那个整个限速器基本功能就具备了,还补充两个方法,
func (tb *Bucket) takeAvailable(now time.Time,count int64) int64 {
if count <= 0 {
return 0
}
tb.mu.Lock()
defer tb.mu.Unlock()
tb.adjust(now)
if tb.avail <= 0 {
return 0
}
if count > tb.avail {
count = tb.avail
}
tb.avail -= count
return count
}
这个方法会安全的拿到可用的令牌,如果获取令牌超过的话,会获取并返回当前现有的令牌。当然,如果你已经知道速度,就可以创建一个已知rate的令牌桶了
func NewBucketWithRate(rate float64,capacity int64) *Bucket {
for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) {
fillInterval := time.Duration(1e9 * float64(quantum) / rate)
if fillInterval <= 0 {
continue
}
tb := NewBucketWithQuantum(fillInterval,capacity,quantum)
if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin {
return tb
}
}
panic("cannot find suitable quantum for " + strconv.FormatFloat(rate,'g', -1, 64))
}
这样完整的限速器完毕