消息队列
消息队列顾名思义就是存放消息的队列,最简单的消息队列至少应该包含下面几个角色:
- 队列:存储和管理消息,也被称为消息代理(broker)
- 生产者:把消息发送到消息队列
- 消费者:从消息队列中获取并处理消息
基于 List
Redis 中的 List 本质上是一个双端队列(双向链表),我们可以借助 RPUSH
和 LPOP
、LPUSH
和 RPOP
实现队列的功能。不过需要注意的是,如果 List 中没有元素,那么 LPOP
或者 RPOP
会返回 null,并不会进行阻塞。因此,我们需要使用 BRPOP
、BLPOP
来实现这个功能。
- client 1
# 超时时间为 0 表示无限等待
brpop mylist 0
- client 2
blpush mylist 100
注意
Redis 中的 List 是一个无界队列。
Redis 与 JVM 中的阻塞队列有些不同:
- Redis 中的 List 是无界队列;Java 支持固定容量的队列
- Redis 中的数据可以持久化;Java 的数据是放在内存中的
- Redis 天然是分布式的;Java 中的阻塞队列只对同一个 Java 进程可见
基于 List 实现的队列有以下缺点:
- 无法避免消息丢失:消费者取出消息后,还没来得及处理,如果这时候消费者挂了,那么消息就丢了
- 只支持单消费者:一条消息无法被多个消费者消费
发布订阅
PubSub(发布订阅)是 Redis 2.0 中引入的消息传递模型。消费者可以订阅多个 channel,生产者往对应的 channel 发送消息后,该 channel 中的所有消费者都可以收到此消息。
基本用法如下:
命令 | 说明 |
---|---|
SUBSCRIBE channel ... | 订阅多个 channel |
PSUBSCRIBE pattern ... | 通过通配符订阅 channel。? 表示一个字符、* 表示零个或多个字符、[] 表示匹配其中的一个字符,可以使用 \ 进行转义。 |
UNSUBSCRIBE [channel ...] | 取消订阅。如果未指定任何 channel,则客户端将取消订阅所有先前订阅的 channel,在这种情况下,每个取消订阅的 channel 都会向客户端发送一条消息。 |
PUBLISH channel message | 往指定 channel 发送消息 |
通配符规则如下:
h?llo
订阅 hello、hallo 和 hxllo 等h*llo
订阅 hllo 和 heeeello 等h[ae]llo
只能订阅 hello 和 hallo
注意
发布订阅模式天然是阻塞的,订阅者会一直监听对应的 channel。
- client 1
subscribe channel1
- client 2
# publish 命令的返回值表示有多少个消费者收到了此消息
publish channel1 hh
它的优点如下:
- 支持多生产和多消费者模式
它的缺点如下:
- 不支持数据持久化:必须先订阅 channel,才能收到消息。如果先往 channel 中发送消息,然后再订阅 channel,那么发送的消息就丢了。也就是说,消费者只能收到订阅 channel 之后的消息。
- 无法避免消息丢失:消费者取出消息后,还没来得及处理,如果这时候消费者挂了,那么消息就丢了。
- 消息堆积有上限:每个消费者都有一个消息缓冲区,消息会被放到这个缓冲区中,一旦积压的消息超过缓冲区上限,那么之前的消息就会被丢弃。
Stream
Stream 是 Redis 5.0 中引入的一种全新的数据类型,可以实现一个完善的消息队列。
发送消息的命令如下:
如果我们没有指定消息队列的最大容量,则默认是无界的。如果消息数量超过了队列的容量,那么最早的消息会被丢弃。
# 返回值是消息的 ID
127.0.0.1:6379> xadd users.queue * name zs age 11
"1745574101756-0"
读取消息的命令如下:
默认情况下,如果没有指定 COUNT
,则会读取所有消息。
# 从最早的消息开始读取,并读取所有数据
127.0.0.1:6379> xread STREAMS users.queue 0
1) 1) "users.queue"
2) 1) 1) "1745574101756-0"
2) 1) "name"
2) "zs"
3) "age"
4) "11"
# 如果需要读取最新的消息,则必须配合 BLOCK 使用,用于指定阻塞的毫秒数,为 0 表示一直阻塞。
# 如果不使用阻塞的方式读取,那么读取到的永远为 null
127.0.0.1:6379> xread BLOCK 0 STREAMS users.queue $
1) 1) "users.queue"
2) 1) 1) "1745574448221-0"
2) 1) "name"
2) "ww"
它的优点如下:
- 支持持久化:一条消息可以被多次反复消费,消息消费之后不会被移除
- 一条消息可以被多个消费者消费
- 支持阻塞读
它的缺点如下:
- 存在漏消费的情况:比如我需要读取最新的消息,读取消息后需要进行业务逻辑的处理,那么在业务逻辑处理期间发送的消息在下一次是无法被消费的,因为下一次我读取的是最新的消息。
是不是感觉 Stream 比较鸡肋,没有办法像消息队列那样从上一次消费的末尾接着进行消费?Redis 团队也意识到了这个问题,他们引入了消费者组来解决这个问题。
与 Kafka 中的概念很相似,一条消息只能被同一个消费者组中的一个消费者消费,但是可以被不同消费者组中的消费者同时消费。消费者组会维护一个标识,来记录该组最后处理的一条消息,哪怕消费者宕机,重启之后依旧会从上一次消费的地方继续消费,从而保证每条消息至少被消费一次。
除此之外,Redis 还引入了“消费确认”的机制。消费者消费消息后,该条消息就处于 pending 状态,会被放入一个 pending-list 中。当消息处理完成后,需要通过 XACK 来确认消费,标记该条消息已被处理,从而把该条消息从 pending-list 中移除。因此,此方式可以解决消息丢失的问题。
- 创建消费者组
# $ 的意义:如果 myqueue 这个队列已经存在了,那么是否需要把该队列中之前已有的消息放入消费者组中,$ 表示不需要,而 0 表示需要
# MKSTREAM 表示当 myqueue 这个队列不存在时,自动创建,可以省略 MKSTREAM
XGROUP CREATE myqueue group1 $ MKSTREAM
其它一些命令如下:
基于消费者组的 Stream 的优点如下:
- 可以有多个消费者竞争消息,加快消息的读取速度
- 有消息确认机制,保证每条消息至少被消费一次
- 可以解决漏读问题