migrate forge interfaces to use rest client
Closes: #10053
Xiao Li authored and bors-libra committed Dec 20, 2021
1 parent f57e1e1 commit bf16231
Showing 32 changed files with 1,102 additions and 984 deletions.
51 changes: 42 additions & 9 deletions crates/debug-interface/src/lib.rs
@@ -83,28 +83,61 @@ impl NodeDebugClient {
/// Implement default utility client for AsyncNodeDebugInterface
pub struct AsyncNodeDebugClient {
client: reqwest::Client,
addr: String,
url: Url,
}

impl AsyncNodeDebugClient {
    /// Create an AsyncNodeDebugClient from an address and port.
pub fn new<A: AsRef<str>>(client: reqwest::Client, address: A, port: u16) -> Self {
let addr = format!("http://{}:{}", address.as_ref(), port);

Self { client, addr }
Self {
client,
url: Url::parse(&addr).unwrap(),
}
}

pub fn from_url(url: Url) -> Self {
let client = reqwest::Client::new();

Self { client, url }
}

pub async fn get_node_metric<S: AsRef<str>>(&mut self, metric: S) -> Result<Option<i64>> {
pub async fn get_node_metric<S: AsRef<str>>(&self, metric: S) -> Result<Option<i64>> {
let metrics = self.get_node_metrics().await?;
Ok(metrics.get(metric.as_ref()).cloned())
}

pub async fn get_node_metrics(&mut self) -> Result<HashMap<String, i64>> {
let response = self
.client
.get(&format!("{}/metrics", self.addr))
.send()
.await?;
/// Retrieves all node metrics for a given metric name. Allows for filtering metrics by fields afterwards.
pub async fn get_node_metric_with_name(
&self,
metric: &str,
) -> Result<Option<HashMap<String, i64>>> {
let metrics = self.get_node_metrics().await?;
let search_string = format!("{}{{", metric);

let result: HashMap<_, _> = metrics
.iter()
.filter_map(|(key, value)| {
if key.starts_with(&search_string) {
Some((key.clone(), *value))
} else {
None
}
})
.collect();

if result.is_empty() {
Ok(None)
} else {
Ok(Some(result))
}
}

pub async fn get_node_metrics(&self) -> Result<HashMap<String, i64>> {
let mut url = self.url.clone();
url.set_path("metrics");
let response = self.client.get(url).send().await?;

response
.json::<HashMap<String, String>>()
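For orientation, a minimal usage sketch of the reworked client follows. The crate path, address, port, and metric names are illustrative assumptions, not taken from this diff; only `from_url`, `get_node_metric`, and `get_node_metric_with_name` mirror the methods shown above.

```rust
// Usage sketch only: crate path, address, and metric names are assumptions for illustration.
use debug_interface::AsyncNodeDebugClient;
use reqwest::Url;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // The new constructor takes a parsed Url instead of an address/port pair.
    let client = AsyncNodeDebugClient::from_url(Url::parse("http://127.0.0.1:9101")?);

    // `&self` receivers mean the client no longer has to be mutable.
    if let Some(value) = client.get_node_metric("example_counter").await? {
        println!("example_counter = {}", value);
    }

    // Fetch every labeled series for one metric name, keyed by the full "name{labels}" string.
    if let Some(series) = client.get_node_metric_with_name("example_histogram").await? {
        for (labels, value) in series {
            println!("{} = {}", labels, value);
        }
    }
    Ok(())
}
```

Storing a parsed `Url` and calling `set_path("metrics")`, as `get_node_metrics` now does, also avoids the string formatting the old `addr` field required.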
2 changes: 1 addition & 1 deletion crates/diem-rest-client/src/lib.rs
@@ -395,7 +395,7 @@ impl Client {
.await?;

if !response.status().is_success() {
return Err(anyhow::anyhow!("health check failed",));
return Err(anyhow::anyhow!("health check failed"));
}

Ok(())
31 changes: 19 additions & 12 deletions testsuite/forge-cli/src/main.rs
@@ -173,6 +173,7 @@ fn main() -> Result<()> {
global_emit_job_request.workers_per_endpoint(workers_per_endpoint);
}

let runtime = Runtime::new()?;
match args.cli_cmd {
// cmd input for test
CliCommand::Test(test_cmd) => match test_cmd {
@@ -215,23 +216,27 @@ fn main() -> Result<()> {
),
OperatorCommand::CleanUp(cleanup) => {
uninstall_from_k8s_cluster()?;
set_eks_nodegroup_size(cleanup.cluster_name, 0, cleanup.auth_with_k8s_env)
runtime.block_on(set_eks_nodegroup_size(
cleanup.cluster_name,
0,
cleanup.auth_with_k8s_env,
))
}
OperatorCommand::Resize(resize) => {
set_eks_nodegroup_size(
runtime.block_on(set_eks_nodegroup_size(
resize.cluster_name,
resize.num_validators,
resize.auth_with_k8s_env,
)?;
))?;
uninstall_from_k8s_cluster()?;
clean_k8s_cluster(
runtime.block_on(clean_k8s_cluster(
resize.helm_repo,
resize.num_validators,
resize.validator_image_tag,
resize.testnet_image_tag,
resize.require_validator_healthcheck,
resize.move_modules_dir,
)?;
))?;
Ok(())
}
},
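
The change above keeps `main` synchronous and drives the now-async cluster helpers through one shared Tokio runtime. A standalone sketch of that pattern, using a placeholder helper rather than the real forge-cli functions:

```rust
use anyhow::Result;
use tokio::runtime::Runtime;

// Placeholder for an async helper such as `set_eks_nodegroup_size` or `clean_k8s_cluster`.
async fn resize_cluster(num_validators: usize) -> Result<()> {
    println!("resizing to {} validators", num_validators);
    Ok(())
}

fn main() -> Result<()> {
    // One runtime created up front; each async operation is driven to completion with block_on.
    let runtime = Runtime::new()?;
    runtime.block_on(resize_cluster(30))?;
    Ok(())
}
```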
@@ -535,13 +540,15 @@ impl Test for RestartValidator {

impl NetworkTest for RestartValidator {
fn run<'t>(&self, ctx: &mut NetworkContext<'t>) -> Result<()> {
let node = ctx.swarm().validators_mut().next().unwrap();
node.health_check().expect("node health check failed");
node.stop()?;
println!("Restarting node {}", node.peer_id());
node.start()?;
node.health_check().expect("node health check failed");

let runtime = Runtime::new()?;
runtime.block_on(async {
let node = ctx.swarm().validators_mut().next().unwrap();
node.health_check().await.expect("node health check failed");
node.stop().unwrap();
println!("Restarting node {}", node.peer_id());
node.start().await.unwrap();
node.health_check().await.expect("node health check failed");
});
Ok(())
}
}
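
The new `run` body drives the async node calls through a locally created runtime and panics on failure via `expect`/`unwrap`. A minimal sketch of the same pattern that propagates errors instead, with a stand-in for the async node API:

```rust
use anyhow::Result;
use tokio::runtime::Runtime;

// Stand-in for an async node call such as `Validator::health_check`.
async fn health_check() -> Result<()> {
    Ok(())
}

fn run() -> Result<()> {
    let runtime = Runtime::new()?;
    // Returning a Result from the async block lets `?` replace the expect/unwrap calls.
    runtime.block_on(async {
        health_check().await?;
        Ok::<(), anyhow::Error>(())
    })?;
    Ok(())
}

fn main() -> Result<()> {
    run()
}
```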
159 changes: 84 additions & 75 deletions testsuite/forge/src/backend/k8s/cluster_helper.rs
@@ -1,9 +1,10 @@
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{get_validators, k8s_retry_strategy, nodes_healthcheck, Result, Validator};
use crate::{get_validators, k8s_retry_strategy, nodes_healthcheck, Result};
use anyhow::{bail, format_err};
use diem_logger::*;
use futures::future::try_join_all;
use hyper::{Client, Uri};
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use hyper_tls::HttpsConnector;
@@ -29,7 +30,6 @@ use std::{
str,
};
use tempfile::TempDir;
use tokio::runtime::Runtime;

const HELM_BIN: &str = "helm";
const KUBECTL_BIN: &str = "kubectl";
@@ -222,7 +222,7 @@ pub fn uninstall_from_k8s_cluster() -> Result<()> {
Ok(())
}

pub fn clean_k8s_cluster(
pub async fn clean_k8s_cluster(
helm_repo: String,
base_num_validators: usize,
base_validator_image_tag: String,
@@ -331,23 +331,20 @@ pub fn clean_k8s_cluster(
upgrade_testnet(&helm_repo, &testnet_upgrade_options)?;

// wait for genesis to run again, and get the updated validators
let rt = Runtime::new().unwrap();
let mut validators = rt.block_on(async {
let kube_client = create_k8s_client().await;
wait_genesis_job(&kube_client, &new_era).await.unwrap();
let vals = get_validators(kube_client.clone(), &base_validator_image_tag)
.await
.unwrap();
vals
});
let all_nodes = Box::new(validators.values_mut().map(|v| v as &mut dyn Validator));

let kube_client = create_k8s_client().await;
wait_genesis_job(&kube_client, &new_era).await.unwrap();
let vals = get_validators(kube_client.clone(), &base_validator_image_tag)
.await
.unwrap();
let all_nodes = vals.values().collect();
// healthcheck on each of the validators wait until they all healthy
if require_validator_healthcheck {
let unhealthy_nodes = nodes_healthcheck(all_nodes).unwrap();
if !unhealthy_nodes.is_empty() {
bail!("Unhealthy validators after cleanup: {:?}", unhealthy_nodes);
}
let unhealthy_nodes = if require_validator_healthcheck {
nodes_healthcheck(all_nodes).await.unwrap()
} else {
vec![]
};
if !unhealthy_nodes.is_empty() {
bail!("Unhealthy validators after cleanup: {:?}", unhealthy_nodes);
}

Ok(new_era)
@@ -454,7 +451,7 @@ async fn submit_update_nodegroup_config_request(
Ok(update_id)
}

pub fn set_eks_nodegroup_size(
pub async fn set_eks_nodegroup_size(
cluster_name: String,
num_validators: usize,
auth_with_k8s_env: bool,
@@ -511,85 +508,97 @@ pub fn set_eks_nodegroup_size(
* (VALIDATOR_SCALING_FACTOR + UTILITIES_SCALING_FACTOR + TRUSTED_SCALING_FACTOR);

// submit the scaling requests
let rt = Runtime::new()?;
let validators_update_id = rt.block_on(submit_update_nodegroup_config_request(
let validators_update_id = submit_update_nodegroup_config_request(
&eks_client,
&cluster_name,
"validators",
validator_scaling,
))?;
let utilities_update_id = rt.block_on(submit_update_nodegroup_config_request(
)
.await?;
let utilities_update_id = submit_update_nodegroup_config_request(
&eks_client,
&cluster_name,
"utilities",
utilities_scaling,
))?;
let trusted_update_id = rt.block_on(submit_update_nodegroup_config_request(
)
.await?;
let trusted_update_id = submit_update_nodegroup_config_request(
&eks_client,
&cluster_name,
"trusted",
trusted_scaling,
))?;
)
.await?;

// wait for nodegroup updates
let updates: Vec<(&str, &str)> = vec![
("validators", &validators_update_id),
("utilities", &utilities_update_id),
("trusted", &trusted_update_id),
];
updates
.into_par_iter()
.for_each(|(nodegroup_name, update_id)| {
let rt = Runtime::new().unwrap();
rt.block_on(async {
diem_retrier::retry_async(k8s_retry_strategy(), || {
let client = eks_client.clone();
let describe_update_request = DescribeUpdateRequest {
addon_name: None,
name: cluster_name.clone(),
nodegroup_name: Some(nodegroup_name.to_string()),
update_id: update_id.to_string(),
};
Box::pin(async move {
let describe_update =
match client.describe_update(describe_update_request).await {
Ok(resp) => resp.update.unwrap(),
Err(err) => bail!(err),
};
if let Some(s) = describe_update.status {
match s.as_str() {
"Failed" => bail!("Nodegroup update failed"),
"Successful" => {
println!(
"{} nodegroup update {} successful!!!",
&nodegroup_name, update_id
);
Ok(())
}
&_ => {
println!(
"Waiting for {} update {}: {} ...",
&nodegroup_name, update_id, s
);
bail!("Waiting for valid update status")
}
}
} else {
bail!("Failed to describe nodegroup update")
}
})
})
.await
})
.unwrap();
});
try_join_all(updates.into_iter().map(|(nodegroup_name, update_id)| {
describe_update(
eks_client.clone(),
cluster_name.clone(),
nodegroup_name.to_string(),
update_id.to_string(),
)
}))
.await
.unwrap();

rt.block_on(async { nodegroup_state_check(desire_nodegroup_size).await })
.unwrap();
nodegroup_state_check(desire_nodegroup_size).await.unwrap();

Ok(())
}

async fn describe_update(
eks_client: EksClient,
cluster_name: String,
nodegroup_name: String,
update_id: String,
) -> Result<()> {
diem_retrier::retry_async(k8s_retry_strategy(), || {
let client = eks_client.clone();
let nodegroup_name = nodegroup_name.clone();
let update_id = update_id.clone();
let request = DescribeUpdateRequest {
addon_name: None,
name: cluster_name.clone(),
nodegroup_name: Some(nodegroup_name.clone()),
update_id: update_id.clone(),
};
Box::pin(async move {
let describe_update = match client.describe_update(request).await {
Ok(resp) => resp.update.unwrap(),
Err(err) => bail!(err),
};
if let Some(s) = describe_update.status {
match s.as_str() {
"Failed" => bail!("Nodegroup update failed"),
"Successful" => {
println!(
"{} nodegroup update {} successful!!!",
&nodegroup_name, update_id
);
Ok(())
}
&_ => {
println!(
"Waiting for {} update {}: {} ...",
&nodegroup_name, update_id, s
);
bail!("Waiting for valid update status")
}
}
} else {
bail!("Failed to describe nodegroup update")
}
})
})
.await
}

pub fn scale_sts_replica(sts_name: &str, replica_num: u64) -> Result<()> {
let scale_sts_args = [
"scale",
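The refactor above replaces the rayon-style `into_par_iter` loop with `futures::future::try_join_all`, which polls all of the update futures concurrently and returns as soon as any of them fails. A minimal standalone illustration of that combinator, with placeholder names rather than the real EKS calls:

```rust
use anyhow::{bail, Result};
use futures::future::try_join_all;

// Stand-in for an async status poll such as `describe_update`.
async fn poll_update(id: u32) -> Result<u32> {
    if id == 0 {
        bail!("update {} failed", id);
    }
    Ok(id)
}

#[tokio::main]
async fn main() -> Result<()> {
    // All polls run concurrently; the first Err short-circuits the join and is returned.
    let finished = try_join_all((1..=3).map(poll_update)).await?;
    println!("updates finished: {:?}", finished);
    Ok(())
}
```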
