红茶的个人站点

  • 首页
  • 专栏
  • 开发工具
  • 其它
  • 隐私政策
Awalon
Talk is cheap,show me the code.
  1. 首页
  2. 未分类
  3. 正文

Redis 学习笔记 6:消息队列

2025年5月7日 61点热度 0人点赞 0条评论

秒杀优化

之前的优惠券秒杀业务是存在并发访问下的性能瓶颈的,这是因为其完整过程都是顺序执行的:

image-20250505112822990

并且扣减库存和创建订单的步骤都是数据库写操作,比较耗费时间。

可以用 JMeter 的测试脚本对其进行压力测试以说明性能问题。

模拟并发请求

编写一个测试用例来模拟1000个用户登录:

/**
     * 模拟多个用户登录
     */
@Test
public void testMultiUsersLogin() throws IOException {
    // 清理已经登录的用户
    Set<String> keys = stringRedisTemplate.keys("login:token:*");
    if (keys != null) {
        for (String key : keys) {
            stringRedisTemplate.delete(key);
        }
    }
    // 读取1000个用户信息
    final int USER_NUM = 1000;
    QueryWrapper<User> qw = new QueryWrapper<>();
    qw.ne("phone", "123").last(String.format("limit %d", USER_NUM));
    List<User> users = userService.list(qw);
    List<String> tokens = new ArrayList<>(USER_NUM);
    // 模拟用户登录
    for (User user : users) {
        String token = UUID.randomUUID().toString(true);
        tokens.add(token);
        UserDTO userDTO = new UserDTO();
        BeanUtils.copyProperties(user, userDTO);
        stringRedisTemplate.opsForValue().set(LOGIN_USER_KEY + token,
                                              OBJECT_MAPPER.writeValueAsString(userDTO), LOGIN_USER_TTL);
    }
    // 将用户token写入测试文件
    Resource resource = resourceLoader.getResource("classpath:");
    String filePath = resource.getURL().getPath() + "tokens.txt"; // 文件路径
    log.info(filePath);
    File file = new File(filePath);
    if (file.exists()) {
        boolean res = file.delete();
        if (!res){
            log.error("文件删除失败");
            return;
        }
    }
    try (FileWriter fw = new FileWriter(file)) {
        for (String token : tokens) {
            fw.write(token + "\n");
        }
    }
}

执行该测试用例,会在 redis 中写入 1000 个用户的 token 信息,且将这些 token 信息记录在项目的target/test-classes/tokens.txt文件中。

在 JMeter 中使用该文件作为 token 信息来模拟多用户请求:

image-20250505113421728

测试结果如下:

image-20250505114044585

拆分业务

可以将执行步骤拆分,用 Redis 完成优惠券秒杀资格检查的工作,然后就可以立即告诉用户秒杀成功/失败,之后开启单独线程异步执行优惠券订单生成工作:

image-20250505114516897

秒杀资格判断分为两部分,判断库存是否足够和检查用户是否已经拥有该优惠券。前者可以用 Redis 的 key-value 结构实现,后者可以用 set 结构实现:

image-20250505114822817

具体实现

在添加秒杀优惠券的时候将优惠券的库存信息写入 Redis:

// ...
@Service
public class VoucherServiceImpl extends ServiceImpl<VoucherMapper, Voucher> implements IVoucherService {
​
    // ...
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
​
    // ...
​
    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // ...
        // 将秒杀券库存写入 Redis
        String stockKey = SECKILL_STOCK_KEY + voucher.getId();
        stringRedisTemplate.opsForValue().set(
                stockKey,
                seckillVoucher.getStock().toString());
    }
}

显然,整个秒杀资格判断需要多条 Redis 命令,为了操作的原子性,需要借助 Lua 脚本:

--[[
    @描述:在 Redis 中完成优惠券秒杀资格判断
    @参数:优惠券id,用户id
    @返回值:0 表示秒杀成功,1 表示库存不足,2 表示该用户已经拥有该优惠券
]] --    
local voucherId = ARGV[1] -- 优惠券id
local userId = ARGV[2] -- 用户id
local stockKey = "seckill:stock:" .. voucherId -- 存储优惠券库存的key
local voucherUsersKey = "seckill:users:" .. voucherId -- 存储已经拥有该优惠券的用户id的key
-- 如果 redis 中不存在库存信息,视为不能秒杀
if (redis.call('EXISTS', stockKey) == 0) then
    return 1
