... running 17 tests test cli_invalid_subcommand ... ok test cli_get_non_existent_key ... FAILED test cli_rm_non_existent_key ... FAILED test cli_get_stored ... FAILED test cli_no_args ... ok test get_non_existent_value ... FAILED test get_stored_value ... FAILED test compaction ... FAILED test cli_invalid_get ... ok test overwrite_value ... FAILED test remove_non_existent_key ... FAILED test cli_version ... ok test remove_key ... FAILED test cli_invalid_rm ... ok test cli_set ... FAILED test cli_rm_stored ... FAILED test cli_invalid_set ... ok ...
/// custom error type of kvs #[derive(Fail, Debug)] pubenumKvsError { /// Key not found in kvs index #[fail(display="Key `{}` not found", _0)] KeyNotFound(String), }
/// simplify `Result<T, KvsError>` to `Result<T>` pubtypeResult<T> = std::result::Result<T, KvsError>;
use std::collections::{BTreeMap, HashMap}; use std::fs::{self, File, OpenOptions}; use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write}; use std::ops::Range; use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize}; use serde_json::Deserializer;
use crate::{KvsError, Result}; use std::ffi::OsStr;
/// Compaction threshold in bytes: 2^20, i.e. 1 MiB.
///
/// NOTE(review): presumably compared against the store's `uncompacted` byte
/// counter to decide when to compact — confirm at the use site.
const COMPACTION_THRESHOLD: u64 = 1 << 20;
/// The `KvStore` stores string key/value pairs. /// /// Key/value pairs are persisted to disk in log files. Log files are named after /// monotonically increasing generation numbers with a `log` extension name. /// A `BTreeMap` in memory stores the keys and the value locations for fast query. /// /// ```rust /// # use kvs::{KvStore, Result}; /// # fn try_main() -> Result<()> { /// use std::env::current_dir; /// let mut store = KvStore::open(current_dir()?)?; /// store.set("key".to_owned(), "value".to_owned())?; /// let val = store.get("key".to_owned())?; /// assert_eq!(val, Some("value".to_owned())); /// # Ok(()) /// # } /// ``` pubstructKvStore { // directory for the log and other data. path: PathBuf, // map generation number to the file reader. readers: HashMap<u64, BufReaderWithPos<File>>, // writer of the current log. writer: BufWriterWithPos<File>, current_gen: u64, index: BTreeMap<String, CommandPos>, // the number of bytes representing "stale" commands that could be // deleted during a compaction. uncompacted: u64, }
impl KvStore { /// Opens a `KvStore` with the given path. /// /// This will create a new directory if the given one does not exist. /// /// # Errors /// /// It propagates I/O or deserialization errors during the log replay. pubfnopen(path: implInto<PathBuf>) -> Result<KvStore> { let path = path.into(); fs::create_dir_all(&path)?;
letmut readers = HashMap::new(); letmut index = BTreeMap::new();
let gen_list = sorted_gen_list(&path)?; letmut uncompacted = 0;
for &gen in &gen_list { letmut reader = BufReaderWithPos::new(File::open(log_path(&path, gen))?)?; uncompacted += load(gen, &mut reader, &mut index)?; readers.insert(gen, reader); }
let current_gen = gen_list.last().unwrap_or(&0) + 1; let writer = new_log_file(&path, current_gen, &mut readers)?;
/// Sets the value of a string key to a string. /// /// If the key already exists, the previous value will be overwritten. /// /// # Errors /// /// It propagates I/O or serialization errors during writing the log. pubfnset(&mutself, key: String, value: String) -> Result<()> { let cmd = Command::set(key, value); let pos = self.writer.pos; serde_json::to_writer(&mutself.writer, &cmd)?; self.writer.flush()?; iflet Command::Set { key, .. } = cmd { ifletSome(old_cmd) = self .index .insert(key, (self.current_gen, pos..self.writer.pos).into()) { self.uncompacted += old_cmd.len; } }
/// Gets the string value of a given string key. /// /// Returns `None` if the given key does not exist. /// /// # Errors /// /// It returns `KvsError::UnexpectedCommandType` if the given command type unexpected. pubfnget(&mutself, key: String) -> Result<Option<String>> { ifletSome(cmd_pos) = self.index.get(&key) { let reader = self .readers .get_mut(&cmd_pos.gen) .expect("Cannot find log reader"); reader.seek(SeekFrom::Start(cmd_pos.pos))?; let cmd_reader = reader.take(cmd_pos.len); iflet Command::Set { value, .. } = serde_json::from_reader(cmd_reader)? { Ok(Some(value)) } else { Err(KvsError::UnexpectedCommandType) } } else { Ok(None) } }
/// Removes a given key. /// /// # Errors /// /// It returns `KvsError::KeyNotFound` if the given key is not found. /// /// It propagates I/O or serialization errors during writing the log. pubfnremove(&mutself, key: String) -> Result<()> { ifself.index.contains_key(&key) { let cmd = Command::remove(key); serde_json::to_writer(&mutself.writer, &cmd)?; self.writer.flush()?; iflet Command::Remove { key } = cmd { let old_cmd = self.index.remove(&key).expect("key not found"); self.uncompacted += old_cmd.len; } Ok(()) } else { Err(KvsError::KeyNotFound) } }
/// Clears stale entries in the log. pubfncompact(&mutself) -> Result<()> { // increase current gen by 2. current_gen + 1 is for the compaction file. let compaction_gen = self.current_gen + 1; self.current_gen += 2; self.writer = self.new_log_file(self.current_gen)?;
letmut new_pos = 0; // pos in the new log file. for cmd_pos in &mutself.index.values_mut() { let reader = self .readers .get_mut(&cmd_pos.gen) .expect("Cannot find log reader"); if reader.pos != cmd_pos.pos { reader.seek(SeekFrom::Start(cmd_pos.pos))?; }
// remove stale log files. let stale_gens: Vec<_> = self .readers .keys() .filter(|&&gen| gen < compaction_gen) .cloned() .collect(); for stale_gen in stale_gens { self.readers.remove(&stale_gen); fs::remove_file(log_path(&self.path, stale_gen))?; } self.uncompacted = 0;
Ok(()) }
/// Create a new log file with given generation number and add the reader to the readers map. /// /// Returns the writer to the log. fnnew_log_file(&mutself, gen: u64) -> Result<BufWriterWithPos<File>> { new_log_file(&self.path, gen, &mutself.readers) } }
/// Create a new log file with given generation number and add the reader to the readers map. /// /// Returns the writer to the log. fnnew_log_file( path: &Path, gen: u64, readers: &mut HashMap<u64, BufReaderWithPos<File>>, ) -> Result<BufWriterWithPos<File>> { let path = log_path(&path, gen); let writer = BufWriterWithPos::new( OpenOptions::new() .create(true) .write(true) .append(true) .open(&path)?, )?; readers.insert(gen, BufReaderWithPos::new(File::open(&path)?)?); Ok(writer) }
/// Load the whole log file and store value locations in the index map. /// /// Returns how many bytes can be saved after a compaction. fnload( gen: u64, reader: &mut BufReaderWithPos<File>, index: &mut BTreeMap<String, CommandPos>, ) -> Result<u64> { // To make sure we read from the beginning of the file. letmut pos = reader.seek(SeekFrom::Start(0))?; letmut stream = Deserializer::from_reader(reader).into_iter::<Command>(); letmut uncompacted = 0; // number of bytes that can be saved after a compaction. whileletSome(cmd) = stream.next() { let new_pos = stream.byte_offset() asu64; match cmd? { Command::Set { key, .. } => { ifletSome(old_cmd) = index.insert(key, (gen, pos..new_pos).into()) { uncompacted += old_cmd.len; } } Command::Remove { key } => { ifletSome(old_cmd) = index.remove(&key) { uncompacted += old_cmd.len; } // the "remove" command itself can be deleted in the next compaction. // so we add its length to `uncompacted`. uncompacted += new_pos - pos; } } pos = new_pos; } Ok(uncompacted) }