死信队列配置与监控

解读

在国内互联网公司的PHP面试中,"死信队列"并不是单纯考察RabbitMQ或Kafka的运维指令,而是考察候选人是否具备"消息可靠性闭环"的工程思维。
面试官想确认三件事:

  1. 你是否知道PHP业务侧如何产生死信(TTL超时、拒绝、队列满、路由失败)。
  2. 你是否能在PHP代码里把死信路由到独立队列,并用最小侵入的方式记录轨迹。
  3. 你是否能给出可落地的监控方案,让运维凌晨3点收到飞书/企业微信告警,而不是等用户投诉。
    回答时切忌只背参数,要结合PHP-FPM/CLI、Laravel/Symfony、阿里云MNS/腾讯云CMQ等国内真实组件给出"可复制"的套路。

知识点

  1. DLX(Dead-Letter-Exchange)与DLK(Dead-Letter-Routing-Key)的声明方式
  2. 消息TTL的两种设置:队列级x-message-ttl与消息级expiration的区别
  3. PHP-AmqpLib、Laravel-queue-RabbitMQ、Hyperf/amqp的声明API差异
  4. 死信队列的命名规范:app.service.dlx.{原队列名},方便自动发现
  5. 监控维度:死信数量、死信增长率、最早死信存活时间、业务拒绝原因分布
  6. 国内云厂商的"消息轨迹"功能:阿里云MNS日志转储、腾讯云CMQ消息回溯、华为云DMS死信告警
  7. Prometheus exporter:php-amqplib-metrics、hyperf/metric,配合Grafana模板
  8. 重试策略:阶梯退避(0s、1s、5s、30s、300s)与"重试队列+延迟插件"实现,避免直接压回业务队列
  9. 最终人工干预:PHP命令行消费死信,写入MySQL审计表,提供后台UI一键重发或丢弃
  10. 灰度与回滚:利用PHP的feature-flag库(如Laravel Pennant)动态关闭死信投递,防止新版本bug瞬间打爆DLX

答案

以RabbitMQ+Laravel为例,给出一套可直接落地的配置与监控代码。

  1. 声明阶段(服务提供器里完成,防止重复声明)
use PhpAmqpLib\Wire\AMQPTable;

$channel = app('amqp')->channel();
// 原业务队列
$channel->queue_declare(
    'order.pay.q',
    false, true, false, false, false,
    new AMQPTable([
        'x-dead-letter-exchange'    => 'dlx.direct',
        'x-dead-letter-routing-key' => 'order.pay.dlx',
        'x-message-ttl'             => 30000, // 30s超时
    ])
);
// 死信队列
$channel->queue_declare(
    'order.pay.dlx.q',
    false, true, false, false, false,
    new AMQPTable([
        'x-message-ttl' => 86400000, // 一天后自动删除
    ])
);
$channel->queue_bind('order.pay.dlx.q', 'dlx.direct', 'order.pay.dlx');
  1. 业务消费端拒绝时触发死信
public function handle(AMQPMessage $msg)
{
    try {
        $payload = json_decode($msg->body, true);
        // 幂等校验
        if (Order::where('out_trade_no', $payload['trade_no'])->exists()) {
            $msg->ack();
            return;
        }
        // 业务处理
        (new PayService)->handle($payload);
        $msg->ack();
    } catch (BizException $e) {
        // 可预期异常,直接拒绝并记录原因
        $msg->nack(false, false); // requeue=false,进入死信
        Log::channel('deadletter')->warning('nack', ['reason'=>$e->getMessage()]);
    }
}
  1. 监控与告警(基于Prometheus+企业微信)
// 定时任务每分钟跑一次
$dlxCount = Cache::remember('mq:dlx:order.pay.dlx.q', 60, function () {
    return app('amqp')->channel()->queue_declare('order.pay.dlx.q', true, true);
})[1]; // 返回消息数量

// 埋点
app('prometheus')->getOrRegisterGauge(
    'mq_dlx_messages',
    'Dead letter queue depth',
    ['queue']
)->set($dlxCount, ['order.pay.dlx.q']);

// 告警规则(Prometheus)
// increase(mq_dlx_messages{queue="order.pay.dlx.q"}[5m]) > 10
// 触发后通过Alertmanager调用企业微信机器人
  1. 后台人工干预命令
 Artisan::command('mq:dlx:replay {queue}', function ($queue) {
     $channel = app('amqp')->channel();
     while ($msg = $channel->basic_get($queue . '.dlx.q')) {
         $body = json_decode($msg->body, true);
         // 重新投递到原队列
         app('amqp')->basic_publish(
             new AMQPMessage($msg->body, ['delivery_mode' => 2]),
             '',
             $queue
         );
         $msg->ack();
         $this->info("Replayed: " . $body['trade_no']);
     }
 });

通过以上四步,PHP团队可以做到:

  • 代码零侵入声明DLX,TTL与拒绝策略清晰;
  • Prometheus实时采集死信深度,5分钟内异常即可收到企业微信卡片;
  • 值班同学凌晨一键php artisan mq:dlx:replay,无需登录服务器敲rabbitmqctl。

拓展思考

  1. 如果公司采用阿里云RocketMQ,PHP客户端不支持延迟消息,如何用"定时任务+本地消息表"模拟死信重试?
  2. 当死信增长过快,PHP-FPM可能瞬间打满连接池,如何借助Swoole协程消费死信,实现单机1万QPS的清理能力?
  3. 金融场景下,死信队列本身也可能成为攻击面(如大量伪造订单),PHP侧如何结合OpenResty+WAF实现消息内容校验,防止死信被恶意重放?