PingCap的Rust训练课程3:同步的“客户端-服务端”网络模块

前言

任务:创建一个单线程、持久化的键/值存储的服务器和客户端,使用自定义协议进行同步联网

目标

  • 创建一个客户端-服务器应用程序
  • std库的网络API编写自定义协议
  • 为服务端引入日志记录功能
  • 用trait实现可插拔的后端
  • sled对手写的后端进行基准测试

关键词std::net、日志、trait、基准测试。

介绍

在这个项目中,你将创建一个简单的键/值存储服务端和客户端,它们将用你自定义的网络协议进行通信。你将使用标准的日志crate生成日志,并正确处理网络边界上的错误。一旦有了一个可运行的客户端-服务端架构,那么你就能够抽象出trait背后的存储引擎,并将你的实现与sled引擎进行性能比较。

项目需求规格

cargo项目kvs建立了一个名为kvs-client的命令行键值存储客户端,和一个名为kvs-server的键值存储服务端,二者又都调用了一个名为kvs的库。客户端通过一个自定义协议与服务端通信。

kvs-server可执行文件支持以下命令行参数:

  • kvs-server [--addr IP-PORT] [--engine ENGINE-NAME]

    启动服务端并开始监听进入的连接。--addr接受一个IP地址(可以是v4或v6)以及一个端口号,格式为IP:PORT。如果没有指定--addr,则默认监听127.0.0.1:4000

    如果指定了--engine,那么ENGINE-NAME必须是”kvs”(即使用内置引擎),或者是”sled”(即使用sled引擎)。如果这是首次运行程序(没有以前保存的数据),那么默认值是”kvs”;如果以前有保存的数据,那么默认使用已经在用的引擎。如果以前保存的数据与当前选择的引擎不同,则打印一个错误并以非零的退出码退出。

    如果套接字绑定失败,或ENGINE-NAME无效,或IP-PORT不能解析为一个地址,则打印一个错误并以非零的退出码退出。

  • kvs-server -V

    打印版本信息。

kvs-client可执行文件支持以下命令行参数:

  • kvs-client set <KEY> <VALUE> [--addr IP-PORT]

    将一个字符串键的值设置为一个字符串。

    --addr接受一个IP地址(可以是v4或v6)以及一个端口号,格式为IP:PORT。如果没有指定--addr,则在默认127.0.0.1:4000上连接。

    如果服务器出错,或者IP-PORT不能解析为一个地址,则打印错误并以非零的退出码退出。

  • kvs-client get <KEY> [--addr IP-PORT]

    获取一个给定的字符串键的字符串值。

    --addr接受一个IP地址(可以是v4或v6)以及一个端口号,格式为IP:PORT。如果没有指定--addr,则在默认127.0.0.1:4000上连接。

    如果服务器出错,或者IP-PORT不能解析为一个地址,则打印错误并以非零的退出码退出。

  • kvs-client rm <KEY> [--addr IP-PORT]

    删除一个给定的字符串键。

    --addr接受一个IP地址(可以是v4或v6)以及一个端口号,格式为IP:PORT。如果没有指定--addr,则在默认127.0.0.1:4000上连接。

    如果服务器出错,或者IP-PORT不能解析为一个地址,则打印错误并以非零的退出码退出。

  • kvs-client -V

    打印版本信息。

所有错误信息都应打印到stderr。

kvs库包含四种类型。

  • KvsClient - 为kvs-client实现与kvs-server通信所需的功能。
  • KvsServer - 为kvs-server实现响应kvs-client请求的功能。
  • KvsEngine trait - 定义了KvsServer需要调用的存储接口。
  • KvStore - 手动实现KvsEngine trait
  • SledKvsEngine - 为sled存储引擎实现KvsEngine trait。

KvsClientKvsServer的设计由你决定,并将使用你自定义的网络协议来通信。本项目的测试套件并不直接调用这两种类型,而仅是通过CLI测试它们。

KvsEngine trait支持以下方法:

  • KvsEngine::set(&mut self, key: String, value: String) -> Result<()>

    将一个字符串键的值设置为一个字符串。

    如果值没有成功写入,则返回错误。

  • KvsEngine::get(&mut self, key: String) -> Result<Option<String>>

    获取一个字符串键的字符串值。如果键不存在,返回None

    如果没有成功读取该值,则返回错误。

  • KvsEngine::remove(&mut self, key: String) -> Result<()>

    删除一个给定的字符串键。

    如果键不存在或值未被成功读取,则返回错误。

当为一个键设置值时,KvStore会把set命令写入磁盘的顺序日志。当删除一个键时,KvStorerm命令写入日志。在启动时,日志中的命令将被重新求值,各键的最新一条设置命令的日志指针(文件偏移量)将会被保存在内存索引中。

当用get命令检索一个键的值时,它会搜索索引,如果找到了,就从日志的相应位置中加载相应命令并求值。

当未压缩的日志条目的大小达到一个给定的阈值时,KvStore将其压缩成一个新的日志,删除多余的条目以释放磁盘空间。

项目设置

继续你以前的项目,删除你以前的tests目录,把这本项目的tests目录复制到它的位置。这个项目应该包含一个名为kvs的库,以及两个可执行文件,kvs-serverkvs-client

