如何缓存 _changes 作为流式 DataFrame?

解读

在国内实时数仓、移动 App 离线同步、IoT 边缘计算等场景里,CouchDB 的 _changes 接口常被当作“事件总线”使用。面试官抛出“缓存成流式 DataFrame”这一问,核心想验证三点:

  1. 你是否理解 _changes连续性与断点续传机制(since、last_seq、heartbeat)。
  2. 能否把 JSON 变更事件无落地地转成 Spark/Flink 的 DataFrame,并解决Schema 变化背压exactly-once 问题。
  3. 是否知道国内常用国产化框架(如 Flink-CDC、Spark Structured Streaming)与 CouchDB 对接时的版权合规性能调优细节。

知识点

  1. _changes 语义
    • 三种 feed 模式:normallongpollcontinuous;流式场景必须用 continuous 并保持长连接。
    • 返回行字段:seq(全局有序)、idchangesdeleteddoc(若 include_docs=true)。
  2. 流式 DataFrame 本质
    • Spark 侧是 Micro-Batch 模型,需要把 CouchDB 事件先映射成 Offset + 二元组 (seq, json),再按 epochDuration 切分。
    • Flink 侧是 true streaming,可用 Async I/O自定义 SourceFunction 拉取 _changes,通过 CheckpointedFunction 保存 last_seq 到状态后端。
  3. 缓存策略
    • 内存缓存:在 Source 端用 Guava CacheCaffeine 缓存最近 N 条,用于幂等去重;TTL 建议小于 CouchDB _revs_limit 窗口。
    • 外部缓存:国内合规 Redis 版本(华为云 GaussDB(for Redis)阿里云 Tair)存 last_seqdoc _rev 映射,实现断点续传故障转移
  4. Schema 演化
    • CouchDB 文档无固定 Schema,需在 Schema Inference 阶段开启 mergeSchema 选项,或维护 Avro Schema Registry(国内可用 华为云 Schema Registry 服务)。
  5. Exactly-Once
    • Spark 用 id + _rev 作为 composite keyupsert,Sink 到 Delta Lake 1.2 国产发行版(如 星环科技 Transwarp ArgoDB)。
    • Flink 用 two-phase-commit,把 last_seq 作为 checkpoint 的一部分,Sink 到 HologresGaussDB(DWS)

答案

给出国内落地最稳的 Spark Structured Streaming 方案,兼顾版权与性能:

  1. 自定义 CouchDBChangesSource 继承 Source trait,内部用 OkHttp 保持 continuous 长连接,解析 chunked JSON
  2. 维护 last_seqoffset,在 getOffset() 里返回;commit(offset) 时把 last_seq 写入华为云 Redis 企业版(支持 lua + 事务),实现秒级断点续传
  3. deserialize() 阶段把每行转成 StructType{seq: Long, id: String, rev: String, deleted: Boolean, doc: String},其中 doc 保持原始 JSON 字符串,避免早期绑定 Schema。
  4. 缓存层
    • Source 端内置 Caffeine 缓存最近 10 000_rev,用于幂等过滤,防止 CouchDB compaction 后重复投递。
    • 若下游要做 维表 join,再把热数据 cache()Spark 内存,并设置 watermark 不超过 ttl 的 1/2
  5. Sink国产 Delta Lake 时,采用 foreachBatch 方式,用 MERGE INTO 语法按 id + rev 主键去重,保证 exactly-once
  6. 监控
    • 通过 Prometheus + 华为云 AOM 暴露 lag = max(seq) – committed_seqlag > 30 万 即触发 SMS 告警。

核心代码片段(已脱敏):

val changes = spark.readStream
  .format("couchdb.changes")
  .option("continuous", "true")
  .option("includeDocs", "true")
  .option("cacheSize", "10000")
  .option("checkpointLocation", "obs://your-bucket/chk")
  .load()

val df = changes
  .selectExpr("seq", "id", "rev", "deleted", "from_json(doc, schema) as data")
  .withWatermark("timestamp", "10 minutes")

df.writeStream
  .outputMode("append")
  .foreachBatch { (batch, epochId) =>
    batch.createOrReplaceTempView("updates")
    spark.sql(
      """MERGE INTO delta_db.user_profile t
        |USING updates s
        |ON s.id = t.id AND s.rev = t.rev
        |WHEN MATCHED THEN UPDATE SET *
        |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
  }
  .start()

拓展思考

  1. 如果 CouchDB 部署在涉密内网且无法直连,如何通过公安部合规的网闸_changes 事件单向导出Kafka,再消费成 DataFrame?需要额外考虑国密 SSL白名单端口策略。
  2. _changes 量达到 5 万条/秒 时,continuous 长连接会出现TCP 序列号绕回问题,国内云厂商负载均衡默认 300 s 断链,如何基于 gRPC-Web + HTTP/2 ping保活重置 last_seq
  3. 信创 ARM 服务器(鲲鹏 920)上,OpenJDK 8 + Spark 3.3JNI 调用 Snappy 压缩会出现指令集不兼容,如何改用华为自研压缩算法并重新编译 CouchDB 3.2 以获得 15% 性能提升?