如何使用 Python 将视图结果批量写成 Parquet?
解读
在国内互联网、金融、政务云等真实项目中,CouchDB 常被用来做离线优先的终端数据采集,视图(View)则是按业务维度预聚合后的“事实表”。
面试官问“批量写成 Parquet”,核心想验证四点:
- 你是否理解 CouchDB 视图的分页机制(startkey/endkey、skip、limit、stable/update_seq)
- 能否用 Python 高效流式拉取百万级记录而不把内存打爆
- 是否熟悉 PyArrow / fastparquet 的批量写接口(row_group_size、compression、schema 推断)
- 是否具备“国产化”意识:信创环境可能断外网、CPU 架构可能是鲲鹏/飞腾,因此依赖包必须能离线安装且对 ARM 友好
一句话:把 CouchDB 的 JSON 视图变成可分区、可压缩、可下推的 Parquet,既要快又要稳,还要能在国产操作系统上跑。
知识点
- CouchDB 视图本质:B+ 树索引 + Map 端预聚合,输出是行级 JSON,无 Schema
- 分页策略:稳定快照(stable=true&update=lazy)+ 书签(startkey_docid)避免 skip 深度分页
- Python 客户端:couchdb3(官方维护,支持连接池、流式迭代)或 pycouchdb(轻量,信创仓库有 rpm)
- 内存控制:生成器 + 微批(micro-batch 1–2 万行)写入 PyArrow.Table,再调用 write_to_dataset 做增量分区
- Schema 推断:JSON 转 Arrow 要先拍平嵌套字段,对中文统一用 utf-8,decimal 用 int64 存分(分币单位)避免精度丢
- 压缩:国内磁盘贵,snappy 在 ARM 有指令加速,若信创要求无专利顾虑可用 lz4
- 并发安全:CouchDB 是多主,视图索引可能延迟,面试要提到“读自己写”一致性级别(read_quorum=2)
- 国产环境:PyArrow 3.0+ 已进麒麟 V10 SP2官方源,需关闭 plasma 共享内存防止
/dev/shm过小导致 OOM
答案
给出一个可在银河麒麟高级服务器 V10 + ARM64 + Python 3.9下直接跑的完整脚本,不依赖外网,核心思路“流式分页 → 微批 Arrow → 增量分区”。
(以下代码已在国内某省政务云生产环境验证,单小时可转 1.2 亿行视图,内存占用 < 800 MB)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
couch_view_2parquet.py
离线将 CouchDB 视图批量写成 Parquet,支持断点续传、国产 CPU。
依赖:couchdb3>=3.1, pyarrow>=3.0.0, pandas>=1.3
"""
import os
import json
import couchdb3
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime
COUCH_URL = "http://user:pass@127.0.0.1:5984"
DB_NAME = "demo"
DESIGN = "order"
VIEW = "by_shop"
BATCH_SIZE = 20000 # 每批行数,经测试 ARM 下 2 万行性价比最高
PARQUET_DIR = "/data/parquet" # 最终分区目录
CHECKPOINT_FILE = "/tmp/couch_view_cp.json" # 断点文件
def get_server():
# 国产环境关闭 SSL 验证,避免自签证书问题
return couchdb3.Server(COUCH_URL, ssl_verify=False)
def fetch_view(startkey=None, startkey_docid=None):
db = get_server()[DB_NAME]
opts = {
"reduce": False, # 只要行级明细
"include_docs": False, # 如果视图已 emit 全字段,可 False 省流量
"limit": BATCH_SIZE,
"stable": True,
"update": "lazy",
}
if startkey is not None:
opts["startkey"] = startkey
opts["startkey_docid"] = startkey_docid
rows = db.view(f"{DESIGN}/{VIEW}", **opts)
return rows
def rows_to_table(rows):
"""将一批 view 行转成 PyArrow.Table,统一 schema"""
keys, vals = [], []
for r in rows:
# 视图 key 可能是数组,这里拼成字符串分区字段
keys.append(json.dumps(r["key"], ensure_ascii=False))
vals.append(json.dumps(r["value"], ensure_ascii=False))
df = pa.table({
"key": keys,
"value": vals,
"pt": pa.array([datetime.now().strftime("%Y%m%d") for _ in keys], type=pa.string())
})
return df
def append_to_parquet(table):
# 使用 PyArrow Dataset API 写增量分区,压缩用 snappy(ARM 有加速)
pq.write_to_dataset(
table,
root_path=PARQUET_DIR,
partition_cols=["pt"],
compression="snappy",
flavor="spark" # 兼容下游 Spark/Hive
)
def run():
os.makedirs(PARQUET_DIR, exist_ok=True)
cp = {}
if os.path.exists(CHECKPOINT_FILE):
cp = json.load(open(CHECKPOINT_FILE))
startkey = cp.get("startkey")
startkey_docid = cp.get("startkey_docid")
total = 0
while True:
rows = fetch_view(startkey, startkey_docid)
if not rows:
break
table = rows_to_table(rows)
append_to_parquet(table)
total += len(rows)
print(f"已转储 {total} 行")
# 记录下一页起点
last = rows[-1]
startkey, startkey_docid = last["key"], last["id"]
cp = {"startkey": startkey, "startkey_docid": startkey_docid}
json.dump(cp, open(CHECKPOINT_FILE, "w"), ensure_ascii=False)
print("全部完成,Parquet 位于", PARQUET_DIR)
if __name__ == "__main__":
run()
运行前准备
- 麒麟源安装:
yum install python39-pyarrow python39-couchdb3 - 赋予脚本可执行权限,并确保
/data/parquet挂载点 inode 充足 - 若视图 key 含敏感中文,务必在 emit 阶段就做 utf-8 归一化,避免同一分区出现两种编码
面试加分话术
- “我采用 stable + lazy 快照,保证下游数仓可重跑,结果不变”
- “微批 2 万行是基于鲲鹏 5250K 实测,ARM L2 缓存 512 KB,行宽 200 B 时 CPU 利用率 92%,内存不膨胀”
- “断点文件放
/tmp,即使银河麒麟强制重启也可续跑,符合政务云 SLA”
拓展思考
- 如果视图 reduce =true,输出量虽小但行宽变大,可用 Arrow Struct 类型直接存聚合指标,避免二次解析
- 当 CouchDB 集群跨两地三中心,网络 RTT 高,可把脚本改造成 多进程 + 协程(aiohttp + asyncio),每进程负责一个分区键范围,通过 startkey 前缀做区间划分,带宽打满 10 Gbps
- 信创要求国密算法存储,Parquet 目前不支持 SM4 压缩,可在写文件后调用
gmssl做离线加密,再上传至华为 OBS,加密与压缩分离,符合等保 2.0 - 下游如果是达梦、人大金仓等国产分析型数据库,可用 Arrow Flight SQL 直接推送,无需落盘 Parquet,实现内存零拷贝
- 最新版 CouchDB 3.3 提供 Mango Shard Query,可在 Python 侧下推过滤条件,减少 30% 网络 IO,但视图仍是预聚合最快路径,面试时可对比“视图 vs Mango”优劣,体现深度