如何缓存 _changes 作为流式 DataFrame?
解读
在国内实时数仓、移动 App 离线同步、IoT 边缘计算等场景里,CouchDB 的 _changes 接口常被当作“事件总线”使用。面试官抛出“缓存成流式 DataFrame”这一问,核心想验证三点:
- 你是否理解 _changes 的连续性与断点续传机制(since、last_seq、heartbeat)。
- 能否把 JSON 变更事件无落地地转成 Spark/Flink 的 DataFrame,并解决Schema 变化、背压与exactly-once 问题。
- 是否知道国内常用国产化框架(如 Flink-CDC、Spark Structured Streaming)与 CouchDB 对接时的版权合规与性能调优细节。
知识点
- _changes 语义
- 三种 feed 模式:normal、longpoll、continuous;流式场景必须用 continuous 并保持长连接。
- 返回行字段:seq(全局有序)、id、changes、deleted、doc(若 include_docs=true)。
- 流式 DataFrame 本质
- Spark 侧是 Micro-Batch 模型,需要把 CouchDB 事件先映射成 Offset + 二元组 (seq, json),再按 epochDuration 切分。
- Flink 侧是 true streaming,可用 Async I/O 或 自定义 SourceFunction 拉取 _changes,通过 CheckpointedFunction 保存 last_seq 到状态后端。
- 缓存策略
- 内存缓存:在 Source 端用 Guava Cache 或 Caffeine 缓存最近 N 条,用于幂等去重;TTL 建议小于 CouchDB _revs_limit 窗口。
- 外部缓存:国内合规 Redis 版本(华为云 GaussDB(for Redis)、阿里云 Tair)存 last_seq 与 doc _rev 映射,实现断点续传与故障转移。
- Schema 演化
- CouchDB 文档无固定 Schema,需在 Schema Inference 阶段开启 mergeSchema 选项,或维护 Avro Schema Registry(国内可用 华为云 Schema Registry 服务)。
- Exactly-Once
- Spark 用 id + _rev 作为 composite key 做 upsert,Sink 到 Delta Lake 1.2 国产发行版(如 星环科技 Transwarp ArgoDB)。
- Flink 用 two-phase-commit,把 last_seq 作为 checkpoint 的一部分,Sink 到 Hologres 或 GaussDB(DWS)。
答案
给出国内落地最稳的 Spark Structured Streaming 方案,兼顾版权与性能:
- 自定义 CouchDBChangesSource 继承 Source trait,内部用 OkHttp 保持 continuous 长连接,解析 chunked JSON。
- 维护 last_seq 为 offset,在 getOffset() 里返回;commit(offset) 时把 last_seq 写入华为云 Redis 企业版(支持 lua + 事务),实现秒级断点续传。
- deserialize() 阶段把每行转成 StructType{seq: Long, id: String, rev: String, deleted: Boolean, doc: String},其中 doc 保持原始 JSON 字符串,避免早期绑定 Schema。
- 缓存层
- 在 Source 端内置 Caffeine 缓存最近 10 000 个 _rev,用于幂等过滤,防止 CouchDB compaction 后重复投递。
- 若下游要做 维表 join,再把热数据 cache() 到 Spark 内存,并设置 watermark 不超过 ttl 的 1/2。
- Sink 到 国产 Delta Lake 时,采用 foreachBatch 方式,用 MERGE INTO 语法按 id + rev 主键去重,保证 exactly-once。
- 监控
- 通过 Prometheus + 华为云 AOM 暴露 lag = max(seq) – committed_seq,lag > 30 万 即触发 SMS 告警。
核心代码片段(已脱敏):
val changes = spark.readStream
.format("couchdb.changes")
.option("continuous", "true")
.option("includeDocs", "true")
.option("cacheSize", "10000")
.option("checkpointLocation", "obs://your-bucket/chk")
.load()
val df = changes
.selectExpr("seq", "id", "rev", "deleted", "from_json(doc, schema) as data")
.withWatermark("timestamp", "10 minutes")
df.writeStream
.outputMode("append")
.foreachBatch { (batch, epochId) =>
batch.createOrReplaceTempView("updates")
spark.sql(
"""MERGE INTO delta_db.user_profile t
|USING updates s
|ON s.id = t.id AND s.rev = t.rev
|WHEN MATCHED THEN UPDATE SET *
|WHEN NOT MATCHED THEN INSERT *""".stripMargin)
}
.start()
拓展思考
- 如果 CouchDB 部署在涉密内网且无法直连,如何通过公安部合规的网闸把 _changes 事件单向导出到 Kafka,再消费成 DataFrame?需要额外考虑国密 SSL 与白名单端口策略。
- 当 _changes 量达到 5 万条/秒 时,continuous 长连接会出现TCP 序列号绕回问题,国内云厂商负载均衡默认 300 s 断链,如何基于 gRPC-Web + HTTP/2 ping 做保活并重置 last_seq?
- 在信创 ARM 服务器(鲲鹏 920)上,OpenJDK 8 + Spark 3.3 的 JNI 调用 Snappy 压缩会出现指令集不兼容,如何改用华为自研压缩算法并重新编译 CouchDB 3.2 以获得 15% 性能提升?