From ba52c23c81f30021035a9de07225289a7d7869d8 Mon Sep 17 00:00:00 2001 From: "Phillip W." Date: Wed, 15 Nov 2023 11:09:50 +0100 Subject: [PATCH] feat: gossip connector --- src/cluster/connector/gossip.rs | 197 +++++++++++++++++++++--------- src/cluster/connector/messages.rs | 19 ++- src/cluster/connector/mod.rs | 61 +++++---- src/cluster/connector/tests.rs | 129 ++++++++++++++++--- src/cluster/mod.rs | 68 +++++++---- src/cluster/tests.rs | 16 ++- src/codec.rs | 2 +- src/network/mod.rs | 29 +++-- src/remote/addr/tests.rs | 11 +- src/remote/message.rs | 2 +- 10 files changed, 367 insertions(+), 167 deletions(-) diff --git a/src/cluster/connector/gossip.rs b/src/cluster/connector/gossip.rs index 3b4b86e..5be5b61 100644 --- a/src/cluster/connector/gossip.rs +++ b/src/cluster/connector/gossip.rs @@ -1,22 +1,24 @@ -use log::*; +use crate::cluster::connector::messages::{ + GossipEvent, GossipJoining, GossipMessage, NodeResolving, +}; +use crate::cluster::connector::{Connector, ConnectorVariant}; +use crate::network::NetworkInterface; +use crate::{Cluster, ConnectToNode, CustomSystemService, Node, NodeEvent, RemoteAddr}; use actix::prelude::*; +use log::*; +use rand::prelude::{IteratorRandom, ThreadRng}; use std::collections::{HashMap, HashSet}; -use crate::network::NetworkInterface; -use crate::{RemoteAddr, Cluster, CustomSystemService, ConnectToNode, NodeEvents, Node}; -use std::net::SocketAddr; use std::iter::FromIterator; +use std::net::SocketAddr; use std::str::FromStr; -use rand::prelude::{IteratorRandom, ThreadRng}; -use crate::cluster::connector::{Connector, ConnectorVariant}; -use crate::cluster::connector::messages::{GossipEvent, GossipJoining, GossipMessage, NodeResolving}; - const CONNECTOR: &str = "Connector"; +#[derive(Debug, Clone)] enum GossipState { Lonely, Joining, - Joined + Joined, } pub struct Gossip { @@ -25,7 +27,7 @@ pub struct Gossip { waiting_to_add: HashSet, state: GossipState, about_to_join: usize, - gossip_msgs: Vec + gossip_msgs: Vec, } impl Default for Gossip { @@ -36,53 +38,71 @@ impl Default for Gossip { waiting_to_add: HashSet::new(), state: GossipState::Lonely, about_to_join: 0, - gossip_msgs: vec![] + gossip_msgs: vec![], } } } impl Gossip { pub fn new(own_addr: SocketAddr) -> Self { - Self {own_addr, ..Default::default()} + Self { + own_addr, + ..Default::default() + } } fn add_member(&mut self, node: Node) { - self.members.insert(node.socket_addr.clone(), node.network_interface.expect("Empty network interface")); - debug!("Member {} added!", node.socket_addr.to_string()); + self.members.insert( + node.socket_addr.clone(), + node.network_interface.expect("Empty network interface"), + ); + debug!(target: &self.own_addr.to_string(), "Member {} added!", node.socket_addr.to_string()); } fn remove_member(&mut self, addr: SocketAddr) { self.members.remove(&addr); - debug!("Member {} removed", addr.to_string()); + debug!(target: &self.own_addr.to_string(), "Member {} removed", addr.to_string()); } fn ignite_member_up(&self, new_addr: SocketAddr) { - self.gossip_member_event(new_addr, GossipEvent::Join, HashSet::from_iter([self.own_addr.clone()])); + debug!(target: &self.own_addr.to_string(), "Igniting member up {}", new_addr.to_string()); + self.gossip_member_event( + new_addr, + GossipEvent::Join, + HashSet::from_iter([self.own_addr.clone(), new_addr.clone()]), + ); } fn ignite_member_down(&self, leaving_addr: SocketAddr) { - self.gossip_member_event(leaving_addr, GossipEvent::Leave, HashSet::from_iter([self.own_addr.clone()])); + debug!(target: &self.own_addr.to_string(), "Igniting member down {}", leaving_addr.to_string()); + self.gossip_member_event( + leaving_addr, + GossipEvent::Leave, + HashSet::from_iter([self.own_addr.clone()]), + ); } fn gossip_member_event(&self, addr: SocketAddr, event: GossipEvent, seen: HashSet) { - let random_members = self.choose_random_members(3); + debug!(target: &self.own_addr.to_string(), "Gossiping member event {} {:?} {:?}", addr.to_string(), event, seen); + let random_members = self.choose_random_members(3, addr.clone()); - let gossip_message = GossipMessage { - event, - addr, - seen - }; + let gossip_message = GossipMessage { event, addr, seen }; for member in random_members { member.do_send(gossip_message.clone()) } } - fn choose_random_members(&self, amount: usize) -> Vec { + fn choose_random_members(&self, amount: usize, except: SocketAddr) -> Vec { let mut rng = ThreadRng::default(); - self.members.iter() - .choose_multiple(&mut rng, amount).into_iter() - .map(|(socket_addr, network_interface)| RemoteAddr::new_connector(socket_addr.clone(), Some(network_interface.clone()))) + self.members + .iter() + .filter(|(addr, _)| !(*addr).eq(&except)) + .choose_multiple(&mut rng, amount) + .into_iter() + .map(|(socket_addr, network_interface)| { + RemoteAddr::new_connector(socket_addr.clone(), Some(network_interface.clone())) + }) .collect() } @@ -93,10 +113,33 @@ impl Gossip { fn all_seen(&self, seen: &HashSet) -> bool { let members: HashSet = self.members.keys().cloned().collect(); - members.difference(seen).into_iter().collect::>().is_empty() + members + .difference(seen) + .into_iter() + .collect::>() + .is_empty() + } + + fn handle_gossip_queue(&mut self) { + for _ in 0..self.gossip_msgs.len() { + if let Some(gossip_msg) = self.gossip_msgs.pop() { + self.handle_gossip_message(gossip_msg); + } + } } pub(crate) fn handle_gossip_message(&mut self, msg: GossipMessage) { + match &self.state { + GossipState::Lonely => { + error!(target: &self.own_addr.to_string(), "Received a GossipMessage while in LONELY state!") + } + GossipState::Joining => { + self.gossip_msgs.push(msg); + return; + } + GossipState::Joined => (), + } + let all_seen = self.all_seen(&msg.seen); let mut seen = msg.seen; let member_contains = self.members.contains_key(&msg.addr); @@ -104,17 +147,17 @@ impl Gossip { match &msg.event { GossipEvent::Join => { if member_contains & all_seen { - return + return; } if !member_contains { seen.insert(self.own_addr); self.connect_to_node(&msg.addr); } - }, + } GossipEvent::Leave => { if !member_contains & all_seen { - return + return; } if member_contains { @@ -128,60 +171,96 @@ impl Gossip { } pub(crate) fn handle_gossip_joining(&mut self, msg: GossipJoining) { - self.about_to_join = msg.about_to_join; - if self.about_to_join == self.members.len() { - self.state = GossipState::Joined; + match self.state { + GossipState::Joining => { + self.about_to_join = msg.about_to_join; + if self.about_to_join == self.members.len() { + self.change_state(GossipState::Joined); + } + } + _ => { + warn!(target: &self.own_addr.to_string(), "Received a GossipJoining message while not in JOINING state!") + } } } -} + fn change_state(&mut self, state: GossipState) { + debug!(target: &self.own_addr.to_string(), "changing state to {:?}", state); + self.state = state.clone(); + match state { + GossipState::Joining => (), + GossipState::Joined => { + self.handle_gossip_queue(); + } + GossipState::Lonely => (), + } + } +} impl ConnectorVariant for Gossip { - fn handle_node_event(&mut self, msg: NodeEvents, ctx: &mut Context) { + fn handle_node_event(&mut self, msg: NodeEvent, _ctx: &mut Context) { match msg { - NodeEvents::MemberUp(node, seed) => { + NodeEvent::MemberUp(node, seed) => { self.add_member(node.clone()); if !self.waiting_to_add.remove(&node.socket_addr) { match &self.state { GossipState::Lonely => { - if seed { // if connecting node is seed, we are joining - self.state = GossipState::Joining; - } else { // else we are the seed and therefore are already joined - self.state = GossipState::Joined; + if seed { + // if current node is considered seed, we are the seed and therefore are already joined + self.change_state(GossipState::Joined); + + let remote_addr = node.get_remote_addr(CONNECTOR.to_string()); + remote_addr.do_send(GossipJoining { + about_to_join: self.members.len(), + }); + self.ignite_member_up(node.socket_addr); + } else { + // else we are not the seed and therefore need to join + self.change_state(GossipState::Joining); } - }, + } GossipState::Joining => { if self.members.len() == self.about_to_join { - self.state = GossipState::Joined; - for _ in 0..self.gossip_msgs.len() { - if let Some(gossip_msg) = self.gossip_msgs.pop() { - ctx.address().do_send(gossip_msg); - } - } + self.change_state(GossipState::Joined); } - }, + } GossipState::Joined => { let remote_addr = node.get_remote_addr(CONNECTOR.to_string()); - remote_addr.do_send(GossipJoining { about_to_join: self.members.len() }); + remote_addr.do_send(GossipJoining { + about_to_join: self.members.len(), + }); self.ignite_member_up(node.socket_addr); } } } } - NodeEvents::MemberDown(host) => { + NodeEvent::MemberDown(host) => { self.remove_member(host.clone()); self.ignite_member_down(host); } } } - fn handle_node_resolving(&mut self, msg: NodeResolving, _ctx: &mut Context) -> Result>, ()> { - Ok(msg.addrs.into_iter().filter_map(|x| { - if x.clone() == self.own_addr { - None - } else { - Some(self.members.get(&x).expect(&format!("Socket {} should be known!", &x)).clone()) - } - }).collect()) + fn handle_node_resolving( + &mut self, + msg: NodeResolving, + _ctx: &mut Context, + ) -> Result>, ()> { + Ok(msg + .addrs + .into_iter() + .filter_map(|x| { + if x.clone() == self.own_addr { + None + } else { + Some( + self.members + .get(&x) + .expect(&format!("Socket {} should be known!", &x)) + .clone(), + ) + } + }) + .collect()) } } diff --git a/src/cluster/connector/messages.rs b/src/cluster/connector/messages.rs index c379ba5..768060c 100644 --- a/src/cluster/connector/messages.rs +++ b/src/cluster/connector/messages.rs @@ -1,31 +1,30 @@ +use crate::NetworkInterface; +use crate::{DefaultSerialization, RemoteMessage}; +use actix::prelude::*; +use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::net::SocketAddr; -use actix::prelude::*; -use crate::NetworkInterface; -use serde::{Serialize, Deserialize}; -use crate::{RemoteMessage, DefaultSerialization}; - #[derive(Message)] #[rtype(result = "Result>, ()>")] -pub struct NodeResolving{ - pub addrs: Vec +pub struct NodeResolving { + pub addrs: Vec, } #[derive(Serialize, Deserialize, Clone, Debug)] pub enum GossipEvent { Join, - Leave + Leave, } #[derive(RemoteMessage, Serialize, Deserialize, Debug, Clone)] pub struct GossipMessage { pub event: GossipEvent, pub addr: SocketAddr, - pub seen: HashSet + pub seen: HashSet, } #[derive(RemoteMessage, Serialize, Deserialize, Debug, Clone)] pub struct GossipJoining { - pub about_to_join: usize + pub about_to_join: usize, } diff --git a/src/cluster/connector/mod.rs b/src/cluster/connector/mod.rs index 2f035f5..14ccb44 100644 --- a/src/cluster/connector/mod.rs +++ b/src/cluster/connector/mod.rs @@ -1,10 +1,10 @@ -use std::net::SocketAddr; +pub use crate::cluster::connector::messages::NodeResolving; +use crate::cluster::connector::messages::{GossipJoining, GossipMessage}; +use crate::{CustomSerialization, RemoteActor, RemoteMessage, RemoteWrapper}; +use crate::{CustomSystemService, Gossip, NetworkInterface, NodeEvent}; use actix::prelude::*; use log::*; -use crate::{CustomSystemService, Gossip, NetworkInterface, NodeEvents}; -pub use crate::cluster::connector::messages::NodeResolving; -use crate::cluster::connector::messages::{GossipMessage, GossipJoining}; -use crate::{RemoteActor, RemoteWrapper, RemoteMessage, CustomSerialization}; +use std::net::SocketAddr; pub mod gossip; mod messages; @@ -15,7 +15,7 @@ mod tests; #[derive(Debug, Clone, Copy)] pub enum ConnectionProtocol { SingleSeed, - Gossip + Gossip, } impl Default for ConnectionProtocol { @@ -24,28 +24,30 @@ impl Default for ConnectionProtocol { } } - #[derive(RemoteActor)] #[remote_messages(GossipMessage, GossipJoining)] pub enum Connector { - Gossip(Gossip) + Gossip(Gossip), } - impl Connector { - pub fn from_connection_protocol(connection_protocol: ConnectionProtocol, own_address: SocketAddr) -> Self { + pub fn from_connection_protocol( + connection_protocol: ConnectionProtocol, + own_address: SocketAddr, + ) -> Self { match connection_protocol { ConnectionProtocol::Gossip => Self::Gossip(Gossip::new(own_address)), - ConnectionProtocol::SingleSeed => todo!("This must still be implemented.") + ConnectionProtocol::SingleSeed => todo!("This must still be implemented."), } } pub fn start_service_from(connection_protocol: ConnectionProtocol, own_address: SocketAddr) { - Self::start_service_with(move || { Connector::from_connection_protocol(connection_protocol, own_address) }); + Self::start_service_with(move || { + Connector::from_connection_protocol(connection_protocol, own_address) + }); } } - impl Actor for Connector { type Context = Context; @@ -61,7 +63,6 @@ impl Default for Connector { } } - impl Supervised for Connector {} impl SystemService for Connector {} impl CustomSystemService for Connector { @@ -71,17 +72,20 @@ impl CustomSystemService for Connector { } pub trait ConnectorVariant { - fn handle_node_event(&mut self, msg: NodeEvents, ctx: &mut Context); - fn handle_node_resolving(&mut self, msg: NodeResolving, ctx: &mut Context) -> Result>, ()>; + fn handle_node_event(&mut self, msg: NodeEvent, ctx: &mut Context); + fn handle_node_resolving( + &mut self, + msg: NodeResolving, + ctx: &mut Context, + ) -> Result>, ()>; } - -impl Handler for Connector { +impl Handler for Connector { type Result = (); - fn handle(&mut self, msg: NodeEvents, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: NodeEvent, ctx: &mut Self::Context) -> Self::Result { match self { - Connector::Gossip(gossip) => gossip.handle_node_event(msg, ctx) + Connector::Gossip(gossip) => gossip.handle_node_event(msg, ctx), } } } @@ -91,12 +95,11 @@ impl Handler for Connector { fn handle(&mut self, msg: NodeResolving, ctx: &mut Context) -> Self::Result { match self { - Connector::Gossip(gossip) => gossip.handle_node_resolving(msg, ctx) + Connector::Gossip(gossip) => gossip.handle_node_resolving(msg, ctx), } } } - // --- Gossip impl --- impl Handler for Connector { @@ -104,10 +107,8 @@ impl Handler for Connector { fn handle(&mut self, msg: GossipMessage, _ctx: &mut Self::Context) -> Self::Result { match self { - Connector::Gossip(gossip) => { - gossip.handle_gossip_message(msg) - }, - _ => warn!("Connector can only handle GossipMessage if it is Connector::Gossip") + Connector::Gossip(gossip) => gossip.handle_gossip_message(msg), + _ => warn!("Connector can only handle GossipMessage if it is Connector::Gossip"), } } } @@ -117,10 +118,8 @@ impl Handler for Connector { fn handle(&mut self, msg: GossipJoining, _ctx: &mut Self::Context) -> Self::Result { match self { - Connector::Gossip(gossip) => { - gossip.handle_gossip_joining(msg) - }, - _ => warn!("Connector can only handle GossipJoining if it is Connector::Gossip") + Connector::Gossip(gossip) => gossip.handle_gossip_joining(msg), + _ => warn!("Connector can only handle GossipJoining if it is Connector::Gossip"), } } -} \ No newline at end of file +} diff --git a/src/cluster/connector/tests.rs b/src/cluster/connector/tests.rs index c791037..79bec2f 100644 --- a/src/cluster/connector/tests.rs +++ b/src/cluster/connector/tests.rs @@ -1,12 +1,12 @@ use std::net::SocketAddr; -use actix::{Actor, WrapFuture}; -use futures::TryFutureExt; +use actix::Actor; use port_scanner::request_open_port; -use rayon::iter::{ParallelIterator, IntoParallelIterator}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use crate::{Cluster, Connector, CustomSystemService, NodeResolving}; +const FAILED_TO_RESOLVE_NODES: &str = "Failed to resolve nodes"; struct TestActor {} @@ -14,10 +14,9 @@ impl Actor for TestActor { type Context = actix::Context; } - -#[actix_rt::test] +#[test] #[ignore] -async fn test_gossip_connector() { +fn test_gossip_connector_size_2() { let local_ip: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000)) .parse() .unwrap(); @@ -25,14 +24,116 @@ async fn test_gossip_connector() { .parse() .unwrap(); - let nodes = vec![(local_ip.clone(), other_ip.clone()), (other_ip.clone(), local_ip.clone())]; - nodes.into_par_iter().for_each(|(own_ip, other_ip)| build_gossip_cluster(own_ip, vec![other_ip])); + let variables = vec![ + ( + local_ip.clone(), + vec![other_ip.clone()], + vec![other_ip.clone()], + ), + (other_ip.clone(), vec![], vec![local_ip.clone()]), + ]; + + let results: Vec> = variables + .into_par_iter() + .map(|(own_ip, seed_nodes, other_ips)| build_gossip_cluster(own_ip, seed_nodes, other_ips)) + .collect(); + + for result in results { + assert_eq!(result.unwrap(), 1); + } +} + +#[test] +#[ignore] +fn test_gossip_connector_size_3_one_seed() { + let ip_0: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000)) + .parse() + .unwrap(); + let ip_1: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000)) + .parse() + .unwrap(); + let ip_2: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000)) + .parse() + .unwrap(); + + let variables = vec![ + ( + ip_0.clone(), + vec![ip_1.clone()], + vec![ip_1.clone(), ip_2.clone()], + ), + (ip_1.clone(), vec![], vec![ip_0.clone(), ip_2.clone()]), + ( + ip_2.clone(), + vec![ip_1.clone()], + vec![ip_0.clone(), ip_1.clone()], + ), + ]; + + let results: Vec> = variables + .into_par_iter() + .map(|(own_ip, seed_nodes, other_ips)| build_gossip_cluster(own_ip, seed_nodes, other_ips)) + .collect(); + + for result in results { + assert_eq!(result.unwrap(), 2); + } +} + +#[test] +#[ignore] +fn test_gossip_connector_size_3_two_seeds() { + let ip_0: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000)) + .parse() + .unwrap(); + let ip_1: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000)) + .parse() + .unwrap(); + let ip_2: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000)) + .parse() + .unwrap(); + + let variables = vec![ + ( + ip_0.clone(), + vec![ip_1.clone()], + vec![ip_1.clone(), ip_2.clone()], + ), + (ip_1.clone(), vec![], vec![ip_0.clone(), ip_2.clone()]), + ( + ip_2.clone(), + vec![ip_0.clone()], + vec![ip_0.clone(), ip_1.clone()], + ), + ]; + + let results: Vec> = variables + .into_par_iter() + .map(|(own_ip, seed_nodes, other_ips)| build_gossip_cluster(own_ip, seed_nodes, other_ips)) + .collect(); + + for result in results { + assert_eq!(result.unwrap(), 2); + } } -async fn build_gossip_cluster(local_ip: SocketAddr, seed_nodes: Vec) { - let other_ip = seed_nodes.clone().into_iter().next().unwrap(); - let _cluster = Cluster::new_with_connection_protocol(local_ip, seed_nodes.clone(), crate::ConnectionProtocol::Gossip); +#[actix_rt::main] +async fn build_gossip_cluster( + local_ip: SocketAddr, + seed_nodes: Vec, + other_ips: Vec, +) -> Result { + let _cluster = Cluster::new_with_connection_protocol( + local_ip, + seed_nodes.clone(), + crate::ConnectionProtocol::Gossip, + ); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; let connector = Connector::from_custom_registry(); - let addrs = connector.send(NodeResolving { addrs: seed_nodes}).await.unwrap().unwrap(); - addrs.contains(&other_ip); -} \ No newline at end of file + let addrs = connector + .send(NodeResolving { addrs: other_ips }) + .await + .expect(FAILED_TO_RESOLVE_NODES) + .expect(FAILED_TO_RESOLVE_NODES); + Ok(addrs.len()) +} diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs index f60033b..3658f3d 100644 --- a/src/cluster/mod.rs +++ b/src/cluster/mod.rs @@ -1,29 +1,30 @@ +mod connector; mod listener; #[cfg(test)] mod tests; -mod connector; +pub use self::listener::{ClusterListener, ClusterLog}; pub use connector::gossip::Gossip; pub use connector::NodeResolving; -pub use self::listener::{ClusterListener, ClusterLog}; +pub use crate::cluster::connector::ConnectionProtocol; +pub use crate::cluster::connector::Connector; use crate::network::NetworkInterface; use crate::remote::Node; use crate::CustomSystemService; use actix::prelude::*; -use std::net; -use std::io::Result as IoResult; -use tokio::net::{TcpListener, TcpStream}; -use std::collections::HashMap; use actix_broker::BrokerIssue; use futures::executor::block_on; use futures::StreamExt; use log::*; +use std::collections::HashMap; +use std::fmt::Display; +use std::io::Result as IoResult; +use std::net; use std::net::SocketAddr; use std::str::FromStr; +use tokio::net::{TcpListener, TcpStream}; use tokio_stream::wrappers::TcpListenerStream; -pub use crate::cluster::connector::ConnectionProtocol; -pub use crate::cluster::connector::Connector; #[derive(MessageResponse)] pub enum ConnectionApprovalResponse { @@ -42,25 +43,38 @@ pub struct ConnectionApproval { #[rtype(result = "()")] pub struct TcpConnect(pub TcpStream, pub SocketAddr); -#[derive(Message)] +#[derive(Message, Debug)] #[rtype(result = "()")] -pub enum NodeEvents { +pub enum NodeEvent { /// (Node, and whether it is a seed node) MemberUp(Node, bool), MemberDown(SocketAddr), } +impl Display for NodeEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + NodeEvent::MemberUp(node, seed) => write!( + f, + "MemberUp: {} (seed: {})", + node.socket_addr.to_string(), + seed + ), + NodeEvent::MemberDown(addr) => write!(f, "MemberDown: {}", addr.to_string()), + } + } +} + #[derive(Message)] #[rtype(result = "()")] pub struct ConnectToNode(pub(crate) SocketAddr); - /// Central Actor for cluster handling pub struct Cluster { ip_address: SocketAddr, addrs: Vec, own_addr: Option>, - nodes: HashMap> + nodes: HashMap>, } impl Actor for Cluster { @@ -92,18 +106,20 @@ impl Cluster { Self::new_with_connection_protocol(ip_address, seed_nodes, ConnectionProtocol::SingleSeed) } - pub fn new_with_connection_protocol(ip_address: SocketAddr, seed_nodes: Vec, connection_protocol: ConnectionProtocol) -> Addr { + pub fn new_with_connection_protocol( + ip_address: SocketAddr, + seed_nodes: Vec, + connection_protocol: ConnectionProtocol, + ) -> Addr { debug!("Cluster created"); Connector::start_service_from(connection_protocol, ip_address.clone()); - Cluster::start_service_with(move || - Cluster { - ip_address: ip_address.clone(), - addrs: seed_nodes.clone(), - own_addr: None, - nodes: Default::default() - } - ) + Cluster::start_service_with(move || Cluster { + ip_address: ip_address.clone(), + addrs: seed_nodes.clone(), + own_addr: None, + nodes: Default::default(), + }) } fn bind(addr: String) -> IoResult> { @@ -167,15 +183,15 @@ impl Handler for Cluster { } } -impl Handler for Cluster { +impl Handler for Cluster { type Result = (); - fn handle(&mut self, msg: NodeEvents, _ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: NodeEvent, _ctx: &mut Self::Context) -> Self::Result { match &msg { - NodeEvents::MemberUp(node, _seed) => { + NodeEvent::MemberUp(node, _seed) => { self.issue_system_async(ClusterLog::NewMember(node.clone())); - }, - NodeEvents::MemberDown(host) => { + } + NodeEvent::MemberDown(host) => { self.issue_system_async(ClusterLog::MemberLeft(host.clone())); self.nodes.remove(host); } diff --git a/src/cluster/tests.rs b/src/cluster/tests.rs index 2b8011e..5e30bda 100644 --- a/src/cluster/tests.rs +++ b/src/cluster/tests.rs @@ -1,7 +1,7 @@ use crate::test_utils::cluster_listener::TestClusterListener; use crate::{ - Cluster, ClusterListener, ClusterLog, CustomSystemService, NetworkInterface, - NodeResolving, Connector, + Cluster, ClusterListener, ClusterLog, Connector, CustomSystemService, NetworkInterface, + NodeResolving, }; use actix::prelude::*; use actix_broker::BrokerSubscribe; @@ -136,7 +136,11 @@ async fn gossip_adds_member_and_resolves_it() { let other_ip: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000)) .parse() .unwrap(); - let _cluster = Cluster::new_with_connection_protocol(local_ip.clone(), vec![], crate::ConnectionProtocol::Gossip); + let _cluster = Cluster::new_with_connection_protocol( + local_ip.clone(), + vec![], + crate::ConnectionProtocol::Gossip, + ); let addrs = Arc::new(Mutex::new(vec![])); let _own_listener = OwnListenerAskingGossip { asking: other_ip.clone(), @@ -156,7 +160,11 @@ async fn gossip_removes_member() { let other_ip: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000)) .parse() .unwrap(); - let _cluster = Cluster::new_with_connection_protocol(local_ip.clone(), vec![], crate::ConnectionProtocol::Gossip); + let _cluster = Cluster::new_with_connection_protocol( + local_ip.clone(), + vec![], + crate::ConnectionProtocol::Gossip, + ); let addrs = Arc::new(Mutex::new(vec![])); let _own_listener = OwnListenerAskingGossip { asking: other_ip.clone(), diff --git a/src/codec.rs b/src/codec.rs index 0c5836a..5fe92a2 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -10,7 +10,7 @@ use tokio_util::codec::{Decoder, Encoder}; const PREFIX: &[u8] = b"ACTIX/1.0\r\n"; const ENDIAN_LENGTH: usize = 4; -#[derive(Message, Deserialize, Serialize)] +#[derive(Message, Deserialize, Serialize, Debug)] #[rtype(result = "()")] pub enum ClusterMessage { /// bool = is_seed? diff --git a/src/network/mod.rs b/src/network/mod.rs index 569a1f3..bc338fc 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -7,16 +7,16 @@ use std::io::Error; use std::net::SocketAddr; use tokio::net::TcpStream; -use crate::cluster::{Cluster, NodeEvents}; +use crate::cluster::{Cluster, NodeEvent}; use crate::codec::{ClusterMessage, ConnectCodec}; -use crate::remote::{RemoteWrapper, AddrRepresentation, AddrResolver}; -use actix::io::WriteHandler; -use std::thread::sleep; -use std::fmt; -use crate::{ConnectionApproval, ConnectionApprovalResponse, CustomSystemService, Connector}; use crate::network::resolver::{Connect, Resolver}; use crate::network::writer::Writer; +use crate::remote::{AddrRepresentation, AddrResolver, RemoteWrapper}; use crate::Node; +use crate::{ConnectionApproval, ConnectionApprovalResponse, Connector, CustomSystemService}; +use actix::io::WriteHandler; +use std::fmt; +use std::thread::sleep; use tokio::time::Duration; use tokio_util::codec::FramedRead; @@ -35,7 +35,7 @@ impl Actor for NetworkInterface { type Context = Context; fn started(&mut self, ctx: &mut Context) { - debug!("NetworkInterface started! {}", self.addr); + debug!(target: &self.own_ip.to_string(), "NetworkInterface started! {}", self.addr); self.own_addr = Some(ctx.address()); self.counter = 0; if self.stream.is_empty() { @@ -51,7 +51,7 @@ impl Actor for NetworkInterface { self.connect_to_stream(ctx); return Running::Continue; } - Cluster::from_custom_registry().do_send(NodeEvents::MemberDown(self.addr)); + Cluster::from_custom_registry().do_send(NodeEvent::MemberDown(self.addr)); Running::Stop } @@ -128,14 +128,14 @@ impl NetworkInterface { .wait(ctx); } - fn finish_connecting(&mut self) { + fn finish_connecting(&mut self, self_is_seed: bool) { self.connected = true; match self.own_addr.clone() { Some(addr) => { - debug!("finish connecting to {}", self.addr.to_string()); + debug!(target: &self.own_ip.to_string(), "finish connecting to {}", self.addr.to_string()); let node = Node::new(self.addr, Some(addr)); - Cluster::from_custom_registry().do_send(NodeEvents::MemberUp(node, self.seed)); + Cluster::from_custom_registry().do_send(NodeEvent::MemberUp(node, self_is_seed)); } None => error!("NetworkInterface might not have been started already!"), }; @@ -160,17 +160,16 @@ impl NetworkInterface { let send_addr = self.addr; self.addr.set_port(port); let addr = self.addr; - self.seed = seed; Cluster::from_custom_registry() .send(ConnectionApproval { addr, send_addr }) .into_actor(self) - .map(|res, act, ctx| { + .map(move |res, act, ctx| { if let Ok(message_response) = res { match message_response { ConnectionApprovalResponse::Approved => { act.transmit_message(ClusterMessage::Response); - act.finish_connecting() + act.finish_connecting(seed) } ConnectionApprovalResponse::Declined => { act.transmit_message(ClusterMessage::Decline); @@ -190,7 +189,7 @@ impl StreamHandler> for NetworkInterface { ClusterMessage::Request(reply_port, seed) => { self.set_reply_port(reply_port, ctx, seed) } - ClusterMessage::Response => self.finish_connecting(), + ClusterMessage::Response => self.finish_connecting(false), ClusterMessage::Message(remote_message) => self.received_message(remote_message), ClusterMessage::Decline => ctx.stop(), }, diff --git a/src/remote/addr/tests.rs b/src/remote/addr/tests.rs index 9dff372..f993819 100644 --- a/src/remote/addr/tests.rs +++ b/src/remote/addr/tests.rs @@ -40,11 +40,11 @@ impl Handler for TestActor { .map(|res, act, _ctx| match res { Ok(res) => match res { Ok(addr_res) => match addr_res { - AddrResponse::ResolveRec(identifer) => { - match act { - TestActor::Test(identifiers) => identifiers.lock().unwrap().push(identifer) + AddrResponse::ResolveRec(identifer) => match act { + TestActor::Test(identifiers) => { + identifiers.lock().unwrap().push(identifer) } - } + }, _ => panic!("Wrong Response returned!"), }, Err(_) => panic!("Couldn't resolve Addr!"), @@ -59,8 +59,7 @@ impl Handler for TestActor { async fn addr_resolver_registers_and_resolves_addr() { let identifier = "testActor".to_string(); let identifiers = Arc::new(Mutex::new(vec![])); - let ta = TestActor::Test(identifiers.clone()) - .start(); + let ta = TestActor::Test(identifiers.clone()).start(); AddrResolver::from_registry().do_send(AddrRequest::Register( ta.clone().recipient(), identifier.clone(), diff --git a/src/remote/message.rs b/src/remote/message.rs index 9a9c787..98c5dd4 100644 --- a/src/remote/message.rs +++ b/src/remote/message.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; /// Wrapper for messages to be sent to remote actor -#[derive(Message, Serialize, Deserialize)] +#[derive(Message, Serialize, Deserialize, Debug)] #[rtype(result = "()")] pub struct RemoteWrapper { pub destination: RemoteAddr,