logger: adjust logger to receive logs blobs #1172

Merged
Changes from 6 commits
23 commits
40a3a79
codegen: removed the otel layer generation
iulianbarbu Aug 22, 2023
1339888
proto: updated logger proto file
iulianbarbu Aug 22, 2023
7615495
logger: add store_logs RPC impl
iulianbarbu Aug 22, 2023
12fea55
logger: fixed tests
iulianbarbu Aug 23, 2023
6f7c1da
logger: decreased the visibility for Log type
iulianbarbu Aug 23, 2023
616fe2b
logger: remove the unwrap
iulianbarbu Aug 23, 2023
a9c0bd5
proto/logger: differentiate between a fetched/stored log items
iulianbarbu Aug 23, 2023
4c46e57
logger: simplify tests
iulianbarbu Aug 24, 2023
c813a61
runtime: removed tracing/otel dependencies
iulianbarbu Aug 24, 2023
2411767
runtime: removed the tracing dependencies
iulianbarbu Aug 24, 2023
3bae7ab
codegen: cleanup tracing layers and tests
iulianbarbu Aug 24, 2023
9e0fb45
runtime/next: fix compilation
iulianbarbu Aug 24, 2023
fc255d6
logger: remove claim verification on store_logs
iulianbarbu Aug 24, 2023
b1ab984
logger: implemented from_stored for StoredLogItem convertion to Log
iulianbarbu Aug 24, 2023
093e223
codegen: readd a missing import
iulianbarbu Aug 24, 2023
35f737e
codegen: fix tests
iulianbarbu Aug 24, 2023
ceaa9ff
addressed P review
iulianbarbu Aug 24, 2023
2659387
logger: removed integration test comment
iulianbarbu Aug 24, 2023
bf9c9b8
logger/tests: remove the DeploymentPush claim scope
iulianbarbu Aug 24, 2023
5fb7053
address orhun feedback
iulianbarbu Aug 24, 2023
ae860f5
logger: fixed error message typo
iulianbarbu Aug 24, 2023
67781d4
address Johan review
iulianbarbu Aug 24, 2023
53e7369
logger/proto: remove dedup protobuf defs
iulianbarbu Aug 24, 2023
16 changes: 13 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

42 changes: 0 additions & 42 deletions codegen/src/shuttle_main/mod.rs
Contributor

😍

Contributor

We should be able to remove these deps from shuttle-runtime too, which should be a nice compile-time win for users.

Contributor

Oh, and the tests here need to be updated.

Contributor Author

Done. I actually removed the custom tracing-layer injection. We'll have to follow up with a subscriber here if we want to structure or filter the pre-user-code logs. We'll probably want to set up some filtering within the codegen's business logic, but that's still TBD.
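
For reference, a follow-up default subscriber (outside the scope of this PR) could look roughly like the sketch below. It assumes plain tracing-subscriber with the env-filter feature; the function name and the `info` default are illustrative placeholders, not a decided design.

```rust
use tracing_subscriber::{fmt, EnvFilter};

// Hypothetical follow-up: a default, filtered subscriber for pre-user-code logs.
fn init_default_subscriber() {
    // Honor RUST_LOG when set, otherwise fall back to `info`.
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));

    fmt().with_env_filter(filter).init();
}
```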

Member

Do we want to add back a macro annotation for enabling a default tracing subscriber like the one we've been using thus far?

Contributor Author

I wouldn't for this PR. We will track this discussion separately in a ticket, though.

@@ -273,50 +273,8 @@ impl ToTokens for Loader {
logger_uri: String,
deployment_id: String,
) -> #return_type {
use shuttle_runtime::Context;
use shuttle_runtime::tracing_subscriber::prelude::*;
use shuttle_runtime::opentelemetry_otlp::WithExportConfig;
#extra_imports

let filter_layer = shuttle_runtime::tracing_subscriber::EnvFilter::try_from_default_env()
.or_else(|_| shuttle_runtime::tracing_subscriber::EnvFilter::try_new("info"))
.unwrap();

let tracer = shuttle_runtime::opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
shuttle_runtime::opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(logger_uri),
)
.with_trace_config(
shuttle_runtime::opentelemetry::sdk::trace::config()
.with_resource(
shuttle_runtime::opentelemetry::sdk::Resource::new(
vec![
shuttle_runtime::opentelemetry::KeyValue::new(
"service.name",
"shuttle-runtime",
),
shuttle_runtime::opentelemetry::KeyValue::new(
"deployment_id",
deployment_id,
)
]
)
),
)
.install_batch(shuttle_runtime::opentelemetry::runtime::Tokio)
.unwrap();
let otel_layer = shuttle_runtime::tracing_opentelemetry::layer().with_tracer(tracer);

let registry = shuttle_runtime::tracing_subscriber::registry()
.with(filter_layer)
.with(otel_layer);

#inject_tracing_layer

registry.init();
#vars
#(let #fn_inputs = shuttle_runtime::get_resource(
#fn_inputs_builder::new()#fn_inputs_builder_options,
5 changes: 1 addition & 4 deletions logger/Cargo.toml
@@ -25,8 +25,7 @@ tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["default"] }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["default", "json"] }

[dependencies.shuttle-common]
workspace = true
@@ -40,5 +39,3 @@ portpicker = { workspace = true }
pretty_assertions = { workspace = true }
serde_json = { workspace = true }
shuttle-common-tests = { workspace = true }
opentelemetry-otlp = { workspace = true }
opentelemetry = { workspace = true }
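
As an aside, the `json` feature added to tracing-subscriber above enables JSON output from the fmt layer. A minimal sketch of how the logger service itself might use it (an assumption on my part, not something shown in this diff):

```rust
use tracing_subscriber::{fmt, EnvFilter};

// Sketch only: emit the logger's own logs as structured JSON.
// Requires the tracing-subscriber "json" feature enabled in Cargo.toml above.
fn init_json_tracing() {
    fmt()
        .json()
        .with_env_filter(EnvFilter::from_default_env())
        .init();
}
```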
5 changes: 2 additions & 3 deletions logger/migrations/0000_init.sql
@@ -1,7 +1,6 @@
CREATE TABLE IF NOT EXISTS logs (
deployment_id TEXT, -- The deployment that this log line pertains to.
shuttle_service_name TEXT, -- The shuttle service which created this log.
timestamp INTEGER, -- Unix epoch timestamp.
level INTEGER, -- The log level
fields TEXT -- Log fields object.
tx_timestamp INTEGER, -- Unix epoch timestamp.
data BLOB -- Log fields object.
);
182 changes: 12 additions & 170 deletions logger/src/dal.rs
@@ -2,14 +2,8 @@ use std::{path::Path, str::FromStr, time::SystemTime};

use async_broadcast::{broadcast, Sender};
use async_trait::async_trait;
use chrono::NaiveDateTime;
use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span};
use prost_types::Timestamp;
use serde_json::Value;
use shuttle_common::{
backends::tracing::from_any_value_kv_to_serde_json_map, tracing::MESSAGE_KEY,
};
use shuttle_proto::logger::{self, LogItem};
use shuttle_proto::logger::LogItem;
use sqlx::{
migrate::{MigrateDatabase, Migrator},
sqlite::{SqliteConnectOptions, SqliteJournalMode},
@@ -85,13 +79,14 @@ impl Sqlite {

tokio::spawn(async move {
while let Ok(logs) = rx.recv().await {
let mut builder = QueryBuilder::new("INSERT INTO logs (deployment_id, shuttle_service_name, timestamp, level, fields)");
let mut builder = QueryBuilder::new(
"INSERT INTO logs (deployment_id, shuttle_service_name, data, tx_timestamp)",
);
builder.push_values(logs, |mut b, log| {
b.push_bind(log.deployment_id)
.push_bind(log.shuttle_service_name)
.push_bind(log.timestamp)
.push_bind(log.level)
.push_bind(log.fields);
.push_bind(log.data)
.push_bind(log.tx_timestamp);
});
let query = builder.build();

@@ -114,7 +109,7 @@ impl Sqlite {
impl Dal for Sqlite {
async fn get_logs(&self, deployment_id: String) -> Result<Vec<Log>, DalError> {
let result =
sqlx::query_as("SELECT * FROM logs WHERE deployment_id = ? ORDER BY timestamp")
sqlx::query_as("SELECT * FROM logs WHERE deployment_id = ? ORDER BY tx_timestamp")
.bind(deployment_id)
.fetch_all(&self.pool)
.await?;
@@ -127,170 +122,17 @@ impl Dal for Sqlite {
pub struct Log {
pub(crate) deployment_id: String,
pub(crate) shuttle_service_name: String,
pub(crate) timestamp: DateTime<Utc>,
pub(crate) level: LogLevel,
pub(crate) fields: Value,
}

#[derive(Clone, Debug, sqlx::Type)]
pub enum LogLevel {
Trace,
Debug,
Info,
Warn,
Error,
}

impl FromStr for LogLevel {
type Err = DalError;

fn from_str(value: &str) -> Result<Self, Self::Err> {
match value {
"TRACE" => Ok(Self::Trace),
"DEBUG" => Ok(Self::Debug),
"INFO" => Ok(Self::Info),
"WARN" => Ok(Self::Warn),
"ERROR" => Ok(Self::Error),
other => Err(DalError::Parsing(format!("invalid log level: {other}"))),
}
}
}

impl From<LogLevel> for logger::LogLevel {
fn from(level: LogLevel) -> Self {
match level {
LogLevel::Trace => Self::Trace,
LogLevel::Debug => Self::Debug,
LogLevel::Info => Self::Info,
LogLevel::Warn => Self::Warn,
LogLevel::Error => Self::Error,
}
}
}

impl Log {
/// Try to get a log from an OTLP [ResourceSpans]
pub fn try_from_scope_span(resource_spans: ResourceSpans) -> Option<Vec<Self>> {
let ResourceSpans {
resource,
scope_spans,
schema_url: _,
} = resource_spans;

let fields = from_any_value_kv_to_serde_json_map(resource?.attributes);
let shuttle_service_name = fields.get("service.name")?.as_str()?.to_string();
// TODO: should this be named "deployment.id" to conform to otlp standard?
let deployment_id = fields
.get("deployment_id")
.map(|v| {
v.as_str()
.expect("expected to have a string value for deployment_id key")
})
.map(|inner| inner.to_string());

let logs = scope_spans
.into_iter()
.flat_map(|scope_spans| {
let ScopeSpans {
spans,
schema_url: _,
..
} = scope_spans;

let events: Vec<_> = spans
.into_iter()
.flat_map(|span| {
Self::try_from_span(span, &shuttle_service_name, deployment_id.clone())
})
.flatten()
.collect();

Some(events)
})
.flatten()
.collect();

Some(logs)
}

/// Try to get self from an OTLP [Span]. Also enrich it with the shuttle service name and deployment id.
fn try_from_span(
span: Span,
shuttle_service_name: &str,
deployment_id: Option<String>,
) -> Option<Vec<Self>> {
// If we didn't find the id in the resource span, check the inner spans.
let mut span_fields = from_any_value_kv_to_serde_json_map(span.attributes);
let deployment_id = deployment_id.or(span_fields
.get("deployment_id")?
.as_str()
.map(|inner| inner.to_string()))?;
let mut logs: Vec<Self> = span
.events
.into_iter()
.flat_map(|event| {
let message = event.name;

let mut fields = from_any_value_kv_to_serde_json_map(event.attributes);
fields.insert(MESSAGE_KEY.to_string(), message.into());

// Since we store the "level" in the level column in the database, we remove it
// from the event fields so it is not duplicated there.
// Note: this should never fail, a tracing event should always have a level.
let level = fields.remove("level")?;

let naive = NaiveDateTime::from_timestamp_opt(
(event.time_unix_nano / 1_000_000_000)
.try_into()
.unwrap_or_default(),
(event.time_unix_nano % 1_000_000_000) as u32,
)
.unwrap_or_default();

Some(Log {
shuttle_service_name: shuttle_service_name.to_string(),
deployment_id: deployment_id.clone(),
timestamp: DateTime::from_utc(naive, Utc),
level: level.as_str()?.parse().ok()?,
fields: Value::Object(fields),
})
})
.collect();

span_fields.insert(
MESSAGE_KEY.to_string(),
format!("[span] {}", span.name).into(),
);

logs.push(Log {
shuttle_service_name: shuttle_service_name.to_string(),
deployment_id,
timestamp: DateTime::from_utc(
NaiveDateTime::from_timestamp_opt(
(span.start_time_unix_nano / 1_000_000_000)
.try_into()
.unwrap_or_default(),
(span.start_time_unix_nano % 1_000_000_000) as u32,
)
.unwrap_or_default(),
Utc,
),
// Span level doesn't exist for opentelemetry spans, so this info is not relevant.
level: LogLevel::Info,
fields: Value::Object(span_fields),
});

Some(logs)
}
pub(crate) tx_timestamp: DateTime<Utc>,
pub(crate) data: Vec<u8>,
}

impl From<Log> for LogItem {
fn from(log: Log) -> Self {
LogItem {
service_name: log.shuttle_service_name,
timestamp: Some(Timestamp::from(SystemTime::from(log.timestamp))),
level: logger::LogLevel::from(log.level) as i32,
fields: serde_json::to_vec(&log.fields).unwrap_or_default(),
deployment_id: log.deployment_id,
tx_timestamp: Some(Timestamp::from(SystemTime::from(log.tx_timestamp))),
data: log.data,
}
}
}
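
For context, the reverse direction — turning an item received over `store_logs` back into a `Log` row — might look roughly like the sketch below. The `from_stored` name comes from the commit messages; the exact field set is assumed from the diff above, so treat this as illustrative rather than the PR's actual implementation.

```rust
use chrono::{DateTime, NaiveDateTime, Utc};
use shuttle_proto::logger::LogItem;

impl Log {
    /// Illustrative sketch: build a database row from an incoming log item.
    fn from_stored(item: LogItem) -> Self {
        // Fall back to "now" if the client did not attach a timestamp.
        let tx_timestamp = item
            .tx_timestamp
            .and_then(|ts| NaiveDateTime::from_timestamp_opt(ts.seconds, ts.nanos as u32))
            .map(|naive| DateTime::<Utc>::from_utc(naive, Utc))
            .unwrap_or_else(Utc::now);

        Self {
            deployment_id: item.deployment_id,
            shuttle_service_name: item.service_name,
            tx_timestamp,
            data: item.data,
        }
    }
}
```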