如何配置 Druid 摄入 _changes feed?

解读

在国内的实时数仓或日志分析场景中,CouchDB 的 _changes feed 常被当作“轻量级 CDC 源”,需要把增量文档流实时打入 Druid(阿里云叫“实时计算 for Apache Druid”,开源社区多称 Apache Druid)。面试官问“如何配置”,并不是让你背一段 JSON,而是考察你对 CouchDB 变更流语义Druid ingestion spec 生命周期网络与权限合规容错与幂等 四个维度的闭环思考。回答时要体现“能让系统明天上线”的落地感,而不是“Demo 能跑就行”。

知识点

  1. CouchDB _changes 三种模式:normal、longpoll、continuous;国内公网/专有云环境必须 continuous + heartbeat=60000 防止网关断链。
  2. _changes 返回的 seq 字段是逻辑时钟,Druid 需将其映射为 primary timestampingestion-time dimension,否则同一文档多次更新会被 Druid 视为重复行。
  3. Druid 端没有官方 CouchDB indexer,需用 Kafka Indexing ServiceKinesis Indexing Service 做桥接;CouchDB 侧用 “changes + filter” 把增量写进 Kafka,Druid 再订阅 Topic。
  4. 国内等保要求 TLS 双向认证 + 白名单 IP,CouchDB 需在 local.ini 里打开 [ssl] 并配置 verify_ssl_certificates = true,同时在 Druid Overlord 的 ingestionSpec 里把 kafka.consumer.propertiessecurity.protocol 设为 SASL_SSL
  5. 幂等:CouchDB 的 id + _rev 唯一,但 Druid 只有 rollup 或 replace 语义;要在 transformSpec 里把 _rev 作为 dimension,并在 metricsSpec 里用 “longMax” 聚合 seq,保证 同一版本只保留最新 seq,实现 exactly-once 语义
  6. 监控:必须给 /db/_changesPrometheus exporter(国内常用 couchdb-prometheus 二次开发版),把 changes_pending > 1000 作为 Druid 消费 lag 的预警前置条件,否则 Kafka 没堆积也会误报警。

答案

线上落地的最小闭环配置分三步:

第一步:CouchDB 侧
local.ini 追加:

[httpd]
enable_cors = true
bind_address = 0.0.0.0

[ssl]
cert_file = /opt/couchdb/cert/couchdb.pem
key_file = /opt/couchdb/cert/couchdb.key
verify_ssl_certificates = true

重启后创建 专用只读账号

curl -X PUT https://couchdb-internal:6984/_node/couchdb@127.0.0.1/_config/admins/druid_ro -d '"Druid@2025!"'

创建 filter 函数 只同步业务文档:

curl -X PUT https://druid_ro:Druid@2025!@couchdb-internal:6984/mydb/_design/druid -d '{"filters":{"druid":"function(doc,req){return doc.type==\"order\"&&doc.region==\"CN\");}"}}'

第二步:Kafka 桥接
Debezium CouchDB Connector(国内阿里云 Kafka 托管版已预编译)启动任务:

name=couchdb-druid-source
connector.class=io.debezium.connector.couchdb.CouchdbConnector
couchdb.hosts=couchdb-internal:6984
couchdb.ssl.enabled=true
couchdb.user=druid_ro
couchdb.password=Druid@2025!
database.include.list=mydb
collection.include.list=mydb
changes.filter=druid
topic.prefix=cdc_couchdb
tombstones.on.delete=false

Connector 会把 continuous _changes 转成 Kafka JSONkey=docid,value={“before”:null,”after”:{…},”seq”:…”}

第三步:Druid ingestion spec
阿里云 Druid 控制台 -> 实时摄入 -> JSON 模式 提交:

{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "alikafka-internal:9092",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"XXX\" password=\"XXX\";"
      },
      "topic": "cdc_couchdb.mydb",
      "inputFormat": {
        "type": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            { "type": "path", "name": "doc_id", "expr": "$.after._id" },
            { "type": "path", "name": "rev", "expr": "$.after._rev" },
            { "type": "path", "name": "seq", "expr": "$.seq" }
          ]
        }
      }
    },
    "dataSchema": {
      "dataSource": "couchdb_order_cn",
      "timestampSpec": { "column": "seq", "format": "posix" },
      "dimensionsSpec": {
        "dimensions": ["doc_id", "rev", "region", "order_status"]
      },
      "metricsSpec": [
        { "name": "seq_max", "type": "longMax", "fieldName": "seq" }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "NONE",
        "rollup": false
      }
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsPerSegment": 5000000,
      "intermediateHandoffPeriod": "PT10M"
    }
  }
}

提交后 Overlord 会起 Kafka Indexing Taskexactly-once 由 Kafka 自身的 offset + Druid 的 segment 提交协议保证seq_max 聚合确保同一 _rev 只保留最新版本,避免重复计算。

拓展思考

  1. 如果 CouchDB 是 3.x 集群 + 分片,_changes 返回的 seq 是 shard-range 字符串不能直接当时间戳;需要 **在 transformSpec 里用 JavaScript 解析 seq 并生成 ingestionTime,或者 **在 Kafka Connect 里加 SMT 把 seq 换成 Kafka 事件时间,否则 Druid 会出现 空时间戳拒绝写入
  2. 国内金融场景要求 5 级容灾,CouchDB 多活 + Druid 双集群;可在 Kafka MirrorMaker 2跨 Region 复制,Druid 侧用 replace 覆盖策略 保证 北京、上海两机房数据最终一致,同时 seq 全局唯一UUID1 前缀 解决。
  3. 如果数据量 <1 万条/秒,但单条文档 >2 MB,建议 **在 Kafka Connect 里把 “after” 内容写入 OSS/NAS,Kafka 只传 URL 指针,Druid 用 native batch + index_parallel 每小时拉取一次,避免 Kafka 消息过大被云厂商限流