PingCap的Rust训练课程4:并发与并行

前言

任务:使用自定义协议,创建一个具有同步网络的多线程、持久化键/值存储的服务端和客户端。

目标

  • 写一个简单的线程池
  • 使用通道进行跨线程通信
  • 利用锁共享数据结构
  • 无锁执行读操作
  • 对单线程与多线程版本进行基准测试

关键词:线程池、通道、锁、无锁数据结构、原子化、参数化基准测试。

介绍

在这个项目中,您将创建一个使用自定义协议进行通信的简单键/值服务端和客户端。服务端将使用同步网络,并将使用较为复杂的并发实现来响应多个请求。内存索引将改为并发数据结构,由所有线程共享,而压缩操作将在专用的线程上完成,以减少单个请求的延迟。

项目需求规格

cargo项目kvs建立了一个名为kvs-client的命令行键值存储客户端,和一个名为kvs-server的键值存储服务端,二者又都调用了一个名为kvs的库。客户端通过一个自定义协议与服务端通信。

命令行规格与前一个项目相同。本项目与前一个项目的的不同之处在于并发实现,在我们实现时会对其进行描述。

库的规格几乎相同,除了以下两点:一是这一次所有的KvsEngineKvStore等类型的方法都使用&self而不是&mut self,我们将实现Clone trait,这在并发数据结构中很常见。但为什么呢?其实并不是说我们将不再编写不可变代码,尽管它们将会在线程之间共享。那为什么要在方法签名中避免使用&mut self?也许您现在还不清楚,但在本项目结束时,它会变得显而易见。

二是本项目中的库包含一个新的trait:ThreadPool。它包含以下方法:

  • ThreadPool::new(threads: u32) -> Result<ThreadPool>
    创建一个新的线程池,立即生成指定数量的线程。
    如果未能生成线程,则返回错误。所有先前产生的线程都被结束。
  • ThreadPool::spawn<F>(&self, job: F) where F: FnOnce() + Send + 'static
    在线程池中运行一个函数。
    运行操作总是成功的,但如果函数发生panic,线程池仍将继续以相同数量的线程运行 — 线程数不会减少,线程池也不会被析构、损坏或失效。

到这个项目结束时,该特性将有几个实现,您将再次执行基准测试来比较它们。

本项目完全不需要对客户端代码进行任何修改。

项目设置

继续上一个项目,删除之前的测试目录,并复制本项目的测试目录。这个项目应该包含一个名为kvs的库,以及两个可执行文件,kvs-serverkvs-client

Cargo.toml中需要以下开发依赖项:

Cargo.toml
1
2
3
4
5
6
7
8
9
[dev-dependencies]
assert_cmd = "0.11"
criterion = "0.2.11"
crossbeam-utils = "0.6.5"
predicates = "1.0.0"
rand = "0.6.5"
tempfile = "3.0.7"
walkdir = "2.2.7"
panic-control = "0.1.4"

与以前的项目一样,添加足够的定义以使测试套件通过编译。


添加新模块thread_pool

1
2
3
4
5
6
7
.
└── src
└── thread_pool
├── mod.rs
├── naive.rs
├── rayon.rs
└── shared_queue.rs

lib.rs中添加thread_pool模块:

lib.rs
1
pub mod thread_pool;

mod.rs中声明三种线程池:

./thread_pool/mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//! This module provides various thread pools. All thread pools should implement
//! the `ThreadPool` trait.
use crate::Result;

mod naive;
mod rayon;
mod shared_queue;

pub use self::naive::NaiveThreadPool;
pub use self::rayon::RayonThreadPool;
pub use self::shared_queue::SharedQueueThreadPool;

/// The trait that all thread pools should implement.
pub trait ThreadPool {
/// Creates a new thread pool, immediately spawning the specified number of
/// threads.
///
/// Returns an error if any thread fails to spawn. All previously-spawned threads
/// are terminated.
fn new(threads: u32) -> Result<Self>
where
Self: Sized;

/// Spawns a function into the thread pool.
///
/// Spawning always succeeds, but if the function panics the threadpool continues
/// to operate with the same number of threads &mdash; the thread count is not
/// reduced nor is the thread pool destroyed, corrupted or invalidated.
fn spawn<F>(&self, job: F)
where
F: FnOnce() + Send + 'static;
}

三种线程池都用最简单的标准线程做实现,以通过编译。这里以naive.rs为例,其他两个也使用这样的实现即可:

naive.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::thread;
use super::ThreadPool;

use crate::Result;

/// It is actually not a thread pool. It spawns a new thread every time
/// the `spawn` method is called.
pub struct NaiveThreadPool;

impl ThreadPool for NaiveThreadPool {
fn new(_threads: u32) -> Result<Self> {
Ok(NaiveThreadPool)
}

fn spawn<F>(&self, job: F)
where
F: FnOnce() + Send + 'static,
{
thread::spawn(job);
}
}

知识背景:阻塞和多线程

到目前为止,您已经在单线程上处理了所有请求,包括读取和写入(例如”get”和”set”)。换句话说,数据库中的所有请求都是串行的。使用我们将在这个项目中重复的图表,时间流向如下所示:

1
2
3
4
5
6
thread
+ +--------+--------+--------+--------+
T1 | | R1 | R2 | W1 | W2 |
+ +--------+--------+--------+--------+

--> read/write reqs over time -->

读取和写入操作都可能需要阻塞。阻塞是指线程在等待访问资源时停止执行(例如等待访问文件中的数据,或等待访问受锁保护的变量)。当一个线程在一个任务上被阻塞时,便可不能执行另一个任务。因此,在I/O密集型系统中,任何请求可能都在花费大量时间等待操作系统和内存从磁盘读取数据或将数据写入磁盘:

1
2
3
4
5
    +---------+----------------------------+---------+
R1 | working | waiting for data ... | working |
+---------+----------------------------+---------+

--> time -->

The simplest way to put the CPU back to work while one request is blocked is to service requests on multiple threads, so that ideally our requests are all processed concurrently, and — if we have enough CPUs — in parallel:

在一个请求被阻塞时,让CPU能够继续工作的最简单方法,是创建多个线程一起提供服务。因此理想情况下,我们的请求都是并发处理的 — 如果我们有足够的 CPU — 即并行处理:

1
2
3
4
5
6
7
8
9
10
11
12
thread
+ +--------+
T1 | | R1 |
| +--------+
T2 | | R2 |
| +--------+
T3 | | W1 |
| +--------+
T4 | | W2 |
+ +--------+

--> read/write reqs over time -->

所以这将是本项目的重点 — 并行处理请求。

第1部分:多线程

鉴于我们首次引入并发概念,就用最简单的方法:为每个传入的连接创建一个新线程,并响应该连接上的请求,然后让线程退出。如此,将工作分发到多个线程将提供怎样的性能收益?您预计延迟会受到怎样的影响?吞吐量呢?

第一步是为这种简单方法编写一个ThreadPool实现,其中ThreadPool::spawn将为每个生成的作业创建一个新线程。我们称之为NaiveThreadPool(实际上它甚至不能算是一个真正的线程池,因为这个实现不会在各作业间重用线程,但仍需要符合我们的trait规范以供之后的基准测试比较)。

我们现在不关注更复杂的实现,因为即使将这个简单方法集成到我们现有的设计中仍需要不少努力。请注意,ThreadPool::new构造函数接受一个threads参数,用于指定池中的线程数。在此实现中,该参数将处于未使用状态。