你需要在Cargo.toml中加入以下dev-dependencies依赖:

1
2
3
4
5
6
7
[dev-dependencies]
assert_cmd = "0.11"
criterion = "0.3"
predicates = "1.0.0"
rand = "0.6.5"
tempfile = "3.0.7"
walkdir = "2.2.7"

与以前的项目一样,添加足够的定义,以构建测试套件。


使测试套件编译通过只需要:

  1. 按上文要求在lib.rs中添加一个新模块mod engines
  2. 创建新模块engines\mod.rs
  3. 在新模块mod.rs中按要求加入KvsEngine trait的定义即可。
  4. lib.rs中公开引用这个trait即可:pub use engines::KvsEngine;

目录结构为:

1
2
3
4
5
6
7
8
9
10
11
12
13
.
├── Cargo.toml
├── src
│ ├── bin
│ │ └── kvs.rs
│ ├── engines
│ │ └── mod.rs
│ ├── error.rs
│ ├── kv.rs
│ └── lib.rs
└── tests
├── cli.rs
└── kv_store.rs

目前cli.rs中的11个测试均未通过:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
cargo test --test cli
...
running 11 tests
test cli_log_configuration ... FAILED
test cli_access_server_kvs_engine ... FAILED
test cli_wrong_engine ... FAILED
test client_cli_invalid_get ... FAILED
test client_cli_invalid_rm ... FAILED
test client_cli_invalid_set ... FAILED
test cli_access_server_sled_engine ... FAILED
test client_cli_invalid_subcommand ... FAILED
test client_cli_version ... FAILED
test server_cli_version ... FAILED
test client_cli_no_args ... FAILED
...

kv_store.rs中的均通过测试:

1
2
3
4
5
6
7
8
cargo test --test kv_store
running 6 tests
test remove_non_existent_key ... ok
test remove_key ... ok
test get_stored_value ... ok
test get_non_existent_value ... ok
test overwrite_value ... ok
test compaction ... ok

第1部分:解析命令行

本项目中的命令行解析同之前的项目相比没有多少新功能。kvs-client二进制文件接受与之前项目相同的命令行参数。而kvs-server有一组自己的命令行参数需要处理,就像上文描述的那样。

处理kvs-server的命令行参数。


kvs-server的参数比较简单,依然使用clapderive特性编写,由于Rust已经为&str实现了ToSocketAddrs特性,Args中直接使用SocketAddr类型会自动触发字符串到IP地址的转换:

kvs-server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
use clap::{Parser};
use kvs::{Result};
use std::net::SocketAddr;

const DEFAULT_LISTENING_ADDRESS: &str = "127.0.0.1:4000";
const DEFAULT_ENGINE: Engine = Engine::kvs;

#[allow(non_camel_case_types)]
#[derive(clap::ArgEnum, Clone, Debug)]
enum Engine {
kvs,
sled,
}

impl FromStr for Engine {
type Err = KvsError;

fn from_str(s: &str) -> Result<Self> {
match s {
"kvs" => {
Ok(Engine::kvs)
},
"sled" => {
Ok(Engine::sled)
}
_ => {
Err(KvsError::UnexpectedEngineType)
}
}
}
}

#[derive(Parser, Debug)]
#[clap(name = "kvs-server")]
#[clap(author, version, about="server of key value storage", long_about=None)]
struct Args {
/// [IPADDR:PORT] of server.
#[clap(long, default_value=DEFAULT_LISTENING_ADDRESS)]
addr: SocketAddr,

/// backend engine of key value storage.
#[clap(arg_enum, long, required(false))]
engine: Option<Engine>,
}


fn main() -> Result<()> {
let args = Args::parse();
println!("{:?} {:?}", args.addr, args.engine);

Ok(())
}

kvs-client需要在子命令后面再添加可选参数(IP地址),所以可以使用clap的builder编写模式增加可读性。当然,也可以继续使用derive特性编写更简略的代码:

kvs-client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// builder模式

use clap::{Command, arg, crate_version};
use kvs::{Result};
use std::net::SocketAddr;

const DEFAULT_LISTENING_ADDRESS: &str = "127.0.0.1:4000";

fn cli() -> Command<'static> {
Command::new("kvs-client")
.about("client of key value storage")
.version(crate_version!())
.subcommand_required(true)
.disable_help_subcommand(true)
.args_conflicts_with_subcommands(true)
.subcommand(
Command::new("get")
.about("Get the string value of a given string key")
.args(&[
arg!(<KEY> "A string key"),
arg!(--addr <"IP:PORT"> "Set the server address as IP:PORT").required(false).default_value(DEFAULT_LISTENING_ADDRESS)
])
)
.subcommand(
Command::new("set")
.about("Set the value of a string key to a string")
.args(&[
arg!(<KEY> "A string key"),
arg!(<VALUE> "The string value of the key").required(true),
arg!(--addr <"IP:PORT"> "Set the server address as IP:PORT").required(false).default_value(DEFAULT_LISTENING_ADDRESS)
])
)
.subcommand(
Command::new("rm")
.about("Remove a given string key")
.args(&[
arg!(<KEY> "A string key"),
arg!(--addr <"IP:PORT"> "Set the server address as IP:PORT").required(false).default_value(DEFAULT_LISTENING_ADDRESS)
])
)
}

