Skip to content

Commit

Permalink
feat(node-framework): Add circuit breaker checker layer to framework (m…
Browse files Browse the repository at this point in the history
…atter-labs#1452)

Key thing to note - no more circuit breaker channel for signalling about
check failure. Instead, make the circuit breaker a part of the other
tasks and return an error instead of a signal throw channel. Motivation
- framework design requires it.

## What ❔
Port circuit breaker checker to the framework

## Why ❔
these changes is a part of the system porting process to the framework

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
AnastasiiaVashchuk authored Mar 26, 2024
1 parent 8c26a7a commit 2c7a6bf
Show file tree
Hide file tree
Showing 14 changed files with 202 additions and 64 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 1 addition & 8 deletions core/bin/zksync_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ async fn main() -> anyhow::Result<()> {
};

// Run core actors.
let (core_task_handles, stop_sender, cb_receiver, health_check_handle) =
let (core_task_handles, stop_sender, health_check_handle) =
initialize_components(&configs, &components, &secrets)
.await
.context("Unable to start Core actors")?;
Expand All @@ -199,13 +199,6 @@ async fn main() -> anyhow::Result<()> {
_ = sigint_receiver => {
tracing::info!("Stop signal received, shutting down");
},
error = cb_receiver => {
if let Ok(error_msg) = error {
let err = format!("Circuit breaker received, shutting down. Reason: {}", error_msg);
tracing::warn!("{err}");
vlog::capture_message(&err, vlog::AlertLevel::Warning);
}
},
}

stop_sender.send(true).ok();
Expand Down
4 changes: 4 additions & 0 deletions core/lib/circuit_breaker/src/l1_txs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ pub struct FailedL1TransactionChecker {

#[async_trait::async_trait]
impl CircuitBreaker for FailedL1TransactionChecker {
fn name(&self) -> &'static str {
"failed_l1_transaction"
}

async fn check(&self) -> Result<(), CircuitBreakerError> {
let number_of_failed_transactions = self
.pool
Expand Down
58 changes: 36 additions & 22 deletions core/lib/circuit_breaker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,34 @@
use std::{fmt, time::Duration};
use std::{fmt, sync::Arc, time::Duration};

use thiserror::Error;
use tokio::sync::{oneshot, watch};
use zksync_config::configs::chain::CircuitBreakerConfig;
use tokio::sync::{watch, Mutex};

pub mod l1_txs;
mod metrics;
pub mod replication_lag;

#[derive(Default, Debug)]
pub struct CircuitBreakers(Mutex<Vec<Box<dyn CircuitBreaker>>>);

impl CircuitBreakers {
pub async fn insert(&self, circuit_breaker: Box<dyn CircuitBreaker>) {
let mut guard = self.0.lock().await;
if !guard
.iter()
.any(|existing_breaker| existing_breaker.name() == circuit_breaker.name())
{
guard.push(circuit_breaker);
}
}

pub async fn check(&self) -> Result<(), CircuitBreakerError> {
for circuit_breaker in self.0.lock().await.iter() {
circuit_breaker.check().await?;
}
Ok(())
}
}

#[derive(Debug, Error)]
pub enum CircuitBreakerError {
#[error("System has failed L1 transaction")]
Expand All @@ -21,45 +42,38 @@ pub enum CircuitBreakerError {
/// Checks circuit breakers
#[derive(Debug)]
pub struct CircuitBreakerChecker {
circuit_breakers: Vec<Box<dyn CircuitBreaker>>,
circuit_breakers: Arc<CircuitBreakers>,
sync_interval: Duration,
sender: oneshot::Sender<CircuitBreakerError>,
}

#[async_trait::async_trait]
pub trait CircuitBreaker: fmt::Debug + Send + Sync {
fn name(&self) -> &'static str;

async fn check(&self) -> Result<(), CircuitBreakerError>;
}

impl CircuitBreakerChecker {
pub fn new(
circuit_breakers: Vec<Box<dyn CircuitBreaker>>,
config: &CircuitBreakerConfig,
) -> (Self, oneshot::Receiver<CircuitBreakerError>) {
let (sender, receiver) = oneshot::channel();
let this = Self {
pub fn new(circuit_breakers: Arc<CircuitBreakers>, sync_interval: Duration) -> Self {
Self {
circuit_breakers,
sync_interval: config.sync_interval(),
sender,
};
(this, receiver)
sync_interval,
}
}

pub async fn check(&self) -> Result<(), CircuitBreakerError> {
for circuit_breaker in &self.circuit_breakers {
circuit_breaker.check().await?;
}
self.circuit_breakers.check().await?;

Ok(())
}

pub async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
tracing::info!("running circuit breaker checker...");
while !*stop_receiver.borrow_and_update() {
if let Err(error) = self.check().await {
return self
.sender
.send(error)
.map_err(|_| anyhow::anyhow!("failed to send circuit breaker message"));
return Err(anyhow::format_err!(
"Circuit breaker error. Reason: {error}"
));
}
// Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this.
tokio::time::timeout(self.sync_interval, stop_receiver.changed())
Expand Down
4 changes: 4 additions & 0 deletions core/lib/circuit_breaker/src/replication_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ pub struct ReplicationLagChecker {

#[async_trait::async_trait]
impl CircuitBreaker for ReplicationLagChecker {
fn name(&self) -> &'static str {
"replication_lag"
}

async fn check(&self) -> Result<(), CircuitBreakerError> {
let lag = self
.pool
Expand Down
45 changes: 23 additions & 22 deletions core/lib/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use tokio::{
task::JoinHandle,
};
use zksync_circuit_breaker::{
l1_txs::FailedL1TransactionChecker, replication_lag::ReplicationLagChecker, CircuitBreaker,
CircuitBreakerChecker, CircuitBreakerError,
l1_txs::FailedL1TransactionChecker, replication_lag::ReplicationLagChecker,
CircuitBreakerChecker, CircuitBreakers,
};
use zksync_concurrency::{ctx, scope};
use zksync_config::{
Expand Down Expand Up @@ -227,7 +227,6 @@ pub async fn initialize_components(
) -> anyhow::Result<(
Vec<JoinHandle<anyhow::Result<()>>>,
watch::Sender<bool>,
oneshot::Receiver<CircuitBreakerError>,
HealthCheckHandle,
)> {
tracing::info!("Starting the components: {components:?}");
Expand Down Expand Up @@ -284,12 +283,14 @@ pub async fn initialize_components(
.clone()
.context("circuit_breaker_config")?;

let circuit_breakers =
circuit_breakers_for_components(components, &postgres_config, &circuit_breaker_config)
.await
.context("circuit_breakers_for_components")?;
let (circuit_breaker_checker, circuit_breaker_error) =
CircuitBreakerChecker::new(circuit_breakers, &circuit_breaker_config);
let circuit_breaker_checker = CircuitBreakerChecker::new(
Arc::new(
circuit_breakers_for_components(components, &postgres_config, &circuit_breaker_config)
.await
.context("circuit_breakers_for_components")?,
),
circuit_breaker_config.sync_interval(),
);
circuit_breaker_checker.check().await.unwrap_or_else(|err| {
panic!("Circuit breaker triggered: {}", err);
});
Expand Down Expand Up @@ -732,12 +733,8 @@ pub async fn initialize_components(
if let Some(task) = gas_adjuster.run_if_initialized(stop_receiver.clone()) {
task_futures.push(task);
}
Ok((
task_futures,
stop_sender,
circuit_breaker_error,
health_check_handle,
))

Ok((task_futures, stop_sender, health_check_handle))
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -1263,8 +1260,8 @@ async fn circuit_breakers_for_components(
components: &[Component],
postgres_config: &PostgresConfig,
circuit_breaker_config: &CircuitBreakerConfig,
) -> anyhow::Result<Vec<Box<dyn CircuitBreaker>>> {
let mut circuit_breakers: Vec<Box<dyn CircuitBreaker>> = Vec::new();
) -> anyhow::Result<CircuitBreakers> {
let circuit_breakers = CircuitBreakers::default();

if components
.iter()
Expand All @@ -1274,7 +1271,9 @@ async fn circuit_breakers_for_components(
.build()
.await
.context("failed to build a connection pool")?;
circuit_breakers.push(Box::new(FailedL1TransactionChecker { pool }));
circuit_breakers
.insert(Box::new(FailedL1TransactionChecker { pool }))
.await;
}

if components.iter().any(|c| {
Expand All @@ -1286,10 +1285,12 @@ async fn circuit_breakers_for_components(
let pool = ConnectionPool::<Core>::singleton(postgres_config.replica_url()?)
.build()
.await?;
circuit_breakers.push(Box::new(ReplicationLagChecker {
pool,
replication_lag_limit_sec: circuit_breaker_config.replication_lag_limit_sec,
}));
circuit_breakers
.insert(Box::new(ReplicationLagChecker {
pool,
replication_lag_limit_sec: circuit_breaker_config.replication_lag_limit_sec,
}))
.await;
}
Ok(circuit_breakers)
}
1 change: 1 addition & 0 deletions core/node/node_framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ zksync_eth_client.workspace = true
zksync_contracts.workspace = true
zksync_web3_decl.workspace = true
zksync_utils.workspace = true
zksync_circuit_breaker.workspace = true

tracing.workspace = true
thiserror.workspace = true
Expand Down
17 changes: 16 additions & 1 deletion core/node/node_framework/examples/main_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
use anyhow::Context;
use zksync_config::{
configs::{
chain::{MempoolConfig, NetworkConfig, OperationsManagerConfig, StateKeeperConfig},
chain::{
CircuitBreakerConfig, MempoolConfig, NetworkConfig, OperationsManagerConfig,
StateKeeperConfig,
},
fri_prover_group::FriProverGroupConfig,
house_keeper::HouseKeeperConfig,
FriProofCompressorConfig, FriProverConfig, FriWitnessGeneratorConfig, ObservabilityConfig,
Expand All @@ -24,6 +27,7 @@ use zksync_core::{
use zksync_env_config::FromEnv;
use zksync_node_framework::{
implementations::layers::{
circuit_breaker_checker::CircuitBreakerCheckerLayer,
commitment_generator::CommitmentGeneratorLayer,
contract_verification_api::ContractVerificationApiLayer,
eth_sender::EthSenderLayer,
Expand Down Expand Up @@ -233,6 +237,7 @@ impl MainNodeBuilder {
let contracts_config = ContractsConfig::from_env()?;
let network_config = NetworkConfig::from_env()?;
let state_keeper_config = StateKeeperConfig::from_env()?;
let circuit_breaker_config = CircuitBreakerConfig::from_env()?;
let with_debug_namespace = state_keeper_config.save_call_traces;

let mut namespaces = Namespace::DEFAULT.to_vec();
Expand All @@ -250,6 +255,7 @@ impl MainNodeBuilder {
websocket_requests_per_minute_limit: Some(
rpc_config.websocket_requests_per_minute_limit(),
),
replication_lag_limit_sec: circuit_breaker_config.replication_lag_limit_sec,
};
self.node.add_layer(Web3ServerLayer::ws(
rpc_config.ws_port,
Expand Down Expand Up @@ -299,6 +305,14 @@ impl MainNodeBuilder {
Ok(self)
}

fn add_circuit_breaker_checker_layer(mut self) -> anyhow::Result<Self> {
let circuit_breaker_config = CircuitBreakerConfig::from_env()?;
self.node
.add_layer(CircuitBreakerCheckerLayer(circuit_breaker_config));

Ok(self)
}

fn add_contract_verification_api_layer(mut self) -> anyhow::Result<Self> {
let config = ApiConfig::from_env()?.contract_verification;
self.node.add_layer(ContractVerificationApiLayer(config));
Expand All @@ -324,6 +338,7 @@ fn main() -> anyhow::Result<()> {
MainNodeBuilder::new()
.add_sigint_handler_layer()?
.add_pools_layer()?
.add_circuit_breaker_checker_layer()?
.add_query_eth_client_layer()?
.add_sequencer_l1_gas_layer()?
.add_object_store_layer()?
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use zksync_circuit_breaker::CircuitBreakerChecker;
use zksync_config::configs::chain::CircuitBreakerConfig;

use crate::{
implementations::resources::circuit_breakers::CircuitBreakersResource,
service::{ServiceContext, StopReceiver},
task::UnconstrainedTask,
wiring_layer::{WiringError, WiringLayer},
};

#[derive(Debug)]
pub struct CircuitBreakerCheckerLayer(pub CircuitBreakerConfig);

#[async_trait::async_trait]
impl WiringLayer for CircuitBreakerCheckerLayer {
fn layer_name(&self) -> &'static str {
"circuit_breaker_checker_layer"
}

async fn wire(self: Box<Self>, mut node: ServiceContext<'_>) -> Result<(), WiringError> {
// Get resources.
let circuit_breaker_resource = node
.get_resource_or_default::<CircuitBreakersResource>()
.await;

let circuit_breaker_checker =
CircuitBreakerChecker::new(circuit_breaker_resource.breakers, self.0.sync_interval());

// Create and insert task.
let task = CircuitBreakerCheckerTask {
circuit_breaker_checker,
};

node.add_unconstrained_task(Box::new(task));
Ok(())
}
}

#[derive(Debug)]
struct CircuitBreakerCheckerTask {
circuit_breaker_checker: CircuitBreakerChecker,
}

#[async_trait::async_trait]
impl UnconstrainedTask for CircuitBreakerCheckerTask {
fn name(&self) -> &'static str {
"circuit_breaker_checker"
}

async fn run_unconstrained(
mut self: Box<Self>,
stop_receiver: StopReceiver,
) -> anyhow::Result<()> {
self.circuit_breaker_checker.run(stop_receiver.0).await
}
}
Loading

0 comments on commit 2c7a6bf

Please sign in to comment.