Skip to content

Commit

Permalink
pseudorandom internal stress benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
agokhale committed Sep 12, 2018
1 parent 886f469 commit b1d0603
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ MK_DEBUG_FILES= no
SHAREDIR= ""

PROG= viamillipede
SRCS= plumbing.c tx.c rx.c viamillipede.c dtrace_viamillipede.d terminate.c
SRCS= plumbing.c tx.c rx.c viamillipede.c dtrace_viamillipede.d terminate.c prbs.c
# This LDADD knob can't possibly be intended to use this way
LDADD= -lpthread
.include <bsd.prog.mk>
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ TCP connections are fragile and IP employs best effort delivery to preserve its
+ `rx <portnum> ` Become a reciever. Write output to stdout.
+ `tx <host> <portnum> ` Become a transmitter and add transport graph link toward an rx host. Optionally provide tx muliple times to inform us about transport alternatives. We fill tcp queues on the first entries and then proceed down the list if there is more input than link throughput. It can be helpful to provide multiple ip aliases to push work to different nic channel workers and balance traffic across LACP hash lanes. Analysis of the network resources shold inform this graph. You may use multiple physical interfaces by chosing rx host ip's that force multiple routes.
+ Read stdin and push it over the network.
+ Full duplex, rx and tx may be used simultaneously to provide a transparent pipe. Happy shell throwing!
+ Full duplex, rx and tx may be used simultaneously to provide a transparent full duplex pipe. Happy shell throwing!
+ Two disinct port numbers are requied, one rx port for each side, with the tx on the other host pointing at the rx
+ host1: ./vimaillipede rx 7788 tx host2 9900 charmode
+ host2: ./vimaillipede rx 9900 tx host1 7788 charmode
Expand All @@ -84,6 +84,7 @@ TCP connections are fragile and IP employs best effort delivery to preserve its
+ Use the preferred link. Should you saturate it, fill the next available link.
+ Provide tx multiple times to describe the transport graph.
+ Provide tx the same number of times as the thread count to precisely distribute traffic on specific links
+ Provide prbs 0x5a5a generate or verify pseudorandom bit stream for load testing
``` viamillipede \
tx host1.40g-infiniband.yoyodyne.com\
tx host1a.40g-infiniband.yoyodyne.com\
Expand Down
27 changes: 20 additions & 7 deletions rx.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "worker.h"

extern char *gcheckphrase;
extern unsigned long gprbs_seed;
unsigned long grx_saved_checksum = 0xff;

void rxworker(struct rxworker_s *rxworker) {
Expand Down Expand Up @@ -117,6 +118,15 @@ void rxworker(struct rxworker_s *rxworker) {
}
whisper(8, "\nrxw:%02i leg:%lu buffer filled to :%i\n", rxworker->id,
pkt.leg_id, cursor);

if (gprbs_seed > 0) {
if (!prbs_verify((unsigned long *)buffer, gprbs_seed + pkt.leg_id,
kfootsize)) {
whisper(1, "prbs verification failure");
exit(EDOOFUS);
}
}

checkperror("read leg");
/*block until the sequencer is ready to push this
XXXX suboptimal sequencer ?? prove it!
Expand Down Expand Up @@ -150,13 +160,16 @@ void rxworker(struct rxworker_s *rxworker) {
if (!restartme) {
whisper(5, "rxw:%02i sequenced leg:%08lu[%08lu]after %05ld stalls\n",
rxworker->id, pkt.leg_id, pkt.size, sequencer_stalls);
int writesize = 0;
struct iovec iov;
iov.iov_len = pkt.size;
iov.iov_base = (void *)buffer;
writesize = writev(rxworker->rxconf_parent->output_fd, &iov, 1);
checkperror("write buffer");
assert(writesize == pkt.size);
if (!gprbs_seed) {
//we are verifying prbs, don't output
int writesize = 0;
struct iovec iov;
iov.iov_len = pkt.size;
iov.iov_base = (void *)buffer;
writesize = writev(rxworker->rxconf_parent->output_fd, &iov, 1);
checkperror("write buffer");
assert(writesize == pkt.size);
}
DTRACE_PROBE(viamillipede, leg__rx);
if (pkt.opcode == end_of_millipede) {
whisper(5, "rxw:%02i caught 0x%x done with last frame\n",
Expand Down
11 changes: 9 additions & 2 deletions smoke.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,15 @@
echo asdf:
echo asdf | ./viamillipede rx 4545 tx localhost 4545 verbose 15 threads 3

echo push zeros around
dd if=/dev/zero bs=1g count=3 | cat - > /dev/null
nau=`date +"%s`
./viamillipede prbs 1 tx localhost 3434 verbose 3 threads 2 rx 3434 &
prbpid=$!
sudo dtrace -qn'profile-333hz /execname == "viamillipede"/ { @[ustack()]=count();}' -o /tmp/viaustac$nau.ustacks &
dtpid=$!
sleep 4
kill $dtpid
kill -INFO $prbpid
dtstackcollapse_flame.pl < /tmp/viaustac$nau.ustacks | flamegraph.pl > /net/delerium/zz/pub/viam.svg
echo tunnel ssh over an fdx viamillipede this is probably a bad idea as ssh issues tinygrams and the system shoe shines around buffer sync trouble
Expand Down
22 changes: 16 additions & 6 deletions tx.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "worker.h"
extern int gchecksums;
extern int gcharmode;
extern unsigned long gprbs_seed;
extern char *gcheckphrase;
void tx_rate_report();

