Skip to content

Commit

Permalink
initiate and terminate fdx tcp connections
Browse files Browse the repository at this point in the history
  • Loading branch information
agokhale committed Aug 31, 2018
1 parent cccc9a6 commit 117dadd
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ MK_DEBUG_FILES= no
SHAREDIR= ""

PROG= viamillipede
SRCS= plumbing.c tx.c rx.c viamillipede.c dtrace_viamillipede.d
SRCS= plumbing.c tx.c rx.c viamillipede.c dtrace_viamillipede.d terminate.c
# This LDADD knob can't possibly be intended to use this way
LDADD= -lpthread
.include <bsd.prog.mk>
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ TCP connections are fragile and IP employs best effort delivery to preserve its
+ `rx <portnum> ` Become a reciever. Write output to stdout.
+ `tx <host> <portnum> ` Become a transmitter and add transport graph link toward an rx host. Optionally provide tx muliple times to inform us about transport alternatives. We fill tcp queues on the first entries and then proceed down the list if there is more input than link throughput. It can be helpful to provide multiple ip aliases to push work to different nic channel workers and balance traffic across LACP hash lanes. Analysis of the network resources shold inform this graph. You may use multiple physical interfaces by chosing rx host ip's that force multiple routes.
+ Read stdin and push it over the network.
+ Full duplex, rx and tx may be used simultaneously to provide a transparent pipe. Happy shell throwing!
+ Two disinct port numbers are requied, one rx port for each side, with the tx on the other host pointing at the rx
+ host1: ./vimaillipede rx 7788 tx host2 9900 charmode
+ host2: ./vimaillipede rx 9900 tx host1 7788 charmode
+ The source and destination machine may have multiple interfaces and may have:
+ varying layer1 media ( ethernet, serial, Infiniband , 1488, Carrier Pidgeon, insects, ntb)
+ varying layer2 attachment ( vlan, aggregation )
Expand All @@ -88,6 +92,22 @@ TCP connections are fragile and IP employs best effort delivery to preserve its
tx host1.expensive-last-resort.yoyodyne.com
```

+ terminate <port>
+ transmitter or receiver, requires full duplex
+ Accept a tcp connection
+ initiate <hostname> <port>
+ transmitter or receiver, requires full duplex
+ Create a tcp socket
+ use with terminate to tunnel a full duplex socket, this example tunnels ssh from host1:9022 to host2:22
```
host1: ./vimaillipede rx 7788 tx host2 9900 charmode terminate 9022
host2: ./vimaillipede rx 9900 tx host1 7788 charmode initiate localhost 22
```

+ charmode
+ transmitter or receiver
+ Attempt to deliver any data in the buffer, do not wait for buffers to fill strictly
+ verbose <0-20+>,
+ transmitter or receiver
+ ``` viamillipede rx 8834 verbose 5 ```
Expand Down Expand Up @@ -139,7 +159,7 @@ TCP connections are fragile and IP employs best effort delivery to preserve its
+ rot39, od
+ xdr/rpc marshalling for architecture independence
+ serializing a struct is not ideal
+ reverse channel capablity
+ reverse channel capablity *done 20180830
+ millipedesh ? millipederpc?
+ specify rx/tx at the same time + fifo?
+ is this even a good idea? Exploit generator?
Expand Down
12 changes: 12 additions & 0 deletions cfg.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

struct txconf_s; // forward decl to permit inception
struct rxconf_s; // forward decl to permit inception
struct ioconf_s; // forward decl to permit inception

#define preamble_cannon_ul 0xa5a5a5a55a5a5a5a
// opcodes for signaling oob status to remote end
Expand Down Expand Up @@ -71,6 +72,8 @@ struct txconf_s {
pthread_t ingest_thread;
int input_eof;
int done;
int input_fd;
struct ioconf_s *ioconf;
};

struct rxworker_s {
Expand All @@ -91,6 +94,15 @@ struct rxconf_s {
int next_leg; // main sequencer to monotonically order legs to stdout
int done_mbox;
pthread_mutex_t rxmutex;
int output_fd;
struct ioconf_s *ioconf;
};

struct ioconf_s {
unsigned terminate_port;
int terminate_socket;
char *initiate_host;
unsigned short initiate_port;
int initiate_socket;
};
#endif
3 changes: 2 additions & 1 deletion rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ void rxworker(struct rxworker_s *rxworker) {
int writesize = 0;
cursor = 0;
while (remainder) {
writesize = write(STDOUT_FILENO, buffer + cursor,
writesize = write(rxworker->rxconf_parent->output_fd, buffer + cursor,
(size_t)MIN(remainder, MAXBSIZE));
assert(writesize > 0);
cursor += writesize;
remainder -= writesize;
}
Expand Down
19 changes: 16 additions & 3 deletions terminate.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,36 @@
int terminate(struct txconf_s *txconf, struct rxconf_s *rxconf,
struct ioconf_s *ioconf) {
int retc = -6;
if (ioconf->terminate_port) {
if (ioconf->terminate_port > 0) {
struct sockaddr_in sa;
tcp_recieve_prep(&sa, &(ioconf->terminate_socket), ioconf->terminate_port);
whisper(8, "term: accepting on %d", ioconf->terminate_port);
txconf->input_fd =
tcp_accept(&sa, ioconf->terminate_socket); // XX should we block?
assert(txconf->input_fd > 2);
rxconf->output_fd = txconf->input_fd;
retc = rxconf->output_fd = txconf->input_fd;
whisper(7, "txterm: accepted fd %d", txconf->input_fd);
} else if (ioconf->initiate_port > 0) {
// Dont clobber input_fd
retc = 1;
} else {
txconf->input_fd = STDIN_FILENO;
// Revert to pipe io
retc = txconf->input_fd = STDIN_FILENO;
}
return retc;
}
int initiate(struct txconf_s *txconf, struct rxconf_s *rxconf,
struct ioconf_s *ioconf) {
int retc = -6;
if (ioconf->initiate_port > 0) {
retc = txconf->input_fd = rxconf->output_fd =
tcp_connect(ioconf->initiate_host, ioconf->initiate_port);
} else if (ioconf->terminate_port > 0) {
// dont clobber output_fd;
retc = 1;
} else {
retc = rxconf->output_fd = STDOUT_FILENO;
}

return retc;
}
38 changes: 21 additions & 17 deletions tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ void start_worker(struct txworker_s *txworker) {
txstatus(txworker->txconf_parent, 6);
}

// dump stdin in chunks
// this is single thread and should use memory at a drastic pace
// dump <source>/stdin in chunks
// this is single thread
void txingest(struct txconf_s *txconf) {
int readsize;
int in_fd = STDIN_FILENO;
int foot_cursor = 0;
unsigned long saved_checksum = 0xff;
int ingest_leg_counter = 0;
Expand All @@ -57,10 +56,11 @@ void txingest(struct txconf_s *txconf) {
pthread_mutex_unlock(&(txconf->mutex));
int worker = dispatch_idle_worker(txconf);
assert((txconf->workers[worker].buffer) != NULL);
readsize = bufferfill(in_fd, (u_char *)(txconf->workers[worker].buffer),
kfootsize, gcharmode);
readsize =
bufferfill(txconf->input_fd, (u_char *)(txconf->workers[worker].buffer),
kfootsize, gcharmode);
whisper(8, "\ntxw:%02i read leg %i : fd:%i siz:%i\n", worker,
ingest_leg_counter, in_fd, kfootsize);
ingest_leg_counter, txconf->input_fd, kfootsize);
assert(readsize <= kfootsize);
if (readsize > 0) {
// find the idle worker , lock it and dispatch as separate calls --
Expand Down Expand Up @@ -111,7 +111,10 @@ int txpush(struct txworker_s *txworker) {
int writelen = -1;
int cursor = 0;
txworker->state = 'a'; // preAmble
write(txworker->sockfd, &txworker->pkt, sizeof(struct millipacket_s));
int pktwrret =
write(txworker->sockfd, &txworker->pkt, sizeof(struct millipacket_s));
assert(pktwrret == sizeof(struct millipacket_s) &&
"millipacket not written ");
txworker->writeremainder = txworker->pkt.size;
assert(txworker->writeremainder <= kfootsize);
while (txworker->writeremainder && retcode) {
Expand Down Expand Up @@ -186,7 +189,7 @@ int tx_start_net(struct txworker_s *txworker) {
pthread_mutex_unlock(&(txworker->txconf_parent->mutex));

pthread_mutex_lock(&(txworker->mutex));
usleep(300 * 1000);
usleep((300 * 1000) << (kthreadmax - reconnect_fuse));
whisper(5, "txw:%02ireconnecting\n", txworker->id);
txworker->sockfd = tx_tcp_connect_next(txworker->txconf_parent);
// detect a dead connection and move on to the next port in the target map
Expand All @@ -196,6 +199,7 @@ int tx_start_net(struct txworker_s *txworker) {
whisper(5, "txw:%02i reconnect success fd:%i\n", txworker->id,
txworker->sockfd);
errno = 0;
reconnect_fuse = kthreadmax;
}
reconnect_fuse--;
}
Expand Down Expand Up @@ -321,7 +325,7 @@ void txworker_sm(struct txworker_s *txworker) {
void txlaunchworkers(struct txconf_s *txconf) {
int worker_cursor = 0;
int ret;
checkperror("nuicance before launch");
checkperror("nuisance before launch");
while (worker_cursor < txconf->worker_count) {
txconf->workers[worker_cursor].state = '0'; // unitialized
txconf->workers[worker_cursor].txconf_parent =
Expand All @@ -338,7 +342,7 @@ void txlaunchworkers(struct txconf_s *txconf) {
"insufficient memory up front");
// digression: pthreads murders all possible kittens stored in argument
// types
checkperror("nuicance pthread error launch");
checkperror("nuisance pthread error launch");
ret =
pthread_create(&(txconf->workers[worker_cursor].thread), NULL,
(void *)&txworker_sm, &(txconf->workers[worker_cursor]));
Expand Down Expand Up @@ -383,8 +387,8 @@ int tx_poll(struct txconf_s *txconf) {
char instate = 'E'; // error uninitialized
while (!done) {
usleep(1000); // e^n backoff?
if ((busy_cycles % 100) == 0)
txstatus(txconf, 4);
// if ((busy_cycles % 100) == 0)
// txstatus(txconf, 4);
busy_cycles++;
int busy_workers = 0;
for (int i = 0; i < txconf->worker_count; i++) {
Expand Down Expand Up @@ -442,20 +446,20 @@ void init_workers(struct txconf_s *txconf) {
void tx(struct txconf_s *txconf) {
int retcode;
gtxconf = txconf;
checkperror(" nuicance tx 0");
checkperror(" nuisance starting tx");
// start control channel
stopwatch_start(&(txconf->ticker));
signal(SIGINFO, &wat);
signal(SIGINFO, &wat); // XXX this gets weird in fdx mode
signal(SIGINT, &partingshot);
signal(SIGHUP, &partingshot);
checkperror(" nuicance tx 1");
checkperror("nuisance setting signal");
pthread_mutex_init(&(txconf->mutex), NULL);
pthread_mutex_lock(&(txconf->mutex));
checkperror(" nuicance tx 2");
checkperror("nuicance locking txconf");
txconf->done = 0;
txconf->input_eof = 0;
pthread_mutex_unlock(&(txconf->mutex));
init_workers(txconf);
checkperror(" nuicance tx 3");
checkperror("nuicance tx initializing workers");
txlaunchworkers(txconf);
}
2 changes: 2 additions & 0 deletions worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#define workerh

#include "cfg.h"
int terminate ( struct txconf_s * txconf, struct rxconf_s *rxconf, struct ioconf_s *ioconf );
int initiate ( struct txconf_s * txconf, struct rxconf_s *rxconf, struct ioconf_s *ioconf );
void tx(struct txconf_s *);
int tx_poll(struct txconf_s *);
void txstatus(struct txconf_s *, int log_level);
Expand Down

0 comments on commit 117dadd

Please sign in to comment.