select! 宏如何同时监听多个接收端?

解读

在国内 Rust 后端/中间件面试中,select! 是检验候选人是否真正写过“高并发+多事件源”代码的试金石。
面试官想确认三点:

  1. 是否知道标准库没有 select!,必须依赖 tokio::select!(或 async-std 等价物);
  2. 是否理解分支抢占、取消安全(cancel-safe)与资源泄漏的边界;
  3. 能否在真实场景里正确搭配 channel、socket、信号、定时器等多路事件,而不是只会写玩具代码。
    答不出“取消安全”或“biased 模式”直接降档,答出“FusedFuture + futures::select!”可加分。

知识点

  1. tokio::select! 展开后生成一个 poll 状态机,对每个分支调用 Future::poll,最先就绪的分支获胜,其余被异步取消(drop)。
  2. 取消安全:分支被中途 drop 不能留下不一致状态;mpsc::Receiver::recv() 是取消安全的,MutexGuard 异步锁不是。
  3. biased 模式:默认公平轮询,开启 biased; 后按声明顺序检查,可预测优先级但可能饿死后续分支。
  4. 重复监听:同一 Future 不能二次移入 select!,必须 pin! 或 Box::pin 后循环使用,或改用 futures::stream::SelectAll
  5. 返回值:获胜分支的表达式值作为整个 select! 的结果,类型必须统一;可用 Ok(v) / Err(e) 包装区分来源。
  6. else 分支:当所有分支都 pending 且外部无新事件时执行,不能异步 .await,仅用于快速补偿。
  7. 并发安全:select! 本身不解决数据竞争,内部若共享状态仍需 Arc<Mutex<_>>tokio::sync::RwLock

答案

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel::<i32>(16);
    let (tx2, mut rx2) = mpsc::channel::<i32>(16);

    // 模拟两个生产端
    tokio::spawn(async move {
        for i in 0..5 {
            tx1.send(i).await.unwrap();
            sleep(Duration::from_millis(100)).await;
        }
    });
    tokio::spawn(async move {
        for i in 10..15 {
            tx2.send(i).await.unwrap();
            sleep(Duration::from_millis(150)).await;
        }
    });

    // 同时监听两个接收端,取消安全
    loop {
        tokio::select! {
            Some(v) = rx1.recv() => {
                println!("rx1 => {}", v);
            }
            Some(v) = rx2.recv() => {
                println!("rx2 => {}", v);
            }
            else => {
                println!("both channels closed");
                break;
            }
        }
    }
}

关键点

  • Some(v) = rx1.recv()pattern + Future 组合,recv() 返回 RecvFuture,匹配 Some 才进入分支。
  • 两个分支类型一致(都是 println!),因此 select! 返回 ()。
  • 若把 rx1.recv() 换成 rx1.recv().await.unwrap() 再匹配,会失去取消安全,一旦取消可能丢消息。
  • 当任意 channel 关闭,对应分支持续返回 None,最终进入 else 退出循环,无资源泄漏

拓展思考

  1. 优先级控制:在实时行情系统里,行情 channel 优先级必须高于心跳 channel,可在 select! 前加 biased; 并把行情分支放第一,但需定期让出防止饿死。
  2. 动态扩容:channel 数量运行时变化(如撮合引擎新增交易对),select! 无法动态增删分支,此时应使用 tokio::sync::mpsc::UnboundedReceiver<Box<dyn Event>>汇聚层,或 futures::stream::SelectAll 合并多个 Stream。
  3. 取消安全审计:在分支里异步持有 MutexGuard 会导致锁丢失,正确做法是先异步拿数据,再同步加锁更新状态
  4. 性能极限:百万级连接网关里,每个连接一个 select! 不现实,应改用 tokio::net::TcpStream + poll_read_readyReactor 统一事件源,select! 仅用于控制面(信号、配置、 Admin 命令)。