bpf2bpf 特性简介 中提及到 bpf_tail_call(),我们就认真学习一下它吧。

bpf_tail_call

从 4.2 内核版本开始,eBPF 支持了尾调用特性。 该特性的主要特点是需要使用 bpf_tail_call() 这个帮助函数。 参考 BPF Features by Linux Kernel Version

尾调用,顾名思义就是调用另外一个 eBPF 程序,而不再返回。(bpf2bpf 特性是能够返回的)。 在尾调用的时候,会复用同一个栈空间,复用 R6-R9 寄存器(R1-R5 寄存器用于传递参数)。 这就允许将多个 eBPF 程序串联起来,从而能够突破 4096 个指令的限制(在 5.2 内核之前,每个 eBPF 程序的指令数量限制是 4096,之后则是 100 万)。

尾调用是有调用次数限制的,目前限制为 32 次,由内核宏 MAX_TAIL_CALL_CNT 指定。

bpf_tail_call() 使用说明

尾调用的帮助函数声明如下:

1
long bpf_tail_call(void *ctx, struct bpf_map *prog_array_map, u32 index)

在使用的时候,需要提供指向上下文的 ctx,类型为 BPF_MAP_TYPE_PROG_ARRAY 的 map, 和指向目标 eBPF 程序的索引。

然而,在真正使用的时候,还需要用户态应用程序在将 eBPF 程序加载进内核之前将 map 准备好; 就是需要为每个用到的索引添加对应的目标 eBPF 程序。后面有 demo 说明。

bpf_tail_call() VS bpf2bpf

它们之间的对比可以查看 bpf2bpf 特性简介,此处附加以下对比:

  1. 内核版本要求:尾调用要求 4.2,bpf2bpf 要求 4.16
  2. map:尾调用需要查一次 map,相比 bpf2bpf,性能有所损耗
  3. 在尾调用中,函数之间传递参数需要使用 per-CPU 的 map,而无法使用寄存器,即使使用 asm volatile 强行使用寄存器(因为目标 eBPF 程序是单独校验的,从而导致校验失败)
  4. 在编码实现的时候,相比 bpf2bpf,尾调用需要编写更多的代码,无论是 C 代码还是 Go 代码(见后面的 demo 代码)

bpf_tail_call() 原理浅析

咱们不懂 eBPF verifier,先跳过它吧。直接看下运行的时候是怎么实现尾调用的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// kernel/bpf/core.c

static u64 ___bpf_prog_run(u64 *regs, const struct bpf_insn *insn)
{
    // ...

    JMP_TAIL_CALL: {
        struct bpf_map *map = (struct bpf_map *) (unsigned long) BPF_R2;
        struct bpf_array *array = container_of(map, struct bpf_array, map);
        struct bpf_prog *prog;
        u32 index = BPF_R3;

        if (unlikely(index >= array->map.max_entries))
            goto out;
        if (unlikely(tail_call_cnt > MAX_TAIL_CALL_CNT))
            goto out;

        tail_call_cnt++;

        prog = READ_ONCE(array->ptrs[index]);
        if (!prog)
            goto out;

        /* ARG1 at this point is guaranteed to point to CTX from
         * the verifier side due to the fact that the tail call is
         * handled like a helper, that is, bpf_tail_call_proto,
         * where arg1_type is ARG_PTR_TO_CTX.
         */
        insn = prog->insnsi;
        goto select_insn;
out:
        CONT;

    // ...
}

由函数声明 long bpf_tail_call(void *ctx, struct bpf_map *prog_array_map, u32 index) 可知, 第二个参数是 BPF_MAP_TYPE_PROG_ARRAY 类型的 map,第三个参数是索引。

  1. 从寄存器 R2 (对应第二个参数)中拿到 map
  2. 从 map 中拿到 eBPF 程序数组
  3. 从寄存器 R3 (对应第三个参数)中拿到索引
  4. 数组越界判断
  5. 尾调用次数限制判断、尾调用次数递增
  6. 从 eBPF 程序数组中获取索引对应的 eBPF 程序
  7. 将当前指令指针指向目标 eBPF 程序的指令数组
  8. 开始执行目标 eBPF 程序的指令

bpf_tail_call() 例子一则

还是以 tcp 连接事件为例:当前服务器向外发起 tcp 连接、接收 tcp 连接时,打印其中的地址端口信息。 其中的 C 代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
struct {
    __uint(type, BPF_MAP_TYPE_PROG_ARRAY);
    __uint(key_size, 4);
    __uint(value_size, 4);
    __uint(max_entries, 1);
} progs SEC(".maps");

struct {
    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
    __type(key, __u32);
    __type(value, __u64);
    __uint(max_entries, 1);
} socks SEC(".maps");

typedef struct event {
    __be32 saddr, daddr;
    __be16 sport, dport;
} __attribute__((packed)) event_t;

struct {
    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} events SEC(".maps");

SEC("kprobe/hanle_new_connection")
int handle_new_connection(void *ctx)
{
    __u32 key = 0;
    struct sock **skp = bpf_map_lookup_elem(&socks, &key);
    if (!skp)
        return 0;

    bpf_map_delete_elem(&socks, &key);

    struct sock *sk = *skp;
    event_t ev = {};
    ev.saddr = BPF_CORE_READ(sk, __sk_common.skc_rcv_saddr);
    ev.daddr = BPF_CORE_READ(sk, __sk_common.skc_daddr);
    ev.sport = BPF_CORE_READ(sk, __sk_common.skc_num);
    ev.dport = bpf_ntohs(BPF_CORE_READ(sk, __sk_common.skc_dport));

    bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &ev, sizeof(ev));

    return 0;
}

