如何关闭发送端并优雅退出?

解读

在国内 Rust 后端/中间件面试中,面试官问“如何关闭发送端并优雅退出”时,真正想考察的是你对异步通道(tokio::sync::mpsc)、资源生命周期、任务取消(Cancellation Token)以及进程退出码的综合掌控能力
他希望你回答的不只是“drop(sender)”,而是:

  1. 主动关闭发送端,让接收端能优雅地处理剩余消息
  2. 通知所有异步任务有序收尾,避免 tokio 运行时强制杀掉任务导致数据丢失;
  3. 最终主函数返回 ExitCode::SUCCESS,符合国内运维对“优雅退出”的审计要求。

知识点

  • tokio::sync::mpsc::Sender<T>:实现了 Clone,当所有 Sender 被 drop 后,Receiver::recv() 返回 None,这是 Rust 社区公认的“半关”信号
  • tokio::select!biased 模式CancellationToken:在关闭阶段优先处理取消信号,防止忙等。
  • Drop 顺序与 RAII:把 Sender 包在自定义 struct 里实现 Drop,打印审计日志,国内金融级代码评审常看这一点
  • std::process::ExitCode:Rust 1.61+ 提供的标准退出码,比 std::process::exit 不会提前刷新问题,更符合银行/券商合规要求。
  • tokio::signal::ctrl_c():在 Linux 下注册 SIGINT/SIGTERM,国内容器平台(阿里云 ACK、腾讯云 TKE)默认发 SIGTERM,10 s 后强制 SIGKILL,必须在这段时间内完成优雅退出。

答案

use std::process::ExitCode;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> ExitCode {
    let (tx, mut rx) = mpsc::channel::<String>(1024);
    let guard = SenderGuard(tx);          // 1. RAII 管理 Sender
    let token = CancellationToken::new();

    // 2. 业务任务
    let producer = tokio::spawn({
        let guard = guard.clone();
        let token = token.clone();
        async move {
            for i in 0.. {
                if token.is_cancelled() { break; }
                guard.0.send(format!("msg-{i}")).await.unwrap();
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
            // 3. 主动 drop 本任务持有的 Sender,通知接收端
            drop(guard);
        }
    });

    // 4. 优雅关机任务
    let shutdown = tokio::spawn({
        let token = token.clone();
        async move {
            tokio::signal::ctrl_c().await.expect("failed to install ctrl_c handler");
            eprintln!("收到 SIGTERM/SIGINT,开始优雅退出");
            token.cancel();               // 5. 广播取消信号
        }
    });

    // 6. 接收端收尾
    let mut drain = 0usize;
    while let Some(msg) = rx.recv().await {
        eprintln!("处理: {}", msg);
        drain += 1;
    }
    eprintln!("通道已空,共处理 {} 条消息", drain);

    // 7. 等待所有任务完成
    let _ = tokio::join!(producer, shutdown);
    eprintln!("所有任务已结束,主进程退出");
    ExitCode::SUCCESS
}

#[derive(Clone)]
struct SenderGuard(mpsc::Sender<String>);

impl Drop for SenderGuard {
    fn drop(&mut self) {
        eprintln!("SenderGuard 被 drop,剩余 Sender 数减一");
    }
}

运行后在终端按 Ctrl-C,输出顺序为:
收到 SIGTERM/SIGINT,开始优雅退出
…继续把缓冲区消息消费完…
通道已空,共处理 N 条消息
SenderGuard 被 drop,剩余 Sender 数减一
所有任务已结束,主进程退出
进程返回码 0,符合国内运维审计标准

拓展思考

  1. 多实例 Sender 的引用计数:如果业务层把 Sender 存在 Arc<Mutex<Vec<Sender<T>>>> 里,必须在收到取消信号后遍历 Vec 逐一 drop,否则接收端永远无法收到 None。
  2. 零停机发布:在 Kubernetes 场景,preStop 钩子先调用 /graceful-shutdown 接口,接口里做 token.cancel(),再 sleep 5 s,给 istio-sidecar 足够时间把剩余请求转发完,这段逻辑用 Rust 写就是 warp::Filter 里嵌套一个 CancellationToken。
  3. 通道背压与丢弃策略:国内高并发支付系统允许在关闭阶段丢弃部分可重试消息,此时可以把通道换成 tokio::sync::mpsc::bounded(0),配合 try_sendtokio::select!,在取消信号到达后主动丢弃,保证 10 s 内一定退出,避免被 K8s 暴力 SIGKILL 导致订单状态不一致