本案例使用 python 基于令牌桶算法进行测试。# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2020/9/9 10:43 PM
func:
"""
import time
import multiprocessing
TEST = {
# 测试 {'test1': 20} 每秒产生的令牌数量
'all': {
'test1': {'test1': 20},
'test2': {'test2': 50},
'test3': {'test3': 80},
'test4': {'test4': 100},#表示突发100个请求
'test5': {'test5': 200},#表示突发200个请求
'test6': {'test6': 20},
}
}
class TokenBucket(object):
# rate是令牌发放速度,capacity是桶的大小
def __init__(self, rate, capacity):
self._rate = rate
self._capacity = capacity
self._current_amount = 0
self._last_consume_time = int(time.time())
# token_amount是发送数据需要的令牌数
def consume(self, token_amount):
time.sleep(1)
# 计算从上次发送到这次发送,新发放的令牌数量
increment = (int(time.time()) - self._last_consume_time) * self._rate
# 令牌数量不能超过桶的容量
self._current_amount = min(
increment + self._current_amount, self._capacity)
# 如果没有足够的令牌,则不能发送数据
if token_amount > self._current_amount:
return False
self._last_consume_time = int(time.time())
self._current_amount -= token_amount
return True
def job():
i = 100
while i>1:
for result in result_dict.values():
key = tuple(result.keys())[0]
rate = tuple(result.values())[0]
i = i-1
if i <= 0:
break
if not limiter.consume(rate):
print(key + ' 限流')
else:
print(key + ' 正常')
def run():
threads = [multiprocessing.Process(target=job) for i in range(3)]
for thread in threads:
thread.start()
if __name__ == '__main__':
result_dict = TEST["all"]
RATE = 30
CAPACITY = 120
limiter = TokenBucket(RATE, CAPACITY)
run()