Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/backend/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,16 @@ pub enum WriteFlags {
APPEND,
APPEND_DUP,
}

/// Strategy to use when corrupted data is detected while opening a database.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RecoveryStrategy {
/// Bubble up the error on detecting a corrupted data file. The default.
Error,

/// Discard the corrupted data and start with an empty database.
Discard,

/// Move the corrupted data file to `$file.corrupt` and start with an empty database.
Rename,
}
4 changes: 3 additions & 1 deletion src/backend/impl_lmdb/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::{
DatabaseFlagsImpl, DatabaseImpl, EnvironmentFlagsImpl, ErrorImpl, InfoImpl, RoTransactionImpl,
RwTransactionImpl, StatImpl,
};
use crate::backend::common::RecoveryStrategy;
use crate::backend::traits::{
BackendEnvironment, BackendEnvironmentBuilder, BackendInfo, BackendIter, BackendRoCursor,
BackendRoCursorTransaction, BackendStat,
Expand Down Expand Up @@ -86,7 +87,8 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self
}

fn set_discard_if_corrupted(&mut self, _discard_if_corrupted: bool) -> &mut Self {
/// **UNIMPLEMENTED.** Will panic at runtime.
fn set_corruption_recovery_strategy(&mut self, _strategy: RecoveryStrategy) -> &mut Self {
// Unfortunately, when opening a database, LMDB doesn't handle all the ways it could have
// been corrupted. Prefer using the `SafeMode` backend if this is important.
unimplemented!();
Expand Down
46 changes: 32 additions & 14 deletions src/backend/impl_safe/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ use super::{
database::Database, DatabaseFlagsImpl, DatabaseImpl, EnvironmentFlagsImpl, ErrorImpl, InfoImpl,
RoTransactionImpl, RwTransactionImpl, StatImpl,
};
use crate::backend::common::RecoveryStrategy;
use crate::backend::traits::{BackendEnvironment, BackendEnvironmentBuilder};

const DEFAULT_DB_FILENAME: &str = "data.safe.bin";
const DEFAULT_CORRUPT_DB_EXTENSION: &str = "bin.corrupt";

type DatabaseArena = Arena<Database>;
type DatabaseNameMap = HashMap<Option<String>, DatabaseImpl>;
Expand All @@ -38,7 +40,7 @@ pub struct EnvironmentBuilderImpl {
max_dbs: Option<usize>,
map_size: Option<usize>,
make_dir_if_needed: bool,
discard_if_corrupted: bool,
corruption_recovery_strategy: RecoveryStrategy,
}

impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
Expand All @@ -53,7 +55,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
max_dbs: None,
map_size: None,
make_dir_if_needed: false,
discard_if_corrupted: false,
corruption_recovery_strategy: RecoveryStrategy::Error,
}
}

Expand Down Expand Up @@ -85,8 +87,8 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self
}

fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self {
self.discard_if_corrupted = discard_if_corrupted;
fn set_corruption_recovery_strategy(&mut self, strategy: RecoveryStrategy) -> &mut Self {
self.corruption_recovery_strategy = strategy;
self
}

Expand All @@ -106,7 +108,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self.max_dbs,
self.map_size,
)?;
env.read_from_disk(self.discard_if_corrupted)?;
env.read_from_disk(self.corruption_recovery_strategy)?;
Ok(env)
}
}
Expand Down Expand Up @@ -152,16 +154,32 @@ impl EnvironmentImpl {
Ok(bincode::serialize(&data)?)
}

fn deserialize(
bytes: &[u8],
discard_if_corrupted: bool,
fn load(
path: &Path,
strategy: RecoveryStrategy,
) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> {
let bytes = fs::read(path)?;

match Self::deserialize(&bytes) {
Ok((arena, name_map)) => Ok((arena, name_map)),
Err(err) => match strategy {
RecoveryStrategy::Error => Err(err),
RecoveryStrategy::Discard => Ok((DatabaseArena::new(), HashMap::new())),
RecoveryStrategy::Rename => {
let corrupted_path = path.with_extension(DEFAULT_CORRUPT_DB_EXTENSION);
fs::rename(path, corrupted_path)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the file already exists because of the previous corruption? Can we have a timestamp on the filename to prevent collision?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the assumption was that one deals with the $db.corrupt file soon, before hitting another problem. But I can see that that's not always possible.
If we use timestamps rkv-users need to make sure to actually clean up to avoid piling up old corrupted files.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By using Rename I'd expect the user to deal with the files, indeed. I wonder what they can really do with it if rkv can't do anything, though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For debugging purporses someone could manually explore that file and see if they are able to detect why the file got corrupted.

I thought about the .corrupt vs .$timestamp option again and I would say we should stay with .corrupt for now:

  1. It's what's been discussed a while back
  2. It avoids piling up files over files
  3. The logic to find it is kept simple
  4. We still have the option in the future to change it. Let's see if it's even gonna be adopted inside m-c


Ok((DatabaseArena::new(), HashMap::new()))
}
},
}
}

fn deserialize(bytes: &[u8]) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> {
let mut arena = DatabaseArena::new();
let mut name_map = HashMap::new();
let data: HashMap<_, _> = match bincode::deserialize(bytes) {
Err(_) if discard_if_corrupted => Ok(HashMap::new()),
result => result,
}?;
let data: HashMap<_, _> = bincode::deserialize(bytes)?;

for (name, db) in data {
name_map.insert(name, DatabaseImpl(arena.alloc(db)));
}
Expand Down Expand Up @@ -199,15 +217,15 @@ impl EnvironmentImpl {
})
}

