Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorganize Server #333

Merged
merged 1 commit into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions fastpay/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
#![deny(warnings)]

use bytes::Bytes;
use fastpay::server_lib;
use fastpay_core::authority::*;
use fastpay_core::{authority::*, authority_server::AuthorityServer};
use fastx_network::{network::NetworkClient, transport};
use fastx_types::FASTX_FRAMEWORK_ADDRESS;
use fastx_types::{base_types::*, committee::*, messages::*, object::Object, serialize::*};
Expand Down Expand Up @@ -270,7 +269,7 @@ impl ClientServerBenchmark {
}

async fn spawn_server(&self, state: AuthorityState) -> transport::SpawnedServer {
let server = server_lib::Server::new(self.host.clone(), self.port, state, self.buffer_size);
let server = AuthorityServer::new(self.host.clone(), self.port, self.buffer_size, state);
server.spawn().await.unwrap()
}

Expand Down
9 changes: 0 additions & 9 deletions fastpay/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,4 @@
// Copyright (c) Facebook, Inc. and its affiliates.
// SPDX-License-Identifier: Apache-2.0

#![warn(
future_incompatible,
nonstandard_style,
rust_2018_idioms,
rust_2021_compatibility
)]
#![deny(warnings)]

pub mod config;
pub mod server_lib;
10 changes: 5 additions & 5 deletions fastpay/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

#![deny(warnings)]

use fastpay::{config::*, server_lib};
use fastpay_core::authority::*;
use fastpay::config::*;
use fastpay_core::{authority::*, authority_server::AuthorityServer};
use fastx_network::transport;
use fastx_types::{base_types::*, committee::Committee, object::Object};

Expand All @@ -24,7 +24,7 @@ fn make_server(
committee_config_path: &str,
initial_accounts_config_path: &str,
buffer_size: usize,
) -> server_lib::Server {
) -> AuthorityServer {
let server_config =
AuthorityServerConfig::read(server_config_path).expect("Fail to read server config");
let committee_config =
Expand Down Expand Up @@ -62,11 +62,11 @@ fn make_server(
state
});

server_lib::Server::new(
AuthorityServer::new(
local_ip_addr.to_string(),
server_config.authority.base_port,
state,
buffer_size,
state,
)
}

Expand Down
1 change: 1 addition & 0 deletions fastpay_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ parking_lot = "0.12.0"
itertools = "0.10.3"
async-trait = "0.1.52"
tempfile = "3.3.0"
tracing = { version = "0.1", features = ["log"] }

fastx-adapter = { path = "../fastx_programmability/adapter" }
fastx-framework = { path = "../fastx_programmability/framework" }
Expand Down
61 changes: 17 additions & 44 deletions fastpay/src/server_lib.rs → fastpay_core/src/authority_server.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,48 @@
// Copyright (c) Facebook, Inc. and its affiliates.
// SPDX-License-Identifier: Apache-2.0

use fastpay_core::authority::*;
use fastx_network::transport::*;
use crate::authority::AuthorityState;
use fastx_network::{
network::NetworkServer,
transport::{spawn_server, MessageHandler, SpawnedServer},
};
use fastx_types::{error::*, messages::*, serialize::*};

use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use tracing::*;

