Skip to content

Commit

Permalink
remove special early exit
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed Sep 25, 2023
1 parent 14caacb commit 6c53745
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 97,7 @@ pub use self::multi_collector::{FruitHandle, MultiCollector, MultiFruit};
mod top_collector;

mod top_score_collector;
pub use self::top_score_collector::TopDocs;
pub use self::top_score_collector::{TopDocs, TopNComputer};

mod custom_score_top_collector;
pub use self::custom_score_top_collector::{CustomScorer, CustomSegmentScorer};
Expand Down
29 changes: 5 additions & 24 deletions src/collector/top_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 60,8 @@ pub(crate) struct TopCollector<T> {
}

impl<T> TopCollector<T>
where T: PartialOrd Clone
where
T: PartialOrd Clone,
{
/// Creates a top collector, with a number of documents equal to "limit".
///
Expand All @@ -86,31 87,11 @@ where T: PartialOrd Clone

pub fn merge_fruits(
&self,
mut children: Vec<Vec<(T, DocAddress)>>,
children: Vec<Vec<(T, DocAddress)>>,
) -> crate::Result<Vec<(T, DocAddress)>> {
if self.limit == 0 {
return Ok(Vec::new());
}
if children.len() == 1 {
let mut child = children.pop().unwrap();
child.sort_by(|a, b| {
if a.0 == b.0 {
a.1.cmp(&b.1)
} else {
b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)
}
});
if self.offset != 0 {
return Ok(child
.into_iter()
.skip(self.offset)
.take(self.limit)
.collect());
}
child.truncate(self.limit);

return Ok(child);
}
let mut top_collector = TopNComputer::new(self.limit self.offset);
for child_fruit in children {
for (feature, doc) in child_fruit {
Expand Down Expand Up @@ -150,9 131,9 @@ where T: PartialOrd Clone
/// The Top Collector keeps track of the K documents
/// sorted by type `T`.
///
/// The implementation is based on a `BinaryHeap`.
/// The implementation is based on a repeatedly truncating on the median after K * 2 documents
/// The theoretical complexity for collecting the top `K` out of `n` documents
/// is `O(n log K)`.
/// is `O(n K)`.
pub(crate) struct TopSegmentCollector<T> {
topn_computer: TopNComputer<T, DocId>,
segment_ord: u32,
Expand Down
68 changes: 59 additions & 9 deletions src/collector/top_score_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,22 722,27 @@ impl SegmentCollector for TopScoreSegmentCollector {
}
}

/// Fast TopN Computation
///
/// For TopN == 0, it will be relative expensive.
pub struct TopNComputer<Score, DocId> {
buffer: Vec<ComparableDoc<Score, DocId>>,
max_size: usize,
pub threshold: Option<Score>,
top_n: usize,
pub(crate) threshold: Option<Score>,
}

impl<Score, DocId> TopNComputer<Score, DocId>
where
Score: PartialOrd Clone,
DocId: Ord Clone,
{
/// Create a new `TopNComputer`.
/// Internally it will allocate a buffer of size `2 * top_n`.
pub fn new(top_n: usize) -> Self {
let max_size = top_n.max(1) * 2;
let vec_cap = top_n.max(1) * 2;
TopNComputer {
buffer: Vec::with_capacity(max_size),
max_size,
buffer: Vec::with_capacity(vec_cap),
top_n,
threshold: None,
}
}
Expand All @@ -749,13 754,21 @@ where
return;
}
}
// Check on capacity should eleminate the check in `push`
if self.buffer.len() == self.buffer.capacity() {
let median = self.truncate_median();
self.threshold = Some(median);
}

self.buffer.push(doc);
// This is faster since it avoids the buffer resizing to be inlined from vec.push()
// (this is in the hot path)
let uninit = self.buffer.spare_capacity_mut();
// This cannot panic, because we truncate_median will at least remove one element, since
// the min capacity is 2.
uninit[0].write(doc);
// This is safe because it would panic in the line above
unsafe {
self.buffer.set_len(self.buffer.len() 1);
}
}

#[inline(never)]
Expand All @@ -776,13 789,14 @@ where
sorted_buffer.sort_unstable();

// Return the sorted top N elements
sorted_buffer.into_iter().take(self.max_size / 2)
sorted_buffer.into_iter().take(self.top_n)
}
}

#[cfg(test)]
mod tests {
use super::TopDocs;
use super::{TopDocs, TopNComputer};
use crate::collector::top_collector::ComparableDoc;
use crate::collector::Collector;
use crate::query::{AllQuery, Query, QueryParser};
use crate::schema::{Field, Schema, FAST, STORED, TEXT};
Expand Down Expand Up @@ -811,6 825,42 @@ mod tests {
}
}

#[test]
fn test_empty_topn_computer() {
let mut computer: TopNComputer<u32, u32> = TopNComputer::new(0);

computer.push(ComparableDoc {
feature: 1u32,
doc: 1u32,
});
computer.push(ComparableDoc {
feature: 1u32,
doc: 2u32,
});
computer.push(ComparableDoc {
feature: 1u32,
doc: 3u32,
});
computer
.into_iter_sorted()
.for_each(|_| panic!("Should be empty"));
}

#[test]
fn test_topn_computer_no_panic() {
for top_n in 0..10 {
let mut computer: TopNComputer<u32, u32> = TopNComputer::new(top_n);

for _ in 0..1 top_n * 2 {
computer.push(ComparableDoc {
feature: 1u32,
doc: 1u32,
});
}
let _vals = computer.into_iter_sorted().collect::<Vec<_>>();
}
}

#[test]
fn test_top_collector_not_at_capacity_without_offset() -> crate::Result<()> {
let index = make_index()?;
Expand Down

0 comments on commit 6c53745

Please sign in to comment.