Skip to content

Commit

Permalink
feat(node_framework): Ergonomic improvements (#1453)
Browse files Browse the repository at this point in the history
## What ❔

- Split ZkStackService into the main struct and builder for ease of use.
- Introduce strong typing for ZkStackErrors.
- Add a showcase example.

## Why ❔

Quality of life improvements.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
- [ ] Spellcheck has been run via `zk spellcheck`.
- [ ] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
popzxc authored Mar 19, 2024
1 parent f9f4d38 commit 09b6887
Show file tree
Hide file tree
Showing 7 changed files with 319 additions and 77 deletions.
12 changes: 6 additions & 6 deletions core/node/node_framework/examples/main_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ use zksync_node_framework::{
tx_sink::TxSinkLayer,
},
},
service::ZkStackService,
service::{ZkStackService, ZkStackServiceBuilder, ZkStackServiceError},
};

struct MainNodeBuilder {
node: ZkStackService,
node: ZkStackServiceBuilder,
}

impl MainNodeBuilder {
fn new() -> Self {
Self {
node: ZkStackService::new().expect("Failed to initialize the node"),
node: ZkStackServiceBuilder::new(),
}
}

Expand Down Expand Up @@ -282,8 +282,8 @@ impl MainNodeBuilder {
Ok(self)
}

fn build(self) -> ZkStackService {
self.node
fn build(mut self) -> Result<ZkStackService, ZkStackServiceError> {
self.node.build()
}
}

Expand Down Expand Up @@ -315,7 +315,7 @@ fn main() -> anyhow::Result<()> {
.add_ws_web3_api_layer()?
.add_house_keeper_layer()?
.add_commitment_generator_layer()?
.build()
.build()?
.run()?;

Ok(())
Expand Down
214 changes: 214 additions & 0 deletions core/node/node_framework/examples/showcase.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
//! This example is a showcase of the framework usage.
//! It demonstrates the main abstractions of a framework and shows how to use them.
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};

use zksync_node_framework::{
resource::{Resource, ResourceId},
service::{ServiceContext, StopReceiver, ZkStackServiceBuilder},
task::Task,
wiring_layer::{WiringError, WiringLayer},
};

/// This will be an example of a shared resource. Basically, something that can be used by multiple
/// tasks. In a real world, imagine that we have a `ConnectionPool` instead of this structure.
/// Or `GasAdjuster`.
#[derive(Debug, Clone)]
struct MemoryDatabase {
data: Arc<Mutex<HashMap<String, String>>>,
}

/// Often, we don't want to use a concrete resource. Instead, we want to be able
/// to customize the behavior for whoever uses it. For that, it's often useful to hide
/// the implementation behind a trait. This way it's the code that composes the service that
/// gets to decide how the resource is used.
///
/// Examples of such behavior in a real world: locally we store artifacts in a local storage,
/// but in real envs we use GCP. Alternatively, we have different resource implementations for
/// main node and EN, like `MempoolIO` and `ExternalIO`.
///
/// Whether it makes sense to hdie the actual resource behind a trait often depends on the resource
/// itself. For example, our DAL is massive and cannot realistically be changed easily, so it's OK
/// for it to be a concrete resource. But for anything that may realistically have two different
/// implementations, it's often a good idea to hide it behind a trait.
trait Database: 'static + Send + Sync {
fn put(&self, key: String, value: String);
fn get(&self, key: String) -> Option<String>;
}

impl Database for MemoryDatabase {
fn put(&self, key: String, value: String) {
self.data.lock().unwrap().insert(key, value);
}

fn get(&self, key: String) -> Option<String> {
self.data.lock().unwrap().get(&key).cloned()
}
}

/// An idiomatic way to create a resource is to prepare a wrapper for it.
/// This way we separate the logic of framework (which is primarily about glueing things together)
/// from an actual logic of the resource.
#[derive(Clone)]
struct DatabaseResource(pub Arc<dyn Database>);

/// Resource is mostly a marker trait for things that can be added and fetched from the service.
/// It requires things to be `Send`/`Sync` to share them across threads, `'static` (i.e. not bound
/// to any non-static lifetime) and also `Clone`, since any task may receive their own copy of a
/// resource.
///
/// For the latter requirement, there exists an `Unique` wrapper that can be used to store non-`Clone`
/// resources. It's not used in this example, but it's a useful thing to know about.
///
/// Finally, there are other wrappers for resources as well, like `ResourceCollection` and `LazyResource`.
impl Resource for DatabaseResource {
fn resource_id() -> ResourceId {
// The convention for resource names is `<scope>/<name>`. In this case, the scope is `common`, but
// for anything that is component-specific it could've been e.g. `state_keeper` or `api`.
"common/database".into()
}
}

/// Now that we have a resource, we can create a task that uses it.
/// This task will be putting values to the database.
struct PutTask {
// This is a resource that the task will use.
db: Arc<dyn Database>,
}

