PingCap的Rust训练课程2:日志结构文件I/O

前言

任务:创建一个键/值存储程序,能够从命令行访问,支持持久化

目标

  • 编写健壮的错误和异常处理
  • 使用serde进行序列化
  • 使用标准文件API将数据作为日志写入磁盘
  • 从磁盘读取键/值数据的状态
  • 将内存中的索引键映射在磁盘的对应值上
  • 定期压缩日志以删除过期数据

关键词:日志结构文件I/O、bitcask、failurecrate、Read/Writetrait、serdecrate。

扩展练习:尝试使用structoptcrate。

介绍

在这个项目中,您将创建一个简单的基于磁盘的键/值存储程序,并可以通过命令行进行修改和查询。它将使用简化版的bitcask存储算法,该算法在简单和有效之间做了较好的平衡。您的第一个任务是在磁盘上维护先前写入命令的日志(有时称为“预写日志”或“WAL”),在程序启动时,通过对日志求值(译注:即按顺序执行日志)就能够在内存中重建数据库状态。您的第二个任务是继续拓展该程序,将键存储在内存中,将值存储到磁盘的日志中。您的最后一个任务是添加日志压缩功能,以确保日志不会无限增长。在这个项目结束时,您将使用Rust文件API构建一个简单且架构良好的数据库。

专业术语

下面列出本课程中使用的一些术语,部分参考了bitcask,不同的数据库会有略微不同的术语:

  • 命令(command) - 一条对数据库的请求或一条请求的表示。命令来源于命令行或网络。命令有三种表示:内存表示、文本表示以及机器可读的序列化表示。
  • 日志(log) - 由多条命令组成的序列,按照最初接收和执行的顺序放置在磁盘上。我们数据库在磁盘上的格式几乎完全由日志组成。这种实现很简单,而且非常高效。
  • 日志指针(log pointer) - 日志中的文件偏移量,有时我们将其简称为“文件偏移量”(file offset)。
  • 日志压缩(log compaction) - 当向数据库发出写入请求时,新的请求有时会使旧的日志条目无效。例如,写入键/值a = 0,之后再写入a = 1,这就会使”a”的第一条日志失效。压缩 — 至少在我们的数据库中 — 是一个从日志中删除过期的命令以减小数据库大小的过程。
  • 内存索引/索引(in-memory index/index) - 指向日志指针的键映射。当发出读请求时,会在内存索引中查找相应的日志指针,找到后从磁盘日志中取回该值。我们的键/值存储程序与bitcask类似,整个数据库的索引都存储在内存中。
  • 索引文件(index file) - 内存索引的磁盘表示。如果没有该文件,则每次启动数据库时都需要重放日志(重新执行整个日志)以恢复内存索引的状态。

项目需求规格

cargo项目kvs构建了一个名为kvs的命令行键值存储客户端,该客户端又调用了一个名为kvs的库。

kvs可执行文件支持以下命令行参数:

  • kvs set <KEY> <VALUE>
    将字符串键的值设置为字符串值。打印错误并在失败时返回非零退出码。
  • kvs get <KEY>
    获取给定字符串键的字符串值。打印错误并在失败时返回非零退出码。
  • kvs rm <KEY>
    删除给定的键。打印错误并在失败时返回非零退出码。
  • kvs -V
    打印版本。

kvs库包含一个类型,KvStore,它支持以下方法:

  • KvStore::set(&mut self, key: String, value: String) -> Result<()>
    将字符串键设置为字符串值。如果值未设置成功,则返回错误。
  • KvStore::get(&mut self, key: String) -> Result<Option<String>>
    获取字符串键的字符串值。如果键不存在,则返回None。如果读取键值未成功,则返回错误。
  • KvStore::remove(&mut self, key: String) -> Result<()>
    删除给定的key。如果key不存在或未成功删除,则返回错误。
  • KvStore::open(path: impl Into<PathBuf>) -> Result<KvStore>
    从给定路径打开KvStore。返回KvStore。

在设置键值时,kvs将这条set命令写入位于磁盘的顺序日志中,然后将该命令的日志指针(文件偏移量)存储在内存索引中,即从键映射到日志指针。删除键时,类似地,kvs会在日志中写入rm命令,然后从内存索引中删除该键。当使用get命令检索键的值时,它会搜索内存索引,如果找到,则从日志中加载对应日志指针处的命令,对命令求值(译注:执行命令)并返回结果。