pub struct Server {
base_address: String,
base_port: u32,
pub struct AuthorityServer {
server: NetworkServer,
state: AuthorityState,
buffer_size: usize,
// Stats
packets_processed: AtomicUsize,
user_errors: AtomicUsize,
}

impl Server {
impl AuthorityServer {
pub fn new(
base_address: String,
base_port: u32,
state: AuthorityState,
buffer_size: usize,
state: AuthorityState,
) -> Self {
Self {
base_address,
base_port,
server: NetworkServer::new(base_address, base_port, buffer_size),
state,
buffer_size,
packets_processed: AtomicUsize::new(0),
user_errors: AtomicUsize::new(0),
}
}

pub fn packets_processed(&self) -> usize {
self.packets_processed.load(Ordering::Relaxed)
}

pub fn user_errors(&self) -> usize {
self.user_errors.load(Ordering::Relaxed)
}

pub async fn spawn(self) -> Result<SpawnedServer, io::Error> {
info!(
"Listening to TCP traffic on {}:{}",
self.base_address, self.base_port
self.server.base_address, self.server.base_port
);
let address = format!("{}:{}", self.base_address, self.base_port);
let buffer_size = self.buffer_size;
let state = RunningServerState { server: self };
let address = format!("{}:{}", self.server.base_address, self.server.base_port);
let buffer_size = self.server.buffer_size;

// Launch server for the appropriate protocol.
spawn_server(&address, state, buffer_size).await
spawn_server(&address, self, buffer_size).await
}
}

struct RunningServerState {
server: Server,
}

impl MessageHandler for RunningServerState {
impl MessageHandler for AuthorityServer {
fn handle_message<'a>(
&'a self,
buffer: &'a [u8],
Expand All @@ -74,7 +54,6 @@ impl MessageHandler for RunningServerState {
Ok(result) => {
match result {
SerializedMessage::Order(message) => self
.server
.state
.handle_order(*message)
.await
Expand All @@ -84,7 +63,6 @@ impl MessageHandler for RunningServerState {
certificate: message.as_ref().clone(),
};
match self
.server
.state
.handle_confirmation_order(confirmation_order)
.await
Expand All @@ -97,19 +75,16 @@ impl MessageHandler for RunningServerState {
}
}
SerializedMessage::AccountInfoReq(message) => self
.server
.state
.handle_account_info_request(*message)
.await
.map(|info| Some(serialize_account_info_response(&info))),
SerializedMessage::ObjectInfoReq(message) => self
.server
.state
.handle_object_info_request(*message)
.await
.map(|info| Some(serialize_object_info_response(&info))),
SerializedMessage::OrderInfoReq(message) => self
.server
.state
.handle_order_info_request(*message)
.await
Expand All @@ -119,9 +94,7 @@ impl MessageHandler for RunningServerState {
}
};

self.server
.packets_processed
.fetch_add(1, Ordering::Relaxed);
self.server.increment_packets_processed();

if self.server.packets_processed() % 5000 == 0 {
info!(
Expand All @@ -136,7 +109,7 @@ impl MessageHandler for RunningServerState {
Ok(x) => x,
Err(error) => {
warn!("User query failed: {}", error);
self.server.user_errors.fetch_add(1, Ordering::Relaxed);
self.server.increment_user_errors();
Some(serialize_error(&error))
}
}
Expand Down
1 change: 1 addition & 0 deletions fastpay_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@

pub mod authority;
pub mod authority_client;
pub mod authority_server;
pub mod client;
pub mod downloader;
38 changes: 38 additions & 0 deletions network_utils/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::transport::*;
use bytes::Bytes;
use fastx_types::{error::*, serialize::*};
use futures::future::FutureExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use tracing::*;

use std::io;
Expand Down Expand Up @@ -156,3 +157,40 @@ impl NetworkClient {
handles
}
}

pub struct NetworkServer {
pub base_address: String,
pub base_port: u32,
pub buffer_size: usize,
// Stats
packets_processed: AtomicUsize,
user_errors: AtomicUsize,
}

impl NetworkServer {
pub fn new(base_address: String, base_port: u32, buffer_size: usize) -> Self {
Self {
base_address,
base_port,
buffer_size,
packets_processed: AtomicUsize::new(0),
user_errors: AtomicUsize::new(0),
}
}

pub fn packets_processed(&self) -> usize {
self.packets_processed.load(Ordering::Relaxed)
}

pub fn increment_packets_processed(&self) {
self.packets_processed.fetch_add(1, Ordering::Relaxed);
}

pub fn user_errors(&self) -> usize {
self.user_errors.load(Ordering::Relaxed)
}

pub fn increment_user_errors(&self) {
self.user_errors.fetch_add(1, Ordering::Relaxed);
}
}