Skip to content

Commit

Permalink
full duplex operation
Browse files Browse the repository at this point in the history
  • Loading branch information
agokhale committed Aug 23, 2018
1 parent a12739e commit 13d0572
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 42 deletions.
13 changes: 1 addition & 12 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,22 +1,11 @@
#all:
# cc -g -o viamillipede -lpthread plumbing.c rx.c tx.c viamillipede.c
#clean:
# rm -f viamillipede viamillipede.core
#
#test: clean all
# sh test.sh

#.include <src.opts.mk>
# XXX I'm not really suppoesd to DESTDIR but ... ???
#DESTDIR= /usr/local/
BINDIR= /usr/local/bin
MANDIR= /usr/local/man/man
MK_DEBUG_FILES= no
SHAREDIR= ""
#this is now a runtime checksums flag CFLAGS += -Dvmpd_strict

PROG= viamillipede
SRCS= plumbing.c tx.c rx.c viamillipede.c
SRCS= plumbing.c tx.c rx.c viamillipede.c dtrace_viamillipede.d
# This LDADD knob can't possibly be intended to use this way
LDADD= -lpthread
.include <bsd.prog.mk>
1 change: 1 addition & 0 deletions cfg.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct txconf_s {
int target_port_cursor;
struct target_port_s target_ports[kthreadmax];
pthread_mutex_t mutex;
pthread_t ingest_thread;
int done;
};

