Skip to content

Commit

Permalink
docid deltas while indexing
Browse files Browse the repository at this point in the history
storing deltas is especially helpful for repetitive data like logs.
In those cases, recording a doc on a term costed 4 bytes instead of 1
byte now.

HDFS Indexing 1.1GB Total memory consumption:
Before:  760 MB
Now:     590 MB
  • Loading branch information
PSeitz committed Nov 12, 2023
1 parent 4837c78 commit f6512d4
Showing 1 changed file with 32 additions and 36 deletions.
68 changes: 32 additions & 36 deletions src/postings/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 82,12 @@ pub(crate) trait Recorder: Copy Default Send Sync 'static {
}

/// Only records the doc ids
#[derive(Clone, Copy)]
#[derive(Clone, Copy, Default)]
pub struct DocIdRecorder {
stack: ExpUnrolledLinkedList,
current_doc: DocId,
}

impl Default for DocIdRecorder {
fn default() -> Self {
DocIdRecorder {
stack: ExpUnrolledLinkedList::default(),
current_doc: u32::MAX,
}
}
}

impl Recorder for DocIdRecorder {
#[inline]
fn current_doc(&self) -> DocId {
Expand All @@ -105,8 96,9 @@ impl Recorder for DocIdRecorder {

#[inline]
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
let delta = doc - self.current_doc;
self.current_doc = doc;
self.stack.writer(arena).write_u32_vint(doc);
self.stack.writer(arena).write_u32_vint(delta);
}

#[inline]
Expand All @@ -125,18 117,22 @@ impl Recorder for DocIdRecorder {
let (buffer, doc_ids) = buffer_lender.lend_all();
self.stack.read_to_end(arena, buffer);
// TODO avoid reading twice.
let mut prev_doc = 0;
if let Some(doc_id_map) = doc_id_map {
doc_ids.extend(
VInt32Reader::new(&buffer[..])
.map(|old_doc_id| doc_id_map.get_new_doc_id(old_doc_id)),
);
doc_ids.extend(VInt32Reader::new(&buffer[..]).map(|delta_doc_id| {
let old_doc_id = prev_doc delta_doc_id;
prev_doc = old_doc_id;
doc_id_map.get_new_doc_id(old_doc_id)
}));
doc_ids.sort_unstable();

for doc in doc_ids {
serializer.write_doc(*doc, 0u32, &[][..]);
}
} else {
for doc in VInt32Reader::new(&buffer[..]) {
for delta_doc_id in VInt32Reader::new(&buffer[..]) {
let doc = prev_doc delta_doc_id;
prev_doc = doc;
serializer.write_doc(doc, 0u32, &[][..]);
}
}
Expand Down Expand Up @@ -164,9 160,10 @@ impl Recorder for TermFrequencyRecorder {

#[inline]
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
let delta = doc - self.current_doc;
self.term_doc_freq = 1;
self.current_doc = doc;
self.stack.writer(arena).write_u32_vint(doc);
self.stack.writer(arena).write_u32_vint(delta);
}

#[inline]
Expand All @@ -191,21 188,26 @@ impl Recorder for TermFrequencyRecorder {
let buffer = buffer_lender.lend_u8();
self.stack.read_to_end(arena, buffer);
let mut u32_it = VInt32Reader::new(&buffer[..]);
let mut prev_doc = 0;
if let Some(doc_id_map) = doc_id_map {
let mut doc_id_and_tf = vec![];
while let Some(old_doc_id) = u32_it.next() {
while let Some(delta_doc_id) = u32_it.next() {
let doc_id = prev_doc delta_doc_id;
prev_doc = doc_id;
let term_freq = u32_it.next().unwrap_or(self.current_tf);
doc_id_and_tf.push((doc_id_map.get_new_doc_id(old_doc_id), term_freq));
doc_id_and_tf.push((doc_id_map.get_new_doc_id(doc_id), term_freq));
}
doc_id_and_tf.sort_unstable_by_key(|&(doc_id, _)| doc_id);

for (doc_id, tf) in doc_id_and_tf {
serializer.write_doc(doc_id, tf, &[][..]);
}
} else {
while let Some(doc) = u32_it.next() {
while let Some(delta_doc_id) = u32_it.next() {
let doc_id = prev_doc delta_doc_id;
prev_doc = doc_id;
let term_freq = u32_it.next().unwrap_or(self.current_tf);
serializer.write_doc(doc, term_freq, &[][..]);
serializer.write_doc(doc_id, term_freq, &[][..]);
}
}
}
Expand All @@ -216,23 218,13 @@ impl Recorder for TermFrequencyRecorder {
}

/// Recorder encoding term frequencies as well as positions.
#[derive(Clone, Copy)]
#[derive(Clone, Copy, Default)]
pub struct TfAndPositionRecorder {
stack: ExpUnrolledLinkedList,
current_doc: DocId,
term_doc_freq: u32,
}

impl Default for TfAndPositionRecorder {
fn default() -> Self {
TfAndPositionRecorder {
stack: ExpUnrolledLinkedList::default(),
current_doc: u32::MAX,
term_doc_freq: 0u32,
}
}
}

impl Recorder for TfAndPositionRecorder {
#[inline]
fn current_doc(&self) -> DocId {
Expand All @@ -241,9 233,10 @@ impl Recorder for TfAndPositionRecorder {

#[inline]
fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
let delta = doc - self.current_doc;
self.current_doc = doc;
self.term_doc_freq = 1u32;
self.stack.writer(arena).write_u32_vint(doc);
self.stack.writer(arena).write_u32_vint(delta);
}

#[inline]
Expand All @@ -269,7 262,10 @@ impl Recorder for TfAndPositionRecorder {
self.stack.read_to_end(arena, buffer_u8);
let mut u32_it = VInt32Reader::new(&buffer_u8[..]);
let mut doc_id_and_positions = vec![];
while let Some(doc) = u32_it.next() {
let mut prev_doc = 0;
while let Some(delta_doc_id) = u32_it.next() {
let doc_id = prev_doc delta_doc_id;
prev_doc = doc_id;
let mut prev_position_plus_one = 1u32;
buffer_positions.clear();
loop {
Expand All @@ -287,9 283,9 @@ impl Recorder for TfAndPositionRecorder {
if let Some(doc_id_map) = doc_id_map {
// this simple variant to remap may consume to much memory
doc_id_and_positions
.push((doc_id_map.get_new_doc_id(doc), buffer_positions.to_vec()));
.push((doc_id_map.get_new_doc_id(doc_id), buffer_positions.to_vec()));
} else {
serializer.write_doc(doc, buffer_positions.len() as u32, buffer_positions);
serializer.write_doc(doc_id, buffer_positions.len() as u32, buffer_positions);
}
}
if doc_id_map.is_some() {
Expand Down

0 comments on commit f6512d4

Please sign in to comment.