几种限流方式
前言
参考文章:传送门1传送门2
限流的几种方式:
1.令牌桶算法:令牌桶是一个固定容量的桶,桶中可以容纳一定数量的令牌。当请求到来时,会从桶中获取一个令牌,如果桶中没有令牌,则请求会被拒绝。令牌桶算法的核心在于,桶中的令牌数量是有限的,因此系统的访问速率就被限制了
2.漏桶算法:漏桶是一个固定容量的桶,桶中会不断地往外漏水。当请求到来时,会将请求放入桶中,如果桶中没有空间,则请求会被拒绝。漏桶算法的核心在于,桶中的水量是有限的,因此系统的访问速率就被限制了
3. 滑动窗口计数:滑动窗口计数是一种时间窗口内的请求计数方法,通过设定时间窗口和最大请求次数,来限制请求的进入。滑动窗口可以精确地控制时间窗口内的请求量
4. 固定窗口计数:与滑动窗口计数类似,但是固定窗口计数以固定的时间间隔来统计请求,而不是滑动的时间窗口。这种方法实现简单,但不能精确控制流量
5. 信号量(Semaphore):Java中的Semaphore
是一种限制资源使用的同步工具,可以用于限流。通过设定信号量的最大计数,可以限制同时进入系统的请求数量
6. Nginx 限流:在使用Nginx作为Web服务器时,可以利用Nginx的限流模块进行流量控制,它支持多种限流算法,如漏桶、令牌桶等
正文
令牌桶算法和漏桶算法都是比较简单的流量控制算法,它们的实现成本比较低。但是,这两种算法都存在一些缺点。令牌桶算法可能会导致请求的延迟,因为请求可能需要等待令牌。漏桶算法可能会导致请求的丢失,因为桶中可能会没有空间容纳请求 . 这两个算法基本都能用Google Guava 的 RateLimiter 实现
指标 | 令牌桶算法 | 漏桶算法 |
---|---|---|
实现成本 | 低 | 低 |
请求延迟 | 可能存在 | 可能存在 |
请求丢失 | 不存在 | 可能存在 |
适用场景 | 对请求延迟要求比较高 | 对请求丢失要求比较高 |
先看一下RateLimiter的api吧
使用也很简单,利用AtomicInteger实现
public class CounterRateLimiter {
//初始化计数器
AtomicInteger atomicInteger = new AtomicInteger(0);
//计时时间
long time;
//实现每秒给5个通行证
@Test
public void demo() throws InterruptedException {
for (int i = 0; i < 100; i++) {
new Thread(() -> {
//如果时间为0,则设置时间为当前时间,也就是初始化请求时间
if (time == 0) {
time = System.currentTimeMillis();
}
//如果超过一秒后就重置通行证和时间
if (System.currentTimeMillis() - time > 1000) {
System.out.println("更新了::" + atomicInteger.get());
atomicInteger.set(0);
time = 0;
}
//每次请求+1
int i1 = atomicInteger.incrementAndGet();
//达到五个通行证后其他的请求就拒绝
if (i1 > 5) {
//这里可以扩展一个功能,原本是只允许一秒内消费5个通行证(固定每秒可以通行的数量),
// 或者是每秒内同时允许消费通行证的只能有5个,
// 相当最开始5个消费完后,之后每一个请求结束了有空余令牌,就可以有新的请求补上,反正就5个通行证,可以重复用
atomicInteger.decrementAndGet();
atomicInteger.decrementAndGet();
System.out.println("失败");
} else {
System.err.println("成功");
}
}).start();
}
}
}
利用RateLimiter实现
//令牌桶
@Test
void demo3() {
//按照固定速率每秒创建4个令牌
//RateLimiter rateLimiter = RateLimiter.create(3);
//按照固定速率每秒创建4个令牌,预热时间为3秒
// 这个预热是什么意思呢,如果不设置预热时间参数,那么,当前每秒产生4个令牌,请求消费2个令牌,那么就是每秒可以有两个请求,
// 而预热呢,顾名思义,就是先热身,不要一下就来的这么猛,前三秒由慢到快再到正常
RateLimiter rateLimiter = RateLimiter.create(4,3,TimeUnit.SECONDS);
// 模拟请求
for (int i = 0; i < 10; i++) {
// 固定速率请求获取令牌,没获取到会被阻塞
rateLimiter.acquire(2);
// 处理请求
System.out.println("处理请求:" + LocalDateTime.now());
}
}
//漏桶
public static void main(String[] args) {
//新建一个每秒生产10个令牌桶
RateLimiter rateLimiter = RateLimiter.create(10);
ExecutorService executor = Executors.newFixedThreadPool(100);
for (int i = 0; i < 10; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
//每个请求消费2令牌,最多等待2秒,也就是如果令牌桶中没有足够的许可,
//则方法会等待一段时间,直到超过最大等待时间。如果在等待时间内仍无法获取足够的许可,则方法返回 false ,表示许可获取失败
//需要注意, tryAcquire() 方法是非阻塞的,会立即返回,即使无法获取许可,也不会阻塞当前线程
if (rateLimiter.tryAcquire(3, 2, TimeUnit.SECONDS)) {
// if (rateLimiter.tryAcquire(3, 2, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
} else {
System.err.println("超时");
}
}
});
}
executor.shutdown();
}
正式使用,以切面注解方式使用,这样比较方便
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Limiter {
/**
* 不进行限流
*/
int NOT_LIMITED = 0;
/**
* qps (每秒并发量)
*/
double qps() default NOT_LIMITED;
/**
* 超时时长,默认不等待
*/
int timeout() default 0;
/**
* 超时时间单位,默认毫秒
*/
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
/**
* 返回错误信息
*/
String msg() default "系统忙,请稍后再试";
}
@Slf4j
@Aspect
@Component
public class RateLimiterAspect {
/**
* key: 类全路径+方法名
*/
private static final ConcurrentMap<String, RateLimiter> RATE_LIMITER_CACHE = new ConcurrentHashMap<>();
@Around("@within(limiter) || @annotation(limiter)")
public Object pointcut(ProceedingJoinPoint point, Limiter limiter) throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
//默认不限流,虽然加了限流注解,但需要判断一下是否设置了qps,如果没有设置,则不限流,如果设置了qps,则进行限流操作
if (limiter != null && limiter.qps() > Limiter.NOT_LIMITED) {
double qps = limiter.qps();
//这个key可以根据具体需求配置,例如根据ip限制,或用户
String key = method.getDeclaringClass().getName() + method.getName();
if (RATE_LIMITER_CACHE.get(key) == null) {
// 初始化 QPS(也就是设置每秒允许多少请求)
RATE_LIMITER_CACHE.put(key, RateLimiter.create(qps));
}
//acquire()方法是阻塞的,如果获取不到令牌,那么会一直阻塞,直到获取到令牌为止,这不会抛弃请求,只会阻塞(二选一)
//RATE_LIMITER_CACHE.get(key).acquire()
// 尝试获取令牌(二选一)
if (RATE_LIMITER_CACHE.get(key) != null && !RATE_LIMITER_CACHE.get(key).tryAcquire(limiter.timeout(), limiter.timeUnit())) {
log.error("触发限流操作{}", key);
throw new RuntimeException(limiter.msg());
}
}
return point.proceed();
}
}
@GetMapping
@Limiter(qps = 1, msg = "您已被限流!",timeout = 2)
public String getUserName() {
log.info("成功消费");
return "success";
}
而滑动窗口,结合Redis+lua使用较为方便,固定窗口则更加简单,就贴个
public class SlidingWindowRateLimiter {
private Jedis jedis;
private String key;
private int limit;
public SlidingWindowRateLimiter(Jedis jedis, String key, int limit) {
this.jedis = jedis;
this.key = key;
this.limit = limit;
}
public boolean allowRequest(String key) {
// 当前时间戳
long currentTime = System.currentTimeMillis();
// 使用Lua脚本来确保原子性操作
String luaScript = "local window_start = ARGV[1] - 60000\n" +
"redis.call('ZREMRANGEBYSCORE', KEYS[1], '-inf', window_start)\n" +
"local current_requests = redis.call('ZCARD', KEYS[1])\n" +
"if current_requests < tonumber(ARGV[2]) then\n" +
" redis.call('ZADD', KEYS[1], ARGV[1], ARGV[1])\n" +
" return 1\n" +
"else\n" +
" return 0\n" +
"end";
Object result = jedis.eval(luaScript, 1, key, String.valueOf(currentTime), String.valueOf(limit));
return (Long) result == 1;
}
}