end
-- 判断库存是否足够
local stock = tonumber(redis.call('GET', stockKey))
if (stock <= 0) then
    -- 库存不足
    return 1
end
-- 判断用户是否已经有该优惠券
if (redis.call('SISMEMBER', voucherUsersKey, userId) == 1) then
    -- 用户已经拥有该优惠券
    return 2
end
-- 库存扣减
redis.call('INCRBY', stockKey, -1)
-- 将用户信息记录到优惠券-用户集合
redis.call('SADD', voucherUsersKey, userId)
return 0

重构优惠券秒杀逻辑,使用 Lua 脚本检验秒杀资格:

@Override
public Result createOrder(Long voucherId) {
    // ...
    // 通过 Redis 检查秒杀资格
    Long res = stringRedisTemplate.execute(
        SECKILL_CHECK_SCRIPT,
        Collections.emptyList(),
        voucherId.toString(),
        UserHolder.getUser().getId().toString());
    if (res == null) {
        // 优惠券秒杀资格检查脚本出错
        return Result.fail("缺少库存");
    }
    if (res != 0) {
        String errMsg = "缺少库存";
        if (res == 2) {
            errMsg = "已经抢购过该优惠券,不能重复抢购";
        }
        return Result.fail(errMsg);
    }
    // 分配一个新的订单id
    long orderId = globalIdGenerator.genGlobalId("voucher-order");
    // 秒杀成功,通过消息队列发送秒杀信息,由其它线程创建优惠券订单
    // TODO 发送消息
    return Result.ok(orderId);
}

这里使用 JDK 自带的阻塞消息队列:

// 用于处理优惠券秒杀的消息队列
private static final BlockingQueue<VoucherOrder> VOUCHER_MSG_QUEUE = new ArrayBlockingQueue<>(1024 * 1024);

消息队列的容量指定了一个较大的值(1024*1024)

启动一个子线程以处理消息:

// 用于异步处理秒杀订单的线程池
private final ExecutorService ES = Executors.newSingleThreadExecutor();
// ...
/**
 * 在当前ben对象初始化后启动订单处理线程
 */
@PostConstruct
public void afterInit() {
    ES.execute(() -> {
        // 获取代理对象
        VoucherOrderServiceImpl proxy = applicationContext.getBean(VoucherOrderServiceImpl.class);
        // 秒杀订单处理线程
        while (true) {
            // 从消息队列获取秒杀订单信息
            try {
                VoucherOrder voucherOrder = VOUCHER_MSG_QUEUE.take();
                // 创建订单
                RLock lock = redissonClient.getLock("seckill:lock:" + voucherOrder.getUserId());
                boolean isLocked = lock.tryLock(10, TimeUnit.SECONDS);
                if (isLocked) {
                    try {
                        proxy.doCreateOrder(voucherOrder);
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                log.error(e.toString());
            }
        }
    });
}

具体的订单生成逻辑:

@Transactional
public void doCreateOrder(VoucherOrder voucherOrder) {
    // 检查用户是否已经抢购过该优惠券
    Integer count = this.query()
        .eq("user_id", voucherOrder.getUserId())
        .eq("voucher_id", voucherOrder.getVoucherId())
        .count();
    if (count > 0) {
        log.error("已经抢购过优惠券,不能重复抢购");
        return;
    }
    // 扣减库存时检查
    boolean res = seckillVoucherService.update().setSql("stock=stock-1")
        .eq("voucher_id", voucherOrder.getVoucherId())
        .gt("stock", 0) // 只要库存大于0都可以提交更新
        .update();
    if (!res) {
        log.error("缺少库存");
        return;
    }
    this.save(voucherOrder);
}

这里要注意的是:

  • 子线程在“自调用”时依然必须显式获取代理对象以确保 JDBC 事务正常启用。

  • 子线程依然使用分布式锁控制一人一单的实现,鉴于我们已经使用 Redis 做了秒杀资格检查,这样做似乎是没有必要的,但这里依然使用锁进行限制是一种代码冗余设计。生成订单时对库存的额外检查同样是基于冗余设计的考虑。

  • 如果仅启用一个应用实例,因为这里仅启用了一个子线程,所以订单的处理是串行执行的,完全可以不考虑并发,但保留并发设计(使用锁)的好处是可以很容易启用多个子线程处理订单或者启用多个应用实例,以提高处理性能。

最后不要忘了实现发送消息的代码:

// 秒杀成功,通过消息队列发送秒杀信息,由其它线程创建优惠券订单
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(userId);
VOUCHER_MSG_QUEUE.add(voucherOrder);

再次模拟1000个用户进行秒杀:

image-20250505142509195

可以看到性能改善了很多。

消息队列

使用 JDK 自带的阻塞队列作为消息队列很容易实现,但有以下缺点:

  • 占用 JVM 内存,随着消息队列内的消息堆积,可能导致内存溢出。

  • 基于内存,不能持久化保存,JVM 宕机后消息丢失。

  • 不具备消息确认机制,消费者获取消息后进程意外结束,会导致消息丢失且没有得到正确处理。

因此我们需要使用更成熟的消息队列,最简单的是使用 Redis 自带的消息队列实现。

Redis 支持三种方式实现的消息队列,它们各有优缺点:

image-20250506120109385

List

使用 Redis 提供的 List 结构可以很容易实现一个消息队列。只需要从 List 的一端放入数据,从另一端读取数据:

127.0.0.1:6379> LPUSH mylist "world"
(integer) 1
127.0.0.1:6379> RPOP mylist
"world"
127.0.0.1:6379> RPOP mylist
(nil)

这里使用了 LPUSH 和 RPOP 命令,更常见的是阻塞式读取:

127.0.0.1:6379> BRPOP mylist 0
1) "mylist"
2) "hello"
(10.54s)

