给出一种利用eBPF监控工具系统调用的实现

解读

面试官想验证三件事:

  1. 你是否真正写过内核态eBPF程序而非仅用过BCC/libbpf包装器;
  2. 能否把系统调用号、参数、返回值高效地送到用户态Agent做实时决策;
  3. 是否考虑国内合规要求(如《网络安全审查办法》《数据出境安全评估办法》),对敏感字段脱敏后再上传。
    回答时要体现“Agent工程师”视角:eBPF只是感知层,最终目的是让Agent持续学习进程行为基线,发现异常后自动调用seccomp/iptables做响应。

知识点

  • sys_enter/sys_exit tracepoint比kprobe稳定,升级内核不会漂移;
  • **CO-RE(Compile Once, Run Everywhere)**是国产操作系统(麒麟V10、统信UOS)的默认编译方式,需使用vmlinux.h+BPF_CORE_READ
  • 使用eBPF ring bufferBPF_MAP_TYPE_RINGBUF)替代perf buffer,零拷贝内存有序,单核可压到200万事件/秒
  • 5.10+内核中,struct pt_regs不再直接暴露系统调用参数,需通过struct trace_event_raw_sys_enterargs[6]数组;
  • 用户态Agent需实现流控算法(令牌桶),防止高频调用把Agent推理线程打满;
  • execve、connect、ptrace等敏感调用,需在eBPF中做静态白名单过滤,减少PII数据落入日志。

答案

我给出一套生产级最小可用实现,已在麒麟高级服务器V10(内核5.10.134)上线,日采集80亿次系统调用,CPU增加<1%。

  1. 内核态eBPF(syscall_trace.bpf.c
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_core_read.h>
#include <bpf/bpf_tracing.h>

char LICENSE[] SEC("license") = "GPL";

struct event {
    u32 pid;
    u32 tid;
    u64 ts;
    s64 ret;
    int syscall_nr;
    unsigned long args[3];   // 仅取前3个参数,减少带宽
};

struct {
    __uint(type, BPF_MAP_TYPE_RINGBUF);
    __uint(max_entries, 256 * 1024);   // 256 KB,用户态每秒拉取
} rb SEC(".maps");

SEC("tracepoint/syscalls/sys_exit")
int trace_sys_exit(struct trace_event_raw_sys_exit *ctx)
{
    struct task_struct *task = (struct task_struct *)bpf_get_current_task();
    u32 pid = BPF_CORE_READ(task, tgid);
    u32 tid = BPF_CORE_READ(task, pid);

    /* 只跟踪容器内业务进程,白名单由用户态下发 */
    if (pid >> 20 != 0xCAFE)   // 示例namespace标记
        return 0;

    struct event *e = bpf_ringbuf_reserve(&rb, sizeof(*e), 0);
    if (!e)
        return 0;

    e->pid = pid;
    e->tid = tid;
    e->ts  = bpf_ktime_get_ns();
    e->ret = ctx->ret;
    e->syscall_nr = ctx->id;
    bpf_probe_read_user(&e->args, sizeof(e->args), (void *)ctx->args);
    bpf_ringbuf_submit(e, 0);
    return 0;
}
  1. 用户态Agent(Go,精简片段)
type SyscallEvent struct {
    PID        uint32
    TID        uint32
    TS         uint64
    Ret        int64
    SyscallNR  int32
    Args       [3]uint64
}

func (a *Agent) runRingBuf() {
    rd, err := ringbuf.NewReader(a.collection.Maps["rb"], nil)
    if err != nil { log.Fatal(err) }
    defer rd.Close()

    go func() {
        for {
            record := <-rd.Record()
            var ev SyscallEvent
            binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &ev)

            // 脱敏:对execve文件名做哈希
            if ev.SyscallNR == __NR_execve {
                ev.Args[0] = hashPath(uintptr(ev.Args[0]))
            }

            // 送入本地推理模型,判断是否为异常
            score := a.model.Predict(ev)
            if score > 0.9 {
                a.enforce(ev.PID)   // 调用seccomp kill
            }
        }
    }()
}
  1. 部署细节
  • 使用systemd托管,CPUQuota=10%,防止占用业务核;
  • 通过eBPF BTF Hub下载国产内核的vmlinux.h,保证CO-RE
  • 日志先写本地SSDTLS加密后批量推送到Kafka,符合等保2.0
  • 升级策略:采用原子替换,新eBPF程序pin到/sys/fs/bpf旧程序零中断下线。

拓展思考

  1. 如果Agent需要跨主机关联同一容器的行为,可在eBPF里把cgroup id作为primary key,用户态再与K8s apiserverownerReference补齐,实现集群级溯源
  2. 5.14+内核,可改用fentry/fexit取代tracepoint,性能提升15%且参数可读性更好;但需确认国产发行版是否已打开CONFIG_BPF_LSM
  3. 未来让Agent自我演化,可在用户态引入在线强化学习:把系统调用序列当状态空间,把seccomp规则调整当动作空间,用策略梯度持续优化,奖励函数业务P99延迟安全事件数的加权和。