Skip to content
This repository has been archived by the owner on Jan 2, 2025. It is now read-only.

Commit

Permalink
rip out qdrant
Browse files Browse the repository at this point in the history
  • Loading branch information
oppiliappan committed Nov 2, 2023
1 parent 6955381 commit 4a06a80
Show file tree
Hide file tree
Showing 8 changed files with 403 additions and 428 deletions.
2 changes: 2 additions & 0 deletions server/bleep/src/collector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod bytes_filter;
mod frequency;
mod group;

pub use bytes_filter::BytesFilterCollector;
pub use frequency::FrequencyCollector;
pub use group::GroupCollector;
148 changes: 148 additions & 0 deletions server/bleep/src/collector/group.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use tantivy::{
collector::{Collector, SegmentCollector},
schema::Field,
DocAddress, Score, SegmentReader,
};
use tantivy_columnar;

use std::collections::HashMap;

/// One group of matching documents that share the same value of the
/// grouping field (identified by the hash key in [`Groups`]).
#[derive(Debug, Default)]
pub struct Group {
    /// Addresses of the documents collected into this group, capped at
    /// the collector's `group_size`.
    pub items: Vec<DocAddress>,
}

/// The full grouping result: groups keyed by the blake3 hash of the
/// grouping field's byte value.
#[derive(Debug)]
pub struct Groups {
    /// Map from group-key hash to the documents collected under it.
    pub items: HashMap<blake3::Hash, Group>,
}

impl Groups {
    /// Consumes `self` and returns `Some(self)` only when at least one
    /// group was collected, `None` otherwise — letting callers treat an
    /// empty result set as the absence of a result.
    fn non_zero_count(self) -> Option<Self> {
        // `is_empty()` is the idiomatic (clippy `len_zero`) form of
        // `len() == 0`.
        if self.items.is_empty() {
            None
        } else {
            Some(self)
        }
    }
}

/// A tantivy collector that buckets matching documents by the value of
/// a bytes fast field, limiting both the number of groups and the
/// number of documents kept per group.
pub struct GroupCollector {
    // The schema field whose (bytes fast-field) value defines the groups.
    field: Field,
    // Maximum number of documents retained per group.
    group_size: usize,
    // Maximum number of groups retained in the merged result.
    limit: usize,
}

impl GroupCollector {
pub fn with_field(field: Field) -> Self {
Self {
field,
group_size: 1,
limit: 100,
}
}

pub fn with_group_size(self, group_size: usize) -> Self {
Self { group_size, ..self }
}

pub fn with_limit(self, limit: usize) -> Self {
Self { limit, ..self }
}
}

impl Collector for GroupCollector {
    /// `None` signals "no documents matched", distinguishing it from an
    /// empty container.
    type Fruit = Option<Groups>;
    type Child = GroupSegmentCollector;

    fn for_segment(
        &self,
        segment_local_id: u32,
        segment_reader: &SegmentReader,
    ) -> tantivy::Result<GroupSegmentCollector> {
        let field_name = segment_reader.schema().get_field_name(self.field);
        // The grouping field must be declared as a bytes fast field; a
        // missing column here is a schema bug, so fail loudly with the
        // invariant stated rather than a bare unwrap.
        let fast_field_reader = segment_reader
            .fast_fields()
            .bytes(field_name)?
            .expect("group-by field must be a bytes fast field");
        Ok(GroupSegmentCollector {
            fast_field_reader,
            segment_local_id,
            group_size: self.group_size,
            groups: Groups {
                items: HashMap::new(),
            },
        })
    }

    fn requires_scoring(&self) -> bool {
        // this collector does not care about score.
        false
    }

