奔跑吧Linux内核(第2版)卷2:调试与案例分析

978-7-115-55252-5
作者: 笨叔
译者:
编辑: 谢晓芳

图书目录:

详情

本书基于Linux 5.0内核的源代码讲述Linux内核的调试技巧和案例。本书共6章。主要内容包括并发与同步,中断管理,内核调试和性能优化,基于x86_64的宕机难题解决方案,基于ARM64的宕机题解决方案,安全漏洞的产生原理与修复方案等。 本书适合从事Linux系统开发人员、嵌入式系统开发人员及Android开发人员阅读,也可供计算机相关专业的师生阅读。

图书摘要

版权信息

书名:奔跑吧Linux内核(第2版)卷2:调试与案例分析

ISBN:978-7-115-55252-5

本书由人民邮电出版社发行数字版。版权所有,侵权必究。

您购买的人民邮电出版社电子书仅供您个人使用,未经授权,不得以任何方式复制和传播本书内容。

我们愿意相信读者具有这样的良知和觉悟,与我们共同保护知识产权。

如果购买者有侵权行为,我们可能对该用户实施包括但不限于关闭该帐号等维权措施,并可能追究法律责任。


著    笨 叔

责任编辑 谢晓芳

人民邮电出版社出版发行  北京市丰台区成寿寺路11号

邮编 100164  电子邮件 315@ptpress.com.cn

网址 http://www.ptpress.com.cn

读者服务热线:(010)81055410

反盗版热线:(010)81055315


本书基于Linux 5.0内核的源代码讲述Linux内核的调试技巧和案例。本书共6章,主要内容包括并发与同步,中断管理,内核调试与性能优化,基于x86_64解决宕机难题,基于ARM64解决宕机难题,安全漏洞分析等。

本书适合从事Linux系统开发人员、嵌入式系统开发人员及Android开发人员阅读,也可供计算机相关专业的师生阅读。


2017年本书第1版出版后得到了广大Linux开发人员和开源工程师的喜爱。2019年3月3日,Linux内核创始人Linus Torvalds在社区正式宣布了Linux 5.0内核的发布。Linus Torvalds在邮件列表里提到Linux 5.0并不是一个大幅修改和具有很多新特性的版本。然而,因为Linux 4.20内核的次版本号太大了,所以才发布了Linux 5.x内核。但是Linux内核的开发速度并没有因此而变慢,依然每隔两个多月会发布一个新版本,新的版本支持更多的硬件和特性。从本书第1版采用的Linux 4.0内核到Linux 5.0内核经历了20个版本,Linux 5.0内核中新增了很多特性并且很多内核的实现已经发生了很大的变化。

最近两年,我国操作系统和开源软件的研究氛围越来越浓厚,很多大公司开始基于Linux内核打造自己的操作系统,包括手机操作系统、服务器操作系统、IoT(物联网)嵌入式系统等。另外,我国很多公司开始探索使用ARM64架构来构建自己的硬件生态系统,包括手机芯片、服务器芯片等。因此,作者觉得很有必要基于Linux 5.0内核修订本书第1版。第2版的修订工作非常艰辛,工作量巨大,修订工作持续整整一年。作者对第1版进行了大幅度的修订,删除了部分过时的内容,新增了很多实用的内容。由于篇幅较长,因此本书第2版分成卷1和卷2两本书。

卷1重点介绍ARM64架构、Linux内核的内存管理、进程管理等,卷2重点介绍调试技巧和案例分析,如Linux内核调试、性能优化、宕机难题的解决方案以及安全漏洞分析等内容。

第2版的新特性如下。

第2版完全基于Linux 5.0内核来讲解。Linux 5.0内核中,不少重要模块(如绿色调度器、自旋锁等)的实现相对于Linux 4.0已经发生了天翻地覆的变化。同时,Linux 5.0内核中修复了Linux 4.0内核中的很多故障,比如KSM导致的虚拟机宕机故障等。由于ARM64架构和x86_64架构是目前主流的处理器架构,因此本书主要基于ARM64/x86_64架构来讲解Linux 5.0内核的实现。很多内核模块的实现与架构的相关性很低,因此本书也适合使用其他架构的读者阅读。在目前服务器领域,大部分厂商依然使用x86_64架构加上Red Hat或者Ubuntu Linux企业发行版的方案,因此卷2第4章会介绍x86_64架构服务器的宕机修复案例。

第2版新增了很多实战案例,例如,卷1在内存管理方面新增了4个实战案例,这些案例都是从实际项目中提取出来的,对读者提升实战能力有非常大的帮助。另外,卷2还新增了解决宕机难题的实战案例。在实际项目中,我们常常会遇到系统宕机(如手机宕机、服务器宕机等),因此本书总结了多个宕机案例,利用Kdump+Crash工具来详细分析如何解决宕机难题。考虑到部分读者使用ARM64处理器做产品开发,部分读者在x86_64架构的服务器上做运维和性能调优等工作,因此本书分别介绍了针对这两个架构的处理器如何快速解决宕机问题。

2019年出现的CPU熔断和CPU“幽灵”漏洞牵动了全球开发人员的心,了解这两个漏洞对读者熟悉计算机架构和Linux内核的相关实现非常有帮助,因此,卷2的第6章会详细分析这两个漏洞的攻击原理和Linux内核修复方案。

卷2新增了很多内核调试和优化技巧。Linux内核通过proc和sysfs提供了很多有用的日志信息。在内存管理、调优过程中,可以通过内核提供的日志信息(如meminfo、zone等)快速了解和分析系统内存并进行内核调试与优化。卷2的第3章里新增了性能优化的内容,如使用perf工具以及eBPF/BCC来进行性能分析等。

在第1版出版后,部分读者反馈粘贴的代码太多。在第2版中,我们尽可能不粘贴代码或者只列出少量核心代码,这样可以用更多的篇幅来介绍新内容。为了分析Linux内核的原理,第2版比第1版新增了很多插图和表格。

卷1的第1章和第2章详细介绍了ARM64架构,这部分内容包括ARM64指令集、ARM64寄存器、页表、内存管理、TLB、内存屏障等。

第2版中卷2新增的内容如下。

本书主要介绍Linux内核中的并发和同步、中断管理、内核调试与性能优化、宕机难题的解决方案以及安全漏洞的攻击原理和修复方案等内容。本书的侧重点是实践以及案例分析。

本书共6章。每一章的主要内容如下。

第1章介绍并发与同步,包括原子操作、内存屏障、经典自旋锁、MCS锁、排队自旋锁、信号量、互斥锁、读写锁、读写信号量、RCU等。

第2章介绍中断管理,包括中断控制器、硬件中断号和Linux中断号的映射、注册中断、ARM64底层中断处理、ARM64高层中断处理、软中断、tasklet、工作队列等。

第3章介绍内核调试与性能优化,包括ARM64实验平台的打造、ftrace工具、内存检测、死锁检测、内核调试方法、perf工具、SystemTap工具、eBPF与BCC等内容。

第4章讲述Kdump工具、Crash工具、crash命令、死锁检查机制等,并展示6个基于x86-64的宕机案例。

第5章介绍Kdump实验环境的搭建,并展示4个基于ARM64的宕机案例。

第6章分析安全漏洞,包括侧信道攻击的原理、CPU熔断漏洞、CPU“幽灵”漏洞的攻击原理和修复方案等内容。

由于作者知识水平有限,书中难免存在纰漏,敬请各位读者批评指正。作者的邮箱是runninglinuxkernel@126.com。也欢迎扫描下方的二维码,在微信公众号中提问和交流。

笨 叔


在编写本书的过程中,我得到了众多Linux开发人员的热心帮助,其中王龙、彭东林、龙小昆、张毅峰、郑琦等审阅了大部分章节,提出了很多修改意见。另外,陈宝剑、周明朋、刘新朋、周明华、席贵峰、张文博、时洋、藏春鑫、艾强、胡茂留、郭述帆、陈启武、陈国龙、陈胡冠申、马福元、郭健、蔡琛、梅赵云、倪晓、刘新鹏、梁嘉荣、何花、陈渝、沈友进等审阅了部分章节,感谢这些人的热心帮助。

感谢西安邮电大学的陈莉君老师,她非常关注本书的修订工作,并且提供了很多的帮助。同时,感谢陈老师的几位研究生,他们利用寒假帮忙审阅全部书稿,提出了很多有建设性的修改意见。他们是戴君宜、梁金荣、贺东升、张孝家、白嘉庆、薛晓雯、马明慧以及崔鹏程。感谢南京大学的夏耐老师在教学方面提出的建议。

同时感谢人民邮电出版社的各位编辑的大力支持。最后感谢家人对我的支持和鼓励,虽然周末时间我都在忙于写作本书,但是他们总是给我无限的温暖。


为了帮助读者更好地阅读本书,我们针对本书做一些约定。

1.内核版本

本书主要讲解Linux内核中核心模块的实现,因此以Linux 5.0内核为研究对象。读者可以从Linux内核官网上下载Linux 5.0内核的源代码。在Linux主机中通过如下命令来下载。

$ wget https://mirrors.edge.kernel.org/pub/linux/kernel/v5.x/linux-5.0.tar.xz

$ tar -Jxf linux-5.0.tar.xz

读者可以使用Source Insight或者Vim工具来阅读源代码。Source Insight是收费软件,需自行购买授权。Vim是开源软件,可以在Linux发行版中安装。关于如何使用Vim来阅读Linux内核源代码,请参考《奔跑吧Linux内核入门篇》。

2.代码示例和讲解方式

为了避免篇幅过长、内容过多,本书尽量不展示源代码,只展示关键性代码片段,甚至不粘贴代码。我们根据不同情况采用如下两种方式来讲解代码。

1)不展示代码

本书讲解的绝大部分代码是Linux 5.0内核的源代码,因此我们根据源代码实际的行号来讲解。例如,对于__alloc_pages_nodemask()函数的实现,本书会采用如下方式来显示。

<mm/page_alloc.c>

struct page *
__alloc_pages_nodemask(gfp_tgfp_mask, unsigned int order, intpreferred_nid,nodemask_t *nodemask)

“<mm/page_alloc.c>”表示该函数实现在mm/page_alloc.c文件中,接下来列出了该函数的定义。

对于这类代码,需要读者在计算机上打开源代码文件,如__alloc_pages_nodemask()函数定义在第4516~4517行(见图0.1)。

▲图0.1 __alloc_pages_nodemask()函数的源代码

这种不展示代码的讲解方式主要针对Linux内核的C代码。

2)展示关键代码

这种方式是指给出关键代码并且给出行号,行号从1开始,而非源代码中的实际行号。如本书在讲解el2_setup汇编函数时显示了代码的路径、关键代码以及行号。

<arch/arm64/kernel/head.S>

1    ENTRY(el2_setup)
2        msr    SPsel, #1            
3        mrs    x0, CurrentEL
4        cmp    x0, #CurrentEL_EL2
5        b.eq    1f
6        mov_q    x0, (SCTLR_EL1_RES1 | ENDIAN_SET_EL1)
7        msr    sctlr_el1, x0
8        mov    w0, #BOOT_CPU_MODE_EL1 
9        isb
10       ret
11
12    1: mov_q    x0, (SCTLR_EL2_RES1 | ENDIAN_SET_EL2)
13       msr    sctlr_el2, x0
14    ...

这种方式主要用于讲解汇编代码和一些不在Linux内核中的示例代码。

3.实验平台

本书主要基于ARM64架构来讲解,但是会涉及x86_64架构方面的一部分内容,比如第4章讲解了企业服务器方面的宕机问题的解决方案,因为目前大部分企业服务器采用x86_64处理器和CentOS。本书基于QEMU虚拟机与Debian根文件系统的实验平台讲述,它有如下新特性。

读者可以通过https://benshushu.coding.net/public/runninglinuxkernle_5.0/runninglinuxkernel_5.0/ git/files或者https:github.com/figozhang/runninglinuxkernel_5.0下载本书配套的源代码。

本书推荐使用的实验环境如下。

4.补丁说明

本书在讲解实际代码时会在脚注里列举一些关键的补丁,阅读这些补丁的代码有助于帮助读者理解Linux内核的源代码。建议读者下载官方Linux内核的代码。下载命令如下。

$ git clone <a>https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git</a>

$ cd linux
$ git reset v5.0 ––hard

列举的补丁格式如下。

Linux 5.0 patch, commit: 679db70,"arm64: entry: Place an SB sequence following an ERET instruction".

上述代码表示该补丁是在Linux 5.0内核中加入的补丁,读者可以通过“git show 679db70”命令来查看该补丁,该补丁的标题是“arm64: entry: Place an SB sequence following an ERET instruction”。

5.指令集的大小写

在ARM官方芯片手册里,指令使用大写形式;而在Linux内核源代码中,指令使用小写形式。在GNU汇编器中可以混用大小写,因此本书在描述汇编指令时不区分大小写字母。


