Skip to content

Commit

Permalink
Include "last_responded" in telemetry (qdrant#1668)
Browse files Browse the repository at this point in the history
* feat: include "last_queried" in /telemetry

* fix: anonymize responses in the WebApiTelemetry and GrpcTelemetry Anonymize impls

* fix: anonymize the last_queried field

* fix: use YYYY/MM/DD hh:mm:ss format on 'last_queried'

* tests: add and fix telemetry test

* refactor: rename last_queried -> last_responded

* cargo fmt

* update openapi models

* refactor: rename last_request_time -> last_response_date

* tests: update test

* review changes

* add chrono to schemars

---------

Co-authored-by: Andrey Vasnetsov <andrey@vasnetsov.com>
  • Loading branch information
coszio and generall committed Apr 11, 2023
1 parent c3c9688 commit e1d494d
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions docs/redoc/master/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@
]
},
"result": {
"type": "array",
"items": {
"$ref": "#/components/schemas/TelemetryData"
}
"$ref": "#/components/schemas/TelemetryData"
}
}
}
Expand Down Expand Up @@ -6757,6 +6754,11 @@
"type": "number",
"format": "float",
"nullable": true
},
"last_responded": {
"type": "string",
"format": "date-time",
"nullable": true
}
}
},
Expand Down
2 changes: 1 addition & 1 deletion lib/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ prost = "0.11.8"
prost-types = "0.11.8"
serde = { version = "~1.0", features = ["derive"] }
serde_json = "~1.0"
schemars = { version = "0.8.12", features = ["uuid1", "preserve_order"] }
schemars = { version = "0.8.12", features = ["uuid1", "preserve_order", "chrono"] }
uuid = { version = "1.3", features = ["v4", "serde"] }
tower = "0.4.13"
tokio = "1.27.0"
Expand Down
3 changes: 2 additions & 1 deletion lib/segment/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ thiserror = "1.0"
atomic_refcell = "0.1.9"
atomicwrites = "0.4.0"
memmap2 = "0.5.10"
schemars = { version = "0.8.12", features = ["uuid1", "preserve_order"] }
schemars = { version = "0.8.12", features = ["uuid1", "preserve_order", "chrono"] }
log = "0.4"
geo = "0.24.1"
geohash = "0.13.0"
Expand All @@ -50,6 +50,7 @@ semver = "1.0.17"
tinyvec = { version = "1.6.0", features = ["alloc"] }
quantization = { git = "https://github.com/qdrant/quantization.git" }
validator = { version = "0.16", features = ["derive"] }
chrono = "0.4.24"

[[bench]]
name = "vector_search"
Expand Down
10 changes: 10 additions & 0 deletions lib/segment/src/common/anonymize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};

use chrono::{DateTime, Utc};

/// Types that can produce an anonymized copy of themselves.
///
/// How a value is obscured is left to each implementation.
pub trait Anonymize {
/// Returns an anonymized value of the same type, built from a shared
/// reference to `self`.
fn anonymize(&self) -> Self;
}
Expand Down Expand Up @@ -60,3 +62,11 @@ impl Anonymize for usize {
}
}
}

impl Anonymize for DateTime<Utc> {
    /// Returns this timestamp shifted by a random whole number of seconds.
    ///
    /// A random `f32` is scaled by 20 and lowered by 10, then cast to `i64`
    /// (the cast truncates toward zero), so the offset stays within roughly
    /// ten seconds either side of the original instant.
    fn anonymize(&self) -> Self {
        let fraction = rand::random::<f32>();
        let jitter_secs = (fraction * 20.0 - 10.0) as i64;

        *self + chrono::Duration::seconds(jitter_secs)
    }
}
16 changes: 13 additions & 3 deletions lib/segment/src/common/operation_time_statistics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::time::{Duration, Instant};

use chrono::{DateTime, SubsecRound, Utc};
use parking_lot::Mutex;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -29,6 +30,10 @@ pub struct OperationDurationStatistics {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub max_duration_micros: Option<f32>,

#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub last_responded: Option<DateTime<Utc>>,
}

pub struct OperationDurationsAggregator {
Expand All @@ -39,6 +44,7 @@ pub struct OperationDurationsAggregator {
timing_loops: usize,
min_value: Option<f32>,
max_value: Option<f32>,
last_response_date: Option<DateTime<Utc>>,
}

pub struct ScopeDurationMeasurer {
Expand All @@ -52,9 +58,8 @@ impl Anonymize for OperationDurationStatistics {
Self {
count: self.count.anonymize(),
fail_count: self.fail_count.anonymize(),
avg_duration_micros: self.avg_duration_micros,
min_duration_micros: self.min_duration_micros,
max_duration_micros: self.max_duration_micros,
last_responded: self.last_responded.anonymize(),
..*self
}
}
}
Expand Down Expand Up @@ -82,6 +87,7 @@ impl std::ops::Add for OperationDurationStatistics {
other.max_duration_micros,
|a, b| a > b,
),
last_responded: std::cmp::max(self.last_responded, other.last_responded),
}
}
}
Expand Down Expand Up @@ -174,6 +180,7 @@ impl OperationDurationsAggregator {
timing_loops: 0,
min_value: None,
max_value: None,
last_response_date: Some(Utc::now().round_subsecs(2)),
}))
}

Expand All @@ -199,6 +206,8 @@ impl OperationDurationsAggregator {
} else {
self.fail_count += 1;
}

