Skip to content

Commit

Permalink
feat: gossip connector
Browse files Browse the repository at this point in the history
  • Loading branch information
Phillip W. committed Nov 15, 2023
1 parent 111a1c5 commit ba52c23
Show file tree
Hide file tree
Showing 10 changed files with 367 additions and 167 deletions.
197 changes: 138 additions & 59 deletions src/cluster/connector/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
use log::*;
use crate::cluster::connector::messages::{
GossipEvent, GossipJoining, GossipMessage, NodeResolving,
};
use crate::cluster::connector::{Connector, ConnectorVariant};
use crate::network::NetworkInterface;
use crate::{Cluster, ConnectToNode, CustomSystemService, Node, NodeEvent, RemoteAddr};
use actix::prelude::*;
use log::*;
use rand::prelude::{IteratorRandom, ThreadRng};
use std::collections::{HashMap, HashSet};
use crate::network::NetworkInterface;
use crate::{RemoteAddr, Cluster, CustomSystemService, ConnectToNode, NodeEvents, Node};
use std::net::SocketAddr;
use std::iter::FromIterator;
use std::net::SocketAddr;
use std::str::FromStr;
use rand::prelude::{IteratorRandom, ThreadRng};
use crate::cluster::connector::{Connector, ConnectorVariant};
use crate::cluster::connector::messages::{GossipEvent, GossipJoining, GossipMessage, NodeResolving};


const CONNECTOR: &str = "Connector";

#[derive(Debug, Clone)]
enum GossipState {
Lonely,
Joining,
Joined
Joined,
}

pub struct Gossip {
Expand All @@ -25,7 +27,7 @@ pub struct Gossip {
waiting_to_add: HashSet<SocketAddr>,
state: GossipState,
about_to_join: usize,
gossip_msgs: Vec<GossipMessage>
gossip_msgs: Vec<GossipMessage>,
}

impl Default for Gossip {
Expand All @@ -36,53 +38,71 @@ impl Default for Gossip {
waiting_to_add: HashSet::new(),
state: GossipState::Lonely,
about_to_join: 0,
gossip_msgs: vec![]
gossip_msgs: vec![],
}
}
}

