使用Rust编写操作系统 - 4.1 - Async/Await

本文将探讨协作式多任务处理以及Rust的async/await特性。我们将详细研究async/await在Rust中的工作方式,包括Futuretrait的设计,状态机转换和pinning(译注:内存固定)。然后,我们通过创建异步键盘任务和基本执行器,使得内核具备对async/await的基本支持。

这个博客是在GitHub上公开开发的。如果你有任何问题或疑问,请在那里开一个issue。你也可以在底部留言。这篇文章的完整源代码可以在post-12分支中找到。

多任务

多任务处理是大多数操作系统的基本特征之一,这是一种能够同时执行多个任务的功能。例如,在看这篇文章时,你可能会打开其他程序,例如文本编辑器或终端窗口。即使只打开一个浏览器窗口,也可能会有各种后台任务来管理桌面窗口,检查更新或为文件建立索引。

尽管所有任务看上去都是并行运行的,但实际在CPU内核上一次只能执行一个任务。为了产生任务可以并行运行的错觉,操作系统会在活动任务之间快速切换,以使得每个任务都可以有一些进展。由于计算机速度很快,因此大多数时候我们不会注意到这些切换。

单核CPU一次只能执行一个任务,而多核CPU可以真正并行地运行多个任务。例如,具有8个内核的CPU可以同时运行8个任务。我们将在以后的文章中解释如何设置多核CPU。在本文中,为简单起见,我们将重点介绍单核CPU。(值得注意的是,所有多核CPU都仅从单个活动核启动,因此我们目前仍然可以将它们视为单核CPU。)

多任务处理有两种形式:协作式多任务要求任务定期放弃对CPU的控制,以便其他任务可以继续执行。抢占式多任务使用操作系统功能通过强行暂停线程,以在任意时间点切换线程。在下文中,我们将更详细地探讨多任务的两种形式,并讨论它们各自的优缺点。

抢占式多任务

抢占式多任务处理的思路是使用操作系统控制何时切换任务,思路利用了操作系统在每个中断上可以重获CPU控制权这一机制。这样,只要系统有新输入可用,就可以切换任务。例如,当鼠标移动或网络数据包到达时,就可以执行任务切换。操作系统还可以通过配置硬件计时器在到达特定时间时发送中断,来确定允许任务运行的确切时间。

下图说明了硬件中断上的任务切换过程:

中断时重获控制权

在第一行中,CPU正在执行程序A的任务A1,所有其他任务均被暂停。在第二行中,硬件中断到达CPU。就像硬件中断一文中介绍的那样,CPU立即停止执行任务A1,并跳转到中断描述符表(IDT)中定义的中断处理程序。通过中断处理程序,操作系统现在可以再次控制CPU,从而使它可以切换到任务B1,而不是继续执行任务A1

状态保存

鉴于任务会在任意时间点中断,这些任务可能正处于某些计算中。为了能够在以后恢复这些任务,操作系统必须备份任务的整个状态,包括其调用栈和所有CPU寄存器的值。此过程称为上下文切换

由于调用栈可能会非常大,操作系统通常会为每个任务设置单独的调用栈,而不是在每次任务切换时备份调用栈的内容。具有单独调用栈的此类任务称为执行线程或简称线程。通过为每个任务设置单独的栈,上下文切换时仅需要保存寄存器的内容即可(包括程序计数器和栈指针)。这种方法可以最大程度地减少上下文切换的性能开销,这非常重要,因为上下文切换通常能够到达每秒100次。

讨论

抢占式多任务处理的主要优点是操作系统可以完全控制任务的执行时间。这样即使不要求任务间的相互协作,也能够确保公平分配各任务的CPU时间。在运行第三方任务时或在多个用户共享系统时,这一点尤其重要。

抢占式的缺点是每个任务都需要自己的调用栈。与共享调用栈相比,这会导致每个任务的内存使用量增加,并且通常会限制系统中的任务的总量。抢占式的另一个缺点是,即使任务仅使用了一小部分寄存器,操作系统在每个任务切换时仍必须保存全部的CPU寄存器状态。

抢占式多任务处理和线程是操作系统的基本要素,因为它们能够使操作系统能够运行不受信任的用户空间程序,我们将在以后的文章中详细讨论这些概念。在本文中,我们将专注于协作式多任务处理,这种方式也能够为内核提供有用的多任务功能。

协作式多任务

协作多任务处理并不是在任意时间点强行暂停正在运行的任务,而是让每个任务运行到自愿放弃对CPU的控制为止。这使任务可以在方便的时间点暂停自己,比如当该任务需要等待I/O操作时。

合作多任务通常使用在语言层,例如以协程async/await的形式。该思路是由程序员或编译器都将yield操作插入到程序中,从而使该任务放弃对CPU的控制以允许其他任务运行。在复杂循环的每次迭代之后都可以插入一个yield。

协作式多任务通常与异步操作一起使用。与抢占式的一直等待直到操作完成并在期间阻止其他任务的执行,在协作式中若操作尚未完成,则异步操作将返回一个“未就绪”状态。在这种情况下,等待的任务可以执行一个yield操作以允许其他任务运行。

状态保存

由于任务会为自己定义暂停点,因此它们不需要操作系统来保存其状态。与抢占式相比,协作式可以在暂停之前只保存用以在继续时恢复操作的必要的寄存器状态,这通常可以提高性能。例如,刚完成复杂计算的任务可能只需要备份计算的最终结果,因为此时中间结果已经不再需要了。

由语言支持实现的协作任务甚至可以在暂停之前只备份调用堆栈的所需部分。例如,Rust的async/await实现会将所有需要的局部变量存储在自动生成的结构体中(见下文)。通过在暂停之前备份调用堆栈的相关部分,所有任务可以共享一个调用堆栈,从而减少了每个任务的内存消耗。这样就可以创建几乎无限的协作任务,而不用担心内存耗尽。

讨论

协作式多任务处理的缺点是,一个非协作的任务可能会无限期地运行下去。那么,恶意程序或有bug的任务可能会阻止其他任务的运行,并拖慢甚至阻塞整个系统。因此,当且仅当所有任务都支持协作时才能使用协作式多任务处理。而让操作系统依赖于任意用户级程序的协作并不是一个好主意。

但是,协作多任务的强大性能和内存优势,使其成为在程序内部使用的好方法,特别是与异步操作结合使用时。由于操作系统内核需要与异步硬件交互,性能至关重要,因此协作多任务似乎是一个实现并发的好方法。

Rust的async/await

Rust语言以async/await的形式为协作式多任务提供了原生支持。在探讨什么是async/await及其工作原理之前,我们需要了解Rust中的futures和异步编程是如何工作的。

Futures

Future表示一个可能尚不可用的值。例如,这个值可能是由另一个任务计算的整数,也可能是从网络下载的文件。Future并不需要一直要等待到该值可用,它可以继续执行其他代码直到需要用到该值为止。

示例

最好用一个小例子说明futures的概念:

async示例

此序列图展示了一个main函数,它从文件系统读取文件,然后调用foo函数。图中以不同的方式执行两次此过程:一次是调用同步函数read_file,一次是调用异步函数async_read_file

在同步调用中,main函数需要一直等待,直到该文件从文件系统中加载完毕。只有这样,它才能继续调用foo函数,并再次等待其执行结果。

通过异步async_read_file调用,文件系统直接返回一个future并在后台异步加载文件。这使main函数可以更早地调用foo函数,然后foo函数与文件加载并行运行。在此示例中,文件加载甚至在foo返回之前就完成了,因此main无需继续等待foo返回,可以直接处理文件。

Rust中的Futures

在Rust中,futures由Futuretrait表示,其定义如下:

1
2
3
4
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

关联类型Output指定异步返回值的类型。例如,上图中的async_read_file函数将返回一个Output设置为FileFuture实例。

调用poll方法可以检查该值是否已经可用。它将返回一个枚举Poll,如下所示:

1
2
3
4
pub enum Poll<T> {
Ready(T),
Pending,
}

如果该值已经可用(例如,已从磁盘中读取完整的文件),则将其封装在Ready变量中返回。否则,将返回Pending变量,以通知调用方该值尚不可用。

poll方法采用两个参数:self: Pin<&mut Self>cx: &mut Context上下文。前者的行为类似于普通的&mut self引用,不同之处在于,Selfpinned在一个固定的内存位置上。如果不先了解async/await的工作原理,就很难理解Pin以及为什么需要Pin。因此,我们将在后文中进行详细解释。

cx: &mut Context参数用于将一个Waker实例传递给异步任务,例如加载文件系统。这个Waker允许异步任务发信号通知任务(或部分任务)已完成,例如该文件已从磁盘加载。由于主任务知道将在Future就绪时将会收到通知,因此就不需要一遍又一遍地调用poll了。我们会在后文中实现自己的waker类型,届时将更加详细地说明此过程。

使用Futures

现在,我们知道了如何定义future,了解了poll方法的基本思路。但是,我们仍然不知道如何有效地使用futures。由于futures代表异步任务的结果,可能尚不可用。但实际上,我们经常立即需要这些值以进行进一步的计算。所以问题是:在需要时如何有效地检索一个future的值?

等待Futures

一种可能的答案是一直等待到future就绪为止。该过程看起来像这样:

1
2
3
4
5
6
7
let future = async_read_file("foo.txt");
let file_content = loop {
match future.poll(…) {
Poll::Ready(value) => break value,
Poll::Pending => {}, // 什么也不做
}
}

在上面的代码中,我们通过循环中积极地调用poll来等待future完成。在这里poll的参数并不重要,因此已被省略。尽管此方法可行,但是效率很低,因为包含poll的循环会一直用占用CPU直到该值可用为止。

一种较为高效的方法是阻塞当前线程,直到future可用为止。当然,这只有在有线程支持的情况下才可行,因此并不适用于我们的内核,至少目前还不行。即使在支持阻塞的系统上,通常也不会这么做,因为阻塞会将异步任务再次变为同步任务,从而失去了并行任务的性能优势。

Future组合器

另一种等待方式是使用future组合器。Future组合器是类似于map的方法,它允许将futures链接并组合在一起,就像在Iterator上做的那样。这些组合器不等待future,而是自己返回future,即为poll应用了map操作。

下面举个简单的例子,一个用于将Future<Output = String>转换为Future<Output = usize>string_len组合器可能看起来像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
struct StringLen<F> {
inner_future: F,
}

impl<F> Future for StringLen<F> where F: Future<Output = String> {
type Output = usize;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
match self.inner_future.poll(cx) {
Poll::Ready(s) => Poll::Ready(s.len()),
Poll::Pending => Poll::Pending,
}
}
}

fn string_len(string: impl Future<Output = String>)
-> impl Future<Output = usize>
{
StringLen {
inner_future: string,
}
}

// Usage
fn file_len() -> impl Future<Output = usize> {
let file_content_future = async_read_file("foo.txt");
string_len(file_content_future)
}

