spinlock 指的是 eBPF 代码里使用的 struct bpf_spin_lock

eBPF Talk: 正确地进行统计 里学习到可以使用 spinlock 对统计进行保护。

spinlock 的用法就是那么简单。

但是,spinlock 真的那么简单吗?

1. 禁止读写 spinlock 属性

按照正常的用法,spinlock 属性就是用来 bpf_spin_lock()bpf_spin_unlock() 的,不会再作它用。

可是,如果在 eBPF 程序里对 spinlock 属性进行读写,会发生什么呢?

答案是:在校验阶段报错 “bpf_spin_lock cannot be accessed directly by load/store”。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// ${KERNEL}/kernel/bpf/verifier.c

/* check read/write into a map element with possible variable offset */
static int check_map_access(struct bpf_verifier_env *env, u32 regno,
                int off, int size, bool zero_size_allowed,
                enum bpf_access_src src)
{
    // ...

        if (map_value_has_spin_lock(map)) {
        u32 lock = map->spin_lock_off;

        /* if any part of struct bpf_spin_lock can be touched by
         * load/store reject this program.
         * To check that [x1, x2) overlaps with [y1, y2)
         * it is sufficient to check x1 < y2 && y1 < x2.
         */
        if (reg->smin_value + off < lock + sizeof(struct bpf_spin_lock) &&
             lock < reg->umax_value + off + size) {
            verbose(env, "bpf_spin_lock cannot be accessed directly by load/store\n");
            return -EACCES;
        }
    }

    // ...
}

2. 不使用 BTF 时,能使用 spinlock 吗?

不可以。

BTF 信息用于校验阶段;在校验时,需要根据 BTF 信息计算 struct bpf_spin_lock 属性的 offset。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// ${KERNEL}/kernel/bpf/verifier.c

static int process_spin_lock(struct bpf_verifier_env *env, int regno,
                 bool is_lock)
{
    // ...

    if (!map->btf) {
        verbose(env,
            "map '%s' has to have BTF in order to use bpf_spin_lock\n",
            map->name);
        return -EINVAL;
    }

    // ...
}

3. 是否可以在 spinlock 里使用另一个 spinlock 呢?

不可以。

为了预防死锁,当前版本里不允许使用多个 spinlock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// ${KERNEL}/kernel/bpf/verifier.c
static int process_spin_lock(struct bpf_verifier_env *env, int regno,
                 bool is_lock)
{
    // ...

    if (is_lock) {
        if (cur->active_spin_lock) {
            verbose(env,
                "Locking two bpf_spin_locks are not allowed\n");
            return -EINVAL;
        }
        cur->active_spin_lock = reg->id;
    }

    // ...
}

4. kprobe 里可以使用 spinlock 吗?

不可以。

因为无法充分检查 kprobe bpf prog 的抢占情况,所以目前不允许在 kprobe 里使用 spinlock