本书由异步社区出品,社区(https://www.epubit.com/)为您提供后续服务。

作者和编辑尽最大努力来确保书中内容的准确性,但难免会存在疏漏。欢迎您将发现的问题反馈给我们,帮助我们提升图书的质量。

当您发现错误时,请登录异步社区,按书名搜索,进入本书页面,单击“提交勘误”,输入勘误信息,单击“提交”按钮即可(见下图)。本书的作者和编辑会对您提交的勘误进行审核,确认并接受后,您将获赠异步社区的100积分。积分可用于在异步社区兑换优惠券、样书或奖品。

我们的联系邮箱是contact@epubit.com.cn。

如果您对本书有任何疑问或建议,请您发邮件给我们,并请在邮件标题中注明本书书名,以便我们更高效地做出反馈。

如果您有兴趣出版图书、录制教学视频,或者参与图书翻译、技术审校等工作,可以发邮件给我们;有意出版图书的作者也可以到异步社区在线投稿(直接访问www.epubit.com/contribute即可)。

如果您所在学校、培训机构或企业想批量购买本书或异步社区出版的其他图书,也可以发邮件给我们。

如果您在网上发现有针对异步社区出品图书的各种形式的盗版行为,包括对图书全部或部分内容的非授权传播,请您将怀疑有侵权行为的链接通过邮件发送给我们。您的这一举动是对作者权益的保护,也是我们持续为您提供有价值的内容的动力之源。

“异步社区”是人民邮电出版社旗下IT专业图书社区,致力于出版精品IT图书和相关学习产品,为作译者提供优质出版服务。异步社区创办于2015年8月,提供大量精品IT图书和电子书,以及高品质技术文章和视频课程。更多详情请访问异步社区官网https://www.epubit.com。

“异步图书”是由异步社区编辑团队策划出版的精品IT专业图书的品牌,依托于人民邮电出版社近30年的计算机图书出版积累和专业编辑团队,相关图书在封面上印有异步图书的LOGO。异步图书的出版领域包括软件开发、大数据、人工智能、测试、前端、网络技术等。

异步社区

微信服务号


 

1.在ARM64处理器中,如何实现独占访问内存?

2.atomic_cmpxchg()和atomic_xchg()分别表示什么含义?

3.在ARM64中,CAS指令包含了加载-获取和存储-释放指令,它们的作用是什么?

4.atomic_try_cmpxchg()函数和atomic_cmpxchg()函数有什么区别?

5.cmpxchg_acquire()函数、cmpxchg_release()函数、cmpxchg_relaxed()函数以及cmpxchg()函数的区别是什么?

6.请举例说明内核使用内存屏障的场景。

7.smp_cond_load_relaxed()函数的作用和使用场景是什么?

8.smp_mb__before_atomic()函数和smp_mb__after_atomic()函数的作用和使用场景是什么?

9.为什么自旋锁的临界区不能睡眠(不考虑RT-Linux的情况)?

10.Linux内核中经典自旋锁的实现有什么缺点?

11.为什么自旋锁的临界区不允许发生抢占?

12.基于排队的自旋锁机制是如何实现的?

13.如果在spin_lock()和spin_unlock()的临界区中发生了中断,并且中断处理程序也恰巧修改了该临界区,那么会发生什么后果?该如何避免呢?

14.排队自旋锁是如何实现MCS锁的?

15.排队自旋锁把32位的变量划分成几个域,每个域的含义和作用是什么?

16.假设CPU0先持有了自旋锁,接着CPU1、CPU2、CPU3都加入该锁的争用中,请阐述这几个CPU如何获取锁,并画出它们申请锁的流程图。

17.与自旋锁相比,信号量有哪些特点?

18.请简述信号量是如何实现的。

19.乐观自旋等待的判断条件是什么?

20.为什么在互斥锁争用中进入乐观自旋等待比睡眠等待模式要好?

21.假设CPU0~CPU3同时争用一个互斥锁,CPU0率先申请了互斥锁,然后CPU1也加入锁的申请。CPU1在持有锁期间会进入睡眠状态。然后CPU2和CPU3陆续加入该锁的争用中。请画出这几个CPU争用锁的时序图。

22.Linux内核已经实现了信号量机制,为何要单独设置一个互斥锁机制呢?

23.请简述MCS锁机制的实现原理。

24.在编写内核代码时,该如何选择信号量和互斥锁?

25.什么时候使用读者锁?什么时候使用写者锁?怎么判断?

26.读写信号量使用的自旋等待机制是如何实现的?

27.RCU相比读写锁有哪些优势?

28.请解释静止状态和宽限期。

29.请简述RCU实现的基本原理。

30.在大型系统中,经典RCU遇到了什么问题?Tree RCU又是如何解决该问题的?

31.在RCU实现中,为什么要使用ULONG_CMP_GE()和ULONG_CMP_LT()宏来比较两个数的大小,而不直接使用大于号或者小于号来比较?

32.请简述一个宽限期的生命周期及其状态机的变化。

33.请阐述原子操作、自旋锁、信号量、互斥锁以及RCU的特点和使用规则。

34.在KSM中扫描某个VMA以寻找有效的匿名页面时,假设此VMA恰巧被其他CPU销毁了,会不会有问题呢?

35.请简述PG_locked的常见使用方法。

36.在mm/rmap.c文件中的page_get_anon_vma()函数中,为什么要使用rcu_read_lock()函数?什么时候注册RCU回调函数呢?

37.在mm/oom_kill.c的select_bad_process()函数中,为什么要使用rcu_read_lock()函数?什么时候注册RCU回调函数呢?

 

编写内核代码或驱动代码时需要留意共享资源的保护,防止共享资源被并发访问。并发访问是指多个内核代码路径同时访问和操作数据,这可能发生相互覆盖共享数据的情况,造成被访问数据的不一致。内核代码路径可以是一个内核执行路径、中断处理程序或者内核线程等。并发访问可能会造成系统不稳定或产生错误,且很难跟踪和调试。

在早期不支持对称多处理器(Symmetric Multiprocessor,SMP)的Linux内核中,导致并发访问的因素是中断服务程序。只有中断发生时,或者内核代码路径显式地要求重新调度并且执行另外一个进程时,才可能发生并发访问。在支持SMP的Linux内核中,在不同CPU中并发执行的内核线程完全可能在同一时刻并发访问共享数据,并发访问随时都可能发生。特别是现在的Linux内核早已经支持内核抢占,调度器可以抢占正在执行的进程,重新调度其他进程。

在计算机术语中,临界区(critical region)是指访问和操作共享数据的代码段,其中的资源无法同时被多个执行线程访问,访问临界区的执行线程或代码路径称为并发源。为了避免并发访问临界区,开发者必须保证访问临界区的原子性,即在临界区内不能有多个并发源同时执行,整个临界区就像一个不可分割的整体。

在内核中产生并发访问的并发源主要有如下4种。

上述情况需要针对单核和多核系统区别对待。对于单处理器系统,主要有如下并发源。

对于SMP系统,情况会更复杂。

如进程上下文在操作某个临界区中的资源时发生了中断,恰巧在对应中断处理程序中也访问了这个资源。如果不使用内核同步机制来保护,那么可能会发生并发访问的 bug。如果进程上下文正在访问和修改临界区中的资源时发生了抢占调度,可能会发生并发访问的bug。如果在自旋锁的临界区中主动睡眠以让出CPU,那这也可能是一个并发访问的bug。如果两个CPU同时修改临界区中的一个资源,那这也可能是一个bug。在实际项目中,真正困难的是如何发现内核代码存在并发访问的可能性并采取有效的保护措施。因此在编写代码时,应该考虑哪些资源位于临界区,应该采取哪些保护机制。如果在代码设计完成之后再回溯查找哪些资源需要保护,会非常困难。

在复杂的内核代码中找出需要保护的资源或数据是一件不容易的事情。任何可能被并发访问的数据都需要保护。那究竟什么样的数据需要保护呢?如果从多个内核代码路径可能访问某些数据,那就应该对这些数据加以保护。记住,要保护资源或数据,而不是保护代码。保护对象包括静态局部变量、全局变量、共享的数据结构、缓存、链表、红黑树等各种形式所隐含的数据。在实际内核代码和驱动的编写过程中,关于数据需要做如下一些思考。

Linux内核提供了多种并发访问的保护机制,如原子操作、自旋锁、信号量、互斥锁、读写锁、RCU等,本章将详细分析这些机制的实现。了解Linux内核中的各种保护机制只是第一步,重要的是要思考清楚哪些地方是临界区,该用什么机制来保护这些临界区。1.11节将以内存管理为例来探讨锁的运用。

原子操作是指保证指令以原子的方式执行,执行过程不会被打断。在如下代码片段中,假设thread_A_func和thread_B_func都尝试进行i++操作,请问thread_A_func和thread_B_func执行完后,i的值是多少?

static int i =0;

//线程A函数
void thread_A_func()
{
     i++;
}

//线程B函数
void thread_B_func()
{
     i++;
}

有的读者可能认为是2,但也可能不是2,代码的执行过程如下。

      CPU0                               CPU1
-------------------------------------------------------------------
  thread_A_func
    load i= 0                          thread_B_func
                                          Load i=0
     i++
                                           i++
     store i (i=1)
                                         store i (i=1)

从上面的代码执行过程来看,最终结果可能等于1。因为变量i是临界区的一个,CPU0和CPU1可能同时访问,发生并发访问。从CPU角度来看,变量i是一个静态全局变量,存储在数据段中,首先读取变量的值并存储到通用寄存器中,然后在通用寄存器里做i++运算,最后把寄存器的数值写回变量i所在的内存中。在多处理器架构中,上述动作可能同时进行。如果thread_B_func在某个中断处理函数中执行,在单处理器架构上依然可能会发生并发访问。

针对上述例子,有的读者认为可以使用加锁的方式,如使用自旋锁来保证i++操作的原子性,但是加锁操作会导致比较大的开销,用在这里有些浪费。Linux内核提供了atomic_t类型的原子变量,它的实现依赖于不同的架构。atomic_t类型的具体定义为如下。

<include/linux/types.h>

typedef struct {
    int counter;
} atomic_t;

atomic_t类型的原子操作函数可以保证一个操作的原子性和完整性。在内核看来,原子操作函数就像一条汇编语句,保证了操作时不会被打断,如上述的i++语句就可能被打断。要保证操作的完整性和原子性,通常需要“原子地”(不间断地)完成“读-修改-回写”机制,中间不能被打断。在下述过程中,如果其他CPU同时对该原子变量进行写操作,则会影响数据完整性。

(1)读取原子变量的值。

(2)修改原子变量的值。

(3)把新值写回内存中。

在读取原子变量的值、修改原子变量的值、把新值写入内存的过程中,处理器必须提供原子操作的汇编指令来完成上述操作,如ARM64处理器提供cas指令,x86处理器提供cmpxchg指令。

Linux内核提供了很多操作原子变量的函数。

1.基本原子操作函数

Linux内核提供最基本的原子操作函数包括atomic_read()函数和atomic_set()函数。

<include/asm-generic/atomic.h>

#define ATOMIC_INIT(i)  //原子变量初始化为i
#define atomic_read(v)  //读取原子变量的值
#define atomic_set(v,i) //设置变量v的值为i

上述两个函数直接调用READ_ONCE()宏或者WRITE_ONCE()宏来实现,不包括“读-修改-回写”机制,直接使用上述函数容易引发并发访问。

2.不带返回值的原子操作函数

不带返回值的原子操作函数如下。

上述函数会实现“读-修改-回写”机制,可以避免多处理器并发访问同一个原子变量带来的并发问题。在不考虑具体架构优化问题的条件下,上述函数会调用指令cmpxchg来实现。以atomic_{add,sub,inc,dec}()函数为例,它实现在include/asm-generic/atomic.h文件中。

<include/asm-generic/atomic.h>

#define ATOMIC_OP(op, c_op)                     \
static inline void atomic_##op(int i, atomic_t *v)           \
{                                   \
     int c, old;                        \
                                    \
     c = v->counter;                       \
     while ((old = cmpxchg(&v->counter, c, c c_op i)) != c)    \
          c = old;                       \
}

3.带返回值的原子操作函数

Linux内核提供了两类带返回值的原子操作函数,一类返回原子变量的新值,另一类返回原子变量的旧值。

返回原子变量新值的原子操作函数如下。

返回原子变量旧值的原子操作函数如下。

上述两类原子操作函数都使用cmpxchg指令来实现“读-修改-回写”机制。

4.原子交换函数

Linux内核提供了一类原子交换函数。

5.处理引用计数的原子操作函数

Linux内核提供了一组处理引用计数原子操作函数。

上述原子操作函数在内核代码中很常见,特别是对一些引用计数进行操作,如page的_refcount和_mapcount。

6.内嵌内存屏障原语的原子操作函数

Linux内核提供了一组内嵌内存屏障原语的原子操作函数。

关于加载-获取内存屏障原语和存储-释放内存屏障原语,请参考卷1。以atomic_cmpxchg()函数为例,内嵌内存屏障原语的变体包括atomic_cmpxchg_relaxed(v, old, new)、atomic_cmpxchg_acquire(v, old, new)、atomic_cmpxchg_release(v, old, new)。

atomic_add()函数通过调用cmpxchg()函数来实现“读-修改-回写”机制,保证原子变量的完整性。这个函数在不同架构中会有相应的特殊优化,如有些架构的处理器实现了特殊的原子操作指令。Linux内核根据是否支持大系统扩展(LSE)有两种实现方式,一种是使用ldxr和stxr指令的组合,另外一种是使用原子加法指令stadd。

我们先看使用ldxr和stxr指令实现的方式。

<arch/arm64/include/asm/atomic_ll_sc.h>
1     void atomic_op(int i, atomic_t *v)
2     {
3         unsigned long tmp;
4         int result;
5         
6         asm volatile(
7         "    prfm    pstl1strm, %2\n"
8         "1:    ldxr    %w0, %2\n"
9         "    add    %w0, %w0, %w3\n"
10        "    stxr    %w1, %w0, %2\n"
11        "    cbnz    %w1, 1b"
12        : "=&r" (result), "=&r" (tmp), "+Q" (v->counter)
13        : "Ir" (i));    
14    }

在第6~13行中,通过内嵌汇编的方式来实现atomic_add功能。

在第7行中,通过prfm指令提前预取v->counter。

在第8行中,通过ldxr独占加载指令来加载v->counter的值到result变量中,该指令会标记v->counter的地址为独占。

在第9行中,通过add指令使v->counter的值加上变量i的值。

在第10行中,通过stxr独占存储指令来把最新的v->counter的值写入v->counter地址处。

在第11行中,判断tmp的值。如果tmp的值为0,说明stxr指令存储成功;否则,存储失败。如果存储失败,那只能跳转到第8行重新使用ldxr指令。

在第12行中,有3个输出的变量,其中,变量result和tmp具有可写属性,v->counter具有可读可写属性。

第13行表示输入,其中,变量i只有只读属性。

下面来看在支持LSE的情况下atomic_add()函数是如何实现的。

<arch/arm64/include/asm/atomic_lse.h>

1 #define ATOMIC_OP(op, asm_op)                    \
2 static inline void atomic_##op(int i, atomic_t *v)      \
3 {                                 \
4    register int w0 asm ("w0") = i;                \
5    register atomic_t *x1 asm ("x1") = v;             \
6                                   \
7    asm volatile(ARM64_LSE_ATOMIC_INSN(__LL_SC_ATOMIC(op),   \
8 "  " #asm_op "     %w[i], %[v]\n")             \
9    : [i] "+r" (w0), [v] "+Q" (v->counter)             \
10   : "r" (x1)                         \
11   : __LL_SC_CLOBBERS);                         \
12 }
13
14 ATOMIC_OP(add, stadd)

在ARMv8.1指令集中增加了原子加法指令——stadd[1]。在Linux内核中使用CONFIG_ARM64_LSE_ATOMICS宏来表示系统支持新增的指令。

在第4行中,把变量i存放到寄存器w0中。

在第5行中,把atomic_t指针v存放到寄存器x1中。

在第8行中,使用STADD指令来把变量i的值添加到v->counter中。

在第9行中,输出操作数列表,描述在指令中可以修改的C语言变量以及约束条件。其中,变量w0和v->counter都具有可读可写属性。

在第10行中,输入操作数列表,描述在指令中只能读取的C语言变量以及约束条件。其中,x1指针不能被修改。

在第11行中,改变资源列表。即告诉编译器哪些资源已修改,需要更新。

在第14行中,使用ATOMIC_OP()宏来实现atomic_add()函数,其中第二个参数stadd是新增的指令。

比较并交换(Compare and Swap)指令在无锁实现中起到非常重要的作用。原子比较并交换指令的伪代码如下。

int compare_swap(int *ptr, int expected, int new)
{
     Int actual = *ptr;
     If (actual == expected) {
          *ptr = new;
     }
     Return actual;
}

比较并交换指令的基本思路是检查ptr指向的值与expected是否相等。若相等。则把new的值赋值给ptr;否则,什么也不做。不管是否相等,最终都会返回ptr的旧值,让调用者来判断该比较和交换指令执行是否成功。

1.cas指令

ARM64处理器提供了比较并交换指令——cas指令[2]。cas指令根据不同的内存屏障属性分成4类,如表1.1所示。

表1.1 cas指令

指令

访问类型

内存屏障原语

casab

8位

加载-获取

casalb

8位

加载-获取和存储-释放

casb

8位

caslb

8位

存储-释放

casah

16位

加载-获取

casalh

16位

加载-获取和存储-释放

cash

16位

caslh

16位

存储-释放

casa

32位或者64位

加载-获取

casal

32位或者64位

加载-获取和存储-释放

cas

32位或者64位

casl

32位或者64位

存储-释放

2.cmpxchg()函数

Linux内核中常见的比较并交换函数是cmpxchg()。由于Linux内核最早是基于x86架构来实现的,x86指令集中对应的指令是CMPXCHG指令,因此Linux内核保留了该名字作为函数名。

对于ARM64架构,cmpxchg()函数定义在arch/arm64/include/asm/cmpxchg.h头文件中。

<arch/arm64/include/asm/cmpxchg.h>

#define cmpxchg(...)    __cmpxchg_wrapper( _mb, __VA_ARGS__)

cmpxchg()函数会调用__cmpxchg_wrapper()宏,这里第一个参数_mb表示同时需要加载-获取和存储-释放内存屏障原语。

__cmpxchg_wrapper()宏经过多次宏转换,它最终会调用__CMPXCHG_CASE()宏,实现在arch/arm64/include/asm/atomic_lse.h头文件中。下面以64位位宽为例。

<arch/arm64/include/asm/atomic_lse.h>

#define __CMPXCHG_CASE(w, sfx, name, sz, mb, cl...)         \
static inline u##sz __cmpxchg_case_##name##sz(volatile void *ptr,  \
                          u##sz old,    \
                          u##sz new)    \
{                                    \
     register unsigned long x0 asm ("x0") = (unsigned long)ptr;  \
     register u##sz x1 asm ("x1") = old;           \
     register u##sz x2 asm ("x2") = new;           \
                                    \
     asm volatile(ARM64_LSE_ATOMIC_INSN(           \
     /* LL/SC */                        \
     __LL_SC_CMPXCHG(name##sz)                \
     __nops(2),                         \
                                                 \
     "     mov     " #w "30, %" #w "[old]\n"   \
     "     cas" #mb #sfx "\t" #w "30, %" #w "[new], %[v]\n"  \
     "     mov     %" #w "[ret], " #w "30")       \
     : [ret] "+r" (x0), [v] "+Q" (*(unsigned long *)ptr)       \
     : [old] "r" (x1), [new] "r" (x2)               \
     : __LL_SC_CLOBBERS, ##cl);                 \
                                    \
     return x0;                        \
}
__CMPXCHG_CASE(x, , mb_, 64, al, "memory")

__CMPXCHG_CASE()宏包含6个参数。

上述宏最后会变成如下代码,函数变成__cmpxchg_case_mb_64()。

<__CMPXCHG_CASE宏展开后的代码>

1 static inline u64 __cmpxchg_case_mb_64(volatile void *ptr,
2                         u64 old,
3                         u64 new)
4 {                                
5    register unsigned long x0 asm ("x0") = (unsigned long)ptr;
6    register u64 x1 asm ("x1") = old;
7    register u64 x2 asm ("x2") = new;
8    
9    asm volatile(ARM64_LSE_ATOMIC_INSN(
10   /* LL/SC */
11   __LL_SC_CMPXCHG(mb_64)
12   __nops(2),
13   /* LSE 原子操作 */
14   "     mov     x30, %x[old]\n"
15   "     casal  x30, %x[new], %[v]\n"
16   "     mov     %x[ret], x30")
17   : [ret] "+r" (x0), [v] "+Q" (*(unsigned long *)ptr)
18   : [old] "r" (x1), [new] "r" (x2)
19   : __LL_SC_CLOBBERS, "memory");
20   
21   return x0;
22}

在第5行中,使用x0寄存器来存储ptr参数。

在第6行中,使用x1寄存器来存储old参数。

在第7行中,使用x2寄存器来存储new参数。

在第9行中,ARM64_LSE_ATOMIC_INSN()宏起到一个动态打补丁的作用。若系统配置了ARM64_HAS_LSE_ATOMICS,则执行大系统扩展(Large System Extension,LSE)的代码。ARM64_HAS_LSE_ATOMICS表示系统支持LSE原子操作的扩展指令,这是在ARMv8.1架构中实现的。本场景假设系统支持LSE特性,那么将执行第13~16行的汇编代码。

ARM64_LSE_ATOMIC_INSN()宏的代码实现如下,它利用ALTERNATIVE宏来做一个选择。如果系统定义了ARM64_HAS_LSE_ATOMICS,那么将执行第13~16行的汇编代码;如果系统没有定义ARM64_HAS_LSE_ATOMICS,那么将执行第10~12行的汇编代码。

<arch/arm64/include/asm/lse.h>

#define ARM64_LSE_ATOMIC_INSN(llsc, lse)                \
ALTERNATIVE(llsc, lse, ARM64_HAS_LSE_ATOMICS)

在第14行中,把old参数加载到x30寄存器中。

在第15行中,使用casal指令来执行比较并交换操作。比较ptr的值是否与x30的值相等,若相等,则把new的值设置到ptr中。注意,这里casal指令隐含了加载-获取和存储-释放内存屏障原语。

在第16行中,通过ret参数返回x30寄存器的值。

除了cmpxchg()函数,Linux内核还实现了多个变体,如表1.2所示,这些函数在无锁机制的实现上起到了非常重要的作用。

表1.2 cmpxchg()函数的变体

函数

描述

cmpxchg_acquire()

比较并交换操作,隐含了加载-获取内存屏障原语

cmpxchg_release()

比较并交换操作,隐含了存储-释放内存屏障原语

cmpxchg_relaxed()

比较并交换操作,不隐含任何内存屏障原语

cmpxchg()

比较并交换操作,隐含了加载-获取和存储-释放内存屏障原语

在互斥锁的实现中还广泛使用了cmpxchg()函数的另一个变体——atomic_try_cmpxchg()函数。如果读者使用cmpxchg()函数的语义去理解它,会得到错误的结论。atomic_try_cmpxchg()函数的实现如下。

#define __atomic_try_cmpxchg(type, _p, _po, _n)                \
({                                  \
     typeof(_po) __po = (_po);                  \
     typeof(*(_po)) __r, __o = *__po;              \
     __r = atomic_cmpxchg##type((_p), __o, (_n));               \
     if (unlikely(__r != __o))                  \
          *__po = __r;                      \
     likely(__r == __o);                    \
})

#define atomic_try_cmpxchg(_p, _po, _n) ss  __atomic_try_cmpxchg(, _p, _po, _n)

atomic_try_cmpxchg()函数的核心还是调用cmpxchg()函数做比较并交换的操作,但是返回值发生了变化,它返回一个判断值(类似于bool值),即判断cmpxchg()函数的返回值是否和第二个参数的值相等。

3.xchg()函数

除了cmpxchg()函数,还广泛使用另一个交换函数——xchg(new, v)。它的实现机制是把new赋给原子变量v,返回原子变量v的旧值。

卷1已经介绍过ARM架构中如下3条内存屏障指令。

下面介绍Linux内核中的内存屏障接口函数,如表1.3所示。

表1.3 Linux内核中的内存屏障接口函数

接口函数

描述

barrier()

编译优化屏障,阻止编译器为了性能优化而进行指令重排

mb()

内存屏障(包括读和写),用于SMP和UP

rmb()

读内存屏障,用于SMP和UP

wmb()

写内存屏障,用于SMP和UP

smp_mb()

用于SMP的内存屏障。对于UP不存在内存一致性的问题(对汇编指令),在UP上就是一个优化屏障,确保汇编代码和C代码的内存一致性

smp_rmb()

用于SMP的读内存屏障

smp_wmb()

用于SMP的写内存屏障

smp_read_barrier_depends()

读依赖屏障

smp_mb__before_atomic/smp_mb__after_atomic

用于在原子操作中插入一个通用内存屏障

在ARM64 Linux内核中实现内存屏障函数的代码如下。

<arch/arm64/include/asm/barrier.h>

#define mb()     dsb(sy)
#define rmb()        dsb(ld)
#define wmb()        dsb(st)

#define dma_rmb(     dmb(oshld)
#define dma_wmb()    dmb(oshst)

在Linux内核中有很多使用内存屏障指令的例子,下面举两个例子。

例1.1:在一个网卡驱动中发送数据包。把网络数据包写入缓冲区后,由DMA引擎负责发送,wmb()函数保证在DMA传输之前,数据被完全写入缓冲区中。

<drivers\net\ethernet\realtek\8139too.c>

static netdev_tx_t rtl8139_start_xmit (struct sk_buff *skb,
                         struct net_device *dev)
{

     skb_copy_and_csum_dev(skb, tp->tx_buf[entry]);
     /*
      写入 TxStatus 以触发 DMA 传输,
      * 使用一条内存屏障指令以保证设备可以看到这些更新后的数据
      */
     wmb();
     RTL_W32_F (TxStatus0 + (entry * sizeof (u32)),
          tp->tx_flag | max(len, (unsigned int)ETH_ZLEN));
      ...
}

例1.2:Linux内核里面的睡眠和唤醒接口函数也运用了内存屏障指令,通常一个进程因为等待某些事件需要睡眠,如调用wait_event()函数。睡眠者的代码片段如下。

for (;;) {
     set_current_state(TASK_UNINTERRUPTIBLE);
     if (event_indicated)
          break;
     schedule();
}

其中,set_current_state()函数在修改进程的状态时隐含插入了内存屏障函数smp_mb()。

<include/linux/sched.h>

#define set_current_state(state_value)                  \
     smp_store_mb(current->state, (state_value))

<include/asm-generic/barrier.h>

#define smp_store_mb(var, value)  do { WRITE_ONCE(var, value); __smp_mb(); } while (0)

唤醒者通常会调用 wake_up()函数,它在修改 task 状态之前也隐含地插入内存屏障函数smp_wmb()。

<wake_up()→autoremove_wake_function()→try_to_wake_up()>

static int
try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags)
{
     /*
      * 如果要唤醒一个等待 CONDITION 的线程,需要确保
      * CONDITION=1 
      * p->state 之间的访问顺序不能改变

      */
     smp_wmb();

     /* 准备修改 p->state */
     ...
}

在SMP的情况下来观察睡眠者和唤醒者之间的关系如下。

          CPU 1                                CPU 2
=======================     ===============================
set_current_state();               STORE event_indicate
                                     wake_up();
STORE current->state               <write barrier>
<general barrier>                  STORE current->state
LOAD event_indicated
if (event_indicated)
          break;

1.自旋等待的接口函数

Linux内核提供了一个自旋等待的接口函数,它在排队自旋锁机制的实现中广泛应用。smp_cond_load_relaxed()接口函数的定义如下。

<include/asm-generic/barrier.h>

#define smp_cond_load_relaxed(ptr, cond_expr) ({    \
     typeof(ptr) __PTR = (ptr);             \
     typeof(*ptr) VAL;                  \
     for (;;) {                      \
         VAL = READ_ONCE(*__PTR);            \
         if (cond_expr)                  \
             break;                  \
         cpu_relax();                    \
     }                           \
     VAL;                             \
})

该函数有两个参数,第一个参数ptr表示要加载的地址,第二个参数是判断条件,因此该函数会一直原子地加载并判断条件是否成立。

另外,该函数还有一个变体——smp_cond_load_acquire()。二者的区别是在smp_cond_load_relaxed()函数执行完成之后插入一条加载-获取内存屏障指令,而smp_cond_load_relaxed()函数没有隐含任何的内存屏障指令。

#define smp_cond_load_acquire(ptr, cond_expr) ({          \
     typeof(*ptr) _val;                         \
     _val = smp_cond_load_relaxed(ptr, cond_expr);          \
     smp_acquire__after_ctrl_dep();                    \
     _val;                                   \
})

