sorted set在没有重复元素的集合基础上,每个元素多了一个分数score属性作为权重,查询时可以用score排序。
Sorted Set类型的常用操作
ZADD 添加元素
127.0.0.1:6379> ZADD rank 98 u1 (integer) 1 127.0.0.1:6379> ZADD rank 95 u2 (integer) 1 127.0.0.1:6379> ZADD rank 60 u3 (integer) 1 127.0.0.1:6379> ZADD rank 76 u4 (integer) 1 127.0.0.1:6379> ZADD rank 77 u5 (integer) 1
ZCARD 获取集合成员数量
127.0.0.1:6379> ZCARD rank (integer) 5
ZCOUNT 获取指定分数段内的成员数量
127.0.0.1:6379> ZCOUNT rank 80 100 (integer) 2
ZINCRBY 给集合中指定成员增加指定分数
127.0.0.1:6379> ZINCRBY rank 2 u3 "62"
ZRANGE/ZREVRANGE 返回按索引排序的成员
127.0.0.1:6379> ZRANGE rank 0 1 # 分数从小到大 1) "u3" 2) "u4" 127.0.0.1:6379> ZREVRANGE rank 0 2 # 分数从大到下 1) "u1" 2) "u2" 3) "u5"
ZRANGEBYSCORE/ZREVRANGEBYSCORE 返回按指定分数段内的元素列表
127.0.0.1:6379> ZRANGEBYSCORE rank 80 100 # 分数段从小到大 1) "u2" 2) "u1" 127.0.0.1:6379> ZREVRANGEBYSCORE rank 100 80 # 分数段从大到小 1) "u1" 2) "u2"
ZREM 删除指定元素
127.0.0.1:6379> ZREM rank u2 (integer) 1
实际应用场景
1.排行榜
实现一个实时变化的积分排行榜,可以利用 ZREVRANGE 轻松展现最高排名,并实现分页查询
首先将成员名单录入
127.0.0.1:6379> ZADD rank 0 player1 0 player2 0 player3 0 player4 0 player5 0 player6 (integer) 6
当然也可以在合适的时机将新增的成员单个写入
然后开始实时计算增加的分数 使用ZINCRBY, 增加完后会直接返回当前成员的最终分数
127.0.0.1:6379> ZINCRBY rank 50 player1 "50" 127.0.0.1:6379> ZINCRBY rank 53 player3 "53" 127.0.0.1:6379> ZINCRBY rank 20 player5 "20" 127.0.0.1:6379> ZINCRBY rank 40 player6 "40" 127.0.0.1:6379> ZINCRBY rank 40 player5 "60" 127.0.0.1:6379> ZINCRBY rank -2 player6 # 负数也是可以的 相当于减分 "38"
也可以不用初始化成员分数 直接给一个不存在的的成员增加分数,默认会创建并增加分数,默认从0开始
127.0.0.1:6379> ZINCRBY rank 55 player7 "55"
最后获取排行榜上前3的成员,使用ZREVRANGE
127.0.0.1:6379> ZREVRANGE rank 0 2 1) "player5" 2) "player7" 3) "player3"
如果要带上分数,需要增加一个参数 WITHSCORES
127.0.0.1:6379> ZREVRANGE rank 0 2 WITHSCORES 1) "player5" 2) "60" 3) "player7" 4) "55" 5) "player3" 6) "53"
2.定时/延迟任务
定时/延迟任务,或是延迟队列类功能,最重要的是在指定时间后才开始处理相关任务。
以下是大致实现方式
任务生产端:
每个任务生成一个任务ID(要有唯一性),将具体的任务信息可以放在其他数据库里,当然也可以放在Redis里。然后将任务ID作为sorted set的成员, 需要执行任务时间点的时间戳作为score写入(如果是延迟任务就计算出具体的时间戳)
127.0.0.1:6379> ZADD tasks 1000 t1 # 假设当前时间戳从1000开始 (integer) 1 127.0.0.1:6379> ZADD tasks 1003 t2 # 3s后才能触发的任务 (integer) 1 127.0.0.1:6379> ZADD tasks 1004 t3 (integer) 1 127.0.0.1:6379> ZADD tasks 1004 t4 (integer) 1
这时sorted set里的任务顺序大概是这样(按分数从小达到)
t1 | t2 | t3 | t4 |
---|---|---|---|
1000 | 1003 | 1004 | 1004 |
任务消费端
启动一个进程循环获取任务, 获取分数的范围在当前时间戳之前(表示已经到期的任务)
127.0.0.1:6379> ZRANGEBYSCORE tasks -inf 1000 1) "t1"
程序代码大概可以这样写
# 伪代码 while true { nowTimestamp = time.now() # 获取当前时间戳 taskIds = redis.ZRANGEBYSCORE("tasks", "-inf", nowTimestamp) for taskId in TaskIds { # TODO 执行任务处理逻辑 } }
不过有个很明显的问题, 当时间戳到1003的时候,任务t1还是能够获取到
127.0.0.1:6379> ZRANGEBYSCORE tasks -inf 1003 1) "t1" 2) "t2"
这是因为ZRANGEBYSCORE只是查询出来,并没有删除元素,删除元素需要使用ZREM
完善一下代码
# 伪代码 while true { nowTimestamp = time.now() # 获取当前时间戳 taskIds = redis.ZRANGEBYSCORE("tasks", "-inf", nowTimestamp) for taskId in taskIds { # TODO 执行任务处理逻辑 # 正常处理了调用ZREM redis.ZREM("tasks", taskId) } }
接下来随着任务越来越多,上面的处理逻辑可能短时间内也处理不完,或者单个任务处理时间较长,就会导致已经到期的任务无法及时处理。
一般情况下,任务之间是没有依赖关系,这时可以考虑横向库容消费进程,并行处理任务。
不过直接将上面的逻辑复制在多个进程里,会有任务重复的问题。
t1 | t2 | t3 | t4 |
---|---|---|---|
1000 | 1003 | 1004 | 1004 |
进程1查询点 | |||
进程2查询点 |
就像上面一样,可能出现,两个进程同时获取到t1任务
有两种处理方式:
第一种 增加分布式锁,可以基于Redis的set类型数据
代码如下
# 伪代码 while true { nowTimestamp = time.now() # 获取当前时间戳 taskIds = redis.ZRANGEBYSCORE("tasks", "-inf", nowTimestamp) for taskId in taskIds { addSuccess = redis.SADD("tasks:lock", taskId) if addSuccess == 0 { # 写入不成功 说明有其他进程在处理该任务 continue } # TODO 执行任务处理逻辑 redis.ZREM("tasks", taskId) # 正常处理了调用ZREM 删除任务 redis.SREM("tasks:lock", taskId) # 释放锁, 当然如果是任务处理失败 也可以释放锁 以便可以重试任务 } }
如果某一段时间没有任务,会存在 ZRANGEBYSCORE 空转的情况,增加了不必要的网络调用
可以在获取不到任务的时候,等待一个任务时间最小间隔 通常是1s
# 伪代码 while true { nowTimestamp = time.now() # 获取当前时间戳 taskIds = redis.ZRANGEBYSCORE("tasks", "-inf", nowTimestamp) if len(taskIds) == 0 { sleep(1) continue } for taskId in taskIds { addSuccess = redis.SADD("tasks:lock", taskId) if addSuccess == 0 { # 写入不成功 说明有其他进程在处理该任务 continue } # TODO 执行任务处理逻辑 redis.ZREM("tasks", taskId) # 正常处理了调用ZREM 删除任务 redis.SREM("tasks:lock", taskId) # 释放锁, 当然如果是任务处理失败 也可以释放锁 以便可以重试任务 } }
第二种 任务实际处理逻辑继续下沉
当前程序不进行任务处理 只取到期的任务,任务处理交给其他任务处理进程专门处理,只向List里推送任务
# 伪代码 # 定时获取任务程序 保留一个 while true { nowTimestamp = time.now() # 获取当前时间戳 taskIds = redis.ZRANGEBYSCORE("tasks", "-inf", nowTimestamp) if len(taskIds) == 0 { sleep(1) continue } for taskId in taskIds { redis.LPUSH("tasks:list", taskId) # 下推到队列任务 redis.ZREM("tasks", taskId) # 删除指定元素 } } # 下层的任务处理程序 可以横向扩展运行多个进程 while true { taskId = redis.BRPOP("tasks:list", 10) # 还考虑服务器 闲置连接超时 断开的异常 if taskId is null { continue } # TODO 执行任务处理逻辑 # 任务如果处理失败需要重试 可以考虑重新ZADD到 tasks 里 }
如果生产消息的速度非常快,也是可以考虑把 “定时获取任务程序” 也横向扩展,不过需要增加分布式锁,避免重复消费,可参考 第一种 的处理方式;也可以将任务按业务类型分类,放在不同的sorted set 中分别处理,互不影响。
到此这篇关于Redis Sorted Set类型使用及应用场景的文章就介绍到这了,更多相关Redis Sorted Set使用内容请搜索IT俱乐部以前的文章或继续浏览下面的相关文章希望大家以后多多支持IT俱乐部!