Skip to content

Commit

Permalink
Fix values count loading in geoindex (qdrant#1563)
Browse files Browse the repository at this point in the history
* fix values count loading in geoindex

* WIP: fix loading statistics in geo-index

* WIP: fmt

---------

Co-authored-by: Andrey Vasnetsov <andrey@vasnetsov.com>
  • Loading branch information
IvanPleshkov and generall committed Mar 15, 2023
1 parent 36c46a8 commit 97fafce
Show file tree
Hide file tree
Showing 5 changed files with 383 additions and 38 deletions.
1 change: 1 addition & 0 deletions lib/collection/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod snapshot_test;
mod wal_recovery_test;

use std::sync::Arc;
use std::time::Duration;
Expand Down
2 changes: 1 addition & 1 deletion lib/collection/src/tests/snapshot_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::shards::channel_service::ChannelService;
use crate::shards::collection_shard_distribution::CollectionShardDistribution;
use crate::shards::replica_set::ChangePeerState;

const TEST_OPTIMIZERS_CONFIG: OptimizersConfig = OptimizersConfig {
pub const TEST_OPTIMIZERS_CONFIG: OptimizersConfig = OptimizersConfig {
deleted_threshold: 0.9,
vacuum_min_vector_number: 1000,
default_segment_number: 2,
Expand Down
187 changes: 187 additions & 0 deletions lib/collection/src/tests/wal_recovery_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
use std::num::{NonZeroU32, NonZeroU64};
use std::sync::Arc;

use segment::types::{Distance, PayloadFieldSchema, PayloadSchemaType};
use tempfile::Builder;
use tokio::runtime::Handle;
use tokio::sync::RwLock;

use crate::config::{CollectionConfig, CollectionParams, WalConfig};
use crate::operations::point_ops::{PointOperations, PointStruct};
use crate::operations::types::{VectorParams, VectorsConfig};
use crate::operations::{CollectionUpdateOperations, CreateIndex, FieldIndexOperations};
use crate::shards::local_shard::LocalShard;
use crate::shards::shard_trait::ShardOperation;
use crate::tests::snapshot_test::TEST_OPTIMIZERS_CONFIG;

/// Build a minimal single-shard collection config for WAL recovery tests.
///
/// Uses a tiny WAL (1 MB capacity, no segments ahead) and a single segment
/// with `flush_interval_sec = 0` so that flush/recovery paths are exercised
/// quickly.
fn create_collection_config() -> CollectionConfig {
    let wal_config = WalConfig {
        wal_capacity_mb: 1,
        wal_segments_ahead: 0,
    };

    let collection_params = CollectionParams {
        vectors: VectorsConfig::Single(VectorParams {
            size: NonZeroU64::new(4).unwrap(),
            distance: Distance::Dot,
        }),
        shard_number: NonZeroU32::new(1).unwrap(),
        replication_factor: NonZeroU32::new(1).unwrap(),
        write_consistency_factor: NonZeroU32::new(1).unwrap(),
        on_disk_payload: false,
    };

    // `TEST_OPTIMIZERS_CONFIG` is a `const`, so every use already produces a
    // fresh owned value — the previous `.clone()` was redundant.
    let mut optimizer_config = TEST_OPTIMIZERS_CONFIG;

    optimizer_config.default_segment_number = 1;
    optimizer_config.flush_interval_sec = 0;

    CollectionConfig {
        params: collection_params,
        optimizer_config,
        wal_config,
        hnsw_config: Default::default(),
        quantization_config: Default::default(),
    }
}

fn upsert_operation() -> CollectionUpdateOperations {
CollectionUpdateOperations::PointOperation(
vec![
PointStruct {
id: 1.into(),
vector: vec![1.0, 2.0, 3.0, 4.0].into(),
payload: Some(
serde_json::from_str(r#"{ "location": { "lat": 10.12, "lon": 32.12 } }"#).unwrap(),
),
},
PointStruct {
id: 2.into(),
vector: vec![2.0, 1.0, 3.0, 4.0].into(),
payload: Some(
serde_json::from_str(r#"{ "location": { "lat": 11.12, "lon": 34.82 } }"#).unwrap(),
),
},
PointStruct {
id: 3.into(),
vector: vec![3.0, 2.0, 1.0, 4.0].into(),
payload: Some(
serde_json::from_str(r#"{ "location": [ { "lat": 12.12, "lon": 34.82 }, { "lat": 12.2, "lon": 12.82 }] }"#).unwrap(),
),
},
PointStruct {
id: 4.into(),
vector: vec![4.0, 2.0, 3.0, 1.0].into(),
payload: Some(
serde_json::from_str(r#"{ "location": { "lat": 13.12, "lon": 34.82 } }"#).unwrap(),
),
},
PointStruct {
id: 5.into(),
vector: vec![5.0, 2.0, 3.0, 4.0].into(),
payload: Some(
serde_json::from_str(r#"{ "location": { "lat": 14.12, "lon": 32.12 } }"#).unwrap(),
),
},

]
.into(),
)
}

/// Build an operation that creates a payload index of schema type `Geo` on
/// the `location` field.
fn create_payload_index_operation() -> CollectionUpdateOperations {
    let create_index = CreateIndex {
        field_name: String::from("location"),
        field_schema: Some(PayloadFieldSchema::FieldType(PayloadSchemaType::Geo)),
    };
    CollectionUpdateOperations::FieldIndexOperation(FieldIndexOperations::CreateIndex(create_index))
}

/// Build an operation that deletes the single point with id `idx`.
fn delete_point_operation(idx: u64) -> CollectionUpdateOperations {
    let ids = vec![idx.into()];
    CollectionUpdateOperations::PointOperation(PointOperations::DeletePoints { ids })
}

/// Regression test for geo-index statistics recovery (qdrant#1563): the
/// payload index `points` counter must stay consistent across shard reloads
/// when points are deleted both before and after a reload.
#[tokio::test]
async fn test_delete_from_indexed_payload() {
    let collection_dir = Builder::new().prefix("test_collection").tempdir().unwrap();

    let config = create_collection_config();

    let collection_name = "test".to_string();

    let current_runtime: Handle = Handle::current();

    // Build a fresh local shard in the temp dir.
    let mut shard = LocalShard::build(
        0,
        collection_name.clone(),
        collection_dir.path(),
        Arc::new(RwLock::new(config.clone())),
        Arc::new(Default::default()),
        current_runtime.clone(),
    )
    .await
    .unwrap();

    // Insert 5 points with geo payloads (point 3 carries two geo values).
    let upsert_ops = upsert_operation();

    shard.update(upsert_ops, true).await.unwrap();

    // Index the "location" field as Geo, then delete one point so the index
    // statistics must be decremented in memory.
    let index_op = create_payload_index_operation();

    shard.update(index_op, true).await.unwrap();

    let delete_point_op = delete_point_operation(4);
    shard.update(delete_point_op, true).await.unwrap();

    let info = shard.info().await.unwrap();
    eprintln!("info = {:#?}", info.payload_schema);
    let number_of_indexed_points = info.payload_schema.get("location").unwrap().points;

    // `before_drop` must run prior to dropping the shard to shut it down
    // cleanly before the reload below.
    shard.before_drop().await;

    drop(shard);

    // First reload: statistics must be rebuilt from persisted state/WAL.
    let mut shard = LocalShard::load(
        0,
        collection_name.clone(),
        collection_dir.path(),
        Arc::new(RwLock::new(config.clone())),
        Arc::new(Default::default()),
        current_runtime.clone(),
    )
    .await
    .unwrap();

    // NOTE(review): presumably gives background flushing (flush_interval_sec
    // = 0) a moment to settle before the next update — confirm if required.
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    // Delete another point after the reload.
    eprintln!("dropping point 5");
    let delete_point_op = delete_point_operation(5);
    shard.update(delete_point_op, true).await.unwrap();

    shard.before_drop().await;
    drop(shard);

    // Second reload: counts must reflect both deletions.
    let mut shard = LocalShard::load(
        0,
        collection_name,
        collection_dir.path(),
        Arc::new(RwLock::new(config)),
        Arc::new(Default::default()),
        current_runtime,
    )
    .await
    .unwrap();

    let info = shard.info().await.unwrap();
    eprintln!("info = {:#?}", info.payload_schema);

    let number_of_indexed_points_after_load = info.payload_schema.get("location").unwrap().points;

    shard.before_drop().await;

    // 5 points upserted, point 4 deleted before the first reload → 4 indexed;
    // point 5 deleted after the reload → 3 indexed after the second reload.
    assert_eq!(number_of_indexed_points, 4);
    assert_eq!(number_of_indexed_points_after_load, 3);
}
145 changes: 108 additions & 37 deletions lib/segment/src/index/field_index/geo_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,94 @@ impl GeoMapIndex {
self.db_wrapper.recreate_column_family()
}

/// Increment the per-hash value counter for `geo_hash` and for every one of
/// its prefixes (from the empty prefix up to the full hash).
fn increment_hash_value_counts(&mut self, geo_hash: &GeoHash) {
    for prefix_len in 0..=geo_hash.len() {
        let prefix = &geo_hash[..prefix_len];
        if let Some(count) = self.values_per_hash.get_mut(prefix) {
            *count += 1;
        } else {
            self.values_per_hash.insert(prefix.to_string(), 1);
        }
    }
}

/// Decrement the per-hash value counter for `geo_hash` and for every one of
/// its prefixes (from the empty prefix up to the full hash).
fn decrement_hash_value_counts(&mut self, geo_hash: &GeoHash) {
    for prefix_len in 0..=geo_hash.len() {
        let prefix = &geo_hash[..prefix_len];
        if let Some(count) = self.values_per_hash.get_mut(prefix) {
            *count -= 1;
        } else {
            // A missing counter here indicates an accounting bug: fail loudly
            // in debug builds, but keep release builds consistent by storing 0.
            debug_assert!(
                false,
                "Hash value count is not found for hash: {}",
                prefix
            );
            self.values_per_hash.insert(prefix.to_string(), 0);
        }
    }
}

/// Increment the per-hash point counter once for every distinct prefix across
/// all of the point's `geo_hashes` — a point with several values under the
/// same prefix is still counted once for that prefix.
fn increment_hash_point_counts(&mut self, geo_hashes: &[GeoHash]) {
    let mut seen_hashes: HashSet<&str> = Default::default();

    for geo_hash in geo_hashes {
        for i in 0..=geo_hash.len() {
            let sub_geo_hash = &geo_hash[0..i];
            // `insert` returns `false` when the prefix was already present,
            // replacing the previous `contains` + `insert` double lookup.
            if !seen_hashes.insert(sub_geo_hash) {
                continue;
            }
            match self.points_per_hash.get_mut(sub_geo_hash) {
                None => {
                    self.points_per_hash.insert(sub_geo_hash.to_string(), 1);
                }
                Some(count) => {
                    *count += 1;
                }
            };
        }
    }
}

/// Decrement the per-hash point counter once for every distinct prefix across
/// all of the point's `geo_hashes` (mirror of `increment_hash_point_counts`).
fn decrement_hash_point_counts(&mut self, geo_hashes: &[GeoHash]) {
    let mut seen_hashes: HashSet<&str> = Default::default();
    for geo_hash in geo_hashes {
        for i in 0..=geo_hash.len() {
            let sub_geo_hash = &geo_hash[0..i];
            // `insert` returns `false` when the prefix was already present,
            // replacing the previous `contains` + `insert` double lookup.
            if !seen_hashes.insert(sub_geo_hash) {
                continue;
            }
            match self.points_per_hash.get_mut(sub_geo_hash) {
                None => {
                    // A missing counter indicates an accounting bug: fail
                    // loudly in debug builds, stay consistent in release.
                    debug_assert!(
                        false,
                        "Hash point count is not found for hash: {}",
                        sub_geo_hash
                    );
                    self.points_per_hash.insert(sub_geo_hash.to_string(), 0);
                }
                Some(count) => {
                    *count -= 1;
                }
            };
        }
    }
}

fn load(&mut self) -> OperationResult<bool> {
if !self.db_wrapper.has_column_family()? {
return Ok(false);
};

let mut points_to_hashes: BTreeMap<PointOffsetType, Vec<GeoHash>> = Default::default();

for (key, value) in self.db_wrapper.lock_db().iter()? {
let key_str = std::str::from_utf8(&key).map_err(|_| {
OperationError::service_error("Index load error: UTF8 error while DB parsing")
Expand All @@ -102,8 +185,26 @@ impl GeoMapIndex {
self.points_count += 1;
}

points_to_hashes
.entry(idx)
.or_default()
.push(geo_hash.clone());

self.point_to_values[idx as usize].push(geo_point);
self.points_map.entry(geo_hash).or_default().insert(idx);
self.points_map
.entry(geo_hash.clone())
.or_default()
.insert(idx);

self.values_count += 1;
}

for (_idx, geo_hashes) in points_to_hashes.into_iter() {
self.max_values_per_point = max(self.max_values_per_point, geo_hashes.len());
self.increment_hash_point_counts(&geo_hashes);
for geo_hash in geo_hashes {
self.increment_hash_value_counts(&geo_hash);
}
}
Ok(true)
}
Expand Down Expand Up @@ -233,7 +334,6 @@ impl GeoMapIndex {
self.points_count -= 1;
self.values_count -= removed_points.len();

let mut seen_hashes: HashSet<&str> = Default::default();
let mut geo_hashes = vec![];

for removed_point in removed_points {
Expand Down Expand Up @@ -263,20 +363,11 @@ impl GeoMapIndex {
self.points_map.remove(removed_geo_hash);
}

for i in 0..=removed_geo_hash.len() {
let sub_geo_hash = &removed_geo_hash[0..i];
if let Some(count) = self.values_per_hash.get_mut(sub_geo_hash) {
*count -= 1;
}
if !seen_hashes.contains(sub_geo_hash) {
if let Some(count) = self.points_per_hash.get_mut(sub_geo_hash) {
*count -= 1;
}
seen_hashes.insert(sub_geo_hash);
}
}
self.decrement_hash_value_counts(removed_geo_hash);
}

self.decrement_hash_point_counts(&geo_hashes);

Ok(())
}

Expand All @@ -296,7 +387,6 @@ impl GeoMapIndex {

self.point_to_values[idx as usize] = values.to_vec();

let mut seen_hashes: HashSet<&str> = Default::default();
let mut geo_hashes = vec![];

for added_point in values {
Expand All @@ -317,30 +407,11 @@ impl GeoMapIndex {
.or_insert_with(HashSet::new)
.insert(idx);

for i in 0..=geo_hash.len() {
let sub_geo_hash = &geo_hash[0..i];
match self.values_per_hash.get_mut(sub_geo_hash) {
None => {
self.values_per_hash.insert(sub_geo_hash.to_string(), 1);
}
Some(count) => {
*count += 1;
}
};
if !seen_hashes.contains(sub_geo_hash) {
match self.points_per_hash.get_mut(sub_geo_hash) {
None => {
self.points_per_hash.insert(sub_geo_hash.to_string(), 1);
}
Some(count) => {
*count += 1;
}
}
seen_hashes.insert(sub_geo_hash);
}
}
self.increment_hash_value_counts(geo_hash);
}

self.increment_hash_point_counts(&geo_hashes);

self.values_count += values.len();
self.points_count += 1;
self.max_values_per_point = self.max_values_per_point.max(values.len());
Expand Down
Loading

0 comments on commit 97fafce

Please sign in to comment.