Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc-alt: metrics #20728

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
indexer-alt: factor out MetricsService
## Description

The RPC and Indexer implementations use essentially the same metrics
service, and when they are combined in a single process they need to use
the same instance of that metrics service as well. This change starts
that process by pulling out the implementation from `sui-indexer-alt`.

In a later change, the metrics service in the RPC implementation will
switch over to this common crate as well. This unblocks running both
services together in one process, which is something we will need to do
as part of E2E tests.

## Test plan

Run the indexer and make sure it can be cancelled (with Ctrl-C), and
that it also shuts down cleanly when run to a specific last checkpoint.
  • Loading branch information
amnn committed Dec 22, 2024
commit d0f16e0a2c6c2fcf21a4908a2897014ca7b28444
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ members = [
"crates/sui-indexer-alt",
"crates/sui-indexer-alt-framework",
"crates/sui-indexer-alt-jsonrpc",
"crates/sui-indexer-alt-metrics",
"crates/sui-indexer-alt-schema",
"crates/sui-indexer-builder",
"crates/sui-json",
Expand Down Expand Up @@ -644,6 +645,7 @@ sui-genesis-builder = { path = "crates/sui-genesis-builder" }
sui-indexer = { path = "crates/sui-indexer" }
sui-indexer-alt-framework = { path = "crates/sui-indexer-alt-framework" }
sui-indexer-alt-jsonrpc = { path = "crates/sui-indexer-alt-jsonrpc" }
sui-indexer-alt-metrics = { path = "crates/sui-indexer-alt-metrics" }
sui-indexer-alt-schema = { path = "crates/sui-indexer-alt-schema" }
sui-indexer-builder = { path = "crates/sui-indexer-builder" }
sui-json = { path = "crates/sui-json" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub(crate) mod tests {
use crate::ingestion::client::IngestionClient;
use crate::ingestion::test_utils::test_checkpoint_data;
use crate::metrics::tests::test_metrics;
use std::sync::Arc;
use sui_storage::blob::{Blob, BlobEncoding};
use tokio_util::sync::CancellationToken;

Expand All @@ -51,8 +50,7 @@ pub(crate) mod tests {
let test_checkpoint = test_checkpoint_data(1);
tokio::fs::write(&path, &test_checkpoint).await.unwrap();

let metrics = Arc::new(test_metrics());
let local_client = IngestionClient::new_local(tempdir, metrics);
let local_client = IngestionClient::new_local(tempdir, test_metrics());
let checkpoint = local_client
.fetch(1, &CancellationToken::new())
.await
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-framework/src/ingestion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ mod tests {
ingest_concurrency,
..Default::default()
},
Arc::new(test_metrics()),
test_metrics(),
cancel,
)
.unwrap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub(crate) mod tests {
use crate::ingestion::test_utils::test_checkpoint_data;
use crate::metrics::tests::test_metrics;
use axum::http::StatusCode;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use tokio_util::sync::CancellationToken;
use wiremock::{
matchers::{method, path_regex},
Expand All @@ -133,7 +133,7 @@ pub(crate) mod tests {
}

fn remote_test_client(uri: String) -> IngestionClient {
IngestionClient::new_remote(Url::parse(&uri).unwrap(), Arc::new(test_metrics())).unwrap()
IngestionClient::new_remote(Url::parse(&uri).unwrap(), test_metrics()).unwrap()
}

fn assert_http_error(error: Error, checkpoint: u64, code: StatusCode) {
Expand Down
45 changes: 7 additions & 38 deletions crates/sui-indexer-alt-framework/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};
use std::{collections::BTreeSet, sync::Arc};

use anyhow::{ensure, Context, Result};
use diesel::{
Expand All @@ -10,12 +10,13 @@ use diesel::{
};
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use ingestion::{client::IngestionClient, ClientArgs, IngestionConfig, IngestionService};
use metrics::{IndexerMetrics, MetricsService};
use metrics::{DbConnectionStatsCollector, IndexerMetrics};
use pipeline::{
concurrent::{self, ConcurrentConfig},
sequential::{self, SequentialConfig},
Processor,
};
use prometheus::Registry;
use sui_pg_db::{Db, DbArgs};
use task::graceful_shutdown;
use tokio::task::JoinHandle;
Expand All @@ -33,7 +34,7 @@ pub(crate) mod watermarks;
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

/// Command-line arguments for the indexer
#[derive(clap::Args, Debug, Clone)]
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
/// Override for the checkpoint to start ingestion from -- useful for backfills. By default,
/// ingestion will start just after the lowest checkpoint watermark across all active
Expand All @@ -54,10 +55,6 @@ pub struct IndexerArgs {
/// Don't write to the watermark tables for concurrent pipelines.
#[arg(long)]
pub skip_watermark: bool,

/// Address to serve Prometheus Metrics from.
#[arg(long, default_value_t = Self::default().metrics_address)]
pub metrics_address: SocketAddr,
}

pub struct Indexer {
Expand All @@ -67,9 +64,6 @@ pub struct Indexer {
/// Prometheus Metrics.
metrics: Arc<IndexerMetrics>,

/// Service for serving Prometheus metrics.
metrics_service: MetricsService,

/// Service for downloading and disseminating checkpoint data.
ingestion_service: IngestionService,

Expand Down Expand Up @@ -125,14 +119,14 @@ impl Indexer {
client_args: ClientArgs,
ingestion_config: IngestionConfig,
migrations: &'static EmbeddedMigrations,
registry: &Registry,
cancel: CancellationToken,
) -> Result<Self> {
let IndexerArgs {
first_checkpoint,
last_checkpoint,
pipeline,
skip_watermark,
metrics_address,
} = indexer_args;

let db = Db::for_write(db_args)
Expand All @@ -144,8 +138,8 @@ impl Indexer {
.await
.context("Failed to run pending migrations")?;

let (metrics, metrics_service) =
MetricsService::new(metrics_address, db.clone(), cancel.clone())?;
let metrics = IndexerMetrics::new(registry);
registry.register(Box::new(DbConnectionStatsCollector::new(db.clone())))?;

let ingestion_service = IngestionService::new(
client_args,
Expand All @@ -157,7 +151,6 @@ impl Indexer {
Ok(Self {
db,
metrics,
metrics_service,
ingestion_service,
first_checkpoint,
last_checkpoint,
Expand Down Expand Up @@ -301,12 +294,6 @@ impl Indexer {
);
}

let metrics_handle = self
.metrics_service
.run()
.await
.context("Failed to start metrics service")?;

// If an override has been provided, start ingestion from there, otherwise start ingestion
// from just after the lowest committer watermark across all enabled pipelines.
let first_checkpoint = self
Expand All @@ -326,19 +313,13 @@ impl Indexer {
self.handles.push(regulator_handle);
self.handles.push(broadcaster_handle);

let cancel = self.cancel.clone();
Ok(tokio::spawn(async move {
// Wait for the ingestion service and all its related tasks to wind down gracefully:
// If ingestion has been configured to only handle a specific range of checkpoints, we
// want to make sure that tasks are allowed to run to completion before shutting them
// down.
graceful_shutdown(self.handles, self.cancel).await;

info!("Indexing pipeline gracefully shut down");

// Pick off any stragglers (in this case, just the metrics service).
cancel.cancel();
metrics_handle.await.unwrap();
}))
}

Expand Down Expand Up @@ -396,15 +377,3 @@ impl Indexer {
Ok(Some(watermark))
}
}

impl Default for IndexerArgs {
fn default() -> Self {
Self {
first_checkpoint: None,
last_checkpoint: None,
pipeline: vec![],
skip_watermark: false,
metrics_address: "0.0.0.0:9184".parse().unwrap(),
}
}
}
89 changes: 11 additions & 78 deletions crates/sui-indexer-alt-framework/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,18 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
net::SocketAddr,
sync::{atomic::AtomicU64, Arc},
};
use std::sync::{atomic::AtomicU64, Arc};

use anyhow::Result;
use axum::{extract::Extension, http::StatusCode, routing::get, Router};
use prometheus::{
core::{Collector, Desc},
proto::{Counter, Gauge, LabelPair, Metric, MetricFamily, MetricType, Summary},
register_histogram_vec_with_registry, register_histogram_with_registry,
register_int_counter_vec_with_registry, register_int_counter_with_registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, Histogram,
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry, TextEncoder,
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use sui_pg_db::Db;
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use tracing::warn;

use crate::{ingestion::error::Error, pipeline::Processor};

Expand Down Expand Up @@ -52,13 +45,6 @@ const BATCH_SIZE_BUCKETS: &[f64] = &[
1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0,
];

/// Service to expose prometheus metrics from the indexer.
pub(crate) struct MetricsService {
addr: SocketAddr,
registry: Registry,
cancel: CancellationToken,
}

#[derive(Clone)]
pub(crate) struct IndexerMetrics {
// Statistics related to fetching data from the remote store.
Expand Down Expand Up @@ -149,7 +135,7 @@ pub(crate) struct IndexerMetrics {
}

/// Collects information about the database connection pool.
struct DbConnectionStatsCollector {
pub(crate) struct DbConnectionStatsCollector {
db: Db,
desc: Vec<(MetricType, Desc)>,
}
Expand All @@ -163,57 +149,13 @@ pub(crate) struct CheckpointLagMetricReporter {
latest_checkpoint_time_lag_gauge: IntGauge,
/// Metric to report the sequence number of the checkpoint with the highest sequence number observed so far.
latest_checkpoint_sequence_number_gauge: IntGauge,

// Internal state to keep track of the highest checkpoint sequence number reported so far.
latest_reported_checkpoint: AtomicU64,
}

impl MetricsService {
/// Create a new metrics service, exposing Mysten-wide metrics, and Indexer-specific metrics.
/// Returns the Indexer-specific metrics and the service itself (which must be run with
/// [Self::run]).
pub(crate) fn new(
addr: SocketAddr,
db: Db,
cancel: CancellationToken,
) -> Result<(Arc<IndexerMetrics>, MetricsService)> {
let registry = Registry::new_custom(Some("indexer_alt".to_string()), None)?;

let metrics = IndexerMetrics::new(&registry);
registry.register(Box::new(DbConnectionStatsCollector::new(db)))?;

let service = Self {
addr,
registry,
cancel,
};

Ok((Arc::new(metrics), service))
}

/// Start the service. The service will run until the cancellation token is triggered.
pub(crate) async fn run(self) -> Result<JoinHandle<()>> {
let listener = TcpListener::bind(&self.addr).await?;
let app = Router::new()
.route("/metrics", get(metrics))
.layer(Extension(self.registry));

Ok(tokio::spawn(async move {
info!("Starting metrics service on {}", self.addr);
axum::serve(listener, app)
.with_graceful_shutdown(async move {
self.cancel.cancelled().await;
info!("Shutdown received, stopping metrics service");
})
.await
.unwrap();
}))
}
}

impl IndexerMetrics {
pub(crate) fn new(registry: &Registry) -> Self {
Self {
pub(crate) fn new(registry: &Registry) -> Arc<Self> {
Arc::new(Self {
total_ingested_checkpoints: register_int_counter_with_registry!(
"indexer_total_ingested_checkpoints",
"Total number of checkpoints fetched from the remote store",
Expand Down Expand Up @@ -651,7 +593,7 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
}
})
}

/// Register that we're retrying a checkpoint fetch due to a transient error, logging the
Expand All @@ -673,7 +615,7 @@ impl IndexerMetrics {
}

impl DbConnectionStatsCollector {
fn new(db: Db) -> Self {
pub(crate) fn new(db: Db) -> Self {
let desc = vec![
(
MetricType::GAUGE,
Expand Down Expand Up @@ -808,17 +750,6 @@ impl CheckpointLagMetricReporter {
}
}

/// Route handler for metrics service
async fn metrics(Extension(registry): Extension<Registry>) -> (StatusCode, String) {
match TextEncoder.encode_to_string(&registry.gather()) {
Ok(s) => (StatusCode::OK, s),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("unable to encode metrics: {e}"),
),
}
}

fn desc(name: &str, help: &str) -> Desc {
desc_with_labels(name, help, &[])
}
Expand Down Expand Up @@ -904,12 +835,14 @@ fn summary(desc: &Desc, sum: f64, count: u64) -> MetricFamily {

#[cfg(test)]
pub(crate) mod tests {
use std::sync::Arc;

use prometheus::Registry;

use super::IndexerMetrics;

/// Construct metrics for test purposes.
pub fn test_metrics() -> IndexerMetrics {
pub fn test_metrics() -> Arc<IndexerMetrics> {
IndexerMetrics::new(&Registry::new())
}
}
Loading