首先,在的基础上,于示例项目完成达人探店部分功能。
达人探店
发布探店笔记
将上传图片的存储地址修改为当前启动的前端 Nginx 所在的存储目录:
package com.hmdp.utils;
public class SystemConstants {
public static final String IMAGE_UPLOAD_DIR = "D:\\workspace\\learn-redis\\ch3\\nginx-1.18.0\\html\\hmdp\\imgs";
// ...
}
上传一个探店笔记,相关的文字内容和图片素材可以从下面获取:
链接: https://pan.baidu.com/s/1hnZ5LvXMGBw--jlGxHC6Fg?pwd=fzd2 提取码: fzd2
查看探店笔记
实现对应的接口:
// ...
("/blog")
public class BlogController {
// ...
("/{id}")
public Result queryBlogById(("id") Long id){
return blogService.queryBlogById(id);
}
}
// ...
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
private IUserService userService;
public Result queryBlogById(Long id) {
Blog blog = getById(id);
if (blog == null){
return Result.fail("探店笔记不存在");
}
// 获取探店笔记对应的用户信息
User user = userService.getById(blog.getUserId());
blog.setIcon(user.getIcon());
blog.setName(user.getNickName());
return Result.ok(blog);
}
}
点赞
当前给探店笔记点赞的功能存在问题,一个用户可以重复点赞。这是因为接口实现比较简单:
("/like/{id}")
public Result likeBlog(("id") Long id) {
// 修改点赞数量
blogService.update()
.setSql("liked = liked + 1").eq("id", id).update();
return Result.ok();
}
没有经过任何判断,每次接口调用都将表数据字段+1。
实现用户点赞/取消点赞的业务逻辑:
// ...
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
private IUserService userService;
private StringRedisTemplate stringRedisTemplate;
// ...
public Result likeBlog(Long id) {
// 通过 Redis 查询当前用户有没有点赞过
RedisBlogLike redisBlogLike = new RedisBlogLike(stringRedisTemplate, id);
Long userId = UserHolder.getUser().getId();
if (!redisBlogLike.isLiked(userId)) {
// 没有点赞过,点赞
// 数据库点赞次数+1
boolean updateRes = this.update()
.setSql("liked=liked+1")
.eq("id", id.toString())
.update();
// 如果数据更新成功,Redis 中也进行点赞操作
if (updateRes) {
redisBlogLike.like(userId);
return Result.ok();
}
return Result.fail("执行出错");
}
// 已经点赞过,取消点赞
// 数据库点赞次数-1
boolean updateRes = this.update()
.setSql("liked=liked-1")
.eq("id", id.toString())
.update();
// 如果数据更新成功,Redis 中也进行取消点赞操作
if (updateRes) {
redisBlogLike.unLike(userId);
return Result.ok();
}
return Result.fail("执行出错");
}
/**
* 维护 Redis 中的探店笔记点赞信息
*/
private static class RedisBlogLike {
// 用户维护探店笔记已经点赞过的用户 set 的 key
private final String key;
private final SetOperations<String, String> opsForSet;
/**
* @param stringRedisTemplate redis API
* @param blogId 探店笔记 id
*/
public RedisBlogLike(StringRedisTemplate stringRedisTemplate, long blogId) {
key = BLOG_LIKED_KEY + blogId;
opsForSet = stringRedisTemplate.opsForSet();
}
/**
* 判断指定用户是否已经给探店笔记点过赞了
*
* @param userId 用户id
* @return 是否已经点过赞
*/
public boolean isLiked(long userId) {
Boolean isMember = opsForSet.isMember(key, Long.toString(userId));
return BooleanUtil.isTrue(isMember);
}
/**
* 点赞
*
* @param userId 点赞的用户 id
*/
public void like(long userId) {
// 将用户 id 放入 Set
opsForSet.add(key, Long.toString(userId));
}
/**
* 取消点赞
*
* @param userId 取消点赞的用户 id
*/
public void unLike(long userId) {
// 将用户 id 从 Set 中移除
opsForSet.remove(key, Long.toString(userId));
}
}
}
为了让代码结构更清晰,这里使用一个内部类RedisBlogLike
来维护 Redis 上作为用户点赞记录的 Set 结构。
这里对 Redis 的操作是通过 API 多次调用实现的,没有使用 Lua 脚本,所以不具备原子性。此外,在数据库表更新点赞数据时也没有任何额外检查。因此这个功能在并发情况下是可能导致同一个用户触发多次点赞数增加或减少。但是考虑到点赞功能并不像下单那样危险,会导致库存超卖等经济损失。而且同一个用户大量并发的方式调用该接口,还可能导致点赞数下降,因此这里并没有采用锁来保护和限制此功能。
这里的讨论也说明了一件事,并不需要对项目中所有因为并发调用而可能导致的共享数据冲突采用加锁来保护,要具体情况具体分析。即在加锁会带来代码复杂度的增加和性能下降与不加锁可能导致的异常后果之间进行衡量,以最终确定是否要采用加锁。
("/hot")
public Result queryHotBlog((value = "current", defaultValue = "1") Integer current) {
// ...
records.forEach(blog ->{
// ...
// 追加当前用户是否已经点赞过的信息
IBlogService.RedisBlogLike redisBlogLike = new IBlogService.RedisBlogLike(stringRedisTemplate, blog.getId());
blog.setIsLike(redisBlogLike.isLiked(userId));
});
// ...
}
为了方便外部引用,这里将内部类
RedisBlogLike
重构,移动到接口IBlogService
中。
类似的,查询探店笔记详情的接口也需要做同样修改。
点赞排行榜
在探店笔记详情页,需要显示最早点赞的 N 个人的头像,因此不能再使用 Set 来记录点赞过的用户 id,而是应当改为使用 Sorted Set,score 则使用点赞时的时间戳。
下面是重构后的RedisBlogLike
:
// ...
public interface IBlogService extends IService<Blog> {
// ...
/**
* 维护 Redis 中的探店笔记点赞信息
*/
class RedisBlogLike {
// 用户维护探店笔记已经点赞过的用户 sorted set 的 key
private final String key;
private final ZSetOperations<String, String> opsForZSet;
/**
* @param stringRedisTemplate redis API
* @param blogId 探店笔记 id
*/
public RedisBlogLike(StringRedisTemplate stringRedisTemplate, long blogId) {
key = BLOG_LIKED_KEY + blogId;
opsForZSet = stringRedisTemplate.opsForZSet();
}
/**
* 判断指定用户是否已经给探店笔记点过赞了
*
* @param userId 用户id
* @return 是否已经点过赞
*/
public boolean isLiked(long userId) {
Double score = opsForZSet.score(key, Long.toString(userId));
// 如果能从 sorted set 中获取到元素对应的 score,说明存在该元素
return score != null;
}
/**
* 点赞
*
* @param userId 点赞的用户 id
*/
public void like(long userId) {
// 将用户 id 放入 Set
opsForZSet.add(key, Long.toString(userId), System.currentTimeMillis());
}
/**
* 取消点赞
*
* @param userId 取消点赞的用户 id
*/
public void unLike(long userId) {
// 将用户 id 从 Set 中移除
opsForZSet.remove(key, Long.toString(userId));
}
}
}
需要特别说明的是,大部分对于 ZSet(Sorted Set)的操作都与 Set 类似,唯一不同的是 ZSet 没有命令可以直接判断指定元素是否存在,只能使用score
命令,该命令会返回指定元素的 score,如果元素不存在,返回 nil(null)。
这里同样说明了将 Redis 中探店笔记点赞列表相关操作封装的好处,对其进行代码重构不需要修改调用的部分。
在RedisBlogLike
中实现获取最早点赞的N个用户的方法:
// ...
public interface IBlogService extends IService<Blog> {
// ...
class RedisBlogLike {
// ...
/**
* 获取最早点赞的N个用户id
*
* @param i 要获取的用户 id 数
* @return 最早点赞的n个用户id
*/
public Set<String> getEarliestLikes(int i) {
return opsForZSet.range(key, 0, i);
}
}
}
调用该方法:
// ...
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
// ...
@Override
public Result queryBlogLikes(Long id) {
RedisBlogLike redisBlogLike = new RedisBlogLike(stringRedisTemplate, id);
Set<String> uids = redisBlogLike.getEarliestLikes(5);
List<User> users = userService.query().in("id", uids).list();
List<UserDTO> userDTOS = users.stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
return Result.ok(userDTOS);
}
}
删除 Redis 中以前的点赞列表,刷新页面就能看到效果。不过实际用不同用户点赞会发现,获取到的点赞列表不一定是按最早点赞的排在最前边这样的顺序。这是因为实际读取用户信息时执行的 SQL 是:
SELECT id,phone,password,nick_name,icon,create_time,update_time FROM tb_user WHERE (id IN (5,1,4))
而该 SQL 并不是以 IN 语句中的顺序返回结果,而是以主键升序的方式返回结果。
如果要以 IN 语句中的顺序返回结果,可以:
SELECT id,phone,password,nick_name,icon,create_time,update_time FROM tb_user WHERE (id IN (5,1,4)) order by field (id,5,1,4);
当然也可以用多个SQL分别查询单个用户信息,然后组成用户列表的方式保证顺序正确,但那样是多次数据库查询,性能不如单条语句。此外还可以在 Java 中按原有顺序重新排序,那样不如在 SQL 中指定顺序简洁。
改为使用order by field...
的方式查询SQL:
Set<String> uids = redisBlogLike.getEarliestLikes(5);
String ids = String.join(",", uids);
List<User> users = userService.query().in("id", uids).last(String.format("order by field (id,%s)", ids)).list();
好友关注
关注/取消关注
Controller:
@RestController
@RequestMapping("/follow")
public class FollowController {
@Autowired
private IFollowService followService;
@PutMapping("/{id}/{flag}")
public Result follow(@PathVariable("id") Long id, @PathVariable("flag") Boolean flag) {
return followService.follow(id, flag);
}
@GetMapping("/or/not/{id}")
public Result isFollowed(@PathVariable("id") Long followUserId){
return followService.isFollowed(followUserId);
}
}
Service:
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
@Override
public Result follow(Long id, Boolean flag) {
// 获取当前用户id
Long uid = UserHolder.getUser().getId();
if (BooleanUtil.isTrue(flag)) {
// 关注
Follow follow = new Follow();
follow.setUserId(uid);
follow.setFollowUserId(id);
this.save(follow);
} else {
// 取消关注
QueryWrapper<Follow> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("user_id", uid);
queryWrapper.eq("follow_user_id", id);
this.remove(queryWrapper);
}
return Result.ok();
}
@Override
public Result isFollowed(Long followUserId) {
QueryWrapper<Follow> queryWrapper = new QueryWrapper<>();
Long currentUserId = UserHolder.getUser().getId();
queryWrapper.eq("user_id", currentUserId);
queryWrapper.eq("follow_user_id", followUserId);
int count = this.count(queryWrapper);
return Result.ok(count > 0);
}
}
共同关注
共同关注功能可以利用 Redis 中的 Set 结构来实现,即在 Redis 中维护每个用户的关注列表,然后求交集。
重构代码,用一个内部类实现对 Redis 中的用户关注列表的操作:
// ...
public interface IFollowService extends IService<Follow> {
// ...
/**
* Redis 中指定用户的关注列表
*/
class RedisFollowList {
// 关注列表的 Redis key
private final String key;
private final SetOperations<String, String> opsForSet;
/**
* @param stringRedisTemplate
* @param userId 指定用户的 id
*/
public RedisFollowList(StringRedisTemplate stringRedisTemplate, long userId) {
this.key = FOLLOW_KEY + userId;
opsForSet = stringRedisTemplate.opsForSet();
}
/**
* 关注
*
* @param targetUserId 指定用户 id
*/
public void follow(long targetUserId) {
opsForSet.add(key, Long.toString(targetUserId));
}
/**
* 取消关注
*
* @param targetUserId 指定用户 id
*/
public void cancelFollow(long targetUserId) {
opsForSet.remove(key, Long.toString(targetUserId));
}
/**
* 判断是否已经关注过指定用户
*
* @param targetUserId 指定用户 id
*/
public boolean isFollowed(long targetUserId) {
Boolean isMember = opsForSet.isMember(key, Long.toString(targetUserId));
return BooleanUtil.isTrue(isMember);
}
}
}
Service 中除了操作数据库,还需要使用该内部类维护 Redis 中的关注列表:
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override
public Result follow(Long id, Boolean flag) {
// 获取当前用户id
Long uid = UserHolder.getUser().getId();
RedisFollowList redisFollowList = new RedisFollowList(stringRedisTemplate, uid);
if (BooleanUtil.isTrue(flag)) {
// ...
if (res) {
// 在 Redis 中关注指定用户
redisFollowList.follow(id);
}
} else {
// ...
if (res){
// 在 Redis 中取消关注
redisFollowList.cancelFollow(id);
}
}
return Result.ok();
}
@Override
public Result isFollowed(Long followUserId) {
RedisFollowList redisFollowList = new RedisFollowList(stringRedisTemplate, UserHolder.getUser().getId());
return Result.ok(redisFollowList.isFollowed(followUserId));
}
}
实现个人详情页未实现的两个接口:
// ...
@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {
// ...
@GetMapping("/{uid}")
public Result getUser(@PathVariable("uid") Long uid) {
User user = userService.getById(uid);
return Result.ok(BeanUtil.copyProperties(user, UserDTO.class));
}
}
// ...
@RestController
@RequestMapping("/blog")
public class BlogController {
// ...
@GetMapping("/of/user")
public Result queryBlogsByUser(@RequestParam("id") Long uid,
@RequestParam(name = "current", defaultValue = "1") Long current) {
return Result.ok(blogService.getUserBlogs(uid, current));
}
}
// ...
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
// ...
@Override
public List<Blog> getUserBlogs(Long uid, Long current) {
QueryWrapper<Blog> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("user_id", uid);
IPage<Blog> page = new Page<>(current, 10);
this.page(page, queryWrapper);
return page.getRecords();
}
}
实现共同关注:
// ...
@RestController
@RequestMapping("/follow")
public class FollowController {
// ...
@GetMapping("/common/{id}")
public Result commonFollows(@PathVariable("id") Long uid){
return followService.commonFollows(uid);
}
}
// ...
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
// ...
@Override
public Result commonFollows(Long uid) {
RedisFollowList redisFollowList = new RedisFollowList(stringRedisTemplate, UserHolder.getUser().getId());
Set<Long> uids = redisFollowList.commonFollows(uid);
if (uids.isEmpty()) {
return Result.ok(Collections.emptyList());
}
// 获取用户详情
List<User> users = userService.listByIds(uids);
return Result.ok(users);
}
}
// ...
public interface IFollowService extends IService<Follow> {
// ...
class RedisFollowList {
// ...
/**
* 返回当前用户与指定用户共同关注的用户列表
*
* @param uid 指定用户
* @return 共同关注的用户列表
*/
public Set<Long> commonFollows(Long uid) {
// 用 Set 求交集实现
Set<String> intersect = opsForSet.intersect(key, getKeyByUserId(uid));
if (intersect == null || intersect.isEmpty()) {
return Collections.emptySet();
}
return intersect.stream().map(Long::valueOf).collect(Collectors.toSet());
}
}
}
Feed 流
Feed 流的实现方案分为两种:
拉模式:
每次用户需要读取信息流时从发送方的发件箱拉取信息到自己的收件箱,排序后展示信息流。这样做的好处是不存在信息的多份拷贝,节省内存。缺点是读取时要进行信息拉取,可能存在延迟。
推模式:
在这种模式下消息的生产者不再有发件箱,产生消息后直接发送给订阅者的收件箱。这样做的缺陷是如果发送方有很多订阅者,同一条消息就会存在多条拷贝,占用内存。优点是订阅者在读取消息流时不存在临时拉取的行为,没有延迟。
推拉结合:
对于复杂应用,如果需要照顾到各种情况,可以采用推拉结合的方式,即将消息的发送者和订阅者划分为多种角色,按照角色的不同采用不同的策略,这样可以兼顾性能和用户体验(延迟):
-
大 V 发送消息给普通粉丝,采用拉模式,以节省内存
-
大 V 发送消息给活跃粉丝,采用推模式,以避免活跃用户感受到延迟
-
普通用户发送消息给粉丝,采用推模式,普通用户的粉丝较少,不会浪费太多内存
对于当前这个点评类示例应用,可以认为不存在大V,所以直接采用推模式。
Redis 中的 List 和 Set 结构都可以用于实现粉丝的收件箱,但如果采用 List + 索引分页查询的方式,可能出现信息重复的现象。
实际上所有基于索引的分页都可能导致类似问题,但一般的数据新增频率并不高,该问题可以在一定程度被容许,但信息流的更新频率相对来说比较高,不被容许。
可以使用 Redis 中的 SortedSet 结构实现滚动分页查询来避免该问题:
实际上就是将一个消息标识(这里是时间戳)作为 score,每次分页查询时记录上次查询到的最小(这里从大到小排序后读取)score,下次分页查询时从该 score 开始读取。因为新的消息 score 必然是最大的,所以只会加入头部,因此不会出现重复读取的问题。
该方案在存在连续出现多个相同 score 的元素时会有一些问题,需要仔细考虑实现细节以避免 bug。这点在之后的代码实现部分会说明。
下面实现代码,先将粉丝的探店笔记收件箱抽象为单独内部类进行处理:
// ...
public interface IBlogService extends IService<Blog> {
// ...
/**
* 基于 Redis 实现的粉丝探店笔记收件箱
*/
class RedisBlogMailbox {
// 指定用户收件箱在 Redis 中的 key
private final String key;
private final ZSetOperations<String, String> opsForZSet;
/**
* @param stringRedisTemplate Redis API
* @param fansUserId 粉丝的用户id
*/
public RedisBlogMailbox(StringRedisTemplate stringRedisTemplate, long fansUserId) {
opsForZSet = stringRedisTemplate.opsForZSet();
key = BLOG_MAILBOX + fansUserId;
}
/**
* 向粉丝推送有新的探店笔记发布
*
* @param blogId 探店笔记 id
*/
public void sendBlog(long blogId) {
// 获取当前时间戳
long millis = System.currentTimeMillis();
opsForZSet.add(key, Long.toString(blogId), millis);
}
}
}
重构探店笔记发布接口,添加向粉丝收件箱发送通知的功能:
// ...
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
// ...
@Override
public Result publishBlog(Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
boolean save = save(blog);
if (save) {
// 获取当前用户的粉丝
List<Long> fansIds = followService.getFans(user.getId());
// 为粉丝推送探店笔记更新通知
fansIds.forEach(uid -> {
RedisBlogMailbox redisBlogMailbox = new RedisBlogMailbox(stringRedisTemplate, uid);
redisBlogMailbox.sendBlog(blog.getId());
});
}
// 返回id
return Result.ok(blog.getId());
}
}
实现分页查询 SortedSet 的代码:
// ...
public interface IBlogService extends IService<Blog> {
// ...
class RedisBlogLike {
// ...
/**
* 分页读取探店笔记 id
*
* @param maxTimestamp 读取范围的最大时间戳
* @param offset 上次读取的相同元素偏移量
* @param count 页宽
* @return 分页查询结果
*/
public ScrollResult<Long> pageRead(Long maxTimestamp, Integer offset, int count) {
// 如果最大时间戳未指定,使用当前时间戳
if (maxTimestamp == null) {
maxTimestamp = System.currentTimeMillis();
}
// 如果相同元素偏移量未指定,视作第一次请求,设置为0
if (offset == null) {
offset = 0;
}
Set<ZSetOperations.TypedTuple<String>> blogTuples = opsForZSet.reverseRangeByScoreWithScores(key, 0, maxTimestamp, offset, count);
if (blogTuples == null) {
return null;
}
long minTime = 0; // 本次查询的最小时间戳
int nextOffset = 0; // 本次查询的末尾相同元素个数
for (ZSetOperations.TypedTuple<String> blogTuple : blogTuples) {
// 获取当前时间戳
long blogTime = Long.parseLong(Objects.requireNonNull(blogTuple.getValue()));
// 比较本次时间戳与上次时间戳
if (blogTime == minTime) {
// 时间戳相同的连续元素出现,偏移量自增
nextOffset++;
} else {
// 时间戳与上次时间戳不同,重置偏移量
nextOffset = 0;
}
// 本次时间戳设置为最小时间戳
minTime = blogTime;
}
List<Long> ids = blogTuples.stream().map(tuple -> Long.valueOf(Objects.requireNonNull(tuple.getValue()))).collect(Collectors.toList());
ScrollResult<Long> scrollResult = new ScrollResult<>();
scrollResult.setList(ids);
scrollResult.setMinTime(minTime);
scrollResult.setOffset(nextOffset + 1);
return scrollResult;
}
}
}
实现获取探店笔记信息流的接口:
// ...
@RestController
@RequestMapping("/blog")
public class BlogController {
// ...
/**
* 获取探店笔记的信息流
*
* @param lastId 上次调用接口时获取的最小时间戳(第一次查询时传当前时间戳)
* @return 探店笔记的信息流
*/
@GetMapping("/of/follow")
public Result queryBlogsFeed(@RequestParam("lastId") Long lastId, @RequestParam(value = "offset", required = false) Integer offset) {
return blogService.queryBlogsFeed(lastId, offset);
}
}
Service 层实现:
// ...
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
// ...
@Override
public Result queryBlogsFeed(Long lastId, Integer offset) {
// 从 Redis 当前用户收件箱分页查询探店笔记id
RedisBlogMailbox redisBlogMailbox = new RedisBlogMailbox(stringRedisTemplate, UserHolder.getUser().getId());
ScrollResult<Long> scrollResult = redisBlogMailbox.pageRead(lastId, offset, 5);
// 向结果中填充博客详情
List<Blog> blogs = new ArrayList<>(scrollResult.getList().size());
for (Long blogId : scrollResult.getList()) {
Blog blog = this.getById(blogId);
this.fillExtraInfo(blog);
blogs.add(blog);
}
ScrollResult<Blog> result = new ScrollResult<>();
result.setList(blogs);
result.setMinTime(scrollResult.getMinTime());
result.setOffset(scrollResult.getOffset());
return Result.ok(result);
}
}
需要说明的是,这里基于上次查询的最小时间戳和相同元素偏移量的滚动分页查询实现是有 Bug 的。当页宽(每页元素个数)小于连续相同元素个数时,会导致连续查询到同一页数据的情况发生。比如有连续6个博文的时间戳相同,页宽是3,第一次查询时返回的偏移量是3,第二次查询,正常返回第4~6个博文,返回的偏移量仍然是3,第三次查询使用的时间戳与偏移量都与第二次相同,查询到的结果必然也与第二次相同,就会导致分页查询卡在这,一直重复返回第二页的内容。
当然连续出现多个博文时间戳相同的概率比较低,所以这个 Bug 正常情况下基本不会出现,同时还可以加大页宽来更进一步避免该 Bug 的出现。但如果使用的不是时间戳,而是更容易出现重复的其它信息作为 score,就要考虑采用其它分页实现来规避此 Bug。我在简单实现了一个方案,对应的调用示例见。
The End.
本文的完整示例代码可以从获取。
文章评论