如何在应用层实现自定义冲突解决函数并写回合并后的文档?
解读
国内互联网、金融、政企项目在使用 CouchDB 时,多活数据中心、边缘节点离线写入、移动 App 端同步 是三大高频场景。冲突不可避免,而 CouchDB 默认的“以胜利修订版本为准”往往无法满足业务规则(例如金额不能丢、库存不能超卖、操作人不能错)。因此面试官想确认候选人是否真正在应用层做过可验证、可回滚、可灰度的冲突解决闭环,而不是简单地把 _conflicts 数组一删了之。回答时要突出“读冲突→业务合并→写回 CAS→再次校验→异步审计”五步,并给出可直接落地的代码骨架与回滚策略。
知识点
- 修订树(Rev-Tree)与 _rev 格式:
_rev是数字-哈希的不可变令牌,写回时必须携带,否则触发 409。 - 冲突读取:
GET /db/docid?conflicts=true返回_conflicts数组,包含所有失败分支的最新修订。 - CAS(Compare-And-Swap):写回时把最新
_rev放在If-Match头,防止并发覆盖。 - 批量获取:
_all_docs?include_docs=true&keys=[...]一次拉取冲突文档,减少往返。 - 幂等设计:合并函数必须幂等,支持重试;建议给合并后文档打
mergeId与sourceRev[]字段,方便审计。 - 灰度回滚:合并前先写入 shadow 文档(
_id=docid_merge_${timestamp}),验证通过后再替换正式文档;出问题可秒级回滚。 - 国内合规:金融类项目需留痕,合并结果要同步到审计库(MySQL 或 Elasticsearch),保存操作人、IP、合并策略版本号。
答案
下面给出基于 Node.js 的完整可运行骨架,演示“拉取冲突→业务合并→CAS 写回→异步审计”四步,可直接嵌入 Egg.js、Spring Cloud sidecar 或 Serverless 函数。重点步骤已加粗注释,方便在面试时白板手写。
/**
* 自定义冲突解决主函数
* @param {string} dbUrl CouchDB 地址,如 http://couchdb:5984/orders
* @param {string} docId 业务主键
* @param {Function} mergeFn 用户传入的合并策略 (winner, losers) => newDoc
*/
async function resolveConflict(dbUrl, docId, mergeFn) {
const nano = require('nano')(dbUrl);
const db = nano.use(dbUrl.split('/').pop());
// **第 1 步:一次性拉取胜利分支 + 所有冲突分支**
const { data: main } = await db.get(docId, { conflicts: true });
if (!main._conflicts || main._conflicts.length === 0) return; // 无冲突直接返回
const conflictRevs = main._conflicts; // 例如 ['2-aaa', '3-bbb']
const docs = await db.fetch({ keys: conflictRevs.map(r => ({ id: docId, rev: r })) });
const losers = docs.rows.map(r => r.doc);
// **第 2 步:业务侧合并**
const merged = mergeFn(main, losers); // 用户自定义逻辑
merged._id = docId; // 保持主键不变
merged._rev = main._rev; // 必须携带胜利分支的 _rev,才能覆盖
merged._mergeMeta = {
sourceRev: [main._rev, ...conflictRevs],
mergedAt: new Date().toISOString(),
mergeId: require('uuid').v4()
};
// **第 3 步:CAS 写回,失败自动重试 3 次**
let retries = 3;
while (retries-- > 0) {
try {
const putResp = await db.insert(merged, { rev: merged._rev });
console.log('合并成功,新修订:', putResp.rev);
// **第 4 步:异步审计(国内合规)**
await saveAuditLog({
docId,
mergeId: merged._mergeMeta.mergeId,
operator: process.env.USER_ID || 'system',
ip: process.env.POD_IP || require('ip').address()
});
return;
} catch (e) {
if (e.statusCode === 409 && retries > 0) {
// 并发写导致 _rev 过期,重新拉取再合并
const { data: fresh } = await db.get(docId, { conflicts: true });
merged._rev = fresh._rev;
continue;
}
throw e;
}
}
}
/** 示例合并策略:订单金额累加,库存取最小可用 */
function orderMergeStrategy(winner, losers) {
const all = [winner, ...losers];
const result = JSON.parse(JSON.stringify(winner)); // 深拷贝
result.amount = all.reduce((sum, doc) => sum + (doc.amount || 0), 0);
result.stock = Math.min(...all.map(doc => doc.stock || 0));
return result;
}
// 调用入口
resolveConflict(
'http://couchdb:5984/orders',
'order_10086',
orderMergeStrategy
).catch(console.error);
关键点回顾
- 必须携带胜利分支的 _rev,否则 CouchDB 返回 409。
- 合并函数要幂等,支持重入;给合并结果打
mergeId方便追踪。 - 灰度方案:正式替换前可先写 shadow 文档,验证无误后删除 shadow 并替换主文档,实现秒级回滚。
- 国内审计:合并结果异步写入 MySQL 审计表,字段至少包含
merge_id、source_rev[]、operator、ip、timestamp,满足等保与金融合规。
拓展思考
- 多语言 SDK 差异:Java 使用 Ektorp 时,
@Revision注解会自动处理_rev,但批量拉冲突仍需手动构造/_all_docs请求;Go 的kivik支持RevsDiff接口,可提前算出冲突,减少流量。 - 性能调优:当冲突分支数 >100 时,建议先在后端任务队列(RocketMQ/RabbitMQ)里串行处理,避免长事务;也可通过
?_revs_limit=10把修订树深度砍短,降低存储。 - 边缘云场景:国内运营商推 5G MEC,CouchDB 运行在边缘节点,网络抖动大。可把合并函数下沉到Node-RED 流计算,离线时先写本地 SQLite,恢复后批量解决冲突,进一步降低中心压力。
- 与 Kafka 结合:合并完成后发
OrderMerged事件到 Kafka,下游库存、计费系统消费幂等键(mergeId) 做 Exactly-Once 处理,实现分布式事务最终一致。