BRPOP 命令需要指定一个阻塞时长(单位毫秒),0 表示永久。

B表示 block(阻塞)

Pub/Sub

Pub/Sub 是 Redis 提供的一种消息传递机制,而非数据结构,因此它是不能持久化保存消息的(Redis 宕机后消息会丢失)。

127.0.0.1:6379> publish c1 m2
(integer) 1
127.0.0.1:6379> SUBSCRIBE c1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "c1"
3) (integer) 1
1) "message"
2) "c1"
3) "m2"

使用 PUBLISH 和 SUBSCRIBE 命令可以很容易通过发布/订阅方式来传递消息,缺点是消费者只能获取到开启监听后产生的消息,并不能以回溯的方式获取之前产生的消息,因为并没有专门的数据结构存储那些消息。此外,消费者开启订阅监听后,产生的消息会在消费者端堆积,如果没有及时处理,可能导致消费者端的消息丢失(超过消息最大存储限制)。

此外和 List 不同的是,用 Pub/Sub 的方式发送消息,可以将一条消息同时发给多个消费者。

Stream

与 List 相比,Stream 是一个更完备的可以用于消息传递的数据结构。

可以用 XADD 命令将消息添加到 Stream:

127.0.0.1:6379> XADD stream1 * k1 v1 k2 v2
"1746527781821-0"

如果 Stream 不存在,将自动创建,可以用 NOMKSTREAM 参数禁用。

传递给 Stream 的消息以键值对(entry)的方式表示,XADD 命令需要指定一个 Stream ID,该 ID 用于发送的消息(entry)在 Stream 中的唯一标识。可以用*代替,此时 Redis 将自动生成一个。该标识将作为执行结果返回,比如这个示例中的1746527781821-0,这个 ID 由两部分构成,前边是一个时间戳,后边是一个自增的值。

用 XLEN 命令可以查看 Stream 中的消息数:

127.0.0.1:6379> XLEN stream1
(integer) 3

用 XREAD 命令可以从 Stream 中读取消息:

127.0.0.1:6379> XREAD count 1 streams stream1 0-0
1) 1) "stream1"
   2) 1) 1) "1746527781821-0"
         2) 1) "k1"
            2) "v1"
            3) "k2"
            4) "v2"

这个指令需要指定从哪条消息开始读取,因为和 List 不同,从 Stream 中读取消息后不会被删除(支持回溯),因此每次读取消息后要记录已经读过的消息在 Stream 中的 ID,下次从该 id 继续读取。如果想从 Stream 中的第一条开始读取,可以指定 id 为0-0或者0。

