红茶的个人站点

  • 首页
  • 专栏
  • 开发工具
  • 其它
  • 隐私政策
Awalon
Talk is cheap,show me the code.
  1. 首页
  2. 未分类
  3. 正文

Redis 学习笔记 5:分布式锁

2025年5月5日 56点热度 0人点赞 0条评论

在前文中学习了如何基于 Redis 创建一个简单的分布式锁。虽然在大多数情况下这个锁已经可以满足需要,但其依然存在以下缺陷:

image-20250504110349891

事实上一般而言,我们可以直接使用 Redisson 提供的分布式锁而非自己创建。

Redisson

添加 Redisson 依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

创建 Redisson 客户端实例:

@Configuration
public class RedisConfig {
    @Autowired
    RedisProperties redisProperties;
    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        String address = String.format("redis://%s:%s", redisProperties.getHost(), redisProperties.getPort());
        config.useSingleServer()
                .setAddress(address)
                .setPassword(redisProperties.getPassword());
        return Redisson.create(config);
    }
}

这里的RedisProperties是 Spring-data-redis 自带的一个配置类,可以借助其直接从配置文件中读取 redis 相关配置信息。

也可以通过修改配置文件的方式配置 Redisson 客户端,但缺点是会变更 spring-data-redis 对 Redis 客户端的默认配置,所以不建议那样做。

使用 Redisson 提供的分布式锁限制优惠券抢购:

// ...
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    // ...
    @Autowired
    private RedissonClient redissonClient;
​
    // ...
    @Override
    public Result createOrder(Long voucherId) {
        // ...
        // 使用用户标识进行加锁
        String lockName = "lock:voucher-order:" + userId.toString();
        RLock lock = redissonClient.getLock(lockName);
        boolean isLock = lock.tryLock();
        if (!isLock) {
            return Result.fail("同一用户不能重复抢购");
        }
        try {
            return proxy.doCreateOrder(voucherId);
        } finally {
            lock.unlock();
        }
    }
    // ...
}

Redisson 提供的分布式锁RLock有多种加锁方式,这里展示的tryLock()是非阻塞式的加锁,如果获取锁失败,会立即返回。如果需要阻塞式获取锁(获取锁失败时等待并尝试获取),可以:

isLock = lock.tryLock(1,10, TimeUnit.SECONDS);

这里第一个参数是等待时长,第二个参数是 Redis 锁的过期时长。

可重入锁原理

Redisson 提供的分布式锁是可重入的,其原理和 JDK 提供的用于处理并发的可重入锁ReentrantLock是类似的。即在锁内部使用一个计数器,当一个线程多次获取同一个锁时,将计数器自增以记录已经重复获取锁的次数。在释放锁的时候将计数器减1,当计数器为0时才真正释放锁。

下面通过改造前文的 Redis 分布式锁,让其支持再入以说明 Redisson 锁可再入的实现原理。

在改造 Redis 锁前,先编写一个测试用例来证明目前的 Redis 锁不支持重入:

// ...
@Log4j2
@SpringBootTest
public class RedisLockTests {
    @Autowired
    private StringRedisTemplate redisTemplate;
​
    /**
     * 测试 Redis 锁是否可以再入
     */
    @Test
    public void test() {
        SimpleRedisLock lock = new SimpleRedisLock(redisTemplate, "test");
        boolean tryLock = lock.tryLock(2000);
        if (!tryLock) {
            log.error("test获取锁失败");
            return;
        }
        try {
            log.info("test获取锁成功");
            log.info("test执行业务代码");
            test2(lock);
        } finally {
            lock.unlock();
            log.info("test释放锁");
        }
    }
​
    private void test2(SimpleRedisLock lock) {
        boolean tryLock = lock.tryLock(2000);
        if (!tryLock) {
            log.error("test2获取锁失败");
            return;
        }
        try {
            log.info("test2获取锁成功");
            log.info("test2执行业务代码");
        } finally {
            lock.unlock();
            log.info("test2释放锁");
        }
    }
}

因为像前面说的,重入锁需要有一个计数器,同时还需要持有一个线程 ID 以检查是否当前线程持有的锁,因此不能再使用 Redis 中的 key-value 结构作为锁,改为使用 Hash 来同时保存这两个信息:

image-20250504141645445

这里将线程 ID 直接作为字典的 key 以节省存储空间。