该代码无法正常工作,因为尚未处理pinning,不过作为例子已经足够了。基本思路是string_len函数将给定的实现了Futuretrait的实例封装到新的StringLen结构体中,而该结构体也实现了Futuretrait。当poll封装的future时,即是poll其内部的future。如果该值尚未就绪,封装的future也将返回Poll::Pending。如果该值已就绪,则从Poll::Ready变量中获取字符串,并计算其长度。最后再将其封装在Poll::Ready中返回。

通过string_len函数,我们不需要等待一个异步字符串,就可以计算其长度。由于该函数也返回Future,因此调用者无法直接操作返回的值,而需要再次使用组合器函数。这样,整个调用过程就变为异步的了,我们可以高效地在某个时刻一次等待多个future,例如 在main函数上。

手动编写组合器函数比较困难,所以它们通常由库直接提供。尽管Rust标准库本身还没有提供官方组合器方法,但半官方(兼容no_std)的futurecrate可以。其FutureExt特性提供了诸如mapthen之类的高级组合器方法,可用于任意闭合操作结果。

优点

Future组合器最大的优点是能够使操作保持异步。这种方法和异步I/O接口结合使用时性能非常高。实际上future组合器将被实现为具有trait的普通结构体,以使编译器能够对其做出进一步优化。有关更多详细信息,请参阅Rust的零成本future一文,就是这篇文章宣布了在Rust生态系统中添加future。

缺点

尽管future组合器可以编写出非常高效的代码,但由于类型系统和基于闭包的接口的限制,组合器可能会在某些情况下变得难以使用。例如,考虑下面的代码:

1
2
3
4
5
6
7
8
9
fn example(min_len: usize) -> impl Future<Output = String> {
async_read_file("foo.txt").then(move |content| {
if content.len() < min_len {
Either::Left(async_read_file("bar.txt").map(|s| content + &s))
} else {
Either::Right(future::ready(content))
}
})
}

使用play rust在线运行上面的代码

代码先读取文件foo.txt,然后使用then组合器根据文件内容链接第二个future。如果内容长度小于给定的min_len,我们将读取另一个bar.txt文件,然后使用map组合器将其附加到content中。否则,只返回foo.txtcontent

我们需要在传递给then的闭包上使用move关键字,否则min_len变量会产生生命周期错误。使用Either封装的原因是让if块和else块始终具有相同的类型。由于我们在块中返回了不同的future类型,因此必须使用封装类型将它们统一为一个类型。ready函数将立刻就绪的值封装到future中。这里需要使用该函数是因为Either封装要求值实现Futuretrait。

您可以想象,这种用法很快就会导致大型项目的代码变得非常复杂。尤其是再涉及借用和生命周期,就会变得更加复杂。因此,为了使异步代码从根本上更易于编写,我们投入了大量工作来为Rust添加对async/await的支持。

Async/Await模式

Async/Await的思路是让程序员以编写看起来像同步代码的方式编写异步代码,只不最后是由编译器将同步代码转换为异步代码。它基于两个关键字asyncawait。在函数签名中使用async关键字,就可以将同步函数转换为一个返回future的异步函数:

1
2
3
4
5
6
7
8
async fn foo() -> u32 {
0
}

// 上面的函数大概会变编译器转换为:
fn foo() -> impl Future<Output = u32> {
future::ready(0)
}

仅使用此关键字并没有那么有用。但是,在异步函数内部,可以使用await关键字来取回future的异步值:

1
2
3
4
5
6
7
8
async fn example(min_len: usize) -> String {
let content = async_read_file("foo.txt").await;
if content.len() < min_len {
content + &async_read_file("bar.txt").await
} else {
content
}
}

使用play rust在线运行上面的代码

上面使用组合器实现的example函数直接转换为async/await模式:使用.await运算符就可以取回future的值,无需使用闭包或Either类型。如此,我们就可以像编写普通的同步代码一样编写异步代码

状态机转换

在这种场景中,编译器的作用就是将async函数体转换为一个状态机,每个.await调用代表一个不同的状态。对于上面的example函数,编译器创建具有以下四个状态的状态机:

async状态机的状态

不同状态代表该函数的不同暂停点。”Start“和”End“状态代表函数在其执行的开始和结束时的状态。”Waiting on foo.txt“状态表示该函数目前正在等待第一个async_read_file的结果。同样的,”Waiting on bar.txt“状态表示函数在等待第二个async_read_file的结果的暂停点。

状态机通过将每个poll调用都变为一个可能的状态转换来实现Futuretrait:

async状态机本质

图中使用箭头表示状态开关,并使用菱形表示条件路径。例如,如果foo.txt文件尚未准备好,则采用标记为”no“的路径,并达到”Waiting on foo.txt“的状态。否则,就采用标记为“yes”路径。没有字的红色小菱形代表example函数中if content.len() < 100的条件分支。

我们看到第一个poll调用启动了该函数并使它运行,直到遇到一个尚未就绪的future。如果路径上的所有future都已就绪,则该函数可以一直运行到”End“状态,并返回封装在Poll::Ready中的结果。否则,状态机将进入等待状态并返回Poll::Pending。然后在下一个poll调用中,状态机从上一个等待状态开始重试其最后一次操作。

状态保存

为了能够从上一个等待状态中恢复,状态机必须在内部跟踪当前状态。此外,它还必须保存在下一个poll调用中恢复执行所需的变量。这就是编译器真正发挥作用的地方:由于编译器知道在何时要使用哪些变量,因此它可以自动生成具有所需变量的结构体。

作为示例,编译器为上面的example函数生成类似于下面这样的结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 这是async版的`example`函数
async fn example(min_len: usize) -> String {
let content = async_read_file("foo.txt").await;
if content.len() < min_len {
content + &async_read_file("bar.txt").await
} else {
content
}
}

// 这是编译器生成的状态结构体

struct StartState {
min_len: usize,
}

struct WaitingOnFooTxtState {
min_len: usize,
foo_txt_future: impl Future<Output = String>,
}

struct WaitingOnBarTxtState {
content: String,
bar_txt_future: impl Future<Output = String>,
}

struct EndState {}

在”Start“和”Waiting on foo.txt“状态下,需要存储min_len参数,因为稍后与content.len()做比较时需要使用该参数。”Waiting on foo.txt“状态还存储了一个foo_txt_future,用来表示async_read_file调用返回的future。状态机继续运行时会再次poll该future,因此需要将其保存。

Waiting on bar.txt“状态包含content变量,是因为在bar.txt就绪后需要使用该变量进行字符串连接。该状态还存储了一个bar_txt_future,用来表示正在加载中的bar.txt。该结构体不包含min_len变量,因为在content.len()比较之后就不再需要该变量了。在”End“状态下,没有存储任何变量,因为此时函数已经运行完毕。

请记住,这只是编译器可能生成的代码的一个示例。结构体名称和字段布局是实现细节,可能会有所不同。

全状态机类型

尽管编译器生成的确切代码是实现细节,但这个示例还是有助于我们理解并想象example函数生成的状态机可能的样子。我们已经定义了代表不同状态的结构体,并给出了其中包含的所需变量。为了基于这些结构体创建一个状态机,我们可以将它们组合成一个枚举:

1
2
3
4
5
6
enum ExampleStateMachine {
Start(StartState),
WaitingOnFooTxt(WaitingOnFooTxtState),
WaitingOnBarTxt(WaitingOnBarTxtState),
End(EndState),
}

我们为每个状态定义一个单独的枚举变量,并将对应状态的结构体作为字段添加到每个变量。为了实现状态转换,编译器根据example函数实现Futuretrait:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
impl Future for ExampleStateMachine {
type Output = String; // `example`函数的返回类型

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match self { // TODO: 处理pinning
ExampleStateMachine::Start(state) => {…}
ExampleStateMachine::WaitingOnFooTxt(state) => {…}
ExampleStateMachine::WaitingOnBarTxt(state) => {…}
ExampleStateMachine::End(state) => {…}
}
}
}
}

FutureOutput类型为String,即example函数的返回类型。为了实现poll函数,我们在loop内的当前状态上使用match语句。思路是我们尽可能长时间地切换到下一个状态,并在无法继续时显式的使用return Poll::Pending

为简单起见,这里仅给出简化的代码,且暂不处理pinning、所有权、生命周期等内容。因此,这里的代码和下面的代码应被看做伪代码,不能直接使用。当然,真正的编译器生成的代码可以正确处理所有内容,尽管可能使用了与我们不同的方式。

为了使示意的代码更简洁,我们将分别显示每个匹配分支的代码。从”Start”状态开始:

1
2
3
4
5
6
7
8
9
10
ExampleStateMachine::Start(state) => {
// from body of `example`
let foo_txt_future = async_read_file("foo.txt");
// `.await` operation
let state = WaitingOnFooTxtState {
min_len: state.min_len,
foo_txt_future,
};
*self = ExampleStateMachine::WaitingOnFooTxt(state);
}

当状态机处于Start状态时,其对应位置正是函数体的最开始。在这种情况下,我们将执行example函数体中的所有代码,直到遇到第一个.await。为了处理.await操作,我们将self状态机的状态修改为WaitingOnFooTxt,并令状态中包含WaitingOnFooTxtState结构体。

由于match self {…}语句是在循环中执行的,因此该执行将跳至下一个分支WaitingOnFooTxt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ExampleStateMachine::WaitingOnFooTxt(state) => {
match state.foo_txt_future.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(content) => {
// from body of `example`
if content.len() < state.min_len {
let bar_txt_future = async_read_file("bar.txt");
// `.await` operation
let state = WaitingOnBarTxtState {
content,
bar_txt_future,
};
*self = ExampleStateMachine::WaitingOnBarTxt(state);
} else {
*self = ExampleStateMachine::End(EndState));
return Poll::Ready(content);
}
}
}
}

在这一匹配分支中,我们首先调用foo_txt_futurepoll函数。如果尚未就绪,则退出循环并返回Poll::Pending。由于在这种情况下self仍位于WaitingOnFooTxt状态,因此状态机的下一次poll调用也将进入相同的匹配分支并重试foo_txt_future

foo_txt_future就绪时,我们将结果赋给content变量,然后继续执行example函数的代码:如果content.len()小于状态结构体中保存的min_len,则异步读取bar.txt文件。我们再次将.await操作转换为状态更改,而这次应转换为WaitingOnBarTxt状态。由于我们是在循环内执行匹配,因此下一轮循环将直接跳转到新状态的匹配分支,然后在该状态下pollbar_txt_future

如果我们进入else分支,则不会进行进一步的.await操作。此时已到达函数的结尾,并将content封装在Poll::Ready中返回。我们还需要将当前状态更改为End状态。

WaitingOnBarTxt状态的代码如下所示:

1
2
3
4
5
6
7
8
9
10
ExampleStateMachine::WaitingOnBarTxt(state) => {
match state.bar_txt_future.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(bar_txt) => {
*self = ExampleStateMachine::End(EndState));
// from body of `example`
return Poll::Ready(state.content + &bar_txt);
}
}
}