fn main() -> Result<()>{
let matches = cli().get_matches();

match matches.subcommand() {
Some(("get", sub_matches)) => {
println!("get {} {}", sub_matches.value_of("KEY").unwrap(), sub_matches.value_of("addr").unwrap());
let key = sub_matches.value_of("KEY").unwrap();
let addr: SocketAddr = sub_matches.value_of("addr").unwrap().parse()?;
},
Some(("set", sub_matches)) => {
println!("set {} {} {}", sub_matches.value_of("KEY").unwrap(), sub_matches.value_of("VALUE").unwrap(), sub_matches.value_of("addr").unwrap());
let key = sub_matches.value_of("KEY").unwrap();
let value = sub_matches.value_of("VALUE").unwrap();
let addr: SocketAddr = sub_matches.value_of("addr").unwrap().parse()?;
}
Some(("rm", sub_matches)) => {
println!("rm {} {}", sub_matches.value_of("KEY").unwrap(), sub_matches.value_of("addr").unwrap());
let key = sub_matches.value_of("KEY").unwrap();
let addr: SocketAddr = sub_matches.value_of("addr").unwrap().parse()?;
},

_ => unreachable!(), // If all subcommands are defined above, anything else is unreachabe!()

}

Ok(())
}

// derive模式

use clap::{Parser, Subcommand};
use kvs::{KvStore, Result};
use std::process::exit;
use std::env::current_dir;
use std::net::SocketAddr;

const DEFAULT_LISTENING_ADDRESS: &str = "127.0.0.1:4000";

#[derive(Parser)]
#[clap(name = "kvs")]
#[clap(author, version, about="an in-memory key/value store", long_about=None, args_conflicts_with_subcommands(true))]
struct Cli {
#[clap(subcommand)]
command: Commands,
}

#[derive(Subcommand)]
enum Commands {
// Set the value of a string key to a string
#[clap(arg_required_else_help = true)]
set {
// A string key
key: String,
// The string value of the key
value: String,
#[clap(long, required(false), default_value(DEFAULT_LISTENING_ADDRESS), help="Set the server address as [IP:PORT]")]
addr: SocketAddr
},

// Get the string value of a given string key
#[clap(arg_required_else_help = true)]
get {
// A string key
key: String,
#[clap(long, required(false), default_value(DEFAULT_LISTENING_ADDRESS), help="Set the server address as [IP:PORT]")]
addr: SocketAddr
},

// Remove a given key
#[clap(arg_required_else_help = true)]
rm {
// A string key
key: String,
#[clap(long, required(false), default_value(DEFAULT_LISTENING_ADDRESS), help="Set the server address as [IP:PORT]")]
addr: SocketAddr
},
}

fn main() -> Result<()> {
let cli= Cli::parse();
match cli.command {
Commands::get { key, addr } => {},
Commands::set { key, value, addr } => {},
Commands::rm { key, addr } => {},
}

Ok(())
}

第2部分:日志记录

生产环境下的服务器应用程序应具有功能强大且可配置的日志记录。因此,现在我们应为kvs-server添加日志功能,并在后续功能实现的过程中记录有用的信息。在开发过程中,通常使用debug!trace!级日志来打印调试信息。

Rust中有两个好用的日志系统:logslog。两者均为不同级别的日志输出类似的宏,如error!info!等。两者都是可扩展的,支持不同的后端,以输出日志到控制台、日志文件或系统日志等。

最主要的区别是,log相当简单,只记录格式化的字符串;slog功能丰富,支持“结构化日志”,其日志条目是以容易解析的格式类型化和序列化的。

log可以追溯到Rust最早的时候,它是编译器的一部分,然后是标准库的一部分,最后作为独立的crate发布。它由Rust项目组维护。slog是较新的crate,独立维护。两者均已被广泛使用。

对于这两种系统,都需要选择一个“接收器”crate,用于将日志发送到该接收器以进行显示或存储。

*阅读这两个系统,选择一个对你喜欢的,将它们作为依赖项添加,然后修改kvs-server以在传递命令行参数之前初始化日志。将日志设置为输出到stderr*(将日志发送到其他地方也可以,但日志必须也发送到stderr以通过本项目的测试)。

在启动时日志记录服务器端的版本信息,还要记录配置信息。目前指的是IP地址和端口,以及存储引擎的名称。


因为第一次使用日志功能,还是先用简单的log入门,在Cargo.toml里引入logenv_logger(用于配置日志)。然后按要求输出日志即可:

kvs-server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ...
fn main() -> Result<()> {
env_logger::builder().filter_level(LevelFilter::Debug).init();
let mut args = Args::parse();
//...
run(args)
}

fn run(args: Args) -> Result<()> {
let engine = args.engine.unwrap_or(DEFAULT_ENGINE);
info!("kvs-server {}", env!("CARGO_PKG_VERSION"));
info!("Storage engine: {:?}", engine);
info!("Listening on {}", args.addr);
// ...
}
// ...

第3部分:客户端-服务端网络设置

