Skip to content

Commit

Permalink
clang-format
Browse files Browse the repository at this point in the history
  • Loading branch information
agokhale committed Nov 21, 2019
1 parent 84d3b75 commit 2d09b36
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 59 deletions.
5 changes: 3 additions & 2 deletions cfg.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
#include <sys/param.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/uio.h>
#include <unistd.h>

#include "util.h"
#include <signal.h>
Expand Down Expand Up @@ -93,7 +93,8 @@ struct rxconf_s {
int socknum; // reusable bound socket number later accepts
unsigned short port;
struct rxworker_s workers[kthreadmax];
volatile u_long next_leg; // main sequencer to monotonically order legs to stdout
volatile u_long
next_leg; // main sequencer to monotonically order legs to stdout
volatile int done_mbox;
pthread_mutex_t rxmutex;
pthread_cond_t seq_cv;
Expand Down
25 changes: 13 additions & 12 deletions plumbing.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include "worker.h"
#include <sys/socket.h>

#ifdef CHAOS
#ifdef CHAOS
extern unsigned long gchaos;
extern unsigned long gchaoscounter;
int chaos_fail() {
Expand All @@ -14,8 +14,8 @@ int chaos_fail() {
}
return (0);
}
#elseif
#define chaos_fail err;
#elseif
#define chaos_fail err;
#endif

ssize_t bufferfill(int fd, u_char *__restrict dest, size_t size, int charmode) {
Expand Down Expand Up @@ -74,23 +74,24 @@ void stopwatch_start(struct timespec *t) {
u_long stopwatch_stop(struct timespec *t, int whisper_channel) {
// stop the timer started at t
// returns usec resolution diff of time
int retc = 1;
int retc = 1;
struct timespec stoptime;
while ( retc !=0 ) {
retc = clock_gettime(CLOCK_UPTIME, &stoptime);
// this can fail errno 4 EINTR
while (retc != 0) {
retc = clock_gettime(CLOCK_UPTIME, &stoptime);
// this can fail errno 4 EINTR
}
if (errno == EINTR) {
errno = 0;
}
if ( errno == EINTR )
{ errno =0;}
time_t secondsdiff = stoptime.tv_sec - t->tv_sec;
long nanoes = stoptime.tv_nsec - t->tv_nsec;
// microoptimization trick to avoid a branch
// microoptimization trick to avoid a branch
// borrow a billion seconds
nanoes += 1000000000;
secondsdiff--;
//rely on truncation to correct the seconds place
// rely on truncation to correct the seconds place
secondsdiff += nanoes / 1000000000;
/* simple way relies on a branch
/* simple way relies on a branch
if (nanoes < 0) {
// borrow billions place nanoseconds to come up true
nanoes += 1000000000;
Expand Down
26 changes: 15 additions & 11 deletions rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ void rxworker(struct rxworker_s *rxworker) {
rxworker->id, pkt.leg_id, readsize, errno);
restartme = 1;
readsize = 0; // a -1 would really confuse the remainder algo
if (errno == ECONNRESET || errno == EPIPE ) { // 54 connection reset
if (errno == ECONNRESET || errno == EPIPE) { // 54 connection reset
whisper(5, "rx: will retry after con reset errno: %i\n", errno);
errno = 0;
}
Expand All @@ -137,10 +137,11 @@ void rxworker(struct rxworker_s *rxworker) {
pkt.leg_id, readsize >> 10, remainder >> 10,
((remainder_counter++) % 16 == 0) ? (int)10 : (int)' ');
}
whisper(10, "\nrxw:%02x leg:%lx buffer filled to :%x wating on leg:%lx\n", rxworker->id,
pkt.leg_id, cursor, rxworker->rxconf_parent->next_leg);
whisper(
10, "\nrxw:%02x leg:%lx buffer filled to :%x wating on leg:%lx\n",
rxworker->id, pkt.leg_id, cursor, rxworker->rxconf_parent->next_leg);

if (gprbs_seed > 0 && !restartme ) {
if (gprbs_seed > 0 && !restartme) {
if (!prbs_verify((unsigned long *)buffer, gprbs_seed + pkt.leg_id,
kfootsize)) {
whisper(1, "prbs verification failure leg:%lx", pkt.leg_id);
Expand All @@ -164,22 +165,25 @@ void rxworker(struct rxworker_s *rxworker) {
better so declare an error and exit
*/
long sequencer_stalls = 0;

pthread_mutex_lock(&rxworker->rxconf_parent->rxmutex);
while (pkt.leg_id != rxworker->rxconf_parent->next_leg && (!restartme)) {
int kvret =0;
int kvret = 0;
kvret = pthread_cond_wait(&rxworker->rxconf_parent->seq_cv,
&rxworker->rxconf_parent->rxmutex);
if ( kvret != 0 ) { whisper (3, "kv error %i", kvret); assert(0);}
&rxworker->rxconf_parent->rxmutex);
if (kvret != 0) {
whisper(3, "kv error %i", kvret);
assert(0);
}
pthread_mutex_lock(
&rxworker->rxconf_parent
->rxmutex); // do nothing but compare seqeuncer under lock
}
pthread_mutex_unlock(&rxworker->rxconf_parent->rxmutex);
if (!restartme) {
whisper(5, "rxw:%02i sequenced leg:%lx[%lx]\n",
rxworker->id, pkt.leg_id, pkt.size);

whisper(5, "rxw:%02i sequenced leg:%lx[%lx]\n", rxworker->id,
pkt.leg_id, pkt.size);

int writesize = 0;
struct iovec iov;
Expand Down
18 changes: 9 additions & 9 deletions terminate.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

int terminate(struct txconf_s *txconf, struct rxconf_s *rxconf,
struct ioconf_s *ioconf) {
/** terminate accepts a tcp connection and fixes up the ioconf structure for ingest
the tcp socket
returns the file descripter chosen
*/
/** terminate accepts a tcp connection and fixes up the ioconf structure for
ingest the tcp socket returns the file descripter chosen
*/
int retc = -6;
if (ioconf->terminate_port > 0) {
struct sockaddr_in sa;
Expand All @@ -14,15 +13,16 @@ int terminate(struct txconf_s *txconf, struct rxconf_s *rxconf,
txconf->input_fd =
tcp_accept(&sa, ioconf->terminate_socket); // XX should we block?
assert(txconf->input_fd > 2);
stopwatch_start(&(txconf->ticker)); //reset the timer to exclude waiting time
stopwatch_start(
&(txconf->ticker)); // reset the timer to exclude waiting time
retc = rxconf->output_fd = txconf->input_fd;
whisper(7, "txterm: accepted fd %d", txconf->input_fd);
} else if (ioconf->initiate_port > 0) {
// Dont clobber input_fd
retc = 1;
} else {
// Revert to pipe io
whisper (10, "setting input to STDIN");
whisper(10, "setting input to STDIN");
retc = txconf->input_fd = STDIN_FILENO;
}
return retc;
Expand All @@ -33,13 +33,13 @@ int initiate(struct txconf_s *txconf, struct rxconf_s *rxconf,
if (ioconf->initiate_port > 0) {
retc = txconf->input_fd = rxconf->output_fd =
tcp_connect(ioconf->initiate_host, ioconf->initiate_port);
whisper(6, "initiate: host:%s port:%d, fd:%d",
ioconf->initiate_host, ioconf->initiate_port, retc);
whisper(6, "initiate: host:%s port:%d, fd:%d", ioconf->initiate_host,
ioconf->initiate_port, retc);
} else if (ioconf->terminate_port > 0) {
// dont clobber output_fd;
retc = 1;
} else {
whisper (10, "initiate: setting output to STDOUT");
whisper(10, "initiate: setting output to STDOUT");
retc = rxconf->output_fd = STDOUT_FILENO;
}

Expand Down
29 changes: 16 additions & 13 deletions tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ extern char *gcheckphrase;
extern u_long gdelay_us;
void tx_rate_report();

void txshutdown( struct txconf_s *txconf, int worker, u_long leg );
void txshutdown(struct txconf_s *txconf, int worker, u_long leg);
char tx_state(struct txworker_s *txworker) {
// wrap the state with a locking primatitve
pthread_mutex_lock(&txworker->mutex);
Expand Down Expand Up @@ -86,11 +86,10 @@ void txingest(struct txconf_s *txconf) {
whisper(19, "\ntxw:%02i read leg %lx : fd:%i reqsiz:%x rsiz:%x\n", worker,
ingest_leg_counter, txconf->input_fd, kfootsize, readsize);
}
if ( (gleg_limit > 0) && (gleg_limit == ingest_leg_counter) ) {
if ((gleg_limit > 0) && (gleg_limit == ingest_leg_counter)) {
whisper(5, "txingest: leg limit reached sending shutdown");
txshutdown( txconf, worker, ingest_leg_counter);
}
else if (readsize > 0) {
txshutdown(txconf, worker, ingest_leg_counter);
} else if (readsize > 0) {
// find first idle worker , lock it and dispatch as separate calls --
// perhaps
txconf->workers[worker].pkt.size = readsize;
Expand All @@ -109,7 +108,7 @@ void txingest(struct txconf_s *txconf) {
txconf->stream_total_bytes += readsize;
} else {
whisper(5, "txingest: stdin exhausted. sending shutdown");
txshutdown( txconf, worker, ingest_leg_counter);
txshutdown(txconf, worker, ingest_leg_counter);
}
ingest_leg_counter++;
pthread_mutex_lock(&(txconf->mutex));
Expand All @@ -121,7 +120,7 @@ void txingest(struct txconf_s *txconf) {
tx_rate_report();
}

void txshutdown( struct txconf_s *txconf, int worker, u_long leg ) {
void txshutdown(struct txconf_s *txconf, int worker, u_long leg) {
pthread_mutex_lock(&(txconf->mutex));
txconf->done = 1;
txconf->input_eof = 1;
Expand Down Expand Up @@ -158,15 +157,18 @@ int txpush(struct txworker_s *txworker) {
}
#endif
if (errno != 0) {
whisper(2,"nuisance errno %i before write to socket leg:%lx", errno, txworker->pkt.leg_id);
whisper(2, "nuisance errno %i before write to socket leg:%lx", errno,
txworker->pkt.leg_id);
}
if(gdelay_us) usleep(gdelay_us);
if (gdelay_us)
usleep(gdelay_us);
writelen =
write(txworker->sockfd, ((txworker->buffer) + cursor), minedsize);
if (errno != 0) {
// indicate that this needs retried
txstatus(txworker->txconf_parent, 7);
whisper(2," errno %i after write to socket leg:%lx", errno, txworker->pkt.leg_id);
whisper(2, " errno %i after write to socket leg:%lx", errno,
txworker->pkt.leg_id);
retcode = 0;
};
checkperror(" write to socket");
Expand Down Expand Up @@ -226,8 +228,9 @@ int tx_start_net(struct txworker_s *txworker) {
if (txworker->txconf_parent->done) {
pthread_mutex_unlock(&(txworker->txconf_parent->mutex));
tx_state_set(txworker, 'f');
whisper(2, "txw:%02d ingest done reconnect not required anymore; giving "
"up thread\n",
whisper(2,
"txw:%02d ingest done reconnect not required anymore; giving "
"up thread\n",
txworker->id);
pthread_exit(0);
}
Expand Down Expand Up @@ -405,7 +408,7 @@ void txstatus(struct txconf_s *txconf, int log_level) {
whisper(log_level, "%c:%lx(%x)\t", tx_state(&txconf->workers[i]),
txconf->workers[i].pkt.leg_id,
(txconf->workers[i].writeremainder) >> 10 // kbytes are sufficient
);
);
}
whisper(log_level, "\n");
}
Expand Down
2 changes: 1 addition & 1 deletion util.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef utilh
#define utilh
#include <sys/types.h>
#include <sys/sdt.h>
#include <sys/types.h>
extern int gverbose;
ssize_t bufferfill(int fd, u_char *__restrict dest, size_t size, int charmode);
void stopwatch_start(struct timespec *t);
Expand Down
16 changes: 6 additions & 10 deletions viamillipede.c
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
#include "cfg.h"
#include "worker.h"

void verbose_plus () {
gverbose++;
}
void verbose_minus () {
gverbose --;
}
void verbose_plus() { gverbose++; }
void verbose_minus() { gverbose--; }

void usage() {
printf("viamillipede scatter gather multiplexed tcp for pipe transport "
Expand All @@ -31,7 +27,7 @@ unsigned long gdelay_us = 0; // per buffer ingest delay
int gchecksums = 0;
int gcharmode = 0;
int ginitiator_oneshot = 0;
int gleg_limit =0;
int gleg_limit = 0;
char *gcheckphrase;
struct txconf_s txconf;
struct rxconf_s rxconf;
Expand Down Expand Up @@ -68,7 +64,7 @@ int main(int argc, char **argv) {
txconf.target_port_cursor = 0;
ioconf.terminate_port = 0;
ioconf.initiate_port = 0;
errno=0; // why, really why?!!!
errno = 0; // why, really why?!!!
checkperror(" main nuiscance -0 ");
while (arg_cursor < argc) {
whisper(19, " arg: %d, %s\n", arg_cursor, argv[arg_cursor]);
Expand Down Expand Up @@ -218,8 +214,8 @@ int main(int argc, char **argv) {
rxpoll_done = 1;
usleep(333);
}
signal ( SIGUSR1, &verbose_plus );
signal ( SIGUSR2, &verbose_minus );
signal(SIGUSR1, &verbose_plus);
signal(SIGUSR2, &verbose_minus);
whisper(15, "finished normally");
exit(0);
}
2 changes: 1 addition & 1 deletion worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ int rx_poll(struct rxconf_s *);
int tcp_connect(char *host, int port);
int tcp_recieve_prep(struct sockaddr_in *sa, int *socknum, int inport);
int tcp_accept(struct sockaddr_in *sa, int socknum);
int initiate_relay();
int initiate_relay();

#endif

0 comments on commit 2d09b36

Please sign in to comment.