现在来实现这个版本的ThreadPool,然后我们将它集成到新的KvStore中。

需要通过的测试用例:

  • thread_pool::naive_thread_pool_*

经过“项目设置”一节的修改,现在的代码已经能够通过cargo test --test thread_pool的所有测试(实现时再修改)。

1
2
3
4
5
6
7
8
...
running 4 tests
test rayon_thread_pool_spawn_counter ... ok
test naive_thread_pool_spawn_counter ... ok
test shared_queue_thread_pool_spawn_counter ... ok
test shared_queue_thread_pool_panic_task ... ok

test result: ok. 4 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.02s

第2部分:创建可共享KvsEngine

在我们将NaiveThreadPool集成到KvsServer之前,我们必须创建KvsEngine trait并实现KvStore(现在您可以忽略上一个项目中的 SledKvsEngine,当然,您可以重新实现它作为这个项目的附加作业)。

回想一下项目需求规格规范,在本项目中,KvsEngine会使用&self作为self,而不是上一个项目中的&mut self。此外,还需要为每个实现,显式编写Clone,并为它们隐式添加Send + 'static。具体定义如下:

1
2
3
4
5
6
7
pub trait KvsEngine: Clone + Send + 'static {
fn set(&self, key: String, value: String) -> Result<()>;

fn get(&self, key: String) -> Result<Option<String>>;

fn remove(&self, key: String) -> Result<()>;
}

这个trait签名提供了很代码实现相关的信息。首先想想当我们要使用多线程实现时,为什么需要引擎实现Clone。参考Rust中其他并发数据类型的设计,例如Arc。再想想为什么需要使用&self,而不是&mut self。您对共享可变状态了解多少?在这个项目结束时,请确保你理解这里的含义 — 这就是我们使用Rust的意义

在这个模型中,KvsEngine的行为类似于另一个对象的句柄,并且由于该对象需要在线程之间共享,它可能需要保持在上,并且因为该共享状态不能是可变的,它需要由一些同步原语保护。

因此,使用线程安全的共享指针类型将KvsEngineKvStore中的数据移动到堆上,并将其保护在您选择的锁之后。

由于SledKvsEngine实现了KvsEngine,它可能也需要更改。

此时,您的单线程kvs-server应该可以再次工作,但现在有了一个可以在以后跨线程共享的KvsEngine

需要通过的测试用例:

  • kv_store::concurrent_*

  • Arc会将内容物放在堆上,并可以使用clone创建指向同一个地址的引用;
  • 之所以修改trait定义之后可以不使用mut引用,是因为实现时将以类似self.writer.lock().unwrap().set(key, value)的方式使用KvsEngine的写操作,而读操作封装在RefCell中即可共享;
  • KvStore中的内存索引改为可并发的Arc<SkipMap<...>>方式,并将写对象封装在Arc<Mutex<...>>中,以便在多个线程中执行写操作;
  • 创建一个单线程的KvStoreReader,将日志文件句柄封装在RefCell中以便共享读操作。继续负责从gen日志文件中读取CommandPos处的json数据并解析为Command返回;
  • 创建一个跨线程的KvStoreWriter接管原KvStore中的写日志操作,包括setremovecompact

第3部分:向KvsServer添加多线程

让我们在这里快速回顾一下我们的架构:KvsServer设置一个TCP套接字并开始监听;当收到一个请求时,将反序列化并调用KvsEngine trait的一些实现来存储或检索数据;最后返回响应。KvsEngine如何工作的细节与KvsServer无关。

因此,在上一个项目中,您可能模糊地创建了一个循环,例如:

1
2
3
4
5
6
7
let listener = TcpListener::bind(addr)?;

for stream in listener.incoming() {
let cmd = self.read_cmd(&stream);
let resp = self.process_cmd(cmd);
self.respond(&stream, resp);
}

现在您只需要做类似的事情,然后把循环内的工作都放在NaiveThreadPool中执行。这会将数据库查询和响应工作放在与TCP监听不同的线程中,从而将大部分繁重的工作转移到其他线程,以允许接收线程处理更多请求。如此便能够增加吞吐量,至少在多核机器上是这样。

于是,您现在仍然有一对有效的客户端/服务端键值存储,只不过现在是多线程的。


  • 在循环中clone KvsEngine,然后将KvsEngineThreadPool一起传给线程执行函数serve
  • 此时serve由于有了KvsEngineclone引用,也不需要通过self调用引擎了,因此可以不再当做类函数使用;
  • bin/kvs-server.rs中创建服务时需要加上ThreadPool参数,可以在Cargo.toml中引入num_cpus库,以在创建线程池时自动获取主机cpu个数。

第4部分:创建真正的线程池

所以现在你已经有了你的多线程架构,是时候编写一个真正的线程池了。您可能不会在实践中编写自己的线程池,因为可以使用已经过充分测试的线程池crate,不过通过自己编写线程池可以更有效的学习并发相关经验。在本项目接下来的部分中,您将像我们在上一个项目中对引擎所做的那样,抽象线程池,并将您的实现与现有线程池进行性能比较。

那么,什么是线程池?

其实也没什么复杂的。为了不给每个要执行的多线程作业都创建一个新线程,我们选择来维护一“池”的线程,并不停重用这些线程以避免不停的创建新线程。

那么,为什么?

因为可以提高性能。重用线程可以节省出少量性能,而在编写高性能应用程序时,每一点性能都很重要。想象一下创建一个新线程需要什么:

您必须有一个调用栈才能运行该线程,从而必须为该调用栈分配空间。虽然分配空间已然相当简单,但仍不如不分配来的简单。调用栈的分配方式取决于操作系统和运行时的细节,但可能涉及锁和系统调用。同样的,虽然系统调用也很简单,但是当我们处理Rust级别的性能时它们也就算不上简单了 — 减少系统调用是简单且常见的优化方式。然后必须仔细初始化该栈,以便第一个栈帧包含适当的基指针值以及栈的初始化函数序言中所需的任何其他值。在Rust中,栈需要配置一个保护页来防止栈溢出,从而保护内存安全。这需要另外两个系统调用,mmapmprotect(尽管Linux上避免了这两个系统调用)。

而这只是设置调用栈。创建新线程至少也需要一个系统调用,而内核必须在内部对新线程进行计算。

在Rust中,C库libpthread库负责处理大多数这种复杂工作。

然后在某个时刻,操作系统在新栈上执行上下文切换,线程运行。当线程终止时,所有工作都需要再次撤消。

当使用线程池时,仅有池里的线程需要初始化开销,后续作业只是简单的上下文切换到池中的线程。

那么如何构建线程池?

有许多策略和权衡,但对于练习项目,您只需将使用一个共享队列将工作分配给空闲线程即可。这意味着您的“生产者”,即接受网络连接的线程,将作业发送到这个队列(或通道);而“消费者”,即池中的每个空闲线程,从该队列(通道)读取等待作业执行。这是最简单的工作调度策略,而且可能非常高效。那么,这种方法有什么缺点?

这里有三个重要因素需要考虑:

  1. 使用哪种数据结构来分配工作 — 应是一个队列,且应有一个发送者(“生产者”)负责监听TCP连接,同时应用许多接收者(“消费者”)即池中的线程。
  2. 如何处理panic的作业 — 线程池将运行任意作业。如果一个线程发生panic,线程池需要以某种方式恢复。
  3. 如何应对退出 — 即当ThreadPool对象超出作用域时,它需要关闭每个线程,一定不能不管这些线程。

