Skip to content

Commit

Permalink
[order-by] Fix remote shard calling (qdrant#4054)
Browse files Browse the repository at this point in the history
* failing test

* - better deserialization of ordering value
- ensure ordering value crosses a remote shard call
- fetch `with_payload` after merging from shards

* WIP: Fix inter shard order by tests fix (qdrant#4056)

* tests: Adjust assertion to check point.payload.index value

* tests: Fix failing test

* Revert "tests: Fix failing test"

This reverts commit f21fa4a.

* fix default `with_payload` value

* set with_payload_interface=false

* remove the extra `retrieve` call

---------

Co-authored-by: tellet-q <166374656+tellet-q@users.noreply.github.com>
  • Loading branch information
2 people authored and timvisee committed Apr 22, 2024
1 parent 27595d4 commit ef5b606
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 12 deletions.
19 changes: 16 additions & 3 deletions lib/collection/src/collection/point_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ impl Collection {
// Needed to return next page offset.
limit += 1;
};

let local_only = shard_selection.is_shard_id();

let retrieved_points: Vec<_> = {
let shards_holder = self.shards_holder.read().await;
let target_shards = shards_holder.select_shards(shard_selection)?;
Expand All @@ -271,7 +274,7 @@ impl Collection {
&with_vector,
request.filter.as_ref(),
read_consistency,
shard_selection.is_shard_id(),
local_only,
order_by.as_ref(),
)
.and_then(move |mut records| async move {
Expand Down Expand Up @@ -302,8 +305,18 @@ impl Collection {
// Extract and remove order value from payload
.map(|records| {
records.into_iter().map(|mut record| {
let value =
order_by.remove_order_value_from_payload(record.payload.as_mut());
let value;
if local_only {
value =
order_by.get_order_value_from_payload(record.payload.as_ref());
} else {
value = order_by
.remove_order_value_from_payload(record.payload.as_mut());
if !with_payload_interface.is_required() {
// Use None instead of empty hashmap
record.payload = None;
}
};
(value, record)
})
})
Expand Down
5 changes: 4 additions & 1 deletion lib/collection/src/shards/remote_shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,10 +678,13 @@ impl ShardOperation for RemoteShard {
.await?
.into_inner();

// We need the `____ordered_with____` value even if the user didn't request payload
let parse_payload = with_payload_interface.is_required() || order_by.is_some();

let result: Result<Vec<Record>, Status> = scroll_response
.result
.into_iter()
.map(|point| try_record_from_grpc(point, with_payload_interface.is_required()))
.map(|point| try_record_from_grpc(point, parse_payload))
.collect();

result.map_err(|e| e.into())
Expand Down
69 changes: 61 additions & 8 deletions lib/segment/src/data_types/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ impl OrderBy {
StartFrom::Datetime(dt) => OrderingValue::Int(dt.timestamp()),
})
.unwrap_or_else(|| match self.direction() {
Direction::Asc => OrderingValue::Int(std::i64::MIN),
Direction::Desc => OrderingValue::Int(std::i64::MAX),
Direction::Asc => OrderingValue::MIN,
Direction::Desc => OrderingValue::MAX,
})
}

Expand All @@ -109,22 +109,39 @@ impl OrderBy {
new_payload
}

pub fn remove_order_value_from_payload(&self, payload: Option<&mut Payload>) -> f64 {
payload
.and_then(|payload| payload.0.remove(INTERNAL_KEY_OF_ORDER_BY_VALUE))
.and_then(|v| v.as_f64())
fn json_value_to_ordering_value(&self, value: Option<serde_json::Value>) -> OrderingValue {
value
.and_then(|v| OrderingValue::try_from(v).ok())
.unwrap_or_else(|| match self.direction() {
Direction::Asc => std::f64::MAX,
Direction::Desc => std::f64::MIN,
Direction::Asc => OrderingValue::MAX,
Direction::Desc => OrderingValue::MIN,
})
}

pub fn get_order_value_from_payload(&self, payload: Option<&Payload>) -> OrderingValue {
self.json_value_to_ordering_value(
payload.and_then(|payload| payload.0.get(INTERNAL_KEY_OF_ORDER_BY_VALUE).cloned()),
)
}

pub fn remove_order_value_from_payload(&self, payload: Option<&mut Payload>) -> OrderingValue {
self.json_value_to_ordering_value(
payload.and_then(|payload| payload.0.remove(INTERNAL_KEY_OF_ORDER_BY_VALUE)),
)
}
}

#[derive(Debug)]
pub enum OrderingValue {
Float(FloatPayloadType),
Int(IntPayloadType),
}

impl OrderingValue {
const MAX: Self = Self::Float(f64::NAN);
const MIN: Self = Self::Float(f64::MIN);
}

impl From<OrderingValue> for serde_json::Value {
fn from(value: OrderingValue) -> Self {
match value {
Expand All @@ -136,6 +153,18 @@ impl From<OrderingValue> for serde_json::Value {
}
}

impl TryFrom<serde_json::Value> for OrderingValue {
type Error = ();

fn try_from(value: serde_json::Value) -> Result<Self, Self::Error> {
value
.as_i64()
.map(Self::from)
.or_else(|| value.as_f64().map(Self::from))
.ok_or(())
}
}

impl From<FloatPayloadType> for OrderingValue {
fn from(value: FloatPayloadType) -> Self {
OrderingValue::Float(value)
Expand Down Expand Up @@ -188,3 +217,27 @@ impl Ord for OrderingValue {
}
}
}

#[cfg(test)]
mod tests {
use proptest::proptest;

use crate::data_types::order_by::OrderingValue;

proptest! {

#[test]
fn test_min_ordering_value(a in i64::MIN..0, b in f64::MIN..0.0) {
assert!(OrderingValue::MIN.cmp(&OrderingValue::from(a)).is_le());
assert!(OrderingValue::MIN.cmp(&OrderingValue::from(b)).is_le());
assert!(OrderingValue::MIN.cmp(&OrderingValue::from(f64::NAN)).is_le());
}

#[test]
fn test_max_ordering_value(a in 0..i64::MAX, b in 0.0..f64::MAX) {
assert!(OrderingValue::MAX.cmp(&OrderingValue::from(a)).is_ge());
assert!(OrderingValue::MAX.cmp(&OrderingValue::from(b)).is_ge());
assert!(OrderingValue::MAX.cmp(&OrderingValue::from(f64::NAN)).is_ge());
}
}
}
163 changes: 163 additions & 0 deletions tests/consensus_tests/test_order_by.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import requests

from .assertions import assert_http_ok
from .fixtures import create_collection
from .utils import every_test, start_cluster

COLL_NAME = "test_collection"


def test_order_by_from_remote_shard(tmp_path, every_test):
peer_api_uris, peer_dirs, bootstrap_uri = start_cluster(tmp_path, num_peers=2, port_seed=10000)

uri = peer_api_uris[0]

create_collection(uri, collection=COLL_NAME, shard_number=2, replication_factor=1)

requests.put(
f"{uri}/collections/{COLL_NAME}/index",
json={"field_name": "index", "field_schema": "integer"},
).raise_for_status()

res = requests.put(
f"{uri}/collections/{COLL_NAME}/points",
params={
"wait": "true",
},
json={
"points": [
{
"id": 0,
"payload": {
"index": 0,
"timestamp": "2024-04-12T08:40:06.189301",
"text": "Italy: tomatoes, olive oil, pasta.",
"title": "food",
},
"vector": {},
}
]
},
)
assert_http_ok(res)

requests.put(
f"{uri}/collections/{COLL_NAME}/points",
params={
"wait": "true",
},
json={
"batch": {
"ids": [11, 12, 13, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
"payloads": [
{
"timestamp": "2024-04-12T08:40:20.015587",
"text": "Japan: Polite, punctual, public transport reliance",
"index": 11,
"title": "travel",
},
{
"timestamp": "2024-04-12T08:40:20.290047",
"text": "Italy: Relaxed, historical tours, slow-paced",
"index": 12,
"title": "travel",
},
{
"timestamp": "2024-04-12T08:40:20.562687",
"text": "USA: Road trips, diverse landscapes.",
"index": 13,
"title": "travel",
},
{
"timestamp": "2024-04-12T08:40:09.482208",
"text": "Japan: seafood-centric with artistic presentation.",
"index": 1,
"title": "food",
},
{
"timestamp": "2024-04-12T08:40:09.744967",
"text": "Mexico: beans, chili peppers, and meat.",
"index": 2,
"title": "food",
},
{
"timestamp": "2024-04-12T08:40:10.020073",
"text": "India: diverse vegetarian and meat dishes.",
"index": 3,
"title": "food",
},
{
"timestamp": "2024-04-12T08:40:10.285342",
"text": "France: Gourmet dining, focuses on cheese, wine",
"index": 4,
"title": "food",
},
{
"timestamp": "2024-04-12T08:40:10.883714",
"text": "China: diverse cuisines, emphasizes balance, uses rice",
"index": 5,
"title": "food",
},
{
"timestamp": "2024-04-12T08:40:13.166867",
"text": "Thailand: five flavors, spicy, sweet, salty, sour.",
"index": 6,
"title": "food",
},
{
"timestamp": "2024-04-12T08:40:15.436143",
"text": "USA: Melting pot, diverse, large portions, fast food.",
"index": 7,
"title": "food",
},
{
"timestamp": "2024-04-12T08:40:15.697163",
"text": "Brazil: barbecue heavy",
"index": 8,
"title": "food",
},
{
"timestamp": "2024-04-12T08:40:15.963504",
"text": "Greece: Olive oil, feta, yogurt, seafood",
"index": 9,
"title": "food",
},
{
"timestamp": "2024-04-12T08:40:16.738832",
"text": "Ethiopia: bread with spicy stews and vegetables",
"index": 10,
"title": "food",
},
],
"vectors": {},
}
},
).raise_for_status()

res = requests.post(
f"{uri}/collections/{COLL_NAME}/points/scroll", json={"limit": 10, "order_by": "index"}
)

assert_http_ok(res)

for point in res.json()["result"]["points"]:
assert point["payload"] is not None

values = [point["payload"]["index"] for point in res.json()["result"]["points"]]

assert values == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# It should also work when with_payload is False
res = requests.post(
f"{uri}/collections/{COLL_NAME}/points/scroll",
json={"limit": 10, "order_by": "index", "with_payload": False},
)

assert_http_ok(res)

ids = [point["id"] for point in res.json()["result"]["points"]]

assert ids == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

for point in res.json()["result"]["points"]:
assert point["payload"] is None

0 comments on commit ef5b606

Please sign in to comment.