现在的问题就是在获取锁和释放锁的部分加入计数器维护的逻辑。但就像在前文引入 Lua 脚本时讨论的那样,显然这些操作不能通过 Java 实现,因为那样做不能保证操作的原子性,因此需要用 Lua 脚本来实现:

--[[
    @描述: Redis 锁获取脚本(支持再入)
    @版本: 1.0.0
]] --    
local key = KEYS[1] -- Redis 锁的对应的 key
local threadId = ARGV[1] -- 持有锁的线程标识
local timeoutSec = ARGV[2] -- 锁的自动过期时长(单位秒)
-- 检查锁是否已经存在
local exists = redis.call('exists', key)
if (exists == 0) then
    -- 如果锁不存在,添加(正常获取到锁)
    redis.call('hset', key, threadId, 1)
    -- 更新锁的过期时间
    redis.call('expire', key, timeoutSec)
    return 1
end
-- 如果锁存在,检查是否当前线程的锁
if (redis.call('HEXISTS', key, threadId) == 0) then
    -- 如果不是当前线程的锁,返回错误信息(互斥,没有获取到锁)
    return 0
end
-- 是当前线程的锁(再入)
-- 计数器+1
redis.call('HINCRBY', key, threadId, 1)
-- 更新过期时长
redis.call('expire', key, timeoutSec)
return 1
--[[
    @描述:Redis 锁释放(支持再入)
]] --
local key = KEYS[1] -- Redis 锁的对应的 key
local threadId = ARGV[1] -- 持有锁的线程标识
local timeoutSec = ARGV[2] -- 锁的自动过期时长(单位秒)
-- 检查锁是否已经存在
if (redis.call('exists', key) == 0) then
    -- 锁不存在,返回错误信息
    return 0
end
-- 锁存在,检查是否当前线程持有的锁
if (redis.call('HEXISTS', key, threadId) == 0) then
    -- 不是当前线程持有的锁,返回错误信息
    return 0
end
-- 是当前线程持有的锁,计数器-1
redis.call('HINCRBY', key, threadId, -1)
-- 如果计数器小于等于0,删除锁
if (tonumber(redis.call('HGET', key, threadId)) <= 0) then
    redis.call('del', key)
    return 1
end
-- 如果计数器还未归0,更新锁的有效时长
redis.call('expire', key, timeoutSec)
return 1

需要注意的是,与之前不同的是,再次获取锁和释放锁的时候都需要更新锁的有效时长,以确保之后的业务能在锁生效期内正常执行完毕。

修改锁的实现类,用 Lua 脚本获取和释放锁:

// ...
public class SimpleRedisLock implements ILock {
    // ...
    // Redis 锁获取脚本
    private static final DefaultRedisScript<Long> LOCK_SCRIPT;
​
    static {
        LOCK_SCRIPT = new DefaultRedisScript<>();
        // 指定脚本的位置
        LOCK_SCRIPT.setLocation(new ClassPathResource("reentrant-lock.lua"));
        // 指定脚本的返回值类型
        LOCK_SCRIPT.setResultType(Long.class);
    }
​
    // Redis 锁释放脚本
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
​
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        // 指定脚本的位置
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("reentrant-unlock.lua"));
        // 指定脚本的返回值类型
        UNLOCK_SCRIPT.setResultType(Long.class);
    }
​
    // ...
​
    @Override
    public boolean tryLock(long timeoutSec) {
        final String jvmThreadId = getJvmThreadId();
        Long res = stringRedisTemplate.execute(LOCK_SCRIPT,
                Collections.singletonList(redisKey),
                jvmThreadId,
                Long.toString(timeoutSec));
        return res != null && res > 0;
    }
    // ...
​
    @Override
    public void unlock(long timeoutSec) {
        // 使用 lua 脚本删除 Redis 锁
        stringRedisTemplate.execute(UNLOCK_SCRIPT,
                Collections.singletonList(redisKey),
                getJvmThreadId(),
                Long.toString(timeoutSec));
    }
​
    @Override
    public void unlock() {
        unlock(200);
    }
}

查看 Redisson 源码可以发现,其实现方式和上文描述的是类似的。

重试机制

Redisson 提供的分布式锁具备获取锁失败后进行重试的机制,且这种机制是基于 Redis 订阅和信号量的方式实现的,会有效避免 CPU 计算资源的浪费。此外,调用 API 时如果指定锁的过期时长为 -1,Redisson 会将锁的在 Redis 中的有效时长设置一个默认值(30秒),并启动一个守护进程(WatchDog)来定期重新刷新其有效时长,以保证该锁的长期有效。在释放锁的时候,该守护进程会被终止。