接下来,我们将搭建网络模块。对于本项目,您将使用std::net中使用最基础的TCP/IP网络API:TcpListenerTcpStream

对于本项目,服务器是同步、单线程的。这意味着您将监听一个socket,接受一个连接,然后一次执行/响应一条命令。在未来,我们将在实现异步、多线程和高性能数据库的过程中,多次重新回味这一设定。

考虑一下您的手动测试工作流程。现在有两个可执行文件要处理,您需要一种同时运行它们的方法。您可能和我们一样,同时使用两个终端,一个运行cargo run --bin kvs-server ,服务端将一直运行到你按下CTRL-D;另一个运行cargo run --bin kvs-client

这是使用日志进行调试的好机会。继续记录有关每个已接受连接的信息。

在考虑协议之前,先修改kvs-server来监听和接受连接,在修改kvs-client来发起连接。


是时候把KvStore变成真正的KV引擎了,把为KvStore实现的getsetremove拿出来,用于为KvStore实现KvsEngine特性:

engines/kvs.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//...
impl KvsEngine for KvStore {
//...
fn set(&mut self, key: String, value: String) -> Result<()> {
let cmd = Command::set(key, value);
let pos = self.writer.pos;
serde_json::to_writer(&mut self.writer, &cmd)?;
self.writer.flush()?;
if let Command::Set { key, .. } = cmd {
if let Some(old_cmd) = self
.index
.insert(key, (self.current_gen, pos..self.writer.pos).into())
{
self.uncompacted += old_cmd.len;
}
}

if self.uncompacted > COMPACTION_THRESHOLD {
self.compact()?;
}
Ok(())
}
//...
fn get(&mut self, key: String) -> Result<Option<String>> {
if let Some(cmd_pos) = self.index.get(&key) {
let reader = self
.readers
.get_mut(&cmd_pos.gen)
.expect("Cannot find log reader");
reader.seek(SeekFrom::Start(cmd_pos.pos))?;
let cmd_reader = reader.take(cmd_pos.len);
if let Command::Set { value, .. } = serde_json::from_reader(cmd_reader)? {
Ok(Some(value))
} else {
Err(KvsError::UnexpectedCommandType)
}
} else {
Ok(None)
}
}
//...
fn remove(&mut self, key: String) -> Result<()> {
if self.index.contains_key(&key) {
let cmd = Command::remove(key);
serde_json::to_writer(&mut self.writer, &cmd)?;
self.writer.flush()?;
if let Command::Remove { key } = cmd {
let old_cmd = self.index.remove(&key).expect("key not found");
self.uncompacted += old_cmd.len;
}
Ok(())
} else {
Err(KvsError::KeyNotFound)
}
}

}

在实现客户端-服务端之前,可以先定义用于序列化的枚举类型,作为客户端与服务端之间的通信协议(在后面编写服务端和客户端时,可以方便的将这些支持序列化的类型放在TcpStream上进行读写):

common.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize)]
pub enum Request {
Get { key: String },
Set { key: String, value: String },
Remove { key: String },
}

#[derive(Debug, Serialize, Deserialize)]
pub enum GetResponse {
Ok(Option<String>),
Err(String),
}

#[derive(Debug, Serialize, Deserialize)]
pub enum SetResponse {
Ok(()),
Err(String),
}

#[derive(Debug, Serialize, Deserialize)]
pub enum RemoveResponse {
Ok(()),
Err(String),
}

然后在server.rs中实现KvsServer

server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use std::{net::{ToSocketAddrs, TcpListener, TcpStream}, io::{BufReader, BufWriter, Write}};
use log::{error, debug};
use serde_json::Deserializer;
use crate::{KvsEngine, Result, common::{Request, GetResponse, SetResponse, RemoveResponse}};

/// The server of a key value store.
pub struct KvsServer<E: KvsEngine> {
engine: E,
}

impl<E: KvsEngine> KvsServer<E> {
/// Create a `KvsServer` with a given storage engine.
pub fn new(engine: E) -> Self {
KvsServer { engine }
}

/// Run the server listening on the given address
pub fn run(&mut self, addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr)?;
for stream in listener.incoming() {
match stream {
Ok(stream) => {
if let Err(e) = self.serve(stream) {
error!("Error on serving client: {}", e);
}
}
Err(e) => error!("Connection failed: {}", e),
}
}
Ok(())
}

fn serve(&mut self, tcp: TcpStream) -> Result<()> {
let peer_addr = tcp.peer_addr()?;
let reader = BufReader::new(&tcp);
let mut writer = BufWriter::new(&tcp);
let req_reader = Deserializer::from_reader(reader).into_iter::<Request>();

macro_rules! send_resp {
($resp:expr) => {{
let resp = $resp;
serde_json::to_writer(&mut writer, &resp)?;
writer.flush()?;
debug!("Response sent to {}: {:?}", peer_addr, resp);
};};
}

for req in req_reader {
let req = req?;
debug!("Receive request from {} {:?}", peer_addr, req);
match req {
Request::Get { key } => send_resp!(
match self.engine.get(key) {
Ok(value) => { GetResponse::Ok(value) },
Err(e) => { GetResponse::Err(format!("{}", e)) },
}
),
Request::Set { key, value } => send_resp!(
match self.engine.set(key, value) {
Ok(_) => SetResponse::Ok(()),
Err(e) => SetResponse::Err(format!("{}", e)),
}
),
Request::Remove { key } => send_resp!(
match self.engine.remove(key) {
Ok(_) => RemoveResponse::Ok(()),
Err(e) => RemoveResponse::Err(format!("{}", e)),
}
),
}
}

Ok(())
}
}

