Skip to content

Commit

Permalink
Fixes for group-by (#1938)
Browse files Browse the repository at this point in the history
* fix payload selector

* clippy

* except cardinality estimation

* implement match except iterator and api

* use except instead of must-not + test

* Fix doc error

* Update lib/collection/src/grouping/group_by.rs

Co-authored-by: Tim Visée <tim+github@visee.me>

* Update lib/segment/src/index/field_index/map_index.rs

Co-authored-by: Tim Visée <tim+github@visee.me>

* Update lib/segment/src/index/field_index/map_index.rs

Co-authored-by: Tim Visée <tim+github@visee.me>

* Update lib/segment/src/index/field_index/map_index.rs

Co-authored-by: Tim Visée <tim+github@visee.me>

* Update lib/segment/src/index/query_optimization/condition_converter.rs

Co-authored-by: Tim Visée <tim+github@visee.me>

* Update lib/segment/src/index/query_optimization/condition_converter.rs

Co-authored-by: Tim Visée <tim+github@visee.me>

* Update lib/segment/src/index/query_optimization/condition_converter.rs

Co-authored-by: Tim Visée <tim+github@visee.me>

* Update lib/segment/src/vector_storage/mod.rs

Co-authored-by: Tim Visée <tim+github@visee.me>

* Update lib/segment/src/index/field_index/map_index.rs

Co-authored-by: Tim Visée <tim+github@visee.me>

* Update lib/collection/src/grouping/group_by.rs

Co-authored-by: Arnaud Gourlay <arnaud.gourlay@gmail.com>

* Update lib/segment/src/index/field_index/map_index.rs

Co-authored-by: Arnaud Gourlay <arnaud.gourlay@gmail.com>

* Update lib/segment/src/index/field_index/map_index.rs [skip ci]

Co-authored-by: Luis Cossío <luis.cossio@qdrant.com>

* fix: `except_on` and `match_on` now produce `Vec<Condition>`s

* Apply suggestions from code review (lib/segment/src/index/field_index/map_index.rs)

* fix: reset review suggestion

* Remove unnecessary move

* Use Rust idiomatic map_else rather than match-none-false

* is-null -> is-empty

* de-comment drop_collection

---------

Co-authored-by: timvisee <tim@visee.me>
Co-authored-by: Tim Visée <tim+github@visee.me>
Co-authored-by: Arnaud Gourlay <arnaud.gourlay@gmail.com>
Co-authored-by: Luis Cossío <luis.cossio@qdrant.com>
Co-authored-by: Luis Cossío <luis.cossio@outlook.com>
  • Loading branch information
6 people committed May 22, 2023
1 parent 2e5e029 commit f5dfeef
Show file tree
Hide file tree
Showing 19 changed files with 493 additions and 99 deletions.
2 changes: 2 additions & 0 deletions docs/grpc/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1657,6 +1657,8 @@ The JSON representation for `Value` is a JSON value.
| text | [string](#string) | | Match text |
| keywords | [RepeatedStrings](#qdrant-RepeatedStrings) | | Match multiple keywords |
| integers | [RepeatedIntegers](#qdrant-RepeatedIntegers) | | Match multiple integers |
| except_integers | [RepeatedIntegers](#qdrant-RepeatedIntegers) | | Match any other value except those integers |
| except_keywords | [RepeatedStrings](#qdrant-RepeatedStrings) | | Match any other value except those keywords |



Expand Down
15 changes: 15 additions & 0 deletions docs/redoc/master/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -5023,6 +5023,9 @@
},
{
"$ref": "#/components/schemas/MatchAny"
},
{
"$ref": "#/components/schemas/MatchExcept"
}
]
},
Expand Down Expand Up @@ -5093,6 +5096,18 @@
}
]
},
"MatchExcept": {
"description": "Should have at least one value not matching the any given values",
"type": "object",
"required": [
"except"
],
"properties": {
"except": {
"$ref": "#/components/schemas/AnyVariants"
}
}
},
"Range": {
"description": "Range filter request",
"type": "object",
Expand Down
14 changes: 14 additions & 0 deletions lib/api/src/grpc/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,12 @@ impl TryFrom<Match> for segment::types::Match {
MatchValue::Text(text) => segment::types::Match::Text(text.into()),
MatchValue::Keywords(kwds) => kwds.strings.into(),
MatchValue::Integers(ints) => ints.integers.into(),
MatchValue::ExceptIntegers(kwds) => {
segment::types::Match::Except(kwds.integers.into())
}
MatchValue::ExceptKeywords(ints) => {
segment::types::Match::Except(ints.strings.into())
}
}),
_ => Err(Status::invalid_argument("Malformed Match condition")),
}
Expand All @@ -984,6 +990,14 @@ impl From<segment::types::Match> for Match {
MatchValue::Integers(RepeatedIntegers { integers })
}
},
segment::types::Match::Except(except) => match except.except {
segment::types::AnyVariants::Keywords(strings) => {
MatchValue::ExceptKeywords(RepeatedStrings { strings })
}
segment::types::AnyVariants::Integers(integers) => {
MatchValue::ExceptIntegers(RepeatedIntegers { integers })
}
},
};
Self {
match_value: Some(match_value),
Expand Down
2 changes: 2 additions & 0 deletions lib/api/src/grpc/proto/points.proto
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,8 @@ message Match {
string text = 4; // Match text
RepeatedStrings keywords = 5; // Match multiple keywords
RepeatedIntegers integers = 6; // Match multiple integers
RepeatedIntegers except_integers = 7; // Match any other value except those integers
RepeatedStrings except_keywords = 8; // Match any other value except those keywords
}
}