impl Gossip {
pub fn new(own_addr: SocketAddr) -> Self {
Self {own_addr, ..Default::default()}
Self {
own_addr,
..Default::default()
}
}

fn add_member(&mut self, node: Node) {
self.members.insert(node.socket_addr.clone(), node.network_interface.expect("Empty network interface"));
debug!("Member {} added!", node.socket_addr.to_string());
self.members.insert(
node.socket_addr.clone(),
node.network_interface.expect("Empty network interface"),
);
debug!(target: &self.own_addr.to_string(), "Member {} added!", node.socket_addr.to_string());
}

fn remove_member(&mut self, addr: SocketAddr) {
self.members.remove(&addr);
debug!("Member {} removed", addr.to_string());
debug!(target: &self.own_addr.to_string(), "Member {} removed", addr.to_string());
}

fn ignite_member_up(&self, new_addr: SocketAddr) {
self.gossip_member_event(new_addr, GossipEvent::Join, HashSet::from_iter([self.own_addr.clone()]));
debug!(target: &self.own_addr.to_string(), "Igniting member up {}", new_addr.to_string());
self.gossip_member_event(
new_addr,
GossipEvent::Join,
HashSet::from_iter([self.own_addr.clone(), new_addr.clone()]),
);
}

fn ignite_member_down(&self, leaving_addr: SocketAddr) {
self.gossip_member_event(leaving_addr, GossipEvent::Leave, HashSet::from_iter([self.own_addr.clone()]));
debug!(target: &self.own_addr.to_string(), "Igniting member down {}", leaving_addr.to_string());
self.gossip_member_event(
leaving_addr,
GossipEvent::Leave,
HashSet::from_iter([self.own_addr.clone()]),
);
}

fn gossip_member_event(&self, addr: SocketAddr, event: GossipEvent, seen: HashSet<SocketAddr>) {
let random_members = self.choose_random_members(3);
debug!(target: &self.own_addr.to_string(), "Gossiping member event {} {:?} {:?}", addr.to_string(), event, seen);
let random_members = self.choose_random_members(3, addr.clone());

let gossip_message = GossipMessage {
event,
addr,
seen
};
let gossip_message = GossipMessage { event, addr, seen };

for member in random_members {
member.do_send(gossip_message.clone())
}
}

fn choose_random_members(&self, amount: usize) -> Vec<RemoteAddr> {
fn choose_random_members(&self, amount: usize, except: SocketAddr) -> Vec<RemoteAddr> {
let mut rng = ThreadRng::default();
self.members.iter()
.choose_multiple(&mut rng, amount).into_iter()
.map(|(socket_addr, network_interface)| RemoteAddr::new_connector(socket_addr.clone(), Some(network_interface.clone())))
self.members
.iter()
.filter(|(addr, _)| !(*addr).eq(&except))
.choose_multiple(&mut rng, amount)
.into_iter()
.map(|(socket_addr, network_interface)| {
RemoteAddr::new_connector(socket_addr.clone(), Some(network_interface.clone()))
})
.collect()
}

Expand All @@ -93,28 +113,51 @@ impl Gossip {

fn all_seen(&self, seen: &HashSet<SocketAddr>) -> bool {
let members: HashSet<SocketAddr> = self.members.keys().cloned().collect();
members.difference(seen).into_iter().collect::<HashSet<&SocketAddr>>().is_empty()
members
.difference(seen)
.into_iter()
.collect::<HashSet<&SocketAddr>>()
.is_empty()
}

fn handle_gossip_queue(&mut self) {
for _ in 0..self.gossip_msgs.len() {
if let Some(gossip_msg) = self.gossip_msgs.pop() {
self.handle_gossip_message(gossip_msg);
}
}
}

pub(crate) fn handle_gossip_message(&mut self, msg: GossipMessage) {
match &self.state {
GossipState::Lonely => {
error!(target: &self.own_addr.to_string(), "Received a GossipMessage while in LONELY state!")
}
GossipState::Joining => {
self.gossip_msgs.push(msg);
return;
}
GossipState::Joined => (),
}

let all_seen = self.all_seen(&msg.seen);
let mut seen = msg.seen;
let member_contains = self.members.contains_key(&msg.addr);

match &msg.event {
GossipEvent::Join => {
if member_contains & all_seen {
return
return;
}

if !member_contains {
seen.insert(self.own_addr);
self.connect_to_node(&msg.addr);
}
},
}
GossipEvent::Leave => {
if !member_contains & all_seen {
return
return;
}

if member_contains {
Expand All @@ -128,60 +171,96 @@ impl Gossip {
}

pub(crate) fn handle_gossip_joining(&mut self, msg: GossipJoining) {
self.about_to_join = msg.about_to_join;
if self.about_to_join == self.members.len() {
self.state = GossipState::Joined;
match self.state {
GossipState::Joining => {
self.about_to_join = msg.about_to_join;
if self.about_to_join == self.members.len() {
self.change_state(GossipState::Joined);
}
}
_ => {
warn!(target: &self.own_addr.to_string(), "Received a GossipJoining message while not in JOINING state!")
}
}
}
}

fn change_state(&mut self, state: GossipState) {
debug!(target: &self.own_addr.to_string(), "changing state to {:?}", state);
self.state = state.clone();
match state {
GossipState::Joining => (),
GossipState::Joined => {
self.handle_gossip_queue();
}
GossipState::Lonely => (),
}
}
}

impl ConnectorVariant for Gossip {
fn handle_node_event(&mut self, msg: NodeEvents, ctx: &mut Context<Connector>) {
fn handle_node_event(&mut self, msg: NodeEvent, _ctx: &mut Context<Connector>) {
match msg {
NodeEvents::MemberUp(node, seed) => {
NodeEvent::MemberUp(node, seed) => {
self.add_member(node.clone());
if !self.waiting_to_add.remove(&node.socket_addr) {
match &self.state {
GossipState::Lonely => {
if seed { // if connecting node is seed, we are joining
self.state = GossipState::Joining;
} else { // else we are the seed and therefore are already joined
self.state = GossipState::Joined;
if seed {
// if current node is considered seed, we are the seed and therefore are already joined
self.change_state(GossipState::Joined);

let remote_addr = node.get_remote_addr(CONNECTOR.to_string());
remote_addr.do_send(GossipJoining {
about_to_join: self.members.len(),
});
self.ignite_member_up(node.socket_addr);
} else {
// else we are not the seed and therefore need to join
self.change_state(GossipState::Joining);
}
},
}
GossipState::Joining => {
if self.members.len() == self.about_to_join {
self.state = GossipState::Joined;
for _ in 0..self.gossip_msgs.len() {
if let Some(gossip_msg) = self.gossip_msgs.pop() {
ctx.address().do_send(gossip_msg);
}
}
self.change_state(GossipState::Joined);
}
},
}
GossipState::Joined => {
let remote_addr = node.get_remote_addr(CONNECTOR.to_string());
remote_addr.do_send(GossipJoining { about_to_join: self.members.len() });
remote_addr.do_send(GossipJoining {
about_to_join: self.members.len(),
});
self.ignite_member_up(node.socket_addr);
}
}
}
}
NodeEvents::MemberDown(host) => {
NodeEvent::MemberDown(host) => {
self.remove_member(host.clone());
self.ignite_member_down(host);
}
}
}

fn handle_node_resolving(&mut self, msg: NodeResolving, _ctx: &mut Context<Connector>) -> Result<Vec<Addr<NetworkInterface>>, ()> {
Ok(msg.addrs.into_iter().filter_map(|x| {
if x.clone() == self.own_addr {
None
} else {
Some(self.members.get(&x).expect(&format!("Socket {} should be known!", &x)).clone())
}
}).collect())
fn handle_node_resolving(
&mut self,
msg: NodeResolving,
_ctx: &mut Context<Connector>,
) -> Result<Vec<Addr<NetworkInterface>>, ()> {
Ok(msg
.addrs
.into_iter()
.filter_map(|x| {
if x.clone() == self.own_addr {
None
} else {
Some(
self.members
.get(&x)
.expect(&format!("Socket {} should be known!", &x))
.clone(),
)
}
})
.collect())
}
}
19 changes: 9 additions & 10 deletions src/cluster/connector/messages.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,30 @@
use crate::NetworkInterface;
use crate::{DefaultSerialization, RemoteMessage};
use actix::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::net::SocketAddr;
use actix::prelude::*;
use crate::NetworkInterface;
use serde::{Serialize, Deserialize};
use crate::{RemoteMessage, DefaultSerialization};


#[derive(Message)]
#[rtype(result = "Result<Vec<Addr<NetworkInterface>>, ()>")]
pub struct NodeResolving{
pub addrs: Vec<SocketAddr>
pub struct NodeResolving {
pub addrs: Vec<SocketAddr>,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum GossipEvent {
Join,
Leave
Leave,
}

#[derive(RemoteMessage, Serialize, Deserialize, Debug, Clone)]
pub struct GossipMessage {
pub event: GossipEvent,
pub addr: SocketAddr,
pub seen: HashSet<SocketAddr>
pub seen: HashSet<SocketAddr>,
}

#[derive(RemoteMessage, Serialize, Deserialize, Debug, Clone)]
pub struct GossipJoining {
pub about_to_join: usize
pub about_to_join: usize,
}
Loading

0 comments on commit ba52c23

Please sign in to comment.