impl PutTask {
async fn run_inner(self) {
let mut counter = 0;
loop {
let key = format!("key_{}", counter);
let value = format!("value_{}", counter);
tracing::info!("Put key-value pair: {} -> {}", key, value);
self.db.put(key, value);
counter += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}

#[async_trait::async_trait]
impl Task for PutTask {
fn name(&self) -> &'static str {
// Task names simply have to be unique. They are used for logging and debugging.
"put_task"
}

/// This method will be invoked by the framework when the task is started.
async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
tracing::info!("Starting the task {}", self.name());

// We have to respect the stop receiver and should exit as soon as we receive
// a stop signal.
tokio::select! {
_ = self.run_inner() => {},
_ = stop_receiver.0.changed() => {},
}

Ok(())
}
}

/// A second task that will be checking the contents of the database and use the same shared resource.
struct CheckTask {
db: Arc<dyn Database>,
}

impl CheckTask {
async fn run_inner(self) {
let mut counter = 0;
loop {
let key = format!("key_{}", counter);
let value = self.db.get(key.clone());
tracing::info!("Check key-value pair: {} -> {:?}", key, value);
if value.is_some() {
counter += 1;
}
tokio::time::sleep(Duration::from_millis(800)).await;
}
}
}

#[async_trait::async_trait]
impl Task for CheckTask {
fn name(&self) -> &'static str {
"check_task"
}

async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
tracing::info!("Starting the task {}", self.name());

tokio::select! {
_ = self.run_inner() => {},
_ = stop_receiver.0.changed() => {},
}

Ok(())
}
}

/// Now we have to somehow add the resource and tasks to the service.
/// For that, there is a concept in the framework called `WiringLayer`.
/// It's a way to logically group a set of related tasks and resources in a composable way.
/// In real world, you usually want to extract shareable resources into their own wiring layer,
/// and keep tasks separately. Here we do exactly that: we'll use one layer to add a database
/// and another layer to fetch it. The benefit here is that if you want to swap the database
/// implementation, you only have to inject a different wiring layer for database, and the
/// wiring layers for the tasks will remain unchanged.
struct DatabaseLayer;

#[async_trait::async_trait]
impl WiringLayer for DatabaseLayer {
fn layer_name(&self) -> &'static str {
"database_layer"
}

/// `wire` method will be invoked by the service before the tasks are started.
async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
let database = Arc::new(MemoryDatabase {
data: Arc::new(Mutex::new(HashMap::new())),
});
// We add the resource to the service context. This way it will be available for the tasks.
context.insert_resource(DatabaseResource(database))?;
Ok(())
}
}

/// Layer where we add tasks.
struct TasksLayer;

#[async_trait::async_trait]
impl WiringLayer for TasksLayer {
fn layer_name(&self) -> &'static str {
"tasks_layer"
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
// We fetch the database resource from the context.
// Note that we don't really care where it comes from or what's the actual implementation is.
// We only care whether it's available and bail out if not.
let db = context.get_resource::<DatabaseResource>().await?.0;
let put_task = PutTask { db: db.clone() };
let check_task = CheckTask { db };
// These tasks will be launched by the service once the wiring process is complete.
context.add_task(Box::new(put_task));
context.add_task(Box::new(check_task));
Ok(())
}
}

fn main() -> anyhow::Result<()> {
ZkStackServiceBuilder::new()
.add_layer(DatabaseLayer)
.add_layer(TasksLayer)
.build()?
.run()?;
Ok(())
}
10 changes: 9 additions & 1 deletion core/node/node_framework/src/resource/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::any::TypeId;
use std::{any::TypeId, fmt};

pub use self::{
lazy_resource::LazyResource, resource_collection::ResourceCollection, resource_id::ResourceId,
Expand Down Expand Up @@ -39,6 +39,14 @@ pub(crate) trait StoredResource: 'static + std::any::Any + Send + Sync {
fn stored_resource_wired(&mut self);
}

impl fmt::Debug for dyn StoredResource {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Resource")
.field("resource_id", &self.stored_resource_id())
.finish()
}
}

impl<T: Resource> StoredResource for T {
fn stored_resource_id(&self) -> ResourceId {
T::resource_id()
Expand Down
13 changes: 13 additions & 0 deletions core/node/node_framework/src/service/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use crate::wiring_layer::WiringError;

#[derive(Debug, thiserror::Error)]
pub enum ZkStackServiceError {
#[error("Detected a Tokio Runtime. ZkStackService manages its own runtime and does not support nested runtimes")]
RuntimeDetected,
#[error("No tasks have been added to the service")]
NoTasks,
#[error("One or more wiring layers failed to initialize: {0:?}")]
Wiring(Vec<(String, WiringError)>),
#[error(transparent)]
Task(#[from] anyhow::Error),
}
Loading

0 comments on commit 09b6887

Please sign in to comment.