PingCap的Rust训练课程5:异步编程

前言

任务:通过自定义协议,使用异步网络创建一个多线程、持久化键/值存储的服务端和客户端。

目标

  • 了解在Rust中编写future时使用基本模式
  • 了解future的错误处理
  • 学会调试类型系统
  • 使用Tokio运行时执行异步网络
  • 使用boxed future处理棘手的类型系统问题
  • 使用impl Trait创建匿名Future类型

关键词:异步、future、tokio、impl Trait

介绍

注意:本文仅仅是本项目的提纲,并没有写完。如果您已经读到这里了,请电子邮件至brian@pingcap.com以提醒我,我将尽快写写完本文。

在本项目中,您将创建一个使用自定义协议通信的简单键/值服务端和客户端。该服务端将基于Tokio运行时构建的异步网络。键/值引擎的日志文件读写部分依然使用同步模式,在基础线程池上进行调度工作,并提供一个异步接口。在此过程中,您将尝试使用多种方法多种定义和使用future类型。

因为学习使用Rust中的futures编程有较大的挑战性,而且相关文档目前仍然较少,因此该项目的范围相对有限,相关的解释也比之前的项目更加直接。

请务必阅读有关此项目的背景资料。而如果您在本项目中感觉力不从心,那就让自己放松,休息一下,然后再试试。对于每个人来说,在Rust使用异步编程都是比较困难的。

项目需求规格

cargo项目kvs建立了一个名为kvs-client的命令行键值存储客户端,和一个名为kvs-server的键值存储服务端,二者又都调用了一个名为kvs的库。客户端通过一个自定义协议与服务端通信。

CLI参数与上一个项目中的相同。引擎实现大致相同,即通过线程池分发同步的文件I/O作业。

本项目的不同之处在于,所有的网络操作都是异步执行的。

为了实现异步操作,KvsClient将提供基于future的API,而KvsEngine trait也将提供基于future的API,尽管它是通过线程池使用同步(阻塞)I/O实现的。

您的KvsServer将基于tokio运行时,tokio将负责把异步作业分发给自己的多个线程(tokio自带的线程池)。这意味着您的架构中实际包含了两层线程池,第一层用来异步的处理网络请求,每个线程占用一个核心;第二层用来同步的处理文件I/O,使用充足的线程以保持负责处理网络的线程持续忙碌。

这种架构带来的变化,就是作业将从多个线程中生成到您的线程池,您的ThreadPool trait及其实现将变为实现了Clone + Send + 'sync的共享类型,就像KvsEngine那样。

因为您将实验多种不同的future返回方式,这里不再一一列出定义,而是在需要时定义。

更具体地说,您将使用以下所有函数签名:

  • Client::get(&mut self, key: String) -> Box<Future<Item = Option<String>, Error = Error>
  • Client::get(&mut self, key: String) -> future::SomeExplicitCombinator<...>
  • Client::get(&mut self, key: String) -> impl Future<Item = Option<String>, Error = Error>
  • Client::get(&mut self, key: String) -> ClientGetFuture

项目设置

继续上一个项目,删除之前的测试目录,并复制本项目的测试目录。这个项目应该包含一个名为kvs的库,以及两个可执行文件,kvs-serverkvs-client

Cargo.toml中需要以下dev-dependencies:

Cargo.toml
1
2
3
4
5
6
7
8
9
[dev-dependencies]
assert_cmd = "0.11"
criterion = "0.2.11"
crossbeam-utils = "0.6.5"
predicates = "1.0.0"
rand = "0.6.5"
tempfile = "3.0.7"
walkdir = "2.2.7"
panic-control = "0.1.4"

与之前的项目不同,不必费心编写足够的类型定义来编译测试套件。因为这样做将会一次向前跳过多个步骤。文本将明确指出何时使用测试套件。

知识背景:深入思考Rust中的future

  • 为什么使用future?网络 vs 文件/io,阻塞 vs 非阻塞,同步 vs 异步
  • 从用户角度看future(不是以poll为核心的实现)
  • 关于执行器和运行时的技术细节,不需要思考过多
  • 考虑函数调用链及其如何转换为future类型
  • 如何调试Rust类型
  • Result vs Future vs FutureResult
  • future的错误处理
  • 确定的future vs 智能指针future vs 匿名future
  • 关于future 0.1和future 0.3 的注意事项(我们将使用future 0.1)
  • 关于async/await的注意事项

第1部分:将tokio引入客户端

最终我们要将客户端和服务器都转换为future,由于客户端非常简单,我们就从这里入手。我们将首先在现有的同步KvsClient上,介绍tokio运行时。

对于客户端,我们将在保持同步KvsClient的同时引入异步运行时,然后再改造KvsClient

KvsClientconnect方法。

请注意,作为一个库,KvsClient可以提供基于futures的最高效率,但是我们的kvs-client可执行文件并没有利用它,所以这个可执行文件运行单个future就立刻退出的行为看起来有点傻。

TODO @sticnarf - 看看您是否可以编写与具体未来类型无关的测试用例,以便它们与以下所有策略一起使用。

第2部分:将KvsClient转换为智能指针future

转为future类型时最省事的方式。

第3部分:具有显式future类型的KvsClient

只是为了体验一下它是多么不靠谱。

第4部分:具有匿名future类型的KvsClient

最终的解决方案

第5部分:使ThreadPool可共享

第6部分:将KvsEngine转换为future

对于服务器,我们将做在客户端中相反的事,为KvsEngine编写一个异步接口。这将表明future和底层运行时是相互独立的,并提供了一系列经验。

第7部分:使用tokio驱动KvsEngine

请注意,即使我们自己编写的异步代码很少,tokio本身也在num_cpus个线程间分发异步作业。权衡将CPU密集型作业直接放在网络线程或文件线程上的利弊,例如,将序列化操作放在哪里?

扩展1:使用tokio-fs替换同步文件I/O

干得漂亮,朋友。休息一下吧。

1 Tokio概述


Tokio是Rust的异步运行时,提供了编写网络应用程序所需的模块。Tokio可灵活部署在大多数系统中,从具有数十个内核的大型服务器到小型嵌入式设备。

概括的说,Tokio提供了几个主要组件:

  • 用于执行异步代码的多线程运行时。
  • 标准库的异步版本。
  • 一个庞大的相关库生态系统。

Tokio 在您的项目中的角色

当您以异步方式编写应用程序时,您可以通过降低同时做许多事情的成本来提升性能。但是,异步Rust代码不会自行运行,因此您必须选择一个运行时来执行它们。Tokio库是应用最广泛的运行时,其使用量超过了所有其他运行时的总和。

此外,Tokio提供了许多有用的实用工具。编写异步代码时,不能使用Rust标准库提供的普通阻塞API,而必须使用它们的异步版本。这些替代版本由Tokio提供,在有意义时这些API将于Rust标准库API保持一致。

Tokio的优势

本章将概述Tokio的一些优点。

快速

Tokio很,基于Rust,而Rust本身也很快。遵循Rust精神,即目标是用户不应该通过手动编写等效代码来提高性能。

Tokio可扩展,基于async/await语言特性,本身就是可扩展的。在处理网络时,由于延迟,处理连接的速度受到限制,因此扩展的唯一方法是一次处理多个连接。使用async/await语言特性,可以非常方便的增加并发操作的数量,允许您扩展到大规模并发任务。

可靠

Tokio基于Rust编写,而Rust是一种可以让开发者编写可靠且高效软件的语言。大量研究表明,大约70%的高危安全漏洞是由内存安全引起的。使用Rust可以消除应用程序中的这一类错误。

Tokio还非常注重提供一致的行为。Tokio的主要目标是允许用户部署行为可预测的软件,这些软件将日复一日地执行,具有可靠的响应时间,杜绝不可预测的延迟峰值。

简单

借助Rust的async/await特性,编写异步应用程序的复杂性已大大降低。结合Tokio的实用工具和充满活力的生态系统,编写应用程序变得轻而易举。

Tokio在有意义的情况下遵循标准库的命名约定,以使用户可以轻松地将使用标准库编写的代码转换为使用Tokio编写的代码。借助Rust的强大类型系统,轻松交付正确代码的能力是无与伦比的。

灵活

Tokio提供了多种运行时架构。从多线程的工作窃取运行时,到轻量级的单线程运行时,应有尽有。这些运行时中的每一个都带有许多可配置参数,以允许用户根据自己的需要调整它们。

不适用Tokio的场景

尽管Tokio对于许多需要同时做很多事情的项目很有用,但也有一些用例不适合Tokio。

  • 通过在多个线程上并发执行以提速的CPU密集型计算。Tokio专为IO密集型应用程序而设计,其中每个单独的任务大部分时间都在等待IO。如果您的应用程序唯一要做的就是并发计算,那么您应该使用rayon。不过,如果您两种任务都做,仍然可以混合使用Tokio。
  • 读取大量文件。尽管Tokio看上去对于只需要读取大量文件的项目很有用,但与普通线程池相比,Tokio在这里没有任何优势。因为操作系统通常不提供异步文件API。
  • 发送单个Web请求。Tokio的优势是在需要同时做很多事情时。如果你需要使用一个用于异步Rust的库,例如reqwest,但你不需要一次做很多事情,你应该更喜欢那个库的阻塞版本,因为它会让你的项目更简单。当然,使用Tokio仍然有效,但与阻塞API相比并没有真正的优势。如果库不提供阻塞API,请参阅桥接同步代码章节。

获得帮助

在任何时候,如果您遇到困难,您总是可以在DiscordGitHub讨论中获得帮助。请不要担心问“初学者”问题。我们都是从某个地方开始,并乐于提供帮助。

2 环境设置


本教程将带您逐步完成构建Redis客户端和服务器的过程。我们将从使用Rust进行异步编程的基础知识开始,并从那里开始构建。我们将实现Redis命令的一个子集,以全面了解Tokio。

Mini-Redis

您将在本教程中构建的项目在GitHub上以Mini-Redis 的形式提供。Mini-Redis的主要目标是学习Tokio,也得到了很好的评价,但这也意味着Mini-Redis缺少一些您在真正的Redis库中需要的功能。您可以在crates.io上找到可部署于生产环境的Redis库。

我们将在教程中直接使用Mini-Redis,也就是在教程后面部分实现Mini-Redis之前就提前使用。

先修课

我们假设读者熟悉Rust《Rust Book》就是很好的启蒙读物。

虽然不是必须的,但使用Rust标准库或其他语言编写网络代码的经验将会有所帮助。

我们不需要读者熟悉Redis。

Rust

在教程开始之前,您应该确保安装了Rust工具链。如果没有,最简单的安装方法是使用rustup

教程需要不低于1.45.0的Rust版本,建议使用最新的Rust版本。

检查计算机上安装的Rust版本,执行:

1
rustc --version

您将会看到类似rustc 1.46.0 (04488afe3 2020-08-24)的输出。

Mini-Redis server

接下来,安装Mini-Redis server,以在我们完成客户端编写时进行测试。

1
cargo install mini-redis

启动服务器以确定安装成功:

1
mini-redis-server

然后,在另一个终端窗口中,尝试使用mini-redis-cli获取键foo的值。

1
mini-redis-cli get foo

你应该会看到输出为(nil)

万事俱备

如此,一切就绪。阅读下一章以编写您的第一个异步Rust应用程序。

3 Hello Tokio


作为开胃菜,我们将编写一个非常基础的Rust应用。它将连接到Mini-Redis server,将键Hello的值设置为World,然后再读回键。这将使用Mini-Redis client库完成。

代码

初始化新crate

让我们从生成一个新的Rust App开始:

1
2
cargo new my-redis
cd my-redis

添加依赖

然后,向Cargo.toml[dependencies]中添加以下依赖:

Cargo.toml
1
2
tokio = { version = "1", features = ["full"] }
mini-redis = "0.4"

编写代码

main.rs编写如下代码:

main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use mini_redis::{client, Result};

#[tokio::main]
async fn main() -> Result<()> {
// Open a connection to the mini-redis address.
let mut client = client::connect("127.0.0.1:6379").await?;

// Set the key "hello" with value "world"
client.set("hello", "world".into()).await?;

// Get key "hello"
let result = client.get("hello").await?;

println!("got value from the server; result={:?}", result);

Ok(())
}

确保Mini-Redis server正在运行,在单独的终端中执行:

1
mini-redis-server

如果你还没有安装mini-redis,执行:

1
cargo install mini-redis

现在,运行my-redis:

