How do you implement a circuit breaker?
Analysis
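In backend interviews at Chinese internet companies, the circuit breaker comes up almost every time, because it tests concurrency models, error handling, state-machine design, and Rust ownership all at once. Interviewers expect you to produce, within about 10 minutes, a complete design that is thread-safe, leak-free, supports graceful degradation, and is observable, and to explain why high concurrency is achievable without locks. Don't stop at concepts or pseudocode: land it as a Rust implementation that builds with Cargo and passes Clippy with zero warnings.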
Key points
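- State machine: precise Closed → Open → HalfOpen transitions, each performed with a compare-and-swap so that two concurrent callers can never both carry out the same transition.
- Lock-free concurrency: pack "failure count + timestamp" into a single `AtomicU64` so the hot path never blocks on a `Mutex`.
- Ownership and lifetimes: `call` takes a generic `F: FnOnce() -> Result<T, E>` by value, so the closure is monomorphized and never boxed or copied. Storing a `Box<dyn Fn() -> Result<T, E> + Send + Sync>` inside `CircuitBreaker` also works, at the cost of a heap allocation and dynamic dispatch.
- Async adaptation: boxing an async closure's future as a `BoxFuture` (from the `futures` crate) erases its concrete type so it can be stored and scheduled on tokio.
- Observability: `AtomicU64` counters can be exposed to Prometheus and scraped every second without taking a lock.
- Error type: a custom `BreakerError` implementing `std::error::Error` distinguishes three cases, `Open`, `HalfOpenTimeout`, and `Inner` (the wrapped call itself failed), so upper layers can choose a retry strategy.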
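The packing idea from the key points (failure count and timestamp sharing one `AtomicU64`) can be sketched on its own. This is a minimal illustration, assuming a 32/32 split with a millisecond timestamp supplied by the caller and a gap-based window, where a failure extends the streak if it lands within `window_ms` of the previous one (the answer below counts failures per one-second bucket instead). `FailureCell`, `pack`, and `unpack` are hypothetical names.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical helper: failure count in the high 32 bits, timestamp (ms) in the low 32 bits.
fn pack(count: u32, time_ms: u32) -> u64 {
    (u64::from(count) << 32) | u64::from(time_ms)
}

/// Inverse of `pack`: returns (count, time_ms).
fn unpack(v: u64) -> (u32, u32) {
    ((v >> 32) as u32, v as u32)
}

/// Hypothetical lock-free failure counter: both fields live in one AtomicU64.
#[derive(Default)]
struct FailureCell(AtomicU64);

impl FailureCell {
    /// Records a failure at `now_ms` and returns the updated count.
    /// A failure within `window_ms` of the previous one extends the streak;
    /// otherwise the count restarts at 1.
    fn record(&self, now_ms: u32, window_ms: u32) -> u32 {
        let mut current = self.0.load(Ordering::Relaxed);
        loop {
            let (count, last_ms) = unpack(current);
            // wrapping_sub keeps the gap correct across u32 wrap-around
            let next = if now_ms.wrapping_sub(last_ms) < window_ms {
                count.saturating_add(1)
            } else {
                1
            };
            // One CAS replaces count and timestamp together; on contention, retry with the fresh value
            match self.0.compare_exchange_weak(
                current,
                pack(next, now_ms),
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => return next,
                Err(actual) => current = actual,
            }
        }
    }
}
```

Because both halves are replaced by one compare-and-swap, a reader can never pair the count from one update with the timestamp from another, which is the usual reason to pack them rather than keep two separate atomics.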
Answer
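```rust
use std::error;
use std::fmt;
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
use std::time::{Duration, Instant};

// State constants
const CLOSED: u8 = 0;
const OPEN: u8 = 1;
const HALF_OPEN: u8 = 2;

/// Failures are counted per fixed 1 s bucket, measured from the breaker's creation.
const FAIL_WINDOW_MS: u32 = 1_000;

#[derive(Debug, Clone)]
pub enum BreakerError {
    Open,
    HalfOpenTimeout,
    Inner(String),
}

impl fmt::Display for BreakerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BreakerError::Open => write!(f, "circuit breaker is open"),
            BreakerError::HalfOpenTimeout => write!(f, "half open probe timeout"),
            BreakerError::Inner(e) => write!(f, "inner call failed: {e}"),
        }
    }
}

impl error::Error for BreakerError {}

/// Packs (failure count, timestamp in ms) into one u64: count high, time low.
#[inline]
fn pack(count: u32, time_ms: u32) -> u64 {
    (u64::from(count) << 32) | u64::from(time_ms)
}

/// Inverse of `pack`: returns (count, time_ms).
#[inline]
fn unpack(v: u64) -> (u32, u32) {
    ((v >> 32) as u32, v as u32)
}

pub struct CircuitBreaker {
    state: AtomicU8,
    // High 32 bits: failure count; low 32 bits: ms since `epoch` of the latest failure
    fail_count_and_time: AtomicU64,
    threshold: u32,
    timeout_ms: u32,
    // Monotonic clock origin: unlike SystemTime it never jumps backwards, and
    // millisecond resolution lets sub-second timeouts work
    epoch: Instant,
}

impl CircuitBreaker {
    pub fn new(threshold: u32, timeout: Duration) -> Self {
        Self {
            state: AtomicU8::new(CLOSED),
            fail_count_and_time: AtomicU64::new(0),
            threshold,
            // Saturate instead of truncating timeouts longer than u32::MAX ms (~49.7 days)
            timeout_ms: u32::try_from(timeout.as_millis()).unwrap_or(u32::MAX),
            epoch: Instant::now(),
        }
    }

    /// Milliseconds since `epoch`, truncated to u32 (wraps after ~49.7 days;
    /// elapsed times are computed with `wrapping_sub`)
    #[inline]
    fn now_ms(&self) -> u32 {
        self.epoch.elapsed().as_millis() as u32
    }

    /// Thread-safe state transition via CAS; returns true only for the caller that performed it
    fn transition(&self, from: u8, to: u8) -> bool {
        self.state
            .compare_exchange(from, to, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Records one failure; returns true if the threshold has been reached
    fn record_fail(&self) -> bool {
        let now = self.now_ms();
        let mut current = self.fail_count_and_time.load(Ordering::Relaxed);
        loop {
            let (count, old_time) = unpack(current);
            // Same 1 s bucket as the previous failure: accumulate; otherwise restart at 1
            let new_count = if old_time / FAIL_WINDOW_MS == now / FAIL_WINDOW_MS {
                count.saturating_add(1)
            } else {
                1
            };
            match self.fail_count_and_time.compare_exchange_weak(
                current,
                pack(new_count, now),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return new_count >= self.threshold,
                Err(actual) => current = actual,
            }
        }
    }

    /// Synchronous call wrapper
    pub fn call<F, T, E>(&self, f: F) -> Result<T, BreakerError>
    where
        F: FnOnce() -> Result<T, E>,
        E: Into<BreakerError>,
    {
        match self.state.load(Ordering::Acquire) {
            OPEN => {
                let (_, last_fail) = unpack(self.fail_count_and_time.load(Ordering::Acquire));
                let elapsed = self.now_ms().wrapping_sub(last_fail);
                // After the cool-down, only the thread that wins the OPEN -> HALF_OPEN CAS probes
                if elapsed >= self.timeout_ms && self.transition(OPEN, HALF_OPEN) {
                    self.probe(f)
                } else {
                    Err(BreakerError::Open)
                }
            }
            // While the single probe is in flight, every other caller is rejected
            HALF_OPEN => Err(BreakerError::HalfOpenTimeout),
            // CLOSED: run the call and count failures
            _ => f().map_err(|e| {
                if self.record_fail() {
                    self.transition(CLOSED, OPEN);
                }
                e.into()
            }),
        }
    }

    /// Runs the single HALF_OPEN probe. If `f` panics, the breaker stays
    /// HALF_OPEN; production code would need a drop guard to reopen it.
    fn probe<F, T, E>(&self, f: F) -> Result<T, BreakerError>
    where
        F: FnOnce() -> Result<T, E>,
        E: Into<BreakerError>,
    {
        match f() {
            Ok(v) => {
                // Reset the counter before reopening traffic so new failures start from zero
                self.fail_count_and_time.store(0, Ordering::Release);
                self.state.store(CLOSED, Ordering::Release);
                Ok(v)
            }
            Err(e) => {
                // Probe failed: reopen and restart the cool-down from now
                self.fail_count_and_time
                    .store(pack(1, self.now_ms()), Ordering::Release);
                self.state.store(OPEN, Ordering::Release);
                Err(e.into())
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn it_opens_after_threshold() {
        let cb = CircuitBreaker::new(3, Duration::from_secs(1));
        let ok = || Ok::<_, BreakerError>(());
        let err = || Err::<(), BreakerError>(BreakerError::Inner("boom".into()));
        assert!(cb.call(ok).is_ok());
        assert!(cb.call(err).is_err());
        assert!(cb.call(err).is_err());
        assert!(cb.call(err).is_err()); // 3rd failure trips the breaker
        assert!(matches!(cb.call(ok), Err(BreakerError::Open)));
    }

    #[test]
    fn it_recovers_after_timeout() {
        let cb = CircuitBreaker::new(1, Duration::from_millis(100));
        let _ = cb.call(|| Err::<(), BreakerError>(BreakerError::Inner(String::new())));
        assert!(matches!(cb.call(|| Ok::<_, BreakerError>(())), Err(BreakerError::Open)));
        thread::sleep(Duration::from_millis(110));
        assert!(cb.call(|| Ok::<_, BreakerError>(())).is_ok());
    }

    #[test]
    fn half_open_rejects_others_and_failed_probe_reopens() {
        let cb = CircuitBreaker::new(1, Duration::from_millis(100));
        let _ = cb.call(|| Err::<(), BreakerError>(BreakerError::Inner("boom".into())));
        thread::sleep(Duration::from_millis(110));
        let probe = cb.call(|| {
            // A call made while the probe is in flight sees HALF_OPEN
            assert!(matches!(
                cb.call(|| Ok::<_, BreakerError>(())),
                Err(BreakerError::HalfOpenTimeout)
            ));
            Err::<(), BreakerError>(BreakerError::Inner("still down".into()))
        });
        assert!(matches!(probe, Err(BreakerError::Inner(_))));
        // The failed probe restarted the cool-down
        assert!(matches!(cb.call(|| Ok::<_, BreakerError>(())), Err(BreakerError::Open)));
    }
}
```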
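Before putting this in a résumé project, build it and run `cargo clippy` and `cargo test` on your own toolchain. The unit tests run single-threaded: they check the state-machine transitions but do not demonstrate behavior under real multi-threaded contention.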
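The key points mention exposing `AtomicU64` counters to Prometheus, which the answer's code does not include. A minimal sketch, assuming you increment the counters from your own wrapper around `call` and serve the rendered string from an HTTP endpoint of your choice; `BreakerMetrics` and the metric names are hypothetical, and the output is written by hand in the Prometheus text exposition format rather than through a client library.

```rust
use std::fmt::Write;
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical per-breaker counters; each update is one atomic add, no lock.
#[derive(Default)]
pub struct BreakerMetrics {
    calls: AtomicU64,
    failures: AtomicU64,
    rejected: AtomicU64,
}

impl BreakerMetrics {
    // Relaxed suffices: counters are independent and only need eventual visibility
    pub fn record_call(&self) {
        self.calls.fetch_add(1, Ordering::Relaxed);
    }

    pub fn record_failure(&self) {
        self.failures.fetch_add(1, Ordering::Relaxed);
    }

    pub fn record_rejected(&self) {
        self.rejected.fetch_add(1, Ordering::Relaxed);
    }

    /// Renders all counters in Prometheus text exposition format.
    /// Assumes `breaker` contains no quotes or backslashes (no label escaping).
    pub fn render(&self, breaker: &str) -> String {
        let mut out = String::new();
        for (name, counter) in [
            ("breaker_calls_total", &self.calls),
            ("breaker_failures_total", &self.failures),
            ("breaker_rejected_total", &self.rejected),
        ] {
            let value = counter.load(Ordering::Relaxed);
            // Writing into a String cannot fail, so the fmt::Result is ignored
            let _ = writeln!(out, "# TYPE {name} counter");
            let _ = writeln!(out, "{name}{{breaker=\"{breaker}\"}} {value}");
        }
        out
    }
}
```

A scrape only performs relaxed loads, so it never contends with the hot path beyond ordinary cache-line traffic.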
Further thinking
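- Async: turn `call` into `async fn call_async<F, Fut, T, E>(&self, f: F) -> Result<T, BreakerError>` with `F: FnOnce() -> Fut` and `Fut: Future<Output = Result<T, E>>`, and use `f().await` inside. The state-machine logic stays the same and needs no extra lock. One new hazard: if the caller drops the future while the HalfOpen probe is pending (for example, on a tokio timeout), the breaker stays in HalfOpen, so the probe path needs a drop guard that reopens it.
- Sliding window: record each failure timestamp in a `crossbeam::queue::SegQueue<u32>` and open only when the number of failures inside the window exceeds the threshold. Millisecond precision suits financial payment paths.
- Adaptive circuit breaking: lower the threshold dynamically based on P99 latency, collecting data in real time with the `metrics` crate, to get responsive throttling.
- WebAssembly: compile the breaker for `wasm32-unknown-unknown` and apply it at the front-end Fetch API layer, reusing one Rust codebase across platforms, which earns extra points in interviews. The clock must be replaced there (for example with `js_sys::Date::now()`), because `std::time::Instant` and `SystemTime` are not available on that target.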
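A sliding-window failure counter, as in the second point, can be sketched with the standard library alone. To stay dependency-free this version swaps `crossbeam`'s `SegQueue` for a `Mutex<VecDeque<Instant>>`, so unlike the lock-free breaker it does take a lock on the failure path. `SlidingWindow` and `record_failure` are hypothetical names, and the caller passes `now` explicitly so the logic is deterministic to test.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical sliding-window failure tracker (lock-based, not lock-free).
pub struct SlidingWindow {
    window: Duration,
    threshold: usize,
    failures: Mutex<VecDeque<Instant>>,
}

impl SlidingWindow {
    pub fn new(window: Duration, threshold: usize) -> Self {
        Self {
            window,
            threshold,
            failures: Mutex::new(VecDeque::new()),
        }
    }

    /// Records a failure at `now`; returns true once the failures that fall
    /// inside the last `window` reach `threshold`.
    pub fn record_failure(&self, now: Instant) -> bool {
        let mut q = self.failures.lock().unwrap();
        // Evict timestamps that slid out of the window, oldest first
        while let Some(&oldest) = q.front() {
            if now.duration_since(oldest) < self.window {
                break;
            }
            q.pop_front();
        }
        q.push_back(now);
        q.len() >= self.threshold
    }
}
```

Unlike the per-second buckets in the answer, a burst that straddles a bucket boundary is still counted as one burst here; memory stays bounded by the number of failures inside one window.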