比如当前 Stream 中有3条消息:

127.0.0.1:6379> XLEN stream1
(integer) 3
127.0.0.1:6379> XREAD count 3 streams stream1 0-0
1) 1) "stream1"
   2) 1) 1) "1746527781821-0"
         2) 1) "k1"
            2) "v1"
            3) "k2"
            4) "v2"
      2) 1) "1746528704371-0"
         2) 1) "k1"
            2) "v1"
            3) "k2"
            4) "v2"
      3) 1) "1746528706772-0"
         2) 1) "k1"
            2) "v1"
            3) "k2"
            4) "v2"

从第二条消息(id 为1746528704371-0)开始读取一条消息:

127.0.0.1:6379> XREAD count 1 streams stream1 1746528704371-0
1) 1) "stream1"
   2) 1) 1) "1746528706772-0"
         2) 1) "k1"
            2) "v1"
            3) "k2"
            4) "v2"

XREAD 命令支持阻塞式的读取消息:

127.0.0.1:6379> XREAD count 1 block 10000 streams stream1 $
1) 1) "stream1"
   2) 1) 1) "1746529417426-0"
         2) 1) "k1"
            2) "v1"
            3) "k2"
            4) "v2"

这里的block 10000表示以阻塞的方式获取一条消息,最大等待时长为 10000 毫秒。

block 0 表示永久阻塞。

如果是希望从监听开始获取一条最新的消息,可以使用$代替消息 id,但需要注意的是,以这种方式获取消息后,再次读取消息时仍然需要使用消息 id,否则可能会漏读消息。

比如当前开启监听并获取到了一条最新消息:

127.0.0.1:6379> XREAD count 1 block 10000 streams stream1 $
1) 1) "stream1"
   2) 1) 1) "1746529417426-0"
         2) 1) "k1"
            2) "v1"
            3) "k2"
            4) "v2"

然后在消费者处理消息的同时,生产者又发送了一条消息:

127.0.0.1:6379> XADD stream1 * k1 v1 k2 v2
"1746529925863-0"

此时消费者完成了消息处理,重新开始监听:

127.0.0.1:6379> XREAD count 1 block 10000 streams stream1 $

此时并不能接收到上条消息,因为$只代表从消费者监听开始产生的消息中最新的消息。如果要正常获取,就需要指定 id:

127.0.0.1:6379> XREAD count 1 block 10000 streams stream1 1746529417426-0
1) 1) "stream1"
   2) 1) 1) "1746529866133-0"
         2) 1) "k1"
            2) "v1"
            3) "k2"
            4) "v2"

直接使用 XREAD 读取消息会很麻烦,需要自行记录已经读取的消息 id。而且 Stream 中同一条消息会发送给所有通过 XREAD 命令监听的客户端,有时候我们的需求是用多个消费者处理不同的消息,此时就需要使用消费者组。

使用 XGROUP CREATE 在指定 Stream 上创建消费者组:

127.0.0.1:6379> XGROUP create stream1 group1 0
OK

在消息组创建后,会将 Stream 中的消息拷贝到消费者组中,因此需要指定需要开始拷贝的消息 id,如果是所有的消息(从头开始拷贝),可以用0代替。如果不需要拷贝,仅用于接收新产生的消息,可以用$代替。

如果要新建一个 Stream 和消费者组,可以:

127.0.0.1:6379> XGROUP create stream2 group1 $ mkstream
OK

一个消费者组上可以有多个消费者处理消息,用XGROUP命令可以创建消费者:

127.0.0.1:6379> XGROUP createconsumer stream2 group1 consumer1
(integer) 1
127.0.0.1:6379> XGROUP createconsumer stream2 group1 consumer2
(integer) 1

使用 XREADGROUP 命令可以用一个消费者从消费者组中获取消息:

127.0.0.1:6379> XREADGROUP group group1 consumer1 count 1 block 0 streams stream2 >

这里的>表示没有被其它消费者获取过的消息。也就是说,消费者组的同一条消息不会重复发送给不同的消费者。实际上在消费者组中,Redis 为每个消费者维护一个记录列表,记录发送给对应消费者的消息。

消费者组是支持消息确认的,因此消费者正常处理完消息后需要使用 XACK 命令进行确认:

127.0.0.1:6379> XACK stream2 group1 1746532057224-0
(integer) 1

