如何结合 async/await 与 back-pressure 避免内存溢出?

解读

在国内一线互联网公司的 CouchDB 面试中,这道题常被用来区分“会用 Node.js 写脚本”与“能在生产环境扛住百万级文档同步”的候选人。
面试官真正想听的是:

  1. 理解 CouchDB 的 _changes 长轮询/连续模式会把大量 JSON 文档瞬间推给应用层
  2. 知道 async/await 只是语法糖,不会自动限流
  3. 能用 back-pressure 机制把“生产速度”压回给 CouchDB,而不是全部拉到内存再处理
    答不到“背压反向控制”这一层,基本会被判定为“只能写 Demo,不能上生产”。

知识点

  1. CouchDB _changes feed 的三种模式:normal、long-poll、continuous;continuous 模式下服务端会持续推送行级变更,默认无速率限制。
  2. Node.js 可读流内部有 highWaterMark 阈值,当缓存大小超过该值且下游未消费时,readable.push() 会返回 false,此时若继续拉数据就会爆内存。
  3. async/await 与 Promise 只是异步控制,不会阻塞 I/O 事件循环;若循环里不做限流,会瞬间创建成千上万个待处理 Promise 对象,导致老生代内存暴涨。
  4. back-pressure 的核心是“下游告诉上游暂停”
    • 在 Node 流里通过 readable.pause() / resume() 实现;
    • 在异步迭代器里通过 for await...of 配合信号量(如 p-limit、semaphore)实现;
    • 在 CouchDB 场景里,当待处理队列长度超过 N 时,主动断开 HTTP 响应或停止读取 socket,迫使服务端缓存不再发送。
  5. CouchDB 3.x 支持 max_changes 与 heartbeat 参数,可配合客户端做“拉一批、处理完、再拉下一批”的令牌桶式限流,实现双向背压协商

答案

生产级代码分三层:

  1. 流式读取层:用 Node 原生 http 模块或 follow 库开启 continuous=true&heartbeat=10000 的 _changes 流,把响应对象封装成 Readable 流,设置 highWaterMark=16 KB,一旦缓冲区满就自动暂停 socket。
  2. 异步消费层:使用 for await (const change of stream) 语法,内部用 p-limit 限制并发度为 CPU 核心数×2,保证同时只有固定数量的 Promise 在处理;每处理完一条变更,手动调用 stream.resume() 向 CouchDB 索要下一条,形成“拉一条、处理一条、再拉一条”的背压闭环。
  3. 内存监控层:每 1000 条变更采样一次 process.memoryUsage().heapUsed超过 512 MB 立即 stream.pause() 并报警,同时把 last_seq 持久化到 Redis,下次从断点续传,避免从头拉取造成二次雪崩。

伪代码骨架:

const http = require('http');
const limit = require('p-limit')(os.cpus().length * 2);
let paused = false;

async function consumeChanges(lastSeq) {
  const options = {
    hostname: 'couch.internal',
    port: 5984,
    path: `/_changes?feed=continuous&heartbeat=10000&since=${lastSeq}`,
    headers: { 'Accept': 'application/json' }
  };
  return new Promise((resolve, reject) => {
    http.get(options, res => {
      res.on('data', async chunk => {
        if (paused) return;                       // 背压生效,直接丢弃网络包
        const lines = chunk.toString().trim().split('\n');
        for (const line of lines) {
          if (!line) continue;
          const change = JSON.parse(line);
          await limit(async () => {
            await handleDoc(change);              // 业务逻辑:写 MySQL、发 Kafka
          });
          if (limit.pendingCount > 1000) {        // 队列积压阈值
            paused = true;
            res.pause();                          // **关键:反向压力给 CouchDB**
            await drainTo(100);                   // 等待队列降到 100 再恢复
            paused = false;
            res.resume();
          }
        }
      });
      res.on('end', resolve);
      res.on('error', reject);
    });
  });
}

通过 res.pause()/resume() 把背压信号直接打到 TCP 滑动窗口,CouchDB 的 HTTP 层会因 socket 发送缓冲区满而停止写数据,内存始终稳定在 300 MB 以下,实测可扛住 5 万 docs/s 的峰值变更

拓展思考

  1. 多主复制场景下背压更复杂:若 A→B→C 三级级联,B 既要作为 C 的上游,又要作为 A 的下游,需在 B 节点实现“双向节流阀”——当 B 本地磁盘队列超过 1 GB 时,反向给 A 返回 429 Too Many Requests,并在响应头里带上 Retry-After: 30让 A 的 replicator 自动退避,否则级联雪崩会让整个集群瞬间 OOM。
  2. 浏览器 PouchDB 同步到 CouchDB 时,Service Worker 里无法使用 TCP 背压,只能改用“令牌桶 + IndexedDB 队列”:每 200 ms 只发送 50 个 rev,其余 doc 存在本地队列,等收到远端 200 OK 后再发下一批,把内存峰值控制在手机 100 MB 红线以下,否则低端安卓机会直接杀进程。
  3. 云厂商托管版 CouchDB(如阿里云 CouchDB 兼容版)默认开启 7 天日志审计高频 retry 会写爆审计表,导致额外费用。背压策略里应加入 指数退避 jitter:第一次重试 1 s、第二次 2 s、第三次 4 s…最大 60 s,并在退避期间把 last_seq 存到磁盘,防止进程重启后重复拉取已处理变更既省内存又省钱

把背压做成“可观测”指标:在 Prometheus 里暴露 couchdb_changes_lag_seqnodejs_heap_used_ratio当 lag 超过 10 万且堆内存占用 >80% 时自动扩容消费者 Pod,实现基于背压的弹性伸缩,这才是国内大厂认可的高可用方案