WaitingOnFooTxt状态类似,我们从pollbar_txt_future开始。如果仍未就绪,则退出循环并返回Poll::Pending。否则,我们就执行example函数的最后一个操作:用content变量与future的结果做字符串连接。我们将状态机更新为End状态,然后将结果封装在Poll::Ready中返回。

最后,End状态的代码如下所示:

1
2
3
ExampleStateMachine::End(_) => {
panic!("poll called after Poll::Ready was returned");
}

Future返回Poll::Ready后就不应再被poll了,因此,当我们已经处于End状态时,如果再次调用poll,就产生一个panic。

现在我们知道了编译器可能会生成怎样的状态机,以及怎样去给状态机实现Futuretrait。但实际上,编译器会以不同的方式生成代码。(如果您感兴趣的话,该实现目前基于生成器,不过这只是实现细节。)

最后一步是为example函数本身生成代码。记住,函数签名是这样定义的:

1
async fn example(min_len: usize) -> String

由于现在整个函数体是由状态机实现的,因此该函数唯一需要做的就是初始化状态机并将其返回。为此生成的代码如下所示:

1
2
3
4
5
fn example(min_len: usize) -> ExampleStateMachine {
ExampleStateMachine::Start(StartState {
min_len,
})
}

该函数不再使用async修饰符,因为它现在显式返回一个实现了Futuretrait的ExampleStateMachine类型。如预期的那样,状态机被初始化为Start状态,并且使用min_len参数初始化了对应的状态结构体。

请注意,此函数并不会直接启动状态机。这是Rust中future的一个基本设计决策:在第一次被poll之前什么也不做。

Pinning

在这篇文章中,我们已经遇到pinning很多次了。现在终于是时候看看究竟什么是pinning以及为什么需要pinning了。

自引用结构体

如上所述,状态机转换将每个暂停点的局部变量存储在结构体中。对于像example函数这样的小例子就很简单,并不会导致任何问题。但是,当变量相互引用时,事情就会变得困难。例如,考虑以下函数:

1
2
3
4
5
6
async fn pin_example() -> i32 {
let array = [1, 2, 3];
let element = &array[2];
async_write_file("foo.txt", element.to_string()).await;
*element
}

该函数创建一个包含123的小数组。然后再创建对最后一个数组元素的引用,存储在element变量中。接下来,该函数将转换为字符串的数字异步写入到foo.txt文件中。最后返回由element引用的数字。

由于该函数使用了一个await操作,因此结果状态机具有三种状态:”Start”、”End”、”Waiting on write”。该函数没有参数,因此开始状态对应的结构体为空。像以前一样,结束状态对应的结构体也为空,因为该函数此时已完成。而”Waiting on write”状态对应的结构体就很有趣:

1
2
3
4
struct WaitingOnWriteState {
array: [1, 2, 3],
element: 0x1001c, // 最后一个数组元素的地址
}

我们需要存储arrayelement变量,因为返回值需要element,而element又引用了array。由于element是一个引用,因此它存储指向所引用元素的指针(即内存地址)。我们在这里使用0x1001c作为示例存储地址。而实际上,element字段必须是array字段最后一个元素的地址,所以这与该结构体在内存中的位置有关。由于此类结构体对自身的某些字段做了引用,因此这种具有内部指针的结构体也叫做自引用结构体。

自引用结构体的问题

自引用结构的内部指针将导致一个很基本的问题,尤其是在查看其内存布局时会更加明显:

自引用结构体

array字段从地址0x10014开始,element字段从地址0x10020开始。它指向地址0x1001c,即最后一个数组元素的地址。至此仍没什么问题。但是,当我们将此结构体移动到其他内存地址时,就会发生问题:

自引用结构体移动后

稍微移动该结构体,现在使其从地址0x10024开始。这是有可能的,例如当我们将该结构体作为函数参数传递时,或是将其分配给其他栈变量时。问题在于,尽管最后一个数组元素现已位于地址0x1002c,但element字段仍指向地址0x1001c。于是指针悬空,结果就是在下一个poll调用中发生未定义的行为。

可行的解决方案

有三种解决指针悬空问题的基本方法:

  • 在移动时更新指针:思路是每当结构体在内存中移动时都更新其内部指针,以使该指针在移动后仍然有效。不幸的是,这种方法需要对Rust进行大量更改,并可能导致巨大的性能损失。因为如果实现这种方法,就需要某种运行时持续跟踪结构体中各种类型的字段,并在每次移动发生时检查是否需要更新指针。
  • 存储偏移量而不是自引用:为了不去更新指针,编译器可以尝试将自引用存储为从结构体开始地址算起的偏移量。例如,上面的WaitingOnWriteState结构的element字段可以以element_offset字段的形式存储,其值为8,因为该引用指向的数组元素位于该结构体的起始地址后的第8个字节处。由于在移动结构时偏移量保持不变,因此不需要字段更新。
    问题是,如果实现这种方法,就必须让编译器检测所有自引用。这在编译时是不可能的,因为引用的值可能取决于用户输入,因此这又需要使用运行时系统来分析引用,从而正确地创建状态结构体。这不仅会导致运行时成本增加,而且还会阻止某些编译器优化,从而又会导致较大的性能损失。
  • 禁止移动结构体:如上所示,仅当我们在内存中移动该结构体时,才会出现悬空指针。那么,通过完全禁止对自引用结构的移动操作,就可以避免该问题。这种方法的最大优点是可以实现在类型系统级别,并不会增加运行时成本。不过它的缺点是需要程序员自己处理在可能的自引用结构体上发生的移动操作。

为了遵守提供零成本抽象的原则(这意味着抽象不应产生额外的运行时成本),Rust选择了第三种解决方案。为此,在RFC 2349中提出了pinning API。在下文中,我们将简要概述此API,并说明它将如何与async/await和futures一起使用。

堆上的值

首先,很明显,堆分配的值在大多数情况下已经具有固定的内存地址。这些值是由allocate调用创建,并由如类似Box<T>的指针类型进行引用的。尽管这种指针类型可以移动,但指针所指向的堆值将始终位于相同的内存地址中,除非调用deallocate将其释放否则地址将一直不变。

使用堆分配,我们可以尝试创建一个自引用结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
fn main() {
let mut heap_value = Box::new(SelfReferential {
self_ptr: 0 as *const _,
});
let ptr = &*heap_value as *const SelfReferential;
heap_value.self_ptr = ptr;
println!("heap value at: {:p}", heap_value);
println!("internal reference: {:p}", heap_value.self_ptr);
}

struct SelfReferential {
self_ptr: *const Self,
}

使用play rust在线运行上面的代码

我们创建一个名为SelfReferential的简单结构体,其中包含一个指针字段。首先,我们使用空指针初始化结构体,然后使用Box::new将其分配到堆上。接下来,我们确定分配给堆的结构体的内存地址,并将其存储在ptr变量中。最后,通过将ptr变量分配给self_ptr字段,将结构体变为自引用结构体。

play rust上执行此代码时,会看到堆值的地址及其内部指针地址相同,这意味着self_ptr字段是有效的自引用。由于heap_value变量仅是一个指针,因此移动它(例如,通过将其传递给函数)不会更改结构体本身的地址,因此,即使移动了指针,self_ptr也依然有效。

但是,还是有一种方法可以破坏这个例子:我们可以移出Box<T>或替换其内容:

1
2
3
4
5
let stack_value = mem::replace(&mut *heap_value, SelfReferential {
self_ptr: 0 as *const _,
});
println!("value at: {:p}", &stack_value);
println!("internal reference: {:p}", stack_value.self_ptr);

使用play rust在线运行上面的代码

这里我们使用mem::replace函数,将堆分配内存中的原值替换为一个新结构体实例。这使我们可以将堆分配中的原值heap_value移动到栈中,而现在该结构体的self_ptr字段就是一个悬空指针了,它仍然指向旧的堆地址。当你尝试在play rust上运行该代码时,就会看到“value at:”行和“internal reference:”行确实打印了不同的指针地址。因此,堆分配的值并不能完全保证自引用的安全性。

导致上述现象的根本问题是Box<T>允许我们获取对堆分配值的&mut T引用。通过&mut引用,就可以使用诸如mem::replacemem::swap一类的方法来使堆分配的自引用值无效。要解决此问题,我们必须禁止创建对自引用结构的&mut引用。

Pin<Box<T>>Unpin

Pinning API用Pin封装类型和Unpin标记trait的方式为&mut T问题提供了解决方案。其思路是为Pin类型中所有可用于从Unpintrait上获取其封装值&mut可变引用方法(例如get_mutderef_mut)设置关卡。Unpintrait是自动trait,即对于所有类型(显式声明退出的类型除外)都默认自动实现。而如果让自引用结构体显式声明退出Unpin,就再也没有(安全的)方法能够从Pin<Box<T>>类型中获取&mut T可变引用了。如此便能够保证它们的内部自引用始终有效。

让我们更新上面的SelfReferential类型以选择取消Unpin

1
2
3
4
5
6
use core::marker::PhantomPinned;

struct SelfReferential {
self_ptr: *const Self,
_pin: PhantomPinned,
}

通过为结构体添加类型为PhantomPinned的第二个字段_pin来退出trait。这是一个零大小的标记类型,唯一目的就是退出Unpintrait。鉴于自动trait的工作方式,只需一个非Unpin的字段就可以使整个结构体退出Unpin

第二步是将示例中的Box<SelfReferential>类型更改为Pin<Box<SelfReferential>>类型。最简单的方法是使用Box::pin函数代替Box::new来创建堆分配的值:

1
2
3
4
let mut heap_value = Box::pin(SelfReferential {
self_ptr: 0 as *const _,
_pin: PhantomPinned,
});

除了将Box::new改为Box::pin,还需要在结构体初始字段中添加新的_pin字段。由于PhantomPinned是零大小的类型,因此我们只需要使用其类型名称即可进行初始化。

现在,当我们尝试在play rust中运行修改后的示例代码,会发现代码有编译错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
error[E0594]: cannot assign to data in a dereference of `std::pin::Pin<std::boxed::Box<SelfReferential>>`
--> src/main.rs:10:5
|
10 | heap_value.self_ptr = ptr;
| ^^^^^^^^^^^^^^^^^^^^^^^^^ cannot assign
|
= help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `std::pin::Pin<std::boxed::Box<SelfReferential>>`

