From e1d494d6be2f73f27d80934f5c59b86f5cfa01b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Coss=C3=ADo?= Date: Mon, 10 Apr 2023 14:20:00 -0400 Subject: [PATCH] Include "last_responded" in telemetry (#1668) * feat: include "last_queried" in /telemetry * fix: anonymize responses in the WebApiTelemetry and GrpcTelemetry Anonymize impls * fix: anonymize the last_queried field * fix: use YYYY/MM/DD hh:mm:ss format on 'last_queried' * tests: add and fix telemetry test * refactor: rename last_queried -> last_responded * cargo fmt * update openapi models * refactor: rename last_request_time -> last_response_date * tests: update test * review changes * add chrono to schemars --------- Co-authored-by: Andrey Vasnetsov --- Cargo.lock | 1 + docs/redoc/master/openapi.json | 10 ++++--- lib/api/Cargo.toml | 2 +- lib/segment/Cargo.toml | 3 +- lib/segment/src/common/anonymize.rs | 10 +++++++ .../src/common/operation_time_statistics.rs | 16 ++++++++-- lib/storage/Cargo.toml | 2 +- openapi/openapi-service.ytt.yaml | 2 +- .../tests/openapi_integration/test_service.py | 29 +++++++++++++++++++ .../telemetry_ops/requests_telemetry.rs | 28 +++++++++++++----- 10 files changed, 84 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3a41b347bc7..1b9194696bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3640,6 +3640,7 @@ dependencies = [ "atomicwrites", "bincode", "bitvec", + "chrono", "criterion", "fs_extra", "geo", diff --git a/docs/redoc/master/openapi.json b/docs/redoc/master/openapi.json index 08fcb797955..01b230ad40c 100644 --- a/docs/redoc/master/openapi.json +++ b/docs/redoc/master/openapi.json @@ -59,10 +59,7 @@ ] }, "result": { - "type": "array", - "items": { - "$ref": "#/components/schemas/TelemetryData" - } + "$ref": "#/components/schemas/TelemetryData" } } } @@ -6757,6 +6754,11 @@ "type": "number", "format": "float", "nullable": true + }, + "last_responded": { + "type": "string", + "format": "date-time", + "nullable": true } } }, diff --git a/lib/api/Cargo.toml b/lib/api/Cargo.toml index 85f4b5bff4d..a244c9125ed 100644 --- a/lib/api/Cargo.toml +++ b/lib/api/Cargo.toml @@ -13,7 +13,7 @@ prost = "0.11.8" prost-types = "0.11.8" serde = { version = "~1.0", features = ["derive"] } serde_json = "~1.0" -schemars = { version = "0.8.12", features = ["uuid1", "preserve_order"] } +schemars = { version = "0.8.12", features = ["uuid1", "preserve_order", "chrono"] } uuid = { version = "1.3", features = ["v4", "serde"] } tower = "0.4.13" tokio = "1.27.0" diff --git a/lib/segment/Cargo.toml b/lib/segment/Cargo.toml index 4d7bb541f57..58db89e75a8 100644 --- a/lib/segment/Cargo.toml +++ b/lib/segment/Cargo.toml @@ -35,7 +35,7 @@ thiserror = "1.0" atomic_refcell = "0.1.9" atomicwrites = "0.4.0" memmap2 = "0.5.10" -schemars = { version = "0.8.12", features = ["uuid1", "preserve_order"] } +schemars = { version = "0.8.12", features = ["uuid1", "preserve_order", "chrono"] } log = "0.4" geo = "0.24.1" geohash = "0.13.0" @@ -50,6 +50,7 @@ semver = "1.0.17" tinyvec = { version = "1.6.0", features = ["alloc"] } quantization = { git = "https://github.com/qdrant/quantization.git" } validator = { version = "0.16", features = ["derive"] } +chrono = "0.4.24" [[bench]] name = "vector_search" diff --git a/lib/segment/src/common/anonymize.rs b/lib/segment/src/common/anonymize.rs index 3fca222b9a8..3115aebccbe 100644 --- a/lib/segment/src/common/anonymize.rs +++ b/lib/segment/src/common/anonymize.rs @@ -2,6 +2,8 @@ use std::collections::hash_map::DefaultHasher; use std::collections::{BTreeMap, HashMap}; use std::hash::{Hash, Hasher}; +use chrono::{DateTime, Utc}; + pub trait Anonymize { fn anonymize(&self) -> Self; } @@ -60,3 +62,11 @@ impl Anonymize for usize { } } } + +impl Anonymize for DateTime { + fn anonymize(&self) -> Self { + let coeff: f32 = rand::random(); + + *self + chrono::Duration::seconds(((coeff * 20.0) - 10.0) as i64) + } +} diff --git a/lib/segment/src/common/operation_time_statistics.rs b/lib/segment/src/common/operation_time_statistics.rs index 22bc7910d62..06a97481ada 100644 --- a/lib/segment/src/common/operation_time_statistics.rs +++ b/lib/segment/src/common/operation_time_statistics.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; +use chrono::{DateTime, SubsecRound, Utc}; use parking_lot::Mutex; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -29,6 +30,10 @@ pub struct OperationDurationStatistics { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub max_duration_micros: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub last_responded: Option>, } pub struct OperationDurationsAggregator { @@ -39,6 +44,7 @@ pub struct OperationDurationsAggregator { timing_loops: usize, min_value: Option, max_value: Option, + last_response_date: Option>, } pub struct ScopeDurationMeasurer { @@ -52,9 +58,8 @@ impl Anonymize for OperationDurationStatistics { Self { count: self.count.anonymize(), fail_count: self.fail_count.anonymize(), - avg_duration_micros: self.avg_duration_micros, - min_duration_micros: self.min_duration_micros, - max_duration_micros: self.max_duration_micros, + last_responded: self.last_responded.anonymize(), + ..*self } } } @@ -82,6 +87,7 @@ impl std::ops::Add for OperationDurationStatistics { other.max_duration_micros, |a, b| a > b, ), + last_responded: std::cmp::max(self.last_responded, other.last_responded), } } } @@ -174,6 +180,7 @@ impl OperationDurationsAggregator { timing_loops: 0, min_value: None, max_value: None, + last_response_date: Some(Utc::now().round_subsecs(2)), })) } @@ -199,6 +206,8 @@ impl OperationDurationsAggregator { } else { self.fail_count += 1; } + + self.last_response_date = Some(Utc::now().round_subsecs(2)); } pub fn get_statistics(&self) -> OperationDurationStatistics { @@ -212,6 +221,7 @@ impl OperationDurationsAggregator { }, min_duration_micros: self.min_value, max_duration_micros: self.max_value, + last_responded: self.last_response_date, } } diff --git a/lib/storage/Cargo.toml b/lib/storage/Cargo.toml index eddb7ca8379..c80f80aa146 100644 --- a/lib/storage/Cargo.toml +++ b/lib/storage/Cargo.toml @@ -18,7 +18,7 @@ wal = { git = "https://github.com/qdrant/wal.git", rev = "9fe5a0c97c148152adacca tokio = { version = "~1.27", features = ["rt-multi-thread"] } serde = { version = "~1.0", features = ["derive"] } serde_json = "~1.0" -schemars = { version = "0.8.12", features = ["uuid1", "preserve_order"] } +schemars = { version = "0.8.12", features = ["uuid1", "preserve_order", "chrono"] } itertools = "0.10" async-trait = "0.1.68" log = "0.4" diff --git a/openapi/openapi-service.ytt.yaml b/openapi/openapi-service.ytt.yaml index a041a10cc08..6adf3a0d8d2 100644 --- a/openapi/openapi-service.ytt.yaml +++ b/openapi/openapi-service.ytt.yaml @@ -15,7 +15,7 @@ paths: required: false schema: type: boolean - responses: #@ response(array(reference("TelemetryData"))) + responses: #@ response(reference("TelemetryData")) /metrics: get: diff --git a/openapi/tests/openapi_integration/test_service.py b/openapi/tests/openapi_integration/test_service.py index 28e58c262ef..5ca8e143c37 100644 --- a/openapi/tests/openapi_integration/test_service.py +++ b/openapi/tests/openapi_integration/test_service.py @@ -1,7 +1,16 @@ import pytest +from datetime import datetime, timedelta from .helpers.helpers import request_with_validation +from .helpers.collection_setup import basic_collection_setup, drop_collection +collection_name = 'test_collection_telemetry' + +@pytest.fixture(autouse=True) +def setup(): + basic_collection_setup(collection_name=collection_name) + yield + drop_collection(collection_name=collection_name) def test_metrics(): response = request_with_validation( @@ -15,3 +24,23 @@ def test_metrics(): assert '# TYPE app_info counter' in response.text assert 'app_info{name="qdrant",version="' in response.text assert 'collections_total ' in response.text + +def test_telemetry(): + response = request_with_validation( + api='/telemetry', + method="GET", + ) + + assert response.ok + + result = response.json()['result'] + + assert result['collections']['number_of_collections'] >= 1 + + endpoint = result['requests']['rest']['responses']['PUT /collections/{name}/points'] + assert endpoint['200']['count'] > 0 + + last_queried = endpoint['200']['last_responded'] + last_queried = datetime.strptime(last_queried, "%Y-%m-%dT%H:%M:%S.%f%z") + # Assert today + assert last_queried.date() == datetime.now().date() \ No newline at end of file diff --git a/src/common/telemetry_ops/requests_telemetry.rs b/src/common/telemetry_ops/requests_telemetry.rs index 26f3b4087a6..b1f94f53a2c 100644 --- a/src/common/telemetry_ops/requests_telemetry.rs +++ b/src/common/telemetry_ops/requests_telemetry.rs @@ -13,13 +13,11 @@ pub type HttpStatusCode = u16; #[derive(Serialize, Deserialize, Clone, Default, Debug, JsonSchema)] pub struct WebApiTelemetry { - #[serde(skip_serializing_if = "HashMap::is_empty")] pub responses: HashMap>, } #[derive(Serialize, Deserialize, Clone, Default, Debug, JsonSchema)] pub struct GrpcTelemetry { - #[serde(skip_serializing_if = "HashMap::is_empty")] pub responses: HashMap, } @@ -174,16 +172,30 @@ impl Anonymize for RequestsTelemetry { impl Anonymize for WebApiTelemetry { fn anonymize(&self) -> Self { - WebApiTelemetry { - responses: self.responses.clone(), - } + let responses = self + .responses + .iter() + .map(|(key, value)| { + let value: HashMap<_, _> = value + .iter() + .map(|(key, value)| (*key, value.anonymize())) + .collect(); + (key.clone(), value) + }) + .collect(); + + WebApiTelemetry { responses } } } impl Anonymize for GrpcTelemetry { fn anonymize(&self) -> Self { - GrpcTelemetry { - responses: self.responses.clone(), - } + let responses = self + .responses + .iter() + .map(|(key, value)| (key.clone(), value.anonymize())) + .collect(); + + GrpcTelemetry { responses } } }