Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息,可以实现进程间的消息传递

发布订阅其实是一个轻量的队列,只不过数据不会被持久化,一般用来处理实时性较高的异步消息

使用

  • 基于频道(channel):
    • 发布:publish <channel> <message>
    • 订阅:subscribe <channel1> [<channel2> ...]
    • 退订: unsubscribe <channel1> [<channel2> ...]
    • 退订所有频道:unsubscribe
  • 基于模式(pattern)
    • 发布:psubscribe <pattern> <message>
    • 订阅:punsubscribe <pattern1> [<pattern2> ...]
    • 退订: punsubscribe <pattern1> [<pattern2> ...]
    • 退订所有模式:punsubscribe

基于频道的发布订阅

发布者发布消息:

> publish channel:1 hi
(integer) 1 # 有几个订阅者接收到消息

订阅者订阅频道:

> subscribe channel:1
Reading messages... (press Ctrl-C to quit)
1) "message"   # 消息类型
2) "channel:1" # 频道
3) "hi"        # 消息内容

注意:

  • 订阅者不会收到订阅之前就发布到该频道的消息
  • 执行上面命令后客户端会进入订阅状态,处于此状态下客户端不能使用除 subscribeunsubscribepsubscribepunsubscribe 这四个属于发布订阅之外的命令,否则报错
  • 进入订阅状态后客户端可能收到 6 种类型的回复。每种类型的回复都包含 3 个值,第一个值是消息的类型,根据消类型的不同,第二个和第三个参数的含义可能不同
    • subscribe:表示订阅成功的反馈信息,第二个值是订阅成功的频道名称,第三个是当前客户端订阅的频道数量
    • message:表示接收到的消息,第二个值表示产生消息的频道名称,第三个值是消息的内容
    • unsubscribe:表示成功取消订阅某个频道,第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量(当此值为 0 时客户端会退出订阅状态)
    • psubscribe
    • pmessage
    • punsubscribe

基于模式的发布订阅

如果有某个/某些模式和这个频道匹配的话,那么所有订阅这个/这些频道的客户端也同样会收到信息

发布者发布消息:

> publish c m1
(integer) 0
> publish c1 m1
(integer) 1
> publish c11 m1
(integer) 0
> publish b m1
(integer) 1
> publish b1 m1
(integer) 1
> publish b11 m1
(integer) 1
> publish d m1
(integer) 0
> publish d1 m1
(integer) 1
> publish d11 m1
(integer) 1

订阅者订阅频道:

> psubscribe c? b* d?*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?"
3) (integer) 1
1) "psubscribe"
2) "b*"
3) (integer) 2
1) "psubscribe"
2) "d?*"
3) (integer) 3
1) "pmessage"
2) "c?"
3) "c1"
4) "m1"
1) "pmessage"
2) "b*"
3) "b"
4) "m1"
1) "pmessage"
2) "b*"
3) "b1"
4) "m1"
1) "pmessage"
2) "b*"
3) "b11"
4) "m1"
1) "pmessage"
2) "d?*"
3) "d1"
4) "m1"
1) "pmessage"
2) "d?*"
3) "d11"
4) "m1"

注意:

  • 使用 psubscribe 可以重复订阅同一个频道
    • 如:客户端执行了 psubscribe c? c?*,这时向 c1 频道发布消息客户端会接受到两条消息,而且 publish 的返回值是 2 而不是 1
    • 如:客户端执行了 subscribe c1psubscribe c?*,向 c1 频道发送一条消息客户端也会受到两条消息(两种类型:messagepmessage),同时 publish 也返回 2
  • 使用 punsubscribe 只能退订通过 psubscribe 命令订阅的规则,不会影响通过 subscribe 订阅的频道;同样 unsubscribe 也不会影响通过 psubscribe 订阅的规则
  • 另外需要注意 punsubscribe 退订某个规则时不会将其中的通配符展开,而是进行严格的字符串匹配,所以 punsubscribe * 无法退订 c* 规则,而是必须使用 punsubscribe c* 才可以退订

底层实现

基于频道的发布订阅

底层是通过字典(图中的 pubsub_channels)实现的,这个字典用于保存订阅频道的信息:键为正在被订阅的频道;值是一个链表,链表中保存了所有订阅这个频道的客户端

基于模式的发布订阅

redisServer.pubsub_patterns 属性是一个链表,链表中保存着所有和模式相关的信息:

struct redisServer {
    // ...
    list *pubsub_patterns;
    // ...
};

链表中的每个节点都包含一个 redis.h/pubsubPattern 结构:

typedef struct pubsubPattern {
    redisClient *client; // 订阅模式的客户端
    robj *pattern;       // 被订阅的模式
} pubsubPattern;

每当调用 PSUBSCRIBE 命令订阅一个模式时,程序就创建一个包含客户端信息和被订阅模式的 pubsubPattern 结构,并将该结构添加到 redisServer.pubsub_patterns 链表中:

发布订阅机制可以作为消息队列吗?

发布订阅机制存在以下缺点,都是跟丢失数据有关:

  1. 必须先执行订阅,再等待消息发布。如果先发布了消息,那么该消息由于没有订阅者,会被直接丢弃
  2. 也没有 ACK 机制,无法保证消息的消费成功
  3. 发布订阅机制没有基于任何数据类型实现,所以不具备「数据持久化」的能力,就是不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,数据全部丢失
  4. 发布订阅模式是“发后既忘”的工作模式,如果有订阅者离线重连之后不能消费之前的历史消息
  5. 当消费端有一定的消息积压时,如果超过 32M 或者是 60s 内持续保持在 8M 以上,消费端会被强行断开(可配置 client-output-buffer-limit pubsub 32mb 8mb 60

所以,发布订阅机制只适合即时通讯的场景,如:构建哨兵集群