Linux内核中的排队自旋锁机制会使用到smp_cond_load_relaxed()函数。

2.原子变量接口函数

Linux内核提供了两个与原子变量相关的接口函数。

     void smp_mb__before_atomic(void);
     void smp_mb__after_atomic(void);

这两个接口函数用在没有返回值的原子操作函数中,如atomic_add()函数、atomic_dec()函数等,特别适用引用计数递增或者递减的场景。通常原子操作函数是没有隐含内存屏障的。下面是一个使用smp_mb__before_atomic()函数的例子。

     obj->dead = 1;
     smp_mb__before_atomic();
     atomic_dec(&obj->ref_count);

smp_mb__before_atomic()函数用于保证所有的内存操作在递减obj->ref_count之前都已经完成,确保其他CPU观察到这些变化——在递减obj->ref_count之前已经把obj->dead设置为1了。

如果临界区中只有一个变量,那么原子变量可以解决问题,但是大多数情况下临界区有一个数据操作的集合。例如,先从一个数据结构中移出数据,对其进行数据解析,然后写回该数据结构或者其他数据结构中,类似于read-modify-write操作。另外一个常见的例子是临界区里有链表的相关操作。整个执行过程需要保证原子性,在数据更新完毕前,不能从其他内核代码路径访问和改写这些数据。这个过程使用原子变量不合适,需要使用锁机制来完成,自旋锁(spinlock)是Linux内核中最常见的锁机制。

自旋锁在同一时刻只能被一个内核代码路径持有。如果另外一个内核代码路径试图获取一个已经被持有的自旋锁,那么该内核代码路径需要一直忙等待,直到自旋锁持有者释放该锁。如果该锁没有被其他内核代码路径持有(或者称为锁争用),那么可以立即获得该锁。自旋锁的特性如下。

先看spinlock数据结构的定义。

<include/linux/spinlock_types.h>

typedef struct spinlock {
     struct raw_spinlock rlock;
} spinlock_t;

typedef struct raw_spinlock {
     arch_spinlock_t raw_lock;
} raw_spinlock_t;

<早期Linux内核中的定义>

typedef struct {
     union {
          u32 slock;
          struct __raw_tickets {
               u16 owner;
               u16 next;
          } tickets;
     }s;
} arch_spinlock_t;

spinlock数据结构的定义既考虑到了不同处理器架构的支持和实时性内核的要求,还定义了raw_spinlock和arch_spinlock_t数据结构,其中arch_spinlock_t数据结构和架构有关。在Linux 2.6.25内核之前,spinlock数据结构就是一个简单的无符号类型变量。若slock值为1,表示锁未被持有;若为0,表示锁被持有。之前的自旋锁机制比较简洁,特别是在没有锁争用的情况下;但也存在很多问题,尤其是在很多CPU争用同一个自旋锁时,会导致严重的不公平性和性能下降。当该锁释放时,事实上,可能刚刚释放该锁的CPU马上又获得了该锁的使用权,或者在同一个NUMA节点上的CPU都可能抢先获取了该锁,而没有考虑那些已经在锁外面等待了很久的CPU。因为刚刚释放锁的CPU的L1 高速缓存中存储了该锁,它比别的CPU更快获取锁,这对于那些已经等待很久的CPU是不公平的。在NUMA处理器中,锁争用会严重影响系统的性能。测试表明,在一个双CPU插槽的8核处理器中,自旋锁争用情况愈发明显,有些线程甚至需要尝试1000000次才能获取锁。因此在Linux 2.6.25内核后,自旋锁实现了“基于排队的FIFO”算法的自旋锁机制,本书中简称为排队自旋锁。

基于排队的自旋锁仍然使用原来的数据结构,但slock域被拆分成两个部分,如图1.1所示,owner表示自旋锁持有者的牌号,next表示外面排队的队列中末尾者的牌号。这类似于排队吃饭的场景,在用餐高峰时段,各大饭店人满为患,顾客来晚了都需要排队。为了简化模型,假设某个饭店只有一张饭桌,刚开市时,next和owner都是0。

▲图1.1 slock域的定义

顾客A来时,因为next和owner都是0,说明锁未被持有。此时因为饭店还没有顾客,所以顾客A的牌号是0,直接进餐,这时next++。

顾客B来时,因为next为1,owner为0,说明锁被人持有;服务员给他1号牌,让他在饭店门口等待,next++。

顾客C来了,因为next为2,owner为0,服务员给他2号牌,让他在饭店门口排队等待,next++。

这时顾客A吃完并买单了,owner++,owner的值变为1。服务员会让牌号和owner值相等的顾客就餐,顾客B的牌号是 1,所以现在请顾客B就餐。有新顾客来时next++,服务员分配牌号;顾客结账时,owner++,服务员叫号,owner值和牌号相等的顾客就餐。

自旋锁的原型定义在include/linux/spinlock.h头文件中。

<include/linux/spinlock.h>

static inline void spin_lock(spinlock_t *lock)
{
     raw_spin_lock(&lock->rlock);
}

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
     preempt_disable();
     spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
     LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

spin_lock()函数最终调用__raw_spin_lock()函数来实现。首先关闭内核抢占,这是自旋锁实现的关键点之一。那么为什么自旋锁临界区中不允许发生抢占呢?

如果自旋锁临界区中允许抢占,假设在临界区内发生中断,中断返回时会检查抢占调度,这里将有两个问题:一是抢占调度会导致持有锁的进程睡眠,这违背了自旋锁不能睡眠和快速执行完成的设计语义;二是抢占调度进程也可能会申请自旋锁,这样会导致发生死锁。关于中断返回时检查抢占调度的内容可以参考卷1。

如果系统没有打开CONFIG_LOCKDEP和CONFIG_LOCK_STAT选项,spin_acquire()函数其实是一个空函数,并且LOCK_CONTENDED()只是直接调用do_raw_spin_lock()函数。

static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
{
     arch_spin_lock(&lock->raw_lock);
}

由于Linux 5.0内核里的spin_lock已经实现了排队的自旋锁机制,该机制的分析会在后面的章节里介绍。本章介绍Linux 4.0内核中spin_lock的实现。

下面来看arch_spin_lock()函数的实现。

<linux4.0/arch/arm64/include/asm/spinlock.h>

1 static inline void arch_spin_lock(arch_spinlock_t *lock)
2 {
3    unsigned int tmp;
4    arch_spinlock_t lockval, newval;
5 
6    asm volatile(
7    /* 自动实现下一次排队 */
8   "prfm     pstl1strm, %3\n"
9   "1:    ldaxr     %w0, %3\n"
10  "add     %w1, %w0, %w5\n"
11  "stxr     %w2, %w1, %3\n"
12  "cbnz     %w2, 1b\n"
13   /* 是否获得了锁 */
14  "eor     %w1, %w0, %w0, ror #16\n"
15  "cbz     %w1, 3f\n"
16   /*
17    * 若没有获得锁,自旋,
18    * 发送本地事件,以避免在独占加载前忘记解锁
19    */
20  "sevl\n"
21  "2:    wfe\n"
22  "ldaxrh     %w2, %4\n"
23  "eor     %w1, %w2, %w0, lsr #16\n"
24  "cbnz     %w1, 2b\n"
25   /* 获得锁,临界区从这里开始 */
26  "3:"
27   : "=&r" (lockval), "=&r" (newval), "=&r" (tmp), "+Q" (*lock)
28   : "Q" (lock->owner), "I" (1 << TICKET_SHIFT)
29   : "memory");
30}

该函数只有一个参数lock。

在第9行中,通过ldaxr指令把参数lock的值加载到变量lockval中。

在第10行中,通过add指令把lockval的值增加1 << TICKET_SHIFT(其中,TICKET_SHIFT为16),这相当于把lockval中的next域加1,然后保存到newval变量中。

在第11行中,使用stxr指令把newval的值写入lock中。当stxr指令原子地存储完成时,tmp的值为0。

在第12行中,使用cbnz指令来判断tmp值是否为0。若为零,则说明stxz指令执行完成;若不为0,则跳转到标签1处。

在第14行中,%w0表示临时变量lockval的值。这里使用ror把lockval值右移16位获得owner域,然后和next域进行“异或”。

在第15行中,cbz指令用来判断%w1的值是否为0。若为0,说明ower域和next域相等,即owner等于该CPU持有的牌号(lockval.next)时,该CPU成功获取了自旋锁,跳转到标签3处并返回。若不为0,则调用wfe指令让CPU进入等待状态。

在第21~24行中,让CPU进入等待状态。当有其他CPU唤醒本CPU时,说明该自旋锁的owner域发生了变化,即该锁被释放。在第23~24行中,若新owner域的值和next域的值相等,即owner等于该CPU持有的牌号(lockval.next),说明该CPU成功获取了自旋锁。若不相等,只能继续跳转到标签2处让CPU进入等待状态。

接下来说明ARM64架构中的wfe指令。ARM 64架构中的等待中断(Wait For Interrupt,WFI)和等待事件(Wait For Event,WFE)指令都可以让ARM核进入睡眠模式。WFI直到有WFI唤醒事件发生才会唤醒CPU,WFE直到有WFE唤醒事件发生才会唤醒CPU。这两类事件大致相同,唯一的不同在于WFE指令可以被其他CPU上的SEV指令唤醒,SEV指令是用于修改Event寄存器的指令。

下面来看释放自旋锁的arch_spin_unlock()函数的实现。

static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
     asm volatile(
"    stlrh     %w1, %0\n"
     : "=Q" (lock->owner)
     : "r" (lock->owner + 1)
     : "memory");
}

arch_spin_unlock()函数实现比较简单,使用stlrh指令来让lock->owner域加1。另外,使用stlrh指令来释放锁,并且让处理器的独占监视器(exclusives monitor)监测到锁临界区被清除,即处理器的全局监视器监测到部分内存区域从独占访问状态(exclusive access state)变成了开放访问状态(open access state),从而触发一个WFE唤醒事件。

在编写驱动代码的过程中常常会遇到这样一个问题。假设某个驱动中有一个链表a_driver_list,在驱动中很多操作都需要访问和更新该链表,如open、ioctl等,因此操作链表的地方就是一个临界区,需要自旋锁来保护。若在临界区中发生了外部硬件中断,系统暂停当前进程的执行转而处理该中断。假设中断处理程序恰巧也要操作该链表,链表的操作是一个临界区,所以在操作之前要调用spin_lock()函数来对该链表进行保护。中断处理程序试图获取该自旋锁,但因为它已经被其他CPU持有了,于是中断处理程序进入忙等待状态或者wfe睡眠状态。在中断上下文中出现忙等待或者睡眠状态是致命的,中断处理程序要求“短”和“快”,自旋锁的持有者因为被中断打断而不能尽快释放锁,而中断处理程序一直在忙等待该锁,从而导致死锁的发生。Linux内核的自旋锁的变体spin_lock_irq()函数通过在获取自旋锁时关闭本地CPU中断,可以解决该问题。

<include/linux/spinlock.h>

static inline void spin_lock_irq(spinlock_t *lock)
{
     raw_spin_lock_irq(&lock->rlock);
}

static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
{
     local_irq_disable();
     preempt_disable();
     do_raw_spin_lock();
}

spin_lock_irq()函数的实现比spin_lock()函数多了一个local_irq_disable()函数。local_irg_disable()函数用于关闭本地处理器中断,这样在获取自旋锁时可以确保不会发生中断,从而避免发生死锁问题,即spin_lock_irq()函数主要防止本地中断处理程序和自旋锁持有者之间产生锁的争用。可能有的读者会有疑问,既然关闭了本地CPU的中断,那么别的CPU依然可以响应外部中断,这会不会也可能导致死锁呢?自旋锁持有者在CPU0上,CPU1响应了外部中断且中断处理程序同样试图去获取该锁,因为CPU0上的自旋锁持有者也在继续执行,所以它很快会离开临界区并释放锁,这样CPU1上的中断处理程序可以很快获得该锁。

在上述场景中,如果CPU0在临界区中发生了进程切换,会是什么情况?注意,进入自旋锁之前已经显式地调用preempt_disable()函数关闭了抢占,因此内核不会主动发生抢占。但令人担心的是,驱动编写者主动调用睡眠函数,从而发生了调度。使用自旋锁的重要原则是拥有自旋锁的临界区代码必须原子地执行,不能休眠和主动调度。但在实际项目中,驱动代码编写者常常容易犯错误。如调用分配内存函数kmalloc()时,可能因为系统空闲内存不足而进入睡眠模式,除非显式地使用GFP_ATOMIC分配掩码。

spin_lock_irqsave()函数会保存本地CPU当前的irq状态并且关闭本地CPU中断,然后获取自旋锁。local_irq_save()函数在关闭本地CPU中断前把CPU当前的中断状态保存到flags变量中。在调用local_irq_restore()函数时把flags值恢复到相关寄存器中,如ARM的CPSR,这样做的目的是防止破坏中断响应的状态。

自旋锁还有另外一个常用的变体——spin_lock_bh()函数,用于处理进程和延迟处理机制导致的并发访问的互斥问题。

若在一个项目中有的代码中使用spin_lock()函数,而有的代码使用raw_spin_lock()函数,并且spin_lock()函数直接调用raw_spin_lock()函数,这样可能会给读者造成困惑。

这要从Linux内核的实时补丁(RT-patch)说起。实时补丁旨在提升Linux内核的实时性,它允许在自旋锁的临界区内抢占锁,且在临界区内允许进程睡眠,这样会导致自旋锁语义被修改。当时内核中大约有10 000处使用了自旋锁,直接修改自旋锁的工作量巨大,但是可以修改那些真正不允许抢占和睡眠的地方,大概有100处,因此改为使用raw_spin_lock()函数。spin_lock()和raw_spin_lock()函数的区别如下。

在绝对不允许抢占和睡眠的临界区,应该使用raw_spin_lock()函数,否则使用spin_lock()。

因此对于没有更新实时补丁的Linux内核来说,spin_lock()函数可以直接调用raw_spin_lock(),对于更新实时补丁的Linux内核来说,spin_lock()会变成可抢占和睡眠的锁,这一点需要特别注意。

MCS锁是自旋锁的一种优化方案,它是以两个发明者Mellor-Crummey和Scott的名字来命名的,论文“Algorithms for Scalable Synchronization on Shared-Memory Multiprocessor”发表在1991年的ACM Transactions on Computer Systems期刊上。自旋锁是Linux内核中使用最广泛的一种锁机制。长期以来,内核社区一直关注自旋锁的高效性和可扩展性。在Linux 2.6.25 内核中,自旋锁已经采用排队自旋算法进行优化,以解决早期自旋锁争用的问题。但是在多处理器和NUMA系统中,排队自旋锁仍然存在一个比较严重的问题。假设在一个锁争用激烈的系统中,所有等待自旋锁的线程都在同一个共享变量上自旋,申请和释放锁都在同一个变量上修改,高速缓存一致性原理(如MESI协议)导致参与自旋的CPU中的高速缓存行变得无效。在锁争用的激烈过程中,可能导致严重的CPU高速缓存行颠簸现象(CPU cacheline bouncing),即多个CPU上的高速缓存行反复失效,大大降低系统整体性能。

MCS算法可以解决自旋锁遇到的问题,显著缓解 CPU 高速缓存行颠簸问题。MCS算法的核心思想是每个锁的申请者只在本地 CPU 的变量上自旋,而不是全局的变量上。虽然MCS算法的设计是针对自旋锁的,但是早期MCS算法的实现需要比较大的数据结构,而自旋锁常常嵌入系统中一些比较关键的数据结构中,如物理页面数据结构 page。这类数据结构对大小相当敏感,因此目前MCS算法只用在读写信号量和互斥锁的自旋等待机制中。Linux内核版本的MCS锁最早是由社区专家Waiman Long在Linux 3.10内核中实现的,后来经过其他的社区专家的不断优化成为现在的osq_lock,OSQ锁是MCS锁机制的一个具体的实现。内核社区并没有放弃对自旋锁的持续优化,在Linux 4.2内核中引进了基于MCS算法的排队自旋锁(Queued Spinlock,Qspinlock)。

MCS锁本质上是一种基于链表结构的自旋锁,OSQ锁的实现需要两个数据结构。

<include/linux/osq_lock.h>

struct optimistic_spin_queue {
     atomic_t tail;
};

struct optimistic_spin_node {
     struct optimistic_spin_node *next, *prev;
     int locked;
     int cpu; 
};

每个MCS锁有一个optimistic_spin_queue数据结构,该数据结构只有一个成员tail,初始化为0。optimistic_spin_node数据结构表示本地CPU上的节点,它可以组织成一个双向链表,包含next和prev指针,locked成员用于表示加锁状态,cpu成员用于重新编码CPU编号,表示该节点在哪个CPU上。optimistic_spin_node数据结构会定义成per-CPU变量,即每个CPU有一个节点结构。

<kernel/locking/osq_lock.c>

static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);

MCS锁在osq_lock_init()函数中初始化。如互斥锁会初始化为一个MCS锁,因为__mutex_init()函数会调用osq_lock_init()函数。

<kernel/locking/mutex.c>

void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
...
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
     osq_lock_init(&lock->osq);
#endif
...
}

static inline void osq_lock_init(struct optimistic_spin_queue *lock)
{
     atomic_set(&lock->tail, 0);
}

osq_lock()函数用于申请MCS锁。下面来看该函数如何进入快速申请通道。

