如何配置 Druid 摄入 _changes feed?
解读
在国内的实时数仓或日志分析场景中,CouchDB 的 _changes feed 常被当作“轻量级 CDC 源”,需要把增量文档流实时打入 Druid(阿里云叫“实时计算 for Apache Druid”,开源社区多称 Apache Druid)。面试官问“如何配置”,并不是让你背一段 JSON,而是考察你对 CouchDB 变更流语义、Druid ingestion spec 生命周期、网络与权限合规、容错与幂等 四个维度的闭环思考。回答时要体现“能让系统明天上线”的落地感,而不是“Demo 能跑就行”。
知识点
- CouchDB _changes 三种模式:normal、longpoll、continuous;国内公网/专有云环境必须 continuous + heartbeat=60000 防止网关断链。
- _changes 返回的 seq 字段是逻辑时钟,Druid 需将其映射为 primary timestamp 或 ingestion-time dimension,否则同一文档多次更新会被 Druid 视为重复行。
- Druid 端没有官方 CouchDB indexer,需用 Kafka Indexing Service 或 Kinesis Indexing Service 做桥接;CouchDB 侧用 “changes + filter” 把增量写进 Kafka,Druid 再订阅 Topic。
- 国内等保要求 TLS 双向认证 + 白名单 IP,CouchDB 需在 local.ini 里打开 [ssl] 并配置 verify_ssl_certificates = true,同时在 Druid Overlord 的 ingestionSpec 里把 kafka.consumer.properties 的 security.protocol 设为 SASL_SSL。
- 幂等:CouchDB 的 id + _rev 唯一,但 Druid 只有 rollup 或 replace 语义;要在 transformSpec 里把 _rev 作为 dimension,并在 metricsSpec 里用 “longMax” 聚合 seq,保证 同一版本只保留最新 seq,实现 exactly-once 语义。
- 监控:必须给 /db/_changes 加 Prometheus exporter(国内常用 couchdb-prometheus 二次开发版),把 changes_pending > 1000 作为 Druid 消费 lag 的预警前置条件,否则 Kafka 没堆积也会误报警。
答案
线上落地的最小闭环配置分三步:
第一步:CouchDB 侧
在 local.ini 追加:
[httpd]
enable_cors = true
bind_address = 0.0.0.0
[ssl]
cert_file = /opt/couchdb/cert/couchdb.pem
key_file = /opt/couchdb/cert/couchdb.key
verify_ssl_certificates = true
重启后创建 专用只读账号:
curl -X PUT https://couchdb-internal:6984/_node/couchdb@127.0.0.1/_config/admins/druid_ro -d '"Druid@2025!"'
创建 filter 函数 只同步业务文档:
curl -X PUT https://druid_ro:Druid@2025!@couchdb-internal:6984/mydb/_design/druid -d '{"filters":{"druid":"function(doc,req){return doc.type==\"order\"&&doc.region==\"CN\");}"}}'
第二步:Kafka 桥接
用 Debezium CouchDB Connector(国内阿里云 Kafka 托管版已预编译)启动任务:
name=couchdb-druid-source
connector.class=io.debezium.connector.couchdb.CouchdbConnector
couchdb.hosts=couchdb-internal:6984
couchdb.ssl.enabled=true
couchdb.user=druid_ro
couchdb.password=Druid@2025!
database.include.list=mydb
collection.include.list=mydb
changes.filter=druid
topic.prefix=cdc_couchdb
tombstones.on.delete=false
Connector 会把 continuous _changes 转成 Kafka JSON,key=docid,value={“before”:null,”after”:{…},”seq”:…”}。
第三步:Druid ingestion spec
在 阿里云 Druid 控制台 -> 实时摄入 -> JSON 模式 提交:
{
"type": "kafka",
"spec": {
"ioConfig": {
"type": "kafka",
"consumerProperties": {
"bootstrap.servers": "alikafka-internal:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"XXX\" password=\"XXX\";"
},
"topic": "cdc_couchdb.mydb",
"inputFormat": {
"type": "json",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{ "type": "path", "name": "doc_id", "expr": "$.after._id" },
{ "type": "path", "name": "rev", "expr": "$.after._rev" },
{ "type": "path", "name": "seq", "expr": "$.seq" }
]
}
}
},
"dataSchema": {
"dataSource": "couchdb_order_cn",
"timestampSpec": { "column": "seq", "format": "posix" },
"dimensionsSpec": {
"dimensions": ["doc_id", "rev", "region", "order_status"]
},
"metricsSpec": [
{ "name": "seq_max", "type": "longMax", "fieldName": "seq" }
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE",
"rollup": false
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 5000000,
"intermediateHandoffPeriod": "PT10M"
}
}
}
提交后 Overlord 会起 Kafka Indexing Task,exactly-once 由 Kafka 自身的 offset + Druid 的 segment 提交协议保证;seq_max 聚合确保同一 _rev 只保留最新版本,避免重复计算。
拓展思考
- 如果 CouchDB 是 3.x 集群 + 分片,_changes 返回的 seq 是 shard-range 字符串,不能直接当时间戳;需要 **在 transformSpec 里用 JavaScript 解析 seq 并生成 ingestionTime,或者 **在 Kafka Connect 里加 SMT 把 seq 换成 Kafka 事件时间,否则 Druid 会出现 空时间戳拒绝写入。
- 国内金融场景要求 5 级容灾,CouchDB 多活 + Druid 双集群;可在 Kafka MirrorMaker 2 做 跨 Region 复制,Druid 侧用 replace 覆盖策略 保证 北京、上海两机房数据最终一致,同时 seq 全局唯一 用 UUID1 前缀 解决。
- 如果数据量 <1 万条/秒,但单条文档 >2 MB,建议 **在 Kafka Connect 里把 “after” 内容写入 OSS/NAS,Kafka 只传 URL 指针,Druid 用 native batch + index_parallel 每小时拉取一次,避免 Kafka 消息过大被云厂商限流。