其中send_resp!宏的作用与下面的代码相同,编写这个宏可以减少重复:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 使用代码
Request::Get { key } => {
let resp = match self.engine.get(key) {
Ok(value) => { GetResponse::Ok(value) },
Err(e) => { GetResponse::Err(format!("{}", e)) },
};
serde_json::to_writer(writer, &resp);
writer.flush()?;
debug!("Response sent to {}: {:?}", peer_addr, resp);
},
// 使用宏
macro_rules! send_resp {
($resp:expr) => {{
let resp = $resp;
serde_json::to_writer(&mut writer, &resp)?;
writer.flush()?;
debug!("Response sent to {}: {:?}", peer_addr, resp);
};};
}

Request::Get { key } => send_resp!(
match self.engine.get(key) {
Ok(value) => { GetResponse::Ok(value) },
Err(e) => { GetResponse::Err(format!("{}", e)) },
}
),

kvs-server.rs中使用KvsServer,此时可以将日志等级调整为Debug以方便调试:

bin/kvs-server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use clap::Parser;
use kvs::{Result, KvsError, KvStore, KvsEngine, KvsServer};
use std::env::current_dir;
use std::fs;
use std::net::SocketAddr;
use std::process::exit;
use log::LevelFilter;
use log::{error, info, warn};
use std::str::FromStr;


const DEFAULT_LISTENING_ADDRESS: &str = "127.0.0.1:4000";
const DEFAULT_ENGINE: Engine = Engine::kvs;

#[allow(non_camel_case_types)]
#[derive(clap::ArgEnum, Copy, Clone, Debug, PartialEq, Eq)]
enum Engine {
kvs,
sled,
}

impl FromStr for Engine {
type Err = KvsError;

fn from_str(s: &str) -> Result<Self> {
match s {
"kvs" => {
Ok(Engine::kvs)
},
"sled" => {
Ok(Engine::sled)
}
_ => {
Err(KvsError::UnexpectedEngineType)
}
}
}
}

#[derive(Parser, Debug)]
#[clap(name = "kvs-server")]
#[clap(author, version, about="server of key value storage", long_about=None)]
struct Args {
/// [IPADDR:PORT] of server.
#[clap(long, default_value=DEFAULT_LISTENING_ADDRESS)]
addr: SocketAddr,

/// backend engine of key value storage.
#[clap(arg_enum, long, required(false))]
engine: Option<Engine>,
}


fn main() -> Result<()>{
env_logger::builder().filter_level(LevelFilter::Debug).init();
let mut args = Args::parse();

let res = current_engine().and_then(move |curr_engine| {
if args.engine.is_none() {
args.engine = curr_engine;
}
if curr_engine.is_some() && args.engine != curr_engine {
error!("Wrong engine!");
exit(1);
}
run(args)
});
if let Err(e) = res {
error!("{}", e);
exit(1);
}
Ok(())
}

fn run(args: Args) -> Result<()> {
let engine = args.engine.unwrap_or(DEFAULT_ENGINE);
info!("kvs-server {}", env!("CARGO_PKG_VERSION"));
info!("Storage engine: {:?}", engine);
info!("Listening on {}", args.addr);

// write engine to engine file
fs::write(current_dir()?.join("engine"), format!("{:?}", engine))?;

match engine {
Engine::kvs => run_with_engine(KvStore::open(current_dir()?)?, args.addr),
Engine::sled => Ok(()), //TODO
}
}

fn run_with_engine(engine: impl KvsEngine, addr: SocketAddr) -> Result<()> {
let mut server = KvsServer::new(engine);
server.run(addr)
}

fn current_engine() -> Result<Option<Engine>> {
let engine = current_dir()?.join("engine");
if !engine.exists() {
return Ok(None);
}

match fs::read_to_string(engine)?.parse() {
Ok(engine) => Ok(Some(engine)),
Err(e) => {
warn!("The content of engine file is invalid: {}", e);
Ok(None)
}
}
}

接下来在client.rs中实现KvsClient

client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
use std::{io::{BufReader, BufWriter, Write}, net::{TcpStream, ToSocketAddrs}};
use crate::{Result, common::{Request, GetResponse, SetResponse, RemoveResponse}, KvsError};
use serde::Deserialize;
use serde_json::{Deserializer, de::IoRead};

/// Key value store client
pub struct KvsClient {
reader: Deserializer<IoRead<BufReader<TcpStream>>>,
writer: BufWriter<TcpStream>,
}

