Skip to content

Commit

Permalink
fix: support restart sqlness in distributed mode (GreptimeTeam#1443)
Browse files Browse the repository at this point in the history
* fix: support restart sqlness in distributed mode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* move alter_table case to common dir

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* is_standalone flag

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update tests/runner/src/env.rs

Co-authored-by: LFC <bayinamine@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: LFC <bayinamine@gmail.com>
  • Loading branch information
waynexia and MichaelScofield authored Apr 24, 2023
1 parent 6d247f7 commit b9db2cf
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 93 deletions.
46 changes: 0 additions & 46 deletions tests/cases/distributed/alter/alter_table.result

This file was deleted.

13 changes: 0 additions & 13 deletions tests/cases/distributed/alter/alter_table.sql

This file was deleted.

69 changes: 35 additions & 34 deletions tests/runner/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,8 @@ impl Env {

let db_ctx = GreptimeDBContext::new();

let mut server_process = Self::start_server("standalone", &db_ctx, true);
let server_process = Self::start_server("standalone", &db_ctx, true).await;

let is_up = util::check_port(SERVER_ADDR.parse().unwrap(), Duration::from_secs(10)).await;
if !is_up {
Env::stop_server(&mut server_process).await;
panic!("Server doesn't up in 10 seconds, quit.")
}
println!("Started, going to test. Log will be write to {STANDALONE_LOG_FILE}");

let client = Client::with_urls(vec![SERVER_ADDR]);
Expand All @@ -95,6 +90,7 @@ impl Env {
frontend_process: None,
client: Mutex::new(db),
ctx: db_ctx,
is_standalone: true,
}
}

Expand All @@ -104,25 +100,9 @@ impl Env {
let db_ctx = GreptimeDBContext::new();

// start a distributed GreptimeDB
let mut meta_server = Env::start_server("metasrv", &db_ctx, true);
if !util::check_port(METASRV_ADDR.parse().unwrap(), Duration::from_secs(10)).await {
Env::stop_server(&mut meta_server).await;
panic!("Metasrv doesn't up in 10 seconds, quit.")
}

let mut datanode = Env::start_server("datanode", &db_ctx, true);
// Wait for default catalog and schema being created.
// Can be removed once #1265 resolved, and merged with Frontend start checking below.
if !util::check_port(DATANODE_ADDR.parse().unwrap(), Duration::from_secs(10)).await {
Env::stop_server(&mut datanode).await;
panic!("Datanode doesn't up in 10 seconds, quit.")
}

let mut frontend = Env::start_server("frontend", &db_ctx, true);
if !util::check_port(SERVER_ADDR.parse().unwrap(), Duration::from_secs(10)).await {
Env::stop_server(&mut frontend).await;
panic!("Frontend doesn't up in 10 seconds, quit.")
}
let meta_server = Env::start_server("metasrv", &db_ctx, true).await;
let datanode = Env::start_server("datanode", &db_ctx, true).await;
let frontend = Env::start_server("frontend", &db_ctx, true).await;

let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Expand All @@ -133,6 +113,7 @@ impl Env {
frontend_process: Some(frontend),
client: Mutex::new(db),
ctx: db_ctx,
is_standalone: false,
}
}

Expand All @@ -141,7 +122,11 @@ impl Env {
let _ = process.wait().await;
}

fn start_server(subcommand: &str, db_ctx: &GreptimeDBContext, truncate_log: bool) -> Child {
async fn start_server(
subcommand: &str,
db_ctx: &GreptimeDBContext,
truncate_log: bool,
) -> Child {
let log_file_name = match subcommand {
"datanode" => DATANODE_LOG_FILE,
"frontend" => FRONTEND_LOG_FILE,
Expand Down Expand Up @@ -178,27 +163,42 @@ impl Env {
_ => panic!("Unexpected subcommand: {subcommand}"),
}

let process = Command::new("./greptime")
let mut process = Command::new("./greptime")
.current_dir(util::get_binary_dir("debug"))
.args(args)
.stdout(log_file)
.spawn()
.unwrap_or_else(|_| panic!("Failed to start the DB with subcommand {subcommand}"));

// check connection
let ip_addr = match subcommand {
"datanode" => DATANODE_ADDR,
"frontend" => SERVER_ADDR,
"metasrv" => METASRV_ADDR,
"standalone" => SERVER_ADDR,
_ => panic!("Unexpected subcommand: {subcommand}"),
};
if !util::check_port(ip_addr.parse().unwrap(), Duration::from_secs(10)).await {
Env::stop_server(&mut process).await;
panic!("{subcommand} doesn't up in 10 seconds, quit.")
}

process
}

/// stop and restart the server process
async fn restart_server(db: &GreptimeDB) {
let mut server_process = db.server_process.lock().await;
Env::stop_server(&mut server_process).await;
let new_server_process = Env::start_server("standalone", &db.ctx, false);
*server_process = new_server_process;

let is_up = util::check_port(SERVER_ADDR.parse().unwrap(), Duration::from_secs(2)).await;
if !is_up {
Env::stop_server(&mut server_process).await;
panic!("Server doesn't up in 10 seconds, quit.")
}
// check if the server is distributed or standalone
let subcommand = if db.is_standalone {
"standalone"
} else {
"datanode"
};
let new_server_process = Env::start_server(subcommand, &db.ctx, false).await;
*server_process = new_server_process;
}

/// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
Expand Down Expand Up @@ -254,6 +254,7 @@ pub struct GreptimeDB {
frontend_process: Option<Child>,
client: Mutex<DB>,
ctx: GreptimeDBContext,
is_standalone: bool,
}

#[async_trait]
Expand Down

0 comments on commit b9db2cf

Please sign in to comment.