Skip to content

Commit

Permalink
perf: disjoint tree range optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Jul 7, 2024
1 parent 6d9089a commit 019e721
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 59 deletions.
24 changes: 12 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 48,20 @@ harness = false
path = "benches/bloom.rs"
required-features = ["bloom"]

[[bench]]
name = "tree"
harness = false
path = "benches/tree.rs"
required-features = []
# [[bench]]
# name = "level_manifest"
# harness = false
# path = "benches/level_manifest.rs"
# required-features = []

[[bench]]
name = "level_manifest"
harness = false
path = "benches/level_manifest.rs"
required-features = []
# [[bench]]
# name = "fd_table"
# harness = false
# path = "benches/fd_table.rs"
# required-features = []

[[bench]]
name = "fd_table"
name = "tree"
harness = false
path = "benches/fd_table.rs"
path = "benches/tree.rs"
required-features = []
58 changes: 58 additions & 0 deletions benches/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,11 539,69 @@ fn disk_point_read(c: &mut Criterion) {
});
}

/// Benchmarks `Tree::first_key_value` and `Tree::last_key_value` on a tree that
/// was built from single-key flushes.
///
/// Setup: the keys `"a"` through `"f"` are each inserted, flushed and then
/// compacted with `PullDown` from level 0 into levels 6 down to 1 respectively.
/// `"g"` is inserted and flushed last, without a compaction afterwards.
/// The block cache is created with a capacity of 0 bytes.
fn disjoint_tree_minmax(c: &mut Criterion) {
    let mut group = c.benchmark_group("Disjoint tree");

    // NOTE(review): `folder` is moved into `Config::new`; confirm the temporary
    // directory guard is kept alive for as long as the tree needs it.
    let folder = tempfile::tempdir().unwrap();

    let tree = Config::new(folder)
        .block_size(1_024)
        .block_cache(Arc::new(BlockCache::with_capacity_bytes(0)))
        .open()
        .unwrap();

    // (key, destination level): every key is flushed on its own and then
    // pulled down from level 0 into its own destination level.
    for (key, dest_level) in [("a", 6), ("b", 5), ("c", 4), ("d", 3), ("e", 2), ("f", 1)] {
        tree.insert(key, key, 0);
        tree.flush_active_memtable().unwrap();
        tree.compact(Arc::new(lsm_tree::compaction::PullDown(0, dest_level)))
            .unwrap();
    }

    // The last key is only flushed, no compaction is run for it.
    tree.insert("g", "g", 0);
    tree.flush_active_memtable().unwrap();

    group.bench_function("Tree::first_key_value", |b| {
        b.iter(|| {
            assert_eq!(&*tree.first_key_value().unwrap().unwrap().1, b"a");
        });
    });

    group.bench_function("Tree::last_key_value", |b| {
        b.iter(|| {
            assert_eq!(&*tree.last_key_value().unwrap().unwrap().1, b"g");
        });
    });
}

// TODO: benchmark point read disjoint vs non-disjoint level vs disjoint *tree*
// TODO: benchmark .prefix().next() and .next_back(), disjoint and non-disjoint

criterion_group!(
benches,
disjoint_tree_minmax,
value_block_find,
index_block_find_handle,
value_block_size,
Expand Down
12 changes: 8 additions & 4 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 4,19 @@ pub(crate) mod fifo;
pub(crate) mod levelled;
pub(crate) mod maintenance;
pub(crate) mod major;
pub(crate) mod pulldown;
pub(crate) mod tiered;
pub(crate) mod worker;

use crate::{config::Config, levels::LevelManifest, segment::meta::SegmentId};

pub use fifo::Strategy as Fifo;
pub use levelled::Strategy as Levelled;
pub use tiered::Strategy as SizeTiered;

#[doc(hidden)]
pub use pulldown::Strategy as PullDown;

/// Input for compactor.
///
/// The compaction strategy chooses which segments to compact and how.
Expand Down Expand Up @@ -56,7 64,3 @@ pub trait CompactionStrategy {
/// Decides on what to do based on the current state of the LSM-tree's levels
fn choose(&self, _: &LevelManifest, config: &Config) -> Choice;
}

pub use fifo::Strategy as Fifo;
pub use levelled::Strategy as Levelled;
pub use tiered::Strategy as SizeTiered;
36 changes: 36 additions & 0 deletions src/compaction/pulldown.rs
Original file line number Diff line number Diff line change
@@ -0,0 1,36 @@
use super::{Choice, CompactionStrategy, Input};
use crate::{levels::LevelManifest, Config};

/// Pulls down and merges a level into the destination level.
///
/// The first field is the index of the source level, the second field is the
/// index of the destination level. All segments of both levels are merged and
/// the result is written to the destination level.
///
/// Used for unit tests.
pub struct Strategy(pub u8, pub u8);

impl CompactionStrategy for Strategy {
    /// Chooses a merge of all segments of the source level (`self.0`) followed
    /// by all segments of the destination level (`self.1`), writing the result
    /// into the destination level.
    ///
    /// # Panics
    ///
    /// Panics if either level index does not exist in the resolved view.
    #[allow(clippy::expect_used)]
    fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
        let view = levels.resolved_view();

        let source = view.get(usize::from(self.0)).expect("level should exist");

        let destination = view
            .get(usize::from(self.1))
            .expect("next level should exist");

        // Source segments come first, destination segments second.
        let segment_ids: Vec<_> = source
            .segments
            .iter()
            .chain(destination.segments.iter())
            .map(|segment| segment.metadata.id)
            .collect();

        let input = Input {
            segment_ids,
            dest_level: self.1,
            target_size: 64_000_000,
        };

        Choice::Merge(input)
    }
}
5 changes: 4 additions & 1 deletion src/levels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 278,10 @@ impl LevelManifest {
level.insert(segment);
}