1
2
cargo run
got value from the server; result=Some(b"world")

执行成功!

你可以在这里找到全部代码。

代码详解

让我们花一些时间来理解刚刚做的事情。虽然没有太多代码,但执行了很多动作。

1
let mut client = client::connect("127.0.0.1:6379").await?;

client::connect函数由mini-Redis Crate提供。它将异步的与指定的远端地址建立TCP连接,并在链接成功建立时返回client句柄。虽然这里执行了一个异步操作,但我们编写的代码看起来跟同步代码没什么区别。只有.await动作表明这个操作是异步的。

什么是异步编程?

大多数计算机程序会以与代码编写相同的顺序执行。先执行第一行,再执行下一行,依此类推。在同步编程中,当程序遇到无法立即完成的操作时,它将被阻塞直到操作完成。例如,建立TCP连接需要与网络上的其他节点交换数据,这可能需要大量时间,而在此期间,线程将被阻塞。

而在异步编程中,无法立即完成的操作在后台挂起。该线程没有被阻止,并且可以继续执行其他任务。一旦操作完成,挂起的任务就会恢复,并从挂起的位置继续执行。我们之前的示例中只有一个任务,因此在挂起时什么都没有发生,但是真正的异步程序通常具有大量此类任务。

尽管异步编程可以大幅提升应用程序的执行速度,但通常也会提升程序的复杂度。要求程序员在异步操作完成后跟踪所有可能的状态以恢复工作。以程序员的经验来说,这是一项繁琐且容易出错的任务。

编译时绿色线程

Rust使用称为async/await的特性来实现异步编程。执行异步操作的函数将用async关键字标记。在我们的示例中,connect函数是这样定义的:

1
2
3
4
5
6
7
use mini_redis::Result;
use mini_redis::client::Client;
use tokio::net::ToSocketAddrs;

pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client> {
// ...
}

async fn定义看起来像是常规的同步函数,但其实执行的是异步操作。Rust在编译时async fn转换为一个异步运行的例程。在async fn中的任何.await动作都会将控制权转回线程,以使线程再异步操作挂起在后台的时间里继续执行其他工作。

虽然其他语言也实现了async/await,但Rust采用了更独特的方法。最基础的,Rust的异步是惰性操作,这将会导致与其他语言不同的运行时语义。

如果这个解释不够明确,请不要担心。在整个指南中,我们将探索更多关于async/await的内容。

使用async/await

在Rust中调用异步函数与调用普通函数没什么区别。但是,仅调用这些函数并不会执行函数体,而是会返回一个值用来表示async fn的操作。这在概念上类似于一个无参闭包。要实际运行该操作,您应该在返回值上使用.await运算符。

以下面的代码为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async fn say_world() {
println!("world");
}

#[tokio::main]
async fn main() {
// Calling `say_world()` does not execute the body of `say_world()`.
let op = say_world();

// This println! comes first
println!("hello");

// Calling `.await` on `op` starts executing `say_world`.
op.await;
}

输出为:

1
2
hello
world

async fn的返回值是一个实现了Future trait的匿名类型。

异步main函数

用于启动异步应用程序的main函数,与大多数Rust crate中的常用main函数不同。

  1. 这是一个async fn
  2. 带有#[tokio::main]标记

当我们想要进入异步上下文时,需要使用async fn。但是,异步函数必须由一个运行时执行。该运行时包含异步任务调度器,提供事件I/O、定时器等特性。运行时不会自动启动,所以需要main函数启动它。

#[tokio::main]函数是一个宏。它将async fn main()转换为同步fn main(),初始化一个运行时实例并执行异步main函数。

例如,下面的代码:

1
2
3
4
#[tokio::main]
async fn main() {
println!("hello");
}

将转换为:

1
2
3
4
5
6
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}

后面将会介绍关于Tokio运行时的详细信息。

Cargo特性

本教程依赖于Tokio,并启用了full特性:

1
tokio = { version = "1", features = ["full"] }

Tokio有很多功能(TCP、UDP、Unix sockets、计时器、同步实用程序、多种调度程序等)。并非所有应用程序都需要完整的Tokio功能。在尝试优化编译时间或最终程序的大小时,用户可以只选择需要使用的功能。

在本教程中,选择Tokio依赖时将使用full功能。

4 生成并发


我们将更换目标,开始研究Redis server端。

首先,将上一章客户端的SET/GET代码移至example文件。这样,就可以在我们的服务端上进行测试了。

1
2
mkdir -p examples
mv src/main.rs examples/hello-redis.rs

然后创建一个新的空src/main.rs文件。

接受sockets

我们的Redis服务端需要做的第一件事是接受入站TCP sockets,即使用tokio::net::TcpListener

Tokio的许多类型的名称,与这些类型在Rust标准库中的同步版本相同。如果定义合理,Tokio将暴露与std完全相同的API,只不过是使用async fn

TcpListener绑定到6379端口,并在循环中接受套sockets。每个socket都将被处理然后被关闭。现在,我们将读取命令,将其打印到标准输出并响应为一个错误。

src/main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
// Bind the listener to the address
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

loop {
// The second item contains the IP and port of the new connection.
let (socket, _) = listener.accept().await.unwrap();
process(socket).await;
}
}

async fn process(socket: TcpStream) {
// The `Connection` lets us read/write redis **frames** instead of
// byte streams. The `Connection` type is defined by mini-redis.
let mut connection = Connection::new(socket);

if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);

// Respond with an error
let response = Frame::Error("unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}

现在运行这个循环:

1
cargo run

在另一个终端里运行hello-redis的示例(即上一章那个SET/GET命令的例子):

1
cargo run --example hello-redis

输出应为:

1
Error: "unimplemented"

在服务端的终端窗口输出应为:

1
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])

并发

我们的服务端仍有个小问题(除了只响应错误)。它一次仅处理一个入站请求,当一个连接被接受时,服务器将停留在接受循环代码块内,直到响应完全写入socket。

我们希望Redis服务端能够并发处理大量请求,为此,我们需要添加一些并发性。

并发和并行不是一回事。如果您在两个任务之间交替进行,那么您是在并发处理这两个任务,而不是并行处理。如果是并行,您需要两个人,每个人专门负责一项任务。
使用Tokio的优点之一,是异步代码允许您同时处理许多任务,而不必新建线程并行处理它们。事实上,Tokio可以在一个线程上并发运行多个任务!

为了并发处理连接,我们将为每个入站连接生成一个新任务,并在此任务上处理连接。

接受连接的循环变为:

src/main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

loop {
let (socket, _) = listener.accept().await.unwrap();
// A new task is spawned for each inbound socket. The socket is
// moved to the new task and processed there.
tokio::spawn(async move {
process(socket).await;
});
}
}

任务

Tokio任务是一个异步绿色线程,通过将async块传给tokio::spawn创建。tokio::spawn函数返回一个JoinHandle,调用者可以使用它与生成的任务进行交互。async块可能会有返回值。调用者可以在JoinHandle上使用.await获取返回值。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// Do some async work
"return value"
});

// Do some other work

let out = handle.await.unwrap();
println!("GOT {}", out);
}

等待JoinHandle返回一个Result。当任务在执行过程中遇到错误时,JoinHandle会返回一个Err,这将在任务panic、或任务因运行时关闭被强制取消时发生。

调度程序以任务为单位管理执行,生成任务将提交给Tokio调度程序,调度程序将确保在有工作要做时执行该任务。生成的任务可以在其生成的线程上执行,也可以在运行时其他线程上执行,该任务也可以在产生后在线程间移动。

Tokio中的任务非常轻量。实际上在底层,任务只需要一次64字节的内存分配。应用程序大可放心生成数千个乃至数百万个任务。

'static限制

当您在Tokio运行时生成任务时,其类型的生命周期必须是'static。这意味着生成的任务不得引用任务作用域外的任何数据。

'static总是意味着“永远存在”是一个常见的误解,事实并非如此。仅仅因为一个值是'static并不意味着你有内存泄漏。你可以在常见的Rust生命周期误解中了解到更多内容。

比如下面的代码将无法通过编译:

1
2
3
4
5
6
7
8
9
10
use tokio::task;

#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];

task::spawn(async {
println!("Here's a vec: {:?}", v);
});
}

尝试编译将会产生如下错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|

发生这种情况是因为默认情况下,变量所有权不会移动到异步代码块中。v向量仍然归主函数所有,而println!一行却借用v。rust编译器向我们解释这一点,甚至建议将第7行更改为task::spawn(async move {,以告诉编译器将v移动到生成的任务中。如此,该任务拥有其需要的所有数据,使其成为'static

如果必须从多个任务同时访问单个数据,则该数据需要通过诸如Arc之类的同步原语来共享。

注意到错误消息说,参数类型超过了'static生命周期。这个术语可能会相当迷惑,因为'static生命周期一直持续到程序结束,所以如果超过它,不就是有内存泄漏吗?实际上这个消息说的是参数的类型,而不是其值必须超过'static生命周期,并且值可能在其类型不再有效之前就被销毁。

当我们说一个值是'static时,意味着永远保持该值不会是错的。这很重要,因为编译器无法推断新生成的任务会保留多长时间。我们必须确保允许任务永远存在,以便Tokio可以在需要时让任务运行起来。

上面信息框中链接的文章使用术语“受'static限制”,而不是“它的类型比'static寿命长”或“值是'static”来指代T: 'static。这些都意味着同样的事情,但与&'static T中的“用'static'标识”不同。

Send限制

tokio::spawn生成的任务必须实现Send。这允许Tokio运行时在任务被.await挂起时,在线程之间移动它们。

.await调用保存的所有数据都是Send时,任务也是Send的,这有点微妙。当调用.await时,任务将控制权转交给调度程序,在下次执行任务时,它会从上次转交的点恢复。为了确保这个策略生效,在.await之后使用的所有状态都必须由任务保存。如果这些状态为Send的,即可以跨线程移动,则任务本身就可以跨线程移动。相反,如果状态不是Send的,那么任务也不是。

这里的代码是有效的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
tokio::spawn(async {
// The scope forces `rc` to drop before `.await`.
{
let rc = Rc::new("hello");
println!("{}", rc);
}

// `rc` is no longer used. It is **not** persisted when
// the task yields to the scheduler
yield_now().await;
});
}

这里则是无效的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");

// `rc` is used after `.await`. It must be persisted to
// the task's state.
yield_now().await;

println!("{}", rc);
});
}

尝试编译会得到如下错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here

我们将在下一章更深入地讨论这个错误的一个特例。

保存值

我们现在将实现process函数来处理传入的命令,使用HashMap来存储值。SET命令将值插入到HashMap中,而GET值将读取这些值。此外,我们将使用循环从而让每个连接能够接受多个命令。

src/main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream) {
use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;

// A hashmap is used to store data
let mut db = HashMap::new();

// Connection, provided by `mini-redis`, handles parsing frames from
// the socket
let mut connection = Connection::new(socket);

// Use `read_frame` to receive a command from the connection.
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// The value is stored as `Vec<u8>`
db.insert(cmd.key().to_string(), cmd.value().to_vec());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
if let Some(value) = db.get(cmd.key()) {
// `Frame::Bulk` expects data to be of type `Bytes`. This
// type will be covered later in the tutorial. For now,
// `&Vec<u8>` is converted to `Bytes` using `into()`.
Frame::Bulk(value.clone().into())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};

// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}

现在,启动服务:

1
cargo run

在另一个终端里运行hello-redis示例:

1
cargo run --example hello-redis

现在的输出应为:

1
got value from server; result=Some(b"world")

我们现在可以查询和设置键值了,但是有一个问题:这些键值并不能在连接之间共享。如果另一个套接字连接并尝试GEThello,它将查询不到任何东西。

你可以在这里找到完整的代码。

在下一章中,我们将为所有套接字实现数据持久化。

5 共享状态


到目前为止,我们有了键值服务端,但却存在一个缺陷:状态并未在连接之间共享。本文将解决这个问题。

策略

在Tokio中共享状态通常有以下几种方式。

  1. 使用互斥锁封装共享状态。
  2. 再生成一个任务,专门用于管理状态,并使用消息传递操作状态。

第一种方法通常用于简单数据,而第二种方法用于需要异步工作(例如I/O原语)的场景。在本章中,共享状态是一个hashmap,并且操作是insertget。这些操作都不是异步的,因此我们将使用互斥锁Mutex

下一章将介绍后一种方法。

添加bytes依赖

Mini-Redis crate将使用bytes crate的Bytes代替Vec<u8>Bytes专门用于为网络编程提供强大的字节阵列结构。相较于Vec<u8>,它添加了强大的浅拷贝功能。换句话说,在Bytes实例上调用clone()将不会复制底层数据,只会让实例的引用计数器加一,即Byte实例仅是某个底层数据的引用计数器句柄,类似于添加了新功能的Arc<Vec<u8>>

要使用bytes,请在Cargo.toml[dependencies]中添加以下内容:

Cargo.toml
1
bytes = "1"

初始化HashMap

为了让hashmap能够在许多任务和许多线程中共享,需要将其封装在Arc<Mutex<_>>中。

首先,为方便使用,在use之后给该类型起别名。

1
2
3
4
5
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Db = Arc<Mutex<HashMap<String, Bytes>>>;

然后,更新main函数以初始化Hashmap并将Arc的句柄传给process函数。使用Arc允许从许多任务中同时引用Hashmap,并可能在许多线程上运行。在整个Tokio中,术语句柄指的是那些可以访问某些共享状态的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

println!("Listening");

let db = Arc::new(Mutex::new(HashMap::new()));

loop {
let (socket, _) = listener.accept().await.unwrap();
// Clone the handle to the hash map.
let db = db.clone();

println!("Accepted");
tokio::spawn(async move {
process(socket, db).await;
});
}
}

