Skip to content

Commit

Permalink
feat: separate apply and snapshot cmd channel (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCroxx authored Apr 20, 2022
1 parent 64b2590 commit cefd52b
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 221 deletions.
36 changes: 0 additions & 36 deletions storage/src/lsm_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 10,3 @@ pub const DEFAULT_ENTRY_SIZE: usize = 1024; // 1 KiB
pub const DEFAULT_BLOOM_FALSE_POSITIVE: f64 = 0.1;
pub const DEFAULT_SSTABLE_META_SIZE: usize = 4 * 1024; // 4 KiB
pub const DEFAULT_MEMTABLE_SIZE: usize = 4 * 1024 * 1024; // 4 MiB

use async_trait::async_trait;
use bytes::Bytes;

use crate::error::Result;

// TODO: Support iterator on [`LsmTree`].
#[async_trait]
pub trait LsmTree: Send Sync Clone 'static {
/// Put a new `key` `value` pair into LSM-Tree with given `timestamp`.
///
/// # Safety
///
/// The interface exposes `timestamp` to user for the compatibility with upper system. It's
/// caller's responsibility to ensure that the new timestamp is higher than the old one on the
/// same key. Otherwise there will be consistency problems.
async fn put(&self, key: &Bytes, value: &Bytes, timestamp: u64) -> Result<()>;

/// Delete a the given `key` in LSM-Tree by tombstone with given `timestamp`.
///
/// # Safety
///
/// The interface exposes `timestamp` to user for the compatibility with upper system. It's
/// caller's responsibility to ensure that the new timestamp is higher than the old one on the
/// same key. Otherwise there will be consistency problems.
async fn delete(&self, key: &Bytes, timestamp: u64) -> Result<()>;

/// Get the value of the given `key` in LSM-Tree with given `timestamp`.
///
/// # Safety
///
/// The interface exposes `timestamp` to user for the compatibility with upper system. It's
/// caller's responsibility to ensure that the new timestamp is higher than the old one on the
/// same key. Otherwise there will be consistency problems.
async fn get(&self, key: &Bytes, timestamp: u64) -> Result<Option<Bytes>>;
}
14 changes: 10 additions & 4 deletions tests/integrations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 13,7 @@ use runkv_proto::meta::KeyRange;
use runkv_proto::wheel::wheel_service_client::WheelServiceClient;
use runkv_rudder::config::RudderConfig;
use runkv_rudder::{bootstrap_rudder, build_rudder_with_object_store};
use runkv_storage::{LsmTree, MemObjectStore};
use runkv_storage::MemObjectStore;
use runkv_wheel::config::WheelConfig;
use runkv_wheel::{bootstrap_wheel, build_wheel_with_object_store};
use test_log::test;
Expand Down Expand Up @@ -97,19 97,25 @@ async fn test_concurrent_put_get() {
async move {
let mut rng = thread_rng();
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..100))).await;
lsmtree_clone.put(&key(i), &value(i), 1).await.unwrap();
lsmtree_clone
.put(&key(i), &value(i), 1, 0, 0)
.await
.unwrap();
trace!("put {:?} at {}", key(i), 1);
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..100))).await;
trace!("get {:?} at {}", key(i), 3);
assert_eq!(lsmtree_clone.get(&key(i), 3).await.unwrap(), Some(value(i)));
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..100))).await;
lsmtree_clone.delete(&key(i), 5).await.unwrap();
lsmtree_clone.delete(&key(i), 5, 0, 0).await.unwrap();
trace!("delete {:?} at {}", key(i), 5);
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..100))).await;
trace!("get {:?} at {}", key(i), 7);
assert_eq!(lsmtree_clone.get(&key(i), 7).await.unwrap(), None);
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..100))).await;
lsmtree_clone.put(&key(i), &value(i), 9).await.unwrap();
lsmtree_clone
.put(&key(i), &value(i), 9, 0, 0)
.await
.unwrap();
trace!("put {:?} at {}", key(i), 9);
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..100))).await;
trace!("get {:?} at {}", key(i), 11);
Expand Down
40 changes: 10 additions & 30 deletions wheel/src/components/command.rs
Original file line number Diff line number Diff line change
@@ -1,44 1,24 @@
use tokio::sync::oneshot;
use std::ops::Range;

use crate::error::Result;
use tokio::sync::oneshot;