这些问题都是相互交织的,因为每一个问题都可能涉及线程间的通信和同步。有些解决方案会很简单,每个问题的解决方案都可以优雅地协同工作;有些解决方案会很复杂,这些问题的解决方案可能相互独立独立且相互交织。仔细选择您的数据结构,聪明的运用这些数据结构的特点。

您将通过在某些并发队列类型上发送消息来分发工作(Rust中的并发队列通常是具有两种连接类型的数据结构:发送者类和接收者类;任何实现了Send + 'static的类型都可以在这两个类之间传递)。

Rust中的消息通常表示为枚举类型,每个可能发送的消息都有相应变量,例如:

1
2
3
4
enum ThreadPoolMessage {
RunJob(Box<dyn FnOnce() + Send + 'static>),
Shutdown,
}

这往往是一种更简单、更有效的解决方案,而不是为了不同目的而试图“兼顾”多个通道。当然,如果只有一种类型的消息,则不需要枚举。现在,上面的示例并不一定是管理线程池所需消息集的全集,这具体取决于设计。比如,如果您的队列返回值表明发送方已被销毁,则通常可以隐式shutdown。

现在有许多种的多线程队列。在Rust中最常见的是mpsc通道,就包含在Rust标准库中。这是一个多生产者、单消费者队列,因此将其用于单队列线程池将需要某种锁。在这里使用锁有什么缺点?Rust中还有许多其他并发队列类型,每种都有优缺点。如果您愿意同时锁定生产者和消费者,那么您甚至可以使用Mutex<VecDeque>,不过如果存在更好的解决方案,就没有理由在生产中这么做了。

历史趣事:Rust标准库中包含通道这件事有些奇怪,并且被一些人认为是一个错误,因为它背离了Rust的理念 — 保持最小化标准库、专注抽象操作系统,并让crate生态提供高级数据结构。它们的存在是Rust开发史中刻意为之的,可能来源于Go这样消息传递语言。其他库(例如crossbeam)提供了更复杂的替代方案,为不同场景提供更合适的选项😉。

您的线程池将需要处理作业函数产生panic的情况 — 放任panic销毁线程可能会使池中的线程快速耗尽。因此,如果池中的某个线程panic,您需要确保线程总数不会减少。那该怎么办?您至少有两个选择:当有线程销毁时立刻新建另一个线程,或者捕获panic并保持现有线程运行。这需要您做出权衡并选择其中一种方式,在您的代码中注释您的选择。

您可能会用到的工具有:thread::spawnthread::panickingcatch_unwindmpsc通道、Mutexcrossbeam的MPMC通道threadJoinHandle。按照您的需求选择合适的工具。

创建SharedQueueThreadPool类,实现ThreadPool

需要完成的测试用例:

  • shared_queue_thread_pool_*

KvsServer中用到的NaiveThreadPool替换为SharedQueueThreadPool。同样,您的kvs-server应该仍然像以前一样工作,只不过这次的多线程模型更高效一些。您应当使用恰当的数量构造线程池,这里推荐使用num_cpus crate,以为每个CPU创建一个线程。稍后我们将再次讨论线程个数。


  • 利用crossbeam::channel创建一对tx/rx(发送者/接收者),tx用于在线程池的spawn中向channel发送作业,rx负责接收并执行这些作业;
  • 我们目前的策略为放任线程panic,并立刻创建新线程。要实现这一策略需要将rx封装在新类TaskReceiver中,然后控制该类的Drop行为,当发生thread::panicking()时,克隆本TaskReceiver实例并放入新建的线程中以继续接收新的作业;
  • 线程的执行函数,只需使用loop持续等待作业并执行作业中的函数即可。
thread_pool/shared_queue.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
use std::thread;
use crossbeam::channel::{self, Sender, Receiver};
use log::{debug, error};

use super::ThreadPool;

use crate::Result;

// Note for Rust training course: the thread pool is not implemented using
// `catch_unwind` because it would require the task to be `UnwindSafe`.

/// A thread pool using a shared queue inside.
///
/// If a spawned task panics, the old thread will be destroyed and a new one will be
/// created. It fails silently when any failure to create the thread at the OS level
/// is captured after the thread pool is created. So, the thread number in the pool
/// can decrease to zero, then spawning a task to the thread pool will panic.
pub struct SharedQueueThreadPool {
tx: Sender<Box<dyn FnOnce() + Send + 'static>>,
}

impl ThreadPool for SharedQueueThreadPool {
fn new(threads: u32) -> Result<Self> {
let (tx, rx) = channel::unbounded::<Box<dyn FnOnce() + Send + 'static>>();
for _ in 0..threads {
let rx = TaskReceiver(rx.clone());
thread::Builder::new().spawn(|| run_task(rx))?;
}
Ok(SharedQueueThreadPool { tx })
}

/// Spawns a function into the thread pool.
///
/// # Panics
///
/// Panics if the thread pool has no thread.
fn spawn<F>(&self, job: F)
where
F: FnOnce() + Send + 'static,
{
self.tx
.send(Box::new(job))
.expect("The thread pool has no thread.");
}
}


#[derive(Clone)]
struct TaskReceiver(Receiver<Box<dyn FnOnce() + Send + 'static>>);

impl Drop for TaskReceiver {
fn drop(&mut self) {
if thread::panicking() {
let rx = self.clone();
if let Err(e) = thread::Builder::new().spawn(move || run_task(rx)) {
error!("Failed to spawn a thread: {}", e);
}
}
}
}

fn run_task(rx: TaskReceiver) {
loop {
match rx.0.recv() {
Ok(task) => {
task();
},
Err(_) => debug!("Thread exits because the thread pool destoryed. "),
}
}
}

第5部分:抽象线程池

与在之前的项目中抽象出KvsEngine来比较不同的实现一样,现在您应抽象出ThreadPool来做类似的事。

如果您还没有这么做,请向KvsServer添加第二个类型参数以表示ThreadPool实现,构造函数以线程池作为第二个参数,并使用该线程池来分发作业。

最后,使用rayon crate中的ThreadPool创建另一个ThreadPool,以实现RayonThreadPool

Rayon的线程池使用一种称为“工作窃取”的更复杂的调度策略,预计它的性能会比我们的实现更好,不过在我们尝试之前谁知道呢!

thread_pool/rayon.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use super::ThreadPool;

use crate::{Result, KvsError};

/// Wrapper of rayon::ThreadPool
pub struct RayonThreadPool{
pool: rayon::ThreadPool,
}

impl ThreadPool for RayonThreadPool {
fn new(threads: u32) -> Result<Self> {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(threads as usize)
.build()
.map_err(|e| KvsError::StringError(format!("{}", e)))?;
Ok(RayonThreadPool{pool})
}

fn spawn<F>(&self, job: F)
where
F: FnOnce() + Send + 'static,
{
self.pool.spawn(job)
}
}

第6部分:评估您的线程池

现在您将编写6个基准测试,一个写入繁重的工作负载比较SharedQueueThreadPool在不同线程数下的性能,一个读取繁重的工作负载比较SharedQueueThreadPool在不同线程数下的性能;再写两个测试,类似前面两个只不过用来测试RayonThreadPool;最后还有两个将RayonThreadPoolSledKvsEngine结合使用。

看上去要写6个其实并没有那么多 — 因为后四个基本上是复制前两个。

注意:接下来的两节描述了一组相当复杂的基准测试。它们确实可以实现出来(可能……还没有人写过),但有效理解和高效编写都比较难。两节确实介绍了一些有用的criterion特性,但如果对你来说内容过于庞杂,可以选择跳过(也可以告诉我们有哪些内容不适合你)。不过,这里的困难可能是一个很好的学习机会。最后,实现这些基准测试需要实现以编程方式关闭KvsServer的方法(即不发送SIGKILL并让操作系统执行退出),我们之前还没有讨论过。

其中一部分工作就是让将前一个项目中的SledKvsEngine在本项目的多线程环境中再次工作。这应该不难,因为sled可以被克隆并在线程间发送,就像您编写的引擎一样。

希望结果会很有趣。

您将再次使用criterion

这些将是参数化基准测试,即使用不同参数多次运行的单个测试,而criterion支持使用参数作为基准测试的输入。这里的基准测试参数将是线程池中的线程数。

您将尝试测试服务器在各种条件下的吞吐量。您将同时发送许多请求,等待响应,然后结束。您可能会好奇,服务器CPU数量与线程总数的关系,是如何影响吞吐量的;您的线程池与rayon相比性能如何;以及在多线程环境下您的KvStoreSledKvsEngine的比较。

由于您的KvsClient(可能会)被阻塞,即请求后等待响应,这会使测试变的复杂。如果是非阻塞的,那么您可以发送许多请求而无需等待响应,然后再收集响应。而使用阻塞的KvsClient,您将需要在独立的线程中发送每个请求,以使服务饱和。

在进行基准测试时,一定要清楚您希望评估哪些代码,并尽可能只去测试那一部分代码。像criterion这样的基准测试库在一个循环中多次运行一段代码,测量它通过每个循环所花费的时间。因此,应当只将您想要评估的代码放入循环中,并将无关代码尽可能的留在循环外。

因此,以这个带有输入的简单criterion为例:

1
2
3
4
5
6
7
8
let c = Criterion::default();
let inputs = &[1, 2, 3, 4, 5];

c.bench_function_over_inputs("example", |b, &&num| {
b.iter(|| {
// important measured work goes here
});
}, inputs);

iter多次调用您的闭包,测量每次迭代。但是如此就需要事先设置大量线程,您并不希望这么做。如果只需要设置一次,就能够在多次迭代中使用,则应该将设置过程放在闭包之外,例如:

1
2
3
4
5
6
7
8
9
let c = Criterion::default();
let inputs = &[1, 2, 3, 4, 5];

c.bench_function_over_inputs("example", |b, &&num| {
// do setup here
b.iter(|| {
// important measured work goes here
});
}, inputs);

只评估b.iter闭包中的代码,其余的环境设置代码都放在前面。

如果设置无法放在循环之前,那么另一种策略是使设置占用的工作量小于想要评估的代码的工作量,例如添加循环。也要考虑基准测试中的“析构”部分,通常指运行drop的成本。

如果您有一个阻塞客户端,则客户端将需要许多线程,而在执行循环之前,您只有一次机会创建这些线程。因此,在基准测试执行迭代前,您需要设置一堆可复用的线程。幸运的是,SharedQueueThreadPool就是一个很好的工具。为每个请求设置一个线程,并将其与某个通道配对,以报告收到响应,这就成了一个合适的基准测试工具。

现在开始编写前两个基准测试

前面提到这是一个参数化基准测试,参数就是服务器线程池中要使用的CPU核数。我们想看看只有1个、2个、4个等每个偶数一直到CPU核数的2倍时,吞吐量都是什么表现。至于为什么是2倍,也许拥有比内核更多的线程可能会有好处,您将通过实验来发现。

对于密集写入的作业,在环境设置期间(即调用b.iter(...)之前)先创建KvsServer<KvStore, SharedQueueThreadPool>,线程池使用参数化的线程数。然后编写一个作业,为1000个等长的键设置相同的值。请注意,尽管键不同,但为了测试结果的一致性,应在每个循环中使用同一套键。

然后在每次线程写入键和值后,也应该assert!调用成功(以确保作业执行中没有出错),从而表明作业成功完成。当所有线程都完成后,基准测试线程继续运行并结束迭代。实现这种信号式结束的最直观方法是让每个作业线程将消息发送回基准测试线程,但请记住,这些信号代码是与您希望评估的代码毫无关系的开销,因此它的工作量应尽可能少。您可以只使用一条消息,或者使用其他并发类型,仅向基准测试线程发出一次信号吗?

将此基准测试命名为write_queued_kvstore(或其他)。

对于密集读取作业,在环境设置期间先创建KvServer<KvStore, SharedQueueThreadPool>,线程池使用参数化的线程数。然后编写保护1000个线程的客户端线程池。仍然在环境设置阶段,创建另一个客户端并初始化1000个不同的等长键,并全部使用相同的值。

然后,在基准测试循环中,为客户端生成1000个检索相同键/值解析的作业,然后assert!结果是正确的。最后,像以前一样,向基准测试线程发送一条消息,表示读取已完成。

将此基准测试命名为read_queued_kvstore(或其他)。

的确有不少工作要做。

您可以像往常一样使用cargo bench运行这组criterion基准测试。

只不过这次您有更多工作要完成。由于您将要在多个参数上执行相同的基准测试,即以线程池中的线程数为参数,如果能将这些结果体现在一个漂亮的图表中,以看到不同线程数的影响,将会使测试结果更加直观。

恰好criterion就有这个功能!

请再次并阅读有关使用参数作为基准测试的输入的内容。文章解释了如何制作输入的基准测试图标。您注意到了什么?当您的线程数接近服务器CPU核数时会发生什么?当线程数超过服务器的线程数时会发生什么?您认为是什么导致了测试结果中的趋势?结果取决于许多因素,因此您的结果可能与其他任何人都不同。

这是始终进行基准测试而不非推测性能的一个很好的理由。我们可以做出有根据的猜测,但直到我们测试才会知道结果。

第7部分:评估其他线程池和引擎

到这里,您已经解决了基准测试练习中最困难的部分。现在您只需在之前的基础上做更多配置性工作即可。

拿您之前写的那两个基准,然后复制粘贴三遍。将这些副本中的SharedQueueThreadPool更改为RayonThreadPool

将第三个和第四个命名为read/write_rayon_kvstore(或其他)。这两个将与前两个SharedQueueThreadPool实现进行比较,以了解您的实现与RayonThreadPool之间的区别。

第五个和第六个,命名为read/write_rayon_sledkvengine,将引擎改成SledKvsEngine。这些您将与前两个进行比较,以了解您的KvsEngine与多线程环境中的sled有什么区别。

和以前一样,运行并绘制所有这些基准测试。如上所述将它们相互比较,在各种线程数下,您的调度与rayon相比如何?在各种线程数下,您的存储引擎与sled相比如何?结果令人惊讶吗?你能想象为什么存在差异吗?

扩展1:比较函数

现在,您为三个不同的线程池执行了相同的基准测试,您运行了也比较了它们的性能。criterion内置支持比较多个实现。查看Criterion用户手册中的“比较函数”并修改您的基准测试,以便让criterion自己进行比较,看看那些华丽的图表。

背景:锁的极限

在本项目前期,我们建议通过将KvsEngine内部数据放在堆上并保护在锁之后来保证其线程安全。您可能立即意识到这不会提高吞吐量,因为它只是将一种阻塞换成了另一种阻塞 — 将原来的阻塞磁盘访问换成了现在的阻塞互斥访问。

所以到目前为止,我们所取得的成就是:

1
2
3
4
5
6
7
8
9
10
11
thread
+ +--------+
T1 | | R1 |
| +-----------------+
T2 | | R2 |
| +-----------------+
T3 | | W1 |
| +-----------------+
T4 | | W2 |
+ +--------+
--> read/write reqs over time -->

在上一节中,您对您的引擎与SledKvsEngine的多线程吞吐量进行了基准测试。希望您已经发现,您的多线程实现比sled的性能要差得多(如果不是,要么是您的实现非常棒,要么是sled出了什么问题)。到目前为止,添加多线程导致的性能比单线程实现更差 — 现在您的实现还需要执行线程间上下文切换的额外工作,以及为了保证互斥锁而强加的阻塞。

因此,对于项目的这一部分将变得更加复杂。用锁保护整个状态很容易 — 整个状态总是以原子方式读写,于是一次只有一个客户端可以访问整个状态。但这也意味着想要访问共享状态的两个线程必须互相等待。换句话说,当KvsEngine受互斥锁保护时,尽管是多线程的,但服务器中的实际并发量非常少。

高性能、可扩展、并行的软件倾向于尽可能避免锁和锁争用。与大多数语言相比,Rust使复杂且高性能的并发模式变的更容易(因为您无需担心数据竞争和程序崩溃),但它并不能阻止您写出可能导致错误程序行为的逻辑bug。

所以你仍然需要对并发进行一些认真的思考。幸运的是,Rust crate生态中有许多复杂的并行编程工具,因此您的任务通常只是了解它们是什么以及如何将它们组合在一起,而不是了解如何编写自己的复杂无锁数据结构。

让我们看一些更复杂的例子。我们将以单线程KvStore为例,并考虑如何将其改为线程安全的。

这是一个单线程KvStore示例,就像您在早期项目中创编写的那样(这是课程示例项目中的简化版本):

1
2
3
4
5
6
7
8
9
10
11
12
13
pub struct KvStore {
/// Directory for the log and other data
path: PathBuf,
/// The log reader
reader: BufReaderWithPos<File>,
/// The log writer
writer: BufWriterWithPos<File>,
/// The in-memory index from key to log pointer
index: BTreeMap<String, CommandPos>,
/// The number of bytes representing "stale" commands that could be
/// deleted during a compaction
uncompacted: u64,
}

而这是简单的多线程版本,用锁保护一切。希望您的实现看起来已经类似这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#[derive(Clone)]
pub struct KvStore(Arc<Mutex<SharedKvStore>>);

#[derive(Clone)]
pub struct SharedKvStore {
/// Directory for the log and other data
path: PathBuf,
/// The log reader
reader: BufReaderWithPos<File>,
/// The log writer
writer: BufWriterWithPos<File>,
/// The in-memory index from key to log pointer
index: BTreeMap<String, CommandPos>,
/// The number of bytes representing "stale" commands that could be
/// deleted during a compaction
uncompacted: u64,
}

如此Arc<Mutex<T>>的解决方案,简单、正确且常见:

  • Arc将值放在堆上,以便它可以在线程之间共享,并提供一个clone方法来为每个线程创建一个“句柄”;
  • Mutex提供了一种在不使用&mut引用的情况下,获得该值的写访问权限的方法。

对于许多情况,这是一个完全合理的解决方案。但在本项目中,互斥锁将成为锁争用的源头:互斥锁不仅会串行化SharedKvStore的写访问,还会串行化读访问。任何想要使用KvStore的线程都需要等待Mutex被另一个线程解锁。所有请求都会阻塞任何其他并发请求。

我们真正想要的是不必使用锁,或者 — 如果确实需要锁的 — 它们尽量少的与其他线程竞争。

互斥锁的进阶是RwLock,即“读写锁”。这是每个并行软件开发者都必须知道的另一种常见锁。读写锁对互斥体的改进是它允许任意数量的读取,或单个写入。即用Rust术语,RwLock可以同时支持任意数量的&引用,或单个&mut引用。读会被写阻塞,写会阻塞其他所有读和写。

在我们的数据库中,这意味着可以同时满足所有读取请求,但是当单个写入请求进入时,系统中的所有其他活动都会停止并等待该写操作完成。实现这一点很简单,基本上就是将Mutex换成RwLock

现在,再次考虑我们的多线程示意图,最终的处理流程如下所示:

1
2
3
4
5
6
7
8
9
10
11
thread
+ +--------+
T1 | | R1 |
| +--------+
T2 | | R2 |
| +-----------------+
T3 | | W1 |
| +-----------------+
T4 | | W2 |
+ +--------+
--> read/write reqs over time -->

第8部分:无锁读

对于本项目,我们想要尝试去创建永远不会锁定的读取器,即发生并发写入也依然不会阻塞读取。无论写入请求如何,都可以始终为读取请求提供服务。(写如现在仍然会阻塞其他写操作 — 除了成为一项具有挑战性的并行编程问题,并行写入本身是否有意义仍是一个难以回答的问题)。

我们最终的期望是:

1
2
3
4
5
6
7
8
9
10
11
thread
+ +--------+
T1 | | R1 |
| +--------+
T2 | | R2 |
| +--------+
T3 | | W1 |
| +-----------------+
T4 | | W2 |
+ +--------+
--> read/write reqs over time -->

如果能做到这一点,那么我们将实现无锁的读操作:即使有读操作在等待来自文件系统的数据(被阻塞),所有类型的其他操作(读和写)都可以继续进行。不过,这仍然不足以保证系统始终可以为读请求提供服务。想想如果在大小为N的线程池上存在N个阻塞的写请求会发生什么。稍后你需要考虑这个问题,但现在,您的重点是从读操作中移除锁。

MutexRwLock不同,并没有一种封装类型可以应用于整个任意共享状态,以实现同时读取和写入的目标(至少不能同时还具有高性能)。

这意味着我们需要考虑如何使用SharedKvStore的每个字段,选择正确的同步方案以允许执行尽可能多的线程,并能够继续保持数据的逻辑一致性。

这就是多线程真正困难的地方。如果你移除那个大锁,Rust仍然会保护您免受数据竞争的影响,但它并不能帮助您保持数据的逻辑一致性。

所以在考虑解决方案之前,让我们考虑一下我们的需求。我们要:

  • 同时在多个线程上读取内存索引和磁盘日志;
  • 将命令写入磁盘,同时维护内存索引;
  • 读与写并行,因此:
  • 一般来说,为了保证读与写并发时总是能够读到一致的状态,这意味着:
    • 维护一个不变量,使其实在指向日志中的一个有效命令;
    • 维护一些适当的不变量以供其他的记录行为使用,比如为了下例uncompacted的记录;
  • 定期压缩磁盘数据,同时为读操作维护不变量;

本节的其余部分是对有助于实现上述目标的各种背景知识的介绍,也这是项目的最终目标:修改KvStore以同时执行读取和写入。

示例数据结构的解释

为了更具体的讨论,我们将需要一个示例,以展示应受保护的数据和应被维护的不变量。下面是一个KvStore实现及其字段的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
pub struct KvStore {
/// Directory for the log and other data
path: PathBuf,
/// The log reader
reader: BufReaderWithPos<File>,
/// The log writer
writer: BufWriterWithPos<File>,
/// The in-memory index from key to log pointer
index: BTreeMap<String, CommandPos>,
/// The number of bytes representing "stale" commands that could be
/// deleted during a compaction
uncompacted: u64,
}

这是本项目示例的简化版本。

各字段的作用非常明确:

pathPathBuf只是日志的目录路径,永远不会改变 — 它是不可变的,不可变的类型在Rust中是同步的,所以它甚至根本不需要任何保护。它可以被任何线程通过共享引用同时读取。

readerHashMap<u64, BufReaderWithPos<File>>是当前日志文件的读句柄。它是可变的,即压缩后会指向新的日志文件。

writerBufWriterWithPos<File>是当前日志文件的写句柄。任何写入都需要对writer的可变访问,并且压缩过程需要修改writercurrent_gen

indexBTreeMap<String, CommandPos>是数据库中每个键到其所在日志文件中具体位置的内存索引。它从可以从每个读线程读取,从每个写线程写入,即使在压缩期间也能正常访问。

uncompactedu64仅用于计算日志中已被后续写入命令取代的“陈旧”命令的总长度,用以触发日志压缩操作。

在以前的项目中,我们不必担心写、读和压缩之间的交叉会产生不一致的结果,因为它们都发生在同一个线程上。现在,如果您对数据结构的选择和使用不够小心,很容易破坏数据库的状态。

消除锁的策略

高级并行编程的关键是了解可用的工具以及使用它们的时机。以下是我们在实施此项目时发现的一些有用的技术,其中一些您也将用到。这些技术将以上面的数据结构为例进行讲解。

理解并维护顺序一致性

(请注意,“顺序一致性”一词有其准确含义,但在这里我们只是概括性地讨论如何确保以特定顺序执行作业)。

理解并行编程的关键在于理解代码各部分的“执行顺序”间的关系。在这个线程中,要让本线程早于其他线程看到数据,共享数据结构需要怎样改动?要让本线程早于其他线程暴露内部数据,共享数据结构需要怎样改动?我如何保证执行结果?

在单线程代码中,推断出任意行代码之前发生了什么很简单 — 如果代码写在前面,则会在前面执行,反之则会在后面执行。但这实际上并不是这样,即使在单线程代码中:为了使代码更高效的运行,CPU和编译器都会重新组织代码执行的顺序,只不过CPU使用器机码,编译器使用用以生成机器码的内部编码。实际上的代码执行顺序跟您编写代码的顺序并不相同,代码只是看起来在按您编写的顺序运行,因为CPU和编译器都会跟踪数据依赖,它们并不会打乱依赖顺序重新排列操作。

在多线程代码中,CPU和编译器使用与单线程相同的条件重新排序代码,而且您的代码块会被打碎重排,除非您使用同步类型和操作告诉编译器不允许重新排序。

任何必须在特定操作之前或之后发生的操作,都必须显式使用同步类型或操作,如使用锁、原子类型等等。

在上面的例子中,文件写入和内存索引写入显然应以特定的顺序发生 — 如果索引在文件更新前更新会发生什么?另外,例子中包含另一个状态,即未压缩命令的总长度uncompacted。错误的计算未压缩命令的长度将产生什么影响?如果在将数据写入文件之前就可以看到uncompacted发生了变化可能还好,但是必须为每个这样的独立同步值制定策略。

确定不可变值

你可能已经了解了很多关于Rust中的不可变性,以及不可变值如何轻松的在线程间共享(它们具有Sync trait)。不可变值最适合并发 — 只需将它们放在Arc后面即可。

在本例中,PathBuf是不可变的。

复制值而不是复制共享

在Rust中我们可能不太喜欢克隆,特别是克隆大小不确定的类型,如StringVec。但是克隆的存在是完全合理的:在某些情况下避免克隆可能非常困难,而且CPU也非常擅长复制内存缓冲区。此外,在本例中,服务端所需的状态副本的数量,实际上受线程池中线程数量的限制。

在本例中,PathBuf也很容易克隆。

考虑一个潜在问题,如何跨线程共享对文件的访问。File类型对读和写操作均需要可变访问。因此,其跨线程共享需要使用锁以授权可变访问。那么,什么是文件?它并不是一份实际上的文件 — 而只是磁盘上物理资源的句柄,一个文件同时存在多个打开句柄是可以的。注意File的API — 并没有实现Clone,虽然它确实有个诱人的try_clone方法,它的语义对多线程代码会产生复杂的影响,如seek原文件的位置会同步到try_clone创建的另一个文件。请考虑File::opentry_clone中文件之间的区别。使用try_clone还是File::open,将是您的选择。查看pread可能会有所帮助。

按角色分解数据结构

在例子中,我们有两个明确的角色:读和写(也许还有第三个用于压缩)。将读逻辑和写逻辑分离并各封装为一个并发类型在Rust中很常见。读逻辑有自己的数据集,写逻辑也一样,这就为封装提供了很好的条件,所有读操作划为一种类型,而所有写操作划为另一种类型。

这种划分将进一步使两者都访问哪些资源变得非常明显,因为读类型和写类型都将包含这些资源的共享句柄。

使用专门的并发数据结构

知道哪些工具可用以及该在哪些场景中使用可能是并行编程中最困难的部分。除了学校里教的基础锁类型,同步数据类型也变得越来越专业化。

在本项目中,由于内存索引是某种类型的关联数据结构(也称为“映射”),如树或哈希表,那么我们自然会想到是否存在并发关联数据结构。

确实存在,且正确的使用这些数据结构是完成本项目的关键。

但是怎么才能找到这些类型呢?第一步是确定是否存在并发映射。您可以阅读Rust Discord上的#beginners部分,但对于本项目,在网络上搜”concurrent map”即可。

这是容易的部分,在Rust中找到正确的并发映射类型则更困难。比较好的起手式是访问libs.rs。libs.rs与crates.io类似,但crates.io包含所有已发布的库,而libs.rs仅包含受到……好吧,某些人好评的库。因此,如果一个库在libs.rs上,就是库可用的一个指示,另一个是crates.io上的下载计数 — 通常,下载越多的crates被测试的越细。下载计数可以粗略的看作是“担保”库的人数。最后,在聊天中提问也是一个好主意。

将清理延后

像克隆一样,垃圾收集在Rust中经常遭到反对 — 避免GC几乎就是Rust存的意义。但实际上垃圾收集是不可避免的,“垃圾收集”和“内存回收”实际上是同义词,各种语言都复合使用了多种垃圾收集策略。GC策略轴的一端,在没有自动内存管理的语言(如C)中,垃圾收集完全由程序员决定,如使用mallocfree。另一端则是垃圾收集语言,如 Java,所有内存都由一个通用垃圾收集器管理。

但实际上,在C中并不是所有内存的管理和回收都使用malloc/free,在Java也并不是所有内存管理都通过GC完成。举个简单的例子,两者中的高性能应用程序通常都依赖于专门的内存区域,在这些区域既可以重用内存也可以大量解除分配,以优化其内存访问模式。

同样在Rust中,并非所有内存都被显式地释放。比如在RcArc类型中实现了资源计数,也算算算一种简单的GC。

全局垃圾收集器的最大好处之一,就是使许多无锁数据结构成为可能。学术文献中描述的许多无锁数据结构都依赖于GC执行。crossbeam库及其epoch类型的出现,就是在不依赖GC的情况下实现无锁算法。

也就是说垃圾收集有多种形式,其“将资源清理延迟到未来某个时间”的基本策略在许多场景中都很强大。

当您现在不知道如何执行一些并发工作时,可以试着考虑:“可以稍后再做吗?”。

与原子类型共享标志和计数器

在底层,大多数并发数据结构都是使用原子操作或“原子类型”实现的。原子类型在单个内存单元上运行,一般在8到128个字节之间,通常是字长(与指针的字节数相同,并且与Rustusize类型大小相同)。如果两个线程正确使用原子类型,则一个线程中的写入结果将立即对另一个线程中的读取可见。除了使读取或写入立即可见之外,在Rust中,原子操作还通过Ordering标志限制编译器和CPU重新排序指令的方式。

当从粗粒度并行中的锁,转向更细粒度的并行时,通常需要使用原子类型来增强现成的并发数据结构。

实现无锁读

以上就是相关的背景知识。希望这些知识已经开始引导您考虑了很多东西,并朝着正确的方向前进了。现在轮到实现了:

修改KvStore以同时执行读取和写入。


engines/kvs.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use crossbeam_skiplist::SkipMap;
use log::error;
use serde::{Deserialize, Serialize};
use serde_json::Deserializer;
use crate::{KvsError, Result, KvsEngine};
use std::ffi::OsStr;

const COMPACTION_THRESHOLD: u64 = 1024 * 1024;

/// The `KvStore` stores string key/value pairs.
///
/// Key/value pairs are persisted to disk in log files. Log files are named after
/// monotonically increasing generation numbers with a `log` extension name.
/// A `BTreeMap` in memory stores the keys and the value locations for fast query.
///
/// ```rust
/// # use kvs::{KvStore, Result};
/// # fn try_main() -> Result<()> {
/// use std::env::current_dir;
/// use kvs::KvsEngine;
/// let mut store = KvStore::open(current_dir()?)?;
/// store.set("key".to_owned(), "value".to_owned())?;
/// let val = store.get("key".to_owned())?;
/// assert_eq!(val, Some("value".to_owned()));
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct KvStore {
// map generation number to the file reader
index: Arc<SkipMap<String, CommandPos>>,
reader: KvStoreReader,
// writer of the current log.
writer: Arc<Mutex<KvStoreWriter>>,
// the number of bytes representing "stale" commands that could be
// deleted during a compaction.
}

impl KvStore {
/// Opens a `KvStore` with the given path.
///
/// This will create a new directory if the given one does not exist.
///
/// # Errors
///
/// It propagates I/O or deserialization errors during the log replay.
pub fn open(path: impl Into<PathBuf>) -> Result<KvStore> {
let path = Arc::new(path.into());
fs::create_dir_all(&*path)?;

let mut readers = BTreeMap::new();
let index = Arc::new(SkipMap::new());

let gen_list = sorted_gen_list(&path)?;
let mut uncompacted = 0;

for &gen in &gen_list {
let mut reader = BufReaderWithPos::new(File::open(log_path(&path, gen))?)?;
uncompacted += load(gen, &mut reader, &*index)?;
readers.insert(gen, reader);
}

let current_gen = gen_list.last().unwrap_or(&0) + 1;
let writer = new_log_file(&path, current_gen)?;
let safe_point = Arc::new(AtomicU64::new(0));

let reader = KvStoreReader {
path: Arc::clone(&path),
safe_point,
readers: RefCell::new(readers),
};

let writer = KvStoreWriter {
reader: reader.clone(),
writer,
current_gen,
uncompacted,
path: Arc::clone(&path),
index: Arc::clone(&index),
};

Ok(KvStore {
reader,
index,
writer: Arc::new(Mutex::new(writer)),
})
}
}

impl KvsEngine for KvStore {
/// Sets the value of a string key to a string.
///
/// If the key already exists, the previous value will be overwritten.
///
/// # Errors
///
/// It propagates I/O or serialization errors during writing the log.
fn set(&self, key: String, value: String) -> Result<()> {
self.writer.lock().unwrap().set(key, value)
}

/// Gets the string value of a given string key.
///
/// Returns `None` if the given key does not exist.
///
/// # Errors
///
/// It returns `KvsError::UnexpectedCommandType` if the given command type unexpected.
fn get(&self, key: String) -> Result<Option<String>> {
if let Some(cmd_pos) = self.index.get(&key) {
if let Command::Set { value, .. } = self.reader.read_command(*cmd_pos.value())? {
Ok(Some(value))
} else {
Err(KvsError::UnexpectedCommandType)
}
} else {
Ok(None)
}
}

/// Removes a given key.
///
/// # Errors
///
/// It returns `KvsError::KeyNotFound` if the given key is not found.
///
/// It propagates I/O or serialization errors during writing the log.
fn remove(&self, key: String) -> Result<()> {
self.writer.lock().unwrap().remove(key)
}

}

/// A single thread reader.
///
/// Each `KvStore` instance has its own `KvStoreReader` and
/// `KvStoreReader`s open the same file separately. So the user
/// can read concurrently through multiple `KvStore`s in different
/// threads.
struct KvStoreReader {
path: Arc<PathBuf>,
// generation of the latest compaction file
safe_point: Arc<AtomicU64>,
readers: RefCell<BTreeMap<u64, BufReaderWithPos<File>>>,
}

impl KvStoreReader {
/// Close file handles with generation number less than safe_point.
///
/// `safe_point` is updated to the latest compaction gen after a compaction finished.
/// The compaction generation contains the sum of all operations before it and the
/// in-memory index contains no entries with generation number less than safe_point.
/// So we can safely close those file handles and the stales file can be deleted.
fn close_stale_handles(&self) {
let mut readers = self.readers.borrow_mut();
while !readers.is_empty() {
let first_gen = *readers.keys().next().unwrap();
if self.safe_point.load(Ordering::SeqCst) <= first_gen {
break;
}
readers.remove(&first_gen);
}
}

/// Read the log file at the given `CommandPos`.
fn read_and<F, R>(&self, cmd_pos: CommandPos, f: F) -> Result<R>
where
F: FnOnce(io::Take<&mut BufReaderWithPos<File>>) -> Result<R>,
{
self.close_stale_handles();

let mut readers = self.readers.borrow_mut();
// Open the file if we haven't open it in this `KvStoreReader`.
// We don't use entry API here because we want the errors to be propogated.
if !readers.contains_key(&cmd_pos.gen) {
let reader = BufReaderWithPos::new(File::open(log_path(&self.path, cmd_pos.gen))?)?;
readers.insert(cmd_pos.gen, reader);
}
let reader = readers.get_mut(&cmd_pos.gen).unwrap();
reader.seek(SeekFrom::Start(cmd_pos.pos))?;
let cmd_reader = reader.take(cmd_pos.len);
f(cmd_reader)
}

// Read the log file at the given `CommandPos` and deserialize it to `Command`.
fn read_command(&self, cmd_pos: CommandPos) -> Result<Command> {
self.read_and(cmd_pos, |cmd_reader| {
Ok(serde_json::from_reader(cmd_reader)?)
})
}
}

impl Clone for KvStoreReader {
fn clone(&self) -> Self {
KvStoreReader {
path: Arc::clone(&self.path),
safe_point: Arc::clone(&self.safe_point),
// don't use other KvStoreReader's readers
readers: RefCell::new(BTreeMap::new()),
}
}
}

struct KvStoreWriter {
reader: KvStoreReader,
writer: BufWriterWithPos<File>,
current_gen: u64,
// the number of bytes representing "stale" commands that could be
// delete during a compaction
uncompacted: u64,
path: Arc<PathBuf>,
index: Arc<SkipMap<String, CommandPos>>
}

impl KvStoreWriter {
fn set(&mut self, key: String, value: String) -> Result<()>{
let cmd = Command::set(key, value);
let pos = self.writer.pos;
serde_json::to_writer(&mut self.writer, &cmd)?;
self.writer.flush()?;
if let Command::Set { key, .. } = cmd {
if let Some(old_cmd) = self.index.get(&key) {
self.uncompacted += old_cmd.value().len;
}
self.index
.insert(key, (self.current_gen, pos..self.writer.pos).into());
}

if self.uncompacted > COMPACTION_THRESHOLD {
self.compact()?;
}
Ok(())
}

fn remove(&mut self, key: String) -> Result<()> {
if self.index.contains_key(&key) {
let cmd = Command::remove(key);
let pos = self.writer.pos;
serde_json::to_writer(&mut self.writer, &cmd)?;
self.writer.flush()?;
if let Command::Remove { key } = cmd {
let old_cmd = self.index.remove(&key).expect("key not found");
self.uncompacted += old_cmd.value().len;
// the "remove" command itself can be deleted in the next compaction
// so we add it's length to `uncompacted`
self.uncompacted += self.writer.pos - pos;
}

if self.uncompacted > COMPACTION_THRESHOLD {
self.compact()?;
}
Ok(())
} else {
Err(KvsError::KeyNotFound)
}
}

/// Clear stale entries in log.
fn compact(&mut self) -> Result<()> {
// increase current gen by 2. current_gen + 1 is for compaction file
let compaction_gen = self.current_gen + 1;
self.current_gen += 2;
self.writer = new_log_file(&self.path, self.current_gen)?;

let mut compaction_writer = new_log_file(&self.path, compaction_gen)?;

let mut new_pos = 0;
for entry in self.index.iter() {
let len = self.reader.read_and(*entry.value(), |mut entry_reader| {
Ok(io::copy(&mut entry_reader, &mut compaction_writer)?)
})?;
self.index.insert(
entry.key().clone(),
(compaction_gen, new_pos..new_pos+len).into(),
);
new_pos += len;
}
compaction_writer.flush()?;

self.reader
.safe_point
.store(compaction_gen, Ordering::SeqCst);
self.reader.close_stale_handles();

// remove stale log files.
// Note that actually these files are not deleted immediately because `KvStoreReader`s
// still keep open file handles. When `KvStoreReader` is used next time, it will clear
// its stale file handles. On Unix, the files will be deleted after all the handles
// are closed. On Windows, the deletions below will fail and stale files are expected
// to be deleted in the next compaction.
let stale_gens = sorted_gen_list(&self.path)?
.into_iter()
.filter(|&gen| gen < compaction_gen);
for stale_gen in stale_gens {
let file_path = log_path(&self.path, stale_gen);
if let Err(e) = fs::remove_file(&file_path) {
error!("{:?} cannot be deleted: {}", file_path, e);
}
}
self.uncompacted = 0;

Ok(())
}

}
/// Create a new log file with given generation number.
///
/// Returns the writer to the log.
fn new_log_file(path: &Path, gen: u64) -> Result<BufWriterWithPos<File>> {
let path = log_path(&path, gen);
let writer = BufWriterWithPos::new(
OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open(&path)?,
)?;
Ok(writer)
}

/// Returns sorted generation numbers in the given directory.
fn sorted_gen_list(path: &Path) -> Result<Vec<u64>> {
let mut gen_list: Vec<u64> = fs::read_dir(&path)?
.flat_map(|res| -> Result<_> { Ok(res?.path()) })
.filter(|path| path.is_file() && path.extension() == Some("log".as_ref()))
.flat_map(|path| {
path.file_name()
.and_then(OsStr::to_str)
.map(|s| s.trim_end_matches(".log"))
.map(str::parse::<u64>)
})
.flatten()
.collect();
gen_list.sort_unstable();
Ok(gen_list)
}

/// Load the whole log file and store value locations in the index map.
///
/// Returns how many bytes can be saved after a compaction.
fn load(
gen: u64,
reader: &mut BufReaderWithPos<File>,
index: &SkipMap<String, CommandPos>,
) -> Result<u64> {
// To make sure we read from the beginning of the file.
let mut pos = reader.seek(SeekFrom::Start(0))?;
let mut stream = Deserializer::from_reader(reader).into_iter::<Command>();
let mut uncompacted = 0; // number of bytes that can be saved after a compaction.
while let Some(cmd) = stream.next() {
let new_pos = stream.byte_offset() as u64;
match cmd? {
Command::Set { key, .. } => {
if let Some(old_cmd) = index.get(&key) {
uncompacted += old_cmd.value().len;
}
index.insert(key, (gen, pos..new_pos).into());
}
Command::Remove { key } => {
if let Some(old_cmd) = index.remove(&key) {
uncompacted += old_cmd.value().len;
}
// the "remove" command itself can be deleted in the next compaction
// so we add its length to `uncompacted`
uncompacted += new_pos - pos;
}
}
pos = new_pos;
}
Ok(uncompacted)
}

fn log_path(dir: &Path, gen: u64) -> PathBuf {
dir.join(format!("{}.log", gen))
}

/// Struct representing a command.
#[derive(Serialize, Deserialize, Debug)]
enum Command {
Set { key: String, value: String },
Remove { key: String },
}

impl Command {
fn set(key: String, value: String) -> Command {
Command::Set { key, value }
}

fn remove(key: String) -> Command {
Command::Remove { key }
}
}

#[derive(Debug, Clone, Copy)]
/// Represents the position and length of a json-serialized command in the log.
struct CommandPos {
gen: u64,
pos: u64,
len: u64,
}

impl From<(u64, Range<u64>)> for CommandPos {
fn from((gen, range): (u64, Range<u64>)) -> Self {
CommandPos {
gen,
pos: range.start,
len: range.end - range.start,
}
}
}

struct BufReaderWithPos<R: Read + Seek> {
reader: BufReader<R>,
pos: u64,
}

impl<R: Read + Seek> BufReaderWithPos<R> {
fn new(mut inner: R) -> Result<Self> {
let pos = inner.seek(SeekFrom::Current(0))?;
Ok(BufReaderWithPos {
reader: BufReader::new(inner),
pos,
})
}
}

impl<R: Read + Seek> Read for BufReaderWithPos<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = self.reader.read(buf)?;
self.pos += len as u64;
Ok(len)
}
}

impl<R: Read + Seek> Seek for BufReaderWithPos<R> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.pos = self.reader.seek(pos)?;
Ok(self.pos)
}
}

struct BufWriterWithPos<W: Write + Seek> {
writer: BufWriter<W>,
pos: u64,
}

impl<W: Write + Seek> BufWriterWithPos<W> {
fn new(mut inner: W) -> Result<Self> {
let pos = inner.seek(SeekFrom::Current(0))?;
Ok(BufWriterWithPos {
writer: BufWriter::new(inner),
pos,
})
}
}

impl<W: Write + Seek> Write for BufWriterWithPos<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let len = self.writer.write(buf)?;
self.pos += len as u64;
Ok(len)
}

fn flush(&mut self) -> io::Result<()> {
self.writer.flush()
}
}

impl<W: Write + Seek> Seek for BufWriterWithPos<W> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.pos = self.writer.seek(pos)?;
Ok(self.pos)
}
}

干得漂亮,朋友。休息一下吧。

评论

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×