关于使用std::sync::Mutex

注意这里使用了std::sync::mutex而不是tokio::sync::Mutex来封装Hashmap。一个常见的错误是在异步代码中无条件使用tokio::sync::mutex。异步互斥锁用于锁住所有.await操作。

同步互斥锁在等待获取锁定时会阻塞当前线程,于是,将阻止其他任务处理。而如果使用tokio::sync::Mutex通常没有作用,因为异步互斥锁的在内部也使用同步互斥锁。

根据经验,只要竞争不高,并且不会在调用.await时保持锁,则使用异步代码内的同步互斥锁也是可以的。此外,也可以考虑使用parking_lot::mutex作为std::sync::Mutex的更快替代方案。

修改process()

process()函数不再需要初始化Hashmap,现在,它将接受一个共享的句柄作为参数,并且需要在使用前锁定该Hashmap。要注意的是Hashmap的值现在是Bytes类型(可以高效的复制),因此也需要修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};

// Connection, provided by `mini-redis`, handles parsing frames from
// the socket
let mut connection = Connection::new(socket);

while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};

// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}

任务、线程与竞争

当竞争较少时,使用互斥锁阻保护少量关键变量是可行的策略。当出现锁竞争时,执行任务的线程将会阻塞并等待互斥锁释放。这不仅会阻塞当前任务,而且还将阻止在当前线程上调度任何其他任务。

默认情况下,Tokio运行时使用多线程调度程序。任务被安排在由运行时管理的任意数量的线程上。如果需要调度执行大量任务,并且他们都需要访问互斥锁,则将产生竞争。不过,如果启用了current_thread运行时,则永远不会产生互斥锁竞争。

current_thread运行时是一个轻量级的单线程运行时。当仅需生成少量任务并打开少量sockets时,这是一个不错的选择。例如,当在异步客户端库之上提供同步API时,就可以使用这个选项。

即使同步互斥锁竞争成为问题,也很少使用Tokio的互斥锁,而是应当考虑:

  1. 使用一个专用任务,以管理状态并使用消息传递。
  2. 互斥锁分片。
  3. 重组代码以避免使用互斥锁。

在我们的例子中,由于各个相互独立,因此可以选择将互斥锁分片。因此,我们将用N个不同的实例代替例子中的单个Mutex<HashMap<_, _>>

1
2
3
4
5
6
7
8
9
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
let mut db = Vec::with_capacity(num_shards);
for _ in 0..num_shards {
db.push(Mutex::new(HashMap::new()));
}
Arc::new(db)
}

如此,找到任何给定键就需要两个步骤:先是要确定该键在哪个分片中,再在这个HashMap中查找该键。

1
2
let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);

上面的这段简单实现要求确定的分片数量,而且一旦创建了分片映射分片数量就不能更改。dashmap crate提供了一个更复杂的分片哈希映射的实现。

.await上持有MutexGuard

你可能会编写这样的代码:

1
2
3
4
5
6
7
8
use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;

do_something_async().await;
} // lock goes out of scope here

当你尝试并发调用那些调用此函数的代码时,将遇到以下错误消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here

这是因为std::sync::MutexGuard类型不可Send,即你不能将互斥锁发送到另一个线程,于是这将导致错误,因为Tokio运行时可以在每个.await时在线程间移动任务。为避免这种情况,你应该重组代码,使互斥锁在.await之前析构:

1
2
3
4
5
6
7
8
9
// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // lock goes out of scope here

do_something_async().await;
}

请注意这样也不行:

1
2
3
4
5
6
7
8
9
10
use std::sync::{Mutex, MutexGuard};

// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
drop(lock);

do_something_async().await;
}

目前,编译器仅根据作用域推断future是否为Send。将来,编译器可能会支持显式析构,但现在,您只能使用显式作用域。

请注意,这里讨论的错误也在并发一章的Send限制小节中讨论过。

如果您想尝试某种方式,通过生成不需要Send的并发任务来绕过此问题,是不可行的。因为如果Tokio在任务持有锁时将任务挂起在.await处,则可能会在同一线程上调度运行其他任务,而这里的其他任务也可能尝试获取该互斥锁,这将导致死锁,即等待获取互斥锁的任务会阻塞持有互斥锁的任务释放锁。

下面我们将讨论一些上述错误消息的方法:

重构你的代码,不跨.await持锁

我们已经在上面的代码片段中看到了一个例子,但是还有一些更健壮的代码实现。例如,你可以将互斥锁包装在一个结构体中,并只在非异步方法中锁定互斥锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use std::sync::Mutex;

struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// This function is not marked async.
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
can_incr.increment();
do_something_async().await;
}

这种模式保证你不会遇到Send错误,因为MutexGuard不会出现在异步函数中。

生成一个任务来管理状态并使用消息传递对其进行操作

这是本章开头提到的第二种方法,通常在共享的资源是I/O资源时使用。有关详细信息,请参阅下一章。

使用Tokio的异步互斥锁

也可以使用Tokio提供的tokio::sync::Mutex类型。Tokio互斥锁的主要特性是它可以跨.await 保存而不会出现任何问题。也就是说,异步互斥锁比普通互斥锁开销更大,因此一般建议从前面两种方法中选一种实现。

1
2
3
4
5
6
7
8
9
10
use tokio::sync::Mutex; // note! This uses the Tokio mutex

// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().await;
*lock += 1;

do_something_async().await;
} // lock goes out of scope here

6 消息通道


现在我们已经了解了一些关于Tokio的并发性,现在让我们在客户端上应用它。把我们之前写的服务端代码放到一个显式的二进制文件中:

1
2
mkdir src/bin
mv src/main.rs src/bin/server.rs

创建一个包含客户端代码的新二进制文件:

1
touch src/bin/client.rs

在此文件中,您将编写本章的代码。要记住在运行客户端前,必须首先在单独的终端中启动服务端:

1
cargo run --bin server

再用单独的终端里运行客户端:

1
cargo run --bin client

让我们开始编码!

假设我们要并发执行两个Redis命令。我们可以为每个命令生成一个任务,然后这两个命令将同时发生。

起初,我们可能会进行如下尝试:

src/bin/client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use mini_redis::client;

#[tokio::main]
async fn main() {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();

// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async {
let res = client.get("hello").await;
});

let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});

t1.await.unwrap();
t2.await.unwrap();
}

上面的代码无法通过编译,因为这两个任务都需要以某种方式访问client。由于Client没有实现Copy,所以如果没有一些代码来实现共享,就无法编译。此外,Client::set采用&mut self,这意味着需要互斥存取才能调用。我们可以为每个任务打开一个连接,但这并不理想。我们不能使用std::sync::Mutex因为.await需要在持有锁的情况下调用。我们可以使用tokio::sync::Mutex,但这只会允许一个运行中的请求。如果客户端实现了流水线,异步互斥锁还将导致连接不能被充分利用。

消息传递

解决方案是使用消息传递。该模式需要生成一个专门的任务来管理client资源。任何希望发出请求的任务,都会向client任务发送消息。client任务代理发送者发出请求,并将响应返回给发送者。

使用此策略,可以只建立一个连接。client管理任务能够获得互斥存取权限,以便调用getset。此外,通道也起到了缓冲区的作用。即使client任务忙时,操作也可能会被发送到client任务。当client任务可用于处理新请求时,它就会从通道中获取下一个请求。这可以带来更好的吞吐量,并可扩展以支持连接池。

Tokio的通道原语

Tokio提供了许多通道,每个通道都有不同的用途。

  1. mpsc:多生产者,单消费者通道。可以发送许多值。
  2. oneshot:单生产者,单消费者渠道。可以发送单个值。
  3. broadcast:多生产者,多消费者。可以发送许多值,每个消费者都能看到每个值。
  4. watch:单生产者,多消费者。可以发送许多值,但不保留任何历史记录。消费者只能看到最新的值。

如果您需要一个多生产者多消费者通道,其中只有一个消费者可以看到每条消息,您可以使用async-channel crate。在异步Rust之外还有一些通道可以使用,例如std::sync::mpsccrossbeam::channel。这些通道在等待消息时将阻塞线程,这在异步代码中是不允许的。

在本节中,我们将使用mpsconeshot,其他类型的消息传递通道将在后面的章节中探讨。你可在这里找到本节的完整代码。

定义消息类型

在大多数情况下,当使用消息传递时,接收消息的任务会响应多个命令。在我们的例子中,任务将响应GETSET命令。为了对此建模,我们首先定义一个Command枚举并为每个命令指定一个成员变量。

src/bin/client.rs
1
2
3
4
5
6
7
8
9
10
11
12
use bytes::Bytes;

#[derive(Debug)]
enum Command {
Get {
key: String,
},
Set {
key: String,
val: Bytes,
}
}

创建通道

main函数中新建一个mpsc通道:

src/bin/client.rs
1
2
3
4
5
6
7
8
9
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
// Create a new channel with a capacity of at most 32.
let (tx, mut rx) = mpsc::channel(32);

// ... Rest comes here
}

mpsc通道用于向管理redis连接的任务发送命令,多生产者特性允许从多个任务发送消息。创建通道返回两个值,一个发送端和一个接收端,两个句柄各有用处。他们可能会被转移到不同的任务中。

创建通道的容量为32,如果消息的发送速度快于接收速度,通道将存储它们。一旦有32条消息存储在通道中,调用send(...).await就将进入睡眠状态,直到接收方取走了一条消息为止。

从多个任务发送是通过克隆Sender来完成的。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();

tokio::spawn(async move {
tx.send("sending from first handle").await;
});

tokio::spawn(async move {
tx2.send("sending from second handle").await;
});

while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}

两条消息都发送到同一个Receiver句柄。mpsc通道的接收端无法被克隆。

当所有Sender超出作用域或是被析构时,就无法再向通道发送更多消息。此时,在Receiver上调用recv会返回None,表示所有发送端都不在了,通道已关闭。

在我们管理Redis连接的任务中,当通道关闭时该任务就将关闭Redis连接,因为该连接将不再被使用。

生成管理任务

接下来,生成一个专门处理通道的消息的任务。首先,客户端建立到Redis服务端的连接;然后,通过Redis连接发送收到的命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use mini_redis::client;
// The `move` keyword is used to **move** ownership of `rx` into the task.
let manager = tokio::spawn(async move {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();

// Start receiving messages
while let Some(cmd) = rx.recv().await {
use Command::*;

match cmd {
Get { key } => {
client.get(&key).await;
}
Set { key, val } => {
client.set(&key, val).await;
}
}
}
});

现在可以修改任务,使用通道发送命令,以代替直接使用Redis连接发送命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// The `Sender` handles are moved into the tasks. As there are two
// tasks, we need a second `Sender`.
let tx2 = tx.clone();

// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async move {
let cmd = Command::Get {
key: "hello".to_string(),
};

tx.send(cmd).await.unwrap();
});

let t2 = tokio::spawn(async move {
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
};

tx2.send(cmd).await.unwrap();
});

main函数最后,在join句柄上使用.await,以确保命令在进程退出前能够执行完毕。

接收响应

最后一步是从管理任务接收响应。GET命令需要获取值,而SET命令需要知道操作是否成功完成。

