自适应过载保护 PART2

背景

之前 PART1 文章主要集中在服务提供者角度的过载保护策略

本文想讨论下服务消费者角度,如何应对过载问题

即在过载发生时,如何从被动接收service响应过渡到主动调整请求策略

算法

服务消费者自行限制请求速度,限制生成请求的数量, 超过这个数量的请求直接在本地回复失败,而不会真是发送到服务端提供者

第一时间想到熔断机制,如 Hystrix,但它有一个问题,熔断器开启时,过于一刀切,一个都不放过

是否有算法可以适度的允许request进行?

有如下指标,每个请求者记录某个时间窗口内的以下信息(滑动窗口实现):

  • requests(客户端请求总量): The number of requests attempted by the application layer(at the client, on top of the adaptive throttling system)
  • accepts(成功的请求总量 - 被 accepted的量): The number of requests accepted by the backend
  • 在通常情况下(无错误发生时) requests==accepts
  • 出现过载保护后,accepts的数量会逐渐小于 requests
  • 请求者可以继续发送请求直到 requests=K * accepts,一旦超过这个值就启动自适应限流机制,新产生的请求在本地会以 p 概率被拒绝
  • 当请求者主动丢弃请求时,requests 值会一直增大,在某个时间点会超过 K*accepts,使概率 p 计算出来的值大于 0,此时请求者会以此概率对请求做主动丢弃
  • 当后端逐渐恢复时,accepts 增加,同时 requests 值也会增加,但是由于 K 的关系,K*accepts 的放大倍数更快),概率 p趋近于0,请求者自适应限流结束
  • 降低 K 值会使自适应限流算法更加激进(允许请求者在算法启动时拒绝更多本地请求)
  • 增加 K 值会使自适应限流算法不再那么激进(允许服务端在算法启动时尝试接收更多的请求,与上面相反)

实现

go-kratos 代码示例(K = 1.5)

// sreBreaker is a sre CircuitBreaker pattern.
type sreBreaker struct {
	stat metric.RollingCounter
	r    *rand.Rand
	// rand.New(...) returns a non thread safe object
	randLock sync.Mutex

	k       float64
	request int64

	state int32
}
func (b *sreBreaker) summary() (success int64, total int64) {
	b.stat.Reduce(func(iterator metric.Iterator) float64 {
		for iterator.Next() {
			bucket := iterator.Bucket()
			total += bucket.Count
			for _, p := range bucket.Points {
				success += int64(p)
			}
		}
		return 0
	})
	return
}
func (b *sreBreaker) Allow() error {
	success, total := b.summary()
	k := b.k * float64(success)
	if log.V(5) {
		log.Info("breaker: request: %d, succee: %d, fail: %d", total, success, total-success)
	}
	// check overflow requests = K * success
	if total < b.request || float64(total) < k {
		if atomic.LoadInt32(&b.state) == StateOpen {
			atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed)
		}
		return nil
	}
	if atomic.LoadInt32(&b.state) == StateClosed {
		atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen)
	}
	dr := math.Max(0, (float64(total)-k)/float64(total+1))
	drop := b.trueOnProba(dr)
	if log.V(5) {
		log.Info("breaker: drop ratio: %f, drop: %t", dr, drop)
	}
	if drop {
		return ecode.ServiceUnavailable
	}
	return nil
}

本地drop请求的概率

dr := math.Max(0, (float64(total)-k)/float64(total+1))

判断drop := b.trueOnProba(dr)

即产生一个 0-1 之间的浮点数 ,小于dr则drop调请求

参考

Google handling-overload