Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

logger: adjust logger to receive logs blobs #1172

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
40a3a79
codegen: removed the otel layer generation
iulianbarbu Aug 22, 2023
1339888
proto: updated logger proto file
iulianbarbu Aug 22, 2023
7615495
logger: add store_logs RPC impl
iulianbarbu Aug 22, 2023
12fea55
logger: fixed tests
iulianbarbu Aug 23, 2023
6f7c1da
logger: decreased the visibility for Log type
iulianbarbu Aug 23, 2023
616fe2b
logger: remove the unwrap
iulianbarbu Aug 23, 2023
a9c0bd5
proto/logger: differentiate between a fetched/stored log items
iulianbarbu Aug 23, 2023
4c46e57
logger: simplify tests
iulianbarbu Aug 24, 2023
c813a61
runtime: removed tracing/otel dependencies
iulianbarbu Aug 24, 2023
2411767
runtime: removed the tracing dependencies
iulianbarbu Aug 24, 2023
3bae7ab
codegen: cleanup tracing layers and tests
iulianbarbu Aug 24, 2023
9e0fb45
runtime/next: fix compilation
iulianbarbu Aug 24, 2023
fc255d6
logger: remove claim verification on store_logs
iulianbarbu Aug 24, 2023
b1ab984
logger: implemented from_stored for StoredLogItem convertion to Log
iulianbarbu Aug 24, 2023
093e223
codegen: readd a missing import
iulianbarbu Aug 24, 2023
35f737e
codegen: fix tests
iulianbarbu Aug 24, 2023
ceaa9ff
addressed P review
iulianbarbu Aug 24, 2023
2659387
logger: removed integration test comment
iulianbarbu Aug 24, 2023
bf9c9b8
logger/tests: remove the DeploymentPush claim scope
iulianbarbu Aug 24, 2023
5fb7053
address orhun feedback
iulianbarbu Aug 24, 2023
ae860f5
logger: fixed error message typo
iulianbarbu Aug 24, 2023
67781d4
address Johan review
iulianbarbu Aug 24, 2023
53e7369
logger/proto: remove dedup protobuf defs
iulianbarbu Aug 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
logger/proto: remove dedup protobuf defs
  • Loading branch information
iulianbarbu committed Aug 24, 2023
commit 53e73695a650dd0421fcec990e0e3dc3d34a3965
21 changes: 12 additions & 9 deletions logger/src/dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,12 @@ pub struct Log {
}