我们使用oneshot通道,以传递响应。oneShot通道是一种单次生产、单次消费的通道,专门针对对发送单个值的场景进行了优化。在我们的例子中,这单个值就是响应。

MPSC类似,oneShot::Channel()返回一对发送端、接收端句柄。

1
2
3
use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel();

mpsc不同的是,它无需指定通道数量,因为始终是一,此外,这两个句柄都不能克隆。

要从管理任务接收响应,在发送命令之前,就应创建oneshot通道。通道的发送端包含在命令中传给管理任务,接收端则用于接收响应结果。

首先,修改Command以包含Sender。为了方便起见,使用类型别名来指代Sender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use tokio::sync::oneshot;
use bytes::Bytes;

/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Bytes,
resp: Responder<()>,
},
}

/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

然后,修改管理任务以能够发送包含oneshot::Sender的命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "hello".to_string(),
resp: resp_tx,
};

// Send the GET request
tx.send(cmd).await.unwrap();

// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
resp: resp_tx,
};

// Send the SET request
tx2.send(cmd).await.unwrap();

// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});

最后,修改管理任务,以使用oneshot通道返回响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// Ignore errors
let _ = resp.send(res);
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val).await;
// Ignore errors
let _ = resp.send(res);
}
}
}

调用oneshot::Sendersend会立即完成,不需要使用.await。这是因为在oneshot通道上send总是立刻失败或成功,不会有任何等待。

oneshot通道上发送值时,如果接收端被析构则将返回Err,以表明接收端不再关注响应。在我们的例子中,接收端可以不接收响应,因此无需处理resp.send(...)返回的Err

你可以在这里找到完整代码。

背压调节和通道限制

当引入并发或队列时,一定要确保队列是有限的,以使系统能优雅地处理负载。无限的队列最终将耗尽所有可用的内存,并导致系统以不可预测的方式宕机。

Tokio能够避免定义不明确的队列,主要也是因为Tokio中的异步操作是惰性求值。考虑以下代码:

1
2
3
loop {
async_op();
}

如果异步操作立即求值,则此循环会不停的将新的async_op排入队列,且并不关心之前的操作是否完成,如此则会隐式的产生无限制队列。基于回调和立即求future的系统尤其容易受到上述影响。

但是,使用Tokio和异步Rust,上述代码片段的async_op根本不会运行,因为.await从未被调用。如果修改上述代码,加入.await,则循环会等待操作完成才进入下一次循环。

1
2
3
4
loop {
// Will not repeat until `async_op` completes
async_op().await;
}

使用并发和队列时必须明确引入。这样做的方法包括:

  • tokio::Spawn
  • select!
  • join!
  • mpsc::channel

这样做时,请注意确保并发的总数有上限。例如,在编写接受TCP连接的循环时,请确保打开的sockets的总数有上限。使用mpsc::channel时,选择合适的通道容量。针对特定的应用程序选择特定的约束限制。

编写可靠的Tokio应用程序的一个关键,就是留意并挑选良好的限制值。

7 I/O


Tokio中的I/O以与std中的运行方式几乎一样,只不过是异步的。有一个读trait(AsyncRead)和一个写trait(AsyncWrite)。一些类型已经良好的地实现了这些trait(TCPStreamFileStdout)。许多数据结构也实现了AsyncReadAsyncWrite(如Vec<u8>&[u8]),这使得我们可以在需要用到reader和writer的地方使用字节数组。

本章将通过一些例子演示使用Tokio时的基本I/O读写操作。下一章江更加深入的介绍高级的I/O示例。

AsyncReadAsyncWrite

这两个trait为异步读写字节流提供了基础工具。我们通常不直接调用这些trait上的方法,类似于你不会手动调用future trait的poll方法,而是使用AsyncReadExtAsyncWriteExt提供的工具方法。

让我们简要看一下其中的一些方法,所有这些函数都是异步的,必须与配合.await使用。

async fn read()

AsyncReadExt::read提供了一种将数据读取到缓冲区的异步方法,返回值为读到的字节数。

注意:当read()返回Ok(0)时,意味着该流已关闭。任何调用read()的future江立即完成并返回Ok(0)。比如对TcpStream的实例,这表示socket的读端已关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};

#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];

// read up to 10 bytes
let n = f.read(&mut buffer[..]).await?;

println!("The bytes: {:?}", &buffer[..n]);
Ok(())
}

async fn read_to_end()

AsyncReadExt::read_to_end将读取流中的所有字节,直到遇到EOF。

1
2
3
4
5
6
7
8
9
10
11
12
use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = Vec::new();

// read the whole file
f.read_to_end(&mut buffer).await?;
Ok(())
}

async fn write()

AsyncWriteExt::write将一个缓冲区写入writer,返回值为写入的字节数。

1
2
3
4
5
6
7
8
9
10
11
12
13
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("foo.txt").await?;

// Writes some prefix of the byte string, but not necessarily all of it.
let n = file.write(b"some bytes").await?;

println!("Wrote the first {} bytes of 'some bytes'.", n);
Ok(())
}

async fn write_all()

AsyncWriteExt::write_all将整个缓冲区写入writer。

1
2
3
4
5
6
7
8
9
10
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("foo.txt").await?;

file.write_all(b"some bytes").await?;
Ok(())
}

这两个trait都包括许多其他有用的方法,详情请参阅API文档。

帮助函数

此外,就像stdtokio::io模块也包含许多实用的工具函数,以及用于标准输入标准输出标准错误的API。例如,使用tokio::io::copy异步将读端的所有内容复制到写端。

1
2
3
4
5
6
7
8
9
10
11
use tokio::fs::File;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("foo.txt").await?;

io::copy(&mut reader, &mut file).await?;
Ok(())
}

留意到上例实际上利用了实现了AsyncRead的字节数组。

回音服务端

让我们上手试一下异步I/O,来编写一个简单的回音服务端。

回音服务端绑定一个TcpListener,使用一个循环接收入站连接。对于每个入站连接,从socket读取数据,然后立即将读到的数据写回socket。客户端将数据发送到服务器,然后接收完全相同的数据。

我们将使用两种稍微不同的策略分别实现回音服务端。

使用io::copy()

首先,我们将使用io:copy实现回音逻辑。

您可以在新的二进制文件中写入此代码:

1
touch src/bin/echo-server-copy.rs

您可以通过以下方式启动(或仅检查编译):

1
cargo run --bin echo-server-copy

您将可以直接使用标准命令行工具(例如telnet)测试服务端,或使用tokio::net::TcpStream的文档中示例的简单客户测试服务端。

TCP服务端使用循环接受请求,每接受一个socket就生成一个新任务。

src/bin/echo-server-copy.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use tokio::io;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;

loop {
let (mut socket, _) = listener.accept().await?;

tokio::spawn(async move {
// Copy data here
});
}
}

之前提到过,该工具函数将接受一个读端和一个写端,并将数据从一端拷贝到另一端,但此处我们只有一个Tcpstream,而这一个值同时实现了AsyncReadAsyncWrite。由于io::copy要求读端和写端均为&mut,如果这两个参数都写成socket的话就不能通过编译。

1
2
// This fails to compile
io::copy(&mut socket, &mut socket).await

拆分读端和写端

要解决问题,我们需要将socket拆分为读句柄和写句柄,特定的类型有特定的最佳读/写端拆分方式。

任何读端+写端的类型,均可用io::split工具函数拆分,函数接受一个参数,返回一对读句柄/写句柄。这两个句柄可以独立使用,比如用在不同的任务中。

例如,回音客户端可以按如下的方式处理的并发读取:

echo-client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
let socket = TcpStream::connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);

// Write data in the background
tokio::spawn(async move {
wr.write_all(b"hello\r\n").await?;
wr.write_all(b"world\r\n").await?;

// Sometimes, the rust type inferencer needs
// a little help
Ok::<_, io::Error>(())
});

let mut buf = vec![0; 128];

loop {
let n = rd.read(&mut buf).await?;

if n == 0 {
break;
}

println!("GOT {:?}", &buf[..n]);
}

Ok(())
}

io::split能够接受任何实现了AsyncRead + AsyncWrite的类型并返回独立的句柄,是因为io::split内部使用一个Arc和一个Mutex。我们可以使用TcpStream避免使用此开销。TcpStream提供了两个专用的拆分函数。

TcpStream::split接受一个流的引用,返回一对读/写句柄。由于使用了引用,因此两个句柄都必须呆在这个调用split()的任务中。这种专用拆分为零成本,不需要ArcMutexTcpStream还提供了into_split函数,以仅使用一个Arc为代价,让句柄能够在任务间移动。

由于调用io::copy()的任务就拥有TcpStream的所有权,因此我们可以使用TcpStream::split。服务端中处理回音逻辑的任务变成:

1
2
3
4
5
6
7
tokio::spawn(async move {
let (mut rd, mut wr) = socket.split();

if io::copy(&mut rd, &mut wr).await.is_err() {
eprintln!("failed to copy");
}
});

你可以在这里找到完整的代码。

手动拷贝

现在,让我们来了解一下如何通过手动复制数据的方式编写回音服务端。这里,我们需要使用AsyncReadExt::readAsyncWriteExt::write_all

完整的回音服务端如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;

loop {
let (mut socket, _) = listener.accept().await?;

tokio::spawn(async move {
let mut buf = vec![0; 1024];

loop {
match socket.read(&mut buf).await {
// Return value of `Ok(0)` signifies that the remote has
// closed
Ok(0) => return,
Ok(n) => {
// Copy the data back to socket
if socket.write_all(&buf[..n]).await.is_err() {
// Unexpected socket error. There isn't much we can
// do here so just stop processing.
return;
}
}
Err(_) => {
// Unexpected socket error. There isn't much we can do
// here so just stop processing.
return;
}
}
}
});
}
}

(您可以将此代码放入src/bin/echo-server.rs中,并使用cargo run --bin echo-server运行)。

我们来逐行理解代码。首先,由于使用AsyncReadAsyncWrite实用工具,因此必须引入这些扩展trait。

1
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

分配缓冲区

我们的策略是将一些从socket中读取的数据放入缓冲区,然后将缓冲区的内容写回到socket中。

1
let mut buf = vec![0; 1024];

这里显式禁止使用栈缓冲区。回想前面,我们注意到,跨.await调用存活的所有任务数据都必须保存在任务中。上面的代码中,buf.await调用使用。所有任务数据都存储在这单个分配中。您可以将其视为一个enum,其中每个成员变量都是需要保存在任务中以便特定的.await调用的数据。

如果缓冲区是一个栈数组,则为每个接受的socket生成的任务内部结构可能看起来像:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct Task {
// internal task fields here
task: enum {
AwaitingRead {
socket: TcpStream,
buf: [BufferType],
},
AwaitingWriteAll {
socket: TcpStream,
buf: [BufferType],
}

}
}

如果将栈数组当做缓冲区,则它将内联存储在任务结构体中。这将使任务结构体变得非常大。另外,缓冲的大小通常是页大小,于是,这将使任务大小变得很尴尬:$页大小 + 一些额外字节

当然,编译器将异步代码块进行了优化,性能远超基本的enmu。实际上,变量并不会像enum可能需要的那样,在成员间移动。但是,任务结构体大小至少与最大的变量一样大。

因此,为缓冲区使用专用的分配通常能更加高效。

处理EOF

当关闭了TCP流的读端后,再调用read()将返回Ok(0),此时必须让读循环终止。忘记在已经EOF的内容上继续循环读取,是一个常见的错误。

1
2
3
4
5
6
7
8
loop {
match socket.read(&mut buf).await {
// Return value of `Ok(0)` signifies that the remote has
// closed
Ok(0) => return,
// ... other cases handled here
}
}

忘记停止读循环,通常会导致CPU使用率100%的无限循环。当socket关闭时,socket.read()将立即返回,然后不停的重复循环。

你可以在这里找到完整的代码。

8 组帧


现在,我们将使用我们刚刚学到的I/O相关知识,实现Mini-Redis组帧层。组帧是获取字节流并将其转换为帧流的过程,帧是两节点间传输数据的单位。Redis协议帧定义如下:

1
2
3
4
5
6
7
8
9
10
use bytes::Bytes;

enum Frame {
Simple(String),
Error(String),
Integer(u64),
Bulk(Bytes),
Null,
Array(Vec<Frame>),
}

请留意该帧定义是如何使用没有语义的数据组成的,命令解析和实现位于更上层。