SEC("kprobe/tcp_connect")
int k_tcp_connect(struct pt_regs *ctx)
{
    struct sock *sk;
    sk = (typeof(sk))PT_REGS_PARM1(ctx);

    __u32 key = 0;
    bpf_map_update_elem(&socks, &key, &sk, BPF_ANY);

    bpf_tail_call_static(ctx, &progs, 0);

    return 0;
}

SEC("kprobe/inet_csk_complete_hashdance")
int k_icsk_complete_hashdance(struct pt_regs *ctx)
{
    struct sock *sk;
    sk = (typeof(sk))PT_REGS_PARM2(ctx);

    __u32 key = 0;
    bpf_map_update_elem(&socks, &key, &sk, BPF_ANY);

    bpf_tail_call_static(ctx, &progs, 0);

    return 0;
}

其中的 Go 代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func main() {
    if err := rlimit.RemoveMemlock(); err != nil {
        log.Fatalf("Failed to remove rlimit memlock: %v", err)
    }

    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    var obj tcpconnObjects
    if err := loadTcpconnObjects(&obj, nil); err != nil {
        log.Fatalf("Failed to load bpf obj: %v", err)
    }
    defer obj.Close()

    // prepare programs for bpf_tail_call()
    prog := obj.tcpconnPrograms.HandleNewConnection
    key := uint32(0)
    if err := obj.tcpconnMaps.Progs.Update(key, prog, ebpf.UpdateAny); err != nil {
        log.Printf("Failed to prepare prog(handle_new_connection): %v", err)
        return
    }

    if kp, err := link.Kprobe("tcp_connect", obj.K_tcpConnect, nil); err != nil {
        log.Printf("Failed to attach kprobe(tcp_connect): %v", err)
        return
    } else {
        defer kp.Close()
        log.Printf("Attached kprobe(tcp_connect)")
    }

    if kp, err := link.Kprobe("inet_csk_complete_hashdance", obj.K_icskCompleteHashdance, nil); err != nil {
        log.Printf("Failed to attach kprobe(inet_csk_complete_hashdance): %v", err)
        return
    } else {
        defer kp.Close()
        log.Printf("Attached kprobe(inet_csk_complete_hashdance)")
    }

    go handlePerfEvent(ctx, obj.Events)

    <-ctx.Done()
}

完整 demo 的源代码请查看 bpf tail call example

小结

只有亲手实践才知道 eBPF 尾递归特性如此难用,还好有了方便又好用的 bpf2bpf 特性。

总结

源码面前,了无秘密,不管多么复杂的源码,即使是 Linux 系统内核的源码。