本文基于的黑马点评项目进行学习。
Redis 生成全局唯一ID
整个全局唯一 ID 的结构如下:
这里的时间戳是当前时间基于某一个基准时间(项目开始前的某个时间点)的时间戳。序列号是依赖 Redis 生成的对于某个业务唯一的自增量。
先计算一个基准时间的时间戳:
public static void main(String[] args) {
// 计算时间戳基准时间戳
long baseTimestamp = LocalDateTime.of(2024, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC);
System.out.println(baseTimestamp);
}
这里我选择的基准时间是 2024年1月1日0点0分0秒。
编写生成全局唯一 ID 的工具类:
public class GlobalIdGenerator {
private static final long BASE_TIMESTAMP = 1704067200L;
private StringRedisTemplate stringRedisTemplate;
/**
* 生成全局唯一ID
*
* @param key 业务代码
* @return 全局唯一ID
*/
public long genGlobalId(String key) {
// 计算时间戳差额
LocalDateTime now = LocalDateTime.now();
long timestamp = now.toEpochSecond(ZoneOffset.UTC) - BASE_TIMESTAMP;
// 从 Redis 获取对应的自增量
String dateStr = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
String redisKey = "incr:" + key + ":" + dateStr;
long incrNum = stringRedisTemplate.opsForValue().increment(redisKey);
// 生成唯一id
return timestamp << 32 | incrNum;
}
}
这里的BASE_TIMESTAMP
值取之前设置的基准时间的时间戳。生成 ID 的算法是将时间戳差值(timestamp)左移 32 位后再与序列号(incrNum)按位或。
编写测试用例:
public class GlobalIdGeneratorTests {
private GlobalIdGenerator globalIdGenerator;
public void test() throws InterruptedException {
final int THREAD_NUM = 30;
CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
ExecutorService es = Executors.newFixedThreadPool(THREAD_NUM);
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
long orderID = globalIdGenerator.genGlobalId("order");
System.out.println(orderID);
}
countDownLatch.countDown();
};
LocalDateTime begin = LocalDateTime.now();
for (int i = 0; i < THREAD_NUM; i++) {
es.execute(task);
}
countDownLatch.await();
LocalDateTime end = LocalDateTime.now();
long millis = Duration.between(begin, end).toMillis();
System.out.println("共耗时" + millis + "毫秒");
}
}
这里用 30 个线程生成 ID,每个线程执行 1000 次生成过程,共生成 30000 次 ID。
添加优惠券
通过接口添加秒杀优惠券,调用示例可以参考。
优惠券下单
package com.hmdp.service.impl;
//...
/**
* <p>
* 服务实现类
* </p>
*
* @author 虎哥
* @since 2021-12-22
*/
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
ISeckillVoucherService seckillVoucherService;
GlobalIdGenerator globalIdGenerator;
/**
* 创建优惠券订单
*
* @param voucherId
* @return
*/
public Result createOrder(Long voucherId) {
// 获取秒杀优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 判断当前时间是否在优惠券有效期内
LocalDateTime now = LocalDateTime.now();
if (now.isBefore(voucher.getBeginTime())) {
return Result.fail("优惠券抢购未开始");
}
if (now.isAfter(voucher.getEndTime())) {
return Result.fail("优惠券抢购已结束");
}
// 判断优惠券库存是否够
if (voucher.getStock() <= 0) {
return Result.fail("缺少库存");
}
// 扣减库存
boolean res = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherId).update();
if (!res){
return Result.fail("缺少库存");
}
// 生成秒杀优惠券订单
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(globalIdGenerator.genGlobalId("voucher-order"));
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(UserHolder.getUser().getId());
this.save(voucherOrder);
return Result.ok(voucherOrder.getId());
}
}
优惠券秒杀
超卖问题
上面的代码在多个线程并发抢购的情况下,会出现超卖问题。
我们可以用 JMeter 来模拟多线程抢购的情况,测试脚本可以从下面的链接下载:
通过网盘分享的文件:秒杀抢购.jmx 链接: https://pan.baidu.com/s/1BmKKhN1FCLJH_qR7hXB0Uw?pwd=y74i 提取码: y74i
需要注意的是,在导入测试脚本后,需要在登录状态头中填入当前登录用户的token
以模拟用户登录:
可以修改 Redis 中的 token 过期时间以保持长时间有效登录,方便后续测试。
此外还需要将测试接口中的路径参数修改为你要模拟抢购的目标优惠券id:
为了方便观察,在抢购前将优惠券库存修改为100
:
并清空tb_voucher_order
表。
执行 JMeter 的秒杀测试脚本,可以观察到库存为负:
且tb_voucher_order
表创建了109个订单。
查看 JMeter 测试报告可以看到有 70% 左右的调用失败:
同步锁
以上问题是因为在多线程并发的情况下,优惠券的库存资源是多个线程的共享资源,事实上每个线程的读取和写入库存动作是分步完成的,所以可能在读取和写入动作之间,其它线程完成读取/写入动作,就会导致出现多线程的数据一致性问题。
同样的,像 Java 解决多线程问题那样,用互斥锁可以解决此类问题:
// ...
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
// ...
private final Object orderCreateLock = new Object[0];
"orderCreateLock")
(
public Result createOrder(Long voucherId) {
// ...
if (voucher.getStock() <= 1) {
return Result.fail("缺少库存");
}
// ...
}
}
@Synchronized
注解的用途可以看。判断库存这里必须是
<=1
而非<=0
,否则会超卖掉1个优惠券,原因可能与 MyBatis 缓存有关。
现在执行 JMeter 就不会出现超卖的情况了。
乐观锁
上面的互斥锁实际上是一种“悲观锁”,即认为并发情况下出现问题是必然且高频的,所以必须对访问共享数据的代码进行锁保护,以让这些代码串行执行。对应的,还有一种“乐观锁”,即认为这种情况是低频的,在某种程度上可以容许它们发生,只需要在关键的更新数据时进行检查,如果有其他线程已经对数据进行了修改,就放弃当前线程对数据的修改并返回错误,这样同样不会导致共享数据出错。
最简单的方式是在存在共享数据的表中添加一个version
字段,以用于乐观锁检查是否有其他线程修改数据。在这个示例中,我们需要给tb_seckill_voucher
表添加一个version
字段:
`version` bigint NOT NULL COMMENT '用于乐观锁的版本控制',
给实体类添加相应字段:
public class SeckillVoucher implements Serializable {
// ...
private Long version;
}
在扣减库存时检查 version 字段:
// ...
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
// ...
public Result createOrder(Long voucherId) {
// ...
// 扣减库存时检查 version
boolean res = seckillVoucherService.update().setSql("stock=stock-1,version=version+1")
.eq("voucher_id", voucherId)
.eq("version", voucher.getVersion())
.update();
if (!res) {
return Result.fail("缺少库存");
}
// ...
}
}
MybatisPlus 有一个
@version
注解可以自动处理乐观锁的veresion
字段,但不如自己控制灵活。
这里在正常更新库存后,version 字段会自增,所以一旦当前线程在更新时发现 version 字段匹配不到,就说明其它线程已经修改了库存。
对于当前这个示例,version 字段实际上并不是必须的,完全可以用库存字段代替,换言之,在更新库存时进行判断,如果库存与读取时一致,说明其它线程还没有更新库存,可以更新,反之则不可。
删除表和实体中的 version 字段后代码修改为:
// 扣减库存时检查 version
boolean res = seckillVoucherService.update().setSql("stock=stock-1")
.eq("voucher_id", voucherId)
.eq("stock", voucher.getStock())
.update();
此时执行测试脚本会发现实际上 200 个线程中有 80% 都会失败,查看优惠券库存也会发现只扣减掉 20 个库存。
这是因为相比悲观锁的串行执行,这里乐观锁下会有大量线程执行到提交 update 语句时才发现库存已经改变,继而放弃更新。但事实上,在这个示例中,只要库存大于0,即使库存已经被其它线程修改,也是可以提交更新的。也就是说在这里可以放宽乐观锁的更新条件:
// 扣减库存时检查
boolean res = seckillVoucherService.update().setSql("stock=stock-1")
.eq("voucher_id", voucherId)
.gt("stock", 0) // 只要库存大于0都可以提交更新
.update();
再次执行测试,就会发现200个线程有50%的错误率,库存也能正确扣减到0。
一人一单
假设我们需要给优惠券抢购添加限制,一人只能抢购一单:
// 检查用户是否已经抢购过该优惠券
Integer count = this.query()
.eq("user_id", UserHolder.getUser().getId())
.eq("voucher_id", voucherId)
.count();
if (count > 0) {
return Result.fail("已经抢购过优惠券,不能重复抢购");
}
// 扣减库存时检查
// ...
实际运行测试会发现依然会出现一个用户抢购多单的情况,其原因和之前并发情况下扣减库存是相同的——同一时间有多个线程“越过”了检查点并扣减了库存。
与扣减库存所不同的是,这里检查的是表数据中是否存在特定数据行,而非某个数据的修改,所以无法通过版本控制的方式使用乐观锁。但悲观锁依然是有效的:
// ...
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
// ...
public Result createOrder(Long voucherId) {
// ...
return doCreateOrder(voucherId);
}
public synchronized Result doCreateOrder(Long voucherId) {
// 检查用户是否已经抢购过该优惠券
Integer count = this.query()
.eq("user_id", UserHolder.getUser().getId())
.eq("voucher_id", voucherId)
.count();
if (count > 0) {
return Result.fail("已经抢购过优惠券,不能重复抢购");
}
// 扣减库存时检查
// ...
// 生成秒杀优惠券订单
// ...
}
}
为了方便起见,这里将需要加锁的部分(涉及优惠券订单表的操作)封装到单独方法doCreateOrder
中。且因为createOrder
方法中仅包含数据读取操作,所以将事务缩小到doCreateOrder
方法。
事实上上面的代码是有问题的,因为 Spring 的事务是通过 AOP 实现的,而 AOP 又是通过代理对象实现的。这导致 Spring 的默认代理存在一个自调用会导致 AOP 失效的问题。换言之,这里的createOrder
方法调用this.doCreateOrder(...)
不会使用事务,也就是常说的事务失效的情况之一。
可以通过开启相关日志的方式观察 JDBC 事务有没有在 SQL 执行时开启:
logging level org springframework jdbc datasource DataSourceTransactionManager DEBUG transaction interceptor TRACE
解决这个问题有多种方式,这里通过最简单的显式获取代理对象并调用的方式:
// ...
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
// ...
private ApplicationContext applicationContext;
// ...
public Result createOrder(Long voucherId) {
// 显式获取代理对象并调用
VoucherOrderServiceImpl proxy = applicationContext.getBean(VoucherOrderServiceImpl.class);
return proxy.doCreateOrder(voucherId);
}
public synchronized Result doCreateOrder(Long voucherId) {
// ...
}
}
目前代码依然存在问题,运行测试就会发现,有小概率会出现一个用户抢了同一个优惠券2次以上的情况出现。
仔细分析代码,在实际 Spring 框架执行时,代码大概是这样的:
代理类{ 用于执行 JDBC 事务的方法(){ 开启 JDBC 事务; 调用实际上的业务方法; 结束 JDBC 事务; } }
而我们的锁是加在实际上的业务代码(具体是这里的doCreateOrder
)方法上的,换言之,锁保护的临界区范围是小于 JDBC 事务范围的,这就导致这么一个问题——某个线程的事务还没有提交,订单表中的数据还没有生成,锁就已经释放,此时其它线程就可能获取锁并执行订单创建。
所以正确的方式是让锁完整包裹事务:
// ...
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
// ...
public Result createOrder(Long voucherId) {
// ...
synchronized (this){
return proxy.doCreateOrder(voucherId);
}
}
public Result doCreateOrder(Long voucherId) {
// ...
}
}
这样就 OK 了。
实际上这里并不需要对this
加锁,因为我们仅需要避免针对同一个用户的并发,不同用户是不需要考虑并发情况的。所以使用用户标识加锁可以有效改善并发性能:
Long userId = UserHolder.getUser().getId();
// 使用用户标识进行加锁
synchronized (userId.toString().intern()){
return proxy.doCreateOrder(voucherId);
}
注意,这里使用的是userId.toString().intern()
而非userId.toString()
,前者会获取同一个内容字符串的字符串常量池中的引用。
分布式锁
上面展示的仅是单体应用的情况下如何解决并发问题,如果是分布式应用(存在多个JVM),之前使用的锁就不再有效,因为这些锁仅在 JVM 内生效,可以保证其内部的线程“互斥执行”,但对于 JVM 之间,这些锁将不再有效。
此时我们需要使用 JVM 外的锁(即分布式锁)来保证不同 JVM 的线程也能够互斥执行。
常见的分布式锁有三种实现方式:
这里使用 Redis 实现分布式锁,需要特别注意的是,当特殊情况下线程在获取锁后,释放锁前意外终止时,Redis 锁并不会自动释放,因此需要利用 Redis 的过期时间来防止这种情况下导致的死锁。
原理
基于 Redis 的分布式锁实现实际是利用 setnx
命令实现的:
127.0.0.1:6379> setnx lock 123
(integer) 1
127.0.0.1:6379> setnx lock 123
(integer) 0
127.0.0.1:6379> keys lock
1) "lock"
释放锁即删除对应的 key:
127.0.0.1:6379> del lock
(integer) 1
127.0.0.1:6379> setnx lock 123
(integer) 1
127.0.0.1:6379> setnx lock 123
(integer) 0
正如之前说的,为了避免特殊情况下的死锁,需要在获取锁后指定一个有效时间:
127.0.0.1:6379> setnx lock 123
(integer) 1
127.0.0.1:6379> EXPIRE lock 10
(integer) 1
127.0.0.1:6379> ttl lock
(integer) 5
127.0.0.1:6379> ttl lock
(integer) 3
需要注意的是,获取锁和指定过期时间的命令应当“一起执行”,即要确保其具备原子性,以防止获取锁后,设置过期时间之前线程以外终止导致的死锁。
因此应当使用以下命令:
127.0.0.1:6379> SET lock 123 ex 10 nx
OK
127.0.0.1:6379> ttl lock
(integer) -2
实现
在实现分布式锁前,先将示例应用修改为分布式,即启动两个实例:
修改 Nginx 配置,启用反向代理和负载均衡:
# ...
http {
# ...
server {
# ...
location /api {
# ...
# proxy_pass http://127.0.0.1:8081;
proxy_pass http://backend;
}
}
upstream backend {
server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;
server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;
}
}
重启 Nginx:
❯ nginx -s quit
❯ start nginx
可以通过开启两个标签页分别抢购,并添加断点的方式观察两个不同 JVM 的线程同时进入加锁的临界区的情况:
创建分布式锁接口:
public interface ILock {
/**
* 尝试获取锁(非阻塞式)
*
* @param timeoutSec 超时自动释放(单位,秒)
* @return
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
创建实现类:
public class SimpleRedisLock implements ILock {
private StringRedisTemplate stringRedisTemplate;
// 用于锁的业务名称
private final String businessName;
// 锁的统一前缀
private static final String KEY_PREFIX = "lock:";
// redis key
private final String redisKey;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String businessName) {
this.stringRedisTemplate = stringRedisTemplate;
this.businessName = businessName;
this.redisKey = KEY_PREFIX + businessName;
}
public boolean tryLock(long timeoutSec) {
String threadName = Thread.currentThread().getName();
Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(redisKey, threadName, timeoutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(isLock);
}
public void unlock() {
stringRedisTemplate.delete(redisKey);
}
}
使用分布式锁保护临界区:
// 使用用户标识进行加锁
ILock lock = new SimpleRedisLock(stringRedisTemplate, "voucher-order:" + userId.toString());
boolean isLock = lock.tryLock(2000);
if (!isLock){
return Result.fail("同一用户不能重复抢购");
}
try{
return proxy.doCreateOrder(voucherId);
}
finally {
lock.unlock();
}
方便起见,这里是通过非阻塞的方式用 Redis 加锁,如果获取失败,就简单返回错误。毕竟这里锁的颗粒度是针对单个用户,只有一个用户同时有大量请求时才会触发失败,此种情况下可以认为用户在用脚本秒杀。
为了用 debug 的方式观察分布式锁的生效,这里给了一个较长的锁过期时间(2000秒),在真实开发中,通常只需要设定一个比业务执行时长稍长的时间,比如业务平均执行时长的10倍。
锁误删
上面的 “Redis 锁” 实际上存在一个问题:
就像上图展示的,如果线程1因为某些原因长时间阻塞,此时 Redis 锁因为超时被释放,然后其它线程(线程2)就可以获取 Redis 锁。而稍后在线程1执行完业务代码后,会释放 Redis 锁,而此时线程2可能仍然在执行,释放了锁后别的线程(线程3)就可以获取锁。在这种情况下,两个线程(线程2和线程3)同时获取了锁,并同时在执行业务代码。
这个问题的本质是当前线程释放了其它线程获取的锁而导致的。因此,解决该问题的关键在于,释放 Redis 锁时要进行检查,即只能释放自己获取的锁。
修改 Redis 锁的代码,添加相应验证逻辑:
public class SimpleRedisLock implements ILock {
private StringRedisTemplate stringRedisTemplate;
// 用于锁的业务名称
private final String businessName;
// 锁的统一前缀
private static final String KEY_PREFIX = "lock:";
// redis key
private final String redisKey;
// uuid,用于区分不同 JVM 创建的锁
private static final String uuid = UUID.randomUUID().toString(true);
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String businessName) {
this.stringRedisTemplate = stringRedisTemplate;
this.businessName = businessName;
this.redisKey = KEY_PREFIX + businessName;
}
public boolean tryLock(long timeoutSec) {
final String jvmThreadId = getJvmThreadId();
Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(redisKey, jvmThreadId, timeoutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(isLock);
}
/**
* 获取锁位于JVM当前线程的唯一标识
*
* @return 锁位于JVM当前线程的唯一标识
*/
private String getJvmThreadId() {
// 获取当前线程id,用于区分同一个 JVM 的不同线程创建的锁
long threadId = Thread.currentThread().getId();
// 拼接 uuid和线程id,可以唯一确定一个JVM中的一个线程
final String jvmThreadId = uuid + "-" + threadId;
return jvmThreadId;
}
public void unlock() {
// 删除 Redis key 时判断,只删除当前线程创建的 key
// 从 redis 获取锁对应的值
String lockVal = stringRedisTemplate.opsForValue().get(redisKey);
String jvmThreadId = getJvmThreadId();
// 比较值与当前 JVM-线程标识是否一致
if (jvmThreadId.equals(lockVal)){
stringRedisTemplate.delete(redisKey);
}
}
}
Lua脚本
在极端情况下,上面的代码依然会出错:
原因是检查锁是否是自己获取的与释放锁操作是分别执行的,这两者之间依然可能出现阻塞进而导致之前所说的问题。要解决这个问题需要借助 Lua 脚本。
Redis 是支持执行 Lua 脚本的,并且可以保证执行单个 Lua 脚本时是原子性的。换言之,我们可以借助 Lua 脚本来批量地执行一些 Redis 命令,以保证这些命令执行时的原子性。
-- Redis 锁删除脚本,对比 Redis 中的锁标识与当前给定标识,如果一致,删除,否则不删除
------------------
-- 从命令行获取参数
-- Redis 锁对应的key
local lockKey = KEYS[1]
-- Redis 锁需要匹配的目标标识
local targetKeyVal = ARGV[1]
-- 获取 Redis 中存储的锁的当前值
local keyVal = redis.call('get', lockKey)
if (targetKeyVal == keyVal) then
-- 删除锁对应的key
return redis.call('del', lockKey)
end
-- 失败返回
return 0
这里的 Lua 脚本位于
resources/unlock.lua
。更多的 Lua 语法可以参考
释放 Redis 锁的操作改为通过执行脚本完成,以确保其原子性:
// ...
public class SimpleRedisLock implements ILock {
// ...
// 用 Lua 编写的 Redis 锁释放脚本
private static final DefaultRedisScript<Long> REDIS_SCRIPT;
static {
REDIS_SCRIPT = new DefaultRedisScript<>();
// 指定脚本的位置
REDIS_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
// 指定脚本的返回值类型
REDIS_SCRIPT.setResultType(Long.class);
}
// ...
@Override
public void unlock() {
// 使用 lua 脚本删除 Redis 锁
List<String> keys = Collections.singletonList(redisKey);
String jvmThreadId = getJvmThreadId();
stringRedisTemplate.execute(REDIS_SCRIPT, keys, jvmThreadId);
}
}
本文的示例代码可以从获取。
文章评论