Skip to content

Commit

Permalink
remove alert backup (#362)
Browse files Browse the repository at this point in the history
  • Loading branch information
woocash2 authored Oct 27, 2023
1 parent 573d303 commit 8ec90d3
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 317 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ More details are available [in the book][reference-link-implementation-details].
- Import AlephBFT in your crate
```toml
[dependencies]
aleph-bft = "^0.31"
aleph-bft = "^0.32"
```
- The main entry point is the `run_session` function, which returns a Future that runs the
consensus algorithm.
Expand Down
2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.31.0"
version = "0.32.0"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand Down
112 changes: 14 additions & 98 deletions consensus/src/alerts/service.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::{
alerts::{
handler::{Handler, OnNetworkAlertResponse, OnOwnAlertResponse, RmcResponse},
Alert, AlertData, AlertMessage, ForkingNotification, NetworkMessage,
handler::{Handler, RmcResponse},
Alert, AlertMessage, ForkingNotification, NetworkMessage,
},
Data, Hasher, MultiKeychain, Multisigned, NodeIndex, Receiver, Recipient, Sender,
};
use aleph_bft_rmc::{DoublingDelayScheduler, Message as RmcMessage};
use aleph_bft_types::Terminator;
use futures::{FutureExt, StreamExt};
use log::{debug, error, warn};
use std::{collections::HashMap, time::Duration};
use std::time::Duration;

const LOG_TARGET: &str = "AlephBFT-alerter";
type RmcService<H, MK, S, M> =
Expand All @@ -20,11 +20,6 @@ pub struct Service<H: Hasher, D: Data, MK: MultiKeychain> {
messages_from_network: Receiver<NetworkMessage<H, D, MK>>,
notifications_for_units: Sender<ForkingNotification<H, D, MK::Signature>>,
alerts_from_units: Receiver<Alert<H, D, MK::Signature>>,
data_for_backup: Sender<AlertData<H, D, MK>>,
responses_from_backup: Receiver<AlertData<H, D, MK>>,
own_alert_responses: HashMap<H::Hash, OnOwnAlertResponse<H, D, MK>>,
network_alert_responses: HashMap<H::Hash, OnNetworkAlertResponse<H, D, MK>>,
multisigned_notifications: HashMap<H::Hash, ForkingNotification<H, D, MK::Signature>>,
node_index: NodeIndex,
exiting: bool,
handler: Handler<H, D, MK>,
Expand All @@ -36,8 +31,6 @@ pub struct IO<H: Hasher, D: Data, MK: MultiKeychain> {
pub messages_from_network: Receiver<NetworkMessage<H, D, MK>>,
pub notifications_for_units: Sender<ForkingNotification<H, D, MK::Signature>>,
pub alerts_from_units: Receiver<Alert<H, D, MK::Signature>>,
pub data_for_backup: Sender<AlertData<H, D, MK>>,
pub responses_from_backup: Receiver<AlertData<H, D, MK>>,
}

impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
Expand All @@ -47,8 +40,6 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
messages_from_network,
notifications_for_units,
alerts_from_units,
data_for_backup,
responses_from_backup,
} = io;

let node_index = keychain.index();
Expand All @@ -63,11 +54,6 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
messages_from_network,
notifications_for_units,
alerts_from_units,
data_for_backup,
responses_from_backup,
own_alert_responses: HashMap::new(),
network_alert_responses: HashMap::new(),
multisigned_notifications: HashMap::new(),
node_index,
exiting: false,
handler,
Expand Down Expand Up @@ -126,18 +112,12 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
) {
match message {
AlertMessage::ForkAlert(alert) => match self.handler.on_network_alert(alert.clone()) {
Ok(response) => {
let alert = alert.as_signable().clone();
self.network_alert_responses.insert(alert.hash(), response);
if self
.data_for_backup
.unbounded_send(AlertData::NetworkAlert(alert))
.is_err()
{
error!(
target: LOG_TARGET,
"Network alert couldn't be sent to backup.",
);
Ok((maybe_notification, hash)) => {
if let Some(multisigned) = self.rmc_service.start_rmc(hash) {
self.handle_multisigned(multisigned);
}
if let Some(notification) = maybe_notification {
self.send_notification_for_units(notification);
}
}
Err(error) => debug!(target: LOG_TARGET, "{}", error),
Expand Down Expand Up @@ -168,79 +148,22 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
}

fn handle_alert_from_runway(&mut self, alert: Alert<H, D, MK::Signature>) {
let response = self.handler.on_own_alert(alert.clone());
self.own_alert_responses.insert(alert.hash(), response);
if self
.data_for_backup
.unbounded_send(AlertData::OwnAlert(alert))
.is_err()
{
error!(target: LOG_TARGET, "Own alert couldn't be sent to backup.");
let (message, recipient, hash) = self.handler.on_own_alert(alert.clone());
self.send_message_for_network(message, recipient);
if let Some(multisigned) = self.rmc_service.start_rmc(hash) {
self.handle_multisigned(multisigned);
}
}

fn handle_multisigned(&mut self, multisigned: Multisigned<H::Hash, MK>) {
match self.handler.alert_confirmed(multisigned.clone()) {
Ok(notification) => {
self.multisigned_notifications
.insert(*multisigned.as_signable(), notification);
if self
.data_for_backup
.unbounded_send(AlertData::MultisignedHash(multisigned))
.is_err()
{
error!(
target: LOG_TARGET,
"Multisigned hash couldn't be sent to backup."
);
}
self.send_notification_for_units(notification);
}
Err(error) => warn!(target: LOG_TARGET, "{}", error),
}
}

fn handle_data_from_backup(&mut self, data: AlertData<H, D, MK>) {
match data {
AlertData::OwnAlert(alert) => match self.own_alert_responses.remove(&alert.hash()) {
Some((message, recipient, hash)) => {
self.send_message_for_network(message, recipient);
if let Some(multisigned) = self.rmc_service.start_rmc(hash) {
self.handle_multisigned(multisigned);
}
}
None => warn!(target: LOG_TARGET, "Alert response missing from storage."),
},
AlertData::NetworkAlert(alert) => {
match self.network_alert_responses.remove(&alert.hash()) {
Some((maybe_notification, hash)) => {
if let Some(multisigned) = self.rmc_service.start_rmc(hash) {
self.handle_multisigned(multisigned);
}
if let Some(notification) = maybe_notification {
self.send_notification_for_units(notification);
}
}
None => warn!(
target: LOG_TARGET,
"Network alert response missing from storage."
),
}
}
AlertData::MultisignedHash(multisigned) => {
match self
.multisigned_notifications
.remove(multisigned.as_signable())
{
Some(notification) => self.send_notification_for_units(notification),
None => warn!(
target: LOG_TARGET,
"Multisigned response missing from storage."
),
}
}
}
}

pub async fn run(&mut self, mut terminator: Terminator) {
loop {
futures::select! {
Expand All @@ -261,13 +184,6 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
message = self.rmc_service.next_message().fuse() => {
self.rmc_message_to_network(message);
},
item = self.responses_from_backup.next() => match item {
Some(item) => self.handle_data_from_backup(item),
None => {
error!(target: LOG_TARGET, "Backup responses stream closed.");
break;
}
},
_ = terminator.get_exit().fuse() => {
debug!(target: LOG_TARGET, "Received exit signal.");
self.exiting = true;
Expand Down
Loading

0 comments on commit 8ec90d3

Please sign in to comment.