用 XPENDING 命令可以查看消费者组中没有经过确认的消息:

27.0.0.1:6379> XPENDING stream2 group1
1) (integer) 1
2) "1746532067048-0"
3) "1746532067048-0"
4) 1) 1) "consumer1"
      2) "1"

也可以筛选具体消费者没有经过确认的消息:

127.0.0.1:6379> XPENDING stream2 group1 - + 10 consumer1
1) 1) "1746532067048-0"
   2) "consumer1"
   3) (integer) 527293
   4) (integer) 1

这里的-表示最小的消息id,+表示最大的消息id,10表示获取最多10条消息。

用 Stream 实现消息队列

之前是用 Lua 脚本在 Redis 中检查秒杀资格,符合资格后在 Java 代码中将添加订单的消息发送给 JDK 阻塞队列:

// 通过 Redis 检查秒杀资格
Long userId = UserHolder.getUser().getId();
Long res = stringRedisTemplate.execute(
    SECKILL_CHECK_SCRIPT,
    Collections.emptyList(),
    voucherId.toString(),
    userId.toString());
if (res == null) {
    // 优惠券秒杀资格检查脚本出错
    return Result.fail("缺少库存");
}
if (res != 0) {
    String errMsg = "缺少库存";
    if (res == 2) {
        errMsg = "已经抢购过该优惠券,不能重复抢购";
    }
    return Result.fail(errMsg);
}
// 分配一个新的订单id
long orderId = globalIdGenerator.genGlobalId("voucher-order");
// 秒杀成功,通过消息队列发送秒杀信息,由其它线程创建优惠券订单
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(userId);
VOUCHER_MSG_QUEUE.add(voucherOrder);

现在使用 Stream,因此可以在 Lua 脚本检查秒杀资格后,直接通过 Redis 命令发送消息:

local orderId = ARGV[3] -- 订单id
-- ...
-- 库存扣减
redis.call('INCRBY', stockKey, -1)
-- 将用户信息记录到优惠券-用户集合
redis.call('SADD', voucherUsersKey, userId)
-- 发送消息到消息队列
redis.call('XADD', 'seckill:msg-queue', '*', 'voucherId', voucherId, 'userId', userId, 'id', orderId)
return 0

当然,这里需要额外接收一个参数作为订单id。

通过 Stream 获取消息并生成订单:

