Skip to content

Commit

Permalink
Compatibility for mmap sparse vectors (qdrant#5454)
Browse files Browse the repository at this point in the history
* implement mmap sparse vector storage

* add to VectorStorageEnum

* clippy

* add tests, fix both simple and mmap storages

* smol correction on total_vector_count

* add sparse storage type to config

* fix reading config without storage type

* generate openapi

* use blob_store by path

* hidden setting to enable new storage

* validate existing path in `BlobStore::open()`

* use new dir for each sparse vector name

* fix and rename `max_point_offset`

Plus some extra refactors

* add storage compat test, to always check both storages work

* fix opening of storage + other misc fixes

* FIX!!!

`Unset` operations in the Tracker weren't updating the
`next_pointer_id`. So, when reopening the storage, those points wouldn't
get marked as deleted in the bitslice, thus creating the illusion that
they should exist, when they did not.

* refactor naming from `iter_*` to `for_each_*`

* fix checking for BlobStore existence

* fix typo

* fix error message

* better docs for open_or_create

* fix after rebase
  • Loading branch information
coszio authored and timvisee committed Dec 9, 2024
1 parent 716e85e commit c10c145
Show file tree
Hide file tree
Showing 32 changed files with 632 additions and 46 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions docs/redoc/master/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -11527,6 +11527,9 @@
"properties": {
"index": {
"$ref": "#/components/schemas/SparseIndexConfig"
},
"storage_type": {
"$ref": "#/components/schemas/SparseVectorStorageType"
}
}
},
Expand Down Expand Up @@ -11586,6 +11589,24 @@
}
]
},
"SparseVectorStorageType": {
"oneOf": [
{
"description": "Storage on disk",
"type": "string",
"enum": [
"on_disk"
]
},
{
"description": "Storage in memory maps",
"type": "string",
"enum": [
"mmap"
]
}
]
},
"PayloadStorageType": {
"description": "Type of payload storage",
"oneOf": [
Expand Down
58 changes: 57 additions & 1 deletion lib/blob_store/src/blob_store.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::ops::ControlFlow;
use std::path::PathBuf;

use io::file_operations::atomic_save_json;
Expand Down Expand Up @@ -75,6 +76,22 @@ impl<V: Blob> BlobStore<V> {
self.tracker.read().pointer_count()
}

/// Opens an existing storage, or initializes a new one.
/// Depends on the existence of the config file at the `base_path`.
///
/// In case of opening, it ignores the `create_options` parameter.
pub fn open_or_create(base_path: PathBuf, create_options: StorageOptions) -> Result<Self> {
    // A present config file marks a previously initialized storage: just open it.
    if base_path.join(CONFIG_FILENAME).exists() {
        return Self::open(base_path);
    }

    // Fresh storage: make sure the directory hierarchy exists before creating.
    std::fs::create_dir_all(&base_path)
        .map_err(|err| format!("Failed to create blob_store storage directory: {err}"))?;
    Self::new(base_path, create_options)
}

/// Initializes a new storage with a single empty page.
///
/// `base_path` is the directory where the storage files will be stored.
Expand Down Expand Up @@ -114,6 +131,13 @@ impl<V: Blob> BlobStore<V> {
/// Open an existing storage at the given path
/// Returns None if the storage does not exist
pub fn open(path: PathBuf) -> Result<Self> {
if !path.exists() {
return Err(format!("Path '{path:?}' does not exist"));
}
if !path.is_dir() {
return Err(format!("Path '{path:?}' is not a directory"));
}

// read config file first
let config_path = path.join(CONFIG_FILENAME);
let config_file = std::fs::File::open(&config_path).map_err(|err| err.to_string())?;
Expand Down Expand Up @@ -382,7 +406,14 @@ impl<V: Blob> BlobStore<V> {
where
F: FnMut(PointOffset, &V) -> std::io::Result<bool>,
{
for (point_offset, pointer) in self.tracker.read().iter_pointers().flatten() {
for (point_offset, pointer) in
self.tracker
.read()
.iter_pointers()
.filter_map(|(point_offset, opt_pointer)| {
opt_pointer.map(|pointer| (point_offset, pointer))
})
{
let ValuePointer {
page_id,
block_offset,
Expand All @@ -403,6 +434,31 @@ impl<V: Blob> BlobStore<V> {
/// Total size in bytes currently occupied by the storage, as tracked by the bitmask.
pub fn get_storage_size_bytes(&self) -> usize {
    let bitmask = self.bitmask.read();
    bitmask.get_storage_size_bytes()
}

/// Iterate over all the values in the storage, including deleted ones
pub fn for_each_unfiltered<F>(&self, mut callback: F) -> Result<()>
where
    F: FnMut(PointOffset, Option<&V>) -> ControlFlow<String, ()>,
{
    for (point_offset, opt_pointer) in self.tracker.read().iter_pointers() {
        // Deleted/unset points yield `None`; live ones are read back and decoded.
        let value = match opt_pointer {
            None => None,
            Some(ValuePointer {
                page_id,
                block_offset,
                length,
            }) => {
                let raw = self.read_from_pages(page_id, block_offset, length);
                let decompressed = Self::decompress(&raw);
                Some(V::from_bytes(&decompressed))
            }
        };
        // The callback decides whether to keep going or abort with an error message.
        if let ControlFlow::Break(message) = callback(point_offset, value.as_ref()) {
            return Err(message);
        }
    }
    Ok(())
}
}

impl<V> BlobStore<V> {
Expand Down
5 changes: 3 additions & 2 deletions lib/blob_store/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ impl Tracker {
}

/// Iterate over the pointers in the tracker
pub fn iter_pointers(&self) -> impl Iterator<Item = Option<(PointOffset, ValuePointer)>> + '_ {
(0..self.next_pointer_offset).map(move |i| self.get(i as PointOffset).map(|p| (i, p)))
/// Yields `(offset, pointer)` for every offset below the high-water mark
/// (`next_pointer_offset`); `None` marks a deleted or never-set point.
pub fn iter_pointers(&self) -> impl Iterator<Item = (PointOffset, Option<ValuePointer>)> + '_ {
(0..self.next_pointer_offset).map(move |i| (i, self.get(i as PointOffset)))
}

/// Get the raw value at the given point offset
Expand Down Expand Up @@ -293,6 +293,7 @@ impl Tracker {
self.pending_updates
.insert(point_offset, PointerUpdate::Unset(pointer));
}
self.next_pointer_offset = self.next_pointer_offset.max(point_offset + 1);

pointer_opt
}
Expand Down
9 changes: 9 additions & 0 deletions lib/collection/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ pub struct CollectionParams {
// TODO: remove this setting after integration is finished
#[serde(skip)]
pub on_disk_payload_uses_mmap: bool,
// TODO: remove this setting after integration is finished
#[serde(skip)]
pub on_disk_sparse_vectors_uses_mmap: bool,
/// Configuration of the sparse vector storage
#[serde(default, skip_serializing_if = "Option::is_none")]
#[validate(nested)]
Expand Down Expand Up @@ -137,6 +140,7 @@ impl CollectionParams {
read_fan_out_factor: _, // May be changed
on_disk_payload: _, // May be changed
on_disk_payload_uses_mmap: _, // Temporary
on_disk_sparse_vectors_uses_mmap: _, // Temporary
sparse_vectors, // Parameters may be changes, but not the structure
} = other;

Expand Down Expand Up @@ -188,6 +192,7 @@ impl Anonymize for CollectionParams {
read_fan_out_factor: self.read_fan_out_factor,
on_disk_payload: self.on_disk_payload,
on_disk_payload_uses_mmap: self.on_disk_payload_uses_mmap,
on_disk_sparse_vectors_uses_mmap: self.on_disk_sparse_vectors_uses_mmap,
sparse_vectors: self.sparse_vectors.anonymize(),
}
}
Expand Down Expand Up @@ -274,6 +279,7 @@ impl CollectionParams {
read_fan_out_factor: None,
on_disk_payload: default_on_disk_payload(),
on_disk_payload_uses_mmap: false,
on_disk_sparse_vectors_uses_mmap: false,
sparse_vectors: None,
}
}
Expand Down Expand Up @@ -500,6 +506,9 @@ impl CollectionParams {
.and_then(|index| index.datatype)
.map(VectorStorageDatatype::from),
},
// Not configurable by user (at this point). When we switch the default, it will be switched here too.
storage_type: params
.storage_type(self.on_disk_sparse_vectors_uses_mmap),
},
))
})
Expand Down
1 change: 1 addition & 0 deletions lib/collection/src/operations/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1788,6 +1788,7 @@ impl TryFrom<api::grpc::qdrant::CollectionConfig> for CollectionConfig {
.map(sharding_method_from_proto)
.transpose()?,
on_disk_payload_uses_mmap: false,
on_disk_sparse_vectors_uses_mmap: false,
},
},
hnsw_config: match config.hnsw_config {
Expand Down
15 changes: 13 additions & 2 deletions lib/collection/src/operations/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ use segment::data_types::vectors::{
};
use segment::types::{
Distance, Filter, HnswConfig, MultiVectorConfig, Payload, PayloadIndexInfo, PayloadKeyType,
PointIdType, QuantizationConfig, SearchParams, SeqNumberType, ShardKey, StrictModeConfig,
VectorStorageDatatype, WithPayloadInterface, WithVector,
PointIdType, QuantizationConfig, SearchParams, SeqNumberType, ShardKey,
SparseVectorStorageType, StrictModeConfig, VectorStorageDatatype, WithPayloadInterface,
WithVector,
};
use semver::Version;
use serde;
Expand Down Expand Up @@ -1497,6 +1498,16 @@ pub struct SparseVectorParams {
pub modifier: Option<Modifier>,
}

impl SparseVectorParams {
    /// Resolve which sparse vector storage backend to use.
    ///
    /// When `use_new_storage` is set, the mmap-backed storage is selected;
    /// otherwise the current default storage type is used.
    pub fn storage_type(&self, use_new_storage: bool) -> SparseVectorStorageType {
        match use_new_storage {
            true => SparseVectorStorageType::Mmap,
            false => SparseVectorStorageType::default(),
        }
    }
}

impl Anonymize for SparseVectorParams {
fn anonymize(&self) -> Self {
Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,9 @@ impl GpuVectorStorage {
VectorStorageEnum::SparseSimple(_) => Err(OperationError::from(
gpu::GpuError::NotSupported("Sparse vectors are not supported on GPU".to_string()),
)),
VectorStorageEnum::SparseMmap(_) => Err(OperationError::from(
gpu::GpuError::NotSupported("Sparse vectors are not supported on GPU".to_string()),
)),
VectorStorageEnum::MultiDenseSimple(vector_storage) => Self::new_multi_f32(
device.clone(),
vector_storage,
Expand Down
2 changes: 1 addition & 1 deletion lib/segment/src/payload_storage/mmap_payload_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl PayloadStorage for MmapPayloadStorage {
fn flusher(&self) -> Flusher {
let storage = self.storage.clone();
Box::new(move || {
storage.write().flush().map_err(|err| {
storage.read().flush().map_err(|err| {
OperationError::service_error(format!(
"Failed to flush mmap payload storage: {err}"
))
Expand Down
16 changes: 10 additions & 6 deletions lib/segment/src/segment_constructor/segment_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,17 @@ impl SegmentBuilder {
vector_storages.insert(vector_name.to_owned(), vector_storage);
}

#[allow(clippy::for_kv_map)]
for (vector_name, _sparse_vector_config) in &segment_config.sparse_vector_data {
// `_sparse_vector_config` should be used, once we are able to initialize storage with
// different datatypes
for (vector_name, sparse_vector_config) in &segment_config.sparse_vector_data {
let vector_storage_path = get_vector_storage_path(temp_dir.path(), vector_name);

let vector_storage = create_sparse_vector_storage(
database.clone(),
&vector_storage_path,
vector_name,
&sparse_vector_config.storage_type,
&stopped,
)?;

let vector_storage =
create_sparse_vector_storage(database.clone(), vector_name, &stopped)?;
vector_storages.insert(vector_name.to_owned(), vector_storage);
}

Expand Down
38 changes: 30 additions & 8 deletions lib/segment/src/segment_constructor/segment_constructor_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::payload_storage::simple_payload_storage::SimplePayloadStorage;
use crate::segment::{Segment, SegmentVersion, VectorData, SEGMENT_STATE_FILE};
use crate::types::{
Distance, Indexes, PayloadStorageType, SegmentConfig, SegmentState, SegmentType, SeqNumberType,
VectorDataConfig, VectorStorageDatatype, VectorStorageType,
SparseVectorStorageType, VectorDataConfig, VectorStorageDatatype, VectorStorageType,
};
use crate::vector_storage::dense::appendable_dense_vector_storage::{
open_appendable_in_ram_vector_storage, open_appendable_in_ram_vector_storage_byte,
Expand All @@ -49,6 +49,7 @@ use crate::vector_storage::dense::simple_dense_vector_storage::{
open_simple_dense_byte_vector_storage, open_simple_dense_half_vector_storage,
open_simple_dense_vector_storage,
};
use crate::vector_storage::mmap_sparse_vector_storage::MmapSparseVectorStorage;
use crate::vector_storage::multi_dense::appendable_mmap_multi_dense_vector_storage::{
open_appendable_in_ram_multi_vector_storage, open_appendable_in_ram_multi_vector_storage_byte,
open_appendable_in_ram_multi_vector_storage_half, open_appendable_memmap_multi_vector_storage,
Expand Down Expand Up @@ -312,8 +313,14 @@ pub(crate) fn open_segment_db(
.chain(
config
.sparse_vector_data
.keys()
.map(|vector_name| get_vector_name_with_prefix(DB_VECTOR_CF, vector_name)),
.iter()
.filter(|(_, sparse_vector_config)| {
matches!(
sparse_vector_config.storage_type,
SparseVectorStorageType::OnDisk
)
})
.map(|(vector_name, _)| get_vector_name_with_prefix(DB_VECTOR_CF, vector_name)),
)
.collect();
open_db(segment_path, &vector_db_names)
Expand Down Expand Up @@ -451,11 +458,21 @@ pub(crate) fn create_sparse_vector_index(

/// Create (or open) the storage backing a named sparse vector.
///
/// `OnDisk` keeps vectors in a dedicated RocksDB column family derived from
/// `vector_name`; `Mmap` keeps them in memory-mapped files under `path`.
pub(crate) fn create_sparse_vector_storage(
    database: Arc<RwLock<DB>>,
    path: &Path,
    vector_name: &str,
    storage_type: &SparseVectorStorageType,
    stopped: &AtomicBool,
) -> OperationResult<VectorStorageEnum> {
    match storage_type {
        SparseVectorStorageType::OnDisk => open_simple_sparse_vector_storage(
            database,
            &get_vector_name_with_prefix(DB_VECTOR_CF, vector_name),
            stopped,
        ),
        SparseVectorStorageType::Mmap => {
            let storage = MmapSparseVectorStorage::open_or_create(path, stopped)?;
            Ok(VectorStorageEnum::SparseMmap(storage))
        }
    }
}

fn create_segment(
Expand Down Expand Up @@ -492,21 +509,26 @@ fn create_segment(
let vector_storage_path = get_vector_storage_path(segment_path, vector_name);

// Select suitable vector storage type based on configuration
let vector_storage = Arc::new(AtomicRefCell::new(open_vector_storage(
let vector_storage = sp(open_vector_storage(
&database,
vector_config,
stopped,
&vector_storage_path,
vector_name,
)?));
)?);

vector_storages.insert(vector_name.to_owned(), vector_storage);
}

for vector_name in config.sparse_vector_data.keys() {
for (vector_name, sparse_config) in config.sparse_vector_data.iter() {
let vector_storage_path = get_vector_storage_path(segment_path, vector_name);

// Select suitable sparse vector storage type based on configuration
let vector_storage = sp(create_sparse_vector_storage(
database.clone(),
&vector_storage_path,
vector_name,
&sparse_config.storage_type,
stopped,
)?);

Expand Down
1 change: 1 addition & 0 deletions lib/segment/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl Anonymize for SparseVectorDataConfig {
fn anonymize(&self) -> Self {
SparseVectorDataConfig {
index: self.index.anonymize(),
storage_type: self.storage_type,
}
}
}
Expand Down
Loading

0 comments on commit c10c145

Please sign in to comment.