cargo test --test cli ... running 11 tests test cli_log_configuration ... FAILED test cli_access_server_kvs_engine ... FAILED test cli_wrong_engine ... FAILED test client_cli_invalid_get ... FAILED test client_cli_invalid_rm ... FAILED test client_cli_invalid_set ... FAILED test cli_access_server_sled_engine ... FAILED test client_cli_invalid_subcommand ... FAILED test client_cli_version ... FAILED test server_cli_version ... FAILED test client_cli_no_args ... FAILED ...
kv_store.rs中的均通过测试:
1 2 3 4 5 6 7 8
cargo test --test kv_store running 6 tests test remove_non_existent_key ... ok test remove_key ... ok test get_stored_value ... ok test get_non_existent_value ... ok test overwrite_value ... ok test compaction ... ok
fncli() -> Command<'static> { Command::new("kvs-client") .about("client of key value storage") .version(crate_version!()) .subcommand_required(true) .disable_help_subcommand(true) .args_conflicts_with_subcommands(true) .subcommand( Command::new("get") .about("Get the string value of a given string key") .args(&[ arg!(<KEY> "A string key"), arg!(--addr <"IP:PORT"> "Set the server address as IP:PORT").required(false).default_value(DEFAULT_LISTENING_ADDRESS) ]) ) .subcommand( Command::new("set") .about("Set the value of a string key to a string") .args(&[ arg!(<KEY> "A string key"), arg!(<VALUE> "The string value of the key").required(true), arg!(--addr <"IP:PORT"> "Set the server address as IP:PORT").required(false).default_value(DEFAULT_LISTENING_ADDRESS) ]) ) .subcommand( Command::new("rm") .about("Remove a given string key") .args(&[ arg!(<KEY> "A string key"), arg!(--addr <"IP:PORT"> "Set the server address as IP:PORT").required(false).default_value(DEFAULT_LISTENING_ADDRESS) ]) ) }
fnmain() -> Result<()>{ let matches = cli().get_matches();
match matches.subcommand() { Some(("get", sub_matches)) => { println!("get {} {}", sub_matches.value_of("KEY").unwrap(), sub_matches.value_of("addr").unwrap()); let key = sub_matches.value_of("KEY").unwrap(); let addr: SocketAddr = sub_matches.value_of("addr").unwrap().parse()?; }, Some(("set", sub_matches)) => { println!("set {} {} {}", sub_matches.value_of("KEY").unwrap(), sub_matches.value_of("VALUE").unwrap(), sub_matches.value_of("addr").unwrap()); let key = sub_matches.value_of("KEY").unwrap(); let value = sub_matches.value_of("VALUE").unwrap(); let addr: SocketAddr = sub_matches.value_of("addr").unwrap().parse()?; } Some(("rm", sub_matches)) => { println!("rm {} {}", sub_matches.value_of("KEY").unwrap(), sub_matches.value_of("addr").unwrap()); let key = sub_matches.value_of("KEY").unwrap(); let addr: SocketAddr = sub_matches.value_of("addr").unwrap().parse()?; },
_ => unreachable!(), // If all subcommands are defined above, anything else is unreachabe!()
}
Ok(()) }
// derive模式
use clap::{Parser, Subcommand}; use kvs::{KvStore, Result}; use std::process::exit; use std::env::current_dir; use std::net::SocketAddr;
#[derive(Subcommand)] enumCommands { // Set the value of a string key to a string #[clap(arg_required_else_help = true)] set { // A string key key: String, // The string value of the key value: String, #[clap(long, required(false), default_value(DEFAULT_LISTENING_ADDRESS), help="Set the server address as [IP:PORT]")] addr: SocketAddr },
// Get the string value of a given string key #[clap(arg_required_else_help = true)] get { // A string key key: String, #[clap(long, required(false), default_value(DEFAULT_LISTENING_ADDRESS), help="Set the server address as [IP:PORT]")] addr: SocketAddr },
// Remove a given key #[clap(arg_required_else_help = true)] rm { // A string key key: String, #[clap(long, required(false), default_value(DEFAULT_LISTENING_ADDRESS), help="Set the server address as [IP:PORT]")] addr: SocketAddr }, }
use std::{net::{ToSocketAddrs, TcpListener, TcpStream}, io::{BufReader, BufWriter, Write}}; use log::{error, debug}; use serde_json::Deserializer; use crate::{KvsEngine, Result, common::{Request, GetResponse, SetResponse, RemoveResponse}};
/// The server of a key value store. pubstructKvsServer<E: KvsEngine> { engine: E, }
impl<E: KvsEngine> KvsServer<E> { /// Create a `KvsServer` with a given storage engine. pubfnnew(engine: E) -> Self { KvsServer { engine } } /// Run the server listening on the given address pubfnrun(&mutself, addr: impl ToSocketAddrs) -> Result<()> { let listener = TcpListener::bind(addr)?; for stream in listener.incoming() { match stream { Ok(stream) => { ifletErr(e) = self.serve(stream) { error!("Error on serving client: {}", e); } } Err(e) => error!("Connection failed: {}", e), } } Ok(()) }
fnserve(&mutself, tcp: TcpStream) -> Result<()> { let peer_addr = tcp.peer_addr()?; let reader = BufReader::new(&tcp); letmut writer = BufWriter::new(&tcp); let req_reader = Deserializer::from_reader(reader).into_iter::<Request>();
macro_rules! send_resp { ($resp:expr) => {{ let resp = $resp; serde_json::to_writer(&mut writer, &resp)?; writer.flush()?; debug!("Response sent to {}: {:?}", peer_addr, resp); };}; }
use clap::Parser; use kvs::{Result, KvsError, KvStore, KvsEngine, KvsServer}; use std::env::current_dir; use std::fs; use std::net::SocketAddr; use std::process::exit; use log::LevelFilter; use log::{error, info, warn}; use std::str::FromStr;
use std::{io::{BufReader, BufWriter, Write}, net::{TcpStream, ToSocketAddrs}}; use crate::{Result, common::{Request, GetResponse, SetResponse, RemoveResponse}, KvsError}; use serde::Deserialize; use serde_json::{Deserializer, de::IoRead};
/// Key value store client pubstructKvsClient { reader: Deserializer<IoRead<BufReader<TcpStream>>>, writer: BufWriter<TcpStream>, }
impl KvsClient { /// Connect to `addr` to access `KvsServer`. pubfnconnect(addr: impl ToSocketAddrs) -> Result<Self> { let tcp_reader = TcpStream::connect(addr)?; let tcp_writer = tcp_reader.try_clone()?; Ok(KvsClient{ reader: Deserializer::from_reader(BufReader::new(tcp_reader)), writer: BufWriter::new(tcp_writer), }) }
/// Get the value of a given key from the server. pubfnget(&mutself, key: String) -> Result<Option<String>> { serde_json::to_writer(&mutself.writer, &Request::Get { key })?; self.writer.flush()?; let resp = GetResponse::deserialize(&mutself.reader)?; match resp { GetResponse::Ok(value) => Ok(value), GetResponse::Err(msg) => Err(KvsError::StringError(msg)), } }
/// Set the value of a string key in the server. pubfnset(&mutself, key: String, value: String) -> Result<()> { serde_json::to_writer(&mutself.writer, &Request::Set { key, value })?; self.writer.flush()?; let resp = SetResponse::deserialize(&mutself.reader)?; match resp { SetResponse::Ok(_) => Ok(()), SetResponse::Err(msg) => Err(KvsError::StringError(msg)), } }
/// Remove a string key in the server. pubfnremove(&mutself, key: String) -> Result<()> { serde_json::to_writer(&mutself.writer, &Request::Remove { key })?; self.writer.flush()?; let resp = RemoveResponse::deserialize(&mutself.reader)?; match resp { RemoveResponse::Ok(_) => Ok(()), RemoveResponse::Err(msg) => Err(KvsError::StringError(msg)), } } }
#[derive(Subcommand)] #[allow(non_camel_case_types)] enumCommands { // Set the value of a string key to a string #[clap(arg_required_else_help=true)] set { // A string key key: String, // The string value of the key value: String,
#[clap(long, required(false), default_value(DEFAULT_LISTENING_ADDRESS), help="Set the server address as [IP:PORT]")] addr: SocketAddr },
// Get the string value of a given string key #[clap(arg_required_else_help=true)] get { // A string key key: String, #[clap(long, required(false), default_value(DEFAULT_LISTENING_ADDRESS), help="Set the server address as [IP:PORT]")] addr: SocketAddr },
// Remove a given key #[clap(arg_required_else_help=true)] rm { // A string key key: String, #[clap(long, required(false), default_value(DEFAULT_LISTENING_ADDRESS), help="Set the server address as [IP:PORT]")] addr: SocketAddr }, }
use criterion::{Criterion, BatchSize, criterion_group, criterion_main}; use kvs::{KvStore, KvsEngine, SledKvStore}; use rand::{rngs::SmallRng, SeedableRng, Rng}; use sled; use tempfile::TempDir;