将 Harbor 审计日志接入 ELK 并设置告警规则

解读

在国内金融、运营商、政务云等合规场景下,Harbor 作为企业级镜像仓库必须留存180 天以上审计日志,并对接公司统一SOC/等保平台。面试官想验证三件事:

  1. 你是否理解 Harbor 审计事件的产生机制与字段含义;
  2. 能否在不改 Harbor 源码的前提下,用容器化方式把日志送进 ELK,并保证高可用与低延迟
  3. 能否结合ATT&CK 容器矩阵设计可落地的告警规则,而非简单关键字匹配。

知识点

  • Harbor 审计日志源:core 组件的 /var/log/hbor/core.log 及 database 表 audit_log推荐采集落盘文件避免直接连库。
  • 日志格式:单行 JSON,关键字段 usernameoperationresourceresource_typetimestampproject_name
  • 容器采集方案:
    – Sidecar + HostPath:Harbor 用 Helm 部署,给 core Pod 注入 Filebeat sidecar,共享 emptyDir 卷,零侵入。
    – DaemonSet:节点级 Filebeat 采集 /data/hbor/logs/*,需加multiline.pattern防止 Java 堆栈换行。
  • 传输链:Filebeat → Kafka(三节点)→ Logstash → Elasticsearch(冷热分层,hot 节点 SSD 7 天,warm 节点 HDD 90 天)。
  • 索引策略:按 project_nameILM rollover,每日 50 GB 一个索引,shard 数 = 数据节点数 × 1.5
  • 字段清洗:Logstash 用 ruby filtertimestamp 转成 @timestamp,并补全 user_domain(LDAP 场景)。
  • 告警引擎:ElasAlert2 或 Kibana Watcher,必须走公司统一告警通道(飞书/钉钉/短信),支持工作日 9-18 点静默
  • 规则设计:
    1. 镜像投毒:同一用户 5 分钟内推送 3 个以上不同版本且镜像 size < 1 MB;
    2. 凭证泄露operation=deleteresource_type=robot_accountusername!=system-admin
    3. 越权拉取project_name=financeusername 不在预设 LDAP 组 cn=finance-pullers
    4. 暴力破解:同一 IP 15 分钟内产生 20 条以上 operation=loginhttp_status=401
  • 性能调优:Filebeat 开启 queue.mem.events=4096harvester_limit=64,避免 Docker stdout 阻塞;Logstash 使用 persistent queue 防止 ES 宕机丢数据。
  • 安全加固:Kafka 开启 SASL/SCRAM,Filebeat 到 Logstash 走 TLS 双向认证;索引设置 xpack.security.role 实现“安全团队只读、运维读写”。

答案

  1. 采集侧
    a. 用 Helm 安装 Harbor 时追加 values:

    core:
      extraVolumes:
        - name: audit-logs
          emptyDir: {}
      extraVolumeMounts:
        - name: audit-logs
          mountPath: /var/log/hbor
    

    b. 构建 Filebeat sidecar 镜像(基于官方 8.5.3,加入 harbor-audit.yml):

    filebeat.inputs:
    - type: log
      paths:
        - /var/log/hbor/core.log
      json.keys_under_root: true
      json.add_error_key: true
      multiline.pattern: '^\{'
      multiline.negate: true
      multiline.match: after
    output.kafka:
      hosts: ["kafka-0.kafka:9092","kafka-1.kafka:9092","kafka-2.kafka:9092"]
      topic: 'harbor-audit'
      required_acks: 1
      compression: gzip
      ssl.certificate_authorities: ["/certs/ca.crt"]
      ssl.certificate: "/certs/filebeat.crt"
      ssl.key: "/certs/filebeat.key"
    

    c. 将 sidecar 注入 core deployment,保证滚动升级不断流

  2. 传输与存储
    a. Kafka 三节点跨 AZ 部署,min.insync.replicas=2,保证等保 3 级容灾。
    b. Logstash pipeline:

    input {
      kafka {
        bootstrap_servers => "kafka:9092"
        topics => ["harbor-audit"]
        codec => json
        group_id => "logstash-harbor"
        auto_offset_reset => "latest"
        security_protocol => "SASL_SSL"
        sasl_mechanism => "SCRAM-SHA-512"
      }
    }
    filter {
      ruby {
        code => 'event.set("@timestamp", LogStash::Timestamp.new(Time.at(event.get("timestamp").to_i)))'
      }
      if [resource_type] == "robot_account" and [operation] == "delete" and [username] !~ /^system-/ {
        mutate { add_tag => ["credential_leak"] }
      }
    }
    output {
      elasticsearch {
        hosts => ["https://es-hot:9200"]
        index => "harbor-audit-%{+YYYY.MM.dd}"
        ilm_enabled => true
        ilm_rollover_alias => "harbor-audit"
        ilm_pattern => "000001"
        ilm_policy => "harbor-audit-policy"
        user => "logstash_writer"
        password => "${LOGSTASH_PWD}"
        ssl => true
        cacert => "/certs/es-ca.crt"
      }
    }
    

    c. ILM 策略:hot 7 天 → warm 90 天 → cold 180 天 → delete,warm 阶段副本数 0 节省 50% 存储

  3. 告警
    a. 使用 ElasAlert2 部署为 StatefulSet,规则文件以 ConfigMap 挂载,热更新。
    b. 示例规则(镜像投毒):

    name: Harbor Image Poisoning
    type: frequency
    index: harbor-audit
    num_events: 3
    timeframe:
      minutes: 5
    filter:
      - terms:
          operation: ["push"]
      - script:
          script:
            source: "doc['resource.size'].size() > 0 && doc['resource.size'].value < 1048576"
    alert:
      - "dingtalk"
    dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=${TOKEN}"
    dingtalk_msgtype: "text"
    alert_text: |
      用户 {0} 于 {1} 在项目 {2} 连续推送 3 个小镜像,疑似投毒,请立即核查。
    

    c. 告警收敛:同一规则 5 分钟内只发一次,并自动创建 JIRA 工单(通过 webhook 调用内部工单系统)。

  4. 验证
    a. 使用 helm test 启动一个临时 Pod,执行:

    docker tag alpine:latest harbor.example.com/finance/alpine:test
    docker push harbor.example.com/finance/alpine:test
    

    重复 3 次,5 分钟内收到钉钉告警即算闭环。
    b. 通过 Kibana Discover 检查字段完整、@timestamp 无 8 小时时差(时区统一 Asia/Shanghai)。

拓展思考

  • 如果 Harbor 部署在边缘 K3s 集群,带宽只有 10 Mbps,可改用 Filebeat 本地先写磁盘队列,夜间低峰时段通过 rsync+VPN 把日志文件批量传到中心 Kafka,降低专线费用。
  • 多 Harbor 实例(北京主、上海备)场景,可在 Kafka 层做 MirrorMaker 2 双向复制,告警规则只需部署一套,避免重复建设。
  • 未来 Harbor 接入 OPA Gatekeeper 后,可把策略拒绝事件也送进同一索引,实现“事前拦截 + 事后审计”统一视图。