对于HTTP,帧可能看起来像:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
enum HttpFrame {
RequestHead {
method: Method,
uri: Uri,
version: Version,
headers: HeaderMap,
},
ResponseHead {
status: StatusCode,
version: Version,
headers: HeaderMap,
},
BodyChunk {
chunk: Bytes,
},
}

为了实现Mini-Redis的帧,我们将实现Connection结构体,里面封装了一个TcpStream用以读/写mini_redis::Frame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use tokio::net::TcpStream;
use mini_redis::{Frame, Result};

struct Connection {
stream: TcpStream,
// ... other fields here
}

impl Connection {
/// Read a frame from the connection.
///
/// Returns `None` if EOF is reached
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
// implementation here
}

/// Write a frame to the connection.
pub async fn write_frame(&mut self, frame: &Frame)
-> Result<()>
{
// implementation here
}
}

你可以在这里找到Redis Wire协议的详细信息,可以在这里找到完整的代码。

缓冲读

read_frame方法会等待收到整个帧才返回,而调用一次TcpStream::read()返回的数据长度不确定,可能包含整个帧,或是部分帧,也可能包含了多个帧。如果收到的是部分帧,则应缓冲数据并从socket中读取更多数据。如果收到的是多个帧,则应返回第一帧,缓冲其余数据,以等待下一次read_frame调用。

为了实现上述功能,需要给Connection添加一个读缓冲区字段,以数据将从socket读取到读缓冲区。解析帧时,再从缓冲区中返回并删除相应的数据即可。

我们选用BytesMut作为缓冲区类型,即Bytes的可变版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
stream: TcpStream,
buffer: BytesMut,
}

impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
// Allocate the buffer with 4kb of capacity.
buffer: BytesMut::with_capacity(4096),
}
}
}

接下来实现read_frame()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;

pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
loop {
// Attempt to parse a frame from the buffered data. If
// enough data has been buffered, the frame is
// returned.
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}

// There is not enough buffered data to read a frame.
// Attempt to read more data from the socket.
//
// On success, the number of bytes is returned. `0`
// indicates "end of stream".
if 0 == self.stream.read_buf(&mut self.buffer).await? {
// The remote closed the connection. For this to be
// a clean shutdown, there should be no data in the
// read buffer. If there is, this means that the
// peer closed the socket while sending a frame.
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
}
}
}

让我们逐行分析这段代码。read_frame方法内是一个循环:首先调用self.parse_frame(),以将尝试从self.buffer中解析出一个完整的redis帧,如果有足够的数据来解析出一个完整帧,则将该帧返回给read_frame()的调用者;反之,则尝试将从socket读更多的数据取到缓冲区中。读取更多数据后将再次调用parse_frame(),此时,如果收到足够的数据,解析可能会成功。

从流中读取数据时,返回值为0表示将不能从对端收到更多数据,此时,如果读缓冲区仍有数据,则表明收到了部分帧,但该连接突然终止。这是一个错误,应返回Err

Buf trait

从流中读取数据时调用了read_buf。此读函数接受的参数,需要实现bytes crate的BufMut trait。

这里先思考如果使用read()实现相同的读循环,该怎么写。(可以先用Vec<u8>代替上面的BytesMut)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use tokio::net::TcpStream;

pub struct Connection {
stream: TcpStream,
buffer: Vec<u8>,
cursor: usize,
}

impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
// Allocate the buffer with 4kb of capacity.
buffer: vec![0; 4096],
cursor: 0,
}
}
}

再给Conneciont添加read_frame()函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
loop {
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}

// Ensure the buffer has capacity
if self.buffer.len() == self.cursor {
// Grow the buffer
self.buffer.resize(self.cursor * 2, 0);
}

// Read into the buffer, tracking the number
// of bytes read
let n = self.stream.read(
&mut self.buffer[self.cursor..]).await?;

if 0 == n {
if self.cursor == 0 {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
} else {
// Update our cursor
self.cursor += n;
}
}
}

使用字节数组read时,需要手动维护一个游标用来跟踪缓冲了多少数据,还必须确保将缓冲区仍为空的部分传给read();否则,将覆盖掉缓冲的数据。而当缓冲区用完时,还要手动扩大缓冲区以能够继续读到缓冲区。在parse_frame()(尚未提到)中,还需要指定解析位于self.buffer[..self.cursor]中的数据。

因为于将字节数组与游标搭配使用的场景很常见,bytes crate提供了表示字节数组和游标的抽象。Buf trait由那些可以从中读取数据的源头类型来实现。BufMut由那些可以进将数据写入的目的类型来实现。当把T: BufMut的参数传给read_buf()时,缓冲区内部的游标将被read_buf()自动更新。因此,在我们的read_frame,不再需要自己手动管理游标。

另外,使用vec<u8>时,必须初始化缓冲区。vec![0; 4096]将分配4096字节的数组,并将每个字节都置为零。调整缓冲区大小时,新增部分还必须使用零来初始化。而初始化操作是有代价的。使用BytesMutBufMut时,缓冲区是未初始化的。BytesMut抽象会防止我们从未初始化的内存中读取数据,这就避免了初始化操作。

解析帧

现在,让我们看一下parse_frame(),解析分两个步骤进行。

  1. 确保全帧被缓冲,并索引到到帧的末尾。
  2. 解析帧。

mini-redis crate为我们提供了这两个步骤所需的函数:

  1. Frame::check
  2. Frame::parse

这里又需要使用到Buf抽象。一个实现了Buf的对象会被传递给Frame::check,而在check函数检查传入的缓冲对象时,内部的游标会随着移动。当check返回时,缓冲区内部的游标就自动指向帧的末端了。

我们将使用std::io::Cursor<&[u8]>类型实现Buf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;

fn parse_frame(&mut self)
-> Result<Option<Frame>>
{
// Create the `T: Buf` type.
let mut buf = Cursor::new(&self.buffer[..]);

// Check whether a full frame is available
match Frame::check(&mut buf) {
Ok(_) => {
// Get the byte length of the frame
let len = buf.position() as usize;

// Reset the internal cursor for the
// call to `parse`.
buf.set_position(0);

// Parse the frame
let frame = Frame::parse(&mut buf)?;

// Discard the frame from the buffer
self.buffer.advance(len);

// Return the frame to the caller.
Ok(Some(frame))
}
// Not enough data has been buffered
Err(Incomplete) => Ok(None),
// An error was encountered
Err(e) => Err(e.into()),
}
}

完整的Frame::check函数可以在这里找到,此处就不贴上它的完整代码了。

值得注意的是Buf使用了“字节迭代器”风格的API,以读取数据并移动内部游标。例如,解析帧时,检查第一个字节以确定帧的类型,使用的函数是Buf::get_u8,可以获取游标当前位置的字节,并将游标后移一位。

Buf trait还有更多有用的方,可以查看API文档获取更多信息。

缓冲写

帧相关API的另一半是write_frame(frame)函数,它会将一个完整的帧写入socket。为了最大程度地减少write系统调用,写将被缓冲。我们维护一个写缓冲区,并在写入socket之前将帧编码到该缓冲区。不过,与read_frame()不同,完整的帧并不总是在写入socket前缓冲到字节数组。

比如对一个大块流帧,要写入的值为Frame::Bulk(Bytes)。大块帧的的物理格式是一个帧头,即$符后接以字节计算的数据长度。帧的主体是内容的Bytes。如果数据量很大,则将其复制到中间缓冲区将非常耗资源。

为了实现缓冲写,我们将使用BufWriter结构体,它用T: AsyncWrite初始化并自己实现AsyncWrite。当在BufWriter上调用write时,并非直接调用内部的写操作,而是提交给缓冲区。当缓冲区满时,内容才一次性交给内部写操作,然后清理缓冲区。当然,在某些情况下,还有一些可以绕过缓冲区的优化操作。

这里的教程不会贴上完整的write_frame()实现了,你可以在这里找到完整代码。

首先,修改Connection结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;

pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
}

impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(4096),
}
}
}

然后实现write_frame()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

async fn write_frame(&mut self, frame: &Frame)
-> io::Result<()>
{
match frame {
Frame::Simple(val) => {
self.stream.write_u8(b'+').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Error(val) => {
self.stream.write_u8(b'-').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Integer(val) => {
self.stream.write_u8(b':').await?;
self.write_decimal(*val).await?;
}
Frame::Null => {
self.stream.write_all(b"$-1\r\n").await?;
}
Frame::Bulk(val) => {
let len = val.len();

self.stream.write_u8(b'$').await?;
self.write_decimal(len as u64).await?;
self.stream.write_all(val).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Array(_val) => unimplemented!(),
}

self.stream.flush().await;

Ok(())
}

这里使用的诸多函数均由AsyncWriteExt trait提供,该trait也被TcpStream实现,但是不建议在没有中间缓冲区的情况下发送写单个字节的命令(译注:即直接在TcpStream上调用这些write_*函数会产生大量系统调用,建议的做法是在有缓冲包装的写对象上调用,如文中的stream是一个BufWriter<TcpStream>对象)。

该函数结束时将调用self.stream.flush().await。由于BufWriter在中间缓冲区保存写内容,因此调用write不能保证数据立即写入socket。在返回之前,我们希望将帧写入socket,手动调用flush()以将缓冲区中等待的所有数据写入socket。

另一种实现是在Connection上提供一个flush()方法,来代替在write_frame()中调用flush()。这将允许调用者将多个小帧排队写入写缓冲区,然后只用一个write系统调用将它们全部写入socket。不过这样会使Connection的API复杂化。简单也是Mini-Redis的目标,因此我们决定在fn write_frame()中调用flush().await

9 深入异步


至此,我们已经较为全面的了解了关于异步Rust和Tokio的知识。现在,让我们更深入了研究Rust的异步运行时模型。在本教程的最开始,我们暗示异步Rust采用了一种独特的实现方法。本章,我们就解释一下这意味着什么。

Futures

我们用一个非常基本的异步函数,来快速回顾一下了解到的异步相关知识,在这个例子中并没有超出前几章的知识点。

1
2
3
4
5
6
7
use tokio::net::TcpStream;

async fn my_async_fn() {
println!("hello from async");
let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
println!("async TCP operation complete");
}

我们调用该函数,它返回一些值。我们又在该值上调用.await

1
2
3
4
5
6
7
8
#[tokio::main]
async fn main() {
let what_is_this = my_async_fn();
// Nothing has been printed yet.
what_is_this.await;
// Text has been printed and socket has been
// established and closed.
}

my_async_fn()的返回值是一个future。实现了标准库std::future::Future trait的值就是future。这些值里包含了正在执行的异步计算。

std::future::Future trait的定义为:

1
2
3
4
5
6
7
8
9
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
type Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;
}

关联类型Output是future执行完毕后返回的类型。Pin类型使Rust能够支持异步函数内的借用。有关更多详细信息,请参阅标准库文档

与用其他语言实现future的方式不同,Rust的future并不是正在后台执行的计算过程,而是计算本身。future的所有者负责通过轮询未来来推进计算。这是通过调用Future::poll来实现的。

(译注,本教程因主要关注tokio的应用,所以本章关于深入Rust异步机制的介绍篇幅并不大,更深入的讨论可以参考 使用Rust编写操作系统 - 4.1 - Async/Await

实现Future

让我们实现一个非常简单的future。 这个future将:

  1. 等待,直到某个特定的时刻才执行。
  2. 将一些文本输出到标准输出。
  3. 生成一个字符串。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
when: Instant,
}

impl Future for Delay {
type Output = &'static str;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };

let out = future.await;
assert_eq!(out, "done");
}

将异步函数当做Future

main函数中,我们实例化future并在其上调用.await。从异步函数中,我们可以在任何实现了Future的值上调用.await。反过来,调用async函数会返回一个实现了Future的匿名类型。在async fn main()的情况下,生成的future大致为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

enum MainFuture {
// Initialized, never polled
State0,
// Waiting on `Delay`, i.e. the `future.await` line.
State1(Delay),
// The future has completed.
Terminated,
}

impl Future for MainFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<()>
{
use MainFuture::*;

loop {
match *self {
State0 => {
let when = Instant::now() +
Duration::from_millis(10);
let future = Delay { when };
*self = State1(future);
}
State1(ref mut my_future) => {
match Pin::new(my_future).poll(cx) {
Poll::Ready(out) => {
assert_eq!(out, "done");
*self = Terminated;
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Terminated => {
panic!("future polled after completion")
}
}
}
}
}

Rust future实际上是状态机。这里用一个包含了三种future可能状态的enum来表示MainFuture。future状态机始于State0状态。当调用poll时,future会尽可能地推进其内部状态。如果future能够完成,则返回包含了异步计算输出的Poll::Ready

如果future无法完成,通常是由于它正在等待的资源尚未准备好,则返回Poll::Pending。若调用者收到Poll::Pending,则表明future将在稍后完成,调用者应该稍后再次调用poll

我们还看到future也包含了其他future。调用外部future的poll会导致调用内部future的poll函数。

执行器

异步Rust函数返回future,而future必须依靠不断的poll调用来推进器状态。Future包含了其他future,那么问题来了,是谁在最外层的future调用poll呢?

回想一下,要运行异步函数,要么必须传递给tokio::spawn,要么使用带有#[tokio::main]标识的main函数。这实际上这是将生成的外层future提交给了Tokio执行器。执行器负责在外部future上调用Future::poll,以驱动异步计算完成执行。

迷你Tokio

为了更好地理解这一切是如何组合在一起的,让我们实现我们自己的最小版本的Tokio!你可以在这里找到完整的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;

fn main() {
let mut mini_tokio = MiniTokio::new();

mini_tokio.spawn(async {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };

let out = future.await;
assert_eq!(out, "done");
});

mini_tokio.run();
}

struct MiniTokio {
tasks: VecDeque<Task>,
}

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

impl MiniTokio {
fn new() -> MiniTokio {
MiniTokio {
tasks: VecDeque::new(),
}
}

/// Spawn a future onto the mini-tokio instance.
fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.tasks.push_back(Box::pin(future));
}

fn run(&mut self) {
let waker = task::noop_waker();
let mut cx = Context::from_waker(&waker);

while let Some(mut task) = self.tasks.pop_front() {
if task.as_mut().poll(&mut cx).is_pending() {
self.tasks.push_back(task);
}
}
}
}

这里执行了一个异步代码块。使用给定的延迟创建了一个Delay实例,并处于等待状态。但是现在的实现存在一个问题,即执行器不会休眠。这个执行器会在生成的所有future上不断调用轮询。大多数时间里,future都不会就绪,只会继续返回Poll::Pending。这一过程将吃满CPU周期,所以通常效率不高。

理想情况下,我们希望mini-tokio只在future就绪时才去调用轮询。这种情况发生在任务所需的被阻塞资源转为可用,从而能够执任务所求情的操作时,例如,若任务想从TCP socket读取数据,则我们只希望在TCP socket收到数据时才进行轮询。在mini-tokio中,任务在给定的Instant时间内被阻塞,所以理想情况下,mini-tokio应只在任务给定的Instance时间过去后,再执行一次轮询。

为了实现这一目标,当轮询了某资源,且尚未就绪时,该资源应在过度到就绪状态后立即发送通知。

唤醒器

现在我们还缺少唤醒器,即用于在资源就绪时,通知等待的任务资源已就绪,可以继续后面的操作。

再次观察Future:poll的函数签名:

1
2
fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;

pollContext参数有一个waker()方法,它返回一个绑定当前任务的WakerWaker有一个wake()方法,调用此方法将通知执行器以安排执行相关任务。当资源过度到就绪状态时就会调用wake(),通知执行器轮询该任务以推进作业的执行。

修改Delay

Delay代码加上唤醒器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;

struct Delay {
when: Instant,
}

impl Future for Delay {
type Output = &'static str;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Get a handle to the waker for the current task
let waker = cx.waker().clone();
let when = self.when;

// Spawn a timer thread.
thread::spawn(move || {
let now = Instant::now();

if now < when {
thread::sleep(when - now);
}

waker.wake();
});

Poll::Pending
}
}
}