#[derive(Debug)]
pub enum CommandRequest {
Data {
group: u64,
index: u64,
data: Vec<u8>,
},
BuildSnapshot {
group: u64,
index: u64,
},
InstallSnapshot {
group: u64,
index: u64,
snapshot: Vec<u8>,
},
pub struct Apply {
pub group: u64,
pub range: Range<u64>,
}

#[derive(Debug)]
pub enum CommandResponse {
Data {
pub enum Snapshot {
Build {
group: u64,
index: u64,
notifier: oneshot::Sender<Vec<u8>>,
},
BuildSnapshot {
Install {
group: u64,
index: u64,
snapshot: Vec<u8>,
notifier: oneshot::Sender<()>,
},
InstallSnapshot {
group: u64,
index: u64,
},
}

#[derive(Debug)]
pub struct AsyncCommand {
pub request: CommandRequest,
pub response: oneshot::Sender<Result<CommandResponse>>,
}
84 changes: 41 additions & 43 deletions wheel/src/components/gear.rs
Original file line number Diff line number Diff line change
@@ -1,60 1,63 @@
use std::io::Cursor;
use std::ops::Range;

use async_trait::async_trait;
use tokio::sync::{mpsc, oneshot};
use tracing::trace;

use super::command::{AsyncCommand, CommandRequest, CommandResponse};
use super::command::{Apply, Snapshot};
use super::fsm::Fsm;
use crate::error::{Error, Result};

#[derive(Clone)]
pub struct Gear {
sender: mpsc::UnboundedSender<AsyncCommand>,
apply_tx: mpsc::UnboundedSender<Apply>,
snapshot_tx: mpsc::UnboundedSender<Snapshot>,
}

impl Gear {
pub fn new(sender: mpsc::UnboundedSender<AsyncCommand>) -> Self {
Self { sender }
pub fn new(
apply_tx: mpsc::UnboundedSender<Apply>,
snapshot_tx: mpsc::UnboundedSender<Snapshot>,
) -> Self {
Self {
apply_tx,
snapshot_tx,
}
}
}

#[async_trait]
impl Fsm for Gear {
async fn apply(&self, group: u64, index: u64, request: &[u8]) -> Result<()> {
trace!(group = group, index = index, "notify apply:");
let (tx, rx) = oneshot::channel();
self.sender
.send(AsyncCommand {
request: CommandRequest::Data {
group,
index,
data: request.to_vec(),
},
response: tx,
})
async fn apply(&self, _group: u64, _index: u64, _request: &[u8]) -> Result<()> {
Ok(())
}

async fn post_apply(&self, group: u64, range: Range<u64>) -> Result<()> {
trace!(
"notify apply: [group: {}] [range: [{}..{})]",
group,
range.start,
range.end
);
self.apply_tx
.send(Apply { group, range })
.map_err(Error::err)?;
let res = rx.await.map_err(Error::err)??;
match res {
CommandResponse::Data { .. } => Ok(()),
_ => unreachable!(),
}
Ok(())
}

async fn build_snapshot(&self, group: u64, index: u64) -> Result<Cursor<Vec<u8>>> {
trace!("build snapshot");
let (tx, rx) = oneshot::channel();
self.sender
.send(AsyncCommand {
request: CommandRequest::BuildSnapshot { group, index },
response: tx,
self.snapshot_tx
.send(Snapshot::Build {
group,
index,
notifier: tx,
})
.map_err(Error::err)?;
let res = rx.await.map_err(Error::err)??;
match res {
CommandResponse::BuildSnapshot { snapshot, .. } => Ok(Cursor::new(snapshot)),
_ => unreachable!(),
}
let snapshot = rx.await.map_err(Error::err)?;
Ok(Cursor::new(snapshot))
}

async fn install_snapshot(
Expand All @@ -65,20 68,15 @@ impl Fsm for Gear {
) -> Result<()> {
trace!("install snapshot: {:?}", snapshot);
let (tx, rx) = oneshot::channel();
self.sender
.send(AsyncCommand {
request: CommandRequest::InstallSnapshot {
group,
index,
snapshot: snapshot.to_owned().into_inner(),
},
response: tx,
self.snapshot_tx
.send(Snapshot::Install {
group,
index,
snapshot: snapshot.to_owned().into_inner(),
notifier: tx,
})
.map_err(Error::err)?;
let res = rx.await.map_err(Error::err)??;
match res {
CommandResponse::InstallSnapshot { .. } => Ok(()),
_ => unreachable!(),
}
rx.await.map_err(Error::err)?;
Ok(())
}
}
Loading

0 comments on commit cefd52b

Please sign in to comment.