How do you implement a circuit breaker?

Interpretation

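In backend interviews at Chinese internet companies, the circuit breaker comes up almost every time, because it tests the concurrency model, error handling, state-machine design, and Rust ownership all at once. The interviewer expects a complete design within about 10 minutes: thread-safe, free of memory leaks, able to degrade gracefully, and observable. You should also be able to explain why it sustains high concurrency without locks. Do not stop at concepts or pseudocode; the answer should land as a Rust implementation that builds with Cargo and passes Clippy without warnings.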

Key points

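  1. State machine: exact Closed → Open → HalfOpen transitions, with no window for concurrent races.
  2. Lock-free concurrency: pack "failure count + timestamp" into a single AtomicU64 so that no Mutex blocks the hot path.
  3. Ownership and lifetimes: a CircuitBreaker can hold a Box<dyn Fn() -> Result<T, E> + Send + Sync>, so the closure is moved in rather than copied. The answer below instead takes a generic FnOnce per call, which also avoids the heap allocation.
  4. Async adaptation: an async closure has its concrete future type erased through BoxFuture, which keeps it compatible with the tokio scheduler.
  5. Observability: AtomicU64 counters are exposed to Prometheus and can be scraped at second-level intervals without taking a lock.
  6. Error type: a custom BreakerError implements std::error::Error and distinguishes three cases (Open, HalfOpenTimeout, and an inner call failure), which helps callers choose a retry policy.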
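
Point 5 can be sketched with the standard library alone. The following is a minimal sketch, not part of the answer below: the names `BreakerMetrics`, `MetricsSnapshot`, `on_call`, `on_failure`, and `on_rejected` are hypothetical, and wiring the snapshot into a Prometheus exporter is left out.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical counter set for a breaker; the field names are illustrative.
#[derive(Default)]
pub struct BreakerMetrics {
    calls: AtomicU64,
    failures: AtomicU64,
    rejected: AtomicU64,
}

/// Plain-value copy of the counters, suitable for handing to an exporter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MetricsSnapshot {
    pub calls: u64,
    pub failures: u64,
    pub rejected: u64,
}

impl BreakerMetrics {
    pub fn new() -> Self {
        Self::default()
    }

    /// Count a call that was allowed through to the protected function.
    pub fn on_call(&self) {
        self.calls.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a call whose protected function returned an error.
    pub fn on_failure(&self) {
        self.failures.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a call that the breaker rejected without running it.
    pub fn on_rejected(&self) {
        self.rejected.fetch_add(1, Ordering::Relaxed);
    }

    /// Each counter is read atomically, but the three loads together are
    /// not a single atomic snapshot; that is usually acceptable for metrics.
    pub fn snapshot(&self) -> MetricsSnapshot {
        MetricsSnapshot {
            calls: self.calls.load(Ordering::Relaxed),
            failures: self.failures.load(Ordering::Relaxed),
            rejected: self.rejected.load(Ordering::Relaxed),
        }
    }
}
```

Relaxed ordering is enough here because the counters do not guard any other data; a scraper only needs each individual value to be untorn.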

Answer

use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{error, fmt};

// State constants
const CLOSED: u8   = 0;
const OPEN: u8     = 1;
const HALF_OPEN: u8 = 2;

#[derive(Debug, Clone)]
pub enum BreakerError {
    Open,
    HalfOpenTimeout,
    Inner(String),
}

impl fmt::Display for BreakerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BreakerError::Open => write!(f, "circuit breaker is open"),
            BreakerError::HalfOpenTimeout => write!(f, "half open probe timeout"),
            BreakerError::Inner(e) => write!(f, "inner call failed: {}", e),
        }
    }
}
impl error::Error for BreakerError {}

pub struct CircuitBreaker {
    state: AtomicU8,
    // High 32 bits: failure count; low 32 bits: timestamp in seconds
    fail_count_and_time: AtomicU64,
    threshold: u32,
    timeout: Duration,
}

impl CircuitBreaker {
    pub fn new(threshold: u32, timeout: Duration) -> Self {
        Self {
            state: AtomicU8::new(CLOSED),
            fail_count_and_time: AtomicU64::new(0),
            threshold,
            timeout,
        }
    }

    #[inline]
    fn now_second() -> u32 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs() as u32
    }

    /// Thread-safe state transition; returns true if this caller performed it
    fn transition(&self, from: u8, to: u8) -> bool {
        self.state
            .compare_exchange(from, to, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Records one failure; returns true when the breaker should open.
    /// Failures are counted per one-second bucket: a failure in a new second restarts the count at 1.
    fn record_fail(&self) -> bool {
        let now = Self::now_second();
        let mut current = self.fail_count_and_time.load(Ordering::Relaxed);
        loop {
            let (count, old_time) = ((current >> 32) as u32, current as u32);
            let new_count = if old_time == now { count.saturating_add(1) } else { 1 };
            let new_packed = ((new_count as u64) << 32) | (now as u64);
            match self.fail_count_and_time.compare_exchange_weak(
                current,
                new_packed,
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return new_count >= self.threshold,
                Err(actual) => current = actual,
            }
        }
    }

    /// Synchronous call wrapper
    pub fn call<F, T, E>(&self, f: F) -> Result<T, BreakerError>
    where
        F: FnOnce() -> Result<T, E>,
        E: Into<BreakerError>,
    {
        match self.state.load(Ordering::Acquire) {
            OPEN => {
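                // Low 32 bits hold the second of the last recorded failure
                let last_time = self.fail_count_and_time.load(Ordering::Relaxed) as u32;
                // Timeouts are compared in whole seconds; sub-second values truncate to 0
                if Self::now_second().wrapping_sub(last_time) > self.timeout.as_secs() as u32 {
                    // After the timeout, try to enter HalfOpen; the CAS admits only one thread
                    if self.transition(OPEN, HALF_OPEN) {
                        // The winning thread runs the probe immediately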
                        match f() {
                            Ok(v) => {
                                // Probe succeeded: close the breaker
                                self.state.store(CLOSED, Ordering::Release);
                                self.fail_count_and_time.store(0, Ordering::Release);
                                return Ok(v);
                            }
                            Err(e) => {
                                // Probe failed: reopen the breaker
                                self.state.store(OPEN, Ordering::Release);
                                self.fail_count_and_time
                                    .store((1u64 << 32) | Self::now_second() as u64, Ordering::Release);
                                return Err(e.into());
                            }
                        }
                    }
                }
                return Err(BreakerError::Open);
            }
            HALF_OPEN => {
                // Under concurrency only one thread may probe in HalfOpen; all others are rejected
                return Err(BreakerError::HalfOpenTimeout);
            }
            _ => {} // CLOSED
        }

        // In the CLOSED state, run the call directly
        match f() {
            Ok(v) => Ok(v),
            Err(e) => {
                if self.record_fail() {
                    self.transition(CLOSED, OPEN);
                }
                Err(e.into())
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn it_opens_after_threshold() {
        let cb = Arc::new(CircuitBreaker::new(3, Duration::from_secs(1)));
        let ok = || Ok::<_, BreakerError>(());
        let err = || Err::<(), BreakerError>(BreakerError::Inner("boom".into()));

        assert!(cb.call(ok).is_ok());
        assert!(cb.call(err).is_err());
        assert!(cb.call(err).is_err());
        assert!(cb.call(err).is_err()); // third failure reaches the threshold
        assert!(matches!(cb.call(ok), Err(BreakerError::Open)));
    }

    #[test]
    fn it_recovers_after_timeout() {
        // Timestamps have one-second granularity, so use a whole-second timeout
        let cb = Arc::new(CircuitBreaker::new(1, Duration::from_secs(1)));
        let _ = cb.call(|| Err::<(), BreakerError>(BreakerError::Inner("".into())));
        assert!(matches!(cb.call(|| Ok::<_, BreakerError>(())), Err(BreakerError::Open)));
        // Sleep past two second boundaries so that elapsed seconds > timeout
        thread::sleep(Duration::from_millis(2100));
        assert!(cb.call(|| Ok::<_, BreakerError>(())).is_ok());
    }
}

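The code compiles against the standard library alone. The two tests cover opening at the threshold and recovery after the timeout; the half-open failure path and concurrent callers are not exercised, so add tests for those before putting it in a portfolio project.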
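
The `E: Into<BreakerError>` bound on `call` means a caller's own error type needs a conversion before it can pass through the breaker. The sketch below shows one way this might look for `std::io::Error`. It repeats `BreakerError` so that it stands alone; `call_through` is a hypothetical stand-in with the same bounds as `call` but no state machine, and the policy in `should_retry` is an assumption for illustration, not a recommendation.

```rust
use std::{error, fmt, io};

#[derive(Debug, Clone)]
pub enum BreakerError {
    Open,
    HalfOpenTimeout,
    Inner(String),
}

impl fmt::Display for BreakerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BreakerError::Open => write!(f, "circuit breaker is open"),
            BreakerError::HalfOpenTimeout => write!(f, "half open probe timeout"),
            BreakerError::Inner(e) => write!(f, "inner call failed: {}", e),
        }
    }
}

impl error::Error for BreakerError {}

// Assumption: flattening the I/O error to its message is acceptable here.
impl From<io::Error> for BreakerError {
    fn from(e: io::Error) -> Self {
        BreakerError::Inner(e.to_string())
    }
}

/// Hypothetical stand-in with the same generic bounds as `call`;
/// it only performs the error conversion.
pub fn call_through<F, T, E>(f: F) -> Result<T, BreakerError>
where
    F: FnOnce() -> Result<T, E>,
    E: Into<BreakerError>,
{
    f().map_err(Into::into)
}

/// Illustrative policy: retrying is pointless while the breaker rejects
/// calls, but an inner failure may be transient.
pub fn should_retry(err: &BreakerError) -> bool {
    match err {
        BreakerError::Open | BreakerError::HalfOpenTimeout => false,
        BreakerError::Inner(_) => true,
    }
}
```

Storing the inner error as a `String` keeps `BreakerError` cloneable but discards the original error value; a generic `Inner(E)` variant would preserve it at the cost of a type parameter on the enum.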

Further thoughts

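  1. Async: change call into async fn call_async<F, Fut, T, E>(&self, f: F) -> Result<T, BreakerError>, where F: FnOnce() -> Fut and Fut: Future<Output = Result<T, E>>, and use f().await inside. The state-machine logic stays the same and needs no additional lock.
  2. Sliding window: record the timestamp of each failure in a crossbeam::queue::SegQueue<u64> and open only when the failures inside the window exceed the threshold. With millisecond timestamps this suits payment and other financial call paths.
  3. Adaptive breaking: lower the threshold dynamically based on P99 latency, collected in real time with the metrics crate, to get reactive rate limiting.
  4. WebAssembly: compile the breaker to wasm32-unknown-unknown and apply it at the front-end Fetch API layer, so the same Rust code is reused across platforms; this earns extra credit in interviews. Note that std::time::SystemTime::now is not supported on that target, so the clock would have to be supplied by the host.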
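
The sliding-window idea in point 2 can be sketched as follows. This is a single-threaded illustration that swaps the lock-free queue for a plain `VecDeque`, so it shows only the eviction and counting logic. `SlidingWindow` and its methods are hypothetical names, and the caller passes the current time in milliseconds instead of the code reading a clock.

```rust
use std::collections::VecDeque;

/// Hypothetical sliding-window failure counter (not thread-safe).
pub struct SlidingWindow {
    window_ms: u64,
    threshold: usize,
    /// Failure timestamps in milliseconds, oldest first.
    failures: VecDeque<u64>,
}

impl SlidingWindow {
    pub fn new(window_ms: u64, threshold: usize) -> Self {
        Self {
            window_ms,
            threshold,
            failures: VecDeque::new(),
        }
    }

    /// Drop timestamps that are `window_ms` or more older than `now_ms`.
    fn evict(&mut self, now_ms: u64) {
        while let Some(&oldest) = self.failures.front() {
            if now_ms.saturating_sub(oldest) >= self.window_ms {
                self.failures.pop_front();
            } else {
                break;
            }
        }
    }

    /// Record a failure at `now_ms`; returns true once the number of
    /// failures inside the window reaches the threshold.
    pub fn record_failure(&mut self, now_ms: u64) -> bool {
        self.evict(now_ms);
        self.failures.push_back(now_ms);
        self.failures.len() >= self.threshold
    }

    /// Number of failures still inside the window ending at `now_ms`.
    pub fn failures_in_window(&mut self, now_ms: u64) -> usize {
        self.evict(now_ms);
        self.failures.len()
    }
}
```

Passing the time in as a parameter keeps the logic deterministic in tests and independent of any platform clock. A concurrent version would need either a lock around the deque or a lock-free queue plus an atomic length counter.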