最近刚好打算使用 for
遍历 CIDR 数组的方式替换性能较差的 LPM trie bpf map,就顺便总结一下 loop 相关经验。
在 eBPF 中,出于性能、安全考虑,并不支持无限循环。这并不是说 eBPF 不支持循环,eBPF 支持的是有限的循环:使用 #pragma unroll
的 for
语句,bpf_for_each_map_elem()
,和 bpf_loop()
。
bpf_for_each_map_elem()
见名知意,bpf_for_each_map_elem()
就是用来遍历 bpf map 的。比如,将所有防火墙规则保存到一个 bpf map 里,然后通过遍历的方式一条一条规则地进行匹配。eBPF Talk: 低性能 eBPF ACL 里使用的便是 bpf_for_each_map_elem()
。
bpf_for_each_map_elem()
的使用文档请到 ${KERNEL}/include/uapi/linux/bpf.h 里搜索。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
static int
matching_rule(struct bpf_map *map, const __u32 *key, struct rule_policy *value, struct rule_matching *match) {
if (*key >= ACL_RULE_NUM)
return 1;
// protocol
proto = (typeof(proto))bpf_map_lookup_elem(&acl_protocol, key);
// sport
pr = (typeof(pr))bpf_map_lookup_elem(&acl_sport, key);
// dport
pr = (typeof(pr))bpf_map_lookup_elem(&acl_dport, key);
// saddr
m = (typeof(m))bpf_map_lookup_elem(&acl_saddr, key);
val = (typeof(val))bpf_map_lookup_elem(m, &k);
// daddr
m = (typeof(m))bpf_map_lookup_elem(&acl_daddr, key);
val = (typeof(val))bpf_map_lookup_elem(m, &k);
__builtin_memcpy(&match->policy, value, sizeof(*value));
match->matched = 1;
return 1;
}
static __always_inline int
match_acl_rules(struct xdp_md *ctx) {
struct rule_matching match = {};
bpf_for_each_map_elem(&acl_rule_policy, matching_rule, &match, 0);
return match.matched ? match.policy.action : XDP_PASS;
}
|
bpf_loop()
尴尬一笑,bpf_loop()
我也还没用过呢。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/*
* long bpf_loop(u32 nr_loops, void *callback_fn, void *callback_ctx, u64 flags)
* Description
* For **nr_loops**, call **callback_fn** function
* with **callback_ctx** as the context parameter.
* The **callback_fn** should be a static function and
* the **callback_ctx** should be a pointer to the stack.
* The **flags** is used to control certain aspects of the helper.
* Currently, the **flags** must be 0. Currently, nr_loops is
* limited to 1 << 23 (~8 million) loops.
*
* long (\*callback_fn)(u32 index, void \*ctx);
*
* where **index** is the current index in the loop. The index
* is zero-indexed.
*
* If **callback_fn** returns 0, the helper will continue to the next
* loop. If return value is 1, the helper will skip the rest of
* the loops and return. Other return values are not used now,
* and will be rejected by the verifier.
*
* Return
* The number of loops performed, **-EINVAL** for invalid **flags**,
* **-E2BIG** if **nr_loops** exceeds the maximum number of loops.
*/
|
不过,类似 bpf_for_each_map_elem()
,在使用 bpf_loop()
时,给回调函数 callback_fn
传的 ctx
也是一个指向当前栈变量的指针。
不过,不像 bpf_for_each_map_elem()
受限于 bpf map 大小,bpf_loop()
的循环次数高达 1«23 = 8388608(超过 8 百万)次;大大地扩展了 bpf_loop()
的应用场景。
不过,bpf_loop()
并没有受限于 BPF 指令数(1 百万条),这是因为循环发生在 bpf_loop()
帮助函数内部。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// ${KERNEL}/kernel/bpf/bpf_iter.c
BPF_CALL_4(bpf_loop, u32, nr_loops, void *, callback_fn, void *, callback_ctx,
u64, flags)
{
bpf_callback_t callback = (bpf_callback_t)callback_fn;
u64 ret;
u32 i;
/* Note: these safety checks are also verified when bpf_loop
* is inlined, be careful to modify this code in sync. See
* function verifier.c:inline_bpf_loop.
*/
if (flags)
return -EINVAL;
if (nr_loops > BPF_MAX_LOOPS)
return -E2BIG;
for (i = 0; i < nr_loops; i++) {
ret = callback((u64)i, (u64)(long)callback_ctx, 0, 0, 0);
/* return value: 0 - continue, 1 - stop and return */
if (ret)
return i + 1;
}
return i;
}
|
有趣的是,为了 bpf runtime 正确地执行 bpf_loop()
,bpf verifier 进行了一点 BPF 指令层面的优化。感兴趣的小司机们请查看 ${KERNEL}/kernel/bpf/verifier.c
里的 inline_bpf_loop()
函数。
简单学习 bpf_loop()
后,eBPF Talk: 低性能 eBPF ACL 里的遍历 bpf map 的方式便可使用 bpf_loop()
作无损替换。
bpf_loop()
的使用例子请参考:${KERNEL}/tools/testing/selftests/bpf/progs/bpf_loop.c。
for
循环
eBPF 里支持有限循环次数的 for
循环,且需要使用 #pragma unroll
将 for
循环展开。
因此,for
循环受限于 BPF 指令数量,不过从 5.2 内核开始支持一百万条 BPF 指令(bpf: increase complexity limit and maximum program size)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
static __always_inline int get_rule_action_v4(__u64 *rule_array[], __u32 rules_num) {
// ...
#pragma unroll
for (; rule_array_outer_index < BITMAP_ARRAY_SIZE; rule_array_outer_index += 8) {
get_hit_rules_optimize(rule_array, rules_num, &rule_array_index, &hit_rules);
if (hit_rules > 0) {
break;
}
rule_array_index++;
}
if (rule_array_index >= XDPACL_BITMAP_ARRAY_SIZE_LIMIT) {
// 特殊情况 2
return XDP_PASS;
}
// ...
return XDP_PASS;
}
|
此代码片段来自 eBPF Talk: 再论高性能 eBPF ACL。
注意该 ACL 里遍历 bitmap 的实现方式:使用 #pragma unroll
的 for
循环遍历一段 bpf map 里的内存。
遍历少量 CIDR
这就有趣了。最近刚好想用这种方式来避免性能较低的 LPM trie bpf map。
因为需要处理的 CIDR 数量不多,预期就只有几个。而且,考虑到 LPM trie bpf map 的性能损耗。
所以,考虑使用 bpf_for_each_map_elem()
来遍历 CIDR。不过,保存 CIDR 的 bpf map 需要通过 HTTP API 的方式进行动态增删;每次都使用 bpf_for_each_map_elem()
来完全遍历保存 CIDR 的 bpf map 的做法并不是很理想;或是动态调整 bpf map 里的 CIDR 不是很方便。
于是,当想起高性能 eBPF ACL 里遍历 bitmap 的方式的时候,便有了解决办法:
- 保存 CIDR 的 bpf map 的容量为 1。
- bpf map 的值为 CIDR 数组,如
struct { __be32 addr; __be32 mask; }[10]
。
- 使用 0 索引查询出 CIDR 数组。
- 使用
#pragma unroll
的 for
循环遍历 CIDR 数组;当 addr
和 mask
都是 0 时提前结束遍历。
- 用户态应用程序动态增删 CIDR 时,每次都将 CIDR 全量更新到 bpf map 里;而且确保 CIDR 之间没有空洞。
完美,我都为如此优雅、如此高性能的解决办法感到惊讶!
总结
#pragma unroll
的 for
循环对内核版本要求较小,但适用的场景较少。不过对于一些特殊需求,用其遍历 bpf map 内存的方式会带来奇效。
bpf_for_each_map_elem()
since 5.13 kernel,在不少 5.15 内核的服务器上能够用其处理一些需要遍历的场景,比如遍历匹配防火墙规则。
bpf_loop()
since 5.17 kernel,更是扩展了 bpf_for_each_map_elem()
的遍历能力,使得 eBPF 遍历的能力不再受限于 BPF 指令数量(for
)和 bpf map(bpf_for_each_map_elem()
)。
经验易于沉淀,以后还是多多总结经验吧。