启动时,按时间顺序遍历日志中的命令,即可重建内存索引。

当未压缩日志记录大小达到给定阈值时,kvs会将其压缩到新日志中,删除冗余条目以回收磁盘空间。

请注意,我们的kvs项目既是一个无状态的命令行程序,也是一个包含有状态KvStore类型的库:作为命令行程序使用,KvStore类型将加载索引、执行命令、并退出;作为库使用,它将加载索引,然后执行多条命令,维护索引状态,直到它被删除。

项目设置

在上一个项目的基础上进行本项目,删除上一个项目的tests目录,将本项目的tests目录复制到该位置。和上一个项目一样,本项目应该包含一个库和一个可执行文件,两者都叫做kvs

Cargo.toml中需要以下开发依赖项:

Cargo.toml
1
2
3
4
5
[dev-dependencies]
assert_cmd = "0.11.0"
predicates = "1.0.0"
tempfile = "3.0.7"
walkdir = "2.2.7"

与上一个项目一样,先编写空函数或panic来构建测试用例。

现在就试试吧。


我的作业:

lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/// 若仅让tests能够运行,则需要修改的地方很少

// 添加`PathBuf`导入
use std::{collections::HashMap, path::PathBuf};

// 添加Result<T>类型
pub type Result<T> = std::result::Result<T, ()>;

// 修改`KvStore`中函数的返回类型
impl KvStore {
//...
pub fn open(path: impl Into<PathBuf>) -> Result<KvStore> {
Ok(Self::new())
}
//...
pub fn get(&self, key: String) -> Result<Option<String>> {
match self.db.get(&key) {
Some(value) => Ok(Some(value.to_string())),
None => Err(())
}
}
//...
pub fn set(&mut self, key: String, value: String) -> Result<()> {
self.db.insert(key, value);
Ok(())
}
//...
pub fn remove(&mut self, key: String) -> Result<()>{
self.db.remove(&key);
Ok(())
}

cargo test结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
...
running 17 tests
test cli_invalid_subcommand ... ok
test cli_get_non_existent_key ... FAILED
test cli_rm_non_existent_key ... FAILED
test cli_get_stored ... FAILED
test cli_no_args ... ok
test get_non_existent_value ... FAILED
test get_stored_value ... FAILED
test compaction ... FAILED
test cli_invalid_get ... ok
test overwrite_value ... FAILED
test remove_non_existent_key ... FAILED
test cli_version ... ok
test remove_key ... FAILED
test cli_invalid_rm ... ok
test cli_set ... FAILED
test cli_rm_stored ... FAILED
test cli_invalid_set ... ok
...

第1部分:异常处理

在本项目中,代码运行时可能会因为I/O错误而失败。因此,在开始实现数据库之前,我们需要再做一件对Rust项目至关重要的事情:确定错误处理策略。

Rust的错误处理功能强大,但需要大量模板代码才能正确使用。本项目使用failurecrate以获得轻松处理各种错误的工具。

异常处理指南描述了几种常见的错误处理模式。

在您的实现中,选择其中一种策略,定义您自己的错误类型,或导入failuresError。您将在所有Result中使用该错误类型,使用?运算符将其他crate中的错误类型转换为您自己定义的错误类型。

接下来,给包含您自定义错误类型的Result起一个类型别名,这样您就不需要在项目中键入Result<T, YourErrorType>了,只需Result<T>即可。这是Rust中常见的模式。

最后,使用use语句将这些类型导入您的可执行文件,并更改main函数签名以返回Result<()>。库中所有可能失败的函数都会将这些Result沿栈一路向下传递到main,然后传递到Rust运行时并打印出错误信息。

运行cargo check查找编译器错误并予以修复。目前可以用panic!()结束main函数以使项目通过编译。

在继续之前确定您的异常处理策略

与之前的项目一样,您需要创建用于占位的数据结构和方法,以使测试能够编译。现在您已经定义了一个错误类型,通过编译应该很简单。可以在任何地方添加panic以使得测试套件能够通过编译(cargo test --no-run)。

注意:Rust中的错误处理实践仍在不断发展。本课程目前使用failurecrate来简化定义错误类型的过程。虽然failure设计良好,但使用它可能并不是最佳实践。 Rust专家可能不会继续看好这一实践。本课程也在迭代更新,未来可能不会使用failure。不过起码目前这一实践还不错,也提供了一个了解更多Rust错误处理发展细节的机会。


我的作业:

lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//...
mod error;
pub use error::{Result, KvsError};
//...
impl KvStore {
//...
pub fn get(&self, key: String) -> Result<Option<String>> {
match self.db.get(&key) {
Some(value) => Ok(Some(value.to_string())),
None => Err(KvsError::KeyNotFound(key))
}
}
//...
}

新增的自定义错误error模块(其实只定义一个错误就能通过编译):

error.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
use failure::Fail;
use std::io;

/// custom error type of kvs
#[derive(Fail, Debug)]
pub enum KvsError {
/// Key not found in kvs index
#[fail(display="Key `{}` not found", _0)]
KeyNotFound(String),
}

/// simplify `Result<T, KvsError>` to `Result<T>`
pub type Result<T> = std::result::Result<T, KvsError>;

第2部分 日志行为

现在我们终于要开始实现一个数据库的基础功能并读写磁盘了。您将使用serde将”set”和”rm”命令序列化为字符串,再使用标准文件I/O API将其写入磁盘。

这是带日志kvs的基本行为:

  • “set”:
    • 用户调用kvs set mykey myvalue
    • kvs创建一个值来表示”set”命令,包含命令的键和值
    • 将该命令序列化为String
    • 将序列化后的命令追加到日志文件中
    • 若执行成功,则返回错误码0,静默退出
    • 若执行失败,则打印错误,返回非零错误码并退出
  • “get”
    • 用户调用kvs get mykey
    • kvs读取整个日志,一次一条命令,将被命令影响的键及文件偏移量录入到内存中的“键->日志指针”映射中
    • 查找日志指针映射
    • 若查找失败,则打印”Key not found”,并以退出码0退出
    • 若查找成功:
      • 反序列化命令以获取该键的最新记录值
      • 将值打印到标准输出,并以退出码0退出
  • “rm”
    • 用户调用kvs rm mykey
    • 与”get”命令相同,kvs读取整个日志以重建内存索引
    • 在映射中查找给定的键
    • 若键不存在,则打印”Key not found”,并以非零错误码退出
    • 若找到该键:
      • 创建一个值来表示”rm”命令,包含命令的键
      • 将序列化的命令追加到日志中
      • 若成功完成,则以错误码0静默退出

日志是一份提交到数据库的事务的记录。通过在启动时“重放”日志中的记录,即可重建数据库先前的状态。

在本次迭代中,您可以将键的值直接存储在内存中(因此在程序初始化和日志重放后就不再需要从读取日志了)。在后面的迭代中,您只需将“日志指针”(文件偏移量)存储在内存中。

第3部分:写日志

您将从实现”set”命令开始,有很多步骤,大多数步骤都很容易实现。您可以通过运行相应的的cli_*测试用例来验证。

serde是一个有有很多选项的大型库,支持多种序列化格式。只需要正确注释您的数据结构即可使用基础的序列化/反序列化功能,然后再调用函数将其写入String或任何实现了Writetrait的流。

您需要选择一种序列化格式。考虑您想要的序列化属性 — 需要优先考虑性能吗?需要以纯文本形式查看日志内容吗?这都是您选择时需要考虑的属性,您应该在代码注释中解释您的选择。

其他需要考虑的因素包括:系统的哪些部分需要使用缓冲、您需要在哪里设置缓冲?缓冲对后续的读操作有什么影响?什么时候打开/关闭文件句柄?每个命令都需要操作句柄吗?还是在KvStore的整个生命周期内都不释放句柄?(译注,可以使用BufWriter提高写效率)

您会用到的部分API可能会调用失败,并返回包含某种错误类型的Result。确保函数返回包含您自定义错误类型的Result,以决定是否使用?传播异常。

实现”rm”命令也类似,但您应该在将命令写入日志前,检查该键是否已经存在。由于我们必须区分两个不同的命令,因此您可以使用枚举类型变量来表示每个命令。而serde可以完美应用在枚举类型上。

您现在可以实现”set”和”rm”命令了,专注于通过set/rm的相关测试用例。此外您也可以继续阅读下一节的”get”命令相关内容。在实现过程中,同时考虑读、写两种操作可能使代码更容易编写。逐项实现或同时实现均可。

第4部分:读日志

现在该实现”get”命令了。您暂时还不用考虑在索引中存储日志指针的特性,我们将该特性的实现留到下一部分。目前,您只需在启动时读取日志中的每个命令,执行这些指令,以将所有键/值保存在内存中,再从内存中读取这些键值。

此时您需要考虑:应该把日志中的所有记录一次读入内存,再将一次性重放以初始化你的映射类型;还是应该一次读取一条的重放命令并初始化您的映射?应该在反序列化前先将日志读取到缓冲区;还是应该直接反序列化一个文件流?思考您的实现对应的内存使用情况,也要考虑从I/O流中读取数据时与内核交互的情况。(译注,可以使用BufReader提高读效率)

请记住,”get”也可能找不到指定键,这种情况需要特别处理。本项目的设定为,API返回None,且命令行客户端打印一条特定的消息并以零退出码退出。

读取日志有一个复杂之处,您可能在编写”set”代码时已经考虑过:如何区分日志文件中的每条记录?也就是说,程序怎么知道一条记录在哪里结束,而下一题记录从哪里开始?这会是一个问题吗?也许serde会直接从I/O流中反序列化一条记录,并在完成后停止读取,将文件光标留在正确的位置以便继续读取后面的记录?也许serde在看到两条记录粘在一起时会报错?也许你需要插入额外的信息来区分每条记录的长度?也许不用?

现在实现”get”命令。

(译注:此时使用BufReaderBufWriter实现即可,暂不需要记录命令在日志中的位置。虽然此时应新建Command枚举类型,并使用serde_json从日志文件中读取整条命令,但内存中的存储索引的形式暂时仍为BTreeMap<String, String>。同样,写操作也使用serde_json直接写入Command枚举类型。)

第5部分:在索引中存储日志指针

此时,测试条件中,除了压缩测试外的其他测试都应该能够通过。后续步骤中引入的是一些优化特性,这对于提高性能和减少存储空间是十分必要的。在实现这些特性时,请注意我们在优化什么。

正如我们所描述的,您正在构建的数据库将在内存中维护所有键的索引。该索引从键的字符串映射到日志的指针,而不是映射值本身。

此更改需要我们能够从日志的任意偏移量处读取数据,考虑如何按照此更改修改您的文件句柄管理方式。

如果在前面的步骤中,您选择将值的字符串直接存储在内存中,那么现在是时候更新代码,以存放日志指针了,然后实现按需从磁盘加载。

(译注:此时需要引入变量记录读写日志文件的位置,并将内存中的存储索引形式修改为BTreeMap<String, CommandPos{ pos: u64, len: u64 }>

第6部分:无状态KvStore 与 有状态KvStore

请记住,我们的项目既是库又是命令行程序。它们的要求略有不同:kvs命令行程序将单个更改提交到磁盘,然后退出(它是无状态的);KvStore类将更改提交到磁盘,然后驻留在内存中以提供的查询服务(它是有状态的)。

您的KvStore是有状态的还是无状态的?

让您的KvStore将索引保留在内存中,这样它就不需要为每次接受get命令时都对日志求值。

第7部分:压缩日志

此时数据库工作正常,但日志无限增长。这适用于某些数据库,但不适用于我们正在构建的数据库 — 希望尽可能减少磁盘用量。

因此,创建数据库的最后一步是压缩日志。在使用中,随着日志的增长,对于某个键,可能会有多条命令进行设置值或删除键。不过对于该键,只有最新的修改命令才会修改它的值:

idx command
0 Command::Set(“key-1”, “value-1a”)
20 Command::Set(“key-2”, “value-2”)
100 Command::Set(“key-1”, “value-1b”)

在上面的例子中,索引0的命令显然是多余的,因此不需要存储它。日志压缩就是重建日志,以删除冗余命令:

idx command
0 Command::Set(“key-2”, “value-2”)
99 Command::Set(“key-1”, “value-1b”)

基本算法如下:

您应当自行考虑如何重建日志。同时思考以下问题:朴素的解决方案是什么?需要多少内存?压缩日志所需的最小复制量是多少?压缩可以就地完成吗?如果压缩失败,您如何保持数据完整性?

到目前为止,我们一直再说“日志文件”,但实际上数据库通常会将大量日志存储在不同的文件中。在实现过程中您可能会发现,如果跨文件拆分日志将会更容易的压缩日志。

为您的数据库实现日志压缩功能。

恭喜!您已经成功实现了一个功能齐全的数据库。

如果您仍然好奇,那么现在可以对比您的键/值数据库与其他同类数据库(如sledbitcaskbadgerRocksDB)的性能。您可以继续研究他们的架构,思考他们的架构与您的架构的异同,以及架构如何影响性能。接下来的几个项目将为您提供优化的机会。


目录结构:

1
2
3
4
5
6
7
8
9
10
11
12
.
├── Cargo.lock
├── Cargo.toml
├── README.md
├── src
│ ├── bin
│ │ └── kvs.rs
│ ├── error.rs
│ ├── kv.rs
│ └── lib.rs
└── tests
└── tests.rs

主要改动了lib.rs,将功能实现迁移至kv.rs,将错误处理迁移至error.rs

lib.rs
1
2
3
4
5
6
7
8
#![deny(missing_docs)]
//! A simple key/value store.

pub use error::{KvsError, Result};
pub use kv::KvStore;

mod error;
mod kv;
kv.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
use std::collections::{BTreeMap, HashMap};
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::path::{Path, PathBuf};

use serde::{Deserialize, Serialize};
use serde_json::Deserializer;

use crate::{KvsError, Result};
use std::ffi::OsStr;

const COMPACTION_THRESHOLD: u64 = 1024 * 1024;

/// The `KvStore` stores string key/value pairs.
///
/// Key/value pairs are persisted to disk in log files. Log files are named after
/// monotonically increasing generation numbers with a `log` extension name.
/// A `BTreeMap` in memory stores the keys and the value locations for fast query.
///
/// ```rust
/// # use kvs::{KvStore, Result};
/// # fn try_main() -> Result<()> {
/// use std::env::current_dir;
/// let mut store = KvStore::open(current_dir()?)?;
/// store.set("key".to_owned(), "value".to_owned())?;
/// let val = store.get("key".to_owned())?;
/// assert_eq!(val, Some("value".to_owned()));
/// # Ok(())
/// # }
/// ```
pub struct KvStore {
// directory for the log and other data.
path: PathBuf,
// map generation number to the file reader.
readers: HashMap<u64, BufReaderWithPos<File>>,
// writer of the current log.
writer: BufWriterWithPos<File>,
current_gen: u64,
index: BTreeMap<String, CommandPos>,
// the number of bytes representing "stale" commands that could be
// deleted during a compaction.
uncompacted: u64,
}

impl KvStore {
/// Opens a `KvStore` with the given path.
///
/// This will create a new directory if the given one does not exist.
///
/// # Errors
///
/// It propagates I/O or deserialization errors during the log replay.
pub fn open(path: impl Into<PathBuf>) -> Result<KvStore> {
let path = path.into();
fs::create_dir_all(&path)?;

let mut readers = HashMap::new();
let mut index = BTreeMap::new();

let gen_list = sorted_gen_list(&path)?;
let mut uncompacted = 0;

for &gen in &gen_list {
let mut reader = BufReaderWithPos::new(File::open(log_path(&path, gen))?)?;
uncompacted += load(gen, &mut reader, &mut index)?;
readers.insert(gen, reader);
}

let current_gen = gen_list.last().unwrap_or(&0) + 1;
let writer = new_log_file(&path, current_gen, &mut readers)?;

Ok(KvStore {
path,
readers,
writer,
current_gen,
index,
uncompacted,
})
}

/// Sets the value of a string key to a string.
///
/// If the key already exists, the previous value will be overwritten.
///
/// # Errors
///
/// It propagates I/O or serialization errors during writing the log.
pub fn set(&mut self, key: String, value: String) -> Result<()> {
let cmd = Command::set(key, value);
let pos = self.writer.pos;
serde_json::to_writer(&mut self.writer, &cmd)?;
self.writer.flush()?;
if let Command::Set { key, .. } = cmd {
if let Some(old_cmd) = self
.index
.insert(key, (self.current_gen, pos..self.writer.pos).into())
{
self.uncompacted += old_cmd.len;
}
}

if self.uncompacted > COMPACTION_THRESHOLD {
self.compact()?;
}
Ok(())
}

/// Gets the string value of a given string key.
///
/// Returns `None` if the given key does not exist.
///
/// # Errors
///
/// It returns `KvsError::UnexpectedCommandType` if the given command type unexpected.
pub fn get(&mut self, key: String) -> Result<Option<String>> {
if let Some(cmd_pos) = self.index.get(&key) {
let reader = self
.readers
.get_mut(&cmd_pos.gen)
.expect("Cannot find log reader");
reader.seek(SeekFrom::Start(cmd_pos.pos))?;
let cmd_reader = reader.take(cmd_pos.len);
if let Command::Set { value, .. } = serde_json::from_reader(cmd_reader)? {
Ok(Some(value))
} else {
Err(KvsError::UnexpectedCommandType)
}
} else {
Ok(None)
}
}

/// Removes a given key.
///
/// # Errors
///
/// It returns `KvsError::KeyNotFound` if the given key is not found.
///
/// It propagates I/O or serialization errors during writing the log.
pub fn remove(&mut self, key: String) -> Result<()> {
if self.index.contains_key(&key) {
let cmd = Command::remove(key);
serde_json::to_writer(&mut self.writer, &cmd)?;
self.writer.flush()?;
if let Command::Remove { key } = cmd {
let old_cmd = self.index.remove(&key).expect("key not found");
self.uncompacted += old_cmd.len;
}
Ok(())
} else {
Err(KvsError::KeyNotFound)
}
}

/// Clears stale entries in the log.
pub fn compact(&mut self) -> Result<()> {
// increase current gen by 2. current_gen + 1 is for the compaction file.
let compaction_gen = self.current_gen + 1;
self.current_gen += 2;
self.writer = self.new_log_file(self.current_gen)?;

let mut compaction_writer = self.new_log_file(compaction_gen)?;

let mut new_pos = 0; // pos in the new log file.
for cmd_pos in &mut self.index.values_mut() {
let reader = self
.readers
.get_mut(&cmd_pos.gen)
.expect("Cannot find log reader");
if reader.pos != cmd_pos.pos {
reader.seek(SeekFrom::Start(cmd_pos.pos))?;
}

let mut entry_reader = reader.take(cmd_pos.len);
let len = io::copy(&mut entry_reader, &mut compaction_writer)?;
*cmd_pos = (compaction_gen, new_pos..new_pos + len).into();
new_pos += len;
}
compaction_writer.flush()?;

// remove stale log files.
let stale_gens: Vec<_> = self
.readers
.keys()
.filter(|&&gen| gen < compaction_gen)
.cloned()
.collect();
for stale_gen in stale_gens {
self.readers.remove(&stale_gen);
fs::remove_file(log_path(&self.path, stale_gen))?;
}
self.uncompacted = 0;

Ok(())
}

/// Create a new log file with given generation number and add the reader to the readers map.
///
/// Returns the writer to the log.
fn new_log_file(&mut self, gen: u64) -> Result<BufWriterWithPos<File>> {
new_log_file(&self.path, gen, &mut self.readers)
}
}

/// Create a new log file with given generation number and add the reader to the readers map.
///
/// Returns the writer to the log.
fn new_log_file(
path: &Path,
gen: u64,
readers: &mut HashMap<u64, BufReaderWithPos<File>>,
) -> Result<BufWriterWithPos<File>> {
let path = log_path(&path, gen);
let writer = BufWriterWithPos::new(
OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open(&path)?,
)?;
readers.insert(gen, BufReaderWithPos::new(File::open(&path)?)?);
Ok(writer)
}

/// Returns sorted generation numbers in the given directory.
fn sorted_gen_list(path: &Path) -> Result<Vec<u64>> {
let mut gen_list: Vec<u64> = fs::read_dir(&path)?
.flat_map(|res| -> Result<_> { Ok(res?.path()) })
.filter(|path| path.is_file() && path.extension() == Some("log".as_ref()))
.flat_map(|path| {
path.file_name()
.and_then(OsStr::to_str)
.map(|s| s.trim_end_matches(".log"))
.map(str::parse::<u64>)
})
.flatten()
.collect();
gen_list.sort_unstable();
Ok(gen_list)
}

/// Load the whole log file and store value locations in the index map.
///
/// Returns how many bytes can be saved after a compaction.
fn load(
gen: u64,
reader: &mut BufReaderWithPos<File>,
index: &mut BTreeMap<String, CommandPos>,
) -> Result<u64> {
// To make sure we read from the beginning of the file.
let mut pos = reader.seek(SeekFrom::Start(0))?;
let mut stream = Deserializer::from_reader(reader).into_iter::<Command>();
let mut uncompacted = 0; // number of bytes that can be saved after a compaction.
while let Some(cmd) = stream.next() {
let new_pos = stream.byte_offset() as u64;
match cmd? {
Command::Set { key, .. } => {
if let Some(old_cmd) = index.insert(key, (gen, pos..new_pos).into()) {
uncompacted += old_cmd.len;
}
}
Command::Remove { key } => {
if let Some(old_cmd) = index.remove(&key) {
uncompacted += old_cmd.len;
}
// the "remove" command itself can be deleted in the next compaction.
// so we add its length to `uncompacted`.
uncompacted += new_pos - pos;
}
}
pos = new_pos;
}
Ok(uncompacted)
}

fn log_path(dir: &Path, gen: u64) -> PathBuf {
dir.join(format!("{}.log", gen))
}

/// Struct representing a command.
#[derive(Serialize, Deserialize, Debug)]
enum Command {
Set { key: String, value: String },
Remove { key: String },
}

impl Command {
fn set(key: String, value: String) -> Command {
Command::Set { key, value }
}

fn remove(key: String) -> Command {
Command::Remove { key }
}
}

/// Represents the position and length of a json-serialized command in the log.
struct CommandPos {
gen: u64,
pos: u64,
len: u64,
}

impl From<(u64, Range<u64>)> for CommandPos {
fn from((gen, range): (u64, Range<u64>)) -> Self {
CommandPos {
gen,
pos: range.start,
len: range.end - range.start,
}
}
}

struct BufReaderWithPos<R: Read + Seek> {
reader: BufReader<R>,
pos: u64,
}

impl<R: Read + Seek> BufReaderWithPos<R> {
fn new(mut inner: R) -> Result<Self> {
let pos = inner.seek(SeekFrom::Current(0))?;
Ok(BufReaderWithPos {
reader: BufReader::new(inner),
pos,
})
}
}

impl<R: Read + Seek> Read for BufReaderWithPos<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = self.reader.read(buf)?;
self.pos += len as u64;
Ok(len)
}
}

