最近刚好打算使用 for 遍历 CIDR 数组的方式替换性能较差的 LPM trie bpf map,就顺便总结一下 loop 相关经验。

在 eBPF 中,出于性能、安全考虑,并不支持无限循环。这并不是说 eBPF 不支持循环,eBPF 支持的是有限的循环:使用 #pragma unrollfor 语句,bpf_for_each_map_elem(),和 bpf_loop()

bpf_for_each_map_elem()

见名知意,bpf_for_each_map_elem() 就是用来遍历 bpf map 的。比如,将所有防火墙规则保存到一个 bpf map 里,然后通过遍历的方式一条一条规则地进行匹配。eBPF Talk: 低性能 eBPF ACL 里使用的便是 bpf_for_each_map_elem()

bpf_for_each_map_elem() 的使用文档请到 ${KERNEL}/include/uapi/linux/bpf.h 里搜索。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static int
matching_rule(struct bpf_map *map, const __u32 *key, struct rule_policy *value, struct rule_matching *match) {
    if (*key >= ACL_RULE_NUM)
        return 1;

    // protocol
    proto = (typeof(proto))bpf_map_lookup_elem(&acl_protocol, key);

    // sport
    pr = (typeof(pr))bpf_map_lookup_elem(&acl_sport, key);

    // dport
    pr = (typeof(pr))bpf_map_lookup_elem(&acl_dport, key);

    // saddr
    m = (typeof(m))bpf_map_lookup_elem(&acl_saddr, key);
    val = (typeof(val))bpf_map_lookup_elem(m, &k);

    // daddr
    m = (typeof(m))bpf_map_lookup_elem(&acl_daddr, key);
    val = (typeof(val))bpf_map_lookup_elem(m, &k);

    __builtin_memcpy(&match->policy, value, sizeof(*value));
    match->matched = 1;
    return 1;
}

static __always_inline int
match_acl_rules(struct xdp_md *ctx) {
    struct rule_matching match = {};

    bpf_for_each_map_elem(&acl_rule_policy, matching_rule, &match, 0);

    return match.matched ? match.policy.action : XDP_PASS;
}

bpf_loop()

尴尬一笑,bpf_loop() 我也还没用过呢。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/*
 * long bpf_loop(u32 nr_loops, void *callback_fn, void *callback_ctx, u64 flags)
 *  Description
 *      For **nr_loops**, call **callback_fn** function
 *      with **callback_ctx** as the context parameter.
 *      The **callback_fn** should be a static function and
 *      the **callback_ctx** should be a pointer to the stack.
 *      The **flags** is used to control certain aspects of the helper.
 *      Currently, the **flags** must be 0. Currently, nr_loops is
 *      limited to 1 << 23 (~8 million) loops.
 *
 *      long (\*callback_fn)(u32 index, void \*ctx);
 *
 *      where **index** is the current index in the loop. The index
 *      is zero-indexed.
 *
 *      If **callback_fn** returns 0, the helper will continue to the next
 *      loop. If return value is 1, the helper will skip the rest of
 *      the loops and return. Other return values are not used now,
 *      and will be rejected by the verifier.
 *
 *  Return
 *      The number of loops performed, **-EINVAL** for invalid **flags**,
 *      **-E2BIG** if **nr_loops** exceeds the maximum number of loops.
 */

不过,类似 bpf_for_each_map_elem(),在使用 bpf_loop() 时,给回调函数 callback_fn 传的 ctx 也是一个指向当前栈变量的指针。

不过,不像 bpf_for_each_map_elem() 受限于 bpf map 大小,bpf_loop() 的循环次数高达 1«23 = 8388608(超过 8 百万)次;大大地扩展了 bpf_loop() 的应用场景。

不过,bpf_loop() 并没有受限于 BPF 指令数(1 百万条),这是因为循环发生在 bpf_loop() 帮助函数内部。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// ${KERNEL}/kernel/bpf/bpf_iter.c