<kernel/locking/osq_lock.c>

 bool osq_lock(struct optimistic_spin_queue *lock)
 {
    struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
    struct optimistic_spin_node *prev, *next;
    int curr = encode_cpu(smp_processor_id());
    int old;

    node->locked = 0;
    node->next = NULL;
    node->cpu = curr;

   old = atomic_xchg(&lock->tail, curr);
   if (old == OSQ_UNLOCKED_VAL)
         return true;

node指向当前CPU的optimistic_spin_node。

optimistic_spin_node数据结构中cpu成员用于表示CPU编号,这里的编号方式和CPU编号方式不太一样,0表示没有CPU,1表示CPU0,以此类推,见encode_cpu()函数。

接着,使用函数atomic_xchg()交换全局lock->tail和当前CPU编号。如果lock->tail的旧值等于初始化值OSQ_UNLOCKED_VAL(值为0),说明还没有CPU持有锁,那么让lock->tail等于当前CPU编号,表示当前CPU成功持有锁,这是最快捷的方式。如果lock->tail的旧值不等于OSQ_UNLOCKED_VAL,获取锁失败,那么将进入中速申请通道。

下面看看如果没能成功获取锁的情况,即lock->tail的值指向其他CPU编号,说明有CPU已经持有该锁。

<osq_lock()>

  prev = decode_cpu(old);
  node->prev = prev;
  ACCESS_ONCE(prev->next) = node;

之前获取锁失败,变量old的值(lock->tail的旧值)指向某个CPU编号,因此decode_cpu()函数返回的是变量old指向的CPU所属的节点。

接着把curr_node插入MCS链表中,curr_node->prev指向前继节点,而prev_node->next指向当前节点。

<osq_lock()>

  while (!ACCESS_ONCE(node->locked)) {
       /*
        *检查是否需要被调度
        */
       if (need_resched())
            goto unqueue;

       cpu_relax_lowlatency();
  }
  return true;

while循环一直查询curr_node->locked是否变成了1,因为prev_node释放锁时会把它的下一个节点中的locked成员设置为1,然后才能成功释放锁。在理想情况下,若前继节点释放锁,那么当前进程也退出自旋,返回true。

在自旋等待过程中,如果有更高优先级的进程抢占或者被调度器调度出去(见need_resched()函数),那应该放弃自旋等待,退出MCS链表,跳转到unqueue标签,处理MCS链表删除节点的情况。unqueue标签用于处理异常情况,正常情况是在while循环中等待锁,如图1.2所示。

▲图1.2 申请MCS锁的流程图

OSQ锁的实现比较复杂的原因在于OSQ锁必须要处理need_resched()函数的异常情况,否则可以很简洁。

unqueue标签处实现删除链表等操作,这里仅使用了原子比较并交换指令,并没有使用其他的锁,这体现了无锁并发编程的精髓。

删除MCS链表节点分为如下3个步骤。

(1)解除前继节点(prev_node)的next指针的指向。

(2)解除当前节点(curr_node)的next指针的指向,并且找出后继节点(next_node)。

(3)让前继节点的next指针指向next_node,next_node的prev指针指向前继节点。

下面先看步骤(1)的实现。

<osq_lock()>

unqueue:
     for (;;) {
         if (prev->next == node &&
         cmpxchg(&prev->next, node, NULL) == node)
             break;

         if (smp_load_acquire(&node->locked))
             return true;

         cpu_relax();

         prev = READ_ONCE(node->prev);
     }
 

如果前继节点的next指针指向当前节点,说明其间链表还没有被修改,接着用cmpxchg()函数原子地判断前继节点的next指针是否指向当前节点。如果指向,则把prev->next指向NULL,并且判断返回的前继节点的next指针是否指向当前节点。如果上述条件都成立,就达到解除前继节点的next指针指向的目的了。

如果上述原子比较并交换指令判断失败,说明其间MCS链表被修改了。利用这个间隙,smp_load_acquire()宏会再一次判断当前节点是否持有了锁。smp_load_acquire()宏的定义如下。

<arch/arm/include/asm/barrier.h>

#define smp_load_acquire(p)                   \
({                                \
    typeof(*p) ___p1 = ACCESS_ONCE(*p);            \
    compiletime_assert_atomic_type(*p);            \
    smp_mb();                            \
    ___p1;                               \
})

ACCESS_ONCE()宏使用volatile关键字强制重新加载p的值,smp_mb()函数保证内存屏障之前的读写指令都执行完毕。如果这时判断curr_node的locked为1,说明当前节点持有了锁,返回true。为什么当前节点莫名其妙地持有了锁呢?因为前继节点释放了锁并且把锁传递给当前节点。

如果前继节点的next指针不指向当前节点,就说明当前节点的前继节点发生了变化,这里重新加载新的前继节点,继续下一次循环。

接下来看步骤(2)的实现。

<osq_lock()>

     next = osq_wait_next(lock, node, prev);
     if (!next)
          return false;

步骤(1)处理前继节点的next指针指向问题,现在轮到处理当前节点的next指针指向问题,关键实现在osq_wait_next()函数中。

<osq_lock()->osq_wait_next()>

0 static inline struct optimistic_spin_node *
1 osq_wait_next(struct optimistic_spin_queue *lock,
2        struct optimistic_spin_node *node,
3        struct optimistic_spin_node *prev)
4 {
5    struct optimistic_spin_node *next = NULL;
6    int curr = encode_cpu(smp_processor_id());
7    int old;
8
9    old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;
10
11   for (;;) {
12        if (atomic_read(&lock->tail) == curr &&
13             atomic_cmpxchg(&lock->tail, curr, old) == curr) {
14              break;
15        }
16
17        if (node->next) {
18              next = xchg(&node->next, NULL);
19              if (next)
20                   break;
21        }
22
23        cpu_relax_lowlatency();
24   }
25
26   return next;
27}

变量curr是指当前进程所在的CPU编号,变量old是指前继节点prev_node所在的CPU编号。如果前继节点为空,那么old值为0。

在第12~13行中,判断当前节点是否为MCS链表中的最后一个节点。如果是,说明当前节点位于链表末尾,即没有后继节点,直接返回next(即NULL)。为什么通过原子地判断lock->tail值是否等于curr即可判断当前节点是否位于链表末尾呢?

如图1.3所示,如果当前节点位于MCS链表的末尾,curr值和lock->tail值相等。如果其间有进程正在申请锁,那么curr值为2,但是lock->tail值会变成其他值,因为在快速申请通道中使用atomic_xchg()函数修改了lock->tail的值。当CPU2加入该锁的争用时,lock->tail=3。

▲图1.3 osq_wait_next()函数中判断当前节点是否在链表末尾的方法

在第17~21行中,如果当前节点有后继节点,那么把当前节点的next指针设置为NULL,解除当前节点的next指针的指向,并且返回后继节点,这样就完成了步骤(2)的目标。第23行的cpu_relax_lowlatency()函数在ARM中是一条BARRIER指令。

接下来看步骤(3)的实现。

<osq_lock()>

     WRITE_ONCE(next->prev, prev);
     WRITE_ONCE(prev->next, next);

     return false;

后继节点的prev指针指向前继节点,前继节点的next指针指向后继节点,这样就完成了当前节点脱离MCS链表的操作。最后返回false,因为没有成功获取锁。

MCS锁的架构,如图1.4所示。

▲图1.4 MCS锁的架构

接下来看MCS锁是如何释放锁的。

<kernel/locking/osq_lock.c>

void osq_unlock(struct optimistic_spin_queue *lock)
{
     struct optimistic_spin_node *node, *next;
     int curr = encode_cpu(smp_processor_id());
     /*
      * 快速通道
      */
     if (likely(atomic_cmpxchg_release(&lock->tail, curr,
                      OSQ_UNLOCKED_VAL) == curr))
         return;
     /*
      * 慢速通道
      */
     node = this_cpu_ptr(&osq_node);
     next = xchg(&node->next, NULL);
     if (next) {
         WRITE_ONCE(next->locked, 1);
         return;
     }
     next = osq_wait_next(lock, node, NULL);
     if (next)
         WRITE_ONCE(next->locked, 1);
}

如果lock->tail保存的CPU编号正好是当前进程的CPU编号,说明没有CPU来竞争该锁,那么直接把lock->tail设置为0,释放锁,这是最理想的情况,代码中把此种情况描述为快速通道(fast path)。注意,此处依然要使用函数atomic_cmpxchg()。

下面进入慢速通道,首先通过xchg()函数使当前节点的指针(node->next)指向NULL。如果当前节点有后继节点,并返回后继节点,那么把后继节点的locked成员设置为1,相当于把锁传递给后继节点。这里相当于告诉后继节点锁已经传递给它了。

如果后继节点为空,说明在执行osq_unlock()函数期间有成员擅自离队,那么只能调用osq_wait_next()函数来确定或者等待确定的后继节点,也许当前节点就在链表末尾。当然,也会有“后继无人”的情况。

读者可以在阅读完1.5节之后再来细细体会MCS锁设计的精妙之处。

在Linux 2.6.25内核中,为了解决自旋锁在争用激烈场景下导致的性能低下问题,引入了基于排队的FIFO算法,但是该算法依然没能解决高速缓存行颠簸问题,学术界因此提出了MCS锁。MCS锁机制会导致spinlock数据结构变大,在内核中很多数据结构内嵌了spinlock数据结构,这些数据结构对大小很敏感,这导致了MCS锁机制一直没能在spinlock数据结构上应用,只能屈就于互斥锁和读写信号量。但内核社区的专家Waiman Long和Peter Zijlstra并没有放弃对自旋锁的持续优化,他们在Linux 4.2内核中引进了排队自旋锁(Queued Spinlock,Qspinlock)机制。Waiman Long在双CPU插槽的计算机上运行一些系统测试项目时发现,排队自旋锁机制比排队机制在性能方面提高了20%,特别是在锁争用激烈的场景下,文件系统的测试性能会有116%的提高。排队自旋锁机制非常适合NUMA架构的服务器,特别是有大量的CPU内核且锁争用激烈的场景。

q_spinlock数据结构依然采用spinlock数据结构。

<include/asm-generic/qspinlock_types.h>

typedef struct qspinlock {
     union {
         atomic_t val;
         struct {
             u8     locked;
             u8     pending;
         };
         struct {
             u16     locked_pending;
             u16     tail;
         };
     };
} arch_spinlock_t;

qspinlock数据结构把val字段分成多个域,如表1.4所示。

表1.4 qspinlock中val字段的含义

描述

Bit[0:7]

locked域,表示成功持有了锁

Bit[8]

pending域,表示第一顺位继承者,自旋等待锁释放

Bit[9:15]

未使用

Bit[16:17]

tail_idx 域,用来获取q_nodes,目前支持4种上下文的mcs_nodes——进程上下文task、软中断上下文softirq、硬中断上下文hardirq和不可屏蔽中断上下文nmi

Bit[18:31]

tail_cpu域,用来标识等待队列末尾的CPU

原来的spinlock数据结构中的val字段被分割成locked、pending、tail_idx和tail_cpu这4个域。Linux内核使用一个 {x, y, z}三元组来表示锁的状态,其中x表示tail,即tail_cpu和tail_idx域,y表示pending域,z表示locked域,如图1.5所示。

▲图1.5 锁的三元组

另外,排队自旋锁还利用了MCS锁机制,每个CPU都定义一个mcs_spinlock数据结构。

<kernel/locking/mcs_spinlock.h>

struct mcs_spinlock {
     struct mcs_spinlock *next;
     int locked; 
     int count;  
};

<kernel/locking/qspinlock.c>

struct qnode {
     struct mcs_spinlock mcs;
};

static DEFINE_PER_CPU_ALIGNED(struct qnode, qnodes[4]);

这里为每个CPU都定义了4个qnode,用于4个上下文,分别为task、softirq、hardirq和nmi。但这里只是预先规划,实际代码暂时还没有用到4个qnode。

我们以一个实际场景来分析排队自旋锁的实现,假设CPU0、CPU1、CPU2以及CPU3争用一个自旋锁,它们都在进程上下文中争用该锁。

在初始状态,没有CPU获取该锁,那么锁的三元组的初始值为{0, 0, 0},如图1.6(a)所示。

这时,CPU0的进程调用queued_spin_lock()函数去申请该锁,因为没有其他CPU竞争,所以CPU0很快获取了锁,如图1.6(b)所示。lock->val中,locked=1,pending=0,tail=0,即三元组的值为{0, 0, 1}。

▲图1.6 锁的初始状态

假设这时CPU1也尝试获取该锁,CPU1上的进程调用queued_spin_lock()函数申请该锁。 queued_spin_lock()函数的主要代码如下。

<include/asm-generic/qspinlock.h>

static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
     u32 val = 0;

     if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))
         return;

     queued_spin_lock_slowpath(lock, val);
}

queued_spin_lock()函数会区分两种场景。

读者需要注意的是 atomic_try_cmpxchg_acquire()函数和 cmpxchg()函数的细微差别。atomic_try_cmpxchg_acquire()函数会调用 cmpxchg()函数来做比较并交换操作,比较lock->val和val的值是否相等。若相等,则设置lock->val的值为_Q_LOCKED_VAL,然后判断cmpxchg()函数的返回值是否和val相等。

当快速申请通道无法获取锁时,进入中速申请通道,即queued_spin_lock_slowpath()函数。这个场景下,CPU0持有锁,CPU1尝试获取锁。

<kernel/locking/qspinlock.c>
<queued_spin_lock()->queued_spin_lock_slowpath()>

void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
     struct mcs_spinlock *prev, *next, *node;
     u32 old, tail;
     int idx;

     if (val == _Q_PENDING_VAL) {
         int cnt = _Q_PENDING_LOOPS;
         val = atomic_cond_read_relaxed(&lock->val,
                           (VAL != _Q_PENDING_VAL) || !cnt--);

首先,判断val的值是否设置了pending域。如果设置了pending域,说明此时处于一个临时状态,即锁持有者正在释放锁的过程中,这个过程会把锁传递给第一顺位继承者。此时,需要等待pending域释放。这里使用atomic_cond_read_relaxed()函数来自旋等待。

<queued_spin_lock_slowpath()>

     /*
      * 如果我们发现了锁争用,那么跳转到queue标签处
      */
     if (val & ~_Q_LOCKED_MASK)
         goto queue;

然后,判断pending域和tail域是否有值,其中_Q_LOCKED_MASK为0xFF。如果有值,如tail_cpu域有值,即有CPU在等待队列中等候,说明已经有CPU在等待这个锁,那么只好跳转到queue标签处去排队。

<queued_spin_lock_slowpath()>

    /*
     * trylock || pending
     *
     * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
     */
     val = queued_fetch_set_pending_acquire(lock);

设置pending域。这里设置pending域的含义是表明CPU1为第一顺位继承者,那么自旋等待是最合适的。现在lock->val的状态变为locked=1,pending=1,tail=0,如图1.7(a)所示。注意,queued_fetch_set_pending_acquire()函数返回的是lock->val的旧值。

在本场景中,CPU1会运行到这里并通过queued_fetch_set_pending_acquire()函数来设置pending域。然而,若CPU2也加入该锁的争用,则在前面的判断中就发现锁的pending域被CPU1置1了,直接跳转到queue标签处,加入MCS队列中。

<queued_spin_lock_slowpath()>

     /*
      * 如果我们发现了锁的争用,那是一个并发的锁
      *
      */
     if (unlikely(val & ~_Q_LOCKED_MASK)) {
         if (!(val & _Q_PENDING_MASK))
             clear_pending(lock);

         goto queue;
     }

我们继续做一些检查,这里使用lock->val的旧值来判断pending域和tail域是否有值。如果有值,则说明已经有CPU在等待这个锁,那么只好跳转到queue标签处去排队。在之前已经判断了pending域和tail域,为什么这里还需要判断呢?什么场景下会运行到这里呢?这里有一个比较复杂并且特殊的场景需要考虑,详细分析请看1.5.5节。

<queued_spin_lock_slowpath()>

     /*
      * 自旋等待锁持有者释放锁
      *
      * 0,1,1 -> 0,1,0
      *
      */
     if (val & _Q_LOCKED_MASK)
         atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));

由于刚才不仅设置了pending域而且做了必要的检查,因此就自旋等待。atomic_cond_read_acquire()函数内置了自旋等待的机制,一直原子地加载和判断条件是否成立。这里判断条件为val字段的locked域是否为0,也就是锁持有者是否释放了锁。当锁持有者释放锁时,lock->val中的locked域会被清零,atomic_cond_read_acquire()函数会退出for循环。

<queued_spin_lock_slowpath()>

    /*
     * 成功持有锁,pending域清零
     *
     * 0,1,0 -> 0,0,1
     */
    clear_pending_set_locked(lock);
    qstat_inc(qstat_lock_pending, true);
    return;

接下来,clear_pending_set_locked()函数把pending域的值清零并且设置locked域为1,表示CPU1已经成功持有了该锁并返回,如图1.7(b)所示。

static __always_inline void clear_pending_set_locked(struct qspinlock *lock)
{
     struct __qspinlock *l = (void *)lock;

     WRITE_ONCE(l->locked_pending, _Q_LOCKED_VAL);
}

上述内容是比较理想的状况,也就是中速获取锁的情况。

▲图1.7 中速申请通道

慢速申请通道的场景是CPU0持有锁,CPU1设置pending域并且在自旋等待,CPU2也加入锁的争用行列。在此场景下,CPU2会跳转到queue标签处。

<queued_spin_lock_slowpath()>

     /*
      * 如果我们发现了锁争用,那么跳转到queue标签处
      */
     if (val & ~_Q_LOCKED_MASK)
         goto queue;

接下来看queue的代码。

<queued_spin_lock_slowpath()>

queue:
     node = this_cpu_ptr(&qnodes[0].mcs);
     idx = node->count++;
     tail = encode_tail(smp_processor_id(), idx);

     node = grab_mcs_node(node, idx);

     node->locked = 0;
     node->next = NULL;

     if (queued_spin_trylock(lock))
         goto release;

前面提到排队自旋锁会利用MCS锁机制来进行排队。首先,获取当前CPU对应的mcs_spinlock节点,通常使用mcs_spinlock[0]节点[3]

encode_tail()函数把lock->val中的tail域再进行细分,其中Bit[16:17]存放tail_idx,Bit[18:31]存放tail_cpu(CPU编号)。encode_tail()函数的实现如下。

static inline __pure u32 encode_tail(int cpu, int idx)
{
     u32 tail;
     tail  = (cpu + 1) <<_Q_TAIL_CPU_OFFSET;
     tail |= idx <<_Q_TAIL_IDX_OFFSET;
     return tail;
}

假设CPU0持有锁,CPU1为第一顺位继承者,CPU2为当前锁申请者,那么lock->val中locked=1,pending=1,tail_idx=0,tail_cpu=0。node->locked设置为0,表示当前CPU2的mcs_spinlock节点并没有持有锁。

queued_spin_trylock()函数表示尝试获取锁。这时可能访问高速缓存中 pre-cpu 的mcs_spinlock节点,因此可能在这个时间点成功获取锁。

<queued_spin_lock_slowpath()>

     old = xchg_tail(lock, tail);
     next = NULL;

xchg_tail()函数把新的tail值原子地设置到lock->tail中。新的lock->val中,locked=1,pending=1,tail_idx=0,tail_cpu=3。旧的lock->val中,locked=1,pending=1,tail_idx=0,tail_cpu=0,如图1.8所示。

▲图1.8 CPU2加入锁的争用

<queued_spin_lock_slowpath()>

     /*
      * 若有前继节点,那么挂入该链表后并等待,直到成为等待队列的链表头
      */
     if (old & _Q_TAIL_MASK) {
         prev = decode_tail(old);
         WRITE_ONCE(prev->next, node);
         arch_mcs_spin_lock_contended(&node->locked);

         next = READ_ONCE(node->next);
         if (next)
             prefetchw(next);
     }

前面已经把新tail域的值设置到lock->val变量中,变量old是交换之前的旧值。

如果旧的lock->val中的tail域有值,说明之前已经有别的CPU在MCS等待队列中。我们需要把自己的节点添加到MCS等待队列末尾,然后等待前继节点释放锁,并且把锁传递给自己,这是MCS算法的特点。

decode_tail()函数取出前继节点。

通过设置prev->next域中指针的指向来把当前节点加入MCS等待队列中。

在arch_mcs_spin_lock_contended()函数中,当前节点会在自己的MCS节点中自旋并等待node->locked被设置为1。注意,这是MCS算法的优点,每个等待的线程都在本地的MCS节点上自旋,而不是在全局的自旋锁中自旋,这样能够有效地缓解CPU高速缓存行颠簸现象。

arch_mcs_spin_lock_contended()函数的定义如下。

<kernel/locking/mcs_spinlock.h>

#define arch_mcs_spin_lock_contended(l)                  \
do {                                 \
     smp_cond_load_acquire(l, VAL);                  \
} while (0)
#endif

当前继节点把锁传递给当前节点时,当前CPU会从睡眠状态唤醒,然后退出arch_mcs_spin_lock_contended()函数中的while循环。

假设这时CPU3也加入该锁的争用中,CPU3对应的MCS节点有前继节点,那么CPU3会在MCS节点中自旋,等待node->locked被设置为1,如图1.9所示。4个CPU的状态如下。

我们接着往下看代码。

<queued_spin_lock_slowpath()>

     val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));

▲图1.9 CPU3加入锁的争用中

atomic_cond_read_acquire()函数读取lock->val的值。因为CPU2没有前继节点,所以它能直接调用atomic_cond_read_acquire()函数来读取lock->val的值。若CPU3也直接读取lock->val的值,那么说明CPU3对应的mcs_spinlock节点已经在MCS等待队列头了,并且获取了MCS锁(node->locked),但获取了MCS锁不代表可以获取自旋锁。因此,还要等待锁持有者释放锁,即锁持有者把lock->val中的locked域和pending域清零。

<queued_spin_lock_slowpath()>

locked:
     if ((val & _Q_TAIL_MASK) == tail) {
         if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
             goto release; 
     }

     set_locked(lock);

     if (!next)
         next = smp_cond_load_relaxed(&node->next, (VAL));

     arch_mcs_spin_unlock_contended(&next->locked);

release:

     __this_cpu_dec(qnodes[0].mcs.count);
}

