Skip to content

Commit

Permalink
fix(api): Fix API server shutdown flow (matter-labs#1425)
Browse files Browse the repository at this point in the history
## What ❔

- API server now doesn't (gracefully) shut down immediately after
receiving a signal. Instead, it waits for some time (until the inbound
traffic reaches zero), and then starts the graceful shutdown.
- Some tasks run by the main node are now shutdown-aware.

## Why ❔

If the server starts shutting down immediately after receiving a signal
(as it does now), this means that it will drop any new traffic, which is
not what the K8s load balancer expects.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
slowli authored Mar 25, 2024
1 parent 1245005 commit 780f6b0
Show file tree
Hide file tree
Showing 27 changed files with 542 additions and 245 deletions.
8 changes: 1 addition & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 6 additions & 8 deletions core/bin/contract-verifier/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::cell::RefCell;
use std::{cell::RefCell, time::Duration};

use anyhow::Context as _;
use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};
Expand All @@ -11,7 +11,7 @@ use zksync_config::{
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_env_config::FromEnv;
use zksync_queued_job_processor::JobProcessor;
use zksync_utils::wait_for_tasks::wait_for_tasks;
use zksync_utils::wait_for_tasks::ManagedTasks;

use crate::verifier::ContractVerifier;

Expand Down Expand Up @@ -189,18 +189,16 @@ async fn main() -> anyhow::Result<()> {
),
];

let particular_crypto_alerts = None;
let graceful_shutdown = None::<futures::future::Ready<()>>;
let tasks_allowed_to_finish = false;
let mut tasks = ManagedTasks::new(tasks);
tokio::select! {
_ = wait_for_tasks(tasks, particular_crypto_alerts, graceful_shutdown, tasks_allowed_to_finish) => {},
() = tasks.wait_single() => {},
_ = stop_signal_receiver.next() => {
tracing::info!("Stop signal received, shutting down");
},
};
let _ = stop_sender.send(true);
stop_sender.send_replace(true);

// Sleep for some time to let verifier gracefully stop.
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
tasks.complete(Duration::from_secs(5)).await;
Ok(())
}
23 changes: 11 additions & 12 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::Context as _;
use clap::Parser;
use metrics::EN_METRICS;
use prometheus_exporter::PrometheusExporterConfig;
use tokio::{sync::watch, task, time::sleep};
use tokio::{sync::watch, task};
use zksync_basic_types::{Address, L2ChainId};
use zksync_concurrency::{ctx, limiter, scope, time};
use zksync_config::configs::database::MerkleTreeMode;
Expand Down Expand Up @@ -38,7 +38,7 @@ use zksync_db_connection::healthcheck::ConnectionPoolHealthCheck;
use zksync_health_check::{AppHealthCheck, HealthStatus, ReactiveHealthCheck};
use zksync_state::PostgresStorageCaches;
use zksync_storage::RocksDB;
use zksync_utils::wait_for_tasks::wait_for_tasks;
use zksync_utils::wait_for_tasks::ManagedTasks;
use zksync_web3_decl::jsonrpsee::http_client::HttpClient;