// ...
@Log4j2
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
 	// ...

    // Stream 名称
    private static final String STREAM_NAME = "seckill:msg-queue";
    // consumer group 名称
    private static final String CONSUMER_GROUP_NAME = "my-group";
    private StreamOperations<String, Object, Object> streamOps;

    // 用于异步处理秒杀订单的线程池
    private final ExecutorService ES = Executors.newSingleThreadExecutor();

    /**
     * 自定义异常类,用于表示 Records 列表中缺少一个有效的 Record 信息
     */
    private static class EmptyRecordsException extends Exception {
        public EmptyRecordsException(String message) {
            super(message);
        }
    }

    private class OrderHandler implements Runnable {

        // Stream 中的消费者名称
        private final String CONSUMER_NAME = UUID.randomUUID().toString(true);

        /**
         * 子线程,从 Stream 中读取订单消息,并生成订单
         * 如果在处理过程中产生异常,将会从对应消费者的 pending-list 中获取一个未确认消息进行处理,作为一种异常恢复机制
         */
        @Override
        public void run() {
            // 获取代理对象
            while (true) {
                // 读取消息(如果消费者不存在,自动创建)
                List<MapRecord<String, Object, Object>> records = streamOps.read(Consumer.from(
                                CONSUMER_GROUP_NAME, CONSUMER_NAME),
                        StreamReadOptions.empty().count(1L).block(Duration.ofSeconds(20)),
                        StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed()));
                try {
                    // 处理消息
                    handleOrderMsg(records);
                    // 消息处理成功,再次开始循环处理新的消息
                } catch (Exception e) {
                    // 处理消息遇到异常
                    if (e instanceof EmptyRecordsException) {
                        // 没有获取到消息,重新尝试获取
                        log.info("没有获取到新的消息,尝试重新获取");
                        continue;
                    }
                    // 获取一个未确认的消息重新处理
                    while (true) {
                        PendingMessages pendingMessages = streamOps.pending(
                                STREAM_NAME,
                                Consumer.from(CONSUMER_GROUP_NAME, CONSUMER_NAME),
                                Range.unbounded(),
                                1
                        );
                        if (pendingMessages.isEmpty()) {
                            // 没有待处理消息
                            break;
                        }
                        PendingMessage pendingMessage = pendingMessages.get(0);
                        RecordId id = pendingMessage.getId();
                        List<MapRecord<String, Object, Object>> records2 = streamOps.range(STREAM_NAME, Range.closed(id.toString(), id.toString()));
                        try {
                            handleOrderMsg(records2);
                            // 成功处理掉一个待处理任务,结束错误处理流程
                            break;
                        } catch (Exception ex) {
                            if (ex instanceof EmptyRecordsException) {
                                // 没有获取到消息,尝试重新获取
                                log.error("没有获取到未确认消息,尝试重新获取");
                                continue;
                            }
                            // 处理消息出错,尝试再次获取一个待处理消息并进行处理
                            log.error(ex.getMessage());
                            ex.printStackTrace();
                        }
                    }
                }
            }
        }

        /**
         * 处理订单消息
         * @param records 一个 MapRecord 列表,包含至少一条有效的 MapRecord 信息
         * @throws EmptyRecordsException 当缺少一条有效的 MapRecord 时抛出
         * @throws InterruptedException
         */
        private void handleOrderMsg(List<MapRecord<String, Object, Object>> records) throws InterruptedException, EmptyRecordsException {
            if (records == null || records.isEmpty()) {
                // 没有获取到指定消息
                throw new EmptyRecordsException("没有获取到指定消息");
            }
            MapRecord<String, Object, Object> record = records.get(0);
            Map<Object, Object> map = record.getValue();
            VoucherOrder voucherOrder = new VoucherOrder();
            BeanUtil.fillBeanWithMap(map, voucherOrder, true);
            createOrderWithLock(voucherOrder);
            // 确认消息
            streamOps.acknowledge(CONSUMER_GROUP_NAME, record);
            // 处理订单成功,结束异常订单处理流程
            log.info("订单处理成功");
        }

        /**
         * 用加锁的方式创建订单(一人一单锁)
         * @param voucherOrder 秒杀券订单信息
         * @throws InterruptedException
         */
        private void createOrderWithLock(VoucherOrder voucherOrder) throws InterruptedException {
            VoucherOrderServiceImpl proxy = applicationContext.getBean(VoucherOrderServiceImpl.class);
            RLock lock = redissonClient.getLock("seckill:lock:" + voucherOrder.getUserId());
            boolean isLocked = lock.tryLock(10, TimeUnit.SECONDS);
            if (isLocked) {
                try {
                    proxy.doCreateOrder(voucherOrder);
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    public boolean consumerGroupExists(String streamKey, String groupName) {
        try {
            stringRedisTemplate.opsForStream().groups(streamKey)
                    .stream()
                    .anyMatch(g -> groupName.equals(g.groupName()));
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * 在当前ben对象初始化后启动订单处理线程
     */
    @PostConstruct
    public void afterInit() {
        streamOps = stringRedisTemplate.opsForStream();
        // 如果没有消费者组,创建
        if (!consumerGroupExists(STREAM_NAME, CONSUMER_GROUP_NAME)) {
            streamOps.createGroup(STREAM_NAME, ReadOffset.from("$"), CONSUMER_GROUP_NAME);
        }
        // 启动子线程处理订单
        ES.execute(new OrderHandler());
    }

    // ...
}

这里包含了消息处理产生异常时从 pending-list 重新获取未确认消息进行处理的机制,因此代码比较复杂,可以下载项目完整代码后自行查看。

The End.

参考资料

  • 黑马程序员Redis入门到实战教程

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: redis stream 消息队列
最后更新:2025年5月7日

魔芋红茶

加一点PHP,加一点Go,加一点Python......

点赞
< 上一篇
下一篇 >

文章评论

取消回复

*

code

COPYRIGHT © 2021 icexmoon.cn. ALL RIGHTS RESERVED.
本网站由提供CDN加速/云存储服务

Theme Kratos Made By Seaton Jiang

宁ICP备2021001508号

宁公网安备64040202000141号