    /// Merges the per-segment groupings into a single result, capping
    /// the group count at `self.limit` and each group at
    /// `self.group_size` documents.
    fn merge_fruits(&self, segment_groups: Vec<Option<Groups>>) -> tantivy::Result<Option<Groups>> {
        let mut groups = Groups {
            items: HashMap::new(),
        };

        for segment_group in segment_groups.into_iter().flatten() {
            // merge segment_group into groups
            //
            // NOTE(review): `take(permitted_groups)` also spends budget on
            // entries that merge into an already-present group, and HashMap
            // iteration order is unspecified, so which groups survive the
            // cap is arbitrary — confirm this is acceptable.
            let permitted_groups = self.limit.saturating_sub(groups.items.len());
            for (k, v) in segment_group.items.into_iter().take(permitted_groups) {
                groups
                    .items
                    .entry(k)
                    .and_modify(|entries| {
                        // Top up the existing group without exceeding group_size.
                        let permitted_items = self.group_size.saturating_sub(entries.items.len());
                        entries.items.extend(v.items.iter().take(permitted_items));
                    })
                    // `v` is already owned — eager `or_insert` instead of a
                    // needless closure (clippy `unnecessary_lazy_evaluations`).
                    .or_insert(v);
            }
        }
        Ok(groups.non_zero_count())
    }
}

/// Per-segment worker created by [`GroupCollector::for_segment`]; it
/// accumulates groups for a single index segment.
pub struct GroupSegmentCollector {
    // Bytes fast-field column used to read each doc's grouping value.
    fast_field_reader: tantivy_columnar::BytesColumn,
    // The segment ordinal, needed to build full `DocAddress`es.
    segment_local_id: u32,
    // Groups accumulated so far for this segment.
    groups: Groups,
    // Maximum number of documents retained per group.
    group_size: usize,
}

impl SegmentCollector for GroupSegmentCollector {
    type Fruit = Option<Groups>;

    /// Buckets `doc` by the blake3 hash of its grouping-field bytes.
    fn collect(&mut self, doc: u32, _score: Score) {
        // Gather the doc's field value(s) into one buffer.
        //
        // NOTE(review): when a doc carries several ords, every value is
        // written via the same `&mut value` buffer before hashing —
        // confirm `ord_to_bytes` appends rather than overwrites, else
        // only the last value contributes to the hash.
        let mut value = Vec::new();
        self.fast_field_reader
            .ords()
            .values_for_doc(doc)
            .for_each(|ord| {
                self.fast_field_reader
                    .ord_to_bytes(ord, &mut value)
                    .unwrap();
            });
        // Hash the raw bytes so arbitrary-length values can key the map.
        let hash = {
            let mut h = blake3::Hasher::new();
            h.update(&value);
            h.finalize()
        };
        // Insert into an existing group (respecting group_size) or start
        // a new single-element group.
        self.groups
            .items
            .entry(hash)
            .and_modify(|group| {
                // group.combined_score += score;
                if group.items.len() < self.group_size {
                    group
                        .items
                        .push(DocAddress::new(self.segment_local_id, doc))
                }
            })
            .or_insert_with(|| Group {
                // combined_score: score,
                items: vec![DocAddress::new(self.segment_local_id, doc)],
            });
    }

    /// Hands this segment's groups to `merge_fruits`; `None` when the
    /// segment produced no groups.
    fn harvest(self) -> <Self as SegmentCollector>::Fruit {
        self.groups.non_zero_count()
    }
}
4 changes: 2 additions & 2 deletions server/bleep/src/indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Indexes {
pub async fn new(
config: &Configuration,
sql: crate::SqlDb,
semantic: crate::semantic::Semantic,
embedder: Arc<dyn crate::semantic::Embedder>,
) -> Result<Self> {
Ok(Self {
repo: Indexer::create(
Expand All @@ -101,7 +101,7 @@ impl Indexes {
config.buffer_size,
config.max_threads,
)?,
doc: Doc::create(sql, semantic, config.index_path("doc").as_ref()).await?,
doc: Doc::create(sql, embedder, config.index_path("doc").as_ref()).await?,
write_mutex: Default::default(),
})
}
Expand Down
Loading

0 comments on commit 4a06a80

Please sign in to comment.