Skip to content

Commit

Permalink
chore(node_framework): Emit logs on context/resource access (matter-l…
Browse files Browse the repository at this point in the history
…abs#1075)

## What ❔

- Adds more logs when the node is being configured. We can certainly
improve it further (e.g. print task name in resource-related logs or use
prettier type output for Unique), but I think it's a good starting
point.

## Why ❔

- Logs are helpful for debugging, especially in dynamic systems like the
new framework.

New logs look roughly as follows:

![image](https://github.com/matter-labs/zksync-era/assets/12111581/cd99dd63-6e72-41cb-bedd-db5827e628f1)
  • Loading branch information
popzxc authored Feb 15, 2024
1 parent 776337a commit 82c788e
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 17 deletions.
5 changes: 5 additions & 0 deletions core/lib/dal/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ impl ConnectionPoolBuilder {
self
}

/// Returns the maximum number of connections that can be allocated by the pool.
pub fn max_size(&self) -> u32 {
self.max_size
}

/// Builds a connection pool from this builder.
pub async fn build(&self) -> anyhow::Result<ConnectionPool> {
let options = PgPoolOptions::new()
Expand Down
110 changes: 98 additions & 12 deletions core/node/node_framework/src/implementations/resources/pools.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};

use zksync_dal::{connection::ConnectionPoolBuilder, ConnectionPool};

use crate::resource::Resource;

/// Represents a connection pool to the master database.
#[derive(Debug, Clone)]
pub struct MasterPoolResource(ConnectionPoolBuilder);
pub struct MasterPoolResource {
connections_count: Arc<AtomicU32>,
builder: ConnectionPoolBuilder,
}

impl Resource for MasterPoolResource {
fn resource_id() -> crate::resource::ResourceId {
Expand All @@ -14,21 +22,48 @@ impl Resource for MasterPoolResource {

impl MasterPoolResource {
pub fn new(builder: ConnectionPoolBuilder) -> Self {
Self(builder)
Self {
connections_count: Arc::new(AtomicU32::new(0)),
builder,
}
}

pub async fn get(&self) -> anyhow::Result<ConnectionPool> {
self.0.build().await
let result = self.builder.build().await;

if result.is_ok() {
self.connections_count
.fetch_add(self.builder.max_size(), Ordering::Relaxed);
let total_connections = self.connections_count.load(Ordering::Relaxed);
tracing::info!(
"Created a new master pool. Master pool total connections count: {total_connections}"
);
}

result
}

pub async fn get_singleton(&self) -> anyhow::Result<ConnectionPool> {
self.0.build_singleton().await
let result = self.builder.build_singleton().await;

if result.is_ok() {
self.connections_count.fetch_add(1, Ordering::Relaxed);
let total_connections = self.connections_count.load(Ordering::Relaxed);
tracing::info!(
"Created a new master pool. Master pool total connections count: {total_connections}"
);
}

result
}
}

/// Represents a connection pool to the replica database.
#[derive(Debug, Clone)]
pub struct ReplicaPoolResource(ConnectionPoolBuilder);
pub struct ReplicaPoolResource {
connections_count: Arc<AtomicU32>,
builder: ConnectionPoolBuilder,
}

impl Resource for ReplicaPoolResource {
fn resource_id() -> crate::resource::ResourceId {
Expand All @@ -38,21 +73,48 @@ impl Resource for ReplicaPoolResource {

impl ReplicaPoolResource {
pub fn new(builder: ConnectionPoolBuilder) -> Self {
Self(builder)
Self {
connections_count: Arc::new(AtomicU32::new(0)),
builder,
}
}

pub async fn get(&self) -> anyhow::Result<ConnectionPool> {
self.0.build().await
let result = self.builder.build().await;

if result.is_ok() {
self.connections_count
.fetch_add(self.builder.max_size(), Ordering::Relaxed);
let total_connections = self.connections_count.load(Ordering::Relaxed);
tracing::info!(
"Created a new replica pool. Master pool total connections count: {total_connections}"
);
}

result
}

pub async fn get_singleton(&self) -> anyhow::Result<ConnectionPool> {
self.0.build_singleton().await
let result = self.builder.build_singleton().await;

if result.is_ok() {
self.connections_count.fetch_add(1, Ordering::Relaxed);
let total_connections = self.connections_count.load(Ordering::Relaxed);
tracing::info!(
"Created a new replica pool. Master pool total connections count: {total_connections}"
);
}

result
}
}

/// Represents a connection pool to the prover database.
#[derive(Debug, Clone)]
pub struct ProverPoolResource(ConnectionPoolBuilder);
pub struct ProverPoolResource {
connections_count: Arc<AtomicU32>,
builder: ConnectionPoolBuilder,
}

impl Resource for ProverPoolResource {
fn resource_id() -> crate::resource::ResourceId {
Expand All @@ -62,14 +124,38 @@ impl Resource for ProverPoolResource {

impl ProverPoolResource {
pub fn new(builder: ConnectionPoolBuilder) -> Self {
Self(builder)
Self {
connections_count: Arc::new(AtomicU32::new(0)),
builder,
}
}

pub async fn get(&self) -> anyhow::Result<ConnectionPool> {
self.0.build().await
let result = self.builder.build().await;

if result.is_ok() {
self.connections_count
.fetch_add(self.builder.max_size(), Ordering::Relaxed);
let total_connections = self.connections_count.load(Ordering::Relaxed);
tracing::info!(
"Created a new prover pool. Master pool total connections count: {total_connections}"
);
}

result
}

pub async fn get_singleton(&self) -> anyhow::Result<ConnectionPool> {
self.0.build_singleton().await
let result = self.builder.build_singleton().await;

if result.is_ok() {
self.connections_count.fetch_add(1, Ordering::Relaxed);
let total_connections = self.connections_count.load(Ordering::Relaxed);
tracing::info!(
"Created a new prover pool. Master pool total connections count: {total_connections}"
);
}

result
}
}
10 changes: 9 additions & 1 deletion core/node/node_framework/src/resource/lazy_resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<T: Resource + Clone> LazyResource<T> {
return Ok(resource.clone());
}

tokio::select! {
let result = tokio::select! {
_ = self.stop_receiver.0.changed() => {
Err(LazyResourceError::NodeShutdown)
}
Expand All @@ -61,7 +61,13 @@ impl<T: Resource + Clone> LazyResource<T> {
let resource = resolve_receiver.borrow().as_ref().expect("Can only change if provided").clone();
Ok(resource)
}
};

if result.is_ok() {
tracing::info!("Lazy resource {} has been resolved", T::resource_id());
}

result
}

/// Provides the resource.
Expand All @@ -79,6 +85,8 @@ impl<T: Resource + Clone> LazyResource<T> {
return Err(LazyResourceError::ResourceAlreadyProvided);
}

tracing::info!("Lazy resource {} has been provided", T::resource_id());

Ok(())
}
}
Expand Down
9 changes: 9 additions & 0 deletions core/node/node_framework/src/resource/resource_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ impl<T: Resource + Clone> ResourceCollection<T> {

let mut handle = self.resources.lock().unwrap();
handle.push(resource);
tracing::info!(
"A new item has been added to the resource collection {}",
Self::resource_id()
);
Ok(())
}

Expand All @@ -100,6 +104,11 @@ impl<T: Resource + Clone> ResourceCollection<T> {
// some tasks would spawn something from the `IntoZkSyncTask` impl.
self.wired.changed().await.expect("Sender can't be dropped");

tracing::info!(
"Resource collection {} has been resolved",
Self::resource_id()
);

let handle = self.resources.lock().unwrap();
(*handle).clone()
}
Expand Down
11 changes: 10 additions & 1 deletion core/node/node_framework/src/resource/unique.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ impl<T: 'static + Send> Unique<T> {

/// Takes the resource from the container.
pub fn take(&self) -> Option<T> {
self.inner.lock().unwrap().take()
let result = self.inner.lock().unwrap().take();

if result.is_some() {
tracing::info!(
"Resource {} has been taken",
std::any::type_name::<Unique<T>>()
);
}

result
}
}
32 changes: 30 additions & 2 deletions core/node/node_framework/src/service/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ use crate::{
/// Provides the ability to fetch required resources, and also gives access to the Tokio runtime handle.
#[derive(Debug)]
pub struct ServiceContext<'a> {
layer: &'a str,
service: &'a mut ZkStackService,
}

impl<'a> ServiceContext<'a> {
pub(super) fn new(service: &'a mut ZkStackService) -> Self {
Self { service }
pub(super) fn new(layer: &'a str, service: &'a mut ZkStackService) -> Self {
Self { layer, service }
}

/// Provides access to the runtime used by the service.
Expand All @@ -24,12 +25,17 @@ impl<'a> ServiceContext<'a> {
///
/// In most cases, however, it is recommended to use [`add_task`] method instead.
pub fn runtime_handle(&self) -> &tokio::runtime::Handle {
tracing::info!(
"Layer {} has requested access to the Tokio runtime",
self.layer
);
self.service.runtime.handle()
}

/// Adds a task to the service.
/// Added tasks will be launched after the wiring process will be finished.
pub fn add_task(&mut self, task: Box<dyn Task>) -> &mut Self {
tracing::info!("Layer {} has added a new task: {}", self.layer, task.name());
self.service.tasks.push(task);
self
}
Expand Down Expand Up @@ -59,9 +65,16 @@ impl<'a> ServiceContext<'a> {
let name = T::resource_id();
// Check whether the resource is already available.
if let Some(resource) = self.service.resources.get(&name) {
tracing::info!("Layer {} has requested resource {}", self.layer, name);
return Ok(downcast_clone(resource));
}

tracing::info!(
"Layer {} has requested resource {}, but it is not available",
self.layer,
name
);

// No such resource.
// The requester is allowed to decide whether this is an error or not.
Err(WiringError::ResourceLacking(T::resource_id()))
Expand All @@ -82,6 +95,11 @@ impl<'a> ServiceContext<'a> {
self.service
.resources
.insert(T::resource_id(), Box::new(resource.clone()));
tracing::info!(
"Layer {} has created a new resource {}",
self.layer,
T::resource_id()
);
resource
}

Expand All @@ -96,9 +114,19 @@ impl<'a> ServiceContext<'a> {
pub fn insert_resource<T: Resource>(&mut self, resource: T) -> Result<(), WiringError> {
let name = T::resource_id();
if self.service.resources.contains_key(&name) {
tracing::warn!(
"Layer {} has attempted to provide resource {}, but it is already available",
self.layer,
name
);
return Err(WiringError::ResourceAlreadyProvided(name));
}
self.service.resources.insert(name, Box::new(resource));
tracing::info!(
"Layer {} has provided a new resource {}",
self.layer,
T::resource_id()
);
Ok(())
}
}
3 changes: 2 additions & 1 deletion core/node/node_framework/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ impl ZkStackService {
let runtime_handle = self.runtime.handle().clone();
for layer in wiring_layers {
let name = layer.layer_name().to_string();
let task_result = runtime_handle.block_on(layer.wire(ServiceContext::new(&mut self)));
let task_result =
runtime_handle.block_on(layer.wire(ServiceContext::new(&name, &mut self)));
if let Err(err) = task_result {
// We don't want to bail on the first error, since it'll provide worse DevEx:
// People likely want to fix as much problems as they can in one go, rather than have
Expand Down

0 comments on commit 82c788e

Please sign in to comment.