rpc benchmark: json rpc benchmark #20912

Open · wants to merge 6 commits into base: grafana-pull
3 changes: 3 additions & 0 deletions Cargo.lock


3 changes: 3 additions & 0 deletions crates/sui-rpc-benchmark/Cargo.toml
@@ -12,6 +12,9 @@ clap = { workspace = true, features = ["derive"] }
dashmap.workspace = true
futures.workspace = true
rand.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
sui-indexer-alt-framework.workspace = true
telemetry-subscribers.workspace = true
tracing.workspace = true
129 changes: 129 additions & 0 deletions crates/sui-rpc-benchmark/src/bin/pull_grafana_logs.rs
@@ -0,0 +1,129 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

/// This script pulls JSON RPC read logs from Grafana, extracts JSON bodies,
/// and groups them by RPC "method" for later replay and analysis.
use reqwest::header::{ACCEPT, AUTHORIZATION};
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::env;
use std::error::Error;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::process;
use tracing::{debug, error, info, warn};

#[derive(Debug, Deserialize)]
struct GrafanaLog {
    message: String,
}

fn extract_body_from_message(message: &str) -> Option<String> {
    if let Some(body_start) = message.find("body=") {
        if let Some(peer_type_start) = message.find(" peer_type=") {
            let raw_body = message[(body_start + 5)..peer_type_start].trim();
            if raw_body.starts_with('b') {
                let trimmed = raw_body.trim_start_matches('b').trim_matches('"');
                let unescaped = trimmed.replace("\\\"", "\"");
                return Some(unescaped);
            }
        }
    }
    None
}

#[tokio::main]
async fn main() {
    let _guard = telemetry_subscribers::TelemetryConfig::new()
        .with_env()
        .init();
    if let Err(e) = run().await {
        error!("Error: {}", e);
        process::exit(1);
    }
}

async fn run() -> Result<(), Box<dyn Error>> {
    let grafana_url = env::var("GRAFANA_LOGS_URL")
        .unwrap_or_else(|_| "https://metrics.sui.io/loki/api/v1/query_range".to_string());
    let grafana_token = env::var("GRAFANA_API_TOKEN").unwrap_or_else(|_| "".to_string());

    let net = env::var("NET").unwrap_or_else(|_| "mainnet".to_string());
    let namespace = if net == "testnet" {
        "rpc-testnet".to_string()
    } else if net == "mainnet" {
        "rpc-mainnet".to_string()
    } else {
        "UNKNOWN_NET".to_string()
    };
    let substring = env::var("SUBSTRING").unwrap_or_else(|_| "Sampled read request".to_string());
    let query = format!(
        r#"{{namespace="{}", container="sui-edge-proxy-mysten"}} |= "{}""#,
        namespace, substring
    );
    debug!("Query: {}", query);

    let start = env::var("START").unwrap_or_else(|_| "now-1h".to_string());
    let end = env::var("END").unwrap_or_else(|_| "now".to_string());

    let client = reqwest::Client::new();
    let mut query_params = vec![
        ("query", query.as_str()),
        ("start", start.as_str()),
        ("end", end.as_str()),
    ];
    let limit = env::var("LIMIT").ok();
    if let Some(ref l) = limit {
        query_params.push(("limit", l));
    }

    let resp = client
        .get(&grafana_url)
        .header(ACCEPT, "application/json")
        .header(AUTHORIZATION, format!("Bearer {}", grafana_token))
        .query(&query_params)
        .send()
        .await?;

    if !resp.status().is_success() {
        warn!("Request failed with status: {}", resp.status());
        return Ok(());
    } else {
        info!("Request succeeded with status: {}", resp.status());
        debug!("Response: {:?}", resp);
    }

    let logs: Vec<GrafanaLog> = resp.json().await?;
    info!("Found {} logs.", logs.len());

    let mut method_map: HashMap<String, Vec<String>> = HashMap::new();
    for log_entry in logs {
        if let Some(body_content) = extract_body_from_message(&log_entry.message) {
            if let Ok(parsed) = serde_json::from_str::<Value>(&body_content) {
                let method = parsed
                    .get("method")
                    .and_then(|m| m.as_str())
                    .unwrap_or("unknown_method")
                    .to_string();
                method_map.entry(method).or_default().push(body_content);
            }
        }
    }

    let file = File::create("sampled_read_requests.jsonl")?;
    let mut writer = BufWriter::new(file);

    for (method, bodies) in method_map {
        info!("Writing {} logs for method: {}", bodies.len(), method);
        for body in bodies {
            let line = format!(r#"{{"method":"{}", "body":{}}}"#, method, body);
            writer.write_all(line.as_bytes())?;
            writer.write_all(b"\n")?;
        }
    }

    writer.flush()?;
    info!("Done! Wrote grouped logs to sampled_read_requests.jsonl");
    Ok(())
}
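
As a sanity check on the body= parsing above, here is a minimal test sketch; the sample log line is hypothetical and only mirrors the shape the script expects (a byte-string literal between body= and peer_type=):

#[cfg(test)]
mod tests {
    use super::extract_body_from_message;

    #[test]
    fn extracts_json_body_between_markers() {
        // Hypothetical log line shaped like the proxy output this script parses.
        let message =
            r#"Sampled read request body=b"{\"method\":\"sui_getObject\"}" peer_type=client"#;
        let body = extract_body_from_message(message).expect("body should be extracted");
        assert_eq!(body, r#"{"method":"sui_getObject"}"#);
    }
}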
crates/sui-rpc-benchmark/src/config.rs
@@ -3,18 +3,24 @@

use std::time::Duration;

#[derive(Debug, Clone)]
pub struct BenchmarkConfig {
    /// Number of concurrent clients
    pub concurrency: usize,
    /// Duration to run the benchmark
    pub duration: Duration,
    /// Optional path to a jsonl file for JSON RPC benchmarks.
    /// The file contains a list of JSON RPC requests collected from Grafana,
    /// which will be run concurrently by the JSON RPC benchmark runner.
    pub json_rpc_file_path: Option<String>,
}

impl Default for BenchmarkConfig {
    fn default() -> Self {
        Self {
            concurrency: 50,
            duration: Duration::from_secs(30),
            json_rpc_file_path: None,
        }
    }
}
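
Since Default is implemented, callers can use struct-update syntax and override only the fields they care about; a small usage sketch (the file name is a placeholder):

fn example_config() -> BenchmarkConfig {
    // Start from the defaults (concurrency: 50, duration: 30s) and override as needed.
    BenchmarkConfig {
        duration: std::time::Duration::from_secs(60),
        json_rpc_file_path: Some("sampled_read_requests.jsonl".to_string()),
        ..BenchmarkConfig::default()
    }
}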
1 change: 0 additions & 1 deletion crates/sui-rpc-benchmark/src/direct/mod.rs
@@ -1,7 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

-pub mod benchmark_config;
pub mod metrics;
pub mod query_executor;
pub mod query_generator;
2 changes: 1 addition & 1 deletion crates/sui-rpc-benchmark/src/direct/query_executor.rs
@@ -11,7 +11,7 @@ use sui_indexer_alt_framework::task::TrySpawnStreamExt;
use tokio_postgres::{types::ToSql, types::Type, NoTls, Row};
use tracing::info;

-use crate::direct::benchmark_config::BenchmarkConfig;
+use crate::config::BenchmarkConfig;
use crate::direct::metrics::{BenchmarkResult, MetricsCollector};
use crate::direct::query_generator::BenchmarkQuery;

59 changes: 59 additions & 0 deletions crates/sui-rpc-benchmark/src/json_rpc/mod.rs
@@ -0,0 +1,59 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::config::BenchmarkConfig;
use anyhow::Result;
use request_loader::load_json_rpc_requests;
use runner::run_queries;
use std::time::Duration;
use tracing::info;

pub mod request_loader;
pub mod runner;

pub async fn run_benchmark(
    endpoint: &str,
    file_path: &str,
    concurrency: usize,
    duration_secs: u64,
) -> Result<()> {
    let config = BenchmarkConfig {
        concurrency,
        duration: Duration::from_secs(duration_secs),
        json_rpc_file_path: Some(file_path.to_string()),
    };

    info!("Loading JSON RPC requests from {}", file_path);
    let requests = load_json_rpc_requests(file_path)?;
    info!("Loaded {} requests", requests.len());

    let metrics = run_queries(endpoint, &requests, &config).await?;
    info!("Benchmark results:");
    info!("=== Overall Statistics ===");
    info!("Total requests sent: {}", metrics.total_sent);
    info!("Total errors: {}", metrics.total_errors);
    if metrics.total_sent > 0 {
        let avg_latency = metrics.total_latency_ms / metrics.total_sent as f64;
        info!("Average latency: {:.2}ms", avg_latency);
        let success_rate = ((metrics.total_sent - metrics.total_errors) as f64
            / metrics.total_sent as f64)
            * 100.0;
        info!("Success rate: {:.1}%", success_rate);
    }
    info!("=== Per-Method Statistics ===");
    let mut methods: Vec<_> = metrics.per_method.iter().collect();
    methods.sort_by_key(|(method, _)| *method);
    for (method, stats) in methods {
        info!("Method: {}", method);
        info!("  Requests: {}", stats.total_sent);
        info!("  Errors: {}", stats.total_errors);
        if stats.total_sent > 0 {
            let method_avg_latency = stats.total_latency_ms / stats.total_sent as f64;
            let method_success_rate =
                ((stats.total_sent - stats.total_errors) as f64 / stats.total_sent as f64) * 100.0;
            info!("  Avg latency: {:.2}ms", method_avg_latency);
            info!("  Success rate: {:.1}%", method_success_rate);
        }
    }
    Ok(())
}
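
For illustration, a hedged sketch of how a binary might drive this entry point, assuming the crate exposes the module as sui_rpc_benchmark::json_rpc (the endpoint, file name, and CLI wiring here are placeholders, not the PR's actual CLI):

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder endpoint and input file; the real CLI flags may differ.
    sui_rpc_benchmark::json_rpc::run_benchmark(
        "http://127.0.0.1:9000",       // JSON RPC endpoint under test
        "sampled_read_requests.jsonl", // output of pull_grafana_logs.rs
        50,                            // concurrency
        30,                            // duration in seconds
    )
    .await
}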
34 changes: 34 additions & 0 deletions crates/sui-rpc-benchmark/src/json_rpc/request_loader.rs
@@ -0,0 +1,34 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

/// This module implements the request loader, which is used to load
/// the JSON RPC requests from a jsonl file.
use anyhow::{Context, Result};
use serde::Deserialize;
use std::{
    fs::File,
    io::{BufRead, BufReader},
};

#[derive(Clone, Debug, Deserialize)]
pub struct JsonRpcRequestLine {
    pub method: String,
    #[serde(rename = "body")]
    pub body_json: serde_json::Value,
}

pub fn load_json_rpc_requests(file_path: &str) -> Result<Vec<JsonRpcRequestLine>> {
    let file = File::open(file_path)
        .with_context(|| format!("Could not open JSON RPC file at {}", file_path))?;
    let reader = BufReader::new(file);

    let mut requests = Vec::new();
    for line in reader.lines() {
        let line = line?;
        let request_line: JsonRpcRequestLine =
            serde_json::from_str(&line).with_context(|| "Failed to parse JSON RPC line")?;
        requests.push(request_line);
    }

    Ok(requests)
}
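
For context, each jsonl line pairs the extracted method with the raw request body, matching what pull_grafana_logs.rs writes; a minimal parsing sketch with an illustrative payload (parse_example is a hypothetical helper):

fn parse_example() -> anyhow::Result<()> {
    // One line of the jsonl file; the body is kept as arbitrary JSON.
    let line = r#"{"method":"sui_getObject", "body":{"jsonrpc":"2.0","id":1,"method":"sui_getObject","params":[]}}"#;
    let request: JsonRpcRequestLine = serde_json::from_str(line)?;
    assert_eq!(request.method, "sui_getObject");
    assert!(request.body_json.get("jsonrpc").is_some());
    Ok(())
}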
104 changes: 104 additions & 0 deletions crates/sui-rpc-benchmark/src/json_rpc/runner.rs
@@ -0,0 +1,104 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

/// This module implements the JSON RPC benchmark runner.
/// The main function is `run_queries`, which runs the queries concurrently
/// and records the overall and per-method stats.
use anyhow::Result;
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    time::Instant,
};
use sui_indexer_alt_framework::task::TrySpawnStreamExt;
use tokio::time::timeout;

use super::request_loader::JsonRpcRequestLine;
use crate::config::BenchmarkConfig;

#[derive(Clone, Default)]
pub struct PerMethodStats {
    pub total_sent: usize,
    pub total_errors: usize,
    // record total latency and calculate average latency later to avoid duplicate calculations
    pub total_latency_ms: f64,
}

#[derive(Clone, Default)]
pub struct JsonRpcStats {
    pub total_sent: usize,
    pub total_errors: usize,
    // record total latency and calculate average latency later to avoid duplicate calculations
    pub total_latency_ms: f64,
    pub per_method: HashMap<String, PerMethodStats>,
}

impl JsonRpcStats {
    pub fn new() -> Self {
        Self::default()
    }

    fn record_request(&mut self, method: &str, latency_ms: f64, is_error: bool) {
        self.total_sent += 1;
        self.total_latency_ms += latency_ms;
        if is_error {
            self.total_errors += 1;
        }

        let method_stats = self.per_method.entry(method.to_string()).or_default();
        method_stats.total_sent += 1;
        method_stats.total_latency_ms += latency_ms;
        if is_error {
            method_stats.total_errors += 1;
        }
    }
}

pub async fn run_queries(
    endpoint: &str,
    requests: &[JsonRpcRequestLine],
    config: &BenchmarkConfig,
) -> Result<JsonRpcStats> {
    let concurrency = config.concurrency;
    let shared_stats = Arc::new(Mutex::new(JsonRpcStats::new()));
    let client = reqwest::Client::new();
    let endpoint = endpoint.to_owned();
    let requests = requests.to_vec();
    let stats = shared_stats.clone();

    let stream = futures::stream::iter(requests.into_iter().map(move |request_line| {
        let task_stats = stats.clone();
        let client = client.clone();
        let endpoint = endpoint.clone();
        async move {
            let now = Instant::now();
            let res = timeout(
                std::time::Duration::from_secs(10),
                client.post(&endpoint).json(&request_line.body_json).send(),
            )
            .await;

            let elapsed_ms = now.elapsed().as_millis() as f64;
            let is_error = !matches!(res, Ok(Ok(ref resp)) if resp.status().is_success());

            let mut stats = task_stats
                .lock()
                .map_err(|e| anyhow::anyhow!("Failed to acquire stats lock: {}", e))?;
            stats.record_request(&request_line.method, elapsed_ms, is_error);
            Ok::<(), anyhow::Error>(())
        }
    }));

    timeout(
        config.duration,
        stream.try_for_each_spawned(concurrency, |fut| fut),
    )
    .await
    .unwrap_or(Ok(()))?;
Contributor:

Is .unwrap_or(Ok(())) basically unwrap()?

Contributor Author:

Not really; per the timeout doc:

/// If the future completes before the duration has elapsed, then the completed
/// value is returned. Otherwise, an error is returned and the future is
/// canceled.

So .unwrap_or(Ok(())) is intended to catch the early-termination error and exit gracefully instead of panicking right away, while still propagating any errors from try_for_each_spawned.

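To make the distinction concrete, a self-contained sketch (not part of this diff) where the sleep stands in for try_for_each_spawned outliving the deadline:

use std::time::Duration;
use tokio::time::{sleep, timeout};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // The inner future outlives the deadline, so timeout returns Err(Elapsed).
    let res: Result<anyhow::Result<()>, _> = timeout(Duration::from_millis(10), async {
        sleep(Duration::from_secs(1)).await;
        Ok(())
    })
    .await;

    // unwrap_or(Ok(())) turns the deadline into a graceful stop, while an
    // inner Err from the workload would still be propagated by the ? operator.
    res.unwrap_or(Ok(()))?;
    Ok(())
}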

    let final_stats = shared_stats
        .lock()
        .map_err(|e| anyhow::anyhow!("Failed to acquire stats lock for final results: {}", e))?
        .clone();
    Ok(final_stats)
}