error[E0596]: cannot borrow data in a dereference of `std::pin::Pin<std::boxed::Box<SelfReferential>>` as mutable
--> src/main.rs:16:36
|
16 | let stack_value = mem::replace(&mut *heap_value, SelfReferential {
| ^^^^^^^^^^^^^^^^ cannot borrow as mutable
|
= help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `std::pin::Pin<std::boxed::Box<SelfReferential>>`

发生这两个错误都是由Pin<Box<SelfReferential>>类型不再实现DerefMuttrait引起的。这正是我们期望的,因为DerefMuttrait会返回一个&mut引用,而我们希望禁止该引用。这两个错误就是因为我们既退出了Unpin,又将Box::new更改为Box::pin

现在的问题是,编译器不仅禁止在第16行中移动该类型,而且还禁止在第10行中初始化self_ptr字段。之所以发生这种情况,是因为编译器无法区分&mut引用的有效使用与无效使用。为了修复初始化一行的报错,我们必须使用非安全的get_unchecked_mut方法:

1
2
3
4
5
// 此处操作是安全的,因为仅修改了结构体字段,而未进行整个结构体的移动
unsafe {
let mut_ref = Pin::as_mut(&mut heap_value);
Pin::get_unchecked_mut(mut_ref).self_ptr = ptr;
}

使用play rust在线运行上面的代码

get_unchecked_mut函数的参数应为Pin<&mut T>而不是Pin<Box<T>>,因此我们必须先使用Pin::as_mut做转换。然后,我们就可以使用get_unchecked_mut返回的&mut引用来设置self_ptr字段了。

现在剩下的错误就是我们期望的mem::replace一行的错误。请记住,此操作尝试将堆分配的值移动到栈上,这会破坏存储在self_ptr字段中的自引用。通过退出Unpin并使用Pin<Box<T>>,就可以在编译时禁止执行此类操作,从而可以安全地使用自引用结构体。如我们所见,编译器(目前)还不能检测创建的自引用结构体是否安全,因此我们需要使用非安全块并自己确保其正确性。

栈上的pinning与Pin<&mut T>

在上一节中,我们学习了如何使用Pin<Box<T>>安全地创建堆分配上的自引用结构体。尽管这种方法可以很好地工作并且相对安全(除了非安全的初始化),但方法所要求的堆分配会带来一些性能损失。由于Rust始终希望尽可能的提供零成本抽象,因此pinning API还允许创建指向栈分配值的Pin<&mut T>实例。

与拥有封装值所有权Pin<Box<T>>实例不同,Pin<&mut T>实例仅临时借用其封装的值。这使事情变得更加复杂,因为它需要程序员自己确保引用带来的附加的条件。最重要的是,在被引用T的整个生命周期中,Pin<&mut T>必须保持在pin的状态,这对于基于栈的变量来说会更加难以检查。为了处理这种问题,的确存在像pin-utils这样的crate,但是除非你真的知道自己在做什么,一般都不建议使用栈上的pinning。

要进一步阅读,请查看pin模块Pin::new_unchecked方法的文档。

Pinning和Future

正如我们在本文中已经看到的那样,Future::poll方法使用的pinning参数为Pin<&mut Self>形式:

1
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>

该方法采用self: Pin<&mut Self>而不是常规&mut self作参数的原因如前文所述,从async/await创建的future实例通常是自引用的。通过将Self封装为Pin,并使编译器令async/await生成的自引用future退出Unpin,可以确保future在两次poll调用之间不会在内存中移动。如此便可以确保所有内部引用仍然有效。

值得注意的是,在第一个poll调用之前,移动future是可行的。这是因为future是惰性的,它在第一次poll之前什么也不做。所以生成状态机的”Start”状态仅包含函数参数,而没有内部引用。为了调用poll,调用者必须先将future封装到Pin中,以确保future不会在内存中移动。由于正确的使用栈pinning会更加困难,因此建议始终结合使用Box::pinPin::as_mut

如果你有兴趣了解如何使用栈pinning来安全地实现future组合器函数,可以选择查看futurecrate中map组合器方法源码(该方法的源码相对简单),以及阅读有关pin文档中投影和结构的pinning部分。

执行器和唤醒器

使用async/await,可以最大限度的以完全异步的方式使用future。但是,正如我们从上面了解到的那样,future在被poll之前什么都不做。这意味着我们必须在某些时候发起对它们的poll,否则异步代码将永远不会执行。

对于单个future,我们总是可以使用上述循环手动等待每个future。但是,这种方式效率很低,而且对于那些创建大量future的程序实际上并不可行。针对此问题的最常见解决方案是定义一个全局执行器,以负责轮询系统中的所有future,直到它们全部完成。

执行器

执行器的作用是将future生成为任务,通常是使用一些spawn方法。然后,执行器负责轮询所有future,直到它们全部完成。集中管理所有future的最大好处是,只要future返回Poll::Pending,执行器就可以切换到另一个future。所以异步操作可以并行运行,CPU也因此持续作业。

许多执行器的实现也能够利用多核CPU系统的优点。这些实现会创建一个线程池,如果提供足够多的任务,线程池就能利用所有内核,并使用像work stealing之类的技术来平衡核心间的负载。嵌入式系统还会有一些特殊的执行器实现,专门为降低延迟和减少内存开销做了优化。

为了避免重复轮询future的开销,执行器通常还利用Rust的future支持的唤醒器API。

唤醒器

唤醒器API的思路是,将特殊的Waker类型封装在Context类型中,传递给每一次poll调用。这个Waker类型是由执行器创建的,异步任务可以用该类型表示任务已(部分)完成。于是,除非相应的唤醒器通知执行器,否则执行器都不需要在先前返回Poll::Pending的future上再进行poll调用了。

举个小例子可以很好地说明这一点:

1
2
3
async fn write_file() {
async_write_file("foo.txt", "Hello").await;
}

此函数将字符串”Hello”异步写入foo.txt文件。由于硬盘写入需要一些时间,因此该future上的第一次poll调用可能会返回Poll::Pending。硬盘驱动将在内部存储传递给poll调用的Waker,并在完成文件写入时使用它通知执行器。如此,执行器在收到唤醒器的通知前,都不需要再在这个future上尝试poll了。

我们将在本文的实现一节创建具有唤醒器支持的执行器,届时将详细的看到Waker类型的工作原理。

协作式多任务?

在本文的开头,我们讨论了抢先式多任务和协作式多任务。抢占式多任务依靠操作系统在运行中的任务间做强制切换,而协作式多任务则要求任务定期通过yield操作主动放弃对CPU的控制。协作式的最大优点是任务可以自己保存状态,从而更高效地进行上下文切换,并可以在任务间共享相同的调用栈。

这可能不够直观,不过future和async/await就是协作式多任务模式的实现:

  • 添加到执行器的每个future本质上都是一个协作任务。
  • Future不使用显式的yield操作,而是通过返回Poll::Pending(或完成时的Poll::Ready)来放弃对CPU核心的控制。
    • 没有什么可以强制future放弃CPU。如果它们愿意,就能够永不从poll中返回,如在一个无限循环中打转。
    • 由于每个future都可以阻止执行器中其他future的执行,因此我们需要信任它们不是恶意的。
  • Future在内部存储它在下一次poll调用时恢复执行所需的所有状态。使用async/await,编译器会自动检测所需的所有变量,并将其存储在生成的状态机中。
    • 仅保存恢复执行所需的最少状态。
    • 由于poll方法在返回时会放弃调用栈,因此该栈可用于poll其他的future。

我们看到future和async/await完美契合协作式多任务,区别就是使用了一些不同的术语。因此,在下文中,我们将交替使用术语“任务”和“future”。

实现

现在,我们了解了Rust中基于future和async/await的协作式多任务的工作原理,是时候向我们的内核中添加对多任务的支持了。由于Futuretrait是core库的一部分,而async/await是Rust语言本身的功能,因此在#![no_std]内核中使用无需其他操作就可以使用它们。唯一的要求是我们至少应使用Rust在2020-03-25之后的nightly版本,因为之前的async/await是不兼容no_std的。

只要使用较近的nightly版本,我们就可以在main.rs中使用async/await:

in src/main.rs
1
2
3
4
5
6
7
8
async fn async_number() -> u32 {
42
}

async fn example_task() {
let number = async_number().await;
println!("async number: {}", number);
}

async_number函数是一个async fn,因此编译器将其转换为实现了Futuretrait的状态机。由于该函数仅返回42,因此生成的future将在首次poll调用时直接返回Poll::Ready(42)。像async_number一样,example_task函数也是一个async fn。 它等待async_number返回的数字,然后使用println宏将其打印出来。

要运行example_task返回的future,我们需要一直对其调用poll,到它通过返回Poll::Ready告知其已完成为止。为此,我们需要创建一个简单的执行器类型。

任务

在实现执行器之前,我们要创建一个具有Task类型的新模块task

in src/lib.rs
1
pub mod task;
in src/task/mod.rs
1
2
3
4
5
6
use core::{future::Future, pin::Pin};
use alloc::boxed::Box;

pub struct Task {
future: Pin<Box<dyn Future<Output = ()>>>,
}

Task结构体是一个新的类型封装器,它封装了一个内存固定的、堆分配的、以空类型()作为输出的动态分发的future。让我们详细研究一下:

  • 我们要求与任务关联的future返回()。这意味着任务不会返回任何结果,只是为了执行其中的副作用。例如,我们上面定义的example_task函数也没有返回值,但是作为副作用,函数将一些信息打印在了屏幕上。
  • dyn关键字告诉我们存放在Box中的是一个trait对象。这意味着future上的方法是动态分发的,因此我们才可以在Task类型中存储不同类型的future。这很重要,因为每个async fn都有自己的future类型,而我们希望能够创建多个不同的任务。
  • 正如我们在pinning一节了解到的那样,Pin<Box>类型通过将值放在堆上并防止对值创建&mut引用来确保该值不会在内存中移动。这很重要,因为async/await生成的future可能是自引用的,即包含指向自身的指针,而该指针将在future移动时失效。

为了能够适应future创建Task结构体,我们编写一个new函数:

in src/task/mod.rs
1
2
3
4
5
6
7
impl Task {
pub fn new(future: impl Future<Output = ()> + 'static) -> Task {
Task {
future: Box::pin(future),
}
}
}

该函数接受输出类型为()的任意future,然后通过Box::pin函数将其固定在内存中。然后,函数将future封装在Task结构体中并返回。这里需要使用'static生命周期,因为返回的Task生存时间不确定,因此future也必须在该时间内保持有效。

我们还添加了一个poll方法,使执行器可以轮询存储的future:

in src/task/mod.rs
1
2
3
4
5
6
7
use core::task::{Context, Poll};

impl Task {
fn poll(&mut self, context: &mut Context) -> Poll<()> {
self.future.as_mut().poll(context)
}
}

由于Futuretrait的poll方法需要在Pin<&mut T>类型上调用,因此我们先要使用Pin::as_mut方法转换类型为Pin<Box<T>>self.future字段。然后,在转换后的self.future字段上调用poll并返回结果。由于Task::poll方法仅应由我们稍后创建的执行器调用,因此保持其为task模块的私有函数。

简单执行器

由于执行器可能非常复杂,因此我们有意在开始仅创建一个非常基本的执行器,然后再渐进的实现更多功能。为此,我们先创建一个新的task::simple_executor子模块:

in src/task/mod.rs
1
pub mod simple_executor;
in src/task/simple_executor.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use super::Task;
use alloc::collections::VecDeque;

pub struct SimpleExecutor {
task_queue: VecDeque<Task>,
}

impl SimpleExecutor {
pub fn new() -> SimpleExecutor {
SimpleExecutor {
task_queue: VecDeque::new(),
}
}

pub fn spawn(&mut self, task: Task) {
self.task_queue.push_back(task)
}
}

该结构体包含一个VecDeque类型的字段task_queue,这是一个在两端都能执行push和pop操作的向量。使用这种类型的思路是,通过spawn在队尾方法插入新任务,并从队首弹出要执行的下一个任务。这样,我们就得到了一个简单的FIFO队列(“先进先出”)。

假唤醒器

为了调用poll方法,我们需要创建一个Context类型,用来封装Waker类型。简单起见,我们将首先创建一个不执行任何操作的假唤醒器。为此,我们创建了RawWaker实例,用以定义不同Waker方法的实现,然后使用Waker::from_raw函数将其转换为Waker

in src/task/simple_executor.rs
1
2
3
4
5
6
7
8
9
use core::task::{Waker, RawWaker};

fn dummy_raw_waker() -> RawWaker {
todo!();
}

fn dummy_waker() -> Waker {
unsafe { Waker::from_raw(dummy_raw_waker()) }
}

from_raw函数是非安全的,因为如果程序员不按照RawWaker文档的要求使用,就可能导致未定义的行为。在查看dummy_raw_waker函数的实现之前,我们首先尝试了解RawWaker类型的工作方式。

RawWaker

RawWaker类型要求程序员显式定义一个虚拟方法表(vtable),用以指定在克隆,唤醒、删除RawWaker时应调用的函数。vtable的布局由RawWakerVTable类型定义。每个函数都接收一个*const ()参数,该参数本质上是一个指向某结构体的擦除类型&self指针,例如在堆上的分配。使用*const ()指针而不是某适当引用的原因是RawWaker类型应该是非泛型的,但仍需支持任意类型。作为参数传递给函数的指针值为给RawWaker::newdata指针。

通常,RawWaker结构体是为封装在BoxArc类型中的某些堆分配而创建的。对于这些类型,可以使用Box::into_raw之类的方法将Box<T>转换为*const T指针。然后可以将该指针转换为匿名*const ()指针,再传递给RawWaker::new。由于每个vtable函数都使用相同的*const ()作为参数,因此这些函数可以安全地将指针转换回Box<T>&T以执行操作。可以想象,此过程非常危险,很容易出错导致未定义行为。因此,若非必要,则并不建议手动创建RawWaker

一个假的RawWaker

虽然不建议手动创建RawWaker,但目前还没有其他方法可以创建不执行任何操作的假Waker。幸运的是,不执行任何操作也会使得实现dummy_raw_waker函数变得相对安全:

in src/task/simple_executor.rs
1
2
3
4
5
6
7
8
9
10
11
use core::task::RawWakerVTable;

fn dummy_raw_waker() -> RawWaker {
fn no_op(_: *const ()) {}
fn clone(_: *const ()) -> RawWaker {
dummy_raw_waker()
}

let vtable = &RawWakerVTable::new(clone, no_op, no_op, no_op);
RawWaker::new(0 as *const (), vtable)
}

首先,我们定义两个名为no_opclone的内部函数。no_op函数接受* const()指针,不执行任何操作。clone函数也接受* const()指针,并通过再次调用dummy_raw_waker返回一个新的RawWaker。我们使用这两个函数来创建最小化的RawWakerVTableclone函数用于克隆操作,no_op函数用于其他所有操作。由于RawWaker不执行任何操作,因此从clone返回的是一个新RawWaker还是一个真正的克隆其实并不重要。

创建vtable之后,我们使用RawWaker::new函数创建RawWaker。传递的* const()无关紧要,因为并没有任何vtable的函数会使用它。于是我们只传递了一个空指针。

一个run方法

现在,我们有了创建Waker实例的方法,就可以使用它在执行器上实现run方法。最简单的run方法是在循环中不停的poll所有队列中的任务,直到所有任务均已完成。这并不高效,因为它没有利用Waker类型的通知功能,但是它是使示例能够运行起来的的最简单方法:

in src/task/simple_executor.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
use core::task::{Context, Poll};

impl SimpleExecutor {
pub fn run(&mut self) {
while let Some(mut task) = self.task_queue.pop_front() {
let waker = dummy_waker();
let mut context = Context::from_waker(&waker);
match task.poll(&mut context) {
Poll::Ready(()) => {} // 任务已完成
Poll::Pending => self.task_queue.push_back(task),
}
}
}
}

该函数使用while let循环来处理task_queue中的所有任务。对于每个任务,我们首先把由dummy_waker函数返回的Waker实例封装为Context类型。然后调用Task::poll方法并使用该context做参数。如果poll方法返回Poll :: Ready,则说明任务已完成,就可以继续执行下一个任务。如果任务仍然处于Poll::Pending,则将其再次添加到队尾,以便在后续的循环中再次轮询它。

尝试运行

使用我们的SimpleExecutor类型,现在就可以尝试在main.rs中运行example_task函数返回的任务了:

in src/main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use blog_os::task::{Task, simple_executor::SimpleExecutor};

fn kernel_main(boot_info: &'static BootInfo) -> ! {
// […] initialization routines, including `init_heap`

let mut executor = SimpleExecutor::new();
executor.spawn(Task::new(example_task()));
executor.run();

// […] test_main, "it did not crash" message, hlt_loop
}


// Below is the example_task function again so that you don't have to scroll up

async fn async_number() -> u32 {
42
}

async fn example_task() {
let number = async_number().await;
println!("async number: {}", number);
}

运行代码,将看到预期的消息”async number: 42“打印到屏幕上:

QEMU简单执行器

我们来总结一下示例中执行的各个步骤:

  • 首先,我们创建一个带有空task_queueSimpleExecutor类型实例。
  • 接下来,我们调用异步example_task函数,该函数返回的是future。将这个future封装在Task类型中,以将其移动到堆中并固定在内存中,然后通过spawn方法将该任务添加到执行器的task_queue中。
  • 然后,我们调用run方法以开始执行队列中的单个任务。这涉及:
    • task_queue队首弹出任务。
    • 为任务创建RawWaker,将其转换为Waker实例,然后用它创建Context实例。
    • 在任务的future上调用poll方法,并传入我们刚才创建的Context实例。
    • 由于example_task不会等待任何操作,因此可以直接运行到第一次poll调用为止。这就是打印”async number: 42“位置。
    • 由于example_task将立刻返回Poll::Ready,因此并不会将其再添加回任务队列。

task_queue为空后,run方法返回。之后,我们的kernel_main函数将继续执行,并打印”It did not crash!“信息。

异步键盘输入

这个简单执行器并不能利用Waker通知,只会循环遍历所有任务直到它们完成。对于我们的示例,这并不是问题,因为我们的example_task在首次执行poll时就可以直接完成。要能观察到一个正确的Waker实现所带来的性能优势,我们首先需要创建一个真正异步的任务,即可能会在首次poll调用中返回Poll::Pending的任务。

其实我们的系统中已经存在了一些异步特性:硬件中断。正如我们在硬件中断一文中了解到的那样,硬件中断可以在任意时刻发生,这是由某些外部设备所决定的。例如,在经过一段预定义的时间后,硬件计时器会将中断发送到CPU。当CPU接收到中断时,它会立即将控制权转移给在中断描述符表(IDT)中定义的相应处理函数。

下面,我们将基于键盘中断创建一个异步任务。键盘中断是一个很好的选择,因为它既具有不确定性又对延迟有很高的要求。不确定性意味着无法预测下一次按键的发生时间,因为这完全取决于用户。低延迟是指我们要及时处理键盘输入,否则用户会感到滞后。为了以一种更高效的方式支持此类任务,执行器就需要对Waker通知提供适当的支持。

键盘扫描码队列

我们当前仍是直接在中断处理程序中处理键盘输入。长远考虑这并不是一个好主意,因为中断处理程序应保持尽可能短,以免长时间中断重要工作。因此,中断处理程序应仅执行必要的最少量的工作(如键盘扫描码的读取),而将其余工作(如键盘扫描码的解释)留给后台任务。

将工作委派给后台任务的常见模式是创建某种队列。中断处理程序将工作单元推送到队列,而后台任务处理程序从队列中取出工作以进行处理。对于我们的键盘中断,就是中断处理程序仅从键盘读取扫描代码,并将其推送到队列,然后直接返回。键盘任务从队列的另一端取出扫描码,并对每个扫描码进行解释和处理:

扫描码队列

可以用一个由互斥锁保护的VecDeque来简单的实现该队列。但是,在中断处理程序中使用互斥锁并不是一个好主意,因为很容易导致死锁。例如,当用户在键盘任务锁定队列时又按下了某个键,中断处理程序将再次尝试获取该锁,这会导致死锁。这种方法的另一个问题是,当VecDeque已满时,就会通过执行新的堆分配来自动增加队列容量。这可能又会导致死锁,因为我们的分配器还在内部使用了互斥锁。还有更深层次的问题,当堆碎片化时,堆分配可能会花费大量时间甚至会分配失败。

为避免这些问题,我们需要一个push操作不涉及互斥量或堆分配的队列实现。这种队列可以通过使用无锁原子操作来实现元素的push和pop。如此,就可以创建只需要&self引用即可使用的pushpop操作,因此也无需互斥锁。为了避免在push时执行堆分配,可以通过预先分配的固定大小的缓冲区来支持队列空间扩展。尽管这会让队列有界(即有最大长度限制),但在实际操作中通常能够为队列长度定义一个合理的上限,这并不是一个大问题。

crossbeamcrate

以正确且高效的方式实现这样的队列是一件非常困难的事,因此建议使用经过良好测试的现有实现。crossbeam是一个流行的Rust项目,它实现了多种用于并发编程的无互斥类型。该crate提供的ArrayQueue正是我们需要的类型。更加幸运的是,该类型恰好与具有堆分配支持的no_stdcrate完全兼容。

要使用该类型,我们需要添加crossbeam-queuecrate依赖:

in Cargo.toml
1
2
3
4
[dependencies.crossbeam-queue]
version = "0.2.1"
default-features = false
features = ["alloc"]

该crate默认依赖标准库。为了使其与no_std兼容,我们需要禁用其默认特性并启用alloc特性。(请注意,在这里选择主crossbeamcrate作为依赖将不能工作,因为它缺少用于no_stdqueue模块导出。我们提交了一个pull请求来解决此问题,但该修改尚未在crates.io上发布。)

队列实现

有了ArrayQueue类型,我们就可以在新的task::keyboard模块中创建全局的扫描码队列了:

in src/task/mod.rs
1
pub mod keyboard;
in src/task/keyboard.rs
1
2
3
4
use conquer_once::spin::OnceCell;
use crossbeam_queue::ArrayQueue;

static SCANCODE_QUEUE: OnceCell<ArrayQueue<u8>> = OnceCell::uninit();

由于ArrayQueue::new需要执行堆分配,而这在编译时是不可行的(到目前为止),所以我们不能直接初始化该静态变量。为此,我们使用了conquer_oncecrate的OnceCell类型,以能够安全的执行一次性静态变量的初始化。为了使用该crate,我们需要将其作为依赖项添加到Cargo.toml中:

in Cargo.toml
1
2
3
[dependencies.conquer-once]
version = "0.2.0"
default-features = false

我们确实也可以在这里使用lazy_static宏来代替OnceCell。不过OnceCell类型的优点是能够确保初始化不会在中断处理程序中发生,从而阻止中断处理程序执行堆分配。

填充队列

为了填充扫描代队列,我们创建了一个新的add_scancode函数,以从中断处理程序中调用:

in src/task/keyboard.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
use crate::println;

/// 由键盘中断调用程序调用
///
/// 该函数不应阻塞或执行堆分配
pub(crate) fn add_scancode(scancode: u8) {
if let Ok(queue) = SCANCODE_QUEUE.try_get() {
if let Err(_) = queue.push(scancode) {
println!("WARNING: scancode queue full; dropping keyboard input");
}
} else {
println!("WARNING: scancode queue uninitialized");
}
}

使用OnceCell::try_get函数来获取对初始化队列的引用。如果队列尚未初始化,就忽略键盘扫描码并直接打印警告。要注意的是,不能在此函数中初始化队列,因为它将被中断处理程序调用,而我们不应该在中断处理的过程中执行堆分配。由于不应从main.rs中调用此函数,因此我们使用pub(crate)使该函数仅可在lib.rs中使用。

ArrayQueue::push方法只需&self引用即可执行,这使得我们能够非常简单的从静态队列上调用该方法。ArrayQueue类型本身执行所有必要的同步,因此这里不需要使用互斥类封装。如果队列已满,我们也会打印警告。

要在键盘中断上调用add_scancode函数,就需要在interrupts模块中更新keyboard_interrupt_handler函数:

in src/interrupts.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
extern "x86-interrupt" fn keyboard_interrupt_handler(
_stack_frame: &mut InterruptStackFrame
) {
use x86_64::instructions::port::Port;

let mut port = Port::new(0x60);
let scancode: u8 = unsafe { port.read() };
crate::task::keyboard::add_scancode(scancode); // new

unsafe {
PICS.lock()
.notify_end_of_interrupt(InterruptIndex::Keyboard.as_u8());
}
}

我们从该函数中删除了所有键盘事件处理的代码,并添加了对add_scancode函数的调用,其余代码保持不变。

正如预期,现在使用cargo run运行内核时,每次按键不再显示在屏幕上,取而代之的是,每次按键时都会看到队列未初始化的警告。

扫描码流

为了初始化SCANCODE_QUEUE并以异步方式从队列中读取扫描码,我们创建了一个新的ScancodeStream类型:

in src/task/keyboard.rs
1
2
3
4
5
6
7
8
9
10
11
pub struct ScancodeStream {
_private: (),
}

impl ScancodeStream {
pub fn new() -> Self {
SCANCODE_QUEUE.try_init_once(|| ArrayQueue::new(100))
.expect("ScancodeStream::new should only be called once");
ScancodeStream { _private: () }
}
}

_private字段用于阻止从本模块外构造结构体的行为。这使得new函数成为构造该类的唯一方法。在函数中,我们首先尝试初始化SCANCODE_QUEUE静态变量。如果它已初始化,我们就产生一个panic,如此确保只会存在一个ScancodeStream实例。

为了使扫描码可用于异步任务,下一步是实现一个类似poll的方法,以尝试从队列中弹出下一个扫描码。虽然这听上去似乎是我们应该为该类型实现Futuretrait,但在这里实际上并不是。问题在于,Futuretrait仅对单个异步值进行抽象,并且期望在它返回Poll::Ready之后不会再次调用poll方法。但是,我们的扫描码队列包含多个异步值,因此可以持续对其进行轮询。

Streamtrait

由于产生多个异步值的类型很常见,因此futurecrate为此类提供了有用的抽象:Streamtrait。其定义如下:

1
2
3
4
5
6
pub trait Stream {
type Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Option<Self::Item>>;
}

此定义与Futuretrait非常相似,但有以下区别:

  • 关联的类型叫做Item而不是Output
  • 与返回Poll<Self::Item>poll方法不同,Streamtrait定义了一个返回Poll<Option<Self::Item>>poll_next方法(请注意附加的Option)。

此外还有一个语义上的区别:poll_next可以被重复调用,直到返回Poll::Ready(None)来表示流已完成。从这方面来看,该方法类似于Iterator::next方法,即也会在返回最后一个值之后就只返回None

实现Stream

让我们为ScancodeStream实现Streamtrait,以便使用异步方式提供SCANCODE_QUEUE中的值。为此,我们首先需要添加对futures-utilcrate的依赖,其中就包含Stream类型:

in Cargo.toml
1
2
3
4
[dependencies.futures-util]
version = "0.3.4"
default-features = false
features = ["alloc"]

我们禁用默认特性以使crate兼容no_std,并启用alloc特性以使其基于堆分配的类型可用(稍后将需要它)。(请注意,我们确实可以添加对主futurescrate的依赖,从而重新导出futures-utilcrate,但这将产生更多的依赖和更长的编译时间。)

现在我们可以导入并实现Streamtrait:

in src/task/keyboard.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
use core::{pin::Pin, task::{Poll, Context}};
use futures_util::stream::Stream;

impl Stream for ScancodeStream {
type Item = u8;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<u8>> {
let queue = SCANCODE_QUEUE.try_get().expect("not initialized");
match queue.pop() {
Ok(scancode) => Poll::Ready(Some(scancode)),
Err(crossbeam_queue::PopError) => Poll::Pending,
}
}
}

我们首先使用OnceCell::try_get方法来获取初始化的扫描码队列的引用。此操作应该不会失败,因为我们已在new函数中执行了队列初始化,所以可以放心的使用expect,以在未初始化队列时直接panic。接下来,我们使用ArrayQueue::pop方法尝试从队列中获取下一个元素。如果成功,我们将返回封装在Poll::Ready(Some(…))中的扫描代码。如果失败,则意味着队列已空。在这种情况下,我们返回Poll::Pending

唤醒器支持

Futures::poll方法类似,Stream::poll_next方法要求异步任务在返回Poll::Pending之后才就绪时通知执行器。如此,执行器就不需要再次轮询相同的任务,任务完成会自动通知执行器,这大大降低了等待任务时的性能开销。

要发送此类通知,任务应从参数Context引用中获取Waker并将其储存在某处。当任务就绪时,它应该在储存的Waker上调用wake方法,以通知执行者应该再次轮询该任务了。

原子化的唤醒器

要为我们的ScancodeStream实现Waker通知功能,就需要一个可以在两次轮询调用之间储存Waker的地方。不能将其存储为ScancodeStream的字段中,因为我们需要从add_scancode函数进行访问。解决方案是使用由Futures-utilcrate提供的AtomicWaker类型创建静态变量。与ArrayQueue类型类似,此类型基于原子化的指令,可以被安全地存放在静态变量中,并支持并发修改。

定义一个AtomicWaker类型的静态变量WAKER

in src/task/keyboard.rs
1
2
3
use futures_util::task::AtomicWaker;

static WAKER: AtomicWaker = AtomicWaker::new();

思路是poll_next的实现将当前的唤醒器存储在此静态变量中,当将新的扫描码添加到队列时,add_scancode函数将在其上调用wake函数。

储存唤醒器

poll/poll_next的协议要求任务在返回Poll::Pending时为传递的Waker注册唤醒。让我们修改poll_next的实现以满足此要求:

in src/task/keyboard.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
impl Stream for ScancodeStream {
type Item = u8;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<u8>> {
let queue = SCANCODE_QUEUE
.try_get()
.expect("scancode queue not initialized");

// fast path
if let Ok(scancode) = queue.pop() {
return Poll::Ready(Some(scancode));
}

WAKER.register(&cx.waker());
match queue.pop() {
Ok(scancode) => {
WAKER.take();
Poll::Ready(Some(scancode))
}
Err(crossbeam_queue::PopError) => Poll::Pending,
}
}
}

像以前一样,我们首先使用OnceCell::try_get函数来获取对初始化的扫描码队列的引用。然后,我们乐观地尝试从队列中pop任务并在成功时返回Poll::Ready。如此,就可以避免在队列非空时注册唤醒其的性能开销。

如果对queue.pop()的第一次调用并未成功,则该队列有可能为空。仅仅有可能为空,是因为中断处理程序也许是在轮询检查后立即异步填充了队列。由于此竞争条件可能在下一次轮询检查时再次发生,因此我们需要在第二次检查之前在WAKER静态变量中注册Waker。这样,尽管在Poll::Pending返回之前也可能发起唤醒,但是可以保证我们能够收到所有在轮询检查后推送的扫描码。

使用AtomicWaker::register函数注册Context参数中包含的Waker之后,我们再次尝试从队列中弹出任务。如果这次能成功,则返回Poll::Ready。此时还需要使用AtomicWaker::take删除这个注册的唤醒器,因为任务已完成就不再需要唤醒器通知了。如果queue.pop()又失败了,我们仍将像以前一样返回Poll::Pending,区别是这次注册了唤醒器。

请注意,对于没有(或尚未)返回Poll::Pending的任务,可以通过两种方式进行唤醒。一种方法是在唤醒立刻发生于返回Poll::Pending之前,那么就使用上面提到的竞争条件。另一种方法是由于注册了唤醒器使得队列不再为空时,返回Poll::Ready。由于这些假唤醒是无法避免的,因此执行器必须能够正确处理它们。

唤醒储存的唤醒器

为了唤醒存储的Waker,我们在add_scancode函数中添加对WAKER.wake()的调用:

in src/task/keyboard.rs
1
2
3
4
5
6
7
8
9
10
11
pub(crate) fn add_scancode(scancode: u8) {
if let Ok(queue) = SCANCODE_QUEUE.try_get() {
if let Err(_) = queue.push(scancode) {
println!("WARNING: scancode queue full; dropping keyboard input");
} else {
WAKER.wake(); // new
}
} else {
println!("WARNING: scancode queue uninitialized");
}
}

此处的唯一修改就是,如果成功推送到扫描码队列,就调用WAKER.wake()。如果静态变量WAKER中已经注册了一个唤醒器,则此方法将在其上调用同名的wake方法,以通知执行器。否则,该操作将为空操作,即没有任何效果。

重要的是,我们必须在推送到队列后才能调用wake,否则在队列仍然为空时可能会过早唤醒任务。一个可能的例子,就是一个多线程执行器在CPU的其他核心中并发的唤醒任务时。虽然现在内核还不支持线程,但也会尽快添加,我们当然不希望那时再出问题。

键盘任务

我们为ScancodeStream实现了Streamtrait,现在就可以创建异步键盘任务了:

n src/task/keyboard.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use futures_util::stream::StreamExt;
use pc_keyboard::{layouts, DecodedKey, HandleControl, Keyboard, ScancodeSet1};
use crate::print;

pub async fn print_keypresses() {
let mut scancodes = ScancodeStream::new();
let mut keyboard = Keyboard::new(layouts::Us104Key, ScancodeSet1,
HandleControl::Ignore);

while let Some(scancode) = scancodes.next().await {
if let Ok(Some(key_event)) = keyboard.add_byte(scancode) {
if let Some(key) = keyboard.process_keyevent(key_event) {
match key {
DecodedKey::Unicode(character) => print!("{}", character),
DecodedKey::RawKey(key) => print!("{:?}", key),
}
}
}
}
}

上面的代码与本文修改之前在键盘中断处理程序中使用的代码非常相似。唯一的区别是,我们不是从I/O端口读取扫描码,而是从ScancodeStream中获取。为此,我们首先创建一个新的Scancode流,然后重复使用StreamExttrait所提供的next方法来获取Future,即为该流中的下一个元素。通过在其上使用await运算符,我们可以异步等待future的结果。

使用while let循环,直到流返回None表示结束为止。由于我们的poll_next方法从不返回None,因此这实际上是一个无休止的循环,即print_keypresses任务永远不会完成。

让我们在main.rs中的执行器中添加print_keypresses任务,以再次获得有效的键盘输入:

in src/main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
use blog_os::task::keyboard; // new

fn kernel_main(boot_info: &'static BootInfo) -> ! {

// […] initialization routines, including init_heap, test_main

let mut executor = SimpleExecutor::new();
executor.spawn(Task::new(example_task()));
executor.spawn(Task::new(keyboard::print_keypresses())); // new
executor.run();

// […] "it did not crash" message, hlt_loop
}

现在执行cargo run,将看到键盘输入再次起作用:

QEMU键盘输出

如果你密切注意计算机的CPU使用率,就会发现QEMU进程会令CPU持续繁忙。发生这种情况是因为我们的SimpleExecutor在一个循环中不停地轮询任务。因此,即使我们没有按键盘上的任何键,执行器也会在print_keypresses任务上不停的调用poll,即使该任务并未取得任何进展而每次都会返回Poll::Pending

带有唤醒器支持的执行器

要解决性能问题,我们需要创建一个能够正确利用Waker通知的执行器。这样,当下一个键盘中断发生时,将通知执行器,因此就不需要再不停的轮询print_keypresses任务了。

任务ID

要创建能够正确支持唤醒器通知的执行器,第一步就是为每个任务分配唯一的ID。这是必需的,因为我们需要一种方法来指定希望唤醒的任务。首先,创建一个新的TaskId封装类型:

in src/task/mod.rs
1
2
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TaskId(u64);

TaskId结构体其实是对u64的简单封装。我们为它派生了许多trait,使其可打印、复制、比较、排序。其中可排序很重要,因为我们接下来将使用TaskId作为BTreeMap的键。

要生成一个新唯一ID,需要创建一个TaskID::new函数:

in src/task/mod.rs
1
2
3
4
5
6
7
8
use core::sync::atomic::{AtomicU64, Ordering};

impl TaskId {
fn new() -> Self {
static NEXT_ID: AtomicU64 = AtomicU64::new(0);
TaskId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
}
}

该函数使用AtomicU64类型的静态变量NEXT_ID来确保每个ID仅分配一次。fetch_add方法以原子化的方式自增,并在同一个原子操作中返回上一个值。这意味着即使并行调用TaskId::new方法,每个ID也会返回一次。Ordering参数定义是否允许编译器在指令流中对fetch_add操作重新排序。因为我们只要求ID是唯一的,所以在这种情况下,使用最弱的Relaxed排序就足够了。

现在,我们可以使用附加的id字段来扩展Task类型:

in src/task/mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
pub struct Task {
id: TaskId, // new
future: Pin<Box<dyn Future<Output = ()>>>,
}

impl Task {
pub fn new(future: impl Future<Output = ()> + 'static) -> Task {
Task {
id: TaskId::new(), // new
future: Box::pin(future),
}
}
}

新的id字段使唯一地命名任务成为可能,这是唤醒指定任务所必需的。

Executor类型

我们在task::executor模块中创建新的Executor类型:

in src/task/mod.rs
1
pub mod executor;
in src/task/executor.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use super::{Task, TaskId};
use alloc::{collections::BTreeMap, sync::Arc};
use core::task::Waker;
use crossbeam_queue::ArrayQueue;

pub struct Executor {
tasks: BTreeMap<TaskId, Task>,
task_queue: Arc<ArrayQueue<TaskId>>,
waker_cache: BTreeMap<TaskId, Waker>,
}

impl Executor {
pub fn new() -> Self {
Executor {
tasks: BTreeMap::new(),
task_queue: Arc::new(ArrayQueue::new(100)),
waker_cache: BTreeMap::new(),
}
}
}

我们没有像在SimpleExecutor中那样将任务存储在VecDeque中,而是存放在一个包含任务ID的task_queue字段和包含实际Task实例的BTreeMap类型的tasks字段中。该映射由TaskId索引,以允许有效地恢复执行特定任务。

task_queue字段是有任务ID组成的ArrayQueue,并封装为实现了引用计数Arc类型。引用计数使得在多个所有者之间共享变量所有权成为可能。它在堆上分配变量,并计算对变量的活动引用数。当活动引用数达到零时,即不再需要该值,因此会将其释放。

我们将这种Arc<ArrayQueue>类型用于task_queue,以使其能够被执行者和唤醒者共享。这个思路是唤醒者将已唤醒任务的ID推送到队列中。而执行器位于队列的接收端,从task映射中按其ID取回已唤醒的任务,并运行任务。使用固定大小的队列而不是无限制队列(如SegQueue)的原因是,不应在中断处理程序中进行堆分配,将推送到此队列。

除了task_queuetasks映射外,Executor类型还具有一个waker_cache字段,它也是一个映射。在创建任务后,此映射将缓存任务的Waker。首先,可以使用同一个唤醒器对同一个任务做多次唤醒操作,这相较于每次唤醒均创建一个唤醒器而言更高效。其次,它确保唤醒器的引用计数不会在中断处理程序内被释放,而这可能会导致死锁(有关此问题的更多信息,请参见下文)。

要创建Executor,我们可以编写一个简单的new函数。我们为task_queue选择100的容量,这对可预见的未来而言已经足够了。如果我们的系统在某个时刻真的保有超过100个并发任务,也可以轻松地修改这个值。

生成任务

SimpleExecutor类似,我们也在Executor类型上编写一个spawn方法,该方法将给定任务添加到tasks映射,并通过将其ID推送到task_queue中来立刻执行唤醒:

in src/task/executor.rs
1
2
3
4
5
6
7
8
9
impl Executor {
pub fn spawn(&mut self, task: Task) {
let task_id = task.id;
if self.tasks.insert(task.id, task).is_some() {
panic!("task with same ID already in tasks");
}
self.task_queue.push(task_id).expect("queue full");
}
}

如果映射中已经存在具有相同ID的任务,则BTreeMap::insert方法将直接返回它。由于每个任务都有唯一的ID,理论上不应发生这种情况,因此若出现这种情况我们就产生panic,表明代码中存在错误。同样,当task_queue填满时也会产生panic,因为如果设置的队列大小合适,就不应该发生这种情况。

运行任务

要执行task_queue中的所有任务,我们创建一个私有的run_ready_tasks方法:

in src/task/executor.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
use core::task::{Context, Poll};

impl Executor {
fn run_ready_tasks(&mut self) {
// 解构`self`以避免借用检查出错
let Self {
tasks,
task_queue,
waker_cache,
} = self;

while let Ok(task_id) = task_queue.pop() {
let task = match tasks.get_mut(&task_id) {
Some(task) => task,
None => continue, // 任务已不存在
};
let waker = waker_cache
.entry(task_id)
.or_insert_with(|| TaskWaker::new(task_id, task_queue.clone()));
let mut context = Context::from_waker(waker);
match task.poll(&mut context) {
Poll::Ready(()) => {
// 任务完成 -> 删除任务及其缓存唤醒器
tasks.remove(&task_id);
waker_cache.remove(&task_id);
}
Poll::Pending => {}
}
}
}
}

该函数的基本思路与SimpleExecutor类似:遍历task_queue中的任务,为每个任务创建唤醒器,并轮询该任务。但我们并不直接将待处理的任务添加到task_queue的队尾,而是让TaskWaker负责将唤醒的任务添加回队列。稍后将显示该唤醒器类型的实现。

让我们研究一下run_ready_tasks方法的一些实现细节:

  • 我们将self解构为三个字段,以避免某些借用检查器错误。即我们的实现需要从闭包内部访问self.task_queue,而目前的闭包会尝试直接借用整个self。这本质上是借阅检查器自己的问题,且将在实现RFC 2229时解决。
  • 对于每个弹出的任务ID,我们从tasks映射中取出其对应任务的可变引用。由于我们的ScancodeStream会在检查任务是否需要休眠之前就注册唤醒器,因此可能会发生为已经不存在的任务执行唤醒的情况。在这种情况下,我们只需忽略唤醒并继续执行队列中的下一个ID即可。
  • 为了避免在每个轮询中新建唤醒器所带来的性能开销,我们在每个任务创建后将其唤醒器缓存到waker_cache映射中。为此,我们结合使用BTreeMap::entry方法和Entry::or_insert_with,若唤醒器尚不存在则新建一个,再获得其的可变引用。为了创建新的唤醒器,我们克隆了task_queue并将其与任务ID一起传给TaskWaker::new函数(其实现见下文)。由于task_queue封装在Arc中,因此clone只会增加其引用计数,而仍指向同一堆分配的队列。请注意,并非所有唤醒器的实现都可以进行这种重用,但是我们的TaskWaker实现是允许重复的。

任务完成时将返回Poll::Ready。在这种情况下,我们使用BTreeMap::remove方法从tasks映射中删除该任务。如果该任务还有缓存的唤醒器,我们也会将其移除。

唤醒器设计

唤醒器用于将已唤醒的任务ID推送到执行器的task_queue。我们创建一个新的TaskWaker结构体,以存储任务ID和一个task_queue引用:

in src/task/executor.rs
1
2
3
4
struct TaskWaker {
task_id: TaskId,
task_queue: Arc<ArrayQueue<TaskId>>,
}

由于task_queue的所有权由执行器和唤醒器共享,因此我们使用Arc封装类型来实现共享所有权的引用计数。

唤醒操作的实现非常简单:

in src/task/executor.rs
1
2
3
4
5
impl TaskWaker {
fn wake_task(&self) {
self.task_queue.push(self.task_id).expect("task_queue full");
}
}

我们将task_id推送到引用的task_queue中。由于对ArrayQueue类型的修改仅需要共享的引用,因此我们都不需要&mut self,仅用&self即可实现。

Waketrait

为了能够使用TaskWaker类型轮询future,我们首先要将其转为Waker实例。这是必需的,因为Future::poll方法使用Context的实例作参数,而该实例只能基于Waker类型构造。尽管确实可以通过提供一个RawWaker实现来做到这一点,但是使用基于ArcWaketrait,然后再用标准库提供的Fromtrait的实现来构造Waker,将会更简单且安全。

WakeTrait实现如下所示:

in src/task/executor.rs
1
2
3
4
5
6
7
8
9
10
11
use alloc::task::Wake;

impl Wake for TaskWaker {
fn wake(self: Arc<Self>) {
self.wake_task();
}

fn wake_by_ref(self: &Arc<Self>) {
self.wake_task();
}
}

由于唤醒器通常在执行器和异步任务之间共享,因此trait方法要求将Self的实例封装在Arc类型中,以实现所有权的引用计数。这意味着我们必须将TaskWaker放在Arc中才能进行调用。

wakewake_by_ref方法之间的区别在于,后者仅需要Arc的引用,而前者需要Arc的所有权,因此通常需要增加引用计数。并非所有类型都支持通过引用调用唤醒,因此方法wake_by_ref的实现是可选的,不过由于该方法可以避免不必要的引用计数修改,所以性能较高。对我们来说,可以在两个trait方法中都简单地继续调用wake_task函数,该函数仅需要一个共享的&self引用。

创建唤醒器

由于所有Arc封装的实现了Wakertrait的类型都支持使用From转换,因此我们现在可以实现Executor::run_ready_tasks方法所需的TaskWaker::new函数了:

in src/task/executor.rs
1
2
3
4
5
6
7
8
impl TaskWaker {
fn new(task_id: TaskId, task_queue: Arc<ArrayQueue<TaskId>>) -> Waker {
Waker::from(Arc::new(TaskWaker {
task_id,
task_queue,
}))
}
}

我们使用task_idtask_queue作为参数创建TaskWaker。然后,我们将TaskWaker封装在Arc中,并使用Waker::from将其转换为Waker。此from方法负责为TaskWaker类型构造RawWakerVTableRawWaker实例。这个from方法用于TaskWaker类型构造一个RawWakerVTable和一个RawWaker实例。如果你对它的详细工作方式感兴趣,请查看alloccrate中的实现

run方法

在实现了唤醒器之后,我们终于可以为执行器添加一个run方法了:

in src/task/executor.rs
1
2
3
4
5
6
7
impl Executor {
pub fn run(&mut self) -> ! {
loop {
self.run_ready_tasks();
}
}
}

此方法仅循环调用run_ready_tasks函数。理论上,当tasks映射为空时我们确实可以从令函数返回,不过这并不会发生因为keyboard_task永远不会完成,因此一个简单的loop就足够了。鉴于该函数永不返回,因此我们使用返回类型!以告诉编译器此为发散函数。

现在,我们可以更改kernel_main,以使用新的Executor来代替SimpleExecutor

in src/main.rs
1
2
3
4
5
6
7
8
9
10
use blog_os::task::executor::Executor; // new

fn kernel_main(boot_info: &'static BootInfo) -> ! {
// […] initialization routines, including init_heap, test_main

let mut executor = Executor::new(); // new
executor.spawn(Task::new(example_task()));
executor.spawn(Task::new(keyboard::print_keypresses()));
executor.run();
}

我们只需要更改导入类型和类型名称。由于run是发散函数,编译器知道它永不返回,因此我们不再需要在kernel_main函数末尾调用hlt_loop

当我们现在使用cargo run运行内核时,将看到键盘输入依然有效:

QEMU键盘输出

不过,QEMU的CPU使用率并没有降低。这是因为我们仍然使CPU始终保持忙碌状态。现在虽然在任务唤通知醒前都不再执行轮询,但仍在循环中检查task_queue。要解决此问题,我们需要在CPU没有其他工作时进入睡眠状态。

空闲时睡眠

基本思路就是在task_queue为空时执行hlt指令。该指令使CPU进入睡眠状态,直到下一个中断到达。CPU在中断后立即会再次激活,这确保了当中断处理程序向task_queue推送任务时,CPU仍然可以直接做出反应。

为了实现这一点,我们在执行器中创建一个新的sleep_if_idle方法,并从run方法中调用它:

in src/task/executor.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
impl Executor {
pub fn run(&mut self) -> ! {
loop {
self.run_ready_tasks();
self.sleep_if_idle(); // new
}
}

fn sleep_if_idle(&self) {
if self.task_queue.is_empty() {
x86_64::instructions::hlt();
}
}
}

由于我们在run_ready_tasks之后直接调用sleep_if_idle,这将一直循环直到task_queue为空,因此似乎无需再次检查队列。但是,硬件中断可能紧接着在run_ready_tasks返回之后发生,因此在调用sleep_if_idle函数时队列中可能还有一个新任务。仅当队列仍然为空时,我们才通过x86_64crate提供的instructions::hlt函数执行hlt指令,使CPU进入睡眠状态。

不幸的是,在此实现中仍然存在微妙的竞争条件。由于中断是异步的且可以随时发生,因此有可能在is_empty检查和hlt调用之间发生中断:

1
2
3
4
if self.task_queue.is_empty() {
/// <--- 这里也可能发生中断
x86_64::instructions::hlt();
}

万一此中断推送到task_queue,即使现在确实有一个就绪的任务,CPU也会进入睡眠状态。在最坏的情况下,这可能会将键盘中断的处理,延迟到下一次按键或下一个定时器中断。那么我们应该如何预防呢?

答案是在检查之前禁用CPU上的中断,并与hlt指令一起以原子化操作再启用中断。那么,此间发生的所有中断都将延迟到hlt指令之后,从而不会丢失任何唤醒。为了实现该方法,我们可以使用x86_64crate提供的interrupts::enable_and_hlt函数。

修改后的sleep_if_idle函数如下所示:

in src/task/executor.rs
1
2
3
4
5
6
7
8
9
10
11
12
impl Executor {
fn sleep_if_idle(&self) {
use x86_64::instructions::interrupts::{self, enable_and_hlt};

interrupts::disable();
if self.task_queue.is_empty() {
enable_and_hlt();
} else {
interrupts::enable();
}
}
}

为了避免竞争条件,我们在检查task_queue是否为空之前禁用中断。如果队列确实为空,我们将通过一个原子化操作,使用enable_and_hlt函数启用中断,同时让CPU进入睡眠。如果队列不再为空,则意味着在返回run_ready_tasks之后,中断会唤醒任务。在这种情况下,我们将再次启用中断,且不再hlt而直接继续让程序执行。

现在,执行器在没有任务的情况下能够正确地让CPU进入睡眠状态。我们可以看到,当再次使用cargo run运行内核时,QEMU进程的CPU利用率要低得多。

可能的扩展

我们的执行器现在可以高效地运行任务。它利用唤醒器通知来避免持续轮询等待的任务,并在无任何工作时使CPU进入睡眠状态。但是,我们的执行器仍然非常基础,而且可以通过许多方式来扩展功能:

  • 调度:目前,我们使用VecDeque类型为task_queue实现先进先出(FIFO)策略,这通常也称为循环调度。此策略对于其上的所有任务负载可能并不是最高效的。例如,优先考虑延迟敏感的任务或执行大量I/O的任务可能更加高效。有关更多信息,请参见Operating Systems: Three Easy Pieces一书中的scheduling chapter章节,或Wikipedia上关于调度的文章
  • 任务生成:我们的Executor::spawn方法当前需要&mut self引用,因此在启动run方法之后将不再可用。为了解决这个问题,我们可以再创建一个额外的Spawner类型,该类型与执行器共享某种队列,并允许从任务自己创建新任务。例如,队列可以直接使用task_queue,也可以是一个在执行器中循环检查的单独的队列。
  • 使用线程:我们尚不支持线程,但会在下一篇文章中添加。这将使我们能够在不同的线程中启动多个执行器实例。这种方法的优点是可以减少长时长任务带来的延迟,因为其他任务可以同时运行。这种方法还可以利用多个CPU内核。
  • 负载均衡:添加线程支持时,如何在执行器间分配任务,以确保所有CPU核心都能够被用到将变得很重要。常见的技术是work stealing

小结

在这篇文章的开始,我们介绍了多任务,并区分了抢占式多任务(强制性地定期中断正在运行的任务)和协作式多任务(使任务一直运行到自愿放弃对CPU的控制)之间的区别。

然后,我们探讨了Rust的async/await如何提供协作式多任务的语言级实现。Rust的实现基于基于轮询的Futuretrait,该特质抽象了异步任务。使用async/await,就可以像编写普通同步代码一样处理future。不同之处在于异步函数会再次返回Future,为了运行它,需要在某个时刻将其添加到执行器中。

在后台,编译器将async/await代码转换为状态机,每个.await操作对应一个可能的暂停点。通过利用程序对自身执行步骤的了解,编译器就能够为每个暂停点保存最小化的状态信息,从而使每个任务的内存消耗极小。而其中的一个挑战是,生成的状态机可能包含自引用结构体,例如,当异步函数的局部变量相互引用时。为了防止指针失效,Rust使用Pin类型来确保future在被第一次轮询后就不再能够在内存中移动了。

接下来介绍了如何实现,我们首先创建了一个非常简单的执行程序,它会在一个循环中持续轮询所有生成的任务,且完全不使用Waker类型。然后,我们通过实现异步键盘任务来展示了唤醒通知的优点。该任务使用crossbeamcrate提供的无互斥的ArrayQueue类型定义了一个静态变量SCANCODE_QUEUE。现在,键盘中断处理程序不会再直接处理按键,而会将所有接收到的扫描代码放入队列中,然后唤醒已注册的Waker,以发送信号表示有新的输入可用。在接收端,我们创建了一个ScancodeStream类型,以提供一个用于获取队列中下一个scancodeFuture。这样就可以创建一个异步print_keypresses任务,以使用async/await来解释并打印队列中的扫描码。

为了利用键盘任务的唤醒通知,我们创建了一个新的Executor类型,该类型使用Arc封装task_queue,以使得就绪任务可共享。我们实现了一个TaskWaker类型,以将唤醒的任务的ID直接推送到task_queue,然后使用执行器进行轮询。为了在空闲时时节省CPU,我们利用hlt指令添加了对CPU在空闲时睡眠的支持。最后,我们讨论了执行器的一些潜在扩展,例如,提供多核支持。

下期预告

我们利用async/await使内核具备了对协作多任务的基本支持。尽管协作式多任务处理非常有效,但是当单个任务运行太长时间并导致其他任务无法运行时,系统就会出现延迟。因此,在我们的内核中添加对抢占式多任务的支持也是合理的。

在下一篇文章中,我们将介绍抢占式多任务处理的最常见形式,线程。除了解决长时长任务的问题,线程还能够利用多个CPU核心,并支持运行不受信任的用户程序。

支持本项目

创建和维护这个博客和相关库是一项繁重的工作,但我真的很喜欢。通过支持我,您可以让我在新内容、新功能和持续维护上投入更多时间。

支持我的最好方式是在GitHub上赞助我,因为他们不收取任何中间费用。如果你喜欢其他平台,我也有PatreonDonorbox账户。后者是最灵活的,因为它支持多种货币和一次性捐款。

感谢您的支持!

评论

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×