现在,一旦等待到了指定的延时,就会通知调用的任务,执行器就可以再次安排轮询任务了。接下来我们修改mini-tokio的代码,从而能够接受唤醒通知。

这里的Delay实现仍存在一些问题,我们会稍后修复。

当future返回Poll:Pending时,必须确保在某个时刻通知唤醒器,若忘记执行此操作,会导致任务永久挂起。

在任务返回Poll:Pending后忘记唤醒是一个常见的bug。

回忆我们的第一版Delay,是这样实现future的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
impl Future for Delay {
type Output = &'static str;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

在返回Poll:Pending之前,我们调用了cx.waker().wake_by_ref()。这是为了符合future协议。由于返回了Poll::Pending,我们得负责通知唤醒器。由于此时我们还没实现计时器线程,所以我们在内部通知了唤醒器。这将导致该future被立刻安排轮询,从而再次执行,并且此刻的future依然没有就绪。

这里可以注意到,你确实可以比非必要不通知的理想状态,更频繁的通知唤醒器。比如上面的代码中,我们确实在未就绪的情况下通知了唤醒器。这也没什么大问题,只是浪费了点CPU周期,不过这种实现确实使得循环更加繁忙。

修改Mini Tokio

接下来是修改Mini Tokio的代码,以接收唤醒器通知。我们希望执行器仅在任务唤醒时运行它们,为此,需要给Mini Tokio实现唤醒器。调用唤醒器时,其关联的任务将被排入执行队列。Mini-Tokio在轮询future时会将该唤醒器传给该future。

新的Mini Tokio将使用通道存储排入计划的任务。通道能够让排入队列的任务在任意线程上执行。唤醒器必须是SendSync的,所以我们使用crossbeam提供的通道,因为标准库的通道不是Sync的。

SendSync是Rust提供的并发相关的标记trait。能够被**发送**到其他线程的类型就是Send的,除了Rc之类的类型不是外,大多数类型都是Send的。能够通过不可变引用并发访问的类型就是Sync的。可以Send却不能Sync的类型,比如Cell,就可以通过不可变引用修改,却不能被安全的并发访问。

有关更多详情请参阅 Rust Book的相关章节

将以下依赖项添加到您的Cargo.toml,引入通道。

Cargo.toml
1
crossbeam = "0.8"

再更新MiniTikio结构体:

1
2
3
4
5
6
7
8
9
10
11
use crossbeam::channel;
use std::sync::Arc;

struct MiniTokio {
scheduled: channel::Receiver<Arc<Task>>,
sender: channel::Sender<Arc<Task>>,
}

struct Task {
// 将在下文中实现此结构体。
}

WakersSync又可被克隆。调用wake时,任务必须被安排执行。为了实现这一点,我们建立一个通道。当唤醒器调用wake()时,任务被推送到通道的发送端,我们的Task结构体将实现这个唤醒逻辑。为此,它需要同时包含生成的future和通道的发送端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use std::sync::{Arc, Mutex};

struct Task {
// `Mutux`用以让`Task`实现`Sync`。同时只允许一个线程访问`future`。
// `Mutex`在此处并不是为了保证正确性。实际上Tokio在这里也并没使用什么互斥锁,
// 而是使用了大量其他代码实现这一功能。
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
executor: channel::Sender<Arc<Task>>,
}

impl Task {
fn schedule(self: &Arc<Self>) {
self.executor.send(self.clone());
}
}

为了安排任务,将Arc克隆并使用通道发送。现在,我们需要将schedule函数与std::task::Waker挂钩。标准库提供了一个低级API,让我们能够手动构造vtable以执行此操作。这种策略为实现者提供了最大的灵活性,但需要大量非安全的样板代码。这里,我们将使用futures crate提供的ArcWake实用程序,以替代直接用 RawWakerVTable。这允许我们通过实现一个简单的trait来将Task结构体呈现为一个唤醒器。

将以下依赖项添加到Cargo.toml中以引入futures

Cargo.toml
1
futures = "0.3"

然后,实现futures::task::ArcWake

1
2
3
4
5
6
7
use futures::task::{self, ArcWake};
use std::sync::Arc;
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}

当上面的定时器线程调用waker.wake()时,任务被推送到通道。接下来,我们在MiniTokio::run()函数中实现任务的接收和执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
impl MiniTokio {
fn run(&self) {
while let Ok(task) = self.scheduled.recv() {
task.poll();
}
}

/// 初始化一个新的mini-tokio实例。
fn new() -> MiniTokio {
let (sender, scheduled) = channel::unbounded();

MiniTokio { scheduled, sender }
}

/// 在mini-tokio中生成一个future
///
/// 给定的future将包装在`Task`中并被送入`scheduled`队列。调用`run`时将执行此future。
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Task::spawn(future, &self.sender);
}
}

impl Task {
fn poll(self: Arc<Self>) {
// 使用`Task`的实例创建一个唤醒器。这用到了上面的`ArcWake`实现。
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);

// 没有其他线程尝试获取future的锁。
let mut future = self.future.try_lock().unwrap();

// 轮询future
let _ = future.as_mut().poll(&mut cx);
}

// 使用给定future生成一个新的task。
//
// 初始化一个包裹着给定future的新Task,并将其推送至`sender`。通道的接收端将获取这个task并执行。
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
executor: sender.clone(),
});

let _ = sender.send(task);
}

}

以上代码做了很多事,首先,首先,实现MiniTokio::run(),该函数执行一个从通道接收计划任务的循环。由于任务在被唤醒后推送到通道,因此这些任务在执行时能够取得进展。

此外,MiniTokio::new()MiniTokio::spawn()函数现在改为使用通道,而不是之前的VecDeque。当生成新任务时,它们会得到一份通道发送端的克隆,从而让任务在在运行时自行使用。

Task::poll()函数使用futures crate中的ArcWake实用程序创建唤醒器。唤醒器用于创建task::Contexttask::Context将传递给poll

小结

我们现在已经看到了异步Rust如何工作的端到端示例。Rust的async/await特性通过多个trait共同实现。这允许第三方crate使用自定义的执行细节,如Tokio。

  • 异步Rust操作是惰性的,需要被调用者轮询。
  • 唤醒器被传递给future,使得future能够找到到调用它的任务。
  • 当资源尚未就绪时,返回Poll::Pending并记录任务的唤醒器。
  • 当资源准备就绪时,通知任务的唤醒器。
  • 执行器收到通知并安排任务执行。
  • 再次轮询任务,这次资源就绪,任务取得进展。

一些未解决的问题

回忆实现Delay future时,我们提到还有一些事情需要解决。Rust的异步模型允许单个future跨任务执行。考虑以下情形:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;

#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let mut delay = Some(Delay { when });

poll_fn(move |cx| {
let mut delay = delay.take().unwrap();
let res = Pin::new(&mut delay).poll(cx);
assert!(res.is_pending());
tokio::spawn(async move {
delay.await;
});

Poll::Ready(())
}).await;
}

poll_fn函数使用闭包创建一个Future实例。上面的代码片段创建了一个Delay实例,轮询它一次,然后将Delay实例发送到新任务并执行.await。在这个例子中,Delay::poll不同Waker实例调用了不止一次。发生这种情况时,您必须确保在最近一次调用pollWaker上调用wake

在实现future时,应当假设每次调用poll可能返回不同的Waker实例,轮询函数必须使用新的唤醒器代替旧的唤醒器。

我们早期的Delay实现在每次轮询时都会生成一个新线程。这没什么问题,但如果轮询过于频繁,效率可能会非常低(例如,如果您select!该future和其他future,只要其中之一有事件发生,则两者都会被轮询)。一种方法是记住你是否已经生成了一个线程,并只有在未生成线程时才执行生成。但是如果这样做,则必须确保在之后调用轮询时更新线程的唤醒器,否则你执行唤醒时将可能使用旧的唤醒器。

为了修复之前的实现,我们可以这样做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

struct Delay {
when: Instant,
// 当我们生一个新线程时为`Some`,否则为`None`。
waker: Option<Arc<Mutex<Waker>>>,
}

