最近在使用 ringbuf 的 bpf_ringbuf_reserve() 时踩了一个坑,记录一下。

ringbuf 简介

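ringbuf 是 BPF 中能够取代 PERF_EVENT_ARRAY 的特殊 map 类型,提供了类似的 helpers:bpf_ringbuf_output()、bpf_ringbuf_reserve()、bpf_ringbuf_submit()、bpf_ringbuf_discard() 和 bpf_ringbuf_query()。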

根据该 commit,推荐的用法是 bpf_ringbuf_reserve() + bpf_ringbuf_submit()/bpf_ringbuf_discard(),而不是 bpf_ringbuf_output();因为 bpf_ringbuf_output() 需要拷贝数据。

bpf_ringbuf_reserve() 有锁吗?

答案:有。每次 reserve 都要获取 ringbuf 上的一把自旋锁(spinlock)。

直接翻看 bpf_ringbuf_reserve() 的源码:

// https://github.com/torvalds/linux/blob/5be63fc19fcaa4c236b307420483578a56986a37/kernel/bpf/ringbuf.c#L408


/*
 * Excerpt of the kernel-side reservation routine; parts are elided with
 * "// ...". The visible portion shows that every reservation takes
 * rb->spinlock (irqsave variant) while it reads the ring positions and
 * publishes the new producer position.
 *
 * Returns NULL when called in NMI context and the lock cannot be taken
 * immediately; otherwise the visible path returns a pointer located
 * BPF_RINGBUF_HDR_SZ bytes past hdr.
 */
static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
{
    // ... (elided in this excerpt; presumably declares the locals and derives len from size)

    /* Snapshot the consumer position before taking the lock. */
    cons_pos = smp_load_acquire(&rb->consumer_pos);

    /*
     * In NMI context only try the lock once and fail the reservation if it
     * is contended; in every other context wait for the lock.
     */
    if (in_nmi()) {
        if (!spin_trylock_irqsave(&rb->spinlock, flags))
            return NULL;
    } else {
        spin_lock_irqsave(&rb->spinlock, flags);
    }

    /* Positions are read under the lock; the record ends at prod_pos + len. */
    pend_pos = rb->pending_pos;
    prod_pos = rb->producer_pos;
    new_prod_pos = prod_pos + len;

    // ... (elided in this excerpt; hdr is set up somewhere in here)

    /* pairs with consumer's smp_load_acquire() */
    smp_store_release(&rb->producer_pos, new_prod_pos);

    spin_unlock_irqrestore(&rb->spinlock, flags);

    /* Hand back the address just past the record header. */
    return (void *)hdr + BPF_RINGBUF_HDR_SZ;
}

/*
 * BPF helper entry point for bpf_ringbuf_reserve().
 *
 * Returns 0 when any bit of flags is set. Otherwise it forwards to
 * __bpf_ringbuf_reserve() on the ring buffer owned by the map and returns
 * that result cast to unsigned long.
 */
BPF_CALL_3(bpf_ringbuf_reserve, struct bpf_map *, map, u64, size, u64, flags)
{
    struct bpf_ringbuf_map *rb_map;

    /* No flags are accepted here: any non-zero value fails the call. */
    if (unlikely(flags))
        return 0;

    /* Recover the ringbuf map wrapper that embeds the generic bpf_map. */
    rb_map = container_of(map, struct bpf_ringbuf_map, map);
    return (unsigned long)__bpf_ringbuf_reserve(rb_map->rb, size);
}

在写 bpf 代码时,没留意该锁对性能的影响,导致性能变得很差。示例(先 reserve、后判断,每个包都会拿一次锁):

/*
 * Report packets longer than MTU through the ring buffer.
 *
 * This is the slow variant: the record is reserved before the length
 * filter runs, so every packet pays for the reservation lock even when
 * nothing ends up being reported.
 */
static __always_inline void
record_event(struct xdp_md *xdp)
{
    struct event_t *event;

    /* bpf_ringbuf_reserve() takes (ringbuf, size, flags); flags must be 0. */
    event = bpf_ringbuf_reserve(&ringbuf, sizeof(*event), 0);
    if (!event)
        return;

    event->pkt_len = xdp->data_end - xdp->data;
    if (event->pkt_len <= MTU) {
        /* A reserved record must be released on every path: drop it here. */
        bpf_ringbuf_discard(event, 0);
        return;
    }

    __fill_event(event, xdp);
    /* The second argument is a flags word, not the map. */
    bpf_ringbuf_submit(event, 0);
}

对于这种情况,需要将 bpf_ringbuf_reserve() 调整到 if 语句之后,避免不必要的锁操作:

/*
 * Report packets longer than MTU through the ring buffer.
 *
 * The length filter runs first, so packets that are not reported never
 * touch the ring buffer and never take its reservation lock.
 */
static __always_inline void
record_event(struct xdp_md *xdp)
{
    struct event_t *event;
    int pkt_len;

    pkt_len = xdp->data_end - xdp->data;
    if (pkt_len <= MTU)
        return;

    /* bpf_ringbuf_reserve() takes (ringbuf, size, flags); flags must be 0. */
    event = bpf_ringbuf_reserve(&ringbuf, sizeof(*event), 0);
    if (!event)
        return;

    event->pkt_len = pkt_len;
    __fill_event(event, xdp);
    /* The second argument is a flags word, not the map. */
    bpf_ringbuf_submit(event, 0);
}

有没有更高效率的使用办法呢?

bpf_ringbuf_query() 与 BPF_RB_AVAIL_DATA

查看 bpf_ringbuf_query() 的 helper 文档:

/*
 * bpf_ringbuf_query
 *
 *  Query various characteristics of provided ring buffer. What
 *  exactly is queried is determined by *flags*:
 *
 *  * **BPF_RB_AVAIL_DATA**: Amount of data not yet consumed.
 *  * **BPF_RB_RING_SIZE**: The size of ring buffer.
 *  * **BPF_RB_CONS_POS**: Consumer position (can wrap around).
 *  * **BPF_RB_PROD_POS**: Producer(s) position (can wrap around).
 *
 *  Data returned is just a momentary snapshot of actual values
 *  and could be inaccurate, so this facility should be used to
 *  power heuristics and for reporting, not to make 100% correct
 *  calculation.
 *
 * Parameters
 *  ringbuf: the ring buffer to query.
 *  flags: exactly one of the BPF_RB_* selectors listed above.
 *
 * Returns
 *  Requested value, or 0, if *flags* are not recognized.
 */
static __u64 (*bpf_ringbuf_query)(void *ringbuf, __u64 flags) = (void *) 134;

可以通过 bpf_ringbuf_query() 获取 ringbuf 未消费的数据量,从而推算出可用来塞数据的空间大小(可用空间 ≈ ringbuf 大小 − 未消费数据量)。空间不足时直接返回,就不必再调用 bpf_ringbuf_reserve() 去拿锁:

/*
 * Report packets longer than MTU through the ring buffer.
 *
 * Before reserving, a lock-free query estimates the free space so that a
 * full ring is detected without taking the reservation lock. The query
 * result is only a snapshot, so the reservation itself may still fail and
 * its result is checked as well.
 */
static __always_inline void
record_event(struct xdp_md *xdp)
{
    struct event_t *event;
    __u64 avail_data;
    int pkt_len;

    pkt_len = xdp->data_end - xdp->data;
    if (pkt_len <= MTU)
        return;

    /* Unconsumed bytes currently in the ring; free space is the remainder. */
    avail_data = bpf_ringbuf_query(&ringbuf, BPF_RB_AVAIL_DATA);
    if (RINGBUF_SIZE - avail_data < sizeof(*event))
        return;

    /* bpf_ringbuf_reserve() takes (ringbuf, size, flags); flags must be 0. */
    event = bpf_ringbuf_reserve(&ringbuf, sizeof(*event), 0);
    if (!event)
        return;

    event->pkt_len = pkt_len;
    __fill_event(event, xdp);
    /* The second argument is a flags word, not the map. */
    bpf_ringbuf_submit(event, 0);
}

不过,查询 BPF_RB_AVAIL_DATA 得付出一点代价:

// https://github.com/torvalds/linux/blob/5be63fc19fcaa4c236b307420483578a56986a37/kernel/bpf/ringbuf.c#L299

/*
 * Return the distance between the producer and consumer positions, i.e.
 * how much has been produced but not yet consumed. Both positions are read
 * with acquire loads and no lock is taken, so the result is a snapshot.
 */
static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
{
    unsigned long cons_pos, prod_pos;

    /* Two acquire loads: this is the cost of the query. */
    cons_pos = smp_load_acquire(&rb->consumer_pos);
    prod_pos = smp_load_acquire(&rb->producer_pos);
    return prod_pos - cons_pos;
}

/*
 * BPF helper entry point for bpf_ringbuf_query().
 *
 * flags selects which value is returned; an unrecognized selector yields 0.
 * None of the visible branches takes rb->spinlock.
 */
BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
{
    struct bpf_ringbuf *rb;

    /* Recover the ring buffer from the map wrapper that embeds the bpf_map. */
    rb = container_of(map, struct bpf_ringbuf_map, map)->rb;

    switch (flags) {
    case BPF_RB_AVAIL_DATA:
        /* producer_pos - consumer_pos, see ringbuf_avail_data_sz(). */
        return ringbuf_avail_data_sz(rb);
    case BPF_RB_RING_SIZE:
        return ringbuf_total_data_sz(rb);
    case BPF_RB_CONS_POS:
        return smp_load_acquire(&rb->consumer_pos);
    case BPF_RB_PROD_POS:
        return smp_load_acquire(&rb->producer_pos);
    default:
        /* Unknown selector. */
        return 0;
    }
}

其中的 smp_load_acquire() 涉及到内存屏障,会有一定的开销。

参考:LINUX KERNEL MEMORY BARRIERS(内核文档 Documentation/memory-barriers.txt)。

ringbuf 的大小要求

在使用 ringbuf 时,max_entries 必须是 2 的幂次方、而且还要求是 PAGE_SIZE 的倍数。

// https://github.com/torvalds/linux/blob/5be63fc19fcaa4c236b307420483578a56986a37/kernel/bpf/ringbuf.c#L189

/*
 * Allocate a ring buffer map from the creation attributes.
 *
 * Returns the embedded struct bpf_map on success, ERR_PTR(-EINVAL) when the
 * attributes are rejected, or ERR_PTR(-ENOMEM) when either allocation fails.
 */
static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
{
    struct bpf_ringbuf_map *rb_map;

    /* Reject any map flag outside RINGBUF_CREATE_FLAG_MASK. */
    if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
        return ERR_PTR(-EINVAL);

    /*
     * key_size and value_size must both be zero, and max_entries must be
     * a power of two that is also page aligned.
     */
    if (attr->key_size || attr->value_size ||
        !is_power_of_2(attr->max_entries) ||
        !PAGE_ALIGNED(attr->max_entries))
        return ERR_PTR(-EINVAL);

    rb_map = bpf_map_area_alloc(sizeof(*rb_map), NUMA_NO_NODE);
    if (!rb_map)
        return ERR_PTR(-ENOMEM);

    bpf_map_init_from_attr(&rb_map->map, attr);

    /* max_entries is passed on as the first argument of the ring allocation. */
    rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
    if (!rb_map->rb) {
        /* Undo the wrapper allocation before reporting the failure. */
        bpf_map_area_free(rb_map);
        return ERR_PTR(-ENOMEM);
    }

    return &rb_map->map;
}

总结

记住以下 3 点经验:

  1. 使用 bpf_ringbuf_reserve() 时,要注意锁的开销。
  2. 可以通过 bpf_ringbuf_query() 查询 ringbuf 的未消费数据量,从而推算出可以用来塞数据的空间大小。
  3. ringbuf 的 max_entries 必须是 2 的幂次方、而且还要求是 PAGE_SIZE 的倍数。