Stream 与 PubSub 选型
解读
在国内一线/二线互联网公司的 PHP 后端面试中,面试官问“Stream 与 PubSub 如何选型”并不是想听 Redis 命令怎么敲,而是考察候选人能否把“消息模型”与“业务场景”对应起来,并给出可落地的 PHP 技术方案。
核心矛盾只有两点:
- 消息是否需要“持久化 + 可重放”——决定用 Stream;
- 消息是否只要“ fire-and-forget 广播”——决定用 Pub/Sub。
把这两个点讲透,再补一句“PHP 侧如何接入、如何容灾、如何压测”,基本就能拿到高分。
知识点
-
Redis Stream
- 底层是 radix tree + 链表,消息持久化到内存(可配置淘汰策略),每个消息有唯一 ID;
- 支持消费者组(Consumer Group),具备 ACK 机制,可实现“至少一次”语义;
- 支持范围查询、阻塞读、PEL(Pending Entries List),可做重试队列;
- 内存随消息量线性增长,必须设置 maxlen 或定期 XTRIM,否则容易 OOM;
- PHP 接入:phpredis 5.3+ 已支持 xAdd、xReadGroup、xAck,Laravel 5.8+ 封装了 Illuminate\Queue\RedisQueue,可直接把 Stream 当队列驱动。
-
Redis Pub/Sub
- 纯内存广播,无持久化,发布者 > 订阅者 > 消息即消失;
- 不支持 ACK、不支持积压,客户端掉线期间的消息直接丢弃;
- 支持模式订阅(psubscribe),适合做配置推送、实时报警、秒杀倒计时广播;
- 当订阅者实例重启,需自己实现“冷启动”补偿(如定时拉 DB);
- PHP 接入:phpredis 的 subscribe/psubscribe 是阻塞 IO,生产环境必须配合 Swoole\Coroutine\Redis 或 ReactPHP 事件循环,防止 FPM 进程被吃光。
-
国内业务场景映射
- 订单超时关单、支付结果通知、库存回滚——需要可靠消费,选 Stream;
- 直播间弹幕、秒杀活动倒计时、配置热更新——可丢消息,选 Pub/Sub;
- 日志汇聚链路(ELK)——如果日志允许丢失,用 Pub/Sub 延迟最低;若要求 100% 可达,用 Stream 或直接走 Kafka。
-
性能与运维
- 单实例 Redis 5.0,Stream 写 QPS 约 8w,Pub/Sub 写 QPS 约 12w,差距不大;
- Stream 内存随消息累积,国内云厂商 16 G 主从实例大约 4 亿条 512 B 消息就满,需要 32 G 或分片;
- 阿里云、腾讯云均提供“Redis Stream 监控模板”,指标:内存使用率、xAdd 延迟、消费者组 lag;
- 私有云场景,必须给 Stream 配置“maxmemory-policy: volatile-lru” + “xtrim maxlen ~100000”,防止高并发下瞬间打满。
-
PHP 实战代码骨架(面试现场可手撕)
Stream 生产者:$redis = new Redis(); $redis->pconnect('127.0.0.1', 6379); $msgId = $redis->xAdd('order_stream', '*', ['orderId'=>123, 'status'=>'pay']);Stream 消费者组:
$redis->xGroup('CREATE', 'order_stream', 'group_1', '0', true); while (true) { $msgs = $redis->xReadGroup('group_1', 'consumer_1', ['order_stream'=>'>'], 1, 2000); foreach ($msgs as $stream => $entries) { foreach ($entries as $id => $data) { handle($data); $redis->xAck('order_stream', 'group_1', $id); } } }Pub/Sub 热配置:
$redis->psubscribe(['__keyevent@0__:expired'], function($r, $p, $chan, $msg){ if (strpos($msg, 'config:') === 0) { apcu_delete($msg); // 清本地缓存 } });
答案
“选型先问消息能不能丢”。
- 如果业务要求‘至少一次’、可重放、可回溯,例如订单超时关单、库存回滚、对账补单,直接用 Redis Stream;PHP 侧用 phpredis 的 xAdd + xReadGroup,配合 Laravel Queue 的 redis-stream 驱动,消费失败写入 PEL 表,定时巡检重试;运维层面给 Stream 设置 maxlen 或定时 XTRIM,防止内存爆炸。
- 如果业务是纯广播、可接受断线丢包,例如直播间弹幕、秒杀倒计时、配置推送,用 Redis Pub/Sub;PHP 侧一定不能用传统 FPM 阻塞订阅,而是用 Swoole 协程或 CLI 常驻进程,订阅端重启后必须自己补偿冷数据。
- 当单 Redis 实例内存或网卡打满,国内主流方案是:
- Stream 场景 → 水平拆分到多个 8 G 主从实例,按业务 key 做哈希分片,或者干脆迁移到阿里云 Kafka;
- Pub/Sub 场景 → 直接换云厂商的 MQTT、RocketMQ 广播模式,省掉自己运维。
一句话总结:Stream = 可靠队列,Pub/Sub = 内存广播;先定业务语义,再定技术实现,PHP 只是客户端,选错模型再牛的扩展也救不了。
拓展思考
-
如果公司已经把 Redis Stream 当事件总线,如何做到“跨机房容灾”?
答:国内多活架构通常采用“双写 + 幂等”:业务层写北京 Stream 的同时异步写上海 Stream,消费者两端同时消费,用 orderId 做幂等键;PHP 侧在写入前先在本地 MySQL 插入“事件发布表”做事务兜底,失败重试。 -
Stream 消费者组出现“消息堆积”如何快速定位?
答:- 云监控看 xinfo groups 的 pending 长度;
- 在 PHP 消费者里埋点:每处理 1000 条写一次 Prometheus gauge,标签带 group/consumer;
- 如果 lag 持续增长,先水平扩容 consumer 实例,再检查 handle() 里是否出现慢 SQL 或外部接口超时。
-
Pub/Sub 想做“离线补推”有没有低成本方案?
答:PHP 侧在发布前把消息多写一条到 Stream(当审计日志),订阅端重启后先 xread 这条 Stream 做回放,再进入实时 Pub/Sub;这样只用一套 Redis,运维成本最低,国内中小厂普遍这么玩。