/// Returns `true` if every level is flagged as disjoint
///
/// Also returns `true` if there are no levels at all.
#[must_use]
pub fn is_disjoint(&self) -> bool {
    self.levels.iter().all(|level| level.is_disjoint)
}

/// Returns `true` if there are no segments
#[must_use]
pub fn is_empty(&self) -> bool {
Expand Down Expand Up @@ -403,7 407,6 @@ impl Serializable for Vec<Level> {
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {

use crate::{
block_cache::BlockCache,
descriptor_table::FileDescriptorTable,
Expand Down
98 changes: 56 additions & 42 deletions src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 40,6 @@ impl<'a> DoubleEndedIterator for TreeIter<'a> {
}
}

/* fn filter_by_seqno(item_seqno: SeqNo, seqno: Option<SeqNo>) -> bool {
seqno.map_or(true, |seqno| item_seqno < seqno)
} */

impl<'a> TreeIter<'a> {
#[must_use]
pub fn create_prefix(
Expand All @@ -56,6 52,8 @@ impl<'a> TreeIter<'a> {
TreeIter::new(guard, |lock| {
let prefix = prefix.clone();

// TODO: check Tree::are_segments_disjoint

let mut segment_iters: Vec<BoxedIterator<'_>> =
Vec::with_capacity(level_manifest.len());

Expand Down Expand Up @@ -219,63 217,79 @@ impl<'a> TreeIter<'a> {

let range = (lo, hi);

let mut segment_iters: Vec<BoxedIterator<'_>> =
Vec::with_capacity(level_manifest.len());
let mut iters: Vec<BoxedIterator<'_>> = Vec::new();

for level in &level_manifest.levels {
if level.is_disjoint {
let mut level = level.clone();
// NOTE: Optimize disjoint trees (e.g. timeseries) to only use a single MultiReader.
if false {
// TODO: this could probably be smarter by comparing in which way the tree is growing
// TODO: TEST: also, unit test this by creating a descending and an ascending disjoint tree
let mut readers: Vec<_> = level_manifest.iter().collect();
readers.sort_by(|a, b| a.metadata.key_range.0.cmp(&b.metadata.key_range.0));
let readers: VecDeque<BoxedIterator<'_>> = readers
.into_iter()
.map(|x| Box::new(x.range(bounds.clone())) as BoxedIterator<'_>)
.collect::<VecDeque<_>>();

let mut readers: VecDeque<BoxedIterator<'_>> = VecDeque::new();
let multi_reader = MultiReader::new(readers);

level.sort_by_key_range();
if let Some(seqno) = seqno {
iters.push(Box::new(multi_reader.filter(move |item| match item {
Ok(item) => seqno_filter(item.seqno, seqno),
Err(_) => true,
})));
} else {
iters.push(Box::new(multi_reader));
}
} else {
for level in &level_manifest.levels {
if level.is_disjoint {
let mut level = level.clone();

for segment in &level.segments {
if segment.check_key_range_overlap(&bounds) {
let range = segment.range(bounds.clone());
readers.push_back(Box::new(range));
let mut readers: VecDeque<BoxedIterator<'_>> = VecDeque::new();

level.sort_by_key_range();

for segment in &level.segments {
if segment.check_key_range_overlap(&bounds) {
let range = segment.range(bounds.clone());
readers.push_back(Box::new(range));
}
}
}

if !readers.is_empty() {
let multi_reader = MultiReader::new(readers);
if !readers.is_empty() {
let multi_reader = MultiReader::new(readers);

if let Some(seqno) = seqno {
segment_iters.push(Box::new(multi_reader.filter(
move |item| match item {
if let Some(seqno) = seqno {
iters.push(Box::new(multi_reader.filter(move |item| match item {
Ok(item) => seqno_filter(item.seqno, seqno),
Err(_) => true,
},
)));
} else {
segment_iters.push(Box::new(multi_reader));
})));
} else {
iters.push(Box::new(multi_reader));
}
}
}
} else {
for segment in &level.segments {
if segment.check_key_range_overlap(&bounds) {
let reader = segment.range(bounds.clone());

if let Some(seqno) = seqno {
#[allow(clippy::option_if_let_else)]
segment_iters.push(Box::new(reader.filter(
move |item| match item {
} else {
for segment in &level.segments {
if segment.check_key_range_overlap(&bounds) {
let reader = segment.range(bounds.clone());

if let Some(seqno) = seqno {
#[allow(clippy::option_if_let_else)]
iters.push(Box::new(reader.filter(move |item| match item {
Ok(item) => seqno_filter(item.seqno, seqno),
Err(_) => true,
},
)));
} else {
segment_iters.push(Box::new(reader));
})));
} else {
iters.push(Box::new(reader));
}
}
}
}
}
}
};

drop(level_manifest);

let mut iters: Vec<_> = segment_iters;

// Sealed memtables
for (_, memtable) in lock.sealed.iter() {
let iter = memtable
Expand Down

0 comments on commit 019e721

Please sign in to comment.