Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息,可以实现进程间的消息传递
发布订阅其实是一个轻量的队列,只不过数据不会被持久化,一般用来处理实时性较高的异步消息
使用
- 基于频道(channel):
- 发布:
publish <channel> <message>
- 订阅:
subscribe <channel1> [<channel2> ...]
- 退订:
unsubscribe <channel1> [<channel2> ...]
- 退订所有频道:
unsubscribe
- 发布:
- 基于模式(pattern)
- 发布:
psubscribe <pattern> <message>
- 订阅:
punsubscribe <pattern1> [<pattern2> ...]
- 退订:
punsubscribe <pattern1> [<pattern2> ...]
- 退订所有模式:
punsubscribe
- 发布:
基于频道的发布订阅
发布者发布消息:
> publish channel:1 hi
(integer) 1 # 有几个订阅者接收到消息
订阅者订阅频道:
> subscribe channel:1
Reading messages... (press Ctrl-C to quit)
1) "message" # 消息类型
2) "channel:1" # 频道
3) "hi" # 消息内容
注意:
- 订阅者不会收到订阅之前就发布到该频道的消息
- 执行上面命令后客户端会进入订阅状态,处于此状态下客户端不能使用除
subscribe
、unsubscribe
、psubscribe
、punsubscribe
这四个属于发布订阅之外的命令,否则报错 - 进入订阅状态后客户端可能收到 6 种类型的回复。每种类型的回复都包含 3 个值,第一个值是消息的类型,根据消类型的不同,第二个和第三个参数的含义可能不同
subscribe
:表示订阅成功的反馈信息,第二个值是订阅成功的频道名称,第三个是当前客户端订阅的频道数量message
:表示接收到的消息,第二个值表示产生消息的频道名称,第三个值是消息的内容unsubscribe
:表示成功取消订阅某个频道,第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量(当此值为 0 时客户端会退出订阅状态)psubscribe
pmessage
punsubscribe
基于模式的发布订阅
如果有某个/某些模式和这个频道匹配的话,那么所有订阅这个/这些频道的客户端也同样会收到信息
发布者发布消息:
> publish c m1
(integer) 0
> publish c1 m1
(integer) 1
> publish c11 m1
(integer) 0
> publish b m1
(integer) 1
> publish b1 m1
(integer) 1
> publish b11 m1
(integer) 1
> publish d m1
(integer) 0
> publish d1 m1
(integer) 1
> publish d11 m1
(integer) 1
订阅者订阅频道:
> psubscribe c? b* d?*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?"
3) (integer) 1
1) "psubscribe"
2) "b*"
3) (integer) 2
1) "psubscribe"
2) "d?*"
3) (integer) 3
1) "pmessage"
2) "c?"
3) "c1"
4) "m1"
1) "pmessage"
2) "b*"
3) "b"
4) "m1"
1) "pmessage"
2) "b*"
3) "b1"
4) "m1"
1) "pmessage"
2) "b*"
3) "b11"
4) "m1"
1) "pmessage"
2) "d?*"
3) "d1"
4) "m1"
1) "pmessage"
2) "d?*"
3) "d11"
4) "m1"
注意:
- 使用
psubscribe
可以重复订阅同一个频道- 如:客户端执行了
psubscribe c? c?*
,这时向c1
频道发布消息客户端会接受到两条消息,而且publish
的返回值是 2 而不是 1 - 如:客户端执行了
subscribe c1
和psubscribe c?*
,向c1
频道发送一条消息客户端也会受到两条消息(两种类型:message
和pmessage
),同时publish
也返回 2
- 如:客户端执行了
- 使用
punsubscribe
只能退订通过psubscribe
命令订阅的规则,不会影响通过subscribe
订阅的频道;同样unsubscribe
也不会影响通过psubscribe
订阅的规则 - 另外需要注意
punsubscribe
退订某个规则时不会将其中的通配符展开,而是进行严格的字符串匹配,所以punsubscribe *
无法退订c*
规则,而是必须使用punsubscribe c*
才可以退订
底层实现
基于频道的发布订阅
底层是通过字典(图中的 pubsub_channels
)实现的,这个字典用于保存订阅频道的信息:键为正在被订阅的频道;值是一个链表,链表中保存了所有订阅这个频道的客户端
基于模式的发布订阅
redisServer.pubsub_patterns
属性是一个链表,链表中保存着所有和模式相关的信息:
struct redisServer {
// ...
list *pubsub_patterns;
// ...
};
链表中的每个节点都包含一个 redis.h/pubsubPattern
结构:
typedef struct pubsubPattern {
redisClient *client; // 订阅模式的客户端
robj *pattern; // 被订阅的模式
} pubsubPattern;
每当调用 PSUBSCRIBE
命令订阅一个模式时,程序就创建一个包含客户端信息和被订阅模式的 pubsubPattern
结构,并将该结构添加到 redisServer.pubsub_patterns
链表中:
发布订阅机制可以作为消息队列吗?
发布订阅机制存在以下缺点,都是跟丢失数据有关:
- 必须先执行订阅,再等待消息发布。如果先发布了消息,那么该消息由于没有订阅者,会被直接丢弃
- 也没有 ACK 机制,无法保证消息的消费成功
- 发布订阅机制没有基于任何数据类型实现,所以不具备「数据持久化」的能力,就是不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,数据全部丢失
- 发布订阅模式是“发后既忘”的工作模式,如果有订阅者离线重连之后不能消费之前的历史消息
- 当消费端有一定的消息积压时,如果超过 32M 或者是 60s 内持续保持在 8M 以上,消费端会被强行断开(可配置
client-output-buffer-limit pubsub 32mb 8mb 60
)
所以,发布订阅机制只适合即时通讯的场景,如:构建哨兵集群