pub(crate) fn read_from_disk(&mut self, discard_if_corrupted: bool) -> Result<(), ErrorImpl> {
pub(crate) fn read_from_disk(&mut self, strategy: RecoveryStrategy) -> Result<(), ErrorImpl> {
let mut path = Cow::from(&self.path);
if fs::metadata(&path)?.is_dir() {
path.to_mut().push(DEFAULT_DB_FILENAME);
};
if fs::metadata(&path).is_err() {
return Ok(());
};
let (arena, name_map) = Self::deserialize(&fs::read(&path)?, discard_if_corrupted)?;
let (arena, name_map) = Self::load(&path, strategy)?;
self.dbs = RwLock::new(EnvironmentDbs { arena, name_map });
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions src/backend/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
};

use crate::{
backend::common::{DatabaseFlags, EnvironmentFlags, WriteFlags},
backend::common::{DatabaseFlags, EnvironmentFlags, RecoveryStrategy, WriteFlags},
error::StoreError,
};

Expand Down Expand Up @@ -83,7 +83,8 @@ pub trait BackendEnvironmentBuilder<'b>: Debug + Eq + PartialEq + Copy + Clone {

fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self;

fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self;
/// Set the corruption recovery strategy. See [`RecoveryStrategy`] for details.
fn set_corruption_recovery_strategy(&mut self, strategy: RecoveryStrategy) -> &mut Self;

fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error>;
}
Expand Down
95 changes: 93 additions & 2 deletions tests/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tempfile::Builder;
#[cfg(feature = "lmdb")]
use rkv::backend::{Lmdb, LmdbEnvironment};
use rkv::{
backend::{BackendEnvironmentBuilder, SafeMode, SafeModeEnvironment},
backend::{BackendEnvironmentBuilder, RecoveryStrategy, SafeMode, SafeModeEnvironment},
CloseOptions, Rkv, StoreOptions, Value,
};

Expand Down Expand Up @@ -247,7 +247,7 @@ fn test_safe_mode_corrupt_while_open_1() {

// But we can use a builder and pass `discard_if_corrupted` to deal with it.
let mut builder = Rkv::environment_builder::<SafeMode>();
builder.set_discard_if_corrupted(true);
builder.set_corruption_recovery_strategy(RecoveryStrategy::Discard);
manager
.get_or_create_from_builder(root.path(), builder, Rkv::from_builder::<SafeMode>)
.expect("created");
Expand Down Expand Up @@ -378,3 +378,94 @@ fn test_safe_mode_corrupt_while_open_2() {
Some(Value::Str("byé, yöu"))
);
}

/// Test how the manager can discard corrupted databases, while moving the corrupted one aside for
/// later inspection.
#[test]
fn test_safe_mode_corrupt_while_open_3() {
type Manager = rkv::Manager<SafeModeEnvironment>;

let root = Builder::new()
.prefix("test_safe_mode_corrupt_while_open_3")
.tempdir()
.expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");

let mut safebin = root.path().to_path_buf();
safebin.push("data.safe.bin");

// Oops, corruption.
fs::write(&safebin, "bogus").expect("dbfile corrupted");
assert!(safebin.exists(), "Corrupted database file was written to");

// Create environment.
let mut manager = Manager::singleton().write().unwrap();

// Recreating environment fails.
manager
.get_or_create(root.path(), Rkv::new::<SafeMode>)
.expect_err("not created");
assert!(manager.get(root.path()).expect("success").is_none());

// But we can use a builder and pass `RecoveryStrategy::Rename` to deal with it.
let mut builder = Rkv::environment_builder::<SafeMode>();
builder.set_corruption_recovery_strategy(RecoveryStrategy::Rename);
manager
.get_or_create_from_builder(root.path(), builder, Rkv::from_builder::<SafeMode>)
.expect("created");
assert!(manager.get(root.path()).expect("success").is_some());

assert!(!safebin.exists(), "Database file was moved out of the way");

let mut corruptbin = root.path().to_path_buf();
corruptbin.push("data.safe.bin.corrupt");
assert!(corruptbin.exists(), "Corrupted database file exists");

let shared_env = manager
.get_or_create(root.path(), Rkv::new::<SafeMode>)
.expect("created");
let env = shared_env.read().unwrap();

// Writing still works.
let store = env
.open_single("store", StoreOptions::create())
.expect("opened");

let reader = env.read().expect("reader");
assert_eq!(store.get(&reader, "foo").expect("read"), None, "Nothing to be read");

// We can write.
let mut writer = env.write().expect("writer");
store
.put(&mut writer, "foo", &Value::I64(5678))
.expect("wrote");
writer.commit().expect("committed");
env.sync(true).expect("synced");

assert!(safebin.exists(), "Database file exists");

// Close everything.
drop(env);
drop(shared_env);
manager
.try_close(root.path(), CloseOptions::default())
.expect("closed without deleting");
assert!(manager.get(root.path()).expect("success").is_none());

// Recreate environment.
let shared_env = manager
.get_or_create(root.path(), Rkv::new::<SafeMode>)
.expect("created");
let env = shared_env.read().unwrap();

// Verify that the dbfile is not corrupted.
let store = env
.open_single("store", StoreOptions::default())
.expect("opened");
let reader = env.read().expect("reader");
assert_eq!(
store.get(&reader, "foo").expect("read"),
Some(Value::I64(5678)),
"Database contains expected value"
);
}