From 8f51cd41d31c615632bc53f5259cb6bd443e1083 Mon Sep 17 00:00:00 2001 From: Pieter Date: Wed, 29 Jun 2022 16:05:16 +0200 Subject: [PATCH] refactor: drop api crate (#229) --- Cargo.lock | 53 --- Cargo.toml | 5 +- api/Cargo.toml | 44 --- api/Containerfile | 26 -- api/Containerfile.dev | 33 -- api/docker/config.toml | 2 - api/docker/entrypoint.sh | 36 -- api/docker/supervisord.conf | 14 - api/src/args.rs | 46 --- api/src/auth.rs | 331 ------------------ api/src/auth_admin.rs | 60 ---- api/src/build.rs | 190 ---------- api/src/deployment.rs | 672 ------------------------------------ api/src/factory.rs | 58 ---- api/src/main.rs | 240 ------------- api/src/proxy.rs | 126 ------- api/src/router.rs | 30 -- api/users.toml | 11 - 18 files changed, 2 insertions(+), 1975 deletions(-) delete mode 100644 api/Cargo.toml delete mode 100644 api/Containerfile delete mode 100644 api/Containerfile.dev delete mode 100644 api/docker/config.toml delete mode 100755 api/docker/entrypoint.sh delete mode 100644 api/docker/supervisord.conf delete mode 100644 api/src/args.rs delete mode 100644 api/src/auth.rs delete mode 100644 api/src/auth_admin.rs delete mode 100644 api/src/build.rs delete mode 100644 api/src/deployment.rs delete mode 100644 api/src/factory.rs delete mode 100644 api/src/main.rs delete mode 100644 api/src/proxy.rs delete mode 100644 api/src/router.rs delete mode 100644 api/users.toml diff --git a/Cargo.lock b/Cargo.lock index 25b3bb12f..d7ca49cb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -132,39 +132,6 @@ version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" -[[package]] -name = "api" -version = "0.1.2" -dependencies = [ - "anyhow", - "async-mutex", - "async-trait", - "base64 0.13.0", - "cargo", - "cargo-util", - "chrono", - "env_logger", - "fqdn", - "futures", - "hyper", - "hyper-reverse-proxy", - "lazy_static", - "libloading", - "log", - "proto", - "rand 0.8.5", - "rocket", - "serde", - "shuttle-common", - "shuttle-service", - "sqlx", - "structopt", - "tokio", - "toml", - "tonic", - "uuid", -] - [[package]] name = "arrayref" version = "0.3.6" @@ -1702,12 +1669,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fqdn" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b4b9cc8d7db413f35e3647159e2f726741d605522b7de6f2c59056da0badf79" - [[package]] name = "fuchsia-cprng" version = "0.1.1" @@ -2202,17 +2163,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-reverse-proxy" -version = "0.5.2-dev" -source = "git+https://github.com/chesedo/hyper-reverse-proxy?branch=master#a4deffef77685b37fda7224ae678d3d9f00d391e" -dependencies = [ - "hyper", - "lazy_static", - "tokio", - "tracing", -] - [[package]] name = "hyper-timeout" version = "0.4.1" @@ -3617,7 +3567,6 @@ dependencies = [ "rocket_codegen", "rocket_http", "serde", - "serde_json", "state", "tempfile", "time 0.3.9", @@ -3625,7 +3574,6 @@ dependencies = [ "tokio-stream", "tokio-util 0.7.1", "ubyte", - "uuid", "version_check", "yansi", ] @@ -3671,7 +3619,6 @@ dependencies = [ "time 0.3.9", "tokio", "uncased", - "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a1a139ed9..ca55219c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,14 +1,13 @@ [workspace] members = [ - "api", "cargo-shuttle", "codegen", "common", "e2e", + "deployer", "proto", "provisioner", - "service", - "deployer" + "service" ] exclude = [ "examples" diff --git a/api/Cargo.toml b/api/Cargo.toml 
deleted file mode 100644 index 67d887442..000000000 --- a/api/Cargo.toml +++ /dev/null @@ -1,44 +0,0 @@ -[package] -name = "api" -version = "0.1.2" -edition = "2021" - -[dependencies] -anyhow = "1.0.57" -async-mutex = "1.4.0" -async-trait = "0.1.56" -base64 = "0.13.0" -cargo = "0.62.0" -cargo-util = "0.1.2" -chrono = "0.4.19" -env_logger = "0.9.0" -fqdn = "0.1.9" -futures = "0.3.21" -hyper = { version = "0.14.19", features = ["client", "http1", "http2", "tcp" ] } # for reverse proxying -# not great, but waiting for WebSocket changes to be merged -hyper-reverse-proxy = { git = "https://github.com/chesedo/hyper-reverse-proxy", branch = "master" } -lazy_static = "1.4.0" -libloading = "0.7.3" -log = "0.4.17" -rand = "0.8.5" -rocket = { version = "0.5.0-rc.2", features = ["uuid", "serde_json", "json"] } -serde = "1.0.137" -sqlx = { version = "0.5.13", features = ["runtime-tokio-native-tls", "postgres"] } -structopt = "0.3.26" -tokio = { version = "1.19.2", features = ["full"] } -toml = "0.5.9" -tonic = "0.7.2" -uuid = { version = "1.1.1", features = ["v4"] } - -[dependencies.proto] -version = "0.1.0" -path = "../proto" - -[dependencies.shuttle-common] -version = "0.3.1" -path = "../common" - -[dependencies.shuttle-service] -version = "0.3.3" -path = "../service" -features = ["loader", "secrets"] diff --git a/api/Containerfile b/api/Containerfile deleted file mode 100644 index dcb9b878e..000000000 --- a/api/Containerfile +++ /dev/null @@ -1,26 +0,0 @@ -FROM rust:buster as chef -RUN apt-get update &&\ - apt-get install -y protobuf-compiler -RUN cargo install cargo-chef -WORKDIR app - -FROM rust:buster AS runtime -RUN apt-get update &&\ - apt-get install -y supervisor - -FROM chef AS planner -COPY . . -RUN cargo chef prepare --recipe-path recipe.json - -FROM chef AS builder -COPY --from=planner /app/recipe.json recipe.json -RUN cargo chef cook --recipe-path recipe.json -COPY . . -RUN cargo build --bin api - -FROM runtime -COPY --from=builder /app/target/debug/api /usr/local/bin/shuttle-backend - -COPY api/docker/entrypoint.sh /bin/entrypoint.sh -COPY api/docker/supervisord.conf /usr/share/supervisord/supervisord.conf -ENTRYPOINT ["/bin/entrypoint.sh"] diff --git a/api/Containerfile.dev b/api/Containerfile.dev deleted file mode 100644 index 32158c0b7..000000000 --- a/api/Containerfile.dev +++ /dev/null @@ -1,33 +0,0 @@ -FROM rust:buster as runtime -RUN apt-get update &&\ - apt-get install -y supervisor -RUN rustup component add rust-src - -FROM rust:buster AS chef -RUN apt-get update &&\ - apt-get install -y protobuf-compiler -WORKDIR app -RUN cargo install cargo-chef - -FROM chef AS planner -COPY . . -COPY ./api/docker/config.toml $CARGO_HOME/config.toml -RUN cargo chef prepare --recipe-path recipe.json - -FROM chef AS builder -COPY --from=planner /app/recipe.json recipe.json -RUN cargo chef cook --recipe-path recipe.json -COPY . . 
-RUN cargo build --bin api - -FROM runtime -WORKDIR / -COPY --from=builder /app/target/debug/api /usr/local/bin/shuttle-backend -COPY --from=builder /app/service /app/service -COPY --from=builder /app/common /app/common -COPY --from=builder /app/codegen /app/codegen - -COPY api/docker/config.toml $CARGO_HOME/config.toml -COPY api/docker/entrypoint.sh /bin/entrypoint.sh -COPY api/docker/supervisord.conf /usr/share/supervisord/supervisord.conf -ENTRYPOINT ["/bin/entrypoint.sh"] diff --git a/api/docker/config.toml b/api/docker/config.toml deleted file mode 100644 index 13e2e1c90..000000000 --- a/api/docker/config.toml +++ /dev/null @@ -1,2 +0,0 @@ -[patch.crates-io] -shuttle-service = { path = "/app/service" } \ No newline at end of file diff --git a/api/docker/entrypoint.sh b/api/docker/entrypoint.sh deleted file mode 100755 index 601e68f95..000000000 --- a/api/docker/entrypoint.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env bash - -if [ -z $PROXY_FQDN ] -then - echo "The variable 'PROXY_FQDN' is missing" - exit 1 -fi - -if [ -z $PROVISIONER_ADDRESS ] -then - echo "The variable 'PROVISIONER_ADDRESS' is missing" - exit 1 -fi - -export CRATES_PATH=${CRATES_PATH:-/var/lib/shuttle/crates} - -mkdir -p $CRATES_PATH - -export PROXY_PORT=${PROXY_PORT:-8000} - -export API_PORT=${API_PORT:-8001} - -if [[ ! -z "${SHUTTLE_USERS_TOML}" && ! -s "${SHUTTLE_USERS_TOML}" ]] -then - if [[ -z "${SHUTTLE_INITIAL_KEY}" ]] - then - echo "\$SHUTTLE_INITIAL_KEY is not set to create initial user's key" - exit 1 - fi - - echo "Creating a first user with key '${SHUTTLE_INITIAL_KEY}' at '${SHUTTLE_USERS_TOML}'" - mkdir -p $(dirname "${SHUTTLE_USERS_TOML}") - echo -e "[$SHUTTLE_INITIAL_KEY]\nname = 'first-user'\nprojects = []" > "${SHUTTLE_USERS_TOML}" -fi - -exec supervisord -n -c /usr/share/supervisord/supervisord.conf diff --git a/api/docker/supervisord.conf b/api/docker/supervisord.conf deleted file mode 100644 index bd8c834ed..000000000 --- a/api/docker/supervisord.conf +++ /dev/null @@ -1,14 +0,0 @@ -[supervisord] -loglevel=debug - -[program:shuttle-api] -command=/usr/local/bin/shuttle-backend --path %(ENV_CRATES_PATH)s --bind-addr 0.0.0.0 --api-port %(ENV_API_PORT)s --proxy-port %(ENV_PROXY_PORT)s --proxy-fqdn %(ENV_PROXY_FQDN)s --provisioner-address %(ENV_PROVISIONER_ADDRESS)s -redirect_stderr=true -environment=RUST_BACKTRACE="1",RUST_LOG="debug" -startretries=3 -startsecs=5 -autorestart=true - -[eventlistener:quit_on_failure] -events=PROCESS_STATE_FATAL -command=sh -c 'while true; do echo "READY"; read line; kill -15 1; echo "RESULT 2"; echo "OK"; done' diff --git a/api/src/args.rs b/api/src/args.rs deleted file mode 100644 index 1d20a6498..000000000 --- a/api/src/args.rs +++ /dev/null @@ -1,46 +0,0 @@ -use std::net::IpAddr; -use std::path::PathBuf; -use std::str::FromStr; - -use fqdn::FQDN; -use shuttle_common::Port; -use structopt::StructOpt; - -#[derive(StructOpt)] -#[structopt(name = "shuttle")] -pub struct Args { - #[structopt(long, about = "Override the default root path for shuttle")] - pub(crate) path: Option, - #[structopt( - long, - about = "Override the default port for the proxy", - default_value = "8000" - )] - pub(crate) proxy_port: Port, - #[structopt( - long, - about = "Override the default port for the api", - default_value = "8001" - )] - pub(crate) api_port: Port, - #[structopt( - long, - about = "Override the default bind address", - default_value = "127.0.0.1" - )] - pub(crate) bind_addr: IpAddr, - #[structopt(long, about = "Fully qualified domain name deployed services are reachable 
at", parse(try_from_str = parse_fqdn))] - pub(crate) proxy_fqdn: FQDN, - #[structopt(long, about = "Address to connect to the provisioning service")] - pub(crate) provisioner_address: String, - #[structopt( - long, - about = "Port provisioner is reachable at", - default_value = "5001" - )] - pub(crate) provisioner_port: Port, -} - -fn parse_fqdn(src: &str) -> Result { - FQDN::from_str(src).map_err(|e| format!("{e:?}")) -} diff --git a/api/src/auth.rs b/api/src/auth.rs deleted file mode 100644 index d4fce14f9..000000000 --- a/api/src/auth.rs +++ /dev/null @@ -1,331 +0,0 @@ -use std::collections::HashMap; -use std::error::Error; -use std::fmt::{Display, Formatter}; -use std::io::Write; -use std::path::PathBuf; -use std::sync::RwLock; - -use anyhow::{anyhow, Context}; -use rand::Rng; -use rocket::form::validate::Contains; -use rocket::http::Status; -use rocket::outcome::try_outcome; -use rocket::request::{FromRequest, Outcome}; -use rocket::{Request, State}; -use serde::{Deserialize, Serialize}; -use shuttle_common::project::ProjectName; -use shuttle_common::DeploymentApiError; - -#[derive(Clone, Debug, PartialEq, Hash, Eq, Serialize, Deserialize, Responder)] -#[serde(transparent)] -pub struct ApiKey(String); - -impl ApiKey { - /// Parses an authorization header string into an ApiKey - pub fn from_authorization_header>(header: S) -> Result { - let s = header.as_ref(); - let parts: Vec<&str> = s.split(' ').collect(); - if parts.len() != 2 { - return Err(AuthorizationError::Malformed(())); - } - // unwrap ok because of explicit check above - let key = *parts.get(1).unwrap(); - // comes in base64 encoded - let decoded_bytes = base64::decode(key).map_err(|_| AuthorizationError::Malformed(()))?; - let mut decoded_string = - String::from_utf8(decoded_bytes).map_err(|_| AuthorizationError::Malformed(()))?; - // remove colon at the end - decoded_string.pop(); - Ok(ApiKey(decoded_string)) - } - - pub fn new_random() -> Self { - Self( - rand::thread_rng() - .sample_iter(&rand::distributions::Alphanumeric) - .take(16) - .map(char::from) - .collect::(), - ) - } -} - -/// A broad class of authorization errors. -/// The empty tuples here are needed by `Responder`. -#[derive(Debug, Responder)] -#[allow(dead_code)] -#[response(content_type = "json")] -pub enum AuthorizationError { - #[response(status = 400)] - Missing(()), - #[response(status = 400)] - Malformed(()), - #[response(status = 401)] - Unauthorized(()), - #[response(status = 409)] - AlreadyExists(()), - #[response(status = 500)] - Internal(()), - #[response(status = 404)] - NotFound(()), -} - -impl Display for AuthorizationError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - AuthorizationError::Missing(_) => write!(f, "API key is missing"), - AuthorizationError::Malformed(_) => write!(f, "API key is malformed"), - AuthorizationError::Unauthorized(_) => write!(f, "API key is unauthorized"), - AuthorizationError::AlreadyExists(_) => write!(f, "username already exists"), - AuthorizationError::Internal(_) => write!(f, "internal server error"), - AuthorizationError::NotFound(_) => write!(f, "required resource was not found"), - } - } -} - -impl Error for AuthorizationError {} - -/// A wrapper for a Rocket guard that verifies an API key is associated with a -/// valid user. -/// -/// The `FromRequest` impl consumes the API key and verifies it is valid for the -/// a user. Generally you want to use [`ScopedUser`] instead to ensure the request -/// is valid against the user's owned resources. 
-#[derive(Clone, Deserialize, Serialize, Debug)] -pub(crate) struct User { - pub(crate) name: String, - pub(crate) projects: Vec, -} - -#[async_trait] -impl<'r> FromRequest<'r> for User { - type Error = AuthorizationError; - - async fn from_request(request: &'r Request<'_>) -> Outcome { - if let Some(authorization) = request.headers().get_one("Authorization") { - match ApiKey::from_authorization_header(authorization) { - Ok(api_key) => { - let authorizer: &'r State = - try_outcome!(request.guard().await.map_failure(|(status, ())| { - (status, AuthorizationError::Internal(())) - })); - if let Some(user) = authorizer.user_for_api_key(&api_key) { - Outcome::Success(user) - } else { - Outcome::Failure(( - Status::Unauthorized, - AuthorizationError::Unauthorized(()), - )) - } - } - Err(err) => Outcome::Failure((Status::Unauthorized, err)), - } - } else { - Outcome::Failure((Status::Unauthorized, AuthorizationError::Malformed(()))) - } - } -} - -/// A wrapper for a Rocket guard that validates a user's API key *and* -/// scopes the request to a project they own. -/// -/// It is guaranteed that [`ScopedUser::scope`] exists and is owned -/// by [`ScopedUser::name`]. -pub(crate) struct ScopedUser { - #[allow(dead_code)] - user: User, - scope: ProjectName, -} - -impl ScopedUser { - #[allow(dead_code)] - pub(crate) fn name(&self) -> &str { - &self.user.name - } - - pub(crate) fn scope(&self) -> &ProjectName { - &self.scope - } -} - -#[async_trait] -impl<'r> FromRequest<'r> for ScopedUser { - type Error = AuthorizationError; - - async fn from_request(request: &'r Request<'_>) -> Outcome { - let user = try_outcome!(User::from_request(request).await); - let route = request - .route() - .expect("`User` can only be used in requests"); - if route.uri.base().starts_with("/projects") { - match request.param::(0) { - Some(Ok(scope)) => { - if user.projects.contains(&scope) { - Outcome::Success(Self { user, scope }) - } else { - Outcome::Failure(( - Status::Unauthorized, - AuthorizationError::Unauthorized(()), - )) - } - } - Some(Err(_)) => { - Outcome::Failure((Status::NotFound, AuthorizationError::NotFound(()))) - } - None => { - Outcome::Failure((Status::Unauthorized, AuthorizationError::Unauthorized(()))) - } - } - } else { - panic!("`ScopedUser` can only be used in routes with a /projects/ scope") - } - } -} - -#[derive(Debug)] -pub(crate) struct UserDirectory { - users: RwLock>, -} - -impl UserDirectory { - /// Creates a project if it does not already exist - /// - first there is a check to see if this project exists globally, if yes - /// will return an error since the project already exists - /// - if not, will create the project for the user - /// Finally saves `users` state to `users.toml`. 
- pub(crate) fn create_project_if_not_exists( - &self, - username: &str, - project_name: &ProjectName, - ) -> Result<(), DeploymentApiError> { - { - let mut users = self.users.write().unwrap(); - - let project_for_name = users - .values() - .flat_map(|users| &users.projects) - .find(|project| project == &project_name); - - if project_for_name.is_some() { - return Err(DeploymentApiError::ProjectAlreadyExists(format!( - "project with name `{}` already exists", - project_name - ))); - } - - // at this point we know that the user does not have this project - // and that another user does not have it - let user = users - .values_mut() - .find(|u| u.name == username) - .ok_or_else(|| { - DeploymentApiError::Internal( - "there was an issue getting the user credentials while validating the project" - .to_string(), - ) - })?; - - user.projects.push(project_name.clone()); - } - self.save(); - - Ok(()) - } - - #[allow(dead_code)] - pub(crate) fn authorize(&self, key: &ApiKey, project_name: &ProjectName) -> Option { - let user = self.user_for_api_key(key)?; - if user.projects.contains(project_name) { - Some(user) - } else { - None - } - } - - /// Find user by username and return it's API Key. - /// if user does not exist create it and update `users` state to `users.toml`. - /// Finally return user's API Key. - pub(crate) fn get_or_create(&self, username: String) -> Result { - let api_key = { - let mut users = self.users.write().unwrap(); - - if let Some((api_key, _)) = users.iter().find(|(_, user)| user.name == username) { - api_key.clone() - } else { - let api_key = ApiKey::new_random(); - - let user = User { - name: username, - projects: vec![], - }; - - users.insert(api_key.clone(), user); - - api_key - } - }; - - self.save(); - - Ok(api_key) - } - - /// Overwrites users.toml with the latest users' field data - fn save(&self) { - // Save the config - let mut users_file = std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(Self::users_toml_file_path()) - .unwrap(); - - let users = self.users.read().unwrap(); - - write!(users_file, "{}", toml::to_string_pretty(&*users).unwrap()) - .expect("could not write contents to users.toml"); - } - - fn user_for_api_key(&self, api_key: &ApiKey) -> Option { - self.users.read().unwrap().get(api_key).cloned() - } - - pub(crate) fn from_user_file() -> Result { - let file_path = Self::users_toml_file_path(); - let file_contents: String = std::fs::read_to_string(&file_path).context(anyhow!( - "this should blow up if the users.toml file is not present at {:?}", - &file_path - ))?; - let users = toml::from_str(&file_contents) - .context("this should blow up if the users.toml file is unparseable")?; - let directory = Self { - users: RwLock::new(users), - }; - - log::debug!("initialising user directory: {:#?}", &directory); - - Ok(directory) - } - - fn users_toml_file_path() -> PathBuf { - match std::env::var("SHUTTLE_USERS_TOML") { - Ok(val) => val.into(), - Err(_) => { - log::debug!("could not find environment variable `SHUTTLE_USERS_TOML`, defaulting to MANIFEST_DIR"); - let manifest_path: PathBuf = env!("CARGO_MANIFEST_DIR").into(); - manifest_path.join("users.toml") - } - } - } -} - -#[cfg(test)] -pub mod tests { - use crate::auth::ApiKey; - - #[test] - pub fn test_api_key_parsing() { - let api_key = ApiKey::from_authorization_header("Basic bXlfYXBpX2tleTo=").unwrap(); - assert_eq!(api_key, ApiKey("my_api_key".to_string())) - } -} diff --git a/api/src/auth_admin.rs b/api/src/auth_admin.rs deleted file mode 100644 index 
e2fc09da6..000000000 --- a/api/src/auth_admin.rs +++ /dev/null @@ -1,60 +0,0 @@ -use lazy_static::lazy_static; -use rocket::http::Status; -use rocket::request::{FromRequest, Outcome}; -use rocket::Request; -use serde::{Deserialize, Serialize}; - -use crate::auth::AuthorizationError; - -lazy_static! { - static ref SHUTTLE_ADMIN_SECRET: String = - std::env::var("SHUTTLE_ADMIN_SECRET").expect("SHUTTLE_ADMIN_SECRET env var not found!"); -} - -#[derive(Debug, PartialEq, Hash, Eq, Deserialize, Serialize, Responder)] -pub struct AdminSecret(String); - -/// Parses an authorization header string into an AdminSecret -impl TryFrom> for AdminSecret { - type Error = AuthorizationError; - - fn try_from(s: Option<&str>) -> Result { - match s { - None => Err(AuthorizationError::Missing(())), - Some(s) => { - let parts: Vec<&str> = s.split(' ').collect(); - - if parts.len() != 2 { - return Err(AuthorizationError::Malformed(())); - } - // unwrap ok because of explicit check above - let secret = *parts.get(1).unwrap(); - - Ok(AdminSecret(secret.to_string())) - } - } - } -} - -#[rocket::async_trait] -impl<'r> FromRequest<'r> for Admin { - type Error = AuthorizationError; - - async fn from_request(req: &'r Request<'_>) -> Outcome { - let admin_secret = match AdminSecret::try_from(req.headers().get_one("Authorization")) { - Ok(admin_secret) => admin_secret, - Err(e) => return Outcome::Failure((Status::BadRequest, e)), - }; - - if admin_secret.0 == *SHUTTLE_ADMIN_SECRET { - Outcome::Success(Admin {}) - } else { - log::warn!("authorization failure for admin secret {:?}", &admin_secret); - - Outcome::Failure((Status::Unauthorized, AuthorizationError::Unauthorized(()))) - } - } -} - -#[derive(Clone, Deserialize, Serialize, Debug)] -pub(crate) struct Admin {} diff --git a/api/src/build.rs b/api/src/build.rs deleted file mode 100644 index 0006da0d1..000000000 --- a/api/src/build.rs +++ /dev/null @@ -1,190 +0,0 @@ -use std::io; -use std::path::{Path, PathBuf}; -use std::process::Command; - -use anyhow::{anyhow, Context, Result}; -use rocket::tokio; -use rocket::tokio::io::AsyncWriteExt; -use shuttle_service::loader::build_crate; -use uuid::Uuid; - -#[cfg(debug_assertions)] -pub const DEFAULT_FS_ROOT: &str = "/tmp/shuttle/crates/"; - -#[cfg(not(debug_assertions))] -// as per: https://stackoverflow.com/questions/1510104/where-to-store-application-data-non-user-specific-on-linux -pub const DEFAULT_FS_ROOT: &str = "/var/lib/shuttle/crates/"; - -pub(crate) struct Build { - pub(crate) so_path: PathBuf, -} - -// remove the trait at some point -#[async_trait] -pub(crate) trait BuildSystem: Send + Sync { - async fn build( - &self, - crate_bytes: &[u8], - project: &str, - buf: Box, - ) -> Result; - - fn fs_root(&self) -> PathBuf; -} - -/// A basic build system that uses the file system for caching and storage -pub(crate) struct FsBuildSystem { - fs_root: PathBuf, -} - -impl FsBuildSystem { - /// Intialises the FS Build System. Optionally you can define the root - /// of its file system. If unspecified, will default to `FS_ROOT`. - /// The FS Build System will fail to intialise if the directory does not. - /// exist - pub(crate) fn initialise(path: Option) -> Result { - let fs_root = path.unwrap_or_else(|| PathBuf::from(DEFAULT_FS_ROOT)); - if !(fs_root.exists()) { - return Err(anyhow!( - r#" - Failed to initialise FS Build System. - The path {:?} does not exist. 
- Please create the directory to continue with deployment"#, - &fs_root - )); - } - Ok(FsBuildSystem { fs_root }) - } - - /// Given an api key and project name returns a `PathBuf` to the project - /// If the directory does not exist, creates it. - fn project_path(&self, project: &str) -> Result { - let mut project_path = self.fs_root.clone(); - project_path.push(project); - // create directory - std::fs::create_dir_all(&project_path)?; - Ok(project_path) - } -} - -#[async_trait] -impl BuildSystem for FsBuildSystem { - async fn build( - &self, - crate_bytes: &[u8], - project_name: &str, - buf: Box, - ) -> Result { - // project path - let project_path = self.project_path(project_name)?; - debug!("Project path: {}", project_path.display()); - - // clear directory - clear_project_dir(&project_path).context( - "there was an issue cleaning the project directory. Please try again in a bit.", - )?; - - // crate path - let crate_path = crate_location(&project_path, project_name); - debug!("Crate path: {}", crate_path.display()); - - // create target file - let mut target_file = tokio::fs::File::create(&crate_path).await?; - - // write bytes to file - target_file.write_all(crate_bytes).await?; - - // extract tarball - extract_tarball(&crate_path, &project_path)?; - - // run cargo build (--debug for now) - let so_path = build_crate(&project_path, buf)?; - - // create uniquely named so file to satisfy `libloading` - let so_path = create_unique_named_so_file(&project_path, &so_path)?; - - // create marker file - create_so_marker(&project_path, &so_path)?; - - Ok(Build { so_path }) - } - - fn fs_root(&self) -> PathBuf { - self.fs_root.clone() - } -} - -/// Creates a marker file with the location of the `so` file -/// so that we can use it when bootstrapping the deployment -/// system -fn create_so_marker(project_path: &Path, so_path: &Path) -> Result<()> { - let marker_path = project_path.join(".shuttle_marker"); - // unwraps here are ok since we are writing a valid `Path` - Ok(std::fs::write(&marker_path, so_path.to_str().unwrap())?) -} - -/// Copies the original `so` file to the project directory with a random name -/// to appease `libloading`. -fn create_unique_named_so_file(project_path: &Path, so_path: &Path) -> Result { - let so_unique_path = project_path.join(&format!("{}.so", Uuid::new_v4())); - std::fs::copy(so_path, &so_unique_path)?; - Ok(so_unique_path) -} - -/// Clear everything which is not the target folder from the project path -fn clear_project_dir(project_path: &Path) -> Result<()> { - // remove everything except for the target folder - std::fs::read_dir(project_path)? 
- .into_iter() - .filter_map(|dir| dir.ok()) - .filter(|dir| { - if dir.file_name() == "target" { - return false; - } - - if let Some(Some("so")) = dir.path().extension().map(|f| f.to_str()) { - return false; - } - - true - }) - .try_for_each::<_, Result<_, io::Error>>(|dir| { - if let Ok(file) = dir.file_type() { - if file.is_dir() { - std::fs::remove_dir_all(&dir.path())?; - } else if file.is_file() { - std::fs::remove_file(&dir.path())?; - } else if file.is_symlink() { - // there shouldn't be any symlinks here - unimplemented!() - } - } - Ok(()) - })?; - Ok(()) -} - -/// Given a project path and a project name, return the location of the .crate file -fn crate_location(project_path: &Path, project_name: &str) -> PathBuf { - project_path.join(project_name).with_extension("crate") -} - -/// Given a .crate file (which is a gzipped tarball), extracts the contents -/// into the project_path -fn extract_tarball(crate_path: &Path, project_path: &Path) -> Result<()> { - let output = Command::new("tar") - .arg("-xzvf") // extract - .arg(crate_path) - .arg("-C") // target - .arg(project_path) - .arg("--strip-components") // remove top-level directory - .arg("1") - .arg("--touch") // touch to update mtime for cargo - .output()?; - if !output.status.success() { - let err = String::from_utf8(output.stderr).unwrap_or_default(); - Err(anyhow::Error::msg(err).context(anyhow!("failed to unpack cargo archive"))) - } else { - Ok(()) - } -} diff --git a/api/src/deployment.rs b/api/src/deployment.rs deleted file mode 100644 index 28977ab14..000000000 --- a/api/src/deployment.rs +++ /dev/null @@ -1,672 +0,0 @@ -use core::default::Default; -use std::collections::HashMap; -use std::fs::DirEntry; -use std::io::Write; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpListener}; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use anyhow::{anyhow, Context as AnyhowContext}; -use chrono::{DateTime, Utc}; -use futures::prelude::*; -use libloading::Library; -use proto::provisioner::provisioner_client::ProvisionerClient; -use rocket::data::ByteUnit; -use rocket::{tokio, Data}; -use shuttle_common::project::ProjectName; -use shuttle_common::{ - DeploymentApiError, DeploymentId, DeploymentMeta, DeploymentStateMeta, Host, LogItem, Port, -}; -use shuttle_service::loader::Loader; -use shuttle_service::logger::Log; -use shuttle_service::ServeHandle; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::{mpsc, RwLock}; -use tonic::transport::{Channel, Endpoint}; - -use crate::build::Build; -use crate::router::Router; -use crate::{BuildSystem, ShuttleFactory}; - -// This controls the maximum number of deploys an api instance can run -// This is mainly needed because tokio::task::spawn_blocking keeps an internal pool for the number of blocking threads -// and we call this method to run each incoming service. 
Therefore, this variable directly maps to this maximum pool -// when the runtime is setup in main() -// The current tokio default for this pool is 512 -// https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.max_blocking_threads -pub const MAX_DEPLOYS: usize = 512; - -/// Inner struct of a deployment which holds the deployment itself -/// and the some metadata -pub(crate) struct Deployment { - meta: Arc>, - state: RwLock, -} - -impl Deployment { - fn new(meta: DeploymentMeta, state: DeploymentState) -> Self { - Self { - meta: Arc::new(RwLock::new(meta)), - state: RwLock::new(state), - } - } - - fn from_bytes(fqdn: &str, project: ProjectName, crate_bytes: Vec) -> Self { - Self { - meta: Arc::new(RwLock::new(DeploymentMeta::queued(fqdn, project))), - state: RwLock::new(DeploymentState::queued(crate_bytes)), - } - } - - /// Initialise a deployment from a directory - fn from_directory(fqdn: &str, dir: DirEntry) -> Result { - let project_path = dir.path(); - let project_name = dir - .file_name() - .into_string() - .map_err(|err| anyhow!("invalid project name: {:?}", err)) - .and_then(|name| name.parse::().map_err(|err| err.into()))?; - // find marker which points to so file - let marker_path = project_path.join(".shuttle_marker"); - let so_path_str = std::fs::read(&marker_path).context(anyhow!( - "could not find so marker file at {:?}", - marker_path - ))?; - - let so_path: PathBuf = String::from_utf8_lossy(&so_path_str) - .parse() - .context("could not parse contents of marker file to a valid path")?; - - let meta = DeploymentMeta::built(fqdn, project_name); - let state = DeploymentState::built(Build { so_path }); - Ok(Self::new(meta, state)) - } - - /// Gets a `clone`ed copy of the metadata. - pub(crate) async fn meta(&self) -> DeploymentMeta { - trace!("trying to get meta"); - self.meta.read().await.clone() - } - - pub(crate) async fn deployment_active(&self) -> bool { - matches!(*self.state.read().await, DeploymentState::Deployed(_)) - } - - /// Evaluates if the deployment can be advanced. If the deployment - /// has reached a state where it can no longer `advance`, returns `false`. - pub(crate) async fn deployment_finished(&self) -> bool { - match *self.state.read().await { - DeploymentState::Queued(_) | DeploymentState::Built(_) | DeploymentState::Loaded(_) => { - false - } - DeploymentState::Deployed(_) | DeploymentState::Error(_) | DeploymentState::Deleted => { - true - } - } - } - - /// Tries to advance the deployment one stage. Does nothing if the deployment - /// is in a terminal state. 
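For orientation, the stage order that `advance` below steps a deployment through can be sketched as a minimal stand-alone enum (a simplification, not part of the removed crate; the real `DeploymentState` variants further down carry payloads):

#[allow(dead_code)]
enum Stage {
    Queued,   // crate bytes received, waiting for a build
    Built,    // `cargo build` produced a shared object (.so)
    Loaded,   // the .so was opened via libloading
    Deployed, // the service is running on a locally bound port
    Error,    // something failed along the way
    Deleted,  // the user removed the deployment
}
// `advance` only moves Queued -> Built -> Loaded -> Deployed; the last three
// variants are the terminal states that `deployment_finished` reports on.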
- pub(crate) async fn advance(&self, context: &Context, run_logs_tx: UnboundedSender) { - { - trace!("waiting to get write on the state"); - let meta = self.meta().await; - let mut state = self.state.write().await; - - *state = match state.take() { - DeploymentState::Queued(queued) => { - debug!("deployment '{}' build starting...", &meta.id); - - let console_writer = BuildOutputWriter::new(self.meta.clone()); - match context - .build_system - .build( - &queued.crate_bytes, - meta.project.as_str(), - Box::new(console_writer), - ) - .await - { - Ok(build) => DeploymentState::built(build), - Err(e) => { - dbg!("failed to build with error: {}", &e); - DeploymentState::Error(e) - } - } - } - DeploymentState::Built(built) => { - debug!( - "deployment '{}' loading shared object and service...", - &meta.id - ); - - match Loader::from_so_file(&built.build.so_path) { - Ok(loader) => DeploymentState::loaded(loader), - Err(e) => { - debug!("failed to load with error: {}", &e); - DeploymentState::Error(e.into()) - } - } - } - DeploymentState::Loaded(loader) => { - let port = identify_free_port(); - - debug!( - "deployment '{}' getting deployed on port {}...", - meta.id, port - ); - - debug!("{}: factory phase", meta.project); - - let mut factory = ShuttleFactory::new( - context.provisioner_client.clone(), - context.provisioner_address.clone(), - meta.project.clone(), - ); - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); - match loader.load(&mut factory, addr, run_logs_tx, meta.id).await { - Err(e) => { - debug!("{}: factory phase FAILED: {:?}", meta.project, e); - DeploymentState::Error(e.into()) - } - Ok((handle, so)) => { - debug!("{}: factory phase DONE", meta.project); - self.meta.write().await.database_deployment = - factory.into_database_info(); - - // Remove stale active deployments - if let Some(stale_id) = context.router.promote(meta.host, meta.id).await - { - debug!("removing stale deployment `{}`", &stale_id); - context.deployments.write().await.remove(&stale_id); - } - - DeploymentState::deployed(so, port, handle) - } - } - } - deployed_or_error => deployed_or_error, /* nothing to do here */ - }; - } - - // ensures that the metadata state is inline with the actual state. This - // can go when we have an API layer. - self.update_meta_state().await; - } - - async fn update_meta_state(&self) { - self.meta.write().await.state = self.state.read().await.meta() - } - - async fn port(&self) -> Option { - match &*self.state.read().await { - DeploymentState::Deployed(deployed) => Some(deployed.port), - _ => None, - } - } - - async fn add_runtime_log(&self, datetime: DateTime, log: LogItem) { - self.meta.write().await.runtime_logs.insert(datetime, log); - } -} - -/// Provides a `Write` wrapper around the build logs - i.e., the build output -/// is written into our build logs using this wrapper. -struct BuildOutputWriter { - meta: Arc>, - buf: String, -} - -impl BuildOutputWriter { - pub fn new(meta: Arc>) -> Self { - Self { - meta, - buf: String::new(), - } - } -} - -impl Write for BuildOutputWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let write_len = buf.len(); - let is_new_line = buf[write_len - 1] == b'\n'; - if let Ok(buf) = std::str::from_utf8(buf) { - self.buf.push_str(buf); - - // The flush step introduces async code which can potentially execute out of order. - // For example, if the write is called with inputs a, b, and c then a new threads will be - // spawned to add a, b, and c to meta. 
However, the threads might execute in others b, a, - // and c if they are too close to one another. This `write` method seems to be called for - // every character which causes many threads with overlapping lifetimes and therefore - // many out of order executions which just mess up the log output. - // Since line orders rarely matter and only spawning a thread for each output line also - // reduces the likelihood of threads with overlapping lifetimes, the guard exists. - if is_new_line { - // Safe to unwrap since `flush` has no errors internally - self.flush().unwrap(); - } - - return Ok(write_len); - } - - Ok(0) - } - - fn flush(&mut self) -> std::io::Result<()> { - let handle = tokio::runtime::Handle::current(); - let meta = self.meta.clone(); - let buf = self.buf.clone(); - self.buf = String::new(); - - // Can't call `block_on` on a thread that already has a tokio executor, so spawn a new one - std::thread::spawn(move || { - handle.block_on(async { - meta.write() - .await - .build_logs - .get_or_insert("".to_string()) - .push_str(&buf) - }); - }); - - Ok(()) - } -} - -// Make sure to clean the buffer one last time -impl Drop for BuildOutputWriter { - fn drop(&mut self) { - self.flush().unwrap(); - } -} - -type Deployments = HashMap>; - -/// The top-level manager for deployments. Is responsible for their creation -/// and lifecycle. -pub(crate) struct DeploymentSystem { - deployments: Arc>, - job_queue: JobQueue, - router: Arc, - fqdn: String, - pub(crate) provisioner_address: String, -} - -const JOB_QUEUE_SIZE: usize = 200; - -struct JobQueue { - send: mpsc::Sender>, -} - -impl JobQueue { - async fn new(context: Context, run_logs_tx: UnboundedSender) -> Self { - let (send, mut recv) = mpsc::channel::>(JOB_QUEUE_SIZE); - - log::debug!("starting job processor task"); - - tokio::spawn(async move { - while let Some(deployment) = recv.recv().await { - let id = deployment.meta().await.id; - - log::debug!("started deployment job for deployment '{}'", id); - - while !deployment.deployment_finished().await { - let run_logs_tx = run_logs_tx.clone(); - - deployment.advance(&context, run_logs_tx).await; - } - - debug!("ended deployment job for id: '{}'", id); - } - - log::debug!("job processor task ended"); - }); - - Self { send } - } - - async fn push(&self, deployment: Arc) { - self.send - .send(deployment) - .await - .unwrap_or_else(|_| panic!("deployment job queue unexpectedly closed")); - } -} - -/// Convenience struct used to store a bunch of stuff needed for the job -/// processor. 
-pub(crate) struct Context { - router: Arc, - build_system: Box, - deployments: Arc>, - provisioner_client: ProvisionerClient, - provisioner_address: String, -} - -impl DeploymentSystem { - pub(crate) async fn new( - build_system: Box, - fqdn: String, - provisioner_address: String, - provisioner_port: Port, - ) -> Self { - let router: Arc = Default::default(); - let (tx, mut rx) = mpsc::unbounded_channel::(); - - let deployments = Arc::new(RwLock::new( - Self::initialise_from_fs(&build_system.fs_root(), &fqdn).await, - )); - - let deployments_log = deployments.clone(); - - tokio::spawn(async move { - while let Some(log) = rx.recv().await { - let mut deployments_log = deployments_log.write().await; - - if let Some(deployment) = deployments_log.get_mut(&log.deployment_id) { - deployment.add_runtime_log(log.datetime, log.item).await; - } - } - }); - - let provisioner_uri = Endpoint::try_from(format!( - "http://{}:{}", - provisioner_address, provisioner_port - )) - .expect("provisioner uri to be valid"); - - let provisioner_client = ProvisionerClient::connect(provisioner_uri) - .await - .expect("failed to connect to provisioner"); - - let context = Context { - router: router.clone(), - build_system, - deployments: deployments.clone(), - provisioner_client, - provisioner_address: provisioner_address.clone(), - }; - - let job_queue = JobQueue::new(context, tx).await; - - debug!("loading deployments into job processor"); - for deployment in deployments.read().await.values() { - debug!("loading deployment: {:?}", deployment.meta); - job_queue.push(deployment.clone()).await; - } - - Self { - deployments, - job_queue, - router, - fqdn, - provisioner_address, - } - } - - /// Traverse the build directory re-create deployments. - /// If a project could not be re-created, this will get logged and skipped. - async fn initialise_from_fs(fs_root: &Path, fqdn: &str) -> Deployments { - let mut deployments = HashMap::default(); - for project_dir in std::fs::read_dir(fs_root).unwrap() { - let project_dir = match project_dir { - Ok(project_dir) => project_dir, - Err(e) => { - warn!("failed to read directory for project with error `{:?}`", e); - warn!("skipping..."); - continue; - } - }; - let project_name = project_dir.file_name(); - match Deployment::from_directory(fqdn, project_dir) { - Err(e) => { - warn!( - "failed to re-create deployment for project `{:?}` with error: {:?}", - project_name, e - ); - } - Ok(deployment) => { - let deployment = Arc::new(deployment); - let info = deployment.meta().await; - deployments.insert(info.id, deployment.clone()); - } - } - } - deployments - } - - /// Returns the port for a given host. If the host does not exist, returns - /// `None`. - pub(crate) async fn port_for_host(&self, host: &Host) -> Option { - let id_for_host = self.router.id_for_host(host).await?; - self.deployments - .read() - .await - .get(&id_for_host)? - .port() - .await - } - - /// Retrieves a clone of the deployment information - pub(crate) async fn get_deployment( - &self, - id: &DeploymentId, - ) -> Result { - match self.deployments.read().await.get(id) { - Some(deployment) => Ok(deployment.meta().await), - None => Err(DeploymentApiError::NotFound(format!( - "could not find deployment for id '{}'", - &id - ))), - } - } - - /// Retrieves a clone of the deployment information - /// for a given project. If there are multiple deployments - /// for a given project, will return the latest. 
- pub(crate) async fn get_deployment_for_project( - &self, - project_name: &ProjectName, - ) -> Result { - let mut candidates = Vec::new(); - - for deployment in self.deployments.read().await.values() { - if deployment.meta.read().await.project == *project_name { - candidates.push(deployment.meta().await); - } - } - - let latest = candidates - .into_iter() - .max_by(|d1, d2| d1.created_at.cmp(&d2.created_at)); - - match latest { - Some(latest) => Ok(latest), - None => Err(DeploymentApiError::NotFound(format!( - "could not find deployment for project '{}'", - &project_name - ))), - } - } - - pub(crate) async fn kill_deployment_for_project( - &self, - project_name: &ProjectName, - ) -> Result { - let id = self.get_deployment_for_project(project_name).await?.id; - self.kill_deployment(&id).await - } - - /// Remove a deployment from the deployments hash map and, if it has - /// already been deployed, kill the Tokio task in which it is running - /// and deallocate the linked library. - pub(crate) async fn kill_deployment( - &self, - id: &DeploymentId, - ) -> Result { - match self.deployments.write().await.remove(id) { - Some(deployment) => { - let mut meta = deployment.meta().await; - - // If the deployment is in the 'deployed' state, kill the Tokio - // task in which it is deployed and deallocate the linked - // library when the runtime gets around to it. - - let mut lock = deployment.state.write().await; - if let DeploymentState::Deployed(DeployedState { so, handle, .. }) = lock.take() { - handle.abort(); - - tokio::spawn(async move { - so.close().unwrap(); - }); - - match handle.await { - Err(err) if err.is_cancelled() => {} - other => other - .map_err(|e| DeploymentApiError::Internal(e.to_string()))? - .map_err(|e| DeploymentApiError::Internal(e.to_string()))?, - }; - } - - let _ = self.router.remove(&meta.host); - - meta.state = DeploymentStateMeta::Deleted; - - Ok(meta) - } - None => Err(DeploymentApiError::NotFound(String::new())), - } - } - - pub(crate) async fn num_active(&self) -> usize { - let deployments = self - .deployments - .read() - .await - .values() - .cloned() - .collect::>(); - stream::unfold(deployments, |mut deployments| async move { - Some((deployments.pop()?.deployment_active().await, deployments)) - }) - .filter(|is_active| future::ready(*is_active)) - .count() - .await - } - - /// Main way to interface with the deployment manager. - /// Will take a crate through the whole lifecycle. - pub(crate) async fn deploy( - &self, - crate_file: Data<'_>, - project: ProjectName, - ) -> Result { - // Assumes that only `::Deployed` deployments are blocking a thread. - if self.num_active().await >= MAX_DEPLOYS { - return Err(DeploymentApiError::Unavailable( - "this instance has reached its maximum number of supported deployments".to_string(), - )); - }; - - let crate_bytes = crate_file - .open(ByteUnit::max_value()) - .into_bytes() - .await - .map_err(|_| { - DeploymentApiError::BadRequest("could not read crate file into bytes".to_string()) - })? - .to_vec(); - - let deployment = Arc::new(Deployment::from_bytes(&self.fqdn, project, crate_bytes)); - - let info = deployment.meta().await; - - self.deployments - .write() - .await - .insert(info.id, deployment.clone()); - - self.job_queue.push(deployment).await; - - Ok(info) - } -} - -/// Call on the operating system to identify an available port on which a -/// deployment may then be hosted. 
-fn identify_free_port() -> u16 { - let ip = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0); - TcpListener::bind(ip).unwrap().local_addr().unwrap().port() -} - -/// Finite-state machine representing the various stages of the build -/// process. -enum DeploymentState { - /// Deployment waiting to be built. - Queued(QueuedState), - /// Built deployment that is ready and waiting to be loaded. - Built(BuiltState), - /// Deployment is loaded into the server application as a - /// dynamically-linked library (`.so` file). The [`libloading`] crate has - /// been used to achieve this and to obtain this particular deployment's - /// implementation of the [`shuttle_service::Service`] trait. - Loaded(Loader), - /// Deployment that is actively running inside a Tokio task and listening - /// for connections on some port indicated in [`DeployedState`]. - Deployed(DeployedState), - /// A state entered when something unexpected occurs during the deployment - /// process. - Error(anyhow::Error), - /// A state indicating that the user has intentionally terminated this - /// deployment - #[allow(dead_code)] - Deleted, -} - -impl DeploymentState { - fn take(&mut self) -> Self { - std::mem::replace(self, DeploymentState::Deleted) - } - - fn queued(crate_bytes: Vec) -> Self { - Self::Queued(QueuedState { crate_bytes }) - } - - fn built(build: Build) -> Self { - Self::Built(BuiltState { build }) - } - - fn loaded(loader: Loader) -> Self { - Self::Loaded(loader) - } - - fn deployed(so: Library, port: Port, handle: ServeHandle) -> Self { - Self::Deployed(DeployedState { so, port, handle }) - } - - fn meta(&self) -> DeploymentStateMeta { - match self { - DeploymentState::Queued(_) => DeploymentStateMeta::Queued, - DeploymentState::Built(_) => DeploymentStateMeta::Built, - DeploymentState::Loaded(_) => DeploymentStateMeta::Loaded, - DeploymentState::Deployed(_) => DeploymentStateMeta::Deployed, - DeploymentState::Error(e) => DeploymentStateMeta::Error(format!("{:#?}", e)), - DeploymentState::Deleted => DeploymentStateMeta::Deleted, - } - } -} - -struct QueuedState { - crate_bytes: Vec, -} - -struct BuiltState { - build: Build, -} - -#[allow(dead_code)] -struct DeployedState { - so: Library, - port: Port, - handle: ServeHandle, -} diff --git a/api/src/factory.rs b/api/src/factory.rs deleted file mode 100644 index f55dbf250..000000000 --- a/api/src/factory.rs +++ /dev/null @@ -1,58 +0,0 @@ -use async_trait::async_trait; -use proto::provisioner::{provisioner_client::ProvisionerClient, DatabaseRequest}; -use shuttle_common::{project::ProjectName, DatabaseReadyInfo}; -use shuttle_service::Factory; -use tonic::{transport::Channel, Request}; - -pub(crate) struct ShuttleFactory { - project_name: ProjectName, - provisioner_client: ProvisionerClient, - provisioner_address: String, - info: Option, -} - -impl ShuttleFactory { - pub(crate) fn new( - provisioner_client: ProvisionerClient, - provisioner_address: String, - project_name: ProjectName, - ) -> Self { - Self { - provisioner_client, - provisioner_address, - project_name, - info: None, - } - } - - pub(crate) fn into_database_info(self) -> Option { - self.info - } -} - -#[async_trait] -impl Factory for ShuttleFactory { - async fn get_sql_connection_string(&mut self) -> Result { - if let Some(ref info) = self.info { - return Ok(info.connection_string(&self.provisioner_address)); - } - - let request = Request::new(DatabaseRequest { - project_name: self.project_name.to_string(), - }); - - let response = self - .provisioner_client - .provision_database(request) - .await - 
.map_err(shuttle_service::error::CustomError::new)? - .into_inner(); - - let info: DatabaseReadyInfo = response.into(); - let conn_str = info.connection_string(&self.provisioner_address); - self.info = Some(info); - - debug!("giving a sql connection string: {}", conn_str); - Ok(conn_str) - } -} diff --git a/api/src/main.rs b/api/src/main.rs deleted file mode 100644 index f14170793..000000000 --- a/api/src/main.rs +++ /dev/null @@ -1,240 +0,0 @@ -#[macro_use] -extern crate rocket; - -#[macro_use] -extern crate log; - -mod args; -mod auth; -mod auth_admin; -mod build; -mod deployment; -mod factory; -mod proxy; -mod router; - -use std::collections::HashMap; -use std::net::IpAddr; -use std::sync::Arc; - -use auth_admin::Admin; -use deployment::MAX_DEPLOYS; -use factory::ShuttleFactory; -use rocket::serde::json::Json; -use rocket::{tokio, Build, Data, Rocket, State}; -use shuttle_common::project::ProjectName; -use shuttle_common::{DeploymentApiError, DeploymentMeta, Port}; -use shuttle_service::SecretStore; -use structopt::StructOpt; -use uuid::Uuid; - -use crate::args::Args; -use crate::auth::{ApiKey, AuthorizationError, ScopedUser, User, UserDirectory}; -use crate::build::{BuildSystem, FsBuildSystem}; -use crate::deployment::DeploymentSystem; - -type ApiResult = Result, E>; - -/// Find user by username and return it's API Key. -/// if user does not exist create it and update `users` state to `users.toml`. -/// Finally return user's API Key. -#[post("/users/")] -async fn get_or_create_user( - user_directory: &State, - username: String, - _admin: Admin, -) -> Result { - user_directory.get_or_create(username) -} - -/// Status API to be used to check if the service is alive -#[get("/status")] -async fn status() {} - -#[get("/version")] -async fn version() -> String { - String::from(shuttle_service::VERSION) -} - -#[get("/<_>/deployments/")] -async fn get_deployment( - state: &State, - id: Uuid, - _user: ScopedUser, -) -> ApiResult { - info!("[GET_DEPLOYMENT, {}, {}]", _user.name(), _user.scope()); - let deployment = state.deployment_manager.get_deployment(&id).await?; - Ok(Json(deployment)) -} - -#[delete("/<_>/deployments/")] -async fn delete_deployment( - state: &State, - id: Uuid, - _user: ScopedUser, -) -> ApiResult { - info!("[DELETE_DEPLOYMENT, {}, {}]", _user.name(), _user.scope()); - // TODO why twice? 
- let _deployment = state.deployment_manager.get_deployment(&id).await?; - let deployment = state.deployment_manager.kill_deployment(&id).await?; - Ok(Json(deployment)) -} - -#[get("/<_>")] -async fn get_project( - state: &State, - user: ScopedUser, -) -> ApiResult { - info!("[GET_PROJECT, {}, {}]", user.name(), user.scope()); - - let deployment = state - .deployment_manager - .get_deployment_for_project(user.scope()) - .await?; - - Ok(Json(deployment)) -} - -#[delete("/<_>")] -async fn delete_project( - state: &State, - user: ScopedUser, -) -> ApiResult { - info!("[DELETE_PROJECT, {}, {}]", user.name(), user.scope()); - - let deployment = state - .deployment_manager - .kill_deployment_for_project(user.scope()) - .await?; - Ok(Json(deployment)) -} - -#[post("/", data = "")] -async fn create_project( - state: &State, - user_directory: &State, - crate_file: Data<'_>, - project_name: ProjectName, - user: User, -) -> ApiResult { - info!("[CREATE_PROJECT, {}, {}]", &user.name, &project_name); - - if !user - .projects - .iter() - .any(|my_project| *my_project == project_name) - { - user_directory.create_project_if_not_exists(&user.name, &project_name)?; - } - let deployment = state - .deployment_manager - .deploy(crate_file, project_name) - .await?; - Ok(Json(deployment)) -} - -#[post("//secrets", data = "")] -async fn project_secrets( - state: &State, - secrets: Json>, - project_name: ProjectName, - user: ScopedUser, -) -> ApiResult { - info!("[PROJECT_SECRETS, {}, {}]", user.name(), &project_name); - - let deployment = state - .deployment_manager - .get_deployment_for_project(user.scope()) - .await?; - - if let Some(database_deployment) = &deployment.database_deployment { - let conn_str = - database_deployment.connection_string(&state.deployment_manager.provisioner_address); - let conn = sqlx::PgPool::connect(&conn_str) - .await - .map_err(|e| DeploymentApiError::Internal(e.to_string()))?; - - let map = secrets.into_inner(); - for (key, value) in map.iter() { - conn.set_secret(key, value) - .await - .map_err(|e| DeploymentApiError::BadRequest(e.to_string()))?; - } - } - - Ok(Json(deployment)) -} - -struct ApiState { - deployment_manager: Arc, -} - -fn main() -> Result<(), Box> { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .max_blocking_threads(MAX_DEPLOYS) - .build() - .unwrap() - .block_on(async { - let _rocket = rocket().await.launch().await?; - - Ok(()) - }) -} - -//noinspection ALL -async fn rocket() -> Rocket { - env_logger::Builder::new() - .filter_module("rocket", log::LevelFilter::Warn) - .filter_module("_", log::LevelFilter::Warn) - .filter_module("api", log::LevelFilter::Debug) - .init(); - - let args: Args = Args::from_args(); - let build_system = FsBuildSystem::initialise(args.path).unwrap(); - let deployment_manager = Arc::new( - DeploymentSystem::new( - Box::new(build_system), - args.proxy_fqdn.to_string(), - args.provisioner_address, - args.provisioner_port, - ) - .await, - ); - - start_proxy(args.bind_addr, args.proxy_port, deployment_manager.clone()).await; - - let state = ApiState { deployment_manager }; - - let user_directory = - UserDirectory::from_user_file().expect("could not initialise user directory"); - - let config = rocket::Config { - address: args.bind_addr, - port: args.api_port, - ..Default::default() - }; - rocket::custom(config) - .mount( - "/projects", - routes![ - delete_deployment, - get_deployment, - delete_project, - create_project, - get_project, - project_secrets - ], - ) - .mount("/", routes![get_or_create_user, status, version]) - 
.manage(state) - .manage(user_directory) -} - -async fn start_proxy( - bind_addr: IpAddr, - proxy_port: Port, - deployment_manager: Arc, -) { - tokio::spawn(async move { proxy::start(bind_addr, proxy_port, deployment_manager).await }); -} diff --git a/api/src/proxy.rs b/api/src/proxy.rs deleted file mode 100644 index d9d2fd885..000000000 --- a/api/src/proxy.rs +++ /dev/null @@ -1,126 +0,0 @@ -use std::convert::Infallible; -use std::net::{IpAddr, SocketAddr}; -use std::sync::Arc; - -use ::hyper::server::conn::AddrStream; -use ::hyper::server::Server; -use ::hyper::service::{make_service_fn, service_fn}; -use ::hyper::{Body, Request, Response, StatusCode}; -use hyper::client::connect::dns::GaiResolver; -use hyper::client::HttpConnector; -use hyper::header::{HeaderValue, SERVER}; -use hyper::Client; -use hyper_reverse_proxy::{ProxyError, ReverseProxy}; -use lazy_static::lazy_static; -use shuttle_common::Port; - -use crate::DeploymentSystem; - -lazy_static! { - static ref HEADER_SERVER: HeaderValue = "shuttle.rs".parse().unwrap(); - static ref PROXY_CLIENT: ReverseProxy> = - ReverseProxy::new(Client::new()); -} - -pub(crate) async fn start( - bind_addr: IpAddr, - proxy_port: Port, - deployment_manager: Arc, -) { - let socket_address = (bind_addr, proxy_port).into(); - - // A `Service` is needed for every connection. - let make_svc = make_service_fn(|socket: &AddrStream| { - let dm_ref = deployment_manager.clone(); - let remote_addr = socket.remote_addr(); - async move { - Ok::<_, Infallible>(service_fn(move |req| { - handle(remote_addr, req, dm_ref.clone()) - })) - } - }); - - let server = Server::bind(&socket_address).serve(make_svc); - - log::debug!("starting proxy server: {}", &socket_address); - - if let Err(e) = server.await { - eprintln!("server error: {}", e); - eprintln!("proxy died, killing process..."); - std::process::exit(1); - } -} - -async fn handle( - remote_addr: SocketAddr, - req: Request, - deployment_manager: Arc, -) -> Result, Infallible> { - // if no `Host:` or invalid value, return 400 - let host = match req.headers().get("Host") { - Some(host) if host.to_str().is_ok() => host.to_str().unwrap().to_owned(), - _ => { - return Ok(Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::empty()) - .unwrap()) - } - }; - - // if we could not get a port from the deployment manager, - // the host does not exist or is not initialised yet - so - // we return a 404 - let port = match deployment_manager.port_for_host(&host).await { - None => { - // no port being assigned here means that we couldn't - // find a service for a given host - let response_body = format!("could not find service for host: {}", host); - return Ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(response_body.into()) - .unwrap()); - } - Some(port) => port, - }; - - match reverse_proxy(remote_addr.ip(), port, req).await { - Ok(response) => { - info!("[PROXY, {}]", &host); - Ok(response) - } - Err(error) => { - match error { - ProxyError::InvalidUri(e) => { - log::warn!("error while handling request in reverse proxy: {}", e); - } - ProxyError::HyperError(e) => { - log::warn!("error while handling request in reverse proxy: {}", e); - } - ProxyError::ForwardHeaderError => { - log::warn!("error while handling request in reverse proxy: 'fwd header error'"); - } - ProxyError::UpgradeError(e) => log::warn!( - "error while handling request needing upgrade in reverse proxy: {}", - e - ), - }; - Ok(Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::empty()) - .unwrap()) 
- } - } -} - -async fn reverse_proxy( - ip: IpAddr, - port: Port, - req: Request, -) -> Result, ProxyError> { - let forward_uri = format!("http://127.0.0.1:{}", port); - let mut response = PROXY_CLIENT.call(ip, &forward_uri, req).await?; - - response.headers_mut().insert(SERVER, HEADER_SERVER.clone()); - - Ok(response) -} diff --git a/api/src/router.rs b/api/src/router.rs deleted file mode 100644 index 01ee94984..000000000 --- a/api/src/router.rs +++ /dev/null @@ -1,30 +0,0 @@ -use std::collections::HashMap; - -use rocket::tokio::sync::RwLock; -use shuttle_common::{DeploymentId, Host}; - -#[derive(Default)] -/// Deployment router which figures out which `DeploymentId` -/// a `Host` corresponds to -pub(crate) struct Router { - table: RwLock>, -} - -impl Router { - /// Promotes a new `DeploymentId` to a give `Host`. Optionally returns - /// the old `DeploymentId` if it existed. - pub(crate) async fn promote(&self, host: Host, id: DeploymentId) -> Option { - self.table.write().await.insert(host, id) - } - - /// Gets a `DeploymentId` for a given `Host`. Returns `None` if it - /// does not exist. - pub(crate) async fn id_for_host(&self, host: &Host) -> Option { - self.table.read().await.get(host).copied() - } - - /// Removes an entry for a given `Host` - pub(crate) async fn remove(&self, host: &Host) -> Option { - self.table.write().await.remove(host) - } -} diff --git a/api/users.toml b/api/users.toml deleted file mode 100644 index 9646d9e58..000000000 --- a/api/users.toml +++ /dev/null @@ -1,11 +0,0 @@ -[ci-test] -name = 'ci' -projects = [ - 'hello-world-rocket-app', - 'postgres-rocket-app', - 'hello-world-axum-app', - 'websocket-axum-app', - 'authentication-rocket-app', - 'hello-world-tide-app', - 'hello-world-tower-app', -]
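To tie the router and proxy above together, the host-to-port resolution they implemented can be condensed into a self-contained miniature (illustrative only; the real code keeps these maps behind async RwLocks, uses `Uuid` deployment ids, and reads the port out of the live `DeploymentState`; the hostname below is hypothetical, since the real domain comes from `--proxy-fqdn`):

use std::collections::HashMap;

type Host = String;
type DeploymentId = u64; // stand-in for the Uuid used by the real crate
type Port = u16;

struct MiniRouter {
    table: HashMap<Host, DeploymentId>, // corresponds to Router::table
    ports: HashMap<DeploymentId, Port>, // the port each deployed service listens on
}

impl MiniRouter {
    // Mirrors DeploymentSystem::port_for_host: an unknown host yields None, which
    // the proxy turns into a 404; a known host yields the local port the request
    // is then reverse-proxied to (http://127.0.0.1:<port>).
    fn port_for_host(&self, host: &str) -> Option<Port> {
        let id = self.table.get(host)?;
        self.ports.get(id).copied()
    }
}

fn main() {
    let mut mini = MiniRouter { table: HashMap::new(), ports: HashMap::new() };
    mini.table.insert("hello-world-rocket-app.example.test".to_string(), 1);
    mini.ports.insert(1, 45123);
    assert_eq!(mini.port_for_host("hello-world-rocket-app.example.test"), Some(45123));
    assert_eq!(mini.port_for_host("unknown.example.test"), None);
}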