安徽基层党组织建设网站/阿里巴巴指数查询
前面已经写了一篇Redisson的分布式限流的使用,Redisson分布式限流的简单实践,对其中的原理很好奇。
一、使用
// 1、 声明一个限流器
RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);// 2、 设置速率,5秒中产生3个令牌
rateLimiter.trySetRate(RateType.OVERALL, 3, 5, RateIntervalUnit.SECONDS);// 3、试图获取一个令牌,获取到返回true
rateLimiter.tryAcquire(1)
二、原理
getRateLimiter
// 声明一个限流器 名称 叫key
redissonClient.getRateLimiter(key)
RedissonRateLimiter#trySetRate
。5秒中产生3个令牌。rateInterval
指的是时间间隔,rate
指的是指定时间间隔产生的令牌数。
@Override
public boolean trySetRate(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {return get(trySetRateAsync(type, rate, rateInterval, unit));
}@Override
public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"+ "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"+ "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",Collections.singletonList(getRawName()), rate, unit.toMillis(rateInterval), type.ordinal());
}
RedissonRateLimiter#tryAcquire()
。- key的数组是
Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName())
。 - 如果name是
rate.limiter
,那么lua脚本中的valueName
是{rate.limiter}:value
;permitsName
是{rate.limiter}:permits
。key[3]
的结果是getClientValueName()
,{rate.limiter}:value:4802866e-25b3-4482-aa74-61aa947d6f7a
,需要拼接机器的唯一id。key[5]
的结果是{rate.limiter}:permits:4802866e-25b3-4482-aa74-61aa947d6f7a
。 tonumber(rate) >= tonumber(ARGV[1])
,表明rate要比请求的令牌数大。- 如果首次获取,设置
valueName
为rate,设置permitsName
的score为当前时间戳,设置值为随机数和获取的令牌数,更新valueName
,减去需要获取的令牌数。 - 第二次获取令牌执行,获取
0-(当前时间-生成令牌间隔interval)
时间内的数据。获取之前所有的请求数released
,如果released>0
,更新valueName
为当前值+释放令牌数。之前的请求令牌数 >0
, 例如10s产生3个令牌,现在超过10s了,重置周期并计算剩余令牌数。 - 如果当前可提供的令牌数小于获取的令牌数,获取最近一次的记录。返回当前key的剩余过期时间。上一次请求的时间戳 - (当前时间戳 - 令牌生成的时间间隔) ,这个值表示还需要多久才能生产出足够的令牌。
- 如果当前令牌数 ≥ 请求的令牌数,表示令牌够多,直接更新zset。
- key的数组是
@Overridepublic boolean tryAcquire(long permits) {return get(tryAcquireAsync(RedisCommands.EVAL_NULL_BOOLEAN, permits));}private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"local rate = redis.call('hget', KEYS[1], 'rate');"+ "local interval = redis.call('hget', KEYS[1], 'interval');"+ "local type = redis.call('hget', KEYS[1], 'type');"+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"+ "local valueName = KEYS[2];"+ "local permitsName = KEYS[4];"+ "if type == '1' then "+ "valueName = KEYS[3];"+ "permitsName = KEYS[5];"+ "end;"+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "+ "local currentValue = redis.call('get', valueName); "+ "if currentValue ~= false then "+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "+ "local released = 0; "+ "for i, v in ipairs(expiredValues) do "+ "local random, permits = struct.unpack('fI', v);"+ "released = released + permits;"+ "end; "+ "if released > 0 then "+ "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "+ "currentValue = tonumber(currentValue) + released; "+ "redis.call('set', valueName, currentValue);"+ "end;"+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "+ "local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), '+inf', 'withscores', 'limit', 0, 1); "+ "return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);"+ "else "+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "+ "redis.call('decrby', valueName, ARGV[1]); "+ "return nil; "+ "end; "+ "else "+ "redis.call('set', valueName, rate); "+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "+ "redis.call('decrby', valueName, ARGV[1]); "+ "return nil; "+ "end;",Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),value, System.currentTimeMillis(), ThreadLocalRandom.current().nextLong());}
RedissonRateLimiter#getValueName
,生成valueName
。
String getValueName() {return suffixName(getRawName(), "value");} String getPermitsName() {return suffixName(getRawName(), "permits");}public static String suffixName(String name, String suffix) {if (name.contains("{")) {return name + ":" + suffix;}return "{" + name + "}:" + suffix;}
RedissonRateLimiter#tryAcquire(long, long, java.util.concurrent.TimeUnit)
,带时限的获取令牌。delay就是lua脚本返回的,还需要多久才会有令牌。如果获取令牌的时间比设置的超时时间还要大的话,直接就false了,否则会再次尝试获取令牌。
@Overridepublic boolean tryAcquire(long permits, long timeout, TimeUnit unit) {return get(tryAcquireAsync(permits, timeout, unit));}@Overridepublic RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit) {RPromise<Boolean> promise = new RedissonPromise<Boolean>();long timeoutInMillis = -1;if (timeout >= 0) {timeoutInMillis = unit.toMillis(timeout);}tryAcquireAsync(permits, promise, timeoutInMillis);return promise;}private void tryAcquireAsync(long permits, RPromise<Boolean> promise, long timeoutInMillis) {long s = System.currentTimeMillis();RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);future.onComplete((delay, e) -> {if (e != null) {promise.tryFailure(e);return;}if (delay == null) {promise.trySuccess(true);return;}if (timeoutInMillis == -1) {commandExecutor.getConnectionManager().getGroup().schedule(() -> {tryAcquireAsync(permits, promise, timeoutInMillis);}, delay, TimeUnit.MILLISECONDS);return;}long el = System.currentTimeMillis() - s;long remains = timeoutInMillis - el;if (remains <= 0) {promise.trySuccess(false);return;}if (remains < delay) {commandExecutor.getConnectionManager().getGroup().schedule(() -> {promise.trySuccess(false);}, remains, TimeUnit.MILLISECONDS);} else {long start = System.currentTimeMillis();commandExecutor.getConnectionManager().getGroup().schedule(() -> {long elapsed = System.currentTimeMillis() - start;if (remains <= elapsed) {promise.trySuccess(false);return;}tryAcquireAsync(permits, promise, remains - elapsed);}, delay, TimeUnit.MILLISECONDS);}});}