impl Future for Delay {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// 首先,如果这是第一次调用future,则生成一个计时器线程。
// 如果计时器线程已经存在,则应确保我们保存的`Waker`就是当前任务的唤醒器。
if let Some(waker) = &self.waker {
let mut waker = waker.lock().unwrap();

// 检测保存的唤醒器是否匹配当前线程的唤醒器。
// 这一步是必要的,因为`Delay` future的实例可能在两次轮询间隔内移动到另一个任务中。
// 如果发生移动,则给定的`Context`中包含的唤醒器将会不同,
// 我们必须更新之前存储的唤醒器,以对此变化做出反应。
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
} else {
let when = self.when;
let waker = Arc::new(Mutex::new(cx.waker().clone()));
self.waker = Some(waker.clone());

// 此处为第一次调用`poll`,生成计时器线程
thread::spawn(move || {
let now = Instant::now();

if now < when {
thread::sleep(when - now);
}

// 当持续时间过去之后,调用唤醒器以通知调用者。
let waker = waker.lock().unwrap();
waker.wake_by_ref();
});
}

// 在保存了唤醒器并启动了计时器线程后,就该检查给定的延时是否到达。
// 这可以通过检测当前时刻来实现,如果持续时间已经过去,则future完成并返回`Poll::Ready``。
if Instant::now() >= self.when {
Poll::Ready(())
} else {
// 如果持续时间没到,则future未完成,应返回`Poll:Pending`。
// `Future` trait协议规定,返回`Pending`时,future应通知一次给定唤醒器,
// future应被再次轮询。在我们的例子中,此处返回`Pending`,
// 我们承诺一旦持续时间过去,就将调用参数`Context`中给定的唤醒器。
// 我们能够做出承诺,是依靠上面生成的计时器线程。
//
// 如果我们忘记调用唤醒器,任务将永久挂起
Poll::Pending
}
}
}

这有点复杂,不过基本思路是,在每次调用poll时,future检查提供的唤醒器是否与先前保存的唤醒器一致,若不一致,则必须保存新的唤醒器。

Notify实用程序

我们演示了如何使用唤醒器手动实现Delay future。唤醒器是是异步Rust工作的基础。通常,没有必要深入到那个程度。例如,在Delay中,我们完全可以通过使用async/await配合tokio::sync::Notify实用程序来实现。该实用程序提供了一个基础的任务通知机制。它负责处理唤醒器的相关细节,包括确保保存的唤醒器与当前任务的唤醒器一致。

使用Notify,我们可以像这样使用async/await实现delay函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;

async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();

thread::spawn(move || {
let now = Instant::now();

if now < when {
thread::sleep(when - now);
}

notify2.notify_one();
});


notify.notified().await;
}

10 Select


现在,当想要向系统添加并发时,我们就生成一个新任务。下面,我们将介绍一些使用Tokio并发执行异步代码的其他方法。

tokio::select!

tokio::select!宏允许等待多个异步计算,并在某个计算完成时返回。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

tokio::spawn(async {
let _ = tx1.send("one");
});

tokio::spawn(async {
let _ = tx2.send("two");
});

tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}

对于两个oneshot通道,任一通道都可能先完成。select!语句在两个通道上等待并将val绑定到任务返回的值。当tx1tx2完成时,执行相关的代码块。

未完成的分支将被丢弃。在示例中,程序等待每个通道的oneshot::Receiver。未完成通道的oneshot::Receiver会被丢弃。

取消

异步Rust的取消操作,是通过放弃future来执行的。回顾“深入异步”章节,异步Rust使用future实现,而future是惰性的,操作只有被轮询时才会进行。如果如果future被丢弃,由于所有相关状态都已删除,因此无法进行操作。

也就是说,有时异步操作会生成后台任务,或启动在后台运行的其他操作。例如,在上面的示例中,生成了一个任务以发送消息。通常,任务将执行一些计算以生成值。

Future或其他类型可以实现Drop以清理后台资源。Tokiooneshot::Receiver通过向Sender端发送一条关闭消息来实现Drop。发送端可以收到此消息并取消进行中的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
use tokio::sync::oneshot;

async fn some_operation() -> String {
// Compute value here
}

#[tokio::main]
async fn main() {
let (mut tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

tokio::spawn(async {
// Select on the operation and the oneshot's
// `closed()` notification.
tokio::select! {
val = some_operation() => {
let _ = tx1.send(val);
}
_ = tx1.closed() => {
// `some_operation()` is canceled, the
// task completes and `tx1` is dropped.
}
}
});

tokio::spawn(async {
let _ = tx2.send("two");
});

tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}

future的实现

为了更好的理解select!是如何工作的,让我们看看一个假想的Future实现会是什么样子。这是一个简化版本,现实中的select!还包含其他功能,例如随机选择首先轮询的分支。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MySelect {
rx1: oneshot::Receiver<&'static str>,
rx2: oneshot::Receiver<&'static str>,
}

impl Future for MySelect {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
println!("rx1 completed first with {:?}", val);
return Poll::Ready(());
}

if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
println!("rx2 completed first with {:?}", val);
return Poll::Ready(());
}

Poll::Pending
}
}

#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

// use tx1 and tx2

MySelect {
rx1,
rx2,
}.await;
}

MySelect future包含两个分支的future。轮询MySelect时,将轮询第一个分支。若就绪,则使用该值,MySelect完成。在.await收到future的输出后,future将被丢弃,这导致两个分支的futures均被删除。由于一个分支没有完成,操作实际上被取消了。

回忆上一节的内容:

当future返回Poll:Pending时,必须确保在某个时刻通知唤醒器,若忘记执行此操作,会导致任务永久挂起。

在任务返回Poll:Pending后忘记唤醒是一个常见的bug。

MySelect实现中并没用显式使用Context参数,而是通过将cx传给内部的future来满足唤醒器的要求。通过仅在从内部future收到Poll::Pending时才返回Poll::Pending,来使内部future也满足唤醒器的要求,MySelect也满足唤醒器的要求。

语法

select!宏可以处理多个分支,目前的限制为64个分支。每个分支的结构如下:

1
<pattern> = <async expression> => <handler>,

当select宏求值时,所有<async expression>都会聚合并同时执行。当表达式完成时,结果与<pattern>匹配,若结果匹配,则取消所有剩余的异步表达式并执行<handler><handler>表达式可以访问<pattern>绑定的变量。

<pattern>的基本用法是给一个变量名赋值,即异步表达式的结果绑定到该变量名,使<handler>可以访问该变量。这就是为什么在上面的示例中,val用于<pattern><handler>能够访问val

如果<pattern>与异步计算的结果不匹配,则剩余的异步表达式将继续并发执行,直到下一个完成。这时,上面的逻辑继续应用作用于该结果。

select!中可以使用任意异步表达式,因此可以定义更复杂的选择计算。

下面的例子中,我们在oneshot通道的输出和TCP连接上进行选择操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use tokio::net::TcpStream;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();

// 生成一个使用oneshot发送信息的任务
tokio::spawn(async move {
tx.send("done").unwrap();
});

tokio::select! {
socket = TcpStream::connect("localhost:3465") => {
println!("Socket connected {:?}", socket);
}
msg = rx => {
println!("received message first {:?}", msg);
}
}
}

下面的例子中,我们在oneshot通道和TcpListenersocket间进行选择操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
let (tx, rx) = oneshot::channel();

tokio::spawn(async move {
tx.send(()).unwrap();
});

let mut listener = TcpListener::bind("localhost:3465").await?;

tokio::select! {
_ = async {
loop {
let (socket, _) = listener.accept().await?;
tokio::spawn(async move { process(socket) });
}

// Help the rust type inferencer out
Ok::<_, io::Error>(())
} => {}
_ = rx => {
println!("terminating accept loop");
}
}

Ok(())
}

循环将一直运行,直到遇到错误,或rx收到值。_模式表示我们对异步计算的返回值不感兴趣。

返回值

tokio::select!宏返回<handler>表达式求值的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async fn computation1() -> String {
// .. computation
}

async fn computation2() -> String {
// .. computation
}

#[tokio::main]
async fn main() {
let out = tokio::select! {
res1 = computation1() => res1,
res2 = computation2() => res2,
};

println!("Got = {}", out);
}

因此,每个分支的<handler>表达式的求值结果应为相同的类型。如果某个select!表达式不需要输出,最好将表达式的求值结果写为()

错误

使用?运算符从表达式中传播错误,具体的行为取决于?是用在异步表达式(<async expression>)中,还是用在处理程序(<handelr>)中。在异步表达式中使用?会将错误传播到异步表达式之外,这使得异步表达式的输出为Result类型;在处理程序中使用?会立刻将错误传播到select!表达式之外。让我们再看一遍上面接受socket循环的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
// [setup `rx` oneshot channel]

let listener = TcpListener::bind("localhost:3465").await?;

tokio::select! {
res = async {
loop {
let (socket, _) = listener.accept().await?;
tokio::spawn(async move { process(socket) });
}

// Help the rust type inferencer out
Ok::<_, io::Error>(())
} => {
res?;
}
_ = rx => {
println!("terminating accept loop");
}
}

Ok(())
}

注意listener.accept().await??运算符将错误从该表达式传播到res绑定变量。出现错误时,res将被设置为Err(_)。处理程序中再次使用了?运算符,res?语句将错误传播到main函数之外。

模式匹配

回忆select!宏的分支定义语法:

1
<pattern> = <async expression> => <handler>,

到目前为止,我们只在<pattern>处用了变量绑定。实际上,我们可以使用任何Rust模式匹配。例如,假设我们从多个MPSC通道接收数据,就可以这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
let (mut tx1, mut rx1) = mpsc::channel(128);
let (mut tx2, mut rx2) = mpsc::channel(128);

tokio::spawn(async move {
// Do something w/ `tx1` and `tx2`
});

tokio::select! {
Some(v) = rx1.recv() => {
println!("Got {:?} from rx1", v);
}
Some(v) = rx2.recv() => {
println!("Got {:?} from rx2", v);
}
else => {
println!("Both channels closed");
}
}
}

在这个例子中,select!表达式等待从rx1rx2接收值。如果一个通道关闭,recv()返回None,则无法满足匹配,此分支将失效。于是select!表达式继续等待剩余的分支。

注意这个select!表达式包含一个else分支,即select!表达式必须返回一个值。使用模式匹配时,可能没有分支能够匹配其的关联模式,如果发生这种情况,则由else分支求值。

借用

当生成任务时,产生异步表达式必持有其所需数据的所有权,而select!宏并没有这种限制,每个分支的异步表达式可以借用数据且同时运行。遵循Rust的借用规则,多个异步表达式可以同时对某块数据进行不可变借用,或者,仅一个异步表达式可以对某块数据进行可变借用。

下面的例子中,我们同时将相同的数据发送到两个不同的TCP目的地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;

async fn race(
data: &[u8],
addr1: SocketAddr,
addr2: SocketAddr
) -> io::Result<()> {
tokio::select! {
Ok(_) = async {
let mut socket = TcpStream::connect(addr1).await?;
socket.write_all(data).await?;
Ok::<_, io::Error>(())
} => {}
Ok(_) = async {
let mut socket = TcpStream::connect(addr2).await?;
socket.write_all(data).await?;
Ok::<_, io::Error>(())
} => {}
else => {}
};

Ok(())
}

两个异步表达式都使用了data变量这一不可变借用。当其中一个操作成功完成时,另一个操作将被丢弃。因为我们需要匹配Ok(_),如果一个表达式失败,另一个则将继续执行。

当执行到每个分支的<handler>时,select!保证只会有一个<handler>执行。因此,每个<handler>可以对同一块数据进行可变借用。

下面的例子中,两个处理程序都尝试对out进行修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

let mut out = String::new();

tokio::spawn(async move {
// Send values on `tx1` and `tx2`.
});

tokio::select! {
_ = rx1 => {
out.push_str("rx1 completed");
}
_ = rx2 => {
out.push_str("rx2 completed");
}
}

println!("{}", out);
}

循环

select!宏经常用在循环中,本节将介绍一些例子,以展示在循环中使用select!宏的常见方法。第一个例子是在多个通道上进行选择:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
let (tx1, mut rx1) = mpsc::channel(128);
let (tx2, mut rx2) = mpsc::channel(128);
let (tx3, mut rx3) = mpsc::channel(128);

loop {
let msg = tokio::select! {
Some(msg) = rx1.recv() => msg,
Some(msg) = rx2.recv() => msg,
Some(msg) = rx3.recv() => msg,
else => { break }
};

println!("Got {:?}", msg);
}

println!("All channels have been closed.");
}

上面的例子在三个通道的接收端上选择,当任一通道的接收端收到信息,就输出到STDOUT。当一个通道关闭时,recv()返回None,由于模式匹配,select!宏将继续等待其余通道。当所有通道关闭时,else分支将求值从而终止循环。

select!宏以随机的方式决定首先检查哪个分支是否就绪,当多个通道的值均未就绪时,将随机选择一个通道接受信息。这是为了处理接收循环的处理速度,慢于消息推入通道的速度的情况,这意味着通道可能被填满。如果select!首选检查的分支不是随机的,而是每次循环都首选检查rx1,若rx1始终都有一条新消息,则其他通道将永远不会被检查。