Expand Down
1 change: 1 addition & 0 deletions plumbing.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ int tcp_accept(struct sockaddr_in *sa, int socknum) {
whisper(17, " accept sockid: %i\n", socknum);
out_sockfd = accept(socknum, (struct sockaddr *)sa, &socklen);
whisper(13, " socket %i accepted to fd:%i \n", socknum, out_sockfd);
DTRACE_PROBE1(viamillipede, worker__connected, out_sockfd);
checkperror("acceptor");
return (out_sockfd);
}
Expand Down
17 changes: 10 additions & 7 deletions rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ void rxworker(struct rxworker_s *rxworker) {
rxworker->sockfd = tcp_accept(&(rxworker->rxconf_parent->sa),
rxworker->rxconf_parent->socknum);
whisper(5, "rxw:%02i accepted fd:%i \n", rxworker->id, rxworker->sockfd);
DTRACE_PROBE1(viamillipede, worker__connected, rxworker->sockfd);
pthread_mutex_unlock(&rxworker->rxconf_parent->sa_mutex);
whisper(16, "rxw:%02i fd:%i expect %s\n", rxworker->id, rxworker->sockfd,
gcheckphrase);
Expand Down Expand Up @@ -160,6 +161,7 @@ void rxworker(struct rxworker_s *rxworker) {
remainder -= writesize;
}
checkperror("write buffer");
DTRACE_PROBE(viamillipede, leg__rx);
readlen = readsize = -111;
if (pkt.opcode == end_of_millipede) {
whisper(5, "rxw:%02i caught 0x%x done with last frame\n",
Expand Down Expand Up @@ -216,16 +218,17 @@ void rxlaunchworkers(struct rxconf_s *rxconf) {
} while (worker_cursor < (kthreadmax));
whisper(7, "rx: worker group launched\n");
}
int rx_poll(struct rxconf_s *rxconf) {
/** return true if done */
int ret_done;
pthread_mutex_lock(&(rxconf->rxmutex));
ret_done = rxconf->done_mbox;
pthread_mutex_unlock(&(rxconf->rxmutex));
return ret_done;
}
void rx(struct rxconf_s *rxconf) {
pthread_mutex_init(&(rxconf->sa_mutex), NULL);
pthread_mutex_init(&(rxconf->rxmutex), NULL);
rxconf->done_mbox = 0;
rxlaunchworkers(rxconf);
while (!rxconf->done_mbox) {
pthread_mutex_unlock(&(rxconf->rxmutex));
usleep(10000);
// XXX join or hoist out the lame pthread_exits();
pthread_mutex_lock(&(rxconf->rxmutex));
}
whisper(4, "rx: done\n");
}
27 changes: 17 additions & 10 deletions tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ int txpush(struct txworker_s *txworker) {
txworker->pkt.size = 0; /// can't distroythis un less we are successful
txworker->state = 'i'; // idle ok
} else {
DTRACE_PROBE(viamillipede, leg__drop);
txworker->state =
'x'; // dead do not transmit more; save state and do it again
whisper(7, "rxw:%02i is dead\n", txworker->id);
Expand All @@ -153,13 +154,15 @@ int tx_tcp_connect_next(struct txconf_s *txconf) {
whisper(5, "tx: chosen target %s %d\n",
txconf->target_ports[txconf->target_port_cursor].name,
txconf->target_ports[txconf->target_port_cursor].port);
DTRACE_PROBE2(viamillipede, worker__connect,
txconf->target_ports[txconf->target_port_cursor].name,
txconf->target_ports[txconf->target_port_cursor].port);
return (tcp_connect(txconf->target_ports[chosen_target].name,
txconf->target_ports[chosen_target].port));
}

extern char *gcheckphrase;
int tx_start_net(struct txworker_s *txworker) {
/// XXXXXchar hellophrase[]="yoes";
const char okphrase[] = "ok";
int retcode;
char readback[2048];
Expand All @@ -181,7 +184,7 @@ int tx_start_net(struct txworker_s *txworker) {
pthread_mutex_unlock(&(txworker->txconf_parent->mutex));

pthread_mutex_lock(&(txworker->mutex));
usleep(100 * 1000);
usleep(300 * 1000);
whisper(5, "txw:%02ireconnecting\n", txworker->id);
txworker->sockfd = tx_tcp_connect_next(txworker->txconf_parent);
// detect a dead connection and move on to the next port in the target map
Expand Down Expand Up @@ -265,7 +268,7 @@ void txworker_sm(struct txworker_s *txworker) {
/* valid states:
E: uninitialized
f: faulted; unble to connect
a: premable; we think we are talking to a villipede server
a: preamble; we think we are talking to a villipede server
c: connecting; idle when connected ; die if we are done or cant
connected to anything
d: dispatched buffer is loaded; now send it
Expand All @@ -279,6 +282,7 @@ void txworker_sm(struct txworker_s *txworker) {
break; // idle
restartcase:
case 'd':
DTRACE_PROBE(viamillipede, leg__tx)
if (txpush(txworker) == 0) {
// retry this
whisper(3, "txw:%02i socket failed, scheduling retry leg:%lu\n",
Expand Down Expand Up @@ -343,6 +347,10 @@ void txlaunchworkers(struct txconf_s *txconf) {
// 10ms standoff to increase the likelyhood that PCBs are available on the
// rx side
}
ret = pthread_create(&(txconf->ingest_thread), NULL,
(void *)/*puppied killed */ &txingest, txconf);
checkperror("ingest thread launch");
whisper(15, "tx workers launched ");
txstatus(txconf, 5);
}

Expand All @@ -359,7 +367,7 @@ void txstatus(struct txconf_s *txconf, int log_level) {
}
whisper(log_level, "\n");
}
void txbusyspin(struct txconf_s *txconf) {
int tx_poll(struct txconf_s *txconf) {
// wait until all legs are pushed; called after ingest is complete
// if there are launche/dispatched /pushing workers; hang here
int done = 0;
Expand All @@ -383,6 +391,11 @@ void txbusyspin(struct txconf_s *txconf) {
done = (busy_workers == 0);
}
whisper(6, "\ntx: all workers idled after %i spins\n", busy_cycles);
whisper(2, "all complete for %lu(bytes) in ", txconf->stream_total_bytes);
u_long usecbusy = stopwatch_stop(&(txconf->ticker), 2);
// bytes per usec - thats interesting ~== to mBps
whisper(1, " %05.3f MBps\n", (txconf->stream_total_bytes / (1.0 * usecbusy)));
return done;
}
struct txconf_s *gtxconf;
void wat() {
Expand Down Expand Up @@ -435,10 +448,4 @@ void tx(struct txconf_s *txconf) {
init_workers(txconf);
checkperror(" nuicance tx 3");
txlaunchworkers(txconf);
txingest(txconf);
txbusyspin(txconf);
whisper(2, "all complete for %lu(bytes) in ", txconf->stream_total_bytes);
u_long usecbusy = stopwatch_stop(&(txconf->ticker), 2);
// bytes per usec - thats interesting ~== to mbps
whisper(1, " %05.3f MBps\n", (txconf->stream_total_bytes / (1.0 * usecbusy)));
}
1 change: 1 addition & 0 deletions util.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef utilh
#define utilh
#include <sys/types.h>
#include <sys/sdt.h>
extern int gverbose;
ssize_t bufferfill(int fd, u_char *__restrict dest, size_t size);
void stopwatch_start(struct timespec *t);
Expand Down
34 changes: 21 additions & 13 deletions viamillipede.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ char *gcheckphrase;
int main(int argc, char **argv) {

int arg_cursor = 0;
int mode = -1;
#define MODE_TX 2
#define MODE_RX 4
int mode = 0;
int users_input_port;
struct txconf_s txconf;
struct rxconf_s rxconf;
Expand All @@ -42,7 +44,7 @@ int main(int argc, char **argv) {
"port number should be 0-USHRT_MAX");
rxconf.port = (short)users_input_port;
whisper(3, " being a server at port %i \n\n ", rxconf.port);
mode = 0; // xxx enums
mode |= MODE_RX;
checkperror(" main nuiscance -1 ");
}
if (strcmp(argv[arg_cursor], "tx") == 0) {
Expand All @@ -67,7 +69,7 @@ int main(int argc, char **argv) {
txconf.target_ports[txconf.target_port_count].port);
txconf.target_port_count++;
checkperror(" main nuiscance port err ");
mode = 1;
mode |= MODE_TX;
}
if (strcmp(argv[arg_cursor], "threads") == 0) {
assert(++arg_cursor < argc && "threads needs <numeber> arguments");
Expand Down Expand Up @@ -104,17 +106,23 @@ int main(int argc, char **argv) {
arg_cursor++;
}
checkperror("main nuiscance");

switch (mode) {
case 1:
DTRACE_PROBE(viamillipede, init);
if (mode & MODE_RX)
rx(&rxconf); // rx must preceed
if (mode & MODE_TX)
tx(&txconf);
break;
case 0:
rx(&rxconf);
break;
default:
assert(-1 && " mode incorrect, internal error");
break;
int rxpoll_done = 0;
int txpoll_done = 0;
while (txpoll_done + rxpoll_done < 2) {
if (mode & MODE_TX)
txpoll_done = tx_poll(&txconf);
else
txpoll_done = 1;
if (mode & MODE_RX)
rxpoll_done = rx_poll(&rxconf);
else
rxpoll_done = 1;
usleep(333);
}
exit(0);
}
2 changes: 2 additions & 0 deletions worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

#include "cfg.h"
void tx(struct txconf_s *);
int tx_poll(struct txconf_s *);
void txstatus(struct txconf_s *, int log_level);
void rx(struct rxconf_s *);
int rx_poll(struct rxconf_s *);
int tcp_connect(char *host, int port);
int tcp_recieve_prep(struct sockaddr_in *sa, int *socknum, int inport);
int tcp_accept(struct sockaddr_in *sa, int socknum);
Expand Down

0 comments on commit 13d0572

Please sign in to comment.