Expand Down
8 changes: 7 additions & 1 deletion lib/api/src/grpc/qdrant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3352,7 +3352,7 @@ pub struct FieldCondition {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Match {
#[prost(oneof = "r#match::MatchValue", tags = "1, 2, 3, 4, 5, 6")]
#[prost(oneof = "r#match::MatchValue", tags = "1, 2, 3, 4, 5, 6, 7, 8")]
pub match_value: ::core::option::Option<r#match::MatchValue>,
}
/// Nested message and enum types in `Match`.
Expand All @@ -3378,6 +3378,12 @@ pub mod r#match {
/// Match multiple integers
#[prost(message, tag = "6")]
Integers(super::RepeatedIntegers),
/// Match any other value except those integers
#[prost(message, tag = "7")]
ExceptIntegers(super::RepeatedIntegers),
/// Match any other value except those keywords
#[prost(message, tag = "8")]
ExceptKeywords(super::RepeatedStrings),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
111 changes: 79 additions & 32 deletions lib/collection/src/grouping/group_by.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::collections::HashMap;
use std::future::Future;

use itertools::Itertools;
use segment::data_types::vectors::DEFAULT_VECTOR_NAME;
use segment::types::{
AnyVariants, Condition, FieldCondition, Filter, IsNullCondition, Match, PointGroup,
ScoredPoint, WithPayloadInterface, WithVector,
AnyVariants, Condition, FieldCondition, Filter, Match, PointGroup, ScoredPoint,
WithPayloadInterface, WithVector,
};
use serde_json::Value;
use tokio::sync::RwLockReadGuard;
Expand All @@ -13,7 +14,7 @@ use super::aggregator::GroupsAggregator;
use crate::collection::Collection;
use crate::operations::consistency_params::ReadConsistency;
use crate::operations::types::{
BaseGroupRequest, CollectionResult, RecommendGroupsRequest, RecommendRequest,
BaseGroupRequest, CollectionError, CollectionResult, RecommendGroupsRequest, RecommendRequest,
SearchGroupsRequest, SearchRequest, UsingVector,
};
use crate::recommendations::recommend_by;
Expand Down Expand Up @@ -101,6 +102,24 @@ impl GroupRequest {
}
}

/// Derives the field selector to pass as `with_payload` from a `group_by` path.
///
/// Two workarounds are applied, in order:
/// 1. only the first dot-separated segment is kept, because `with_payload` only works
///    with top-level fields (ToDo: maybe fix this?);
/// 2. a trailing `[]` is stripped, because `with_payload` doesn't work with it.
///
/// # Errors
///
/// Returns a bad-request error when no leading segment can be taken from `group_by`.
fn _group_by_to_payload_selector(&self, group_by: &str) -> CollectionResult<String> {
    // NOTE(review): `str::split` appears to always yield at least one item, so this
    // error path may be unreachable in practice - confirm before relying on it.
    let top_level = group_by.split('.').next().ok_or_else(|| {
        CollectionError::bad_request(format!(
            "Malformed group_by parameter which uses unsupported nested path: {}",
            group_by
        ))
    });

    top_level.map(|field| {
        let selector = field.strip_suffix("[]").unwrap_or(field);
        selector.to_owned()
    })
}

