最近在使用 ringbuf 的 bpf_ringbuf_reserve()
时踩了一个坑,记录一下。
ringbuf 简介
ringbuf 是 BPF 中能够取代 PERF_EVENT_ARRAY 的特殊 map 类型,提供了类似的 helpers:
根据该 commit,推荐的用法是 bpf_ringbuf_reserve()
加 bpf_ringbuf_submit()
/bpf_ringbuf_discard()
,而不是 bpf_ringbuf_output()
;因为 bpf_ringbuf_output()
需要拷贝数据。
bpf_ringbuf_reserve()
有锁吗?
答案:有。
直接翻看 bpf_ringbuf_reserve()
的源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
// https://github.com/torvalds/linux/blob/5be63fc19fcaa4c236b307420483578a56986a37/kernel/bpf/ringbuf.c#L408
static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
{
// ...
cons_pos = smp_load_acquire(&rb->consumer_pos);
if (in_nmi()) {
if (!spin_trylock_irqsave(&rb->spinlock, flags))
return NULL;
} else {
spin_lock_irqsave(&rb->spinlock, flags);
}
pend_pos = rb->pending_pos;
prod_pos = rb->producer_pos;
new_prod_pos = prod_pos + len;
// ...
/* pairs with consumer's smp_load_acquire() */
smp_store_release(&rb->producer_pos, new_prod_pos);
spin_unlock_irqrestore(&rb->spinlock, flags);
return (void *)hdr + BPF_RINGBUF_HDR_SZ;
}
BPF_CALL_3(bpf_ringbuf_reserve, struct bpf_map *, map, u64, size, u64, flags)
{
struct bpf_ringbuf_map *rb_map;
if (unlikely(flags))
return 0;
rb_map = container_of(map, struct bpf_ringbuf_map, map);
return (unsigned long)__bpf_ringbuf_reserve(rb_map->rb, size);
}
|
在写 bpf 代码时,没留意该锁对性能的影响,导致性能变得很差。示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
static __always_inline void
record_event(struct xdp_md *xdp)
{
struct event_t *event;
event = bpf_ringbuf_reserve(&ringbuf, sizeof(*event));
if (!event)
return;
event->pkt_len = xdp->data_end - xdp->data;
if (event->pkt_len <= MTU)
return;
__fill_event(event, xdp);
bpf_ringbuf_submit(event, &ringbuf);
}
|
对于这种情况,需要将 bpf_ringbuf_reserve()
调整到 if
语句之后,避免不必要的锁操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
static __always_inline void
record_event(struct xdp_md *xdp)
{
struct event_t *event;
int pkt_len;
pkt_len = xdp->data_end - xdp->data;
if (pkt_len <= MTU)
return;
event = bpf_ringbuf_reserve(&ringbuf, sizeof(*event));
if (!event)
return;
event->pkt_len = pkt_len;
__fill_event(event, xdp);
bpf_ringbuf_submit(event, &ringbuf);
}
|
有没有更高效率的使用办法呢?
bpf_ringbuf_query()
BPF_RB_AVAIL_DATA
查看 bpf_ringbuf_query()
的 helper 文档:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
/*
* bpf_ringbuf_query
*
* Query various characteristics of provided ring buffer. What
* exactly is queries is determined by *flags*:
*
* * **BPF_RB_AVAIL_DATA**: Amount of data not yet consumed.
* * **BPF_RB_RING_SIZE**: The size of ring buffer.
* * **BPF_RB_CONS_POS**: Consumer position (can wrap around).
* * **BPF_RB_PROD_POS**: Producer(s) position (can wrap around).
*
* Data returned is just a momentary snapshot of actual values
* and could be inaccurate, so this facility should be used to
* power heuristics and for reporting, not to make 100% correct
* calculation.
*
* Returns
* Requested value, or 0, if *flags* are not recognized.
*/
static __u64 (*bpf_ringbuf_query)(void *ringbuf, __u64 flags) = (void *) 134;
|
可以通过 bpf_ringbuf_query()
获取 ringbuf 未消费的数据量,从而推算出可用来塞数据的空间大小。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
static __always_inline void
record_event(struct xdp_md *xdp)
{
struct event_t *event;
__u64 avail_data;
int pkt_len;
pkt_len = xdp->data_end - xdp->data;
if (pkt_len <= MTU)
return;
avail_data = bpf_ringbuf_query(&ringbuf, BPF_RB_AVAIL_DATA);
if (RINGBUF_SIZE - avail_data < sizeof(*event))
return;
event = bpf_ringbuf_reserve(&ringbuf, sizeof(*event));
if (!event)
return;
event->pkt_len = pkt_len;
__fill_event(event, xdp);
bpf_ringbuf_submit(event, &ringbuf);
}
|
不过,查询 BPF_RB_AVAIL_DATA 得付出一点代价:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
// https://github.com/torvalds/linux/blob/5be63fc19fcaa4c236b307420483578a56986a37/kernel/bpf/ringbuf.c#L299
static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
{
unsigned long cons_pos, prod_pos;
cons_pos = smp_load_acquire(&rb->consumer_pos);
prod_pos = smp_load_acquire(&rb->producer_pos);
return prod_pos - cons_pos;
}
BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
{
struct bpf_ringbuf *rb;
rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
switch (flags) {
case BPF_RB_AVAIL_DATA:
return ringbuf_avail_data_sz(rb);
case BPF_RB_RING_SIZE:
return ringbuf_total_data_sz(rb);
case BPF_RB_CONS_POS:
return smp_load_acquire(&rb->consumer_pos);
case BPF_RB_PROD_POS:
return smp_load_acquire(&rb->producer_pos);
default:
return 0;
}
}
|
其中的 smp_load_acquire()
涉及到内存屏障,会有一定的开销。
参考:LINUX KERNEL MEMORY BARRIERS.
ringbuf 的大小要求
在使用 ringbuf 时,max_entries
必须是 2 的幂次方、而且还要求是 PAGE_SIZE 的倍数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// https://github.com/torvalds/linux/blob/5be63fc19fcaa4c236b307420483578a56986a37/kernel/bpf/ringbuf.c#L189
static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
{
struct bpf_ringbuf_map *rb_map;
if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
return ERR_PTR(-EINVAL);
if (attr->key_size || attr->value_size ||
!is_power_of_2(attr->max_entries) ||
!PAGE_ALIGNED(attr->max_entries))
return ERR_PTR(-EINVAL);
rb_map = bpf_map_area_alloc(sizeof(*rb_map), NUMA_NO_NODE);
if (!rb_map)
return ERR_PTR(-ENOMEM);
bpf_map_init_from_attr(&rb_map->map, attr);
rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
if (!rb_map->rb) {
bpf_map_area_free(rb_map);
return ERR_PTR(-ENOMEM);
}
return &rb_map->map;
}
|
总结
记住以下 3 点经验:
- 使用
bpf_ringbuf_reserve()
时,要注意锁的开销。
- 可以通过
bpf_ringbuf_query()
查询 ringbuf 的未消费数据量,从而推算出可以用来塞数据的空间大小。
- ringbuf 的
max_entries
必须是 2 的幂次方、而且还要求是 PAGE_SIZE 的倍数。