如何在应用层实现自定义冲突解决函数并写回合并后的文档?

解读

国内互联网、金融、政企项目在使用 CouchDB 时,多活数据中心边缘节点离线写入移动 App 端同步 是三大高频场景。冲突不可避免,而 CouchDB 默认的“以胜利修订版本为准”往往无法满足业务规则(例如金额不能丢、库存不能超卖、操作人不能错)。因此面试官想确认候选人是否真正在应用层做过可验证、可回滚、可灰度的冲突解决闭环,而不是简单地把 _conflicts 数组一删了之。回答时要突出“读冲突→业务合并→写回 CAS→再次校验→异步审计”五步,并给出可直接落地的代码骨架与回滚策略。

知识点

  1. 修订树(Rev-Tree)与 _rev 格式_rev数字-哈希 的不可变令牌,写回时必须携带,否则触发 409。
  2. 冲突读取GET /db/docid?conflicts=true 返回 _conflicts 数组,包含所有失败分支的最新修订。
  3. CAS(Compare-And-Swap):写回时把最新 _rev 放在 If-Match 头,防止并发覆盖。
  4. 批量获取_all_docs?include_docs=true&keys=[...] 一次拉取冲突文档,减少往返。
  5. 幂等设计:合并函数必须幂等,支持重试;建议给合并后文档打 mergeIdsourceRev[] 字段,方便审计。
  6. 灰度回滚:合并前先写入 shadow 文档_id=docid_merge_${timestamp}),验证通过后再替换正式文档;出问题可秒级回滚。
  7. 国内合规:金融类项目需留痕,合并结果要同步到审计库(MySQL 或 Elasticsearch),保存操作人、IP、合并策略版本号。

答案

下面给出基于 Node.js 的完整可运行骨架,演示“拉取冲突→业务合并→CAS 写回→异步审计”四步,可直接嵌入 Egg.js、Spring Cloud sidecar 或 Serverless 函数。重点步骤已加注释,方便在面试时白板手写。

/**
 * 自定义冲突解决主函数
 * @param {string} dbUrl  CouchDB 地址,如 http://couchdb:5984/orders
 * @param {string} docId  业务主键
 * @param {Function} mergeFn 用户传入的合并策略 (winner, losers) => newDoc
 */
async function resolveConflict(dbUrl, docId, mergeFn) {
  const nano = require('nano')(dbUrl);
  const db = nano.use(dbUrl.split('/').pop());

  // **第 1 步:一次性拉取胜利分支 + 所有冲突分支**
  const { data: main } = await db.get(docId, { conflicts: true });
  if (!main._conflicts || main._conflicts.length === 0) return; // 无冲突直接返回

  const conflictRevs = main._conflicts; // 例如 ['2-aaa', '3-bbb']
  const docs = await db.fetch({ keys: conflictRevs.map(r => ({ id: docId, rev: r })) });
  const losers = docs.rows.map(r => r.doc);

  // **第 2 步:业务侧合并**
  const merged = mergeFn(main, losers);   // 用户自定义逻辑
  merged._id = docId;                     // 保持主键不变
  merged._rev = main._rev;                // 必须携带胜利分支的 _rev,才能覆盖
  merged._mergeMeta = {
    sourceRev: [main._rev, ...conflictRevs],
    mergedAt: new Date().toISOString(),
    mergeId: require('uuid').v4()
  };

  // **第 3 步:CAS 写回,失败自动重试 3 次**
  let retries = 3;
  while (retries-- > 0) {
    try {
      const putResp = await db.insert(merged, { rev: merged._rev });
      console.log('合并成功,新修订:', putResp.rev);

      // **第 4 步:异步审计(国内合规)**
      await saveAuditLog({
        docId,
        mergeId: merged._mergeMeta.mergeId,
        operator: process.env.USER_ID || 'system',
        ip: process.env.POD_IP || require('ip').address()
      });
      return;
    } catch (e) {
      if (e.statusCode === 409 && retries > 0) {
        // 并发写导致 _rev 过期,重新拉取再合并
        const { data: fresh } = await db.get(docId, { conflicts: true });
        merged._rev = fresh._rev;
        continue;
      }
      throw e;
    }
  }
}

/** 示例合并策略:订单金额累加,库存取最小可用 */
function orderMergeStrategy(winner, losers) {
  const all = [winner, ...losers];
  const result = JSON.parse(JSON.stringify(winner)); // 深拷贝
  result.amount = all.reduce((sum, doc) => sum + (doc.amount || 0), 0);
  result.stock = Math.min(...all.map(doc => doc.stock || 0));
  return result;
}

// 调用入口
resolveConflict(
  'http://couchdb:5984/orders',
  'order_10086',
  orderMergeStrategy
).catch(console.error);

关键点回顾

  1. 必须携带胜利分支的 _rev,否则 CouchDB 返回 409。
  2. 合并函数要幂等,支持重入;给合并结果打 mergeId 方便追踪。
  3. 灰度方案:正式替换前可先写 shadow 文档,验证无误后删除 shadow 并替换主文档,实现秒级回滚。
  4. 国内审计:合并结果异步写入 MySQL 审计表,字段至少包含 merge_idsource_rev[]operatoriptimestamp,满足等保与金融合规。

拓展思考

  1. 多语言 SDK 差异:Java 使用 Ektorp 时,@Revision 注解会自动处理 _rev,但批量拉冲突仍需手动构造 /_all_docs 请求;Go 的 kivik 支持 RevsDiff 接口,可提前算出冲突,减少流量。
  2. 性能调优:当冲突分支数 >100 时,建议先在后端任务队列(RocketMQ/RabbitMQ)里串行处理,避免长事务;也可通过 ?_revs_limit=10 把修订树深度砍短,降低存储。
  3. 边缘云场景:国内运营商推 5G MEC,CouchDB 运行在边缘节点,网络抖动大。可把合并函数下沉到Node-RED 流计算,离线时先写本地 SQLite,恢复后批量解决冲突,进一步降低中心压力。
  4. 与 Kafka 结合:合并完成后发 OrderMerged 事件到 Kafka,下游库存、计费系统消费幂等键(mergeId) 做 Exactly-Once 处理,实现分布式事务最终一致。