CPU2运行到标签处locked处,说明CPU0已经释放了该锁,lock->val中的locked域和pending域的值都被清零。

接下来,判断当前节点是否为MCS等待队列的唯一的节点。为什么当前lock->tail的值和当前CPU获取的tail值相等,即表示MCS等待队列中只有一个节点呢?

假设这时CPU3加入了该锁的争用中,那么CPU3在执行queued_spin_lock_slowpath()函数时,在tail域中,tail_idx=0,tail_cpu=4,并且会把tail值原子地设置到lock->val中。这里判断出lock->tail和CPU2的tail值不一样,因为CPU3已加入MCS等待队列中。

既然MCS等待队列中已经没有其他等待者了,那么通过atomic_cmpxchg_relaxed()函数来原子地比较并设置lock->val为1。这种情况下,在lock->val中,locked=1,pending=0,tail_idx=0,tail_cpu=0。

如果MCS等待队列中还有其他等待者,那么直接设置lock->locked域为_Q_LOCKED_VAL,表示成功持有锁。这种情况下,在 lock->val中,locked=1,pending=0,tail_idx=0,tail_cpu=3。

smp_cond_load_relaxed()函数处理后继节点被删除的情况。

arch_mcs_spin_unlock_contended()会把锁传递给后继节点。

最后,CPU2成功获取自旋锁并释放mcs_spinlock节点,具体过程如图1.10所示。

▲图1.10 CPU2获取自旋锁的过程

要释放排队自旋锁,只要原子地把lock->val值减_Q_LOCKED_VAL即可。注意,这时lock中的其他域(如pending域或者tail_cpu域)可能还有值。

static __always_inline void queued_spin_unlock(struct qspinlock *lock)
{
     (void)atomic_sub_return_release(_Q_LOCKED_VAL, &lock->val);
}

在中速申请通道的代码里有如下代码清单,为了方便叙述将其分成代码段A、B、C和D。

<kernel/locking/qspinlock.c>

void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
    ...
     if (val & ~_Q_LOCKED_MASK)   //代码段A
         goto queue;

     val = queued_fetch_set_pending_acquire(lock);  //代码段B

     if (unlikely(val & ~_Q_LOCKED_MASK)) {        //代码段C
          if (!(val & _Q_PENDING_MASK))             //代码段D
              clear_pending(lock);
          goto queue;
     }
     ...
queue:
    ...
}

读者可能对上述代码逻辑有疑问。

这里有一个比较复杂且特殊的场景需要考虑。假设CPU0~CPU3同时争用这个自旋锁,如图1.11所示。

T0时刻,CPU0率先获取锁,此时锁的三元组的值为{0, 0, 1}。

T1时刻,CPU3来申请该锁,运行到代码段A时发现锁中的pending域和tail域都为0。

T2时刻,CPU3发生中断,跳转到中断处理程序。

T3时刻,CPU1来申请该锁,运行到代码段B时,设置了该锁的pending域,CPU1变成了第一顺位继承者。此时,锁的三元组的值为{0, 1, 1},锁的旧值为{0, 0, 1}。

T4时刻,CPU2也来申请该锁,运行到代码段A时发现pending域置 1 了,因此跳转到queue标签处,加入MCS的等待队列。这时,锁的三元组的值为{3, 1, 1}。

T5时刻,CPU1在中速申请通道里自旋等待CPU0释放锁。

T6时刻,CPU0释放锁,即locked域清零,此时,锁的三元组的值{3, 1, 0}。

T7时刻,CPU1把pending域清零并且设置locked域,成功获取锁。此时,锁的三元组的值为{3, 0, 1}。

T8时刻,CPU3中断返回,继续运行申请锁的代码。

T9时刻,CPU3运行到代码段B处,设置pending域。此时,锁的三元组的值为{3, 1, 1},锁的旧值为{3, 0, 1}。

T10时刻,CPU3运行到代码段C和代码段D处,发现此时的val满足判断条件,因此调用clear_pending()函数来使pending域清零并且跳转到queue标签处。

▲图1.11 自旋锁争用问题

综上所述,排队自旋锁实现的逻辑如图1.12所示,在本节描述的场景中,CPU0是锁持有者,CPU1为第一顺位继承者,CPU2和CPU3也是锁的申请者,CPU2和CPU3会在MCS等待队列中等待,而锁的三元组的值如图1.12所示。也许有读者会问:从宏观来看,系统中有成千上万个自旋锁,但是每个CPU只有唯一的mcs_spinlock节点,那么这些自旋锁怎么和这个唯一的mcs_spinlock节点[4]映射呢?其实从微观角度来看,同一时刻一个CPU只能持有一个自旋锁,其他CPU只是在自旋等待这个被持有的自旋锁,因此每个CPU上有一个mcs_spinlock节点就足够了。

▲图1.12 排队自旋锁实现的逻辑

排队自旋锁的特点如下。

信号量(semaphore)是操作系统中最常用的同步原语之一。自旋锁是一种实现忙等待的锁,而信号量则允许进程进入睡眠状态。简单来说,信号量是一个计数器,它支持两个操作原语,即P和V操作。P和V原指荷兰语中的两个单词,分别表示减少和增加,后来美国人把它改成down和up,现在Linux内核中也叫这两个名字。

信号量中经典的例子莫过于生产者和消费者问题,它是操作系统发展历史上经典的进程同步问题,最早由Dijkstra提出。假设生产者生产商品,消费者购买商品,通常消费者需要到实体商店或者网上商城购买商品。用计算机来模拟这个场景,一个线程代表生产者,另外一个线程代表消费者,缓冲区代表商店。生产者生产的商品被放置到缓冲区中以供应给消费者线程消费,消费者线程从缓冲区中获取物品,然后释放缓冲区。若生产者线程生产商品时发现没有空闲缓冲区可用,那么生产者必须等待消费者线程释放出一个空闲缓冲区。若消费者线程购买商品时发现商店没货了,那么消费者必须等待,直到新的商品生产出来。如果采用自旋锁机制,那么当消费者发现商品没货时,他就搬个凳子坐在商店门口一直等送货员送货过来;如果采用信号量机制,那么商店服务员会记录消费者的电话,等到货了通知消费者来购买。显然,在现实生活中,如果是面包这类很快可以做好的商品,大家愿意在商店里等;如果是家电等商品,大家肯定不会在商店里等。

semaphore数据结构的定义如下。

<include/linux/semaphore.h>

struct semaphore {
     raw_spinlock_t     lock;
     unsigned int       count;
     struct list_head   wait_list;
};

通常通过sema_init()函数进行信号量的初始化,其中__SEMAPHORE_INITIALIZER()宏会完成对semaphore数据结构的填充,val值通常设为1。

<include/linux/semaphore.h>

0 static inline void sema_init(struct semaphore *sem, int val)
1 {
2    static struct lock_class_key __key;
3    *sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
4 }
5
6 #define __SEMAPHORE_INITIALIZER(name, n)                \
7 {                                \
8    .lock    = __RAW_SPIN_LOCK_UNLOCKED((name).lock),  \
9    .count    = n,                    \
10   .wait_list = LIST_HEAD_INIT((name).wait_list),   \
11}

下面来看down()函数。down()函数有如下一些变体。其中down()函数和down_interruptible()函数的区别在于,down_interruptible()函数在争用信号量失败时进入可中断的睡眠状态,而down()函数进入不可中断的睡眠状态。若down_trylock()函数返回0,表示成功获取锁;若返回1,表示获取锁失败。

void down(struct semaphore *sem);
int down_interruptible(struct semaphore *sem);
int down_killable(struct semaphore *sem);
int down_trylock(struct semaphore *sem);
int down_timeout(struct semaphore *sem, long jiffies);

接下来看down_interruptible()函数的实现。

<kernel/locking/semaphore.c>

0 int down_interruptible(struct semaphore *sem)
1 {
2    unsigned long flags;
3    int result = 0;
4
5    raw_spin_lock_irqsave(&sem->lock, flags);
6    if (likely(sem->count > 0))
7           sem->count--;
8    else
9           result = __down_interruptible(sem);
10   raw_spin_unlock_irqrestore(&sem->lock, flags);
11
12   return result;
13}

首先,第6~9行代码判断是否进入自旋锁的临界区。注意,后面的操作会临时打开自旋锁,若涉及对信号量中最重要的count的操作,需要自旋锁来保护,并且在某些中断处理函数中也可能会操作该信号量。由于需要关闭本地CPU中断,因此这里采用raw_spin_lock_irqsave()函数。当成功进入自旋锁的临界区之后,首先判断sem->count是否大于0。如果大于0,则表明当前进程可以成功地获得信号量,并将sem->count值减1,然后退出。如果sem->count小于或等于0,表明当前进程无法获得该信号量,则调用__down_interruptible()函数来执行睡眠操作。

static noinline int __sched __down_interruptible(struct semaphore *sem)
{
     return __down_common(sem, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}

__down_interruptible()函数内部调用__down_common()函数来实现。state参数为TASK_INTERRUPTIBLE。timeout参数为MAX_SCHEDULE_TIMEOUT,是一个很大的LONG_MAX值。

<down_interruptible()->__down_interruptible()->__down_common()>

0 static inline int __sched __down_common(struct semaphore *sem, long state,
1 long timeout)
2 {
3    struct task_struct *task = current;
4    struct semaphore_waiter waiter;
5
6    list_add_tail(&waiter.list, &sem->wait_list);
7    waiter.task = task;
8    waiter.up = false;
9
10   for (;;) {
11         if (signal_pending_state(state, task))
12               goto interrupted;
13         if (unlikely(timeout <= 0))
14               goto timed_out;
15         __set_task_state(task, state);
16         raw_spin_unlock_irq(&sem->lock);
17         timeout = schedule_timeout(timeout);
18         raw_spin_lock_irq(&sem->lock);
19         if (waiter.up)
20              return 0;
21   }
22
23 timed_out:
24   list_del(&waiter.list);
25   return -ETIME;
26 interrupted:
27   list_del(&waiter.list);
28   return -EINTR;
29 }

在第4行中,semaphore_waiter数据结构用于描述获取信号量失败的进程,每个进程会有一个semaphore_waiter数据结构,并且把当前进程放到信号量sem的成员变量wait_list链表中。接下来的for循环将当前进程的task_struct状态设置成TASK_INTERRUPTIBLE,然后调用schedule_timeout()函数主动让出CPU,相当于当前进程睡眠。注意schedule_timeout()函数的参数是MAX_SCHEDULE_TIMEOUT,它并没有实际等待MAX_SCHEDULE_TIMEOUT的时间。当进程再次被调度回来执行时,schedule_timeout()函数返回并判断再次被调度的原因。当waiter.up为true时,说明睡眠在wait_list队列中的进程被该信号量的UP操作唤醒,进程可以获得该信号量。如果进程被其他CPU发送的信号(Signal)或者由于超时等而唤醒,则跳转到timed_out或interrupted标签处,并返回错误代码。

down_interruptible()函数中,在调用__down_interruptible()函数时加入sem->lock的自旋锁,这是自旋锁的一个临界区。前面提到,自旋锁临界区中绝对不能睡眠,难道这是例外?仔细阅读__down_common()函数,会发现for循环在调用schedule_timeout()函数主动让出CPU时,先调用raw_spin_unlock_irq()函数释放了该锁,即调用schedule_timeout()函数时已经没有自旋锁了,可以让进程先睡眠,“醒来时”再补加一个锁,这是内核编程的常用技巧。

下面来看与down()函数对应的up()函数。

<kernel/locking/semaphore.c>

0 void up(struct semaphore *sem)
1 {
2    unsigned long flags;
3
4    raw_spin_lock_irqsave(&sem->lock, flags);
5    if (likely(list_empty(&sem->wait_list)))
6           sem->count++;
7    else
8           __up(sem);
9    raw_spin_unlock_irqrestore(&sem->lock, flags);
10 }

如果信号量上的等待队列(sem->wait_list)为空,则说明没有进程在等待该信号量,直接把sem->count加1即可。如果不为空,则说明有进程在等待队列里睡眠,需要调用__up()函数唤醒它们。

0 static noinline void __sched __up(struct semaphore *sem)
1 {
2    struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
3                          struct semaphore_waiter, list);
4    list_del(&waiter->list);
5    waiter->up = true;
6    wake_up_process(waiter->task);
7 }

首先来看sem->wait_list中第一个成员waiter,这个等待队列是先进先出队列,在down()函数中通过list_add_tail()函数添加到等待队列尾部。 把waiter->up设置为true,把然后调用wake_up_process()函数唤醒waiter->task进程。在down()函数中,waiter->task进程醒来后会判断waiter->up变量是否为true,如果为true,则直接返回0,表示该进程成功获取信号量。

信号量有一个有趣的特点,它可以同时允许任意数量的锁持有者。信号量初始化函数为sema_init(struct semaphore *sem, int count),其中count的值可以大于或等于1。当count大于1时,表示允许在同一时刻至多有count个锁持有者,这种信号量叫作计数信号量(counting semaphore);当count等于1时,同一时刻仅允许一个CPU持有锁,这种信号量叫作互斥信号量或者二进制信号量(binary semaphore)。在Linux内核中,大多使用count值为1的信号量。相比自旋锁,信号量是一个允许睡眠的锁。信号量适用于一些情况复杂、加锁时间比较长的应用场景,如内核与用户空间复杂的交互行为等。

在Linux内核中,除信号量以外,还有一个类似的实现叫作互斥锁(mutex)。信号量是在并行处理环境中对多个处理器访问某个公共资源进行保护的机制,互斥锁用于互斥操作。

信号量根据初始count的大小,可以分为计数信号量和互斥信号量。根据著名的洗手间理论,信号量相当于一个可以同时容纳N个人的洗手间,只要洗手间人不满,其他人就可以进去,如果人满了,其他人就要在外面等待。互斥锁类似于街边的移动洗手间,每次只能进去一个人,里面的人出来后才能让排队中的下一个人进去。既然互斥锁类似于count值等于1的信号量,为什么内核社区要重新开发互斥锁,而不是复用信号量的机制呢?

互斥锁最早是在Linux 2.6.16内核中由Red Hat Enterprise Linux的资深内核专家Ingo Molnar设计和实现的。信号量的count成员可以初始化为1,并且down()和up()函数也可以实现类似于互斥锁的功能,那为什么要单独实现互斥锁机制呢?Ingo Molnar认为,在设计之初,信号量在Linux内核中的实现没有任何问题,但是互斥锁相对于信号量要简单轻便一些。在锁争用激烈的测试场景下,互斥锁比信号量执行速度更快,可扩展性更好。另外,mutex数据结构的定义比信号量小。这些都是在互斥锁设计之初Ingo Molnar提到的优点。互斥锁上的一些优化方案(如自旋等待)已经移植到了读写信号量中。

下面来看mutex数据结构的定义。

<include/linux/mutex.h>

struct mutex {
     atomic_long_t          owner;
     spinlock_t          wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
     struct optimistic_spin_queue osq;
#endif
     struct list_head     wait_list;
};
#define MUTEX_FLAG_WAITERS     0x01
#define MUTEX_FLAG_HANDOFF     0x02
#define MUTEX_FLAG_PICKUP     0x04

#define MUTEX_FLAGS          0x07

互斥锁实现了乐观自旋(optimistic spinning)等待机制。准确地说,互斥锁比读写信号量更早地实现了自旋等待机制。自旋等待机制的核心原理是当发现锁持有者正在临界区执行并且没有其他优先级高的进程要调度时,当前进程坚信锁持有者会很快离开临界区并释放锁,因此与其睡眠等待,不如乐观地自旋等待,以减少睡眠唤醒的开销。在实现自旋等待机制时,内核实现了一套MCS锁机制来保证只有一个等待者自旋等待锁持有者释放锁。

互斥锁的初始化有两种方式,一种是静态使用DEFINE_MUTEX()宏,另一种是在内核代码中动态使用mutex_init()函数。

<include/linux/mutex.h>

#define DEFINE_MUTEX(mutexname) \
    struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)

#define __MUTEX_INITIALIZER(lockname) \
         { .owner = ATOMIC_LONG_INIT(0) \
         , .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
         , .wait_list = LIST_HEAD_INIT(lockname.wait_list)  }

下面来看mutex_lock()函数是如何实现的。

<kernel/locking/mutex.c>

void __sched mutex_lock(struct mutex *lock)
{
     might_sleep();

     if (!__mutex_trylock_fast(lock))
        __mutex_lock_slowpath(lock);
}

__mutex_trylock_fast()函数判断是否可以快速获取锁。若不能通过快速通道获取锁,那么要进入慢速通道——mutex_lock_slowpath()。

<kernel/locking/mutex.c>

static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
     unsigned long curr = (unsigned long)current;
     unsigned long zero = 0UL;

     if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))
         return true;

     return false;
}

__mutex_trylock_fast()函数实现的重点是atomic_long_try_cmpxchg_acquire()函数。如果以cmpxchg()函数的语义来理解,会得出错误的结论。比如,当lock->owner和zero相等时,说明lock这个锁没有被持有,那么可以成功获取锁,把当前进程的task_struct->curr的值赋给lock->owner,然后函数返回lock->owner的旧值,也就是0。这时if判断语句应该判断atomic_long_try_cmpxchg_acquire ()函数是否返回0才对。但是在Linux 5.0内核的代码里和我们想的完全相反,那是怎么回事呢?

细心的读者可以通过翻阅Linux内核的git日志信息找到答案。在Linux 4.18内核中有一个优化的补丁。锁的子系统维护者Peter Zijlstra通过比较反汇编代码发现在x86_64架构下使用try_cmpxchg()来代替cmpxchg()函数可以少执行一次test指令。try_cmpxchg()的函数实现如下。

<include/linux/atomic.h>

#define __atomic_try_cmpxchg(type, _p, _po, _n)             \
({                                  \
     typeof(_po) __po = (_po);                 \
     typeof(*(_po)) __r, __o = *__po;              \
     __r = atomic_cmpxchg##type((_p), __o, (_n));            \
     if (unlikely(__r != __o))                   \
          *__po = __r;                       \
     likely(__r == __o);                     \
})

try_cmpxchg()函数的核心还是调用cmpxchg()函数,但是返回值发生了变化,它返回一个判布尔值,表示cmpxchg()函数的返回值是否和第二个参数的值相等。

因此,当原子地判断出lock->owner字段为0时,说明锁没有被进程持有,那么可以进入快速通道以迅速获取锁,把当前进程的task_struct指针的值原子设置到lock->owner字段中。若lock->owner字段不为0,则说明该锁已经被进程持有,那么要进入慢速通道——mutex_lock_slowpath()。

__mutex_lock_slowpath()函数调用__mutex_lock_common()函数来实现,它实现在kernel/locking/mutex.c文件中。

<kernel/locking/mutex.c>

static int 
__mutex_lock_common(struct mutex *lock, long state,
             unsigned int subclass,
             struct lockdep_map *nest_lock, unsigned long ip,
             struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)

__mutex_lock_common()函数中的主要操作如下。

在第903行中,mutex_waiter数据结构用于描述一个申请互斥锁失败的等待者。

在第924行中,关闭内核抢占。

在第927~935行中,__mutex_trylock()函数尝试获取互斥锁。mutex_optimistic_spin()函数实现乐观自旋等待机制。稍后会详细分析mutex_optimistic_spin()函数的实现。

在第937行中,申请lock->wait_lock自旋锁。

在第941行中,第二次尝试申请互斥锁。互斥锁的实现有一个特点——不断地尝试申请锁。

在第954行中,为描述每一个失败的锁申请申请者准备了一个数据结构,即mutex_waiter 。__mutex_add_waiter()函数把waiter添加到锁的等待队列里。

在第972行中,waiter数据结构中的task成员指向当前进程的进程描述符。

在第974行中,设置当前进程的运行状态,这个state是传递进来的参数。使用mutex_lock()接口函数时,state为TASK_UNINTERRUPTIBLE;使用mutex_lock_interruptible()接口函数时,state为TASK_INTERRUPTIBLE。

在第975~1025行中,在这个for循环里,申请锁的进程会不断地尝试获取锁,然后不断让出CPU进入睡眠状态,然后不断被调度唤醒,直到能获取锁为止。具体步骤如下。

(1)尝试获取锁。

(2)释放lock->wait_lock自旋锁。

(3)schedule_preempt_disabled()函数的当前进程让出CPU,进入睡眠状态。