async fn r#do<'a, F, Fut>(
&self,
collection: &Collection,
Expand All @@ -113,24 +132,17 @@ impl GroupRequest {
F: Fn(String) -> Fut,
Fut: Future<Output = Option<RwLockReadGuard<'a, Collection>>>,
{
// Hack: "with_payload" returns empty payload when the requested field ends with `[]`.
// Remove the ending `[]`.
let include_group_by = match self.group_by.as_str() {
s if s.ends_with("[]") => s[..s.len() - 2].to_owned(),
s => s.to_owned(),
};
let include_group_by = self._group_by_to_payload_selector(&self.group_by)?;

let only_group_by_key = Some(WithPayloadInterface::Fields(vec![include_group_by]));

let key_not_null = Filter::new_must_not(Condition::IsNull(IsNullCondition::from(
self.group_by.clone(),
)));
let key_not_empty = Filter::new_must_not(Condition::IsEmpty(self.group_by.clone().into()));

match self.source.clone() {
SourceRequest::Search(mut request) => {
request.limit = self.limit * self.group_size;

request.filter = Some(request.filter.unwrap_or_default().merge(&key_not_null));
request.filter = Some(request.filter.unwrap_or_default().merge(&key_not_empty));

// We're enriching the final results at the end, so we'll keep this minimal
request.with_payload = only_group_by_key;
Expand All @@ -143,7 +155,7 @@ impl GroupRequest {
SourceRequest::Recommend(mut request) => {
request.limit = self.limit * self.group_size;

request.filter = Some(request.filter.unwrap_or_default().merge(&key_not_null));
request.filter = Some(request.filter.unwrap_or_default().merge(&key_not_empty));

// We're enriching the final results at the end, so we'll keep this minimal
request.with_payload = only_group_by_key;
Expand Down Expand Up @@ -276,8 +288,12 @@ where
// construct filter to exclude already found groups
let full_groups = aggregator.keys_of_filled_groups();
if !full_groups.is_empty() {
if let Some(match_any) = match_on(request.group_by.clone(), full_groups) {
let exclude_groups = Filter::new_must_not(match_any);
let except_any = except_on(&request.group_by, full_groups);
if !except_any.is_empty() {
let exclude_groups = Filter {
must: Some(except_any),
..Default::default()
};
source.merge_filter(&exclude_groups);
}
}
Expand Down Expand Up @@ -317,8 +333,12 @@ where

// construct filter to only include unsatisfied groups
let unsatisfied_groups = aggregator.keys_of_unfilled_best_groups();
if let Some(match_any) = match_on(request.group_by.clone(), unsatisfied_groups) {
let include_groups = Filter::new_must(match_any);
let match_any = match_on(&request.group_by, unsatisfied_groups);
if !match_any.is_empty() {
let include_groups = Filter {
must: Some(match_any),
..Default::default()
};
source.merge_filter(&include_groups);
}

Expand Down Expand Up @@ -380,21 +400,48 @@ where
Ok(groups)
}

/// Uses the set of values to create a Match::Any, if possible
fn match_on(path: String, values: Vec<Value>) -> Option<Condition> {
match values.first() {
Some(Value::Number(_)) => Some(Match::new_any(AnyVariants::Integers(
values.into_iter().filter_map(|v| v.as_i64()).collect(),
))),
Some(Value::String(_)) => Some(Match::new_any(AnyVariants::Keywords(
values
.into_iter()
.filter_map(|v| v.as_str().map(|s| s.to_owned()))
.collect(),
))),
_ => None, // also considers the case of empty values
/// Builds `Match::Except` field conditions on `path` from `values`.
///
/// One condition is produced per `AnyVariants` returned by `values_to_any_variants`;
/// the result is empty when that helper returns nothing.
fn except_on(path: &str, values: Vec<Value>) -> Vec<Condition> {
    let mut conditions = Vec::new();
    for variants in values_to_any_variants(values) {
        let field = FieldCondition::new_match(path, Match::new_except(variants));
        conditions.push(Condition::Field(field));
    }
    conditions
}

/// Uses the set of values to create Match::Any's, if possible
fn match_on(path: &str, values: Vec<Value>) -> Vec<Condition> {
values_to_any_variants(values)
.into_iter()
.map(|any_variants| {
Condition::Field(FieldCondition::new_match(
path,
Match::new_any(any_variants),
))
})
.collect()
}

fn values_to_any_variants(values: Vec<Value>) -> Vec<AnyVariants> {
let mut any_variants = Vec::new();

// gather int values
let ints = values.iter().filter_map(|v| v.as_i64()).collect_vec();

if !ints.is_empty() {
any_variants.push(AnyVariants::Integers(ints));
}
.map(|m| Condition::Field(FieldCondition::new_match(path, m)))

// gather string values
let strs = values
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_owned()))
.collect_vec();

if !strs.is_empty() {
any_variants.push(AnyVariants::Keywords(strs));
}

any_variants
}

#[cfg(test)]
Expand Down
27 changes: 19 additions & 8 deletions lib/segment/src/index/field_index/field_index_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ pub trait PayloadFieldIndex {

/// Get iterator over points fitting given `condition`
/// Return `None` if condition does not match the index type
fn filter(
&self,
condition: &FieldCondition,
) -> Option<Box<dyn Iterator<Item = PointOffsetType> + '_>>;
fn filter<'a>(
&'a self,
condition: &'a FieldCondition,
) -> Option<Box<dyn Iterator<Item = PointOffsetType> + 'a>>;

/// Return estimation of points amount which satisfy given condition
fn estimate_cardinality(&self, condition: &FieldCondition) -> Option<CardinalityEstimation>;
Expand Down Expand Up @@ -222,10 +222,10 @@ impl FieldIndex {
self.get_payload_field_index().flusher()
}

pub fn filter(
&self,
condition: &FieldCondition,
) -> Option<Box<dyn Iterator<Item = PointOffsetType> + '_>> {
pub fn filter<'a>(
&'a self,
condition: &'a FieldCondition,
) -> Option<Box<dyn Iterator<Item = PointOffsetType> + 'a>> {
self.get_payload_field_index().filter(condition)
}

Expand Down Expand Up @@ -298,4 +298,15 @@ impl FieldIndex {
FieldIndex::FullTextIndex(index) => index.get_telemetry_data(),
}
}

pub fn values_count(&self, point_id: PointOffsetType) -> usize {
match self {
FieldIndex::IntIndex(index) => index.values_count(point_id),
FieldIndex::IntMapIndex(index) => index.values_count(point_id),
FieldIndex::KeywordIndex(index) => index.values_count(point_id),
FieldIndex::FloatIndex(index) => index.values_count(point_id),
FieldIndex::GeoIndex(index) => index.values_count(point_id),
FieldIndex::FullTextIndex(index) => index.values_count(point_id),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ impl Document {
Self { tokens }
}

/// Returns the length of this document's `tokens` collection.
pub fn len(&self) -> usize {
self.tokens.len()
}

/// Returns `true` when this document's `tokens` collection is empty.
pub fn is_empty(&self) -> bool {
self.tokens.is_empty()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ impl FullTextIndex {
let parsed_query = self.parse_query(query);
self.inverted_index.filter(&parsed_query)
}

/// Returns the `len()` of what `get_doc` yields for `point_id`, or `0` when it yields
/// nothing.
pub fn values_count(&self, point_id: PointOffsetType) -> usize {
    // Maybe we want number of documents in the future?
    self.get_doc(point_id).map_or(0, |doc| doc.len())
}
}

impl ValueIndexer<String> for FullTextIndex {
Expand Down
4 changes: 4 additions & 0 deletions lib/segment/src/index/field_index/geo_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,10 @@ impl GeoMapIndex {

Box::new(edge_region.into_iter())
}

/// Returns the `len()` of what `get_values` yields for `point_id`, or `0` when it yields
/// nothing.
pub fn values_count(&self, point_id: PointOffsetType) -> usize {
    self.get_values(point_id).map_or(0, |values| values.len())
}
}

impl ValueIndexer<GeoPoint> for GeoMapIndex {
Expand Down
Loading

0 comments on commit f5dfeef

Please sign in to comment.