impl KvsClient {
/// Connect to `addr` to access `KvsServer`.
pub fn connect(addr: impl ToSocketAddrs) -> Result<Self> {
let tcp_reader = TcpStream::connect(addr)?;
let tcp_writer = tcp_reader.try_clone()?;
Ok(KvsClient{
reader: Deserializer::from_reader(BufReader::new(tcp_reader)),
writer: BufWriter::new(tcp_writer),
})
}

/// Get the value of a given key from the server.
pub fn get(&mut self, key: String) -> Result<Option<String>> {
serde_json::to_writer(&mut self.writer, &Request::Get { key })?;
self.writer.flush()?;
let resp = GetResponse::deserialize(&mut self.reader)?;
match resp {
GetResponse::Ok(value) => Ok(value),
GetResponse::Err(msg) => Err(KvsError::StringError(msg)),
}
}

/// Set the value of a string key in the server.
pub fn set(&mut self, key: String, value: String) -> Result<()> {
serde_json::to_writer(&mut self.writer, &Request::Set { key, value })?;
self.writer.flush()?;
let resp = SetResponse::deserialize(&mut self.reader)?;
match resp {
SetResponse::Ok(_) => Ok(()),
SetResponse::Err(msg) => Err(KvsError::StringError(msg)),
}
}

/// Remove a string key in the server.
pub fn remove(&mut self, key: String) -> Result<()> {
serde_json::to_writer(&mut self.writer, &Request::Remove { key })?;
self.writer.flush()?;
let resp = RemoveResponse::deserialize(&mut self.reader)?;
match resp {
RemoveResponse::Ok(_) => Ok(()),
RemoveResponse::Err(msg) => Err(KvsError::StringError(msg)),
}
}

}

 并在kvs-client.rs中使用:

bin/kvs-client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use clap::{Parser, Subcommand};
use kvs::{Result, KvsClient};
use std::net::SocketAddr;

const DEFAULT_LISTENING_ADDRESS: &str = "127.0.0.1:4000";

#[derive(Parser)]
#[clap(name = "kvs")]
#[clap(author, version, about="an in-memory key/value store", long_about=None, args_conflicts_with_subcommands(true))]
struct Cli {
#[clap(subcommand)]
command: Commands,
}

#[derive(Subcommand)]
#[allow(non_camel_case_types)]
enum Commands {
// Set the value of a string key to a string
#[clap(arg_required_else_help=true)]
set {
// A string key
key: String,
// The string value of the key
value: String,

#[clap(long, required(false), default_value(DEFAULT_LISTENING_ADDRESS), help="Set the server address as [IP:PORT]")]
addr: SocketAddr
},

// Get the string value of a given string key
#[clap(arg_required_else_help=true)]
get {
// A string key
key: String,

#[clap(long, required(false), default_value(DEFAULT_LISTENING_ADDRESS), help="Set the server address as [IP:PORT]")]
addr: SocketAddr
},

// Remove a given key
#[clap(arg_required_else_help=true)]
rm {
// A string key
key: String,

#[clap(long, required(false), default_value(DEFAULT_LISTENING_ADDRESS), help="Set the server address as [IP:PORT]")]
addr: SocketAddr
},
}

fn main() -> Result<()> {
let cli= Cli::parse();
match cli.command {
Commands::get { key, addr } => {
let mut client = KvsClient::connect(addr)?;
if let Some(value) = client.get(key.to_string())? {
println!("{}", value);
} else {
println!("Key not found");
}
},
Commands::set { key, value, addr } => {
let mut client = KvsClient::connect(addr)?;
client.set(key.to_string(), value.to_string())?;
}
Commands::rm { key, addr } => {
let mut client = KvsClient::connect(addr)?;
client.remove(key)?;
}
}

Ok(())
}

此时一个终端打开cargo run --bin kvs-server,另一个终端运行cargo run --bin kvs-client -- get key,就可以从网络上将“训练课程2”中存入的键: 值取出来了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# server
(base) ➜ kvs git:(master) ✗ cargo run --bin kvs-server
Compiling kvs v0.1.0 (/Users/zealot/RustProjects/pingcap-projects/kvs)
Finished dev [unoptimized + debuginfo] target(s) in 1.14s
Running `target/debug/kvs-server`
[2022-06-03T10:38:45Z INFO kvs_server] kvs-server 0.1.0
[2022-06-03T10:38:45Z INFO kvs_server] Storage engine: kvs
[2022-06-03T10:38:45Z INFO kvs_server] Listening on 127.0.0.1:4000
[2022-06-03T10:38:49Z DEBUG kvs::server] Receive request from 127.0.0.1:55017 Get { key: "键" }
[2022-06-03T10:38:49Z DEBUG kvs::server] Response sent to 127.0.0.1:55017: Ok(Some("值"))

# client
(base) ➜ kvs git:(master) ✗ cargo run --bin kvs-client -- get 键
Finished dev [unoptimized + debuginfo] target(s) in 0.08s
Running `target/debug/kvs-client get '键'`

第4部分:通过网络处理命令

在上一个项目中,您定义了数据库接受的命令,并学习了如何使用serde对日志进行序列化和反序列化。

现在是时候通过网络实现键/值存储,以通过前面实现的同步单进程模型远程执行命令。与您在上一个项目中创建日志的文件I/O一样,您将使用ReadWrite特征序列化和流式传输命令。

现在您需要设计一个网络协议。有多种方法可以在TCP流上数读写数据,与上一个项目一样,也需要做出许多选择。该协议是基于文本还是二进制?数据如何从内存中的类型格式转换为字节流格式?每个连接传输一个还是多个请求?