当前,以下 bpf prog 类型都不允许使用 spinlock

  • BPF_PROG_TYPE_SOCKET_FILTER
  • BPF_PROG_TYPE_KPROBE
  • BPF_PROG_TYPE_TRACEPOINT
  • BPF_PROG_TYPE_PERF_EVENT
  • BPF_PROG_TYPE_RAW_TRACEPOINT
  • BPF_PROG_TYPE_RAW_TRACEPOINT_WRITABLE
  • sleepable bpf prog(fentry.s/fmod_ret.s/fexit.s/lsm.s/iter.s
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// ${KERNEL}/kernel/bpf/verifier.c

static bool is_tracing_prog_type(enum bpf_prog_type type)
{
    switch (type) {
    case BPF_PROG_TYPE_KPROBE:
    case BPF_PROG_TYPE_TRACEPOINT:
    case BPF_PROG_TYPE_PERF_EVENT:
    case BPF_PROG_TYPE_RAW_TRACEPOINT:
    case BPF_PROG_TYPE_RAW_TRACEPOINT_WRITABLE:
        return true;
    default:
        return false;
    }
}

static int check_map_prog_compatibility(struct bpf_verifier_env *env,
                    struct bpf_map *map,
                    struct bpf_prog *prog)

{
    enum bpf_prog_type prog_type = resolve_prog_type(prog);

    if (map_value_has_spin_lock(map)) {
        if (prog_type == BPF_PROG_TYPE_SOCKET_FILTER) {
            verbose(env, "socket filter progs cannot use bpf_spin_lock yet\n");
            return -EINVAL;
        }

        if (is_tracing_prog_type(prog_type)) {
            verbose(env, "tracing progs cannot use bpf_spin_lock yet\n");
            return -EINVAL;
        }

        if (prog->aux->sleepable) {
            verbose(env, "sleepable progs cannot use bpf_spin_lock yet\n");
            return -EINVAL;
        }
    }

    // ...
}

spinlock 的限制与安全检查

P.S. 以下内容翻译自 “bpf: introduce bpf_spin_lock” 的 commit message。

  • bpf_spin_lock 只允许在 HASH 和 ARRAY maps 里使用。
  • 为了安全分析,BTF 信息是强制性带上的。
  • bpf prog 里一次只能使用一个 bpf_spin_lock,因为多个会导致死锁。
  • 每个 map 元素只允许有一个 struct bpf_spin_lock。不过,允许 bpf prog 使用任意数量的 bpf_spin_lock 是非常非常容易实现的。
  • bpf_spin_lock 使用时,不允许使用函数调用(包括 bpf2bpf、帮助函数)。
  • bpf prog 必须在 return 前 bpf_spin_unlock()
  • bpf prog 可以通过 bpf_spin_lock()/bpf_spin_unlock() 访问 struct bpf_spin_lock
  • 不允许读写 struct bpf_spin_lock lock; 属性。
  • 为了使用 bpf_spin_lock() 帮助函数,map value 的 BTF 信息必须是一个 struct,而且 struct bpf_spin_lock anyname; 属性需要在最外层。不允许将 bpf_spin_lock 内嵌到其它 struct 中。
  • “map_lookup” 系统调用不能将 bpf_spin_lock 复制到用户态。
  • “map_update” 系统调用和帮助函数不能更新 bpf_spin_lock 属性。
  • bpf_spin_lock 不能在栈上,也不能在网络包中。bpf_spin_lock 只能在 HASH 或者 ARRAY map value 中。
  • bpf_spin_lock 对 root 用户和所有程序类型开放。
  • 不允许在 map-in-map 的内层 map 里使用 bpf_spin_lock
  • bpf_spin_lock 临界区内不允许使用 “ld_abs”。
  • 因为缺乏抢占检查,不允许在 tracing 程序和 socket filter 程序中使用 bpf_spin_lock

小结

使用起来比较简单的东西,其背后往往比较复杂;spinlock 就是其中之一。

附:bpf_spin_lock()bpf_spin_unlock() 的源代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// ${KERNEL}/kernel/bpf/helpers.c

#if defined(CONFIG_QUEUED_SPINLOCKS) || defined(CONFIG_BPF_ARCH_SPINLOCK)

static inline void __bpf_spin_lock(struct bpf_spin_lock *lock)
{
    arch_spinlock_t *l = (void *)lock;
    union {
        __u32 val;
        arch_spinlock_t lock;
    } u = { .lock = __ARCH_SPIN_LOCK_UNLOCKED };

    compiletime_assert(u.val == 0, "__ARCH_SPIN_LOCK_UNLOCKED not 0");
    BUILD_BUG_ON(sizeof(*l) != sizeof(__u32));
    BUILD_BUG_ON(sizeof(*lock) != sizeof(__u32));
    arch_spin_lock(l);
}

static inline void __bpf_spin_unlock(struct bpf_spin_lock *lock)
{
    arch_spinlock_t *l = (void *)lock;

    arch_spin_unlock(l);
}

#else

static inline void __bpf_spin_lock(struct bpf_spin_lock *lock)
{
    atomic_t *l = (void *)lock;

    BUILD_BUG_ON(sizeof(*l) != sizeof(*lock));
    do {
        atomic_cond_read_relaxed(l, !VAL);
    } while (atomic_xchg(l, 1));
}

static inline void __bpf_spin_unlock(struct bpf_spin_lock *lock)
{
    atomic_t *l = (void *)lock;

    atomic_set_release(l, 0);
}

#endif

static DEFINE_PER_CPU(unsigned long, irqsave_flags);

static inline void __bpf_spin_lock_irqsave(struct bpf_spin_lock *lock)
{
    unsigned long flags;

    local_irq_save(flags);
    __bpf_spin_lock(lock);
    __this_cpu_write(irqsave_flags, flags);
}

notrace BPF_CALL_1(bpf_spin_lock, struct bpf_spin_lock *, lock)
{
    __bpf_spin_lock_irqsave(lock);
    return 0;
}

const struct bpf_func_proto bpf_spin_lock_proto = {
    .func        = bpf_spin_lock,
    .gpl_only    = false,
    .ret_type    = RET_VOID,
    .arg1_type   = ARG_PTR_TO_SPIN_LOCK,
};

static inline void __bpf_spin_unlock_irqrestore(struct bpf_spin_lock *lock)
{
    unsigned long flags;

    flags = __this_cpu_read(irqsave_flags);
    __bpf_spin_unlock(lock);
    local_irq_restore(flags);
}

notrace BPF_CALL_1(bpf_spin_unlock, struct bpf_spin_lock *, lock)
{
    __bpf_spin_unlock_irqrestore(lock);
    return 0;
}

const struct bpf_func_proto bpf_spin_unlock_proto = {
    .func        = bpf_spin_unlock,
    .gpl_only    = false,
    .ret_type    = RET_VOID,
    .arg1_type   = ARG_PTR_TO_SPIN_LOCK,
};