Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Fix cycle dispute-coordinator <-> dispute-distribution #6489

Merged
merged 24 commits into from
Jan 10, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
dyn Fn variant (no cloning)
  • Loading branch information
eskimor authored and eskimor committed Dec 30, 2022
commit b1a57259a5fd41ad0d94f14600a19726c6c92c9a
133 changes: 86 additions & 47 deletions node/subsystem-util/src/message_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,69 +68,108 @@
//! in any non blocking/CPU intensive parts, which allows us to share data via references for
//! example.
//!
//! Nothing is ever for free though: Each level adds an indirect function call to message sending.
//! which should be cheap enough for most applications. In particular we avoided the use of of
//! async traits, which would have required memory allocations on each send.
//!
//! TODO: Remove async trait requirement as this means an allocation for each message to be sent.

use futures::mpsc;
use std::{marker::PhantomData, convert::identity};

/// Send messages.
///
/// This trait is designed to allow for senders of different messages to simply forward to a sender
/// of a super type (an enum that has variants for those message types).
trait MessageSender: Send {
type Message;
use futures::mpsc;

/// Send a message.
pub async fn send_message(&mut self, msg: Self::Message);
struct RootSender {}
struct ChildSender {}
struct GrandChildSender {}

///
pub async fn box_clone(&self) -> Box<dyn MessageSender<Message = Self::Message>>;
struct MessageSender<M, M1> {
sender: mpsc::Sender<M>,
conversion: Box<dyn Fn(M1) -> M>,
}

impl<M> Clone for Box<dyn MessageSender<Message = M>> {
fn clone(&self) -> Box<dyn MessageSender<Message = M>> {
self.box_clone()
}
trait MessageConversion<P, M> {

fn convert(message: M) -> P;
fn box_clone(self: Box<Self>) -> Box<Self>;
}

struct WrappingSender<Upper, M> {
upper_sender: Box<dyn MessageSender<Message = Upper>>,
_phantom: PhantomData<M>,
struct ParentChild { }

enum ParentMessage {
Child(ChildMessage)
}

enum ChildMessage {
Huhu,
GrandChild(GrandChildMessage),

impl<Upper, M> Clone for WrappingSender<Upper, M> {
fn clone(&self) -> Self {
Self { upper_sender: self.upper_sender.clone(), _phantom: PhantomData }
}
}

/// Root implementation for an actual mpsc::Sender.
impl<M> MessageSender for mpsc::Sender<M>
where M: Debug + Send + 'static {
type Message = M;
async fn send_message(&mut self, msg: M) {
// Flushing means to wait for the receiver to pick up the data - we don't want to wait for
// that.
self.feed(msg).await
}

fn box_clone(&self) -> Box<dyn MessageSender<Message = M>> {
Box::new(self.clone())
}
enum GrandChildMessage {
Haha,
}

impl<M, Upper> MessageSender for WrappingSender<Upper, M>
where Upper: From<M>,
M: Send + 'static,
Upper: 'static
{
type Message = M;
impl MessageConversion<ParentMessage, ChildMessage> for ParentChild {

fn send_message(&mut self, msg: Self::Message) {
self.upper_sender.send_message(msg.into())
}
fn convert(message: ChildMessage) -> ParentMessage {
ParentMessage::Child(message)
}

fn box_clone(&self) -> Box<dyn MessageSender<Message = M>> {
Box::new(self.clone())
}
fn box_clone(self: Box<Self>) -> Box<Self> {
Box::new(ParentChild {})
}
}

impl<M> MessageSender<M, M>
{
pub fn new_root(root: mpsc::Sender<M>) -> Self {
Self {
sender: root,
conversion: Box::new(identity),
}
}
}

impl<M, M1> MessageSender<M, M1> {
/// M1 -> M2 -> M
/// Inputs:
/// F(M2) -> M (via parent)
/// F(M1) -> M2 (via child_conversion)
/// Result: F(M1) -> M
pub fn new<M2, F>(parent: MessageSender<M, M2>, child_conversion: F) -> Self
where
F: Fn(M1) -> M2 {
let MessageSender { sender, conversion } = parent;
Self {
sender,
conversion: Box::new(|x| conversion(child_conversion(x)))
}
}

pub async fn send_message(&mut self, m: M1) {
self.sender.feed((*self.conversion)(m)).await
}
}

// impl<M1, M, F> MessageSender<M, M1, F>
// where
// F: Fn(M1) -> M, {
//
// M1 -> M2 -> M
// Inputs:
// F(M2) -> M
// F(M1) -> M2
// Result: F(M1) -> M
//
// pub fn new_nested<M2, F2, F3>(parent: MessageSender<M, F2>, nested_conversion: F3) -> Self
// where
// F2: Fn(M2) -> M,
// F3: Fn(M1) -> M2 {
// let Self { sender, conversion } = parent;
// let full_conversion = |m| conversion(nested_conversion(m));
// Self {
// sender,
// conversion: full_conversion,
// }
// }
// }