impl<R: Read + Seek> Seek for BufReaderWithPos<R> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.pos = self.reader.seek(pos)?;
Ok(self.pos)
}
}

struct BufWriterWithPos<W: Write + Seek> {
writer: BufWriter<W>,
pos: u64,
}

impl<W: Write + Seek> BufWriterWithPos<W> {
fn new(mut inner: W) -> Result<Self> {
let pos = inner.seek(SeekFrom::Current(0))?;
Ok(BufWriterWithPos {
writer: BufWriter::new(inner),
pos,
})
}
}

impl<W: Write + Seek> Write for BufWriterWithPos<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let len = self.writer.write(buf)?;
self.pos += len as u64;
Ok(len)
}

fn flush(&mut self) -> io::Result<()> {
self.writer.flush()
}
}

impl<W: Write + Seek> Seek for BufWriterWithPos<W> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.pos = self.writer.seek(pos)?;
Ok(self.pos)
}
}
error.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use failure::Fail;
use std::io;

/// Error type for kvs.
#[derive(Fail, Debug)]
pub enum KvsError {
/// IO error.
#[fail(display = "{}", _0)]
Io(#[cause] io::Error),
/// Serialization or deserialization error.
#[fail(display = "{}", _0)]
Serde(#[cause] serde_json::Error),
/// Removing non-existent key error.
#[fail(display = "Key not found")]
KeyNotFound,
/// Unexpected command type error.
/// It indicated a corrupted log or a program bug.
#[fail(display = "Unexpected command type")]
UnexpectedCommandType,
}

impl From<io::Error> for KvsError {
fn from(err: io::Error) -> KvsError {
KvsError::Io(err)
}
}

impl From<serde_json::Error> for KvsError {
fn from(err: serde_json::Error) -> KvsError {
KvsError::Serde(err)
}
}

/// Result type for kvs.
pub type Result<T> = std::result::Result<T, KvsError>;

干得漂亮,朋友,休息一下吧。

评论

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×