Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

logger: adjust logger to receive logs blobs #1172

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
40a3a79
codegen: removed the otel layer generation
iulianbarbu Aug 22, 2023
1339888
proto: updated logger proto file
iulianbarbu Aug 22, 2023
7615495
logger: add store_logs RPC impl
iulianbarbu Aug 22, 2023
12fea55
logger: fixed tests
iulianbarbu Aug 23, 2023
6f7c1da
logger: decreased the visibility for Log type
iulianbarbu Aug 23, 2023
616fe2b
logger: remove the unwrap
iulianbarbu Aug 23, 2023
a9c0bd5
proto/logger: differentiate between a fetched/stored log items
iulianbarbu Aug 23, 2023
4c46e57
logger: simplify tests
iulianbarbu Aug 24, 2023
c813a61
runtime: removed tracing/otel dependencies
iulianbarbu Aug 24, 2023
2411767
runtime: removed the tracing dependencies
iulianbarbu Aug 24, 2023
3bae7ab
codegen: cleanup tracing layers and tests
iulianbarbu Aug 24, 2023
9e0fb45
runtime/next: fix compilation
iulianbarbu Aug 24, 2023
fc255d6
logger: remove claim verification on store_logs
iulianbarbu Aug 24, 2023
b1ab984
logger: implemented from_stored for StoredLogItem convertion to Log
iulianbarbu Aug 24, 2023
093e223
codegen: readd a missing import
iulianbarbu Aug 24, 2023
35f737e
codegen: fix tests
iulianbarbu Aug 24, 2023
ceaa9ff
addressed P review
iulianbarbu Aug 24, 2023
2659387
logger: removed integration test comment
iulianbarbu Aug 24, 2023
bf9c9b8
logger/tests: remove the DeploymentPush claim scope
iulianbarbu Aug 24, 2023
5fb7053
address orhun feedback
iulianbarbu Aug 24, 2023
ae860f5
logger: fixed error message typo
iulianbarbu Aug 24, 2023
67781d4
address Johan review
iulianbarbu Aug 24, 2023
53e7369
logger/proto: remove dedup protobuf defs
iulianbarbu Aug 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address orhun feedback
  • Loading branch information
iulianbarbu committed Aug 24, 2023
commit 5fb7053e41b8106bbb67fba95bd5b5d7a6eb9e7f
6 changes: 4 additions & 2 deletions logger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ where
.broadcast(logs.into_iter().map(Log::from_stored).collect())
.await
.map_err(|err| {
println!("failed to send log to storage: {}", err);
});
Status::internal(format!(
"Errored while storing trying to store the logs in persistence: {err}"
))
})?;
}

Ok(Response::new(StoreLogsResponse { success: true }))
Expand Down
64 changes: 24 additions & 40 deletions logger/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,31 @@ use shuttle_proto::logger::{
logger_client::LoggerClient, logger_server::LoggerServer, FetchedLogItem, LogsRequest,
StoreLogsRequest, StoredLogItem,
};
use tokio::time::timeout;
use tokio::{task::JoinHandle, time::timeout};
use tonic::{transport::Server, Request};

const SHUTTLE_SERVICE: &str = "test";

#[tokio::test]
async fn store_and_get_logs() {
let port = pick_unused_port().unwrap();
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port);
const DEPLOYMENT_ID: &str = "runtime-fetch-logs-deployment-id";

// Start the logger server in the background.
let sqlite = Sqlite::new_in_memory().await;
let sqlite_clone = sqlite.clone();
let server = tokio::task::spawn(async move {
Server::builder()
.layer(JwtScopesLayer::new(vec![Scope::Logs]))
.add_service(LoggerServer::new(Service::new(
sqlite_clone.get_sender(),
sqlite_clone,
)))
.serve(addr)
.await
.unwrap()
});
let deployment_id = "runtime-fetch-logs-deployment-id";

let server = get_server(port);
let test = tokio::task::spawn(async move {
let dst = format!("http://localhost:{port}");
let mut client = LoggerClient::connect(dst).await.unwrap();

// Get the generated logs
let expected_stored_logs = vec![
StoredLogItem {
deployment_id: DEPLOYMENT_ID.to_string(),
deployment_id: deployment_id.to_string(),
service_name: SHUTTLE_SERVICE.to_string(),
tx_timestamp: Some(Timestamp::from(SystemTime::UNIX_EPOCH)),
data: "log 1 example".as_bytes().to_vec(),
},
StoredLogItem {
deployment_id: DEPLOYMENT_ID.to_string(),
deployment_id: deployment_id.to_string(),
service_name: SHUTTLE_SERVICE.to_string(),
tx_timestamp: Some(Timestamp::from(
SystemTime::UNIX_EPOCH
Expand All @@ -74,7 +59,7 @@ async fn store_and_get_logs() {
// Get logs
let logs = client
.get_logs(Request::new(LogsRequest {
deployment_id: DEPLOYMENT_ID.into(),
deployment_id: deployment_id.into(),
}))
.await
.unwrap()
Expand All @@ -98,38 +83,24 @@ async fn store_and_get_logs() {
#[tokio::test]
async fn get_stream_logs() {
let port = pick_unused_port().unwrap();
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port);
const DEPLOYMENT_ID: &str = "runtime-fetch-logs-deployment-id";
let deployment_id = "runtime-fetch-logs-deployment-id";

// Start the logger server in the background.
let sqlite = Sqlite::new_in_memory().await;
let sqlite_clone = sqlite.clone();
let server = tokio::task::spawn(async move {
Server::builder()
.layer(JwtScopesLayer::new(vec![Scope::Logs]))
.add_service(LoggerServer::new(Service::new(
sqlite_clone.get_sender(),
sqlite_clone,
)))
.serve(addr)
.await
.unwrap()
});

let server = get_server(port);
let test = tokio::task::spawn(async move {
let dst = format!("http://localhost:{port}");
let mut client = LoggerClient::connect(dst).await.unwrap();

// Get the generated logs
let expected_stored_logs = vec![
StoredLogItem {
deployment_id: DEPLOYMENT_ID.to_string(),
deployment_id: deployment_id.to_string(),
service_name: SHUTTLE_SERVICE.to_string(),
tx_timestamp: Some(Timestamp::from(SystemTime::UNIX_EPOCH)),
data: "log 1 example".as_bytes().to_vec(),
},
StoredLogItem {
deployment_id: DEPLOYMENT_ID.to_string(),
deployment_id: deployment_id.to_string(),
service_name: SHUTTLE_SERVICE.to_string(),
tx_timestamp: Some(Timestamp::from(
SystemTime::UNIX_EPOCH
Expand All @@ -152,7 +123,7 @@ async fn get_stream_logs() {
// Subscribe to stream
let mut response = client
.get_logs_stream(Request::new(LogsRequest {
deployment_id: DEPLOYMENT_ID.into(),
deployment_id: deployment_id.into(),
}))
.await
.unwrap()
Expand All @@ -178,3 +149,16 @@ async fn get_stream_logs() {
_ = test => ()
}
}

fn get_server(port: u16) -> JoinHandle<()> {
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port);
tokio::task::spawn(async move {
let sqlite = Sqlite::new_in_memory().await;
Server::builder()
.layer(JwtScopesLayer::new(vec![Scope::Logs]))
.add_service(LoggerServer::new(Service::new(sqlite.get_sender(), sqlite)))
.serve(addr)
.await
.unwrap()
})
}