Skip to content

Commit

Permalink
fix oom issues and add logs to monitor memory usage under high conten…
Browse files Browse the repository at this point in the history
…tion (qdrant#4994)

* fix oom issues and add logs to monitor memory usage under high contention

---------

Signed-off-by: Vamshi Maskuri <117595548+varshith257@users.noreply.github.com>
  • Loading branch information
varshith257 authored and timvisee committed Sep 17, 2024
1 parent 7cfdd04 commit 6f72e78
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 4 deletions.
18 changes: 16 additions & 2 deletions lib/collection/src/operations/snapshot_storage_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::fs::File;
use std::io::{BufReader, Read};
use std::path::{Path, PathBuf};

use common::cpu::CpuBudget;
use futures::StreamExt;
use object_store::WriteMultipart;
use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -90,7 +91,7 @@ pub async fn get_appropriate_chunk_size(local_source_path: &Path) -> CollectionR
// check if the file size exceeds the maximum part number
// if so, adjust the chunk size to fit the maximum part number
if file_size > DEFAULT_CHUNK_SIZE * MAX_PART_NUMBER {
let chunk_size = (file_size - 1 / MAX_PART_NUMBER) + 1; // ceil(file_size / MAX_PART_NUMBER)
let chunk_size = ((file_size - 1) / MAX_PART_NUMBER) + 1; // ceil((file_size) / MAX_PART_NUMBER)
return Ok(chunk_size);
}
Ok(DEFAULT_CHUNK_SIZE)
Expand All @@ -113,6 +114,10 @@ pub async fn multipart_upload(
let mut reader = BufReader::new(file);
let mut buffer = vec![0u8; chunk_size];

// Initialize CpuBudget to manage concurrency
let cpu_budget = CpuBudget::default();
let max_concurrency = cpu_budget.available_cpu_budget();

// Note:
// 1. write.write() is sync but a worker thread is spawned internally.
// 2. write.finish() will wait for all the worker threads to finish.
Expand All @@ -121,7 +126,16 @@ pub async fn multipart_upload(
break;
}
let buffer = &buffer[..bytes_read];
write.write(buffer); // 1. write.write() is sync but a worker thread is spawned internally.

// Wait for capacity before writing the buffer
write
.wait_for_capacity(max_concurrency)
.await
.map_err(|e| {
CollectionError::service_error(format!("Failed to wait for capacity: {e}"))
})?;

write.write(buffer);
}
write
.finish() // 2. write.finish() will wait for all the worker threads to finish.
Expand Down
7 changes: 6 additions & 1 deletion lib/common/common/src/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ impl CpuBudget {
}
}

/// Returns the total CPU budget.
pub fn available_cpu_budget(&self) -> usize {
self.cpu_budget
}

/// For the given desired number of CPUs, return the minimum number of required CPUs.
fn min_permits(&self, desired_cpus: usize) -> usize {
desired_cpus.min(self.cpu_budget).div_ceil(2)
Expand Down Expand Up @@ -115,7 +120,7 @@ impl CpuBudget {
/// Check if there are at least `budget` available CPUs in this budget.
///
/// A budget of `0` will always return `true`.
fn has_budget_exact(&self, budget: usize) -> bool {
pub fn has_budget_exact(&self, budget: usize) -> bool {
self.semaphore.available_permits() >= budget
}

Expand Down
10 changes: 9 additions & 1 deletion tests/shard-snapshot-api.sh
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,16 @@ function upload {
fixture-with-downloaded-snapshot
fixture-with-empty-collection

# Memory usage before upload
echo "Memory usage before upload:"
free -h

do-upload "$@"
check-recovered - "$DOWNLOADED_SNAPSHOT_POINTS" "$@"

# Memory usage after upload
echo "Memory usage after upload:"
free -h
}

function upload-priority-snapshot {
Expand Down Expand Up @@ -437,7 +445,7 @@ function fixture-with-points {
--uri "http://$QDRANT_HOST:$QDRANT_GRPC_PORT" \
--collection-name "$(basename "$(url)")" \
--dim 128 \
--num-vectors 10000 \
--num-vectors 100000 \
--skip-create
else
curl-ok \
Expand Down

0 comments on commit 6f72e78

Please sign in to comment.