impl Log {
pub(crate) fn from_stored(log: LogItem) -> Self {
let timestamp = log.tx_timestamp.clone().unwrap_or_default();
Log {
pub(crate) fn from_log_item(log: LogItem) -> Option<Self> {
let log_line = log.log_line?;
let timestamp = log_line.tx_timestamp.clone().unwrap_or_default();
Some(Log {
deployment_id: log.deployment_id,
shuttle_service_name: log.service_name,
shuttle_service_name: log_line.service_name,
tx_timestamp: DateTime::from_utc(
NaiveDateTime::from_timestamp_opt(
timestamp.seconds,
Expand All @@ -142,18 +143,20 @@ impl Log {
.unwrap_or_default(),
Utc,
),
data: log.data,
}
data: log_line.data,
})
}
}

impl From<Log> for LogItem {
fn from(log: Log) -> Self {
LogItem {
service_name: log.shuttle_service_name,
deployment_id: log.deployment_id,
tx_timestamp: Some(Timestamp::from(SystemTime::from(log.tx_timestamp))),
data: log.data,
log_line: Some(LogLine {
service_name: log.shuttle_service_name,
tx_timestamp: Some(Timestamp::from(SystemTime::from(log.tx_timestamp))),
data: log.data,
}),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion logger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ where
if !logs.is_empty() {
_ = self
.logs_tx
.broadcast(logs.into_iter().map(Log::from_stored).collect())
.broadcast(logs.into_iter().filter_map(Log::from_log_item).collect())
.await
.map_err(|err| {
Status::internal(format!(
Expand Down
54 changes: 31 additions & 23 deletions logger/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,23 @@ async fn store_and_get_logs() {
let expected_stored_logs = vec![
LogItem {
deployment_id: deployment_id.to_string(),
service_name: SHUTTLE_SERVICE.to_string(),
tx_timestamp: Some(Timestamp::from(SystemTime::UNIX_EPOCH)),
data: "log 1 example".as_bytes().to_vec(),
log_line: Some(LogLine {
service_name: SHUTTLE_SERVICE.to_string(),
tx_timestamp: Some(Timestamp::from(SystemTime::UNIX_EPOCH)),
data: "log 1 example".as_bytes().to_vec(),
}),
},
LogItem {
deployment_id: deployment_id.to_string(),
service_name: SHUTTLE_SERVICE.to_string(),
tx_timestamp: Some(Timestamp::from(
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(10))
.unwrap(),
)),
data: "log 2 example".as_bytes().to_vec(),
log_line: Some(LogLine {
service_name: SHUTTLE_SERVICE.to_string(),
tx_timestamp: Some(Timestamp::from(
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(10))
.unwrap(),
)),
data: "log 2 example".as_bytes().to_vec(),
}),
},
];
let response = client
Expand All @@ -69,7 +73,7 @@ async fn store_and_get_logs() {
logs,
expected_stored_logs
.into_iter()
.map(Into::into)
.map(|log| log.log_line.unwrap())
.collect::<Vec<LogLine>>()
);
});
Expand All @@ -95,19 +99,23 @@ async fn get_stream_logs() {
let expected_stored_logs = vec![
LogItem {
deployment_id: deployment_id.to_string(),
service_name: SHUTTLE_SERVICE.to_string(),
tx_timestamp: Some(Timestamp::from(SystemTime::UNIX_EPOCH)),
data: "log 1 example".as_bytes().to_vec(),
log_line: Some(LogLine {
service_name: SHUTTLE_SERVICE.to_string(),
tx_timestamp: Some(Timestamp::from(SystemTime::UNIX_EPOCH)),
data: "log 1 example".as_bytes().to_vec(),
}),
},
LogItem {
deployment_id: deployment_id.to_string(),
service_name: SHUTTLE_SERVICE.to_string(),
tx_timestamp: Some(Timestamp::from(
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(10))
.unwrap(),
)),
data: "log 2 example".as_bytes().to_vec(),
log_line: Some(LogLine {
service_name: SHUTTLE_SERVICE.to_string(),
tx_timestamp: Some(Timestamp::from(
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(10))
.unwrap(),
)),
data: "log 2 example".as_bytes().to_vec(),
}),
},
];

Expand All @@ -134,14 +142,14 @@ async fn get_stream_logs() {
.unwrap()
.unwrap()
.unwrap();
assert_eq!(LogLine::from(expected_stored_logs[0].clone()), log);
assert_eq!(expected_stored_logs[0].clone().log_line.unwrap(), log);

let log = timeout(std::time::Duration::from_millis(500), response.message())
.await
.unwrap()
.unwrap()
.unwrap();
assert_eq!(LogLine::from(expected_stored_logs[1].clone()), log);
assert_eq!(expected_stored_logs[1].clone().log_line.unwrap(), log);
});

tokio::select! {
Expand Down
6 changes: 2 additions & 4 deletions proto/logger.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ message LogsResponse {
}

message LogItem {
string service_name = 1;
string deployment_id = 2;
google.protobuf.Timestamp tx_timestamp = 3;
bytes data = 4;
string deployment_id = 1;
LogLine log_line = 2;
}

message LogLine {
Expand Down
8 changes: 2 additions & 6 deletions proto/src/generated/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,9 @@ pub struct LogsResponse {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LogItem {
#[prost(string, tag = "1")]
pub service_name: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub deployment_id: ::prost::alloc::string::String,
#[prost(message, optional, tag = "3")]
pub tx_timestamp: ::core::option::Option<::prost_types::Timestamp>,
#[prost(bytes = "vec", tag = "4")]
pub data: ::prost::alloc::vec::Vec<u8>,
#[prost(message, optional, tag = "2")]
pub log_line: ::core::option::Option<LogLine>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
10 changes: 0 additions & 10 deletions proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,4 @@ pub mod resource_recorder {

pub mod logger {
include!("generated/logger.rs");

impl From<LogItem> for LogLine {
fn from(value: LogItem) -> Self {
LogLine {
service_name: value.service_name,
tx_timestamp: value.tx_timestamp,
data: value.data,
}
}
}
}