如何关闭发送端并优雅退出?
解读
在国内 Rust 后端/中间件面试中,面试官问“如何关闭发送端并优雅退出”时,真正想考察的是你对异步通道(tokio::sync::mpsc)、资源生命周期、任务取消(Cancellation Token)以及进程退出码的综合掌控能力。
他希望你回答的不只是“drop(sender)”,而是:
- 主动关闭发送端,让接收端能优雅地处理剩余消息;
- 通知所有异步任务有序收尾,避免 tokio 运行时强制杀掉任务导致数据丢失;
- 最终主函数返回 ExitCode::SUCCESS,符合国内运维对“优雅退出”的审计要求。
知识点
- tokio::sync::mpsc::Sender<T>:实现了 Clone,当所有 Sender 被 drop 后,Receiver::recv() 返回 None,这是 Rust 社区公认的“半关”信号。
- tokio::select! 的 biased 模式 与 CancellationToken:在关闭阶段优先处理取消信号,防止忙等。
- Drop 顺序与 RAII:把 Sender 包在自定义 struct 里实现 Drop,打印审计日志,国内金融级代码评审常看这一点。
- std::process::ExitCode:Rust 1.61+ 提供的标准退出码,比 std::process::exit 不会提前刷新问题,更符合银行/券商合规要求。
- tokio::signal::ctrl_c():在 Linux 下注册 SIGINT/SIGTERM,国内容器平台(阿里云 ACK、腾讯云 TKE)默认发 SIGTERM,10 s 后强制 SIGKILL,必须在这段时间内完成优雅退出。
答案
use std::process::ExitCode;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
#[tokio::main]
async fn main() -> ExitCode {
let (tx, mut rx) = mpsc::channel::<String>(1024);
let guard = SenderGuard(tx); // 1. RAII 管理 Sender
let token = CancellationToken::new();
// 2. 业务任务
let producer = tokio::spawn({
let guard = guard.clone();
let token = token.clone();
async move {
for i in 0.. {
if token.is_cancelled() { break; }
guard.0.send(format!("msg-{i}")).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
// 3. 主动 drop 本任务持有的 Sender,通知接收端
drop(guard);
}
});
// 4. 优雅关机任务
let shutdown = tokio::spawn({
let token = token.clone();
async move {
tokio::signal::ctrl_c().await.expect("failed to install ctrl_c handler");
eprintln!("收到 SIGTERM/SIGINT,开始优雅退出");
token.cancel(); // 5. 广播取消信号
}
});
// 6. 接收端收尾
let mut drain = 0usize;
while let Some(msg) = rx.recv().await {
eprintln!("处理: {}", msg);
drain += 1;
}
eprintln!("通道已空,共处理 {} 条消息", drain);
// 7. 等待所有任务完成
let _ = tokio::join!(producer, shutdown);
eprintln!("所有任务已结束,主进程退出");
ExitCode::SUCCESS
}
#[derive(Clone)]
struct SenderGuard(mpsc::Sender<String>);
impl Drop for SenderGuard {
fn drop(&mut self) {
eprintln!("SenderGuard 被 drop,剩余 Sender 数减一");
}
}
运行后在终端按 Ctrl-C,输出顺序为:
收到 SIGTERM/SIGINT,开始优雅退出
…继续把缓冲区消息消费完…
通道已空,共处理 N 条消息
SenderGuard 被 drop,剩余 Sender 数减一
所有任务已结束,主进程退出
进程返回码 0,符合国内运维审计标准。
拓展思考
- 多实例 Sender 的引用计数:如果业务层把 Sender 存在 Arc<Mutex<Vec<Sender<T>>>> 里,必须在收到取消信号后遍历 Vec 逐一 drop,否则接收端永远无法收到 None。
- 零停机发布:在 Kubernetes 场景,preStop 钩子先调用 /graceful-shutdown 接口,接口里做 token.cancel(),再 sleep 5 s,给 istio-sidecar 足够时间把剩余请求转发完,这段逻辑用 Rust 写就是 warp::Filter 里嵌套一个 CancellationToken。
- 通道背压与丢弃策略:国内高并发支付系统允许在关闭阶段丢弃部分可重试消息,此时可以把通道换成 tokio::sync::mpsc::bounded(0),配合 try_send 和 tokio::select!,在取消信号到达后主动丢弃,保证 10 s 内一定退出,避免被 K8s 暴力 SIGKILL 导致订单状态不一致。