如果选择时! 进行评估,多个通道有未决消息,只有一个通道弹出一个值。 所有其他频道都没有受到影响,他们的消息一直在这些频道中,直到下一次循环迭代为止。 没有消息丢失。

select!求值,只有一个通道有就绪值,其余通道都在等待时,其余通道的代码不会被执行,其余通道的信息将停留在通道内知道下一次迭代。因此不会有信息丢失。

恢复一个异步操作

下面的例子将展示如何在多个select!调用间执行同一个异步操作。在此示例中,我们有一个类型为i32的MPSC通道和一个异步函数。我们希望一直运行异步函数,直到任务完成,或在通道上接收到一个偶数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async fn action() {
// Some asynchronous logic
}

#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

let operation = action();
tokio::pin!(operation);

loop {
tokio::select! {
_ = &mut operation => break,
Some(v) = rx.recv() => {
if v % 2 == 0 {
break;
}
}
}
}
}

注意这里是如何再循环外,而不是在select!宏内调用action()的。aciont()的返回值被赋给operation,且不执行.await。然后我们在operation上调用tokio:pin!

select!循环内,我们传入&mut operation,而不是operationoperation变量正在跟踪执行中的异步操作,循环的每次迭代都使用同一个变量,而不是调用一个新的action()

另一个select!分支从通道接收数据,如果是一个偶数,则终止循环,否则再次select!

此处是我们第一次使用tokio::pin!,这里我们并不介绍关于内存固定的细节,只需要注意,如果要.await一个引用,被引用的值必须是被固定的,或是实现了Unpin的。

如果我们删除tokio::pin!一行并尝试编译,就会收到以下错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
error[E0599]: no method named `poll` found for struct
`std::pin::Pin<&mut &mut impl std::future::Future>`
in the current scope
--> src/main.rs:16:9
|
16 | / tokio::select! {
17 | | _ = &mut operation => break,
18 | | Some(v) = rx.recv() => {
19 | | if v % 2 == 0 {
... |
22 | | }
23 | | }
| |_________^ method not found in
| `std::pin::Pin<&mut &mut impl std::future::Future>`
|
= note: the method `poll` exists but the following trait bounds
were not satisfied:
`impl std::future::Future: std::marker::Unpin`
which is required by
`&mut impl std::future::Future: std::future::Future`

虽然我们在上一章介绍了Future,但仍然不能明白这个错误。如果你尝试在一个引用上调用.await时,遇到了这种关于Future未被实现的错误,那么你可能需要固定该future。

标准库上阅读更多有关Pin的信息。

修改一个分支

再看一个稍微复杂一点的循环,其中有:

  1. 一个i32类型的通道。
  2. i32值上执行的异步操作。

我们想要实现的逻辑是:

  1. 在通道上等待一个偶数。
  2. 使用该偶数作为输入以启动异步操作。
  3. 等待操作完成,但同时监听通道上的其他偶数。
  4. 如果在现有操作完成之前收到新的偶数,则终止现有操作并使用新偶数重新开始。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
async fn action(input: Option<i32>) -> Option<String> {
// 若输入为`None`则返回`None`。
// 此处也可以简写为`let i = input?;`
let i = match input {
Some(input) => input,
None => return None,
};
// async logic here
}

#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

let mut done = false;
let operation = action(None);
tokio::pin!(operation);

tokio::spawn(async move {
let _ = tx.send(1).await;
let _ = tx.send(3).await;
let _ = tx.send(2).await;
});

loop {
tokio::select! {
res = &mut operation, if !done => {
done = true;

if let Some(v) = res {
println!("GOT = {}", v);
return;
}
}
Some(v) = rx.recv() => {
if v % 2 == 0 {
// `.set` is a method on `Pin`.
operation.set(action(Some(v)));
done = false;
}
}
}
}
}

我们使用与上一个例子类似的策略,在循环外调用异步函数并将结果赋给operation,再将operation变量固定。循环中的选择操作同事在operation上与通道接收端上进行。

留意action是如何将Option<i32>作为参数的。在我们收到第一个偶数之前,我们需要将operation实例化,此处通过接收一个Option并返回一个Option来创建action,而如果传入为None则返回也为None。第一次循环迭代,operation返回None后立即完成。

例子中还使用了一些新的语法,第一个分支中的, if !done,即设置分支的前置条件。在解释它是如何工作之前,让我们看看如果删掉前置条件会发生什么。删除, if !done再运行例子会产生以下输出:

1
2
thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

operation完成后再次尝试调用就会发生此错误。通常,在使用.await时,被等待的值会被使用掉,而在这个例子中,我们等待的是一个引用。这意味着operation在完成后仍然存在。

为避免这种panic,我们必须要在操作完成后禁用第一个分支,此处done变量用于跟踪操作是否完成。一个select!分支可能包含一个前置条件,select!在等待分支之前会检查这个前提条件,如果前置条件求值为false,则分支被禁用。done变量初始化时为false,当操作完成时,done设置为true,在下一个循环迭代中将禁用operation分支。当从通道接收到的消息为偶数时,operation将被重置,done也将置为false

单任务并发

tokio::spawnselect!都可以运行异步并发操作,但是,它们运行并发操作时使用的策略并不相同。tokio::spawn函数接受一个异步操作后生成一个新任务以运行该操作,即Tokio运行时调度的对象是任务。Tokio分别安排了两个不同的任务,它们可能运行在不同的系统线程上,因此,生成任务与生成线程具有相同的限制:禁止借用。

select!宏在同一任务上同时运行所有分支。由于select!宏的所有分支在同一个任务上执行,它们永远不会同时运行。即slect!宏在单个任务上复用了异步操作。


流,就是一系列异步值,是Rust的std::iter::Iterator的异步版本,用Stream trait表示。在异步函数中可以迭代遍历流,也可以使用适配器转换流。Tokio在StreamExt trait上提供了许多通用适配器。

Tokio在一个独立的crate中提供流支持:tokio-stream

1
tokio-stream = "0.1"

目前,Tokio的Stream实用程序放在tokio-stream crate 中。一旦Steam triat在Rust标准库中稳定下来,Tokio的流实用程序将被移动到tokio中。

迭代

目前,Rust语言还不支持异步for循环,代替方案是使用while let循环配合StreamExt::next()在流上进行迭代。

1
2
3
4
5
6
7
8
9
10
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
let mut stream = tokio_stream::iter(&[1, 2, 3]);

while let Some(v) = stream.next().await {
println!("GOT = {:?}", v);
}
}

与迭代器类似,next()方法返回Option<T>T为流内值的类型,若收到None则表示流迭代终止。

Mini-Redis广播

让我们来再看一个使用Mini-Redis客户端的稍微复杂的例子。

你可以在这里找到完整的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use tokio_stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
let mut client = client::connect("127.0.0.1:6379").await?;

// Publish some data
client.publish("numbers", "1".into()).await?;
client.publish("numbers", "two".into()).await?;
client.publish("numbers", "3".into()).await?;
client.publish("numbers", "four".into()).await?;
client.publish("numbers", "five".into()).await?;
client.publish("numbers", "6".into()).await?;
Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
let client = client::connect("127.0.0.1:6379").await?;
let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
let messages = subscriber.into_stream();

tokio::pin!(messages);

while let Some(msg) = messages.next().await {
println!("got = {:?}", msg);
}

Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
tokio::spawn(async {
publish().await
});

subscribe().await?;

println!("DONE");

Ok(())
}

生成一个任务以在Mini-Redis服务器的“number”频道上发布消息。然后,在主任务中订阅“numbers”频道并打印收到的消息。

订阅后,在返回的订阅者上调用into_stream()。如此将消耗掉该Subscriber,并返回一个能够在消息到达时产生消息的流。请注意,在开始迭代消息之前,我们用tokio::pin!将流固定在栈上,因为在流上调用next()需要固定该流。into_stream()函数返回一个未固定的流,我们必须显式固定后才能迭代它。

固定值的一个关键属性是可以将指针指向固定数据,并且调用者可以确信指针保持有效。 async/await 使用此功能来支持跨 .await 点借用数据。

当一个Rust值不能再在内存中移动时,就称为被“固定”了。对于被固定的值,其中一个关键属性是可以将指针指向该数据,而调用者可以确信指针始终有效。async/await使用此特性来支持跨.await时对借用数据进行调用。

如果我们忘记固定流,就会看到类似下面的错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
--> streams/src/main.rs:29:36
|
29 | while let Some(msg) = messages.next().await {
| ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
|
= note: required because it appears within the type `impl Future`
= note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
= note: required because it appears within the type `impl Stream`
= note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
= note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
= note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`

如果你遇到类似上面这样的错误,请尝试固定该值。

在运行上面的代码之前,先启动Mini-Redis服务端:

1
mini-redis-server

再尝试运行代码,我们就会在STDOUT中看到如下输出:

1
2
3
4
5
6
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })

由于订阅和发布之间存在竞争,一些早期消息可能会被丢弃。该程序永远不会退出,只要服务端保持活动状态,对Mini-Redis频道的订阅就会保持活动状态。

让我们看看如何使用流来扩展这个程序。

适配器

接受一个Stream并返回另一个Stream的函数通常称为“流适配器”,因为这属于“适配器模式”。常见的流适配器包括maptakefilter

更新Mini-Redis以使其可以退出。我们希望收到三个消息后就停止遍历。这可以使用take完成,此适配器限制流最多只能产生n条消息。

1
2
3
let messages = subscriber
.into_stream()
.take(3);

再次运行程序,输出为:

1
2
3
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })

而这一次程序执行完毕后退出。

现在,让我们将流中的信息限制为一位数,我们将通过检查消息长度来实现这一点。这次使用filter适配器,以丢弃任何与此断言不匹配的消息。

1
2
3
4
5
6
7
let messages = subscriber
.into_stream()
.filter(|msg| match msg {
Ok(msg) if msg.content.len() == 1 => true,
_ => false,
})
.take(3);

再次运行程序,输出为:

1
2
3
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })

请注意,适配器的应用顺序很重要。先调用filter再调用take与先调用take再调用filter是不同的。

最后,我们将整理输出,即删除输出中的Ok(Message { ... })部分,这次使用map。因为这是在filter之后应用的,我们知道消息是Ok,因此可以使用unwrap()

1
2
3
4
5
6
7
8
let messages = subscriber
.into_stream()
.filter(|msg| match msg {
Ok(msg) if msg.content.len() == 1 => true,
_ => false,
})
.map(|msg| msg.unwrap().content)
.take(3);

此时的输出为:

1
2
3
got = b"1"
got = b"3"
got = b"6"

另一种选择是使用filter_mapfiltermap合并在一个调用中。

还有更多可用的适配器,请查看此列表

实现Stream

Stream trait与Future trait非常相似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
type Item;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;

fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}

Stream::poll_next()函数很像Future::poll,不同在于你可以重复调用它,以从流中接收许多更多值。正如我们在深入异步中看到的那样,当流的返回值尚未就绪时,将返回Poll::Pending,此时任务的唤醒器已注册,一旦要再次轮询流,唤醒器就会收到通知。

size_hint()方法的使用方式与其在迭代器上的使用方式相同。

通常,在手动实现Stream时,是通过组合futures和其他流来完成的。作为一个例子,让我们再次以深入异步中实现的Delay future为基础构建。这次实现为以10毫秒为间隔产生三次()的流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

struct Interval {
rem: usize,
delay: Delay,
}

impl Stream for Interval {
type Item = ();

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<()>>
{
if self.rem == 0 {
// No more delays
return Poll::Ready(None);
}

match Pin::new(&mut self.delay).poll(cx) {
Poll::Ready(_) => {
let when = self.delay.when + Duration::from_millis(10);
self.delay = Delay { when };
self.rem -= 1;
Poll::Ready(Some(()))
}
Poll::Pending => Poll::Pending,
}
}
}

async-stream

使用Stream特性手动实现流可能很乏味。然而,Rust尚不支持使用async/await语法定义流,该特性正在实现,只是尚未完成。

可将async-stream crate当作临时解决方案。该crate提供了一个stream!宏可以将输入转换为流。使用该crate,上面的interval可以这样实现:

1
2
3
4
5
6
7
8
9
10
11
12
use async_stream::stream;
use std::time::{Duration, Instant};

stream! {
let mut when = Instant::now();
for _ in 0..3 {
let delay = Delay { when };
delay.await;
yield ();
when += Duration::from_millis(10);
}
}

评论

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×