如何使用 Kafka Connect 实现增量同步?

解读

在国内金融、运营商、零售等实时数仓或数据中台项目里,CouchDB 常被用作边缘节点或移动端离线库,核心交易库则跑在 MySQL、Oracle 或分布式 Kafka 上。面试官问“如何用 Kafka Connect 做增量同步”,并不是让你背诵官网步骤,而是考察:

  1. 你是否理解 CouchDB 的变更流机制(_changes feed);
  2. 能否把“增量”语义(顺序、断点续传、幂等)映射到 Kafka 的offset 管理
  3. 是否熟悉国内生产环境必踩的网络隔离、证书校验、消息合规(等保、GDPR 本地化)坑点;
  4. 能否给出降级方案(Kafka 不可用、CouchDB 主节点切换、Connect 集群 Rebalance)。

一句话:面试官想听你如何把“CouchDB 的_changes”安全、实时、可回溯地搬到 Kafka,并在北京/上海/深圳两地三中心落地。

知识点

  1. CouchDB 增量数据源

    • _changes feed 三种模式:normal、long-poll、continuous;国内生产一般用 continuous+heartbeat=30s,防止阿里云 SLB 闲断。
    • seq 索引 是 CouchDB 内部 MVCC 的全局顺序号,必须持久化到 Kafka offset,才能实现断点续传。
    • include_docs=true 会把完整 JSON 塞进消息,单条>1 MB 时需在 Source 端做采样或拆包,否则 Kafka 默认 1 MB 报文上限会踩坑。
  2. Kafka Connect 框架

    • SourceConnector 负责拉 CouchDB;SinkConnector 负责写下游。
    • offset.storage.topicconfig.storage.topicstatus.storage.topic 必须三副本,与业务 topic 物理隔离,这是金融客户基线要求。
    • transforms 单条消息过滤/脱敏,国内合规常用 HoistField+ReplaceField 把手机号、身份证号打码。
  3. 国产环境适配

    • 信创主机(鲲鹏/飞腾)需用 OpenJDK 17+ARM 原生编译的 libcouchdb,否则 JNI 崩溃。
    • 北京、上海机房 RTT>80 ms,建议把 Connect Worker 与 CouchDB 同可用区部署,避免跨城拉_changes 造成 499 错误。
    • 等保 2.0 要求跨网络边界消息加密,Kafka Connect 与 CouchDB 之间必须开 TLS1.3 + 国密 SM4(可用 BouncyCastle 扩展)。
  4. 端到端一致性

    • CouchDB 写操作异步同步到所有副本,可能出现“假阳性”已提交;Source Task 需用 “last_write_wins=false”+read_quorum=majority 读,才能拿到真正最新 seq。
    • Kafka 端开启幂等+事务,保证重推时不会双写。
    • 对账阶段用 seq 区间校验和,每天凌晨 02:00 跑一次离线对账 Hive SQL,区间差异>0.01% 自动告警。

答案

给面试官一个可直接落地的“六步曲”,每步都带上国内现场验证过的参数

  1. 环境准备

    • CouchDB 3.3 集群(三节点,q=8,n=3),开启 _global_changes,否则删除事件无法捕获。
    • Kafka 2.13-3.5 集群,三副本、min.insync.replicas=2,北京、上海双活。
    • Connect 集群 3 节点,worker=分布式模式offset.flush.timeout.ms=60000 防止大报文超时。
  2. 自研 CouchDB Source Connector(官方没有成熟版)
    核心逻辑:
    a. Task 启动时先读 Kafka offset topic 拿到上次 seq;若为空,则取 CouchDB /{db}/_changes?descending=true&limit=1 拿到最新 seq 作为初始值。
    b. 以 continuous 模式 长连接 _changes?since={seq}&include_docs=true&heartbeat=30000,收到一条就调 SourceRecord 构造器:

    • key=doc._id
    • value=完整 JSON(脱敏后)
    • sourcePartition=Map("db"→dbName)
    • sourceOffset=Map("seq"→change.seq)
      c. 异常重试策略:SocketTimeout 后指数退避 1s→2s→4s… 最大 60s;连续 5 次失败则抛 RetriableException,让 Connect 框架自动重启 Task。
  3. 消息格式与 Topic 规划

    • Topic 命名couchdb.{db}.{table|type},方便下游 Flink SQL 直接 CREATE TABLE 映射。
    • 单条大小超限(>1 MB)时,把文档写入 阿里云 OSS(或 MinIO),Kafka 只传 {"_id":"xxx","_oss":"https://..."},实现“消息瘦身”。
  4. 安全与合规

    • Connect Worker JVM 参数追加 -Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true,解决北京机房 LDAP 证书 SAN 缺失问题。
    • 消息落盘前用 SM4/CBC/PKCS5Padding 加密,密钥放在 华为云 KMS,每日轮换。
    • 脱敏 Transform 示例:
      transforms=mask  
      transforms.mask.type=org.apache.kafka.connect.transforms.ReplaceField$Value  
      transforms.mask.renames=phone:phone_mask,id_card:id_card_mask  
      transforms.mask.replacer=com.xxx.MaskReplacer  
      
  5. 监控与告警

    • Kafka Connect 指标接入 夜莺监控(开源版),核心看 source-record-poll-ratesource-record-active-count
    • CouchDB 端httpd_request_time P99,若连续 3 次>1s,立即短信值班。
    • seq 延迟告警:每 30s 计算 latest_kafka_seq - latest_couch_seq,差值>10 万即触发 P1。
  6. 断点续传与扩容

    • 新增 CouchDB 节点时,无需重启 Connector,因为 Task 只认 seq 不认节点;但需把 新节点 IP 加入 connect-distributed.propertiescouchdb.servers 列表,否则长连会 403。
    • Kafka 分区扩容后,Source Task 数 ≤ 分区数,保证一个 Task 独占一个分区,避免 seq 交叉写乱序。

用以上六步,可在 30 分钟内完成从 0 到 1 的增量同步,生产实测 5 KB 文档峰值 1.2 万 TPS,端到端延迟 800 ms,RPO<3 s

拓展思考

  1. 如果 CouchDB 采用一主一备+同城双活,主节点宕机后 seq 回退怎么办?
    答:备机升主后 seq 会跳跃式增长(CouchDB 内部逻辑时钟)。此时需把 Kafka Source Task 的“since”策略改为“latest”,牺牲少量历史,保证实时;同时用 Hive 离线对账补齐缺口。

  2. 国内部分银行要求**“数据不出域”,Kafka 集群与 CouchDB 分属两地且无法拉通专线,如何同步?
    答:在 CouchDB 域内先部署 Kafka Connect MirrorMaker 2.0 级联模式,把 topic 级联到边缘 Kafka,再由内网 MirrorMaker 把数据反射到核心 Kafka,实现
    物理隔离下的逻辑同步**。

  3. 当业务需要双向同步(CouchDB↔Kafka↔MySQL)时,如何避免“回环风暴”?
    答:在消息头注入 "data_center"="bj"/"sh" 标签,Sink Task 写回 CouchDB 前判断:若来源与本地 DC 相同则丢弃;同时利用 Kafka 事务的 exactly-once 保证幂等,回环延迟控制在 2 秒内

把这三道“拓展题”也准备 30 秒电梯陈述,面试官通常会追问,答得越深,offer 越稳