将 Harbor 审计日志接入 ELK 并设置告警规则
解读
在国内金融、运营商、政务云等合规场景下,Harbor 作为企业级镜像仓库必须留存180 天以上审计日志,并对接公司统一SOC/等保平台。面试官想验证三件事:
- 你是否理解 Harbor 审计事件的产生机制与字段含义;
- 能否在不改 Harbor 源码的前提下,用容器化方式把日志送进 ELK,并保证高可用与低延迟;
- 能否结合ATT&CK 容器矩阵设计可落地的告警规则,而非简单关键字匹配。
知识点
- Harbor 审计日志源:core 组件的
/var/log/hbor/core.log及 database 表audit_log,推荐采集落盘文件避免直接连库。 - 日志格式:单行 JSON,关键字段
username、operation、resource、resource_type、timestamp、project_name。 - 容器采集方案:
– Sidecar + HostPath:Harbor 用 Helm 部署,给 core Pod 注入 Filebeat sidecar,共享 emptyDir 卷,零侵入。
– DaemonSet:节点级 Filebeat 采集/data/hbor/logs/*,需加multiline.pattern防止 Java 堆栈换行。 - 传输链:Filebeat → Kafka(三节点)→ Logstash → Elasticsearch(冷热分层,hot 节点 SSD 7 天,warm 节点 HDD 90 天)。
- 索引策略:按
project_name做 ILM rollover,每日 50 GB 一个索引,shard 数 = 数据节点数 × 1.5。 - 字段清洗:Logstash 用 ruby filter 把
timestamp转成@timestamp,并补全user_domain(LDAP 场景)。 - 告警引擎:ElasAlert2 或 Kibana Watcher,必须走公司统一告警通道(飞书/钉钉/短信),支持工作日 9-18 点静默。
- 规则设计:
- 镜像投毒:同一用户 5 分钟内推送 3 个以上不同版本且镜像 size < 1 MB;
- 凭证泄露:
operation=delete且resource_type=robot_account且username!=system-admin; - 越权拉取:
project_name=finance且username不在预设 LDAP 组cn=finance-pullers; - 暴力破解:同一 IP 15 分钟内产生 20 条以上
operation=login且http_status=401。
- 性能调优:Filebeat 开启
queue.mem.events=4096和harvester_limit=64,避免 Docker stdout 阻塞;Logstash 使用 persistent queue 防止 ES 宕机丢数据。 - 安全加固:Kafka 开启 SASL/SCRAM,Filebeat 到 Logstash 走 TLS 双向认证;索引设置 xpack.security.role 实现“安全团队只读、运维读写”。
答案
-
采集侧
a. 用 Helm 安装 Harbor 时追加 values:core: extraVolumes: - name: audit-logs emptyDir: {} extraVolumeMounts: - name: audit-logs mountPath: /var/log/hborb. 构建 Filebeat sidecar 镜像(基于官方 8.5.3,加入 harbor-audit.yml):
filebeat.inputs: - type: log paths: - /var/log/hbor/core.log json.keys_under_root: true json.add_error_key: true multiline.pattern: '^\{' multiline.negate: true multiline.match: after output.kafka: hosts: ["kafka-0.kafka:9092","kafka-1.kafka:9092","kafka-2.kafka:9092"] topic: 'harbor-audit' required_acks: 1 compression: gzip ssl.certificate_authorities: ["/certs/ca.crt"] ssl.certificate: "/certs/filebeat.crt" ssl.key: "/certs/filebeat.key"c. 将 sidecar 注入 core deployment,保证滚动升级不断流。
-
传输与存储
a. Kafka 三节点跨 AZ 部署,min.insync.replicas=2,保证等保 3 级容灾。
b. Logstash pipeline:input { kafka { bootstrap_servers => "kafka:9092" topics => ["harbor-audit"] codec => json group_id => "logstash-harbor" auto_offset_reset => "latest" security_protocol => "SASL_SSL" sasl_mechanism => "SCRAM-SHA-512" } } filter { ruby { code => 'event.set("@timestamp", LogStash::Timestamp.new(Time.at(event.get("timestamp").to_i)))' } if [resource_type] == "robot_account" and [operation] == "delete" and [username] !~ /^system-/ { mutate { add_tag => ["credential_leak"] } } } output { elasticsearch { hosts => ["https://es-hot:9200"] index => "harbor-audit-%{+YYYY.MM.dd}" ilm_enabled => true ilm_rollover_alias => "harbor-audit" ilm_pattern => "000001" ilm_policy => "harbor-audit-policy" user => "logstash_writer" password => "${LOGSTASH_PWD}" ssl => true cacert => "/certs/es-ca.crt" } }c. ILM 策略:hot 7 天 → warm 90 天 → cold 180 天 → delete,warm 阶段副本数 0 节省 50% 存储。
-
告警
a. 使用 ElasAlert2 部署为 StatefulSet,规则文件以 ConfigMap 挂载,热更新。
b. 示例规则(镜像投毒):name: Harbor Image Poisoning type: frequency index: harbor-audit num_events: 3 timeframe: minutes: 5 filter: - terms: operation: ["push"] - script: script: source: "doc['resource.size'].size() > 0 && doc['resource.size'].value < 1048576" alert: - "dingtalk" dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=${TOKEN}" dingtalk_msgtype: "text" alert_text: | 用户 {0} 于 {1} 在项目 {2} 连续推送 3 个小镜像,疑似投毒,请立即核查。c. 告警收敛:同一规则 5 分钟内只发一次,并自动创建 JIRA 工单(通过 webhook 调用内部工单系统)。
-
验证
a. 使用 helm test 启动一个临时 Pod,执行:docker tag alpine:latest harbor.example.com/finance/alpine:test docker push harbor.example.com/finance/alpine:test重复 3 次,5 分钟内收到钉钉告警即算闭环。
b. 通过 Kibana Discover 检查字段完整、@timestamp 无 8 小时时差(时区统一 Asia/Shanghai)。
拓展思考
- 如果 Harbor 部署在边缘 K3s 集群,带宽只有 10 Mbps,可改用 Filebeat 本地先写磁盘队列,夜间低峰时段通过 rsync+VPN 把日志文件批量传到中心 Kafka,降低专线费用。
- 对多 Harbor 实例(北京主、上海备)场景,可在 Kafka 层做 MirrorMaker 2 双向复制,告警规则只需部署一套,避免重复建设。
- 未来 Harbor 接入 OPA Gatekeeper 后,可把策略拒绝事件也送进同一索引,实现“事前拦截 + 事后审计”统一视图。