Skip to content

Commit

Permalink
Code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
swimmesberger committed Mar 29, 2020
1 parent 83104e1 commit d4d0ee6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
41 changes: 26 additions & 15 deletions src/fudp/blocking_udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,53 @@ pub fn run(config: &ForwardingConfiguration) -> std::io::Result<()> {

println!("Sending to {:?}", peers);

let recv_mutex = Arc::new(Mutex::new(0));
crossbeam::scope(|scope| {
for idx in 0..thread_count {
let mutex_clone = recv_mutex.clone();
let copy_send_sockets = send_sockets.get(idx).unwrap().clone();
println!("Starting read worker for {}", socket.local_addr().unwrap());
let _child = scope.builder().name(format!("Receive-Worker-{}", idx)).spawn(|_| {
read_worker(&socket, copy_send_sockets, &pks, mutex_clone);
}).unwrap();
}
}).unwrap();
if thread_count == 1 {
let copy_send_sockets = send_sockets.get(0).unwrap().clone();
println!("Starting read worker for {}", socket.local_addr().unwrap());
read_worker(&socket, copy_send_sockets, &pks, &mut None);
} else {
let recv_mutex = Arc::new(Mutex::new(0));
crossbeam::scope(|scope| {
for idx in 0..thread_count {
let mutex_clone = recv_mutex.clone();
let copy_send_sockets = send_sockets.get(idx).unwrap().clone();
println!("Starting read worker for {}", socket.local_addr().unwrap());
let _child = scope.builder().name(format!("Receive-Worker-{}", idx)).spawn(|_| {
let mut mutex_option = Option::from(mutex_clone);
read_worker(&socket, copy_send_sockets, &pks, &mut mutex_option);
}).unwrap();
}
}).unwrap();
}

return Ok(());
}

#[inline]
fn read_worker(read_socket: &UdpSocket, mut write_sockets: Vec<Arc<UdpSocket>>, pks: &&&mut PacketsPerSecond, recv_mutex: Arc<Mutex<i32>>) {
fn read_worker(read_socket: &UdpSocket, mut write_sockets: Vec<Arc<UdpSocket>>, pks: &&&mut PacketsPerSecond, recv_mutex: &mut Option<Arc<Mutex<i32>>>) {
// init full buffer - otherwise we can't receive anything,
let mut buf;
{
let buf_backed: Vec<u8> = vec![0; util::BUFFER_SIZE];
buf = BytesMut::from(buf_backed.as_slice());
}

let has_mutex = recv_mutex.is_some();
loop {
#[cfg(debug_assertions)]
println!();
#[cfg(debug_assertions)]
println!("### Reading data");

let read_result;
// lock this scope because when multiple threads block in recv simultaneously WSAEINPROGRESS error is thrown to prevent that we use a mutex
{
let _lock_scope = recv_mutex.lock();
if has_mutex {
// lock this scope because when multiple threads block in recv simultaneously WSAEINPROGRESS error is thrown to prevent that we use a mutex
let _lock_scope = recv_mutex.as_mut().unwrap().lock();
read_result = read_socket.recv(&mut buf);
} else {
read_result = read_socket.recv(&mut buf);
}

if read_result.is_err() {
#[cfg(debug_assertions)]
println!("Error on read {}", read_result.unwrap_err());
Expand Down
6 changes: 4 additions & 2 deletions src/fudp/blocking_udp_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ fn read_worker(socket: &UdpSocket, peers: &Vec<SocketAddr>, bus_locked: Option<&
buf = BytesMut::from(buf_backed.as_slice());
}

let has_bus_locked = bus_locked.is_some();
let has_bus_unlocked = bus_unlocked.is_some();
loop {
let read_result = socket.recv_from(buf.as_mut());
if read_result.is_err() {
Expand All @@ -82,10 +84,10 @@ fn read_worker(socket: &UdpSocket, peers: &Vec<SocketAddr>, bus_locked: Option<&

// Redeclare `buf` as slice of the received data
let read_buf: Bytes = BytesMut::from(&buf[..read_bytes]).freeze();
if bus_locked.is_some() {
if has_bus_locked {
let mut send_bus = bus_locked.unwrap().lock().unwrap();
send_bus.broadcast(ForwardingPacket::new(read_buf, peers_count));
} else if bus_unlocked.is_some() {
} else if has_bus_unlocked {
bus_unlocked.as_mut().unwrap().broadcast(ForwardingPacket::new(read_buf, peers_count));
}
pks.on_packet();
Expand Down

0 comments on commit d4d0ee6

Please sign in to comment.