Skip to content

Commit

Permalink
Bugs:
Browse files Browse the repository at this point in the history
        Fix deadlock!: Increase buffersize socket options; small buffers on single machines might deadlock due to one binary exhausting all TCP buffers.
Performance:
        Use writev for tx sock operations
Observatiblity:
        Add SIGINFO reports for RX, compact and sanitize the status reports
        Add delayus operator for slowing connection.
        SIGUSR1 and SIGUSER2 now increment and decrement verbose levels.
Smoke tests rewritten (to pass)
        - use openssl for repeatable pseudorandom, unsalted, extra chunky data.
  • Loading branch information
agokhale committed Dec 23, 2019
1 parent 597021b commit 0e767f2
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 109 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ CFLAGS+= -g
# This LDADD knob can't possibly be intended to use this way
LDADD= -lpthread
.include <bsd.prog.mk>
.include <bsd.clang-analyze.mk>
3 changes: 3 additions & 0 deletions cfg.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ struct txconf_s {

struct rxworker_s {
int id;
char state;
u_long leg;
u_long legop;
int sockfd;
int socknum;
struct rxconf_s *rxconf_parent;
Expand Down
57 changes: 50 additions & 7 deletions plumbing.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#include "util.h"
#include "worker.h"
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>

#ifdef CHAOS
extern unsigned long gchaos;
Expand Down Expand Up @@ -71,7 +75,7 @@ ssize_t bufferfill(int fd, u_char *__restrict dest, size_t size, int charmode) {
void stopwatch_start(struct timespec *t) {
assert(clock_gettime(CLOCK_UPTIME, t) == 0);
}
u_long stopwatch_stop(struct timespec *t, int whisper_channel) {
u_long stopwatch_stop(struct timespec *t) {
// stop the timer started at t
// returns usec resolution diff of time
int retc = 1;
Expand Down Expand Up @@ -101,6 +105,45 @@ u_long stopwatch_stop(struct timespec *t, int whisper_channel) {
u_long ret = (secondsdiff * 1000000) + (nanoes / 1000); // in usec
return ret;
}
int tcp_geterr( int sfd) {
int sockerr;
u_int sockerrsize = sizeof(sockerr); // uhg
getsockopt(sfd, SOL_SOCKET, SO_ERROR, &sockerr, &sockerrsize);
checkperror("__FUNCTION__");
return sockerr;
}
int tcp_nowait( int si) {
int val=1;
socklen_t outsiz=sizeof(val);
setsockopt( si,IPPROTO_TCP, TCP_NODELAY, &val, outsiz);
checkperror(__FUNCTION__);
return val;
}
int tcp_setbufsize( int si) {
#define MEGA_BYTES (1024*1024)
int val= 1 * MEGA_BYTES;
socklen_t vsiz=sizeof(val);
setsockopt( si,SOL_SOCKET, SO_RCVBUF, &val, vsiz); //nice to have
setsockopt( si,SOL_SOCKET, SO_SNDBUF, &val, vsiz); // absloutlely necessary or single host lo0 use will lock up viamillipede
checkperror("nowait");

tcp_geterr(si);
return val;
}
int tcp_getsockinfo1( int si,int whatsel ) {
int outval=0;
socklen_t outsiz=sizeof(outval);
getsockopt( si,SOL_SOCKET, whatsel, &outval ,&outsiz);
return outval;
}
void tcp_dump_sockfdparams ( int sfd) {
whisper( 8, "%s:%x ","RCVBUF", tcp_getsockinfo1( sfd,SO_RCVBUF));
whisper( 8, "%s:%x ","SO_SNDBUF", tcp_getsockinfo1( sfd,SO_SNDBUF));
whisper( 8, "%s:%x ","SO_SNDLOWAT", tcp_getsockinfo1( sfd,SO_SNDLOWAT));
whisper( 8, "%s:%x ","SO_RCVLOWAT", tcp_getsockinfo1( sfd,SO_RCVLOWAT));
whisper( 6, "\nsocketfd:%i\n ",sfd);
}

// return a connected socket fd
int tcp_connect(char *host, int port) {
int ret_sockfd = -1;
Expand All @@ -125,14 +168,13 @@ int tcp_connect(char *host, int port) {
whisper(1, "tx: connect failed to %s:%d fd: %i \n", host, port, ret_sockfd);
// our only output is the socketfd, so trash it
ret_sockfd = -1;
whisper(19, "trashing fd to fail %s:%d fd: %i \n", host, port, ret_sockfd);
} else {
int sockerr;
u_int sockerrsize = sizeof(sockerr); // uhg
getsockopt(ret_sockfd, SOL_SOCKET, SO_ERROR, &sockerr, &sockerrsize);
checkperror("connect");
assert(sockerr == 0);
assert ( 0 == tcp_geterr(ret_sockfd));
whisper(8, " connected to %s:%i\n", host, port);
tcp_nowait( ret_sockfd);
tcp_setbufsize( ret_sockfd);
tcp_dump_sockfdparams(ret_sockfd);
checkperror("sopt");
}
return (ret_sockfd);
}
Expand Down Expand Up @@ -169,6 +211,7 @@ int tcp_accept(struct sockaddr_in *sa, int socknum) {
whisper(13, " socket %i accepted to fd:%i \n", socknum, out_sockfd);
DTRACE_PROBE1(viamillipede, worker__connected, out_sockfd);
checkperror("acceptor");
tcp_dump_sockfdparams(out_sockfd);
return (out_sockfd);
}

Expand Down
33 changes: 32 additions & 1 deletion rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ void rxworker(struct rxworker_s *rxworker) {
char okphrase[] = "ok";
char checkphrase[] = "yoes";
buffer = calloc(1, (size_t)kfootsize);
rxworker->state='i';
#ifdef kv
assert(pthread_cond_init(&rxworker->rxconf_parent->seq_cv, NULL) == 0);
struct timespec stall_timespec;
Expand All @@ -34,6 +35,8 @@ void rxworker(struct rxworker_s *rxworker) {
whisper(7, "rxw:%02i accepting and locked\n", rxworker->id);
rxworker->sockfd = tcp_accept(&(rxworker->rxconf_parent->sa),
rxworker->rxconf_parent->socknum);
rxworker->rxconf_parent->workercount++;
rxworker->state='a';
whisper(5, "rxw:%02i accepted fd:%i \n", rxworker->id, rxworker->sockfd);
DTRACE_PROBE1(viamillipede, worker__connected, rxworker->sockfd);
pthread_mutex_unlock(&rxworker->rxconf_parent->sa_mutex);
Expand All @@ -52,6 +55,7 @@ void rxworker(struct rxworker_s *rxworker) {
if (write(rxworker->sockfd, okphrase, (size_t)2) != 2) {
exit(ENOTCONN);
}
rxworker->state='s';
checkperror("signaturewrite ");
// /usr/src/contrib/netcat has a nice pipe input routine XXX perhaps lift it
while (!rxworker->rxconf_parent->done_mbox && (!restartme)) {
Expand All @@ -69,6 +73,7 @@ void rxworker(struct rxworker_s *rxworker) {
struct iovec rxiov;
rxiov.iov_len = (sizeof(struct millipacket_s) - preamble_cursor);
rxiov.iov_base = ((u_char *)&pkt) + preamble_cursor;
rxworker->state='a';
readlen = readv(rxworker->sockfd, &rxiov, 1);
checkperror("preamble read");
whisper(
Expand Down Expand Up @@ -101,30 +106,36 @@ void rxworker(struct rxworker_s *rxworker) {
assert(pkt.preamble == preamble_cannon_ul && "preamble check");
assert(pkt.size >= 0);
assert(pkt.size <= kfootsize);
rxworker->leg=pkt.leg_id;
rxworker->legop=pkt.opcode;
whisper(9, "rxw:%02i leg:%lx siz:%lu op:%x caught new leg\n",
rxworker->id, pkt.leg_id, pkt.size, pkt.opcode);
int remainder = pkt.size;
int remainder_counter = 0;
assert(remainder <= kfootsize);
while (remainder && (!restartme)) {
rxworker->state='r';
readsize =
read(rxworker->sockfd, buffer + cursor, MIN(remainder, MAXBSIZE));
checkperror("rx: read failure\n");
if (errno != 0 || readsize <= 0) {
whisper(4, "rxw:%02i leg:%lx retired due to read len:%i errno:%i\n",
rxworker->id, pkt.leg_id, readsize, errno);
rxinfo(rxworker->rxconf_parent);
restartme = 1;
readsize = 0; // a -1 would really confuse the remainder algo
if (errno == ECONNRESET || errno == EPIPE) { // 54 connection reset
whisper(5, "rx: will retry after con reset errno: %i\n", errno);
errno = 0;
}
}
rxworker->state='R';
cursor += readsize;
assert(cursor <= kfootsize);
remainder -= readsize;
assert(remainder >= 0);
if (readsize == 0 && (!restartme)) {
rxinfo(rxworker->rxconf_parent);
whisper(2, "rx: 0 byte read ;giving up. are we done?\n"); // XXX this
// should
// not be
Expand All @@ -148,6 +159,7 @@ void rxworker(struct rxworker_s *rxworker) {
whisper(3, "prbs verification complete leg:%lx", pkt.leg_id);
} else {
whisper(1, "prbs verification failure leg:%lx", pkt.leg_id);
rxinfo(rxworker->rxconf_parent);
exit(EDOOFUS);
}
}
Expand Down Expand Up @@ -186,13 +198,15 @@ void rxworker(struct rxworker_s *rxworker) {
pthread_mutex_unlock(&rxworker->rxconf_parent->rxmutex);
if (!restartme) {

whisper(5, "rxw:%02i sequenced leg:%lx[%lx]\n", rxworker->id,
whisper(6, "rxw:%02i sequenced leg:%lx[%lx]\n", rxworker->id,
pkt.leg_id, pkt.size);

int writesize = 0;
struct iovec iov;
iov.iov_len = pkt.size;
iov.iov_base = (void *)buffer;

rxworker->state='P';
writesize = writev(rxworker->rxconf_parent->output_fd, &iov, 1);
whisper(19, "rxw: writev fd:%i siz:%ld",
rxworker->rxconf_parent->output_fd, pkt.size);
Expand Down Expand Up @@ -227,6 +241,7 @@ void rxworker(struct rxworker_s *rxworker) {
rxworker->rxconf_parent
->next_leg++; // do this last or race out the cksum code
pthread_mutex_unlock(&rxworker->rxconf_parent->rxmutex);
rxworker->state='i';
// the sleepers must awaken
pthread_cond_broadcast(&rxworker->rxconf_parent->seq_cv);
} // if not restartme
Expand All @@ -235,6 +250,7 @@ void rxworker(struct rxworker_s *rxworker) {
} // restartme
free(buffer);
whisper(4, "rxw:%02i done", rxworker->id);
rxinfo( rxworker->rxconf_parent);
}

void rxlaunchworkers(struct rxconf_s *rxconf) {
Expand Down Expand Up @@ -275,3 +291,18 @@ void rx(struct rxconf_s *rxconf) {
rxconf->done_mbox = 0;
rxlaunchworkers(rxconf);
}

void rxinfo(struct rxconf_s *rxconf) {
char winflag='O';
whisper(1, "rxconf: workers %i, sequencerleg: %lx done: %i\n", rxconf->workercount, rxconf->next_leg, rxconf->done_mbox);
//for (int i=0; i < rxconf->workercount; i++){
for (int i=0; i < kthreadmax; i++){
if ( rxconf->workers[i].leg > 0 ) {
if ( rxconf->workers[i].leg == rxconf->next_leg) winflag = '$'; else winflag='o';
whisper( 3,"{r:%02i:%c:%lx:%c}\t", i, rxconf->workers[i].state, rxconf->workers[i].leg,winflag);
if (gverbose > 10) tcp_dump_sockfdparams ( rxconf->workers[i].sockfd);
if ( i%8 == 7 ) whisper (1,"\n");
}
}
whisper(3, "\n");
}
Loading

0 comments on commit 0e767f2

Please sign in to comment.