Expand Down Expand Up @@ -57,12 +58,17 @@ void txingest(struct txconf_s *txconf) {
pthread_mutex_unlock(&(txconf->mutex));
int worker = dispatch_idle_worker(txconf);
assert((txconf->workers[worker].buffer) != NULL);
readsize =
bufferfill(txconf->input_fd, (u_char *)(txconf->workers[worker].buffer),
kfootsize, gcharmode);
whisper(8, "\ntxw:%02i read leg %i : fd:%i siz:%i\n", worker,
ingest_leg_counter, txconf->input_fd, kfootsize);
assert(readsize <= kfootsize);
if (gprbs_seed > 0) {
// we are generating prbs, don't do any work
readsize = kfootsize;
} else {
readsize = bufferfill(txconf->input_fd,
(u_char *)(txconf->workers[worker].buffer),
kfootsize, gcharmode);
whisper(8, "\ntxw:%02i read leg %i : fd:%i siz:%i\n", worker,
ingest_leg_counter, txconf->input_fd, kfootsize);
assert(readsize <= kfootsize);
}
if (readsize > 0) {
// find the idle worker , lock it and dispatch as separate calls --
// perhaps
Expand Down Expand Up @@ -117,6 +123,10 @@ int txpush(struct txworker_s *txworker) {
"millipacket not written ");
txworker->writeremainder = txworker->pkt.size;
assert(txworker->writeremainder <= kfootsize);
if (gprbs_seed > 0) {
prbs_gen((unsigned long *)txworker->buffer,
gprbs_seed + txworker->pkt.leg_id, kfootsize);
}
while (txworker->writeremainder && retcode) {
int minedsize = MIN(MAXBSIZE, txworker->writeremainder);
txworker->state = 'P'; // 'P'ush
Expand Down
9 changes: 9 additions & 0 deletions viamillipede.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ void usage() {
printf("transmitter: vimillipede tx 192.168.0.2 12323 tx 192.168.0.3 12323 "
"threads 3 verbose 3\n");
printf("receiver: vimillipede rx 12323 \n");
printf("prbs: vimillipede prbs 0xfeed \n");
printf("add repeatable failures: vimillipede rx 12323 chaos 180002 \n");
}

int gverbose = 0;
unsigned long gchaos = 0;
unsigned long gchaoscounter = 0;
unsigned long gprbs_seed = 0;
int gchecksums = 0;
int gcharmode = 0;
char *gcheckphrase;
Expand All @@ -22,6 +24,7 @@ int main(int argc, char **argv) {
int arg_cursor = 0;
#define MODE_TX 2
#define MODE_RX 4
#define MODE_PRBS 8
int mode = 0;
int users_input_port;
struct txconf_s txconf;
Expand Down Expand Up @@ -127,6 +130,12 @@ int main(int argc, char **argv) {
gcharmode = 1;
whisper(11, "charmode active");
}
if (strcmp(argv[arg_cursor], "prbs") == 0) {
mode |= MODE_PRBS;
arg_cursor++;
gprbs_seed = strtoul(argv[arg_cursor], NULL, 0);
whisper(11, "prbs mode seed %lu", gprbs_seed);
}
arg_cursor++;
}
checkperror("main nuiscance");
Expand Down
9 changes: 7 additions & 2 deletions worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@
#define workerh

#include "cfg.h"
int terminate ( struct txconf_s * txconf, struct rxconf_s *rxconf, struct ioconf_s *ioconf );
int initiate ( struct txconf_s * txconf, struct rxconf_s *rxconf, struct ioconf_s *ioconf );
unsigned long *prbs_gen(unsigned long *payload, unsigned long permutation,
size_t size);
int prbs_verify(unsigned long *payload, unsigned long permutation, size_t size);
int terminate(struct txconf_s *txconf, struct rxconf_s *rxconf,
struct ioconf_s *ioconf);
int initiate(struct txconf_s *txconf, struct rxconf_s *rxconf,
struct ioconf_s *ioconf);
void tx(struct txconf_s *);
int tx_poll(struct txconf_s *);
void txstatus(struct txconf_s *, int log_level);
Expand Down

0 comments on commit b1d0603

Please sign in to comment.