如何扩展 Locust 支持 MQTT、Dubbo 私有协议,并贡献回社区
解读
面试官真正想考察的是:
- 对 Locust 插件机制(Client、EventHook、TaskSet、User 类继承体系)的理解深度;
- 能否把“公司内部的私有协议”抽象成可复用的开源组件,而非一次性脚本;
- 是否熟悉国内开源贡献流程(Gitee PR、CLA 签署、Issue 模板、中文文档规范);
- 在性能场景里如何同时保证“协议正确性”与“压测指标可信”。
回答时要体现“可落地、可复用、可合并”三板斧:代码结构清晰、单测覆盖、文档完整,并主动适配国内社区习惯。
知识点
-
Locust 插件扩展点
- 继承 User 类,重写 client 属性,必须实现 request 事件触发(fire_success/fire_failure)
- 使用 locust.events.request_success/request_failure 把自定义协议耗时、响应长度、异常注入到统计队列
- 通过 @events.test_start/@events.test_stop 做连接池初始化和优雅关闭,避免并发泄漏
-
MQTT 适配要点
- 基于 paho-mqtt 的 loop_start/loop_stop 做异步网络线程,防止阻塞 gevent
- 消息 QoS、Retain、Will 遗嘱等特性要暴露为 User 参数,方便场景差异化
- 需要实现“发布-订阅”双角色:PubTaskSet 与 SubTaskSet,并在 on_message 回调里手动 fire_success,保证吞吐量统计不丢失
-
Dubbo 适配要点
- 走 Telnet 协议或 Triple/HTTP2 网关时,可直接用 requests.Session;若走原生 Dubbo2 二进制,需用 dubbo-py 或 hessian4py,注意序列化兼容性
- 接口级、方法级、版本号、group 作为 Locust 变量,支持 CSV 数据驱动
- 因为 Dubbo 是“长连接+多路复用”,要在 client 内部维护连接池,并统计“连接等待时间”与“服务端线程池排队时间”两个细分指标
-
指标可信
- 自定义 name 字段把“主题/接口+方法”拼成指标维度,方便 Grafana 按业务下钻
- 对 MQTT 订阅场景,用消息序号做“端到端时延”校验,防止只统计到“发送成功”而忽略“到达成功”
- 对 Dubbo,用 attachments 带回服务端耗时,计算 RTT – ServerTime 得到网络+序列化耗时,辅助定位瓶颈侧
-
社区贡献流程(国内)
- 先在公司内部仓库建 feature 分支,跑完 1w 并发 8h 稳定性基线,收集火焰图与内存泄漏报告
- 在 Gitee 主仓库提 Issue,标题前缀 “feat:” 并贴设计文档(架构图+时序图),@maintainer 做 RFC 评审
- 代码规范:PEP8 + black 格式化,单测覆盖≥90%,必须提供 examples/mqtt_simple.py、examples/dubbo_simple.py
- 签署 CLA(电子协议),PR 描述用中英双语,changelog 写清楚“非破坏式更新,默认不引入新依赖”
- 通过后,在 Locust 官方文档“Third Party Plugins”列表里提交 PR,把 pip install locust-mqtt、locust-dubbo 指向你的 Gitee 仓库,形成双向引流
答案
“我会按五步落地:
第一步,抽象协议客户端。对 MQTT 封装 MqttUser,内部持有一个 paho.mqtt.client 实例,把 publish()/subscribe() 包装成 locust 的 client.request() 语义,并在 on_publish 回调里触发 events.request_success;对 Dubbo 封装 DubboUser,基于 dubbo-py 维护长连接池,把 invoke() 耗时和异常映射到同样的统计事件,保证 Locust WebUI 能实时看到 QPS、百分位线。
第二步,解决 gevent 协程阻塞。MQTT 的 loop_start() 已经异步,但 Dubbo 的 hessian 解码是 CPU 密集,我会用 gevent.threadpool 线程池做 offload,防止阻塞主循环。
第三步,场景分层。把“连接建立”放到 on_start,只执行一次;把“业务报文”做成 TaskSet,支持权重、数据驱动、断言,确保同一套代码既能跑容量也能跑稳定性。
第四步,指标增强。除了默认的 response_time、exception,我再扩展三个自定义指标:mqtt_e2e_latency、dubbo_queue_time、dubbo_net_time,用 locust.stats.print_stats() 的钩子实时打印,方便和内部监控系统对接。
第五步,贡献回社区。代码先在内部灰度 7 天,压测 5w 并发无内存泄漏后,整理成独立仓库 locust-mqtt、locust-dubbo,写中英文 README、docker-compose 示例、GitHub Action CI。接着在 Gitee 提 Issue 申请进入 Locust-China 官方组织,PR 通过后再向主仓库文档追加第三方插件链接,实现国内用户 pip install 即可用,同时把使用案例反哺社区,形成闭环。”
拓展思考
- 如果公司 Dubbo 接口启用了 Token 鉴权与动态注册中心(Nacos),如何在 Locust 客户端里做“服务发现+缓存+自动重连”而不破坏压测线程模型?
- MQTT 5.0 引入了“共享订阅”与“流量控制”,当订阅端采用 Shared Subscription 做集群扩容时,如何验证消息在多个 consumer 间的均衡性,并把“消息倾斜度”量化成新的 SLA 指标?
- 当插件被社区合并后,如何建立长期维护机制?例如:Locust 主版本升级导致事件钩子改名,如何在国内组建 SIG(特别兴趣小组)做兼容层,并让企业工程师轮值做 Maintainer,既保证社区活跃度,也让公司品牌持续曝光?