整个过程可以用下图表示:

image-20250504162053935

这里的ttl指获取锁的 Lua 脚本的返回值,如果锁成功获取,会返回 null,获取锁失败,会返回锁的剩余有效时长。

详细的源码分析和说明可以参考这个视频。

联锁

如果 Redisson 实现的锁是基于单个 Redis 的,那么是没有问题的,反之,如果是主从同步的集群,之前所使用的锁就会存在问题:

image-20250504172734538

如果从主节点获取锁成功,但还未将锁同步到其他从节点时主节点宕机,锁就会“丢失”。

这个问题可以用联锁来解决:

image-20250504172915751

即不使用主从,而是使用多台独立的 Redis 获取锁,只有从所有 Redis 获取锁成功,才算是成功,否则视为获取锁失败。在这种情况下,任意 Redis 宕机都不会导致锁失效。

为了演示,额外启动两个 Redis 实例:

docker run --name my-redis -p 6380:6379 -d redis
docker run --name my-redis2 -p 6381:6379 -d redis

关于如何用 Docker 部署 Redis 可以参考这里。

在配置为文件中添加两个 Redis 服务的配置信息:

my-config:
  redis1:
    host: 192.168.0.88
    port: 6380
  redis2:
    host: 192.168.0.88
    port: 6381

创建配置类读取该信息:

@Configuration
@ConfigurationProperties(prefix = "my-config")
@Data
public class MyConfigProperties {
    @Data
    public static class RedisConfig{
        private String host;
        private String port;
    }
    private RedisConfig redis1;
    private RedisConfig redis2;
}

创建对应的 Redisson 客户端:

@Configuration
public class RedisConfig {
    // ...
    @Autowired
    MyConfigProperties myConfig;
    
    // ...
​
    @Bean
    public RedissonClient redissonClient2(){
        Config config = new Config();
        String address = String.format("redis://%s:%s",
                myConfig.getRedis1().getHost(),
                myConfig.getRedis1().getPort());
        config.useSingleServer()
                .setAddress(address)
                .setPassword(redisProperties.getPassword());
        return Redisson.create(config);
    }
​
    @Bean
    public RedissonClient redissonClient3(){
        Config config = new Config();
        String address = String.format("redis://%s:%s",
                myConfig.getRedis2().getHost(),
                myConfig.getRedis2().getPort());
        config.useSingleServer()
                .setAddress(address)
                .setPassword(redisProperties.getPassword());
        return Redisson.create(config);
    }
}

修改测试用例,使用联锁:

// ...
@Log4j2
@SpringBootTest
public class RedisLockTests {
    // ...
    @Resource
    private RedissonClient redissonClient;
    @Resource
    private RedissonClient redissonClient2;
    @Resource
    private RedissonClient redissonClient3;
​
    /**
     * 测试 Redis 锁是否可以再入
     */
    @Test
    public void test() throws InterruptedException {
        // 创建 Redisson 联锁
        String lockName = "lock:test";
        RLock lock1 = redissonClient.getLock(lockName);
        RLock lock2 = redissonClient2.getLock(lockName);
        RLock lock3 = redissonClient3.getLock(lockName);
        RLock lock = redissonClient.getMultiLock(lock1, lock2, lock3);
        boolean tryLock = lock.tryLock(10, TimeUnit.SECONDS);
        // ...
    }
​
    private void test2(RLock lock) throws InterruptedException {
        // ...
    }
}

和单个锁类似,联锁同样可以指定等待时间以进行重试。

关于 Redisson 联锁的源码分析可以看这里。

本文的所有示例代码可以从这里获取。

The End.

参考资料

  • 黑马程序员Redis入门到实战教程

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: redis 分布式锁
最后更新:2025年5月5日

魔芋红茶

加一点PHP,加一点Go,加一点Python......

点赞
< 上一篇
下一篇 >

文章评论

取消回复

*

code

COPYRIGHT © 2021 icexmoon.cn. ALL RIGHTS RESERVED.
本网站由提供CDN加速/云存储服务

Theme Kratos Made By Seaton Jiang

宁ICP备2021001508号

宁公网安备64040202000141号