请记住,该协议应支持执行成功并取回结果,也支持执行时遇到的错误,现在有两种错误:由您的存储引擎产生的错误,以及由网络通信产生的错误。

协议的所有细节都由您决定。测试套件不关心数据如何从一端到达另一端,只关心结果是否正确。

实现您的网络协议。


前面我们已经一并实现了该协议,需要注意的是,只要进行适当的定义,序列化的所有类型转换都是自动完成的。

第5部分:可插拔存储引擎

您的数据库目前有一个由您实现的存储引擎KvStore。现在将添加第二个存储引擎。

这样做有多种原因:

  • 不同的业务负载需要不同的性能特征。对于特定的业务负载,某些存储引擎可能比其他存储引擎工作得更好;
  • 搭建了一个熟悉的框架来比较不同的后端;
  • 给了我们一个创建和使用trait的理由;
  • 给了我们一个编写比较基准的环境。

所以你要从KvStore接口中抽象出一个新的特征:KvsEngine。这是一个典型的重构,将现有代码逐步修改为新的形式。一般在重构时,我们希望将工作最小化分解,以便进行持续构建和修改。

这是您最终需要的API:

  • KvsEnginetrait具有与KvStore具有相同签名的getsetremove方法。
  • KvStore实现了KvsEngine,不再有自己的getsetremove方法。
  • KvsEngine有一个新的实现——SledKvsEngine。稍后您需要使用sled库填充其getset方法。

如果您的测试套件通过编译,则可能已经了编写了这些定义的签名,现在是时候实现定义的内容了。将重构分解为渐进式修改,并确保项目能够持续构建,并通过先前已经通过的测试,然后再继续下一步重构。

作为最后一步,您需要考虑当kvs-server使用一个引擎启动,然后进程被终结,接下来再使用不同的引擎重新启动时会发生什么。这种情况应当产生错误,你需要想清楚如何检测这种情况才能报告错误。测试cli_wrong_engine反映了这种情况。


本项目一开始,为了通过测试,我们已经实现了KvsEngine的签名,又在第3部分提前将KvStore的几个方法移动至KvsEngine的实现中。而在本项目需求说明部分,为了实现引擎切换报错,我们已经实现了启动时检测engine文件内容的功能:

  • 若未检测到engine文件,则将本次使用的引擎写入engine文件(不指定时默认使用kvs)。
  • 若检测到engine文件,当文件记录引擎与当前指定引擎不一致时,报错;一致时则继续执行。

engine/sled.rs中为sled实现KvsEngine

engine/sled.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use sled::{Db, Tree};
use crate::{KvsEngine, Result, KvsError};

/// Wrapper of `sled::Db`
pub struct SledKvStore {
db: Db,
}

impl SledKvStore {
/// Creates a `SledKvsEngine` from `sled::Db`.
pub fn new(db: Db) -> Self {
Self { db }
}
}

impl KvsEngine for SledKvStore {
fn get(&mut self, key: String) -> Result<Option<String>> {
let tree: &Tree = &self.db;
Ok(
tree.get(key)?
.map(|iv| AsRef::<[u8]>::as_ref(&iv).to_vec())
.map(String::from_utf8)
.transpose()?
)
}

fn set(&mut self, key: String, value: String) -> Result<()> {
let tree: &Tree = &self.db;
tree.insert(key, value.into_bytes()).map(|_| ())?;
tree.flush()?;
Ok(())
}

fn remove(&mut self, key: String) -> Result<()> {
let tree: &Tree = &self.db;
tree.remove(key)?.ok_or(KvsError::KeyNotFound)?;
tree.flush()?;
Ok(())
}
}

然后在kvs-server.rs中调用SledKvStore,以补全最后一个//TODO

bin/kvs-server.rs
1
2
3
4
5
6
// ...
match engine {
Engine::kvs => run_with_engine(KvStore::open(current_dir()?)?, args.addr),
Engine::sled => run_with_engine(SledKvStore::new(sled::open(current_dir()?)?), args.addr),
}
//...

第6部分:基准测试

随着课程的进展,我们将越来越关注数据库的性能,探索不同架构对性能产生的影响。我们希望您不要局限于本文所展示的内容,多尝试自己的优化。

性能比对需要基准测试,目前有很多方法可以对数据库进行基准测试:例如使用ycsbsysbench等标准测试套件。Rust内置了一些基准测试工具,我们将从此处入手。

Cargo支持使用cargo bench进行基准测试。基准测试可以使用Rust内置的基准测试工具编写,也可以使用外部工具编写。

为函数添加#[bench]属性即可启用内置基准测试套件。不过这一功能并不在Rust stable channel中提供,仅在the unstable booktest crate docs中有简略描述。尽管它在整个Rust生态中已被广泛使用—这些crates即使使用stable编译,也会用nightly进行基准测试。

尽管该系统实际上已被弃用—没有更新,且似乎永远不会被提升到stable channel。

不过,Rust也有更好的基准测试工具。本项目将使用criterion来对比kvs引擎与sled引擎的性能。