self.last_response_date = Some(Utc::now().round_subsecs(2));
}

pub fn get_statistics(&self) -> OperationDurationStatistics {
Expand All @@ -212,6 +221,7 @@ impl OperationDurationsAggregator {
},
min_duration_micros: self.min_value,
max_duration_micros: self.max_value,
last_responded: self.last_response_date,
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ wal = { git = "https://github.com/qdrant/wal.git", rev = "9fe5a0c97c148152adacca
tokio = { version = "~1.27", features = ["rt-multi-thread"] }
serde = { version = "~1.0", features = ["derive"] }
serde_json = "~1.0"
schemars = { version = "0.8.12", features = ["uuid1", "preserve_order"] }
schemars = { version = "0.8.12", features = ["uuid1", "preserve_order", "chrono"] }
itertools = "0.10"
async-trait = "0.1.68"
log = "0.4"
Expand Down
2 changes: 1 addition & 1 deletion openapi/openapi-service.ytt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ paths:
required: false
schema:
type: boolean
responses: #@ response(array(reference("TelemetryData")))
responses: #@ response(reference("TelemetryData"))

/metrics:
get:
Expand Down
29 changes: 29 additions & 0 deletions openapi/tests/openapi_integration/test_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import pytest
from datetime import datetime, timedelta

from .helpers.helpers import request_with_validation
from .helpers.collection_setup import basic_collection_setup, drop_collection

collection_name = 'test_collection_telemetry'

# Autouse fixture: pytest runs it around each test it applies to, without
# the test having to request it. It creates the telemetry test collection
# before the test body runs and drops it again afterwards.
@pytest.fixture(autouse=True)
def setup():
basic_collection_setup(collection_name=collection_name)
yield
# Teardown: the code after the yield runs once the test has finished.
drop_collection(collection_name=collection_name)

def test_metrics():
response = request_with_validation(
Expand All @@ -15,3 +24,23 @@ def test_metrics():
assert '# TYPE app_info counter' in response.text
assert 'app_info{name="qdrant",version="' in response.text
assert 'collections_total ' in response.text

def test_telemetry():
    """Check that GET /telemetry reports the collection and REST upsert statistics.

    Verifies that at least one collection is counted, that successful
    `PUT /collections/{name}/points` responses were recorded, and that the
    `last_responded` timestamp of those responses is recent.
    """
    response = request_with_validation(
        api='/telemetry',
        method="GET",
    )

    assert response.ok

    result = response.json()['result']

    assert result['collections']['number_of_collections'] >= 1

    endpoint = result['requests']['rest']['responses']['PUT /collections/{name}/points']
    assert endpoint['200']['count'] > 0

    last_responded = endpoint['200']['last_responded']
    # The fractional-seconds part may be absent when the timestamp falls on a
    # whole second, so only require ".%f" when a fraction is actually present.
    if '.' in last_responded:
        time_format = "%Y-%m-%dT%H:%M:%S.%f%z"
    else:
        time_format = "%Y-%m-%dT%H:%M:%S%z"
    last_responded = datetime.strptime(last_responded, time_format)

    # Compare timezone-aware instants instead of calendar dates: a date
    # comparison against local time fails on hosts that are not in the
    # timestamp's timezone, and around midnight.
    now = datetime.now(last_responded.tzinfo)
    assert abs(now - last_responded) < timedelta(days=1)
28 changes: 20 additions & 8 deletions src/common/telemetry_ops/requests_telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ pub type HttpStatusCode = u16;

/// Response statistics collected for the web (REST) API.
///
/// `responses` is a two-level map: a `String` key maps to a second map from
/// HTTP status code to the duration statistics recorded for that status.
/// NOTE(review): the outer key presumably identifies the endpoint (method and
/// path) — confirm against the code that fills this map.
#[derive(Serialize, Deserialize, Clone, Default, Debug, JsonSchema)]
pub struct WebApiTelemetry {
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub responses: HashMap<String, HashMap<HttpStatusCode, OperationDurationStatistics>>,
}

/// Response statistics collected for the gRPC API.
///
/// `responses` maps a `String` key to the duration statistics recorded for it.
/// NOTE(review): the key presumably identifies the gRPC method — confirm
/// against the code that fills this map.
#[derive(Serialize, Deserialize, Clone, Default, Debug, JsonSchema)]
pub struct GrpcTelemetry {
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub responses: HashMap<String, OperationDurationStatistics>,
}

Expand Down Expand Up @@ -174,16 +172,30 @@ impl Anonymize for RequestsTelemetry {

impl Anonymize for WebApiTelemetry {
fn anonymize(&self) -> Self {
WebApiTelemetry {
responses: self.responses.clone(),
}
let responses = self
.responses
.iter()
.map(|(key, value)| {
let value: HashMap<_, _> = value
.iter()
.map(|(key, value)| (*key, value.anonymize()))
.collect();
(key.clone(), value)
})
.collect();

WebApiTelemetry { responses }
}
}

impl Anonymize for GrpcTelemetry {
fn anonymize(&self) -> Self {
GrpcTelemetry {
responses: self.responses.clone(),
}
let responses = self
.responses
.iter()
.map(|(key, value)| (key.clone(), value.anonymize()))
.collect();

GrpcTelemetry { responses }
}
}

0 comments on commit e1d494d

Please sign in to comment.