use crate::{
Expand Down Expand Up @@ -431,15 +431,17 @@ async fn init_tasks(

async fn shutdown_components(
stop_sender: watch::Sender<bool>,
tasks: ManagedTasks,
healthcheck_handle: HealthCheckHandle,
) {
) -> anyhow::Result<()> {
stop_sender.send(true).ok();
task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.unwrap();
// Sleep for some time to let components gracefully stop.
sleep(Duration::from_secs(10)).await;
.context("error waiting for RocksDB instances to drop")?;
// Increase timeout because of complicated graceful shutdown procedure for API servers.
tasks.complete(Duration::from_secs(30)).await;
healthcheck_handle.stop().await;
Ok(())
}

/// External node for zkSync Era.
Expand Down Expand Up @@ -628,20 +630,17 @@ async fn main() -> anyhow::Result<()> {
.await
.context("init_tasks")?;

let particular_crypto_alerts = None;
let graceful_shutdown = None::<futures::future::Ready<()>>;
let tasks_allowed_to_finish = false;

let mut tasks = ManagedTasks::new(task_handles);
tokio::select! {
_ = wait_for_tasks(task_handles, particular_crypto_alerts, graceful_shutdown, tasks_allowed_to_finish) => {},
_ = tasks.wait_single() => {},
_ = sigint_receiver => {
tracing::info!("Stop signal received, shutting down");
},
};

// Reaching this point means that either some actor exited unexpectedly or we received a stop signal.
// Broadcast the stop signal to all actors and exit.
shutdown_components(stop_sender, healthcheck_handle).await;
shutdown_components(stop_sender, tasks, healthcheck_handle).await?;
tracing::info!("Stopped");
Ok(())
}
22 changes: 13 additions & 9 deletions core/bin/zksync_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use zksync_core::{
};
use zksync_env_config::FromEnv;
use zksync_storage::RocksDB;
use zksync_utils::wait_for_tasks::wait_for_tasks;
use zksync_utils::wait_for_tasks::ManagedTasks;

mod config;

Expand Down Expand Up @@ -183,17 +183,15 @@ async fn main() -> anyhow::Result<()> {

// Run core actors.
let (core_task_handles, stop_sender, cb_receiver, health_check_handle) =
initialize_components(&configs, components, &secrets)
initialize_components(&configs, &components, &secrets)
.await
.context("Unable to start Core actors")?;

tracing::info!("Running {} core task handlers", core_task_handles.len());

let particular_crypto_alerts = None::<Vec<String>>;
let graceful_shutdown = None::<futures::future::Ready<()>>;
let tasks_allowed_to_finish = false;
let mut tasks = ManagedTasks::new(core_task_handles);
tokio::select! {
_ = wait_for_tasks(core_task_handles, particular_crypto_alerts, graceful_shutdown, tasks_allowed_to_finish) => {},
_ = tasks.wait_single() => {},
_ = sigint_receiver => {
tracing::info!("Stop signal received, shutting down");
},
Expand All @@ -209,9 +207,15 @@ async fn main() -> anyhow::Result<()> {
stop_sender.send(true).ok();
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.unwrap();
// Sleep for some time to let some components gracefully stop.
tokio::time::sleep(Duration::from_secs(5)).await;
.context("error waiting for RocksDB instances to drop")?;
let complete_timeout =
if components.contains(&Component::HttpApi) || components.contains(&Component::WsApi) {
// Increase timeout because of complicated graceful shutdown procedure for API servers.
Duration::from_secs(30)
} else {
Duration::from_secs(5)
};
tasks.complete(complete_timeout).await;
health_check_handle.stop().await;
tracing::info!("Stopped");
Ok(())
Expand Down
7 changes: 1 addition & 6 deletions core/lib/circuit_breaker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,13 @@ keywords.workspace = true
categories.workspace = true

[dependencies]
zksync_types.workspace = true
vise.workspace = true
zksync_config.workspace = true
zksync_contracts.workspace = true
zksync_dal.workspace = true
thiserror.workspace = true
serde_json.workspace = true
futures.workspace = true
tokio = { workspace = true, features = ["time"] }
anyhow.workspace = true
async-trait.workspace = true
hex.workspace = true
metrics.workspace = true
tracing.workspace = true

[dev-dependencies]
Expand Down
13 changes: 6 additions & 7 deletions core/lib/circuit_breaker/src/l1_txs.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Context as _;
use zksync_dal::{ConnectionPool, Core, CoreDal};

use crate::{CircuitBreaker, CircuitBreakerError};
Expand All @@ -10,17 +11,15 @@ pub struct FailedL1TransactionChecker {
#[async_trait::async_trait]
impl CircuitBreaker for FailedL1TransactionChecker {
async fn check(&self) -> Result<(), CircuitBreakerError> {
if self
let number_of_failed_transactions = self
.pool
.connection()
.await
.unwrap()
.connection_tagged("circuit_breaker")
.await?
.eth_sender_dal()
.get_number_of_failed_transactions()
.await
.unwrap()
> 0
{
.context("cannot get number of failed L1 transactions")?;
if number_of_failed_transactions > 0 {
return Err(CircuitBreakerError::FailedL1Transaction);
}
Ok(())
Expand Down
47 changes: 24 additions & 23 deletions core/lib/circuit_breaker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,48 @@
use std::time::Duration;
use std::{fmt, time::Duration};

use anyhow::Context as _;
use futures::channel::oneshot;
use thiserror::Error;
use tokio::sync::watch;
use tokio::sync::{oneshot, watch};
use zksync_config::configs::chain::CircuitBreakerConfig;

pub mod l1_txs;
mod metrics;
pub mod replication_lag;
pub mod utils;

#[derive(Debug, Error)]
pub enum CircuitBreakerError {
#[error("System has failed L1 transaction")]
FailedL1Transaction,
#[error("Replication lag ({0:?}) is above the threshold ({1:?})")]
#[error("Replication lag ({0}) is above the threshold ({1})")]
ReplicationLag(u32, u32),
#[error("Internal error running circuit breaker checks")]
Internal(#[from] anyhow::Error),
}

/// Checks circuit breakers
#[derive(Debug)]
pub struct CircuitBreakerChecker {
circuit_breakers: Vec<Box<dyn CircuitBreaker>>,
sync_interval: Duration,
sender: oneshot::Sender<CircuitBreakerError>,
}

#[async_trait::async_trait]
pub trait CircuitBreaker: std::fmt::Debug + Send + Sync {
pub trait CircuitBreaker: fmt::Debug + Send + Sync {
async fn check(&self) -> Result<(), CircuitBreakerError>;
}

impl CircuitBreakerChecker {
pub fn new(
circuit_breakers: Vec<Box<dyn CircuitBreaker>>,
config: &CircuitBreakerConfig,
) -> Self {
Self {
) -> (Self, oneshot::Receiver<CircuitBreakerError>) {
let (sender, receiver) = oneshot::channel();
let this = Self {
circuit_breakers,
sync_interval: config.sync_interval(),
}
sender,
};
(this, receiver)
}

pub async fn check(&self) -> Result<(), CircuitBreakerError> {
Expand All @@ -48,24 +52,21 @@ impl CircuitBreakerChecker {
Ok(())
}

pub async fn run(
self,
circuit_breaker_sender: oneshot::Sender<CircuitBreakerError>,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
pub async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
tracing::info!("running circuit breaker checker...");
loop {
if *stop_receiver.borrow() {
break;
}
while !*stop_receiver.borrow_and_update() {
if let Err(error) = self.check().await {
return circuit_breaker_sender
return self
.sender
.send(error)
.ok()
.context("failed to send circuit breaker message");
.map_err(|_| anyhow::anyhow!("failed to send circuit breaker message"));
}
tokio::time::sleep(self.sync_interval).await;
// Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this.
tokio::time::timeout(self.sync_interval, stop_receiver.changed())
.await
.ok();
}
tracing::info!("received a stop signal; circuit breaker is shut down");
Ok(())
}
}
13 changes: 13 additions & 0 deletions core/lib/circuit_breaker/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//! Circuit breaker metrics.
use vise::{Gauge, Global, Metrics};

#[derive(Debug, Metrics)]
#[metrics(prefix = "circuit_breaker")]
pub(crate) struct CircuitBreakerMetrics {
/// Replication lag for Postgres in seconds.
pub replication_lag: Gauge<u64>,
}

#[vise::register]
pub(crate) static METRICS: Global<CircuitBreakerMetrics> = Global::new();
13 changes: 7 additions & 6 deletions core/lib/circuit_breaker/src/replication_lag.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Context as _;
use zksync_dal::{ConnectionPool, Core, CoreDal};

use crate::{CircuitBreaker, CircuitBreakerError};
use crate::{metrics::METRICS, CircuitBreaker, CircuitBreakerError};

#[derive(Debug)]
pub struct ReplicationLagChecker {
Expand All @@ -13,14 +14,14 @@ impl CircuitBreaker for ReplicationLagChecker {
async fn check(&self) -> Result<(), CircuitBreakerError> {
let lag = self
.pool
.connection()
.await
.unwrap()
.connection_tagged("circuit_breaker")
.await?
.system_dal()
.get_replication_lag_sec()
.await;
.await
.context("failed getting replication lag")?;
METRICS.replication_lag.set(lag.into());

metrics::gauge!("circuit_breaker.replication_lag", lag as f64);
match self.replication_lag_limit_sec {
Some(replication_lag_limit_sec) if lag > replication_lag_limit_sec => Err(
CircuitBreakerError::ReplicationLag(lag, replication_lag_limit_sec),
Expand Down
9 changes: 0 additions & 9 deletions core/lib/circuit_breaker/src/utils.rs

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 780f6b0

Please sign in to comment.