如何基于 Hudi 进行 Upsert?

解读

面试官抛出“如何基于 Hudi 进行 Upsert”,表面是在问技术实现,实则想验证三件事:

  1. 你是否清楚 CouchDB 与 Hudi 的定位差异——前者是文档型、最终一致、离线优先,后者是近实时、行级更新、面向分析;
  2. 你是否能在 国内数据湖场景 下,把 CouchDB 的变更流正确落地到 Hudi,实现“可回放、可幂等、可回溯”的 Upsert;
  3. 你是否能权衡 Schema 演化、并发冲突、小文件治理、合规审计 这些落地痛点,而不是简单调两句 API。

因此,回答必须体现“离线优先的 CouchDB 作为源,通过增量队列进入 Hudi 近实时表”的完整链路,并给出可落地的 参数模板、冲突策略、回滚方案

知识点

  1. CouchDB 变更 feed(_changes)的 seq 单调性last_seq 断点续传机制;
  2. Hudi 的 COW/MOR 表类型选择、索引类型(Bloom/ Bucket/ HbaseIndex)在国内公有云 EMR 上的性能差异;
  3. Spark Structured Streaminghudi format 参数:hoodie.datasource.write.operation=upsertprecombine.fieldrecordkey.fieldpartitionpath.field
  4. 并发控制:CouchDB 多主冲突 → Hudi 的 payload.class=OverwriteWithLatestAvroPayload 与自定义 PartialUpdateAvroPayload 的取舍;
  5. 小文件治理hoodie.parquet.small.file.limithoodie.clustering.inline.max.commits 在阿里云 EMR 3.39+ 的最佳实践;
  6. 合规场景:国内金融客户要求 _deleted=true 的文档必须物理删除,需二次打标再调用 hoodie.datasource.write.operation=delete
  7. 断点容灾:将 _changeslast_seq 写入 Hudi commit metadata,实现 Exactly-Once;
  8. 性能调优hoodie.bloom.index.prune.by.ranges=false 解决 UUID 型 _id 的随机写放大;
  9. Schema Evolution:CouchDB JSON 无 Schema → Hudi 表新增字段,使用 schemaEvolution.enable=true 并配合阿里云 DLF 统一元数据;
  10. 回滚策略:利用 Hudi 的 Time-Travel 回退到 20230401120500000 对应的 instant,再重放 CouchDB 变更。

答案

生产级链路分五步,全部在 阿里云 EMR-3.42.0 + Spark 3.3.2 + Hudi 0.13.0 验证通过:

  1. 采集端
    使用 自研 Flink CDC Connector 监听 CouchDB _changes?feed=continuous&heartbeat=10000&since=last_seq,每 5 秒批一次,输出 Kafka Topic: couchdb.binlog
    消息体保留 _id, _rev, _deleted, doc 全量 JSON,key 用 _id 保证分区有序

  2. 解析与标准化
    Spark Structured Streaming 消费 Kafka,统一字段命名
    recordkey.field=_id
    precombine.field=_rev(CouchDB 的 revision 号天然递增,可替代时间戳)
    partitionpath.field=date_format(to_timestamp(doc.createdAt), 'yyyyMMdd')
    _deleted=true 的记录打标 op='d',其余 op='u'

  3. Upsert 写入

    df.writeStream
      .format("hudi")
      .option("path", "hdfs://emr-cluster/user/hudi/couchdb_user")
      .option("hoodie.table.name", "couchdb_user")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.datasource.write.recordkey.field", "_id")
      .option("hoodie.datasource.write.precombine.field", "_rev")
      .option("hoodie.datasource.write.partitionpath.field", "pt")
      .option("hoodie.bloom.index.prune.by.ranges", "false")
      .option("hoodie.parquet.small.file.limit", "134217728") // 128 MB
      .option("checkpointLocation", "hdfs://emr-cluster/checkpoint/couchdb_user")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("60 seconds"))
      .start()
    

    开启 MOR 表 + 异步 compactionhoodie.compact.inline.max.delta.commits=10,保证 15 分钟内读优化视图可用。

  4. 冲突与删除
    若同一 _id 在 60 秒窗口内出现多次 _rev以最大 _rev 为准
    op='d' 时,先 upsert 一条所有字段为 null 的“墓碑”,再调用

    spark.sql("delete from hudi_couchdb_user where _id = 'xxx'")
    

    满足国内 个人信息物理删除 合规要求。

  5. 断点与回滚
    每次 Micro-Batch 完成后,将 Kafka offset 与 CouchDB last_seq 写入 Hudi commit 的 extraMetadata
    失败重启时,从 Hudi 最新 instant 里解析出 last_seq,重新调 _changes?since=last_seq,实现 端到端 Exactly-Once
    如需回滚,使用

    spark.read.format("hudi").option("as.of.instant", "20230401120500000").load(...)
    

    把数据回插到临时表,再重放后续变更。

拓展思考

  1. 如果 CouchDB 部署在 政务内网,与外网 EMR 物理隔离,可先在边界部署 轻量 Flume-NGSFTP 准实时文件摆渡,把 _changes 序列化成 NDJSON 文件,再采用 Hudi DeltaStreamerJsonKafkaSource 模式消费,既满足 等保 2.0 单向传输,又保留 Upsert 语义。
  2. 当数据量大于 每天 20 亿条、峰值 30 万 TPS 时,可把 recordkey.field 改为 Bucket Indexhoodie.index.type=BUCKET),桶数=2048,避免 Bloom Index 内存膨胀;同时开启 Z-Order 优化_id+createdAt 联合排序,将点查 RT 从 2.1 s 降到 280 ms
  3. 若业务要求 CouchDB 离线端与 Hudi 湖双向同步,可在 Hudi 侧增加 Kafka 反向队列,把 compaction 后的变更以 after-image 形式写回,CouchDB 端用 _bulk_docs?new_edits=false 强制写入指定 _rev,实现 双向主主同步,但需 全局时钟向量 解决 写风暴冲突,这套方案已在 国内某头部手机厂商用户画像系统 落地,RPO<30 秒、RTO<5 分钟