BPF_CALL_4(bpf_loop, u32, nr_loops, void *, callback_fn, void *, callback_ctx,
       u64, flags)
{
    bpf_callback_t callback = (bpf_callback_t)callback_fn;
    u64 ret;
    u32 i;

    /* Note: these safety checks are also verified when bpf_loop
     * is inlined, be careful to modify this code in sync. See
     * function verifier.c:inline_bpf_loop.
     */
    if (flags)
        return -EINVAL;
    if (nr_loops > BPF_MAX_LOOPS)
        return -E2BIG;

    for (i = 0; i < nr_loops; i++) {
        ret = callback((u64)i, (u64)(long)callback_ctx, 0, 0, 0);
        /* return value: 0 - continue, 1 - stop and return */
        if (ret)
            return i + 1;
    }

    return i;
}

有趣的是,为了 bpf runtime 正确地执行 bpf_loop(),bpf verifier 进行了一点 BPF 指令层面的优化。感兴趣的小司机们请查看 ${KERNEL}/kernel/bpf/verifier.c 里的 inline_bpf_loop() 函数。

简单学习 bpf_loop() 后,eBPF Talk: 低性能 eBPF ACL 里的遍历 bpf map 的方式便可使用 bpf_loop() 作无损替换。

bpf_loop() 的使用例子请参考:${KERNEL}/tools/testing/selftests/bpf/progs/bpf_loop.c

for 循环

eBPF 里支持有限循环次数的 for 循环,且需要使用 #pragma unrollfor 循环展开。

因此,for 循环受限于 BPF 指令数量,不过从 5.2 内核开始支持一百万条 BPF 指令(bpf: increase complexity limit and maximum program size)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
static __always_inline int get_rule_action_v4(__u64 *rule_array[], __u32 rules_num) {
    // ...

#pragma unroll
    for (; rule_array_outer_index < BITMAP_ARRAY_SIZE; rule_array_outer_index += 8) {
        get_hit_rules_optimize(rule_array, rules_num, &rule_array_index, &hit_rules);
        if (hit_rules > 0) {
            break;
        }
        rule_array_index++;
    }

    if (rule_array_index >= XDPACL_BITMAP_ARRAY_SIZE_LIMIT) {
        // 特殊情况 2
        return XDP_PASS;
    }

    // ...

    return XDP_PASS;
}

此代码片段来自 eBPF Talk: 再论高性能 eBPF ACL

注意该 ACL 里遍历 bitmap 的实现方式:使用 #pragma unrollfor 循环遍历一段 bpf map 里的内存

遍历少量 CIDR

这就有趣了。最近刚好想用这种方式来避免性能较低的 LPM trie bpf map

因为需要处理的 CIDR 数量不多,预期就只有几个。而且,考虑到 LPM trie bpf map 的性能损耗。

所以,考虑使用 bpf_for_each_map_elem() 来遍历 CIDR。不过,保存 CIDR 的 bpf map 需要通过 HTTP API 的方式进行动态增删;每次都使用 bpf_for_each_map_elem() 来完全遍历保存 CIDR 的 bpf map 的做法并不是很理想;或是动态调整 bpf map 里的 CIDR 不是很方便。

于是,当想起高性能 eBPF ACL 里遍历 bitmap 的方式的时候,便有了解决办法:

  1. 保存 CIDR 的 bpf map 的容量为 1。
  2. bpf map 的值为 CIDR 数组,如 struct { __be32 addr; __be32 mask; }[10]
  3. 使用 0 索引查询出 CIDR 数组。
  4. 使用 #pragma unrollfor 循环遍历 CIDR 数组;当 addrmask 都是 0 时提前结束遍历。
  5. 用户态应用程序动态增删 CIDR 时,每次都将 CIDR 全量更新到 bpf map 里;而且确保 CIDR 之间没有空洞。

完美,我都为如此优雅、如此高性能的解决办法感到惊讶!

总结

#pragma unrollfor 循环对内核版本要求较小,但适用的场景较少。不过对于一些特殊需求,用其遍历 bpf map 内存的方式会带来奇效。

bpf_for_each_map_elem() since 5.13 kernel,在不少 5.15 内核的服务器上能够用其处理一些需要遍历的场景,比如遍历匹配防火墙规则。

bpf_loop() since 5.17 kernel,更是扩展了 bpf_for_each_map_elem() 的遍历能力,使得 eBPF 遍历的能力不再受限于 BPF 指令数量(for)和 bpf map(bpf_for_each_map_elem())。

经验易于沉淀,以后还是多多总结经验吧。