(4)进程再次被调度运行,也就是进程被唤醒。

(5)判断当前进程是否在互斥锁的等待队列中排在第一位。如果排在第一位,主动给锁持有者设置一个标记位(MUTEX_FLAG_HANDOFF),让它在释放锁的时候把锁的控制权传递给当前进程。

(6)__mutex_trylock()函数再一次尝试获取锁。

(7)若当前进程在等待队列里是第一个进程,那么会调用mutex_optimistic_spin()乐观自旋等待机制。

在第1026行中,成功获取锁。

在第1027行中,在acquired标签处,设置当前进程的进程状态为TASK_RUNNING。

在第1040行中,mutex_remove_waiter()函数把waiter从等待队列中删除。

在第1053行中,释放lock->wait_lock自旋锁。

在第1054行中,打开内核抢占。

在第1055行中,成功返回。

从上述分析可以知道申请互斥锁的流程,如图1.13所示,其中最复杂和最有意思的地方就是乐观自旋等待机制。

▲图1.13 申请互斥锁的流程

乐观自旋等待机制是互斥锁的一个新特性。乐观自旋等待机制其实就是判断锁持有者正在临界区执行时,可以断定锁持有者会很快退出临界区并且释放锁,与其进入睡眠队列,不如像自旋锁一样自旋等待,因为睡眠与唤醒的代价可能更高。乐观自旋等待机制主要实现在mutex_optimistic_spin()函数中。

<kernel/locking/mutex.c>

static __always_inline bool
mutex_optimistic_spin(struct mutex *lock, struct ww_acquire_ctx *ww_ctx,
              const bool use_ww_ctx, struct mutex_waiter *waiter)

第1个参数lock是要申请的互斥锁,第4个参数waiter是等待者描述符。mutex_optimistic_spin()函数中主要实现了如下操作。

在第614~632行中,处理当等待者描述符waiter为空时的情况。因为mutex_optimistic_spin()函数在__mutex_lock_common()函数里调用了两次,一次是在__mutex_lock_common()函数入口,另外一次是在for循环里(这时当前进程已经在互斥锁的等待队列里)。

在第622行中,mutex_can_spin_on_owner()函数用来判断是否需要进行乐观自旋等待。怎么判断呢?我们稍后会单独分析这个函数。

在第630行中,osq_lock()函数申请一个MCS锁。这里为了防止许多进程同时申请同一个互斥锁并且同时自旋的情况,凡是想乐观自旋等待的进程都要先申请一个MCS锁。在自旋锁还没有实现MCS算法之前,最早把MCS算法应用到Linux内核的就是互斥锁机制了。在Linux 5.0内核中,大部分架构的自旋锁机制已经实现了MCS算法,因此以后这里可以使用自旋锁来替代。

(1)__mutex_trylock_or_owner()函数尝试获取锁。通常情况下,锁持有者和当前进程(curr)不是同一个进程。在无法获取锁的情况下,会返回锁持有者的进程描述符owner。

(2)mutex_spin_on_owner()函数一直自旋等待锁持有者尽快释放锁。该函数中也有一个for循环,一直在不断地判断锁持有者是否发生了变化,直到锁持有者释放了该锁才会退出for循环。

(3)若锁持有者发生了变化,那么说明锁持有者释放锁,mutex_spin_on_owner()函数返回true。这时会执行第(1)步。

(4)__mutex_trylock_or_owner()函数会再一次尝试获取锁,这次的情况和第(1)步不一样了。这时,锁持有者已经释放了锁,也就是lock->owner字段为0,那么很容易通过CMPXCHG指令来获取锁,并且把当前进程的进程描述符指针的值赋值给lock->owner字段。

完成一次乐观自旋等待机制。

乐观自旋等待机制如图1.14所示。乐观自旋等待机制涉及几个关键技术。一是如何判断当前进程是否应该进行乐观自旋等待。这是在mutex_can_spin_on_owner()函数中实现的。二是如何判断锁持有者释放了锁。这是在mutex_spin_on_owner()函数里实现的。

接下来,分析重要的函数。

1.mutex_can_spin_on_owner()函数

判断当前进程是否正在临界区执行的方法很简单。当进程持有互斥锁时,通过lock->owner可以获取task_struct数据结构。如果task_struct->on_cpu为1,表示锁持有者正在执行,也就是正在临界区中执行。锁持有者释放该锁后,lock->owner为0。mutex_can_spin_on_owner()函数的代码片段如下。

static inline int mutex_can_spin_on_owner(struct mutex *lock)
{
     owner = __mutex_owner(lock);
     if (owner)
         retval = owner->on_cpu;
     return retval;
}

▲图1.14 乐观自旋等待机制

该函数只需要返回owner->on_cpu即可,若返回值为1,说明锁持有者正在临界区执行,当前进程适合进行乐观自旋等待。

2.__mutex_trylock_or_owner()函数

该函数主要尝试获取锁。这里需要考虑两种情况。第一种情况是锁持有者正在临界区执行,因此当前进程(curr)和lock->owner指向的进程描述符一定是不一样的。第二种情况是当锁持有者离开临界区并释放锁时,就可以通过CMPXCHG原子指令尝试获取锁。

3.mutex_spin_on_owner()函数

该函数的作用是一直判断锁持有者是否释放锁,判断条件为lock->owner指向的进程描述符是否发生了变化。该函数的代码片段如下。

static noinline
bool mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner,
             struct ww_acquire_ctx *ww_ctx, struct mutex_waiter *waiter)
{
     bool ret = true;
     rcu_read_lock();
     while (__mutex_owner(lock) == owner) {
         barrier();
         if (!owner->on_cpu || need_resched()) {
             ret = false;
             break;
        }
        cpu_relax();
     }
     rcu_read_unlock();
     return ret;
}

有3种情况需要考虑退出该函数。

下面来看mutex_unlock()函数是如何解锁的。

<kernel/locking/mutex.c>

void __sched mutex_unlock(struct mutex *lock)
{
     if (__mutex_unlock_fast(lock))
        return;
     __mutex_unlock_slowpath(lock, _RET_IP_);
}

解锁与加锁一样有快速通道和慢速通道之分,解锁的快速通道是使用__mutex_unlock_fast()函数。

static bool __mutex_unlock_fast(struct mutex *lock)
{
     unsigned long curr = (unsigned long)current;

     if (atomic_long_cmpxchg_release(&lock->owner, curr, 0UL) == curr)
         return true;

     return false;
}

解锁依然使用CMPXCHG指令。当lock->owner的值和当前进程的描述符curr指针的值相等时,可以进行快速解锁。把lock->owner重新指定为0,返回lock->owner的旧值。若lock->owner的旧值等于curr,说明快速解锁成功;否则,只能使用函数__mutex_unlock_slowpath()。

__mutex_unlock_slowpath()函数中的主要操作如下。

在第1206行中,atomic_long_read()函数原子地读取lock->owner值。

(1)最理想的情况下,若互斥锁的等待队列里没有等待者,那直接解锁即可。

(2)若互斥锁的等待队列里有等待者,则需要唤醒等待队列中的进程。

(3)锁持有者被等待队列的第一个进程设置一个标志位(MUTEX_FLAG_HANDOFF),那么要求锁持有者优先把锁传递给第一个进程。注意,这种情形下,锁持有者不会先释放锁再给第一个进程加锁,而是通过cmpxchg指令把锁传递给第一个进程。

接下来,处理MUTEX_FLAG_HANDOFF的情况。

最后,唤醒进程。

假设系统有4个CPU(每个CPU一个线程)同时争用一个互斥锁,如图1.15所示。

▲图1.15 4个CPU同时争用一个互斥锁

T0时刻,CPU0率先获取了互斥锁,进入临界区,CPU0是锁的持有者。

T1时刻,CPU1也开始申请互斥锁,因为互斥锁已经被CPU0上的线程持有,CPU1发现锁持有者(即CPU0)正在临界区里执行,所以它采用乐观自旋等待机制。

T2时刻,CPU2也开始申请同一个锁,同理,CPU2也采用乐观自旋等待机制。

T3时刻,CPU0退出临界区,释放了互斥锁。CPU1察觉到锁持有者已经退出,很快申请到了锁,这时锁持有者变成了CPU1,CPU1进入了临界区。

T4时刻,CPU1在临界区里被抢占调度了或者自己主动睡眠了。若在采用乐观自旋等待机制时发现锁持有者没有在临界区里执行,那只好取消乐观自旋等待机制,进入睡眠 模式。

T5时刻,CPU3也开始申请互斥锁,它发现锁持有者没有在临界区里执行,不能采用乐观自旋等待机制,只好进入睡眠模式。

T6时刻,CPU1的线程被唤醒,重新进入临界区。

T7时刻,CPU1退出临界区,释放了锁。这时,CPU2也退出睡眠模式,获得了锁。

T8时刻,CPU2退出临界区,释放了锁。这时,CPU3也退出睡眠模式,获得了锁。

从互斥锁实现细节的分析可以知道,互斥锁比信号量的实现要高效很多。

正是因为互斥锁的简洁性和高效性,所以互斥锁的使用场景比信号量要更严格,使用互斥锁需要注意的约束条件如下。

在实际项目中,该如何选择自旋锁、信号量和互斥锁呢?

在中断上下文中可以毫不犹豫地使用自旋锁,如果临界区有睡眠、隐含睡眠的动作及内核接口函数,应避免选择自旋锁。在信号量和互斥锁中该如何选择呢?除非代码场景不符合上述互斥锁的约束中的某一条,否则可以优先使用互斥锁。

上述介绍的信号量有一个明显的缺点——没有区分临界区的读写属性。读写锁通常允许多个线程并发地读访问临界区,但是写访问只限制于一个线程。读写锁能有效地提高并发性,在多处理器系统中允许有多个读者同时访问共享资源,但写者是排他性的,读写锁具有如下特性。

读写锁有两种,分别是读者自旋锁类型和读者信号量。自旋锁类型的读写锁数据结构定义在include/linux/rwlock_types.h头文件中。

<include/linux/rwlock_types.h>

typedef struct {
    arch_rwlock_t raw_lock;
} rwlock_t;

<include/asm-generic/qrwlock_types.h>

typedef struct qrwlock {
     union {
         atomic_t cnts;
         struct {
             u8 wlocked;  
             u8 __lstate[3];
         };
    };
    arch_spinlock_t      wait_lock;
} arch_rwlock_t;

常用的函数如下。

和自旋锁一样,读写锁有关闭中断和下半部的版本。自旋锁类型的读写锁实现比较简单,本章重点关注信号量类型读写锁的实现。

rw_semaphore数据结构的定义如下。

<include/linux/rwsem.h>

struct rw_semaphore {
     long count;
     struct list_head wait_list;
     raw_spinlock_t wait_lock;
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
     struct optimistic_spin_queue osq; 
     struct task_struct *owner;
#endif
};

count成员的语义定义如下。

<include/asm-generic/rwsem.h>

#ifdef CONFIG_64BIT
# define RWSEM_ACTIVE_MASK     0xffffffffL
#else
# define RWSEM_ACTIVE_MASK     0x0000ffffL
#endif

#define RWSEM_UNLOCKED_VALUE     0x00000000L
#define RWSEM_ACTIVE_BIAS     0x00000001L
#define RWSEM_WAITING_BIAS    (-RWSEM_ACTIVE_MASK-1)
#define RWSEM_ACTIVE_READ_BIAS    RWSEM_ACTIVE_BIAS
#define RWSEM_ACTIVE_WRITE_BIAS   (RWSEM_WAITING_BIAS + RWSEM_ACTIVE_BIAS)

上述的宏定义看起来比较复杂,转换成十进制数会清晰一些,本章以32位处理器架构为例介绍读写信号量的实现,其实这和64位处理器的原理是一样的。

# define RWSEM_ACTIVE_MASK      (0xffff或者65535)
#define RWSEM_ACTIVE_BIAS       (1)
#define RWSEM_WAITING_BIAS      (0xffff 0000 或者 -65536)
#define RWSEM_ACTIVE_READ_BIAS  (1)
#define RWSEM_ACTIVE_WRITE_BIAS (0xffff 0001 或者 -65535)

count值和activity值一样,表示读者和写者的关系。

把count值当作十六进制或者十进制数不是开发人员的原本设计意图,其实应该把count值分成两个字段:Bit[0:31]为低字段,表示正在持有锁的读者或者写者的个数;Bit[32:63]为高字段,通常为负数,表示有一个正在持有或者处于pending状态的写者,以及等待队列中有读写者在等待。因此count值可以看作一个二元数,含义如下。

kernel/locking/rwsem-xadd.c代码中有如下一段关于count值含义的比较全面的介绍。

本章中把读者类型的信号量简称为读者锁。假设这样一个场景,在调用down_read()函数申请读者锁之前,已经有一个写者持有该锁,下面来看down_read()函数的实现。

<include/asm-generic/rwsem.h>
<down_read()->__down_read()>

static inline void __down_read(struct rw_semaphore *sem)
{
     if (unlikely(atomic_long_inc_return_acquire(&sem->count) <= 0))
         rwsem_down_read_failed(sem);
}

本场景中假设一个写者率先成功持有锁,那么count值被加上了RWSEM_ACTIVE_WRITE_BIAS,即二元数[−1, 1]。

首先,如果sem->count原子地加1后大于0,则成功地获取这个读者锁;否则,说明在这之前已经有一个写者持有该锁。count值加1后变成−65534(二元数[−1, 2]),因此要跳转到rwsem_down_read_failed()函数中处理获取读者锁失败的情况。

rwsem_down_read_failed()函数最终会调用__rwsem_down_read_failed_common()函数。

<kernel/locking/rwsem-xadd.c>

static inline struct rw_semaphore __sched *
__rwsem_down_read_failed_common(struct rw_semaphore *sem, int state)

该函数有两个参数。第一个参数sem是要申请的读写信号量,第二个参数state是进程状态,在本场景里,它为TASK_UNINTERRUPTIBLE。该函数实现在kernel/locking/rwsem-xadd.c文件中,其中的主要操作如下。

在第235行中,adjustment值初始化为−1。

在第239~240行中,rwsem_waiter数据结构描述一个获取读写锁失败的“失意者”。当前情景下获取读者锁失败,因此waiter.type类型设置为RWSEM_WAITING_FOR_READ,并且在第256行中把waiter添加到该锁等待队列的尾部。

在第243行中,如果该等待队列里没有进程,即sem->wait_list链表为空,adjustment值要加上RWSEM_WAITING_BIAS(即−65536或者二元数[−1, 0]),为什么等待队列中的第一个进程要加上RWSEM_WAITING_BIAS呢?RWSEM_WAITING_BIAS通常用于表示等待队列中还有正在排队的进程。持有锁和释放锁时对count的操作是成对出现的,当count值等于RWSEM_WAITING_BIAS时,表示当前已经没有活跃的锁,即没有进程持有锁,但有进程在等待队列中。假设等待队列为空,那么当前进程就是该等待队列上第一个进程,这里count值要加上RWSEM_WAITING_BIAS(−65536或者二元数[−1, 0]),表示等待队列中还有等待的进程。adjustment值等于−65537。

在第259行中,count值将变成RWSEM_ACTIVE_WRITE_BIAS+ RWSEM_WAITING_BIAS。用十进制来表示就是−131071(sem->count+adjustment, −65534−65537)。

在第267~269行中,根据两种情况调用__rwsem_mark_wake()函数去唤醒等待队列中的进程。

假设在第256行之后持有写者锁的进程释放了锁,那么sem->count的值会变成多少呢?sem->count的值将变成RWSEM_WAITING_BIAS,第267行代码中的判断语句(count == RWSEM_WAITING_BIAS)恰巧可以捕捉到这个变化,调用__rwsem_mark_wake()函数去唤醒在等待队列中睡眠的进程。

刚才推导count值的变化情况的前提条件是当前进程为等待队列上第一个读者,若等待队列上已经有读者呢?大家可以自行推导。

在第276~288行中,当前进程会在while循环中让出CPU,直到waiter.task被设置为NULL。在__rwsem_mark_wake()函数中被唤醒的读者会设置waiter.task为空,因此被唤醒的读者就可以成功获取读者锁。

接下来看__rwsem_mark_wake()函数。

<kernel/locking/rwsem-xadd.c>

static void __rwsem_mark_wake(struct rw_semaphore *sem,
                  enum rwsem_wake_type wake_type,
                  struct wake_q_head *wake_q)

调用__rwsem_mark_wake()函数时传递的第二个参数是RWSEM_WAKE_ANY。__rwsem_mark_wake()函数实现在kernel/locking/rwsem-xadd.c文件中,它的主要操作如下。

在第138行中,首先从sem->wait_list等待队列中取出第一个排队的waiter,等待队列是先进先出队列。

在第140~153行中,如果第一个等待者是写者,那么直接唤醒它即可,因为只能一个写者独占临界区,这具有排他性。在本场景中,第一个等待者是读者类型的RWSEM_WAITING_FOR_READ。

在第160~184行中,当前进程由于申请读者锁失败才进入了rwsem_down_read_failed()函数,恰巧有一个写者释放了锁。这里有一个关键点,如果另外一个写者开始来申请锁,那么会比较麻烦,在代码中把这个写者称为“小偷”。

(1)atomic_long_fetch_add()函数先下手为强,假装先申请一个读者锁,oldcount反映了sem->count的真实值。

(2)如果sem->count的真实值小于RWSEM_WAITING_BIAS(−65536),说明在这个间隙中有一个“小偷”偷走了写者锁。因为在调用__rwsem_mark_wake()函数时,sem->count的值为−65536,现在小于−65536,说明存在“小偷”。既然已经被写者抢先占有锁,那么无法再继续唤醒睡眠在等待队列中的读者。

在第192~217行中,遍历等待队列(sem->wait_list)中所有进程。

(1)如果等待队列中有读者也有写者,那么遇到写者就退出循环。

(2)统计排在等待队列最前面的读者个数woken。

(3)wake_q_add()函数把这些读者添加到等待队列里。

(4)把这些唤醒者的waiter->task字段设置为NULL。

全是读者或既有读者也有写者的情况下,等待队列如图1.16所示。如果读者3后面还有一个写者1,那么只能唤醒读者1~读者3。

▲图1.16 等待队列

综上所述,申请读者锁的流程如图1.17所示。

▲图1.17 申请读者锁的流程

下面来看释放读者锁的情况。

<kernel/locking/rwsem.c>
<up_read()->__up_read()>

static inline void __up_read(struct rw_semaphore *sem)
{
     long tmp;

     tmp = atomic_long_dec_return_release &sem->count);
     if (unlikely(tmp < -1 && (tmp & RWSEM_ACTIVE_MASK) == 0))
          rwsem_wake(sem);
}

获取读者锁时count加1,释放时自然就减1,它们是成对出现的。如果整个过程没有写者来干扰,那么所有读者锁释放完毕后count值应该是0。count变成负数,说明其间有写者出现,并且“悄悄地”处于等待队列中。下面调用rwsem_wake()函数以唤醒这些“不速之客”。

<kernel/locking/rwsem-xadd.c>

struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem)
{
     if (!list_empty(&sem->wait_list))
         __rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);

     wake_up_q(&wake_q);
     return sem;
}

这里调用__rwsem_mark_wake()函数以唤醒等待队列中的写者。

写者通常调用down_write()函数获取写者类型的信号量,本书简称写者锁。

<kernel/locking/rwsem.c>

void __sched down_write(struct rw_semaphore *sem)
{
     might_sleep();
     __down_write();
     rwsem_set_owner(sem);
}

down_write()函数在成功获取写者锁后会调用rwsem_set_owner()函数,使sem->owner成员指向task_struct数据结构,这个特性需要在配置内核时打开CONFIG_RWSEM_SPIN_ON_OWNER选项。假设进程A首先持有sem写者锁,进程B也想获取该锁,那么进程B理应在等待队列中等待,但是打开RWSEM_SPIN_ON_OWNER选项可以让进程B一直在门外自旋,等待进程A把锁释放,这样可以避免进程在等待队列中睡眠与唤醒等一系列开销。比较常见的例子是内存管理的数据结构mm_struct,其中有一个全局的读/写锁mmap_sem,它用于保护进程地址空间中的一个读写信号量,很多与内存相关的系统调用需要这个锁来保护,如sys_mprotect、sys_madvise、sys_brk、sys_mmap和缺页中断处理程序do_page_fault等。如果进程A有两个线程,线程1调用mprotect系统调用时,在内核空间通过down_write()函数成功获取了mm_struct-> mmap_sem写者锁,那么线程2调用brk系统调用时,也会调用down_write()函数,尝试获取mm_struct->mmap_sem锁,由于线程1还没释放该锁,因此线程2会自旋等待。线程2坚信线程1会很快释放mm_struct->mmap_sem锁,线程2没必要先睡眠后被叫醒,因为这个过程存在一定的开销。该过程如图1.18所示。