这这些基准测试工具的工作原理是定义一个基准测试函数,然后在该函数内循环迭代要执行的基准测试操作。基准测试工具将根据需要进行多次迭代,以获取具有统计意义的操作持续时间。

这是criterion指南中的这个基础示例:

1
2
3
4
5
6
7
fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("fib 20", |b| {
b.iter(|| {
fibonacci(20)
});
});
}

调用bench_function定义基准,调用iter定义为基准需要测试的代码。调用iter之前和之后的代码不计入基准测试时间。

通过创建一个名为benches/engine_benches.rs的文件来准备编写基准测试。同tests/tests.rs一样,cargo会自动找到这个文件并将其编译为基准测试。

首先编写以下基准测试:

  • kvs_write—使用kvs引擎,使用长度为1-100000字节的随机键 和 长度为1-100000字节的随机值 写入100次;
  • sled_write—使用sled引擎,使用长度为1-100000字节的随机键 和 长度为1-100000字节的随机值 写入100次;
  • kvs_read—使用kvs引擎,从先前写入的键中读取1000个值,键和值的长度是随机的;
  • sled_read—使用sled引擎,从先前写入的键中读取1000个值,键和值的长度是随机的;

(除了像上面描述的这样编写4个基准测试,您也可以用这2个引擎名作为参数编写参数化的基准测试,参见criterion手册)。

对于实现这些基准测试的技术细节,我们至少需要考虑三个因素:

  • 哪些代码应该计时(写在基准测试循环内),哪些代码不应该计时(写在基准测试循环之外)?
  • 尽管使用“随机”数字,如何使每次迭代执行相同操作。
  • 在”read”的基准测试中,如何使用之前写入的同一组“随机”键读取值。

这些都是相互关联的:用作基准测试环境设置的代码应为非计时代码,此外还需要适当地重用随机数种子以复现测试环境。

在各种情况下,可能返回错误的操作都应该断言(使用assert!)它们没有返回错误;在”read”测试中,”get”操作应该断言找到了该键的值。

随机数可以使用randcrate生成。

编写完基准测试函数,就使用cargo bench运行它们。

完成上面的基准测试,对比kvssled的性能差距。

注意:请在没什么负载的主机上运行基准测试。基准测试对其运行​​环境非常敏感,虽然criterion库会尽力修正这些“噪音”,但最好在没有其他活动进程的干净机器上进行基准测试。如果您有一台仅用于开发的备用机器,请使用它。如果没有,AWS或其他云实例可能会产生比本地桌面更一致的结果。


Cargo.toml中声明基准测试:

Cargo.toml
1
2
3
[[bench]]
name = "engine_benches"
harness = false

engine_benches.rs中编写基准测试:

benches/engine_benches.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
use criterion::{Criterion, BatchSize, criterion_group, criterion_main};
use kvs::{KvStore, KvsEngine, SledKvStore};
use rand::{rngs::SmallRng, SeedableRng, Rng};
use sled;
use tempfile::TempDir;

fn set_benches(c: &mut Criterion) {
let mut group = c.benchmark_group("set_benches");
group.bench_function("kvs", |b| {
b.iter_batched(
|| {
let temp_dir = TempDir::new().unwrap();
(KvStore::open(temp_dir.path()).unwrap(), temp_dir)
},
|(mut store, _temp_dir)| {
for i in 1..(1 << 12) {
store.set(format!("key-{}", i), format!("value-{}", i)).unwrap();
}
},
BatchSize::SmallInput
);
});
group.bench_function("sled", |b| {
b.iter_batched(
|| {
let temp_dir = TempDir::new().unwrap();
(SledKvStore::new(sled::open(&temp_dir).unwrap()), temp_dir)
},
|(mut store, _temp_dir)| {
for i in 1..(1 << 12) {
store.set(format!("key-{}", i), format!("value-{}", i)).unwrap();
}
},
BatchSize::SmallInput
);
});
}

fn get_benches(c: &mut Criterion) {
let mut group = c.benchmark_group("get_benches");
for i in &vec![8, 12, 16, 20] {
group.bench_with_input(format!("kvs_{}", i), i, |b, i| {
let temp_dir = TempDir::new().unwrap();
let mut store = KvStore::open(temp_dir.path()).unwrap();
for key_i in 1..(1 << i) {
store.set(format!("key-{}", key_i), format!("value-{}", key_i)).unwrap();
}
let mut rng = SmallRng::from_seed([0; 16]);
b.iter(|| {
store.get(format!("key-{}", rng.gen_range(1, 1 << i))).unwrap();
});
});
}
for i in &vec![8, 12, 16, 20] {
group.bench_with_input(format!("sled_{}", i), i, |b, i| {
let temp_dir = TempDir::new().unwrap();
let mut store = SledKvStore::new(sled::open(&temp_dir).unwrap());
for key_i in 1..(1 << i) {
store.set(format!("key-{}", key_i), format!("value-{}", key_i)).unwrap();
}
let mut rng = SmallRng::from_seed([0; 16]);
b.iter(|| {
store.get(format!("key-{}", rng.gen_range(1, 1 << i))).unwrap();
});
});
}

}

criterion_group!(benches, set_benches, get_benches);
criterion_main!(benches);

干得漂亮,朋友。休息一下吧。

评论

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×