在中学习了如何基于 Redis 创建一个简单的分布式锁。虽然在大多数情况下这个锁已经可以满足需要,但其依然存在以下缺陷:
事实上一般而言,我们可以直接使用 Redisson 提供的分布式锁而非自己创建。
Redisson
添加 Redisson 依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
创建 Redisson 客户端实例:
public class RedisConfig {
RedisProperties redisProperties;
public RedissonClient redissonClient(){
Config config = new Config();
String address = String.format("redis://%s:%s", redisProperties.getHost(), redisProperties.getPort());
config.useSingleServer()
.setAddress(address)
.setPassword(redisProperties.getPassword());
return Redisson.create(config);
}
}
这里的RedisProperties
是 Spring-data-redis 自带的一个配置类,可以借助其直接从配置文件中读取 redis 相关配置信息。
也可以通过修改配置文件的方式配置 Redisson 客户端,但缺点是会变更 spring-data-redis 对 Redis 客户端的默认配置,所以不建议那样做。
使用 Redisson 提供的分布式锁限制优惠券抢购:
// ...
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
// ...
private RedissonClient redissonClient;
// ...
public Result createOrder(Long voucherId) {
// ...
// 使用用户标识进行加锁
String lockName = "lock:voucher-order:" + userId.toString();
RLock lock = redissonClient.getLock(lockName);
boolean isLock = lock.tryLock();
if (!isLock) {
return Result.fail("同一用户不能重复抢购");
}
try {
return proxy.doCreateOrder(voucherId);
} finally {
lock.unlock();
}
}
// ...
}
Redisson 提供的分布式锁RLock
有多种加锁方式,这里展示的tryLock()
是非阻塞式的加锁,如果获取锁失败,会立即返回。如果需要阻塞式获取锁(获取锁失败时等待并尝试获取),可以:
isLock = lock.tryLock(1,10, TimeUnit.SECONDS);
这里第一个参数是等待时长,第二个参数是 Redis 锁的过期时长。
可重入锁原理
Redisson 提供的分布式锁是可重入的,其原理和 JDK 提供的用于处理并发的可重入锁ReentrantLock
是类似的。即在锁内部使用一个计数器,当一个线程多次获取同一个锁时,将计数器自增以记录已经重复获取锁的次数。在释放锁的时候将计数器减1,当计数器为0时才真正释放锁。
下面通过改造的 Redis 分布式锁,让其支持再入以说明 Redisson 锁可再入的实现原理。
在改造 Redis 锁前,先编写一个测试用例来证明目前的 Redis 锁不支持重入:
// ...
public class RedisLockTests {
private StringRedisTemplate redisTemplate;
/**
* 测试 Redis 锁是否可以再入
*/
public void test() {
SimpleRedisLock lock = new SimpleRedisLock(redisTemplate, "test");
boolean tryLock = lock.tryLock(2000);
if (!tryLock) {
log.error("test获取锁失败");
return;
}
try {
log.info("test获取锁成功");
log.info("test执行业务代码");
test2(lock);
} finally {
lock.unlock();
log.info("test释放锁");
}
}
private void test2(SimpleRedisLock lock) {
boolean tryLock = lock.tryLock(2000);
if (!tryLock) {
log.error("test2获取锁失败");
return;
}
try {
log.info("test2获取锁成功");
log.info("test2执行业务代码");
} finally {
lock.unlock();
log.info("test2释放锁");
}
}
}
因为像前面说的,重入锁需要有一个计数器,同时还需要持有一个线程 ID 以检查是否当前线程持有的锁,因此不能再使用 Redis 中的 key-value 结构作为锁,改为使用 Hash 来同时保存这两个信息:
这里将线程 ID 直接作为字典的 key 以节省存储空间。
现在的问题就是在获取锁和释放锁的部分加入计数器维护的逻辑。但就像在引入 Lua 脚本时讨论的那样,显然这些操作不能通过 Java 实现,因为那样做不能保证操作的原子性,因此需要用 Lua 脚本来实现:
--[[
@描述: Redis 锁获取脚本(支持再入)
@版本: 1.0.0
]] --
local key = KEYS[1] -- Redis 锁的对应的 key
local threadId = ARGV[1] -- 持有锁的线程标识
local timeoutSec = ARGV[2] -- 锁的自动过期时长(单位秒)
-- 检查锁是否已经存在
local exists = redis.call('exists', key)
if (exists == 0) then
-- 如果锁不存在,添加(正常获取到锁)
redis.call('hset', key, threadId, 1)
-- 更新锁的过期时间
redis.call('expire', key, timeoutSec)
return 1
end
-- 如果锁存在,检查是否当前线程的锁
if (redis.call('HEXISTS', key, threadId) == 0) then
-- 如果不是当前线程的锁,返回错误信息(互斥,没有获取到锁)
return 0
end
-- 是当前线程的锁(再入)
-- 计数器+1
redis.call('HINCRBY', key, threadId, 1)
-- 更新过期时长
redis.call('expire', key, timeoutSec)
return 1
--[[
描述:Redis 锁释放(支持再入)
]] --
local key = KEYS[1] -- Redis 锁的对应的 key
local threadId = ARGV[1] -- 持有锁的线程标识
local timeoutSec = ARGV[2] -- 锁的自动过期时长(单位秒)
-- 检查锁是否已经存在
if (redis.call('exists', key) == 0) then
-- 锁不存在,返回错误信息
return 0
end
-- 锁存在,检查是否当前线程持有的锁
if (redis.call('HEXISTS', key, threadId) == 0) then
-- 不是当前线程持有的锁,返回错误信息
return 0
end
-- 是当前线程持有的锁,计数器-1
redis.call('HINCRBY', key, threadId, -1)
-- 如果计数器小于等于0,删除锁
if (tonumber(redis.call('HGET', key, threadId)) <= 0) then
redis.call('del', key)
return 1
end
-- 如果计数器还未归0,更新锁的有效时长
redis.call('expire', key, timeoutSec)
return 1
需要注意的是,与之前不同的是,再次获取锁和释放锁的时候都需要更新锁的有效时长,以确保之后的业务能在锁生效期内正常执行完毕。
修改锁的实现类,用 Lua 脚本获取和释放锁:
// ...
public class SimpleRedisLock implements ILock {
// ...
// Redis 锁获取脚本
private static final DefaultRedisScript<Long> LOCK_SCRIPT;
static {
LOCK_SCRIPT = new DefaultRedisScript<>();
// 指定脚本的位置
LOCK_SCRIPT.setLocation(new ClassPathResource("reentrant-lock.lua"));
// 指定脚本的返回值类型
LOCK_SCRIPT.setResultType(Long.class);
}
// Redis 锁释放脚本
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
// 指定脚本的位置
UNLOCK_SCRIPT.setLocation(new ClassPathResource("reentrant-unlock.lua"));
// 指定脚本的返回值类型
UNLOCK_SCRIPT.setResultType(Long.class);
}
// ...
public boolean tryLock(long timeoutSec) {
final String jvmThreadId = getJvmThreadId();
Long res = stringRedisTemplate.execute(LOCK_SCRIPT,
Collections.singletonList(redisKey),
jvmThreadId,
Long.toString(timeoutSec));
return res != null && res > 0;
}
// ...
public void unlock(long timeoutSec) {
// 使用 lua 脚本删除 Redis 锁
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(redisKey),
getJvmThreadId(),
Long.toString(timeoutSec));
}
public void unlock() {
unlock(200);
}
}
查看 Redisson 源码可以发现,其实现方式和上文描述的是类似的。
重试机制
Redisson 提供的分布式锁具备获取锁失败后进行重试的机制,且这种机制是基于 Redis 订阅和信号量的方式实现的,会有效避免 CPU 计算资源的浪费。此外,调用 API 时如果指定锁的过期时长为 -1,Redisson 会将锁的在 Redis 中的有效时长设置一个默认值(30秒),并启动一个守护进程(WatchDog)来定期重新刷新其有效时长,以保证该锁的长期有效。在释放锁的时候,该守护进程会被终止。
整个过程可以用下图表示:
这里的
ttl
指获取锁的 Lua 脚本的返回值,如果锁成功获取,会返回 null,获取锁失败,会返回锁的剩余有效时长。
详细的源码分析和说明可以参考这个。
联锁
如果 Redisson 实现的锁是基于单个 Redis 的,那么是没有问题的,反之,如果是主从同步的集群,之前所使用的锁就会存在问题:
如果从主节点获取锁成功,但还未将锁同步到其他从节点时主节点宕机,锁就会“丢失”。
这个问题可以用联锁来解决:
即不使用主从,而是使用多台独立的 Redis 获取锁,只有从所有 Redis 获取锁成功,才算是成功,否则视为获取锁失败。在这种情况下,任意 Redis 宕机都不会导致锁失效。
为了演示,额外启动两个 Redis 实例:
docker run --name my-redis -p 6380:6379 -d redis
docker run --name my-redis2 -p 6381:6379 -d redis
关于如何用 Docker 部署 Redis 可以参考。
在配置为文件中添加两个 Redis 服务的配置信息:
my-config
redis1
host192.168.0.88
port6380
redis2
host192.168.0.88
port6381
创建配置类读取该信息:
prefix = "my-config")
(
public class MyConfigProperties {
public static class RedisConfig{
private String host;
private String port;
}
private RedisConfig redis1;
private RedisConfig redis2;
}
创建对应的 Redisson 客户端:
public class RedisConfig {
// ...
MyConfigProperties myConfig;
// ...
public RedissonClient redissonClient2(){
Config config = new Config();
String address = String.format("redis://%s:%s",
myConfig.getRedis1().getHost(),
myConfig.getRedis1().getPort());
config.useSingleServer()
.setAddress(address)
.setPassword(redisProperties.getPassword());
return Redisson.create(config);
}
public RedissonClient redissonClient3(){
Config config = new Config();
String address = String.format("redis://%s:%s",
myConfig.getRedis2().getHost(),
myConfig.getRedis2().getPort());
config.useSingleServer()
.setAddress(address)
.setPassword(redisProperties.getPassword());
return Redisson.create(config);
}
}
修改测试用例,使用联锁:
// ...
public class RedisLockTests {
// ...
private RedissonClient redissonClient;
private RedissonClient redissonClient2;
private RedissonClient redissonClient3;
/**
* 测试 Redis 锁是否可以再入
*/
public void test() throws InterruptedException {
// 创建 Redisson 联锁
String lockName = "lock:test";
RLock lock1 = redissonClient.getLock(lockName);
RLock lock2 = redissonClient2.getLock(lockName);
RLock lock3 = redissonClient3.getLock(lockName);
RLock lock = redissonClient.getMultiLock(lock1, lock2, lock3);
boolean tryLock = lock.tryLock(10, TimeUnit.SECONDS);
// ...
}
private void test2(RLock lock) throws InterruptedException {
// ...
}
}
和单个锁类似,联锁同样可以指定等待时间以进行重试。
关于 Redisson 联锁的源码分析可以看。
本文的所有示例代码可以从获取。
The End.
文章评论