▲图1.18 线程2先睡眠后被唤醒的过程

回到down_write()函数。

<down_write()->__down_write()>

static inline void __down_write(struct rw_semaphore *sem)
{
     long tmp;

     tmp = atomic_long_add_return_acquire(RWSEM_ACTIVE_WRITE_BIAS,
                     &sem->count);
     if (unlikely(tmp != RWSEM_ACTIVE_WRITE_BIAS))
           rwsem_down_write_failed(sem);
}

首先sem->count要加上RWSEM_ACTIVE_WRITE_BIAS(−65535)。以上述的线程2为例,增加完RWSEM_ACTIVE_WRITE_BIAS后,count的值变为−101070,明显不符合成功获取写者锁的条件,跳转到rwsem_down_write_failed()函数中继续处理。

<kernel/locking/rwsem-xadd.c>

static inline struct rw_semaphore *
__rwsem_down_write_failed_common(struct rw_semaphore *sem, int state)

该函数有两个参数,第一个参数sem是要申请的读/写信号量,第二个参数state是进程状态(在本场景里,它为TASK_UNINTERRUPTIBLE)。该函数实现在kernel/locking/rwsem-xadd.c文件中,其中的主要操作如下。

在第517行中,rwsem_waiter数据结构描述一个获取读写锁失败的“失意者”。当前情景下,获取写者锁失败,因此把waiter.type类型设置为RWSEM_WAITING_FOR_WRITE,并且把waiter添加到该锁的等待队列的尾部。

在第522行中,因为atomic_long_sub_return()函数没有成功获取锁,所以这里减小刚才增加的RWSEM_ACTIVE_WRITE_BIAS值。

在第525行中,rwsem_optimistic_spin()函数表示乐观自旋等待机制在读写信号量中的应用。rwsem_optimistic_spin()函数一直在门外自旋,有机会就获取锁。若成功获取锁,则直接返回。我们稍后会分析该函数。

在第528行中,假设没有成功获取锁,只能通过down_write()函数走慢速通道。和down_read()函数类似,都需要把当前进程放入信号量的等待队列中,此时waiter的类型是RWSEM_WAITING_FOR_WRITE。

在第544行中,waiting说明等待队列中有其他等待者。若等待队列为空,说明该waiter就是等待队列中第一个等待者,要加上RWSEM_WAITING_BIAS。

在第552行中,如果count大于RWSEM_WAITING_BIAS(−65536),说明现在没有活跃的写者锁,即写者已经释放了锁,但是有读者已经成功抢先获取锁,因此调用__rwsem_mark_wake()函数唤醒排在等待队列前面的读者锁。这个判断条件是怎么推导出来的呢?

如图1.19所示,系统初始化时count=0,在T0时刻,写者1成功持有锁,count= −65535(加上RWSEM_ACTIVE_WRITE_BIAS)。在T1时刻,读者1申请锁失败,它将被添加到等待队列中,由于它是等待队列中第一个成员,因此count要加上RWSEM_WAITING_BIAS,count= −65535−65536= −131071。在T2时刻,写者2申请锁,自旋失败。在T3时刻,写者1释放锁,count变成−65536。在T4时刻,读者2抢先获取锁,count要加上RWSEM_ACTIVE_BIAS,count变成−65535。在T5时刻,写者2运行到rwsem_down_write_failed()函数的第23行代码处,判断出count大于RWSEM_WAITING_BIAS(−65536),并唤醒排在等待队列前面的读者,这是该判断条件的推导过程。

▲图1.19 写者和读者争用锁

当前进程会调用schedule()函数让出CPU。当重新调度当前进程时,会判断读者是否释放了锁。如果所有的读者都释放了锁,那么count的值应该为RWSEM_WAITING_BIAS(−65536),rwsem_try_write_lock()函数依此来判断并且尝试获取写者锁。

综上所述,申请写者锁的流程如图1.20所示。

1.rwsem_optimistic_spin()函数

乐观自旋等待机制不仅应用在互斥锁实现中,还应用在写者锁中。rwsem_optimistic_spin()函数实现在kernel/locking/rwsem-xadd.c文件中。

<kernel/locking/rwsem-xadd.c>

static bool rwsem_optimistic_spin(struct rw_semaphore *sem)

▲图1.20 申请写者锁

rwsem_optimistic_spin()函数中的主要操作如下。

(1)preempt_disable()函数关闭抢占。

(2)rwsem_can_spin_on_owner()函数判断当前状态是否适合做乐观自旋等待。如果锁持有者是写者锁,并且锁持有者正在临界区里执行,那么此时是做乐观自旋等待的最佳时机。

(3)osq_lock()函数获取OSQ锁,这和互斥锁机制相同。

(4)while循环实现一个自旋的动作。自旋的前提是另外一个写者锁抢先获取了锁(sem-> owner指向写者的task_struct数据结构),并且该写者线程正在临界区中执行,因此期待写者可以尽快释放锁,从而避免进程切换的开销。

2.rwsem_can_spin_on_owner()函数

rwsem_can_spin_on_owner()函数的主要代码片段如下。

<kernel/locking/rwsem-xadd.c>

static inline bool rwsem_can_spin_on_owner(struct rw_semaphore *sem)
{
     struct task_struct *owner;
     bool ret = true;

     owner = READ_ONCE(sem->owner);
     if (owner) {
          ret = is_rwsem_owner_spinnable(owner) &&
               owner_on_cpu(owner);
     }
     return ret;
}

这里能做乐观自旋等待的条件有两个。

3.is_rwsem_owner_spinnable()函数

is_rwsem_owner_spinnable()函数的主要代码片段如下。

<kernel/locking/rwsem.h>

static inline bool is_rwsem_owner_spinnable(struct task_struct *owner)
{
     return !((unsigned long)owner & RWSEM_ANONYMOUSLY_OWNED);
}

owner 字段的最低两位分别是RWSEM_READER_OWNED 和 RWSEM_ANONYMOUSLY_OWNED。其中RWSEM_READER_OWNED表示锁被读者类型的信号量持有。RWSEM_ANONYMOUSLY_WNED表示这个锁可能被匿名者持有。对于读者类型的信号量,当进程成功申请了这个锁时,我们会设置锁持有者的进程描述符到owner字段并且设置这两位。当我们释放这个读者类型的信号量时,我们会对owner字段进行清除,但是这两位不会清除。因此,对于一个已经释放的或者正在持有的读者类型的信号量,owner字段的这两位都是已设置的。这样是做的目的是方便系统调试。

因此,当我们知道owner字段不为空并且没有设置RWSEM_ANONYMOUSLY_OWNED时,说明一个写者正在持有这个锁。

4.rwsem_spin_on_owner()函数

rwsem_spin_on_owner()函数的代码片段如下。

<kernel/locking/rwsem-xadd.c>

static noinline bool rwsem_spin_on_owner(struct rw_semaphore *sem)

rwsem_spin_on_owner()函数中的while循环一直在自旋等待,并且监控sem->owner值是否有修改。两种情况下会退出while循环:一是sem->owner值被修改,通常写者释放了锁;二是need_resched()函数判断当前进程是否需要调度出去。如果当前进程有调度出去的需求,那么一直自旋等待会很浪费CPU。另外,为了缩短系统的延时,会退出循环。owner_on_cpu()函数通过进程描述符的on_cpu字段来判断持有者是否正在执行,若持有者没有正在执行,也没有必要自旋等待了。

5.rwsem_try_write_lock_unqueued()函数

rwsem_try_write_lock_unqueued()函数是乐观自旋等待机制中最重要的一环,它不断尝试获取锁,该函数的代码片段如下。

<kernel/locking/rwsem-xadd.c>

static inline bool rwsem_try_write_lock_unqueued(struct rw_semaphore *sem)
{
     long old, count = atomic_long_read(&sem->count);

     while (true) {
         if (!(count == 0 || count == RWSEM_WAITING_BIAS))
               return false;

         old = atomic_long_cmpxchg_acquire(&sem->count, count,
                      count + RWSEM_ACTIVE_WRITE_BIAS);
         if (old == count) {
             rwsem_set_owner(sem);
             return true;
         }

         count = old;
     }
}

如果写者释放了锁,那么该锁的sem->count的值应该是0或RWSEM_WAITING_BIAS。然后使用CMPXCHG指令获取锁。为什么要使用cmpxchg()函数获取锁,而不直接使用赋值的方式呢?这是因为在当前进程成功获取锁之前其他进程可能已获取了锁,好比“螳螂捕蝉,黄雀在后”。CMPXCHG是原子操作的,如果sem->count的值和count值相等,说明其间没有“黄雀在后”,这才放心获取锁。

释放写者锁和释放读者锁类似。

<kernel/locking/rwsem.c>

void up_write(struct rw_semaphore *sem)
{
     rwsem_clear_owner(sem);
     __up_write(sem);
}

释放写者锁时有一个很重要的动作是调用rwsem_clear_owner()函数清除sem->owner,也就是使owner字段指向NULL。

static inline void __up_write(struct rw_semaphore *sem)
{
     if (unlikely(atomic_long_sub_return(RWSEM_ACTIVE_WRITE_BIAS,
                    (atomic_long_t *)&sem->count) < 0))
          rwsem_wake(sem);
}

释放锁需要count减去RWSEM_ACTIVE_WRITE_BIAS,相当于在数值上加65535。如果count值仍然是负数,说明等待队列里有进程在睡眠,那么调用rwsem_wake()函数去唤醒它们。

读写信号量在内核中应用广泛,特别是在内存管理中,除了前面介绍的mm->mmap_sem外,还有RMAP系统中的anon_vma->rwsem、address_space数据结构中的i_mmap_rwsem等。

再次总结读写信号量的重要特性。

RCU的全称Read-Copy-Update,它是Linux内核中一种重要的同步机制。Linux内核中已经有了原子操作、自旋锁、读写自旋锁、读写信号量、互斥锁等锁机制,为什么要单独设计一个比它们复杂得多的新机制呢?回忆自旋锁、读写信号量和互斥锁的实现,它们都使用了原子操作指令,即原子地访问内存,多CPU争用共享的变量会让高速缓存一致性变得很糟,使得性能下降。以读写信号量为例,除了上述缺点外,读写信号量还有一个致命弱点,它允许多个读者同时存在,但是读者和写者不能同时存在。因此RCU机制要实现的目标是,读者线程没有同步开销,或者说同步开销变得很小,甚至可以忽略不计,不需要额外的锁,不需要使用原子操作指令和内存屏障指令,即可畅通无阻地访问;而把需要同步的任务交给写者线程,写者线程等待所有读者线程完成后才会把旧数据销毁。在RCU中,如果有多个写者同时存在,那么需要额外的保护机制。RCU机制的原理可以概括为RCU记录了所有指向共享数据的指针的使用者,当要修改共享数据时,首先创建一个副本,在副本中修改。所有读者线程离开读者临界区之后,指针指向修改后的副本,并且删除旧数据。

RCU的一个重要的应用场景是链表,链表可以有效地提高遍历读取数据的效率。读取链表成员数据时通常只需要rcu_read_lock()函数,允许多个线程同时读取该链表,并且允许一个线程同时修改链表。那为什么这个过程能保证链表访问的正确性呢?

在读者遍历链表时,假设另外一个线程删除了一个节点。删除线程会把这个节点从链表中移出,但不会直接销毁它。RCU会等到所有读线程读取完成后,才销毁这个节点。

RCU提供的接口如下。

下面通过关于 RCU的一个 简单例子来理解上述接口的含义,该例子来源于内核源代码中的Documents/RCU/whatisRCU.txt,并且省略了一些异常处理情况。

<关于RCU的一个简单例子>

0 #include <linux/kernel.h>
1 #include <linux/module.h>
2 #include <linux/init.h>
3 #include <linux/slab.h>
4 #include <linux/spinlock.h>
5 #include <linux/rcupdate.h>
6 #include <linux/kthread.h>
7 #include <linux/delay.h>
8
9 struct foo {
10   int a;
11   struct rcu_head rcu;
12 };
13 
14 static struct foo *g_ptr;
15 static void myrcu_reader_thread(void *data) //读者线程
16 {
17   struct foo *p = NULL;
18
19   while (1) {
20       msleep(200);
21       rcu_read_lock();
22       p = rcu_dereference(g_ptr);
23       if (p)
24            printk("%s: read a=%d\n", __func__, p->a);
25       rcu_read_unlock();
26   }
27 }
28 
29 static void myrcu_del(struct rcu_head *rh)
30 {
31   struct foo *p = container_of(rh, struct foo, rcu);
32   printk("%s: a=%d\n", __func__, p->a);
33   kfree(p);
34 }
35 
36 static void myrcu_writer_thread(void *p) //写者线程
37 {
38   struct foo *new;
39   struct foo *old;
40   int value = (unsigned long)p;
41
42   while (1) {
43        msleep(400);
44        struct foo *new_ptr = kmalloc(sizeof (struct foo), GFP_KERNEL);
45        old = g_ptr;
46        printk("%s: write to new %d\n", __func__, value);
47        *new_ptr = *old;
48        new_ptr->a = value;
49        rcu_assign_pointer(g_ptr, new_ptr);
50        call_rcu(&old->rcu, myrcu_del);
51        value++;
52   }
53 }
54 
55 static int __init my_test_init(void)
56 {
57   struct task_struct *reader_thread;
58   struct task_struct *writer_thread;
59   int value = 5;
60
61   printk("BEN: my module init\n");
62   g_ptr = kzalloc(sizeof (struct foo), GFP_KERNEL);
63
64   reader_thread = kthread_run(myrcu_reader_thread, NULL, "rcu_reader");
65   writer_thread = kthread_run(myrcu_writer_thread, (void *)(unsigned long)value,  
"rcu_writer");
66
67   return 0;
68 }
69 static void __exit my_test_exit(void)
70 {
71   printk("goodbye\n");
72   if (g_ptr)
73        kfree(g_ptr);
74 }
75 MODULE_LICENSE("GPL");
76 module_init(my_test_init);

该例子的目的是通过RCU机制保护my_test_init()函数分配的共享数据结构g_ptr,并创建一个读者线程和一个写者线程来模拟同步场景。

对于myrcu_reader_thread,注意以下几点。

对于myrcu_writer_thread,注意以下几点。

上述过程如图1.21所示。

▲图1.21 RCU时序

在所有的读访问完成之后,内核可以释放旧数据,关于何时释放旧数据,内核提供了两个接口函数——synchronize_rcu()和call_rcu()。

本节重点介绍经典RCU和Tree RCU的实现,可睡眠RCU和可抢占RCU留给读者自行学习。RCU里有两个很重要的概念,分别是宽限期(Grace Period,GP)和静止状态(Quiescent State,QS)。

RCU在开发Linux 2.5内核时已经被添加到Linux内核中,但是在Linux 2.6.29内核之前的RCU通常称为经典RCU(Classic RCU)。经典RCU在大型系统中遇到了性能问题,后来在Linux 2.6.29内核中IBM的内核专家Paul E. McKenney提出了Tree RCU的实现,Tree RCU也称为Hierarchical RCU[5]

经典RCU的实现在超级大系统中遇到了问题,特别是有些系统的CPU内核超过了1024个,甚至达到4096个。经典RCU在判断是否完成一次GP时采用全局的cpumask位图。如果每位表示一个CPU,那么在1024个CPU内核的系统中,cpumask位图就有1024位。每个CPU在GP开始时要设置位图中对应的位,GP结束时要清除相应的位。全局的cpumask位图会导致很多CPU竞争使用,因此需要自旋锁来保护位图。这样导致锁争用变得很激烈,激烈程度随着CPU的个数线性递增。以4核CPU为例,经典RCU的实现如图1.22所示。

▲图1.22 4核CPU的经典RCU实现

而Tree RCU的实现巧妙地解决了cpumask位图竞争锁的问题。以上述的4核CPU为例,假设Tree RCU以两个CPU为1个rcu_node,这样4个CPU被分配到两个rcu_node上,使用另外1个rcu_node来管理这两个rcu_node。如图1.23所示,节点1管理cpu0和cpu1,节点2管理cpu2和cpu3,而节点0是根节点,管理节点1和节点2。每个节点只需要两位的位图就可以管理各自的CPU或者节点,每个节点都通过各自的自旋锁来保护相应的位图。

▲图1.23 4核CPU的Tree RCU

假设4个CPU都经历过一个QS,那么4个CPU首先在Level0的节点1和节点2上修改位图。对于节点1或者节点2来说,只有两个CPU竞争锁,这比经典RCU上的锁争用要减少一半。当Level0中节点1和节点2上的位图都被清除干净后,才会清除上一级节点的位图,并且只有最后清除节点的CPU才有机会尝试清除上一级节点的位图。因此对于节点0来说,还是两个CPU争用锁。整个过程中只有两个CPU争用一个锁。这类似于足球比赛,进入四强的4支球队被分成上下半区,每个半区有两支球队,只有半决赛获胜的球队才能进入决赛。

前面介绍了Linux内核中常用的锁机制,如原子操作、自旋锁、信号量、读写信号量、互斥锁以及RCU等。这些锁的机制都有自己的优势、劣势以及各自的应用范围。

Linux内核中锁机制的特点和使用规则如表1.5所示。

表1.5 Linux内核中锁机制的特点和使用规则

锁机制

特点

使用规则

原子操作

使用处理器的原子指令,开销小

临界区中的数据是变量、位等简单的数据结构

内存屏障

使用处理器内存屏障指令或GCC的屏障指令

读写指令时序的调整

自旋锁

自旋等待

中断上下文,短期持有锁,不可递归,临界区不可睡眠

信号量

可睡眠的锁

可长时间持有锁

读写信号量

可睡眠的锁,多个读者可以同时持有锁,同一时刻只能有一个写者,读者和写者不能同时存在

程序员界定出临界区后读/写属性才有用

互斥锁

可睡眠的互斥锁,比信号量快速和简洁,实现自旋等待机制

同一时刻只有一个线程可以持有互斥锁,由锁持有者负责解锁,即在同一个上下文中解锁,不能递归持有锁,不适合内核和用户空间复杂的同步场景

RCU

读者持有锁没有开销,多个读者和写者可以同时共存,写者必须等待所有读者离开临界区后才能销毁相关数据

受保护资源必须通过指针访问,如链表等

前面介绍内存管理时基本上忽略了锁的讨论,其实锁在内存管理中有着很重要的作用,下面以内存管理为例介绍锁的使用。在rmap.c文件的开始,列举了内存管理模块中锁的调用关系图。

<mm/rmap.c>

/*
 * Lock ordering in mm:
 *
 * inode->i_mutex  (while writing or truncating, not reading or faulting)
 *   mm->mmap_sem
 *    page->flags PG_locked (lock_page)
 *     mapping->i_mmap_rwsem
 *      anon_vma->rwsem
 *       mm->page_table_lock or pte_lock
 *        zone->lru_lock (in mark_page_accessed, isolate_lru_page)
 *        swap_lock (in swap_duplicate, swap_info_get)
 *          mmlist_lock (in mmput, drain_mmlist and others)
 *          mapping->private_lock (in __set_page_dirty_buffers)
 *          inode->i_lock (in set_page_dirty's __mark_inode_dirty)
 *          bdi.wb->list_lock (in set_page_dirty's __mark_inode_dirty)
 *            sb_lock (within inode_lock in fs/fs-writeback.c)
 *            mapping->tree_lock (widely used, in set_page_dirty,
 *                    in arch-dependent flush_dcache_mmap_lock,
 *                    within bdi.wb->list_lock in __sync_single_inode)
 *
 * anon_vma->rwsem,mapping->i_mutex   (memory_failure, collect_procs_anon)
 *   ->tasklist_lock
 *     pte map lock
 */

mmap_sem是mm_struct数据结构中一个读写信号量成员,用于保护进程地址空间。在brk、mmap、mprotect、mremap、msync等系统调用中都采用down_write(&mm->mmap_sem)来保护VMA,防止多个进程同时修改进程地址空间。

