其实,bpf 子系统里并没有一个叫 bpf_binary_search() 的 helper 函数。

但并不是说,在 eBPF 里就实现不了二分查找了。

小需求

怎么判断一个 IP 地址是否在某几个 CIDR 里?

已知方法有二:

  1. lpm_trie
  2. eBPF Talk: 实战经验之 loop

但它们各自有各自的优缺点:

  1. lpm_trie 实用性强,适合以 CIDR 为 key 的比较复杂的场景。
  2. lpm_trie 在性能上并不占优势,它的查找性能不稳定,取决于 CIDR 的 bit 分布情况,不适用于高效地判断一个 IP 是否在某几个 CIDR 里的场景。
  3. loop 方法的实现简单,性能稳定,对于少量(可能小于等于 8)CIDR 的情况,它能比较高效地判断一个 IP 是否在这几个 CIDR 里。
  4. loop 方法的缺点也比较明显,随着 CIDR 的数量的增加,它的性能会线性下降。

有没有一种查找性能稳定、且查找性能跟 loop 方法差不多的方法呢?

如果不直接限定在 eBPF 里,那么,答案就是二分查找了。

二分查找

对二分查找算法不是很熟悉,上 ChatGPT:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
int binarySearch(int arr[], int left, int right, int target) {
    while (left <= right) {
        int mid = left + (right - left) / 2;

        // Check if target is present at the middle
        if (arr[mid] == target)
            return mid;

        // If target is greater, ignore the left half
        if (arr[mid] < target)
            left = mid + 1;

        // If target is smaller, ignore the right half
        else
            right = mid - 1;
    }

    // Target was not found
    return -1;
}

eBPF 里的二分查找

抄,但不能直接抄:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
struct delay_cidr {
    __u32 start;
    __u32 end;
};

typedef struct {
    struct delay_cidr cidrs[DELAY_CIDR_CAPACITY];
} delay_cidrs_t;

static const volatile delay_cidrs_t delay_cidrs;
static const volatile __u32 delay_cidrs_len = 0;

static __always_inline bool
__should_delay_sip(__be32 ip)
{
    __u32 lo = 0;
    __u32 hi = delay_cidrs_len - 1;
    __u32 addr = bpf_ntohl(ip);

#pragma clang loop unroll(full)
    for (__u32 index = 0; index < 32; index++) {
        if (lo > hi)                    // Checking lo > hi for the end of binary search.
            return false;

        __u32 mid = (lo + hi) >> 1;
        if (mid >= DELAY_CIDR_CAPACITY) // It's required to do bound check for mid.
            return false;

        struct delay_cidr *cidr = (typeof(cidr))&delay_cidrs.cidrs[mid];
        if (addr >= cidr->start && addr <= cidr->end) {
            return true;
        }

        if (addr < cidr->start) {
            hi = mid - 1;
        } else {
            lo = mid + 1;
        }
    }

    return false;
}

其中需要注意:

  1. 将 IP 地址转换为 __u32 类型,是为了方便比较。
  2. eBPF 里只支持有限次数的循环,所以不能直接套用 while (lo <= hi)
  3. 访问 cidrs 数组前,需要先对 mid 进行边界检查。
  4. 先判断 IP 地址是否在 mid 对应的 CIDR 里,再调整 lohi

完美,以上代码片段实现了 log2 级别的查找性能。

但是,以上代码片段通不过 verifier,报错 “R3 unbounded memory access, make sure to bounds check any such access”。

eBPF 里的二分查找(bpf_loop

因为上面的代码片段过不了 verifier,所以,我们需要换一种思路:借助 bpf_loop() helper 函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
struct delay_ctx {
    __u32 lo, hi;
    __u32 ip;
    bool found;
};

static long loop_delay_cidrs(__u32 index, struct delay_ctx *ctx)
{
    if (!ctx)                               // It's required to check NULL ctx.
        return 1;

    if (ctx->lo > ctx->hi)                  // Checking lo > hi for the end of binary search.
        return 1;

    __u32 mid = (ctx->lo + ctx->hi) >> 1;
    if (mid >= DELAY_CIDR_CAPACITY)         // It's required to do bound check for mid.
        return 1;

    struct delay_cidr *cidr = (typeof(cidr))&delay_cidrs.cidrs[mid];
    if (ctx->ip >= cidr->start && ctx->ip <= cidr->end) {
        ctx->found = true;
        return 1;
    }

    if (ctx->ip < cidr->start) {
        ctx->hi = mid - 1;
    } else {
        ctx->lo = mid + 1;
    }

    return 0;
}

static __always_inline bool
__should_delay_sip(__be32 ip)
{
    struct delay_ctx ctx = {
        .lo = 0,
        .hi = delay_cidrs_len - 1,
        .ip = bpf_ntohl(ip),
        .found = false,
    };

    bpf_loop(32, loop_delay_cidrs, &ctx, 0);

    return ctx.found;
}

该代码片段的实现逻辑跟 for 循环的实现逻辑一样,但是,它可以通过 verifier。

不过嘛,bpf_loop() helper 函数要求 5.17 内核。

小结

没有做性能测试,不过能够在 eBPF 里实现二分查找是一件很有意思的事情,不是吗?

二分查找的实现思路有两个:

  1. 通过 for 循环实现。
  2. 通过 bpf_loop() helper 函数实现。

其中,相比 bpf_loop() helper 的实现方式,for 循环实现的方式更具有普适性。

问题:有没有办法解决 for 循环实现方式的 verifier 问题呢?