下面举内存管理中KSM的一个例子。在内存管理中描述进程地址空间的数据结构是VMA,新创建的VMA会加入红黑树中,进程在退出时,exit_mmap()函数或unmmap()函数都可能会销毁VMA,因此新建和销毁VMA是异步的。如图1.24所示,在KSM中,ksmd内核线程会定期扫描进程中的VMA,然后从VMA中找出可用的匿名页面。假设CPU0在扫描某个VMA时,另外一个进程在CPU1上恰巧释放了这个VMA,那么KSM是否有问题,follow_page()函数会触发oops错误吗?

▲图1.24 KSM和do_unmmap对VMA的争用

事实上,Linux内核运行得很好,并没有出现上述问题。原来每个进程的数据结构mm_struct中有一个读写锁mmap_sem,这个锁对于进程本身来说相当于一个全局的读写锁,内核中通常利用该锁来保护进程地址空间。大家可以仔细阅读内核代码,凡是涉及VMA的扫描、插入、删除等操作,都会使用mmap_sem锁来进行保护。

回到刚才的例子,KSM在扫描进程中的VMA时调用down_read(&mm->mmap_sem)函数来申请读者锁以进行保护,为什么申请读者锁呢?因为KSM扫描进程中的VMA时不会修改VMA的内容,所以使用读者锁就足够了。另外,销毁VMA的函数都需要申请down_write (&mm->mmap_sem)写者锁来保护,所以它们之间不会产生冲突。

如图1.25所示,在T0时刻,KSM内核线程已经成功持有mmap_sem读者锁。在T1时刻,进程在CPU1上执行do_unmmap()函数以销毁KSM正在操作的VMA,它必须先申请mmap_sem写者锁,但由于KSM内核线程已经率先持有读者锁,因此执行do_unmmap()函数的进程只能在队列中等待。

▲图1.25 KSM和do_unmmap之间的争用

那么何时该用读者锁,何时该用写者锁呢?这需要程序员来判断被保护的临界区的内容是只读的还是可写的,锁不能代替程序员考虑这些问题。

page_table_lock是mm_struct数据结构中一个自旋锁类型的成员,主要用于保护进程的页表。在内存管理代码中,每当需要修改进程的页表时,都需要page_table_lock锁。以do_anonymous_page()函数为例。

<mm/memory.c>

static vm_fault
do_anonymous_page(struct vm_fault*vmf)
 {
    spinlock_t *ptl;
    ...
    page_table = pte_offset_map_lock(mm, pmd, address, &ptl);
 setpte:
    set_pte_at(mm, address, page_table, entry);
    ...
unlock:
   pte_unmap_unlock(page_table, ptl);
   return 0;
}

在调用set_pte_at()设置进程页表时,需要使用pte_offset_map_lock()宏来获取page_table_lock自旋锁,防止其他CPU同时修改进程的页表。

<include/linux/mm.h>

#define pte_offset_map_lock(mm, pmd, address, ptlp)  \
({                           \
    spinlock_t *__ptl = pte_lockptr(mm, pmd);  \
    pte_t *__pte = pte_offset_map(pmd, address);     \
    *(ptlp) = __ptl;              \
    spin_lock(__ptl);             \
    __pte;                        \
})

pte_offset_map_lock()宏最终仍然调用pte_lockptr()函数来获取锁。

static inline spinlock_t *pte_lockptr(struct mm_struct *mm, pmd_t *pmd)
{
     return &mm->page_table_lock;
}

另外,如果定义了USE_SPLIT_PTE_PTLOCKS宏,那么page数据结构中也有一个类似的锁ptl。宏的判断条件如下。

<include/linux/mm_types.h>

#define USE_SPLIT_PTE_PTLOCKS  (NR_CPUS >= CONFIG_SPLIT_PTLOCK_CPUS)

page数据结构中的flags成员是一些标志位的集合,其中PG_locked标志位用作页面锁。页面锁在卷1中已详细分析。常用的函数有lock_page()和trylock_page(),二者用于给某个页面加锁。此外,还可以让进程在该锁中等待,wait_on_page_locked()函数可以让进程等待该页面的锁释放。

在RMAP系统中,anon_vma(AV)数据结构中维护了一棵红黑树,相应的VMA数据结构中维护了一个anon_vma_chain(AVC)链表。AV数据结构中定义了rwsem成员,该成员是一个读写信号量。既然是读写信号量,那么开发者就必须区分哪些临界区是只读的,哪些是可写的。

当父进程通过fork系统调用创建子进程时,子进程会复制父进程的VMA数据结构的内容作为自己的进程地址空间,并将父进程的PTE复制到子进程的页表中,使父进程和子进程共享页表。多个不同进程的VMA里的虚拟页面会同时映射到同一个物理页面,RMAP系统会创建AVC链表来连接父、子进程的VMA,子进程也会使用AVC作为连接VMA与AV的桥梁。建立连接桥梁的函数是anon_vma_chain_link(),连接的动作会修改原来AV中红黑树的数据和VMA中的AVC链表,因此该过程是一个可写的临界区。

下面以anon_vma_fork()函数为例。

<mm/rmap.c>

0 int anon_vma_fork(struct vm_area_struct *vma, struct vm_area_struct *pvma)
1 {
2   ...
3   vma->anon_vma = anon_vma;
4   anon_vma_lock_write(anon_vma);
5   anon_vma_chain_link(vma, avc, anon_vma);
6   anon_vma->parent->degree++;
7   anon_vma_unlock_write(anon_vma);
8   ...
9   return 0;
10}

在上述代码中,anon_vma指子进程的AV数据结构,AVC用于连接子进程的VMA和AV,第5行代码中的anon_vma_chain_link()函数使用AVC把VMA和AV连接到一起,并且把AVC加入子进程的AV中的红黑树中和子进程的VMA中的AVC链表。在这个过程中,其他进程可能会访问AV的红黑树或AVC链表,如内核线程Kswapd调用rmap_walk_anon()函数时也恰巧访问AV的红黑树或AVC,那么会导致链表和红黑树的访问冲突,因此这里需要添加一个写者信号量,见第4行代码中的anon_vma_lock_write()函数。

下面来看读者的情况。RMAP系统中一个很重要的功能是从page数据结构找出所有映射到该页的VMA,这个过程需要遍历前面提到的AV中的红黑树和VMA中的AVC链表,这是一个只读的过程,因此需要一个读者信号量来保护遍历的过程。

<mm/rmap.c>

0 int try_to_unmap(struct page *page, enum ttu_flags flags)
1 {
2    int ret;
3    struct rmap_walk_control rwc = {
4        .rmap_one = try_to_unmap_one,
5        .arg = (void *)flags,
6        .done = page_not_mapped,
7        .anon_lock = page_lock_anon_vma_read,
8    };
9    ret = rmap_walk(page, &rwc);
10   return ret;
11 }

try_to_unmap()函数是遍历RMAP的一个例子,具体的遍历过程在rmap_walk()函数中,其中rmap_walk_control数据结构中的anon_lock()函数指针可以用来指定如何为AV申请读写锁。如第7行中,通过page_lock_anon_vma_read()函数来实现,该函数的主要代码片段如下。

0 struct anon_vma *page_lock_anon_vma_read(struct page *page)
1 {
2    struct anon_vma *anon_vma = NULL;
3    struct anon_vma *root_anon_vma;
4    unsigned long anon_mapping;
5
6    rcu_read_lock();
7    anon_mapping = (unsigned long) ACCESS_ONCE(page->mapping);
8    if ((anon_mapping & PAGE_MAPPING_FLAGS) != PAGE_MAPPING_ANON)
9         goto out;
10
11   anon_vma = (struct anon_vma *) (anon_mapping - PAGE_MAPPING_ANON);
12   root_anon_vma = ACCESS_ONCE(anon_vma->root);
13   if (down_read_trylock(&root_anon_vma->rwsem)) {
14        ...
15        goto out;
16   }
17
18   ...
19   /* 如果锁定 anon_vma, 就可以放心地休眠了 */
20   rcu_read_unlock();
21   anon_vma_lock_read(anon_vma);
22   return anon_vma;
23}

第7~11行中,从page数据结构中的mapping成员中获取AV指针,然后尝试获取AV中的读者锁。这里首先用down_read_trylock()函数去尝试快速获取锁,如果失败,才会调用anon_vma_lock_read()函数去睡眠等待锁。

zone数据结构中有一个自旋锁用于保护zone的LRU链表,以shrink_active_list()函数为例。

<mm/vmscan.c>

0 static void shrink_active_list(unsigned long nr_to_scan,
1                    struct lruvec *lruvec,
2                    struct scan_control *sc,
3                    enum lru_list lru)
4 {
5    ...
6    spin_lock_irq(&zone->lru_lock);
7
8    nr_taken = isolate_lru_pages(nr_to_scan, lruvec, &l_hold,
9                       &nr_scanned, sc, isolate_mode, lru);
10   spin_unlock_irq(&zone->lru_lock);
11   ...
12   /*
13    * Move pages back to the lru list.
14    */
15   spin_lock_irq(&zone->lru_lock);
16   __mod_zone_page_state(zone, NR_ISOLATED_ANON + file, -nr_taken);
17   spin_unlock_irq(&zone->lru_lock);
18   ...
19}

在介绍RCU时提到,RCU的优势是对于多个读者没有任何开销,所有的开销都在写者中,因此对于读者来说这相当于无锁(lockless)编程。内存管理中很多代码使用RCU来提高系统性能,特别是读者多于写者的场景。

下面以RMAP系统中的代码为例。

<mm/rmap.c>

0 struct anon_vma *page_get_anon_vma(struct page *page)
1 {
2    struct anon_vma *anon_vma = NULL;
3    unsigned long anon_mapping;
4
5    rcu_read_lock();
6    anon_mapping = (unsigned long) ACCESS_ONCE(page->mapping);
7    if ((anon_mapping & PAGE_MAPPING_FLAGS) != PAGE_MAPPING_ANON)
8          goto out;
9    if (!page_mapped(page))
10         goto out;
11
12   anon_vma = (struct anon_vma *) (anon_mapping - PAGE_MAPPING_ANON);
13   if (!atomic_inc_not_zero(&anon_vma->refcount)) {
14         anon_vma = NULL;
15         goto out;
16   }
17   if (!page_mapped(page)) {
18         rcu_read_unlock();
19         put_anon_vma(anon_vma);
20         return NULL;
21   }
22 out:
23   rcu_read_unlock();
24   return anon_vma;
25}

page_get_anon_vma()函数实现的功能比较简单,由page数据结构来获取对应的AV指针。第5行与第23行代码使用rcu_read_lock()函数和rcu_read_unlock()函数来构建一个RCU读者临界区,这里为什么要使用RCU读者锁呢?这段代码需要保护的对象是AV指针指向的数据结构,并且临界区内没有写入数据。如果线程正在此临界区执行时另一个线程把AV指向的数据删除了,那么会出现问题,因此需要一种同步的机制来做保护,这里使用RCU机制。

对应的写者又在哪里呢?这里的写者指异步删除AV的线程,删除匿名页面,如线程调用do_unmmap操作后最终会调用unlink_anon_vmas()函数删除AV。另外,页迁移时也会删除匿名页面,详见__unmap_and_move()函数。

函数调用路径为unlink_anon_vmas()→put_anon_vma()→__put_anon_vma()→anon_vma_free()。依然没有看到RCU在何时注册了回调函数并删除被保护的对象。

在分配每个AV数据结构时,采用kmem_cache_create()函数创建一个特殊的slab缓存对象。注意,创建的标志位中有SLAB_DESTROY_BY_RCU。

void __init anon_vma_init(void)
{
    anon_vma_cachep = kmem_cache_create("anon_vma", sizeof(struct anon_vma),
           0, SLAB_DESTROY_BY_RCU|SLAB_PANIC, anon_vma_ctor);
    anon_vma_chain_cachep = KMEM_CACHE(anon_vma_chain, SLAB_PANIC);
}

SLAB_DESTROY_BY_RCU是slab分配器中一个重要的分配标志位,它会延迟释放slab缓存对象所分配的页面,而不是延迟释放对象。所以如果使用kmem_cache_free()函数释放了这个对象,那么对应的内存区域也就被释放了。这个标志位仅仅保证这个地址所在的内存是有效的,但是不能保证内存中的内容是开发者所需要的,因此需要额外的验证机制来保证对象的正确性。

<include/linux/slab.h>

rcu_read_lock()
 again:
 obj = lockless_lookup(key);  //通过key来查找对象
 if (obj) {
      if (!try_get_ref(obj))  //释放对象可能会出错
          goto again;

      if (obj->key != key) {  //验证可能不是想要的对象
          put_ref(obj);
          goto again;
      }
    }
rcu_read_unlock();

这个用法适用于通过地址间接地获取一个内核数据结构,并且不需要额外的锁保护,体现了无锁编程思想。我们可以先锁定这个数据结构,然后检查它是否还在一个给定的地址上,只要保证内存没有被重复使用即可。

回到page_get_anon_vma()函数中,SLAB_DESTROY_BY_RCU只保证anon_vma_cachep这个slab对象缓存所有的页面不会被释放,但是物理页面对应的 AV 对象可能已经被释放了,因此需要额外的判断。这里有两种情况:一是AV被释放,没有PTE引用该页,即 page_mapped()函数返回false,所以代码中page_mapped()函数可以避免这种情况;二是AV被其他的AV所替换,新的AV应该是旧AV的子集,因此返回一个子AV也是正确的。

另一个使用 RCU 的例子是链表,如 mm/vmalloc.c 中的 vmap_area 数据结构就内嵌了rcu_head。

struct vmap_area {
     unsigned long va_start;
     unsigned long va_end;
     unsigned long flags;
     struct rb_node rb_node;
     struct list_head list;
     struct list_head purge_list;
     struct vm_struct *vm;
     struct rcu_head rcu_head;
};

这些vmap_area会被添加到vmap_area_list链表中。遍历链表的过程也构成了RCU读者临界区,因此下面的get_vmalloc_info()函数使用rcu_read_lock()函数保护链表。

<mm/vmalloc.c>

void get_vmalloc_info(struct vmalloc_info *vmi)
{
     ...
     rcu_read_lock();
     if (list_empty(&vmap_area_list)) {
            vmi->largest_chunk = VMALLOC_TOTAL;
            goto out;
     }

     list_for_each_entry_rcu(va, &vmap_area_list, list) {
            ...
     }

out:
     rcu_read_unlock();
}

删除vmap_area_list链表中成员的线程可以被视为写者,通过调用__free_vmap_area()函数来删除。

<mm/vmalloc.c>

static void __free_vmap_area(struct vmap_area *va)
{
     ...
     list_del_rcu(&va->list);
     kfree_rcu(va, rcu_head);
}

通过list_del_rcu()函数来删除链表中的成员,kfree_rcu()函数最终会调用__call_rcu()函数来注册回调函数,并且等待所有CPU都处于静止状态后才会真正删除这个成员。

类似的RCU链表在内存管理代码中有很多,如在oom_kill.c文件中常常会看到遍历系统所有进程时会使用rcu_read_lock()函数来构建临界区。

<mm/oom_kill.c>

static struct task_struct *select_bad_process(unsigned int *ppoints,
          unsigned long totalpages, const nodemask_t *nodemask,
          bool force_kill)
{
     ...
     rcu_read_lock();
     for_each_process_thread(g, p) {
          ...
     }
     rcu_read_unlock();
     return chosen;
}

读者可以尝试研究、体会其中的奥秘。

在RCU中有一个CPU停滞检测机制,用来检测RCU在GP内是否长时间没有执行完。当RCU检测到此情况发生时会输出一些警告信息以及函数调用栈等信息,用来帮助开发者定位问题。以1.10节的RCU为例,通过以下代码在myrcu_reader_thread1()函数的RCU读临界区中添加一个长时间等待函数mdelay()。

static int myrcu_reader_thread1(void *data) //读者线程1
{
     struct foo *p1 = NULL;

     while (1) {
          if(kthread_should_stop())
               break;
          msleep(10);
          rcu_read_lock();
          p1 = rcu_dereference(g_ptr);
          mdelay(100000000); //新增延迟
          rcu_read_unlock();
     }
     return 0;
}

下面是上述内核模块运行后得到的RCU警告信息。

<RCU警告信息片段一>

[  398.315712] rcu: INFO: rcu_sched self-detected stall on CPU
[  398.316904] rcu:  0-....: (5173 ticks this GP) idle=d3e/1/0x4000000000000002 softirq=7673/7673 fqs=2247 
[  398.317423] rcu:   (t=5250 jiffies g=20197 q=711)

rcu_sched模块检测到本地CPU有发生停滞的情况。发生停滞的CPU是CPU0。其中,“(5173 ticks this GP)”表示在当前的GP内已经持续了5173个时钟中断,说明当前的GP被长时间停滞了。有些情况下,如果显示“(3 GPs behind)”,则表示过去3个GP内没有和RCU核心进行交互。

idle值表示tick-idle模块的信息。softirq值表示RCU软中断处理的数量。

接下来,输出发生RCU停滞时的函数调用栈信息。

[  398.318138] Task dump for CPU 0:
[  398.318727] rcu_reader1     R  running task        0   885      2 0x0000002a
[  398.319615] Call trace:
//触发RCU停滞检查的路径
[  398.320620]  dump_backtrace+0x0/0x52c
[  398.321098]  show_stack+0x28/0x34
[  398.321355]  sched_show_task+0x6f8/0x734
[  398.321734]  dump_cpu_task+0x58/0x64
[  398.322033]  rcu_dump_cpu_stacks+0x330/0x414
[  398.322297]  print_cpu_stall+0x51c/0xaf8
[  398.322542]  check_cpu_stall+0x754/0x96c
[  398.322831]  rcu_pending+0x64/0x38c
[  398.323317]  rcu_check_callbacks+0x550/0x8e8
[  398.324392]  update_process_times+0x54/0x18c
[  398.324973]  tick_sched_timer+0xa8/0x10c
[  398.325688]  __hrtimer_run_queues+0xb0/0x128
[  398.325967]  hrtimer_interrupt+0x468/0x9dc
[  398.326658]  handle_percpu_devid_irq+0x4e8/0xa18
[  398.326969]  generic_handle_irq+0x50/0x60
[  398.327706]  __handle_domain_irq+0x184/0x238
[  398.328121]  gic_handle_irq+0x1e8/0x594
[  398.328331]  el1_irq+0xb0/0x140

//发生停滞的函数调用栈
[  398.328549]  arch_counter_get_cntvct+0x200/0x20c
[  398.328775]  __delay+0x94/0xf8
[  398.328952]  __const_udelay+0x48/0x54
[  398.330126]  myrcu_reader_thread1+0x160/0x1b8 [rcu_test]
[  398.330348]  kthread+0x3c4/0x3d0
[  398.330493]  ret_from_fork+0x10/0x18

读者需要从上述函数调用栈分析发生RCU停滞的原因。

发生RCU停滞的通常有如下几种情况。

在RCU读临界区里发生了死循环(CPU looping)。

[1] 详见《ARM Architecture Reference Manual, for ARMv8-A architecture profile, v8.4》C6.2.235节。

[2] 详见《ARM Architecture Reference Manual, for ARMv8-A architecture profile, v8.4》C6.2.40节。

[3] 假设进程A获取一个自旋锁时使用mcs_spinlock[0]节点,在临界区中发生了中断,中断处理程序也申请了该自旋锁,那么会使用mcs_spinlock[1]节点。

[4] 代码提前规划定义了4个mcs_spinlock节点,用于不同的上下文——进程上下文task、软中断上下文softirq、硬中断上下文hardirq和不可屏蔽中断上下文nmi。

[5] Linux-2.6.29 patch, commit 64db4cfff,“ "Tree RCU": scalable classic RCU implementation ”, by Paul E. McKenney.


相关图书

Linux常用命令自学手册
Linux常用命令自学手册
Linux后端开发工程实践
Linux后端开发工程实践
庖丁解牛Linux操作系统分析
庖丁解牛Linux操作系统分析
轻松学Linux:从Manjaro到Arch Linux
轻松学Linux:从Manjaro到Arch Linux
Linux高性能网络详解:从DPDK、RDMA到XDP
Linux高性能网络详解:从DPDK、RDMA到XDP
跟老韩学Linux架构(基础篇)
跟老韩学Linux架构(基础篇)

相关文章

相关课程