-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
14 changed files
with
1,964 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
### viamillipede: | ||
|
||
RAID for TCP. | ||
|
||
Viamillipede is client and server program built to improve network pipe transport using multiple TCP sessions. It multiplexes a single network pipe into multiple TCP connectons and then terminates the connections into a pipe transparently on another host. It is similar to the simplest mode of remote pipe transparency of Netcat with the parallelism from iperf. | ||
|
||
![alt text](theory_operation_viamillipede.svg "theory of operation") | ||
#### Problems With existing approaches: | ||
|
||
+ Single TCP connections are friable | ||
+ typical pathology: | ||
+ `/bin/dd obs=1m 2> /dev/null | /bin/dd obs=1m 2> /dev/null| /usr/local/bin/pipewatcher` | ||
+ double serial penalty, note latent mistake causing 1B reads | ||
+ desperately superstitious | ||
+ `# replcmd = '%s%s/usr/local/bin/pipewatcher $$ | %s "%s/sbin/zfs receive -F -d \'%s\' && echo Succeeded"' % (compress, throttle, sshcmd, decompress, remotefs) | ||
` | ||
+ ssh is not the tool for every situation | ||
+ fixed pipeline is not tuned for any system | ||
+ keep interpreted(y) lanuages out of plumbing | ||
+ SMP igorant, Cpu's lonely forever. | ||
+ underused tx/rx interrupt enpoints, pcie lanes, channel workers, memory lanes and flow control concurrency controls idled. | ||
+ network hardware is tuned against hot single tcp connections | ||
+ Poor mss window scaling. Congestion controls aggressively collapse mss when network conditions are not pristine. | ||
+ Long bandwidth latency product connections vs. short skinny pipes; niether work out of the box due to 'impedence mismatches' ( not really Z !) | ||
+ Poor buffer interactions. "Shoe shining" when buffer sizing is not appropriate. Taoe libaries muxed connections soon after helical type drives. | ||
+ NewReno alternatives are not often acceptable. | ||
+ Flows are stuck on one L3 path. This defeats the benefits of aggregation and multi-homed connections. | ||
+ Alternate Scatter gather transport is usually not pipe transparent and difficult to set up; eg: pftp, bittorrent, pNFS | ||
|
||
|
||
#### Goals and Features of viamillipede: | ||
+ Simple to use in pipe+filter programs | ||
+ too simple? | ||
+ Why hasn't someone done this before? | ||
+ clear parallel programming model | ||
+ nothing parallel is clear, mind boundary conditions against transporter accidents. | ||
+ Provide: | ||
+ Sufficent buffering for throughput. | ||
+ Runtime SIGINFO inspection of traffic flow.`( parallelism, worker allocation, total throughput )` | ||
+ Resilience against dropped TCP connections.`(*)` | ||
+ Increase traffic throughput by: | ||
+ Using parallel connections that each vie for survival against scaling window collapse. | ||
+ Using multiple destination addresses with LACP/LAGG or separate Layer 2 adressing. | ||
+ Intelligent Traffic Shaping: | ||
+ Steer traffic to preferred interfaces. | ||
+ Dynamically use faster destinations if preferred interfaces are clogged. | ||
+ Make additional 'sidechain' compression/crypto pipeline steps parallel. `(*)` | ||
+ hard due to unpredictable buffer size dynamics | ||
+ sidechains could include any reversable pipe transparent program | ||
+ gzip, bzip2 | ||
+ openssl | ||
+ rot39, od | ||
+ Architecture independence `(*)` | ||
+ xdr/rpc marshalling | ||
+ reverse channel capablity | ||
+ millipedesh ? millipederpc? | ||
+ provide proxy trasport for other bulk movers: rsync, ssh OpenVPN | ||
+ error feedback path | ||
+ just run two tx/rx pairs? | ||
+ Error resiliance `(*)` | ||
+ dead link bypass ` (*)` | ||
+ very long lived TCP sessions are delicate things; | ||
+ Your NOC wants to do maintenance and you must have a week of pipeline to push | ||
+ restart broken legs on alternate connections automatically `(*)` | ||
+ self tuning worker count, side chain, link choices and buffer sizes, Genetic optimization topic? | ||
+ checksums | ||
|
||
`(*)` denotes work in progress, because "hard * ugly > time" | ||
|
||
#### Examples: | ||
|
||
+ trivial operation | ||
+ Configure receiver with rx <portnum> | ||
` viamillipede rx 8834 ` | ||
+ Configure transmitter with tx <receiver_host> <portnum> | ||
` echo "Osymandias" | viamillipede tx foreign.shore.net 8834 ` | ||
+ verbose <0-20> | ||
` viamillipede rx 8834 verbose 5 ` | ||
+ control worker thread count (only on transmitter) with threads <1-16> | ||
` viamillipede tx foreign.shore.net 8834 threads 16 ` | ||
+ use with zfs send/recv | ||
+ Configure transmitter with tx <receiver_host> <portnum> and provide stdin from zfs send | ||
+ ` zfs send dozer/visage | viamillipede tx foriegn.shore.net 8834 ` | ||
+ Configure receiver with rx <portnum> and ppipe output to zfs recv | ||
+ `viamillipede rx 8834 | zfs recv trinity/broken ` | ||
|
||
+ explicitly distribute load to reciever with multiple ip addresses, preferring the first ones used | ||
+ Use the cheap link, saturate it, then fill the fast (north) transport and then use the last resort link (east) if there is still source and sync throughput available. | ||
+ The destination machine has three interfaces and may have: | ||
+ varying layer1 media ( ether, serial, Infiniband , 1488, Carrier Pidgeon, Bugs) | ||
+ varying layer2 attachment ( vlan, aggregation ) | ||
+ varying layer3 routes | ||
+ `viamillipede tx foreign-cheap-1g.shore.net 8834 tx foreign-north-10g.shore.net 8834 tx foreign-east-1g.shore.net 8834 ` | ||
|
||
|
||
|
||
``` | ||
TOP: | ||
scatter gather transport via multiple workers | ||
feet are the work blocks | ||
start workers | ||
worker states | ||
idle | ||
working | ||
the window is the feet in flight | ||
window: | ||
foot (footid) | ||
stream start = footid * footsize | ||
stream end = (footid + 1) * footsize | ||
window [firstfoot, lastfoot] ... heap? sorted for min foot out? | ||
sequence recieved feet to receate stream in order | ||
supervise the results relaibly. | ||
retrnsmit failed feet | ||
maximize throughput vs window vs latency product | ||
Retry broken transport works | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
#ifndef cfgh | ||
#define cfgh | ||
|
||
#include <stdio.h> | ||
#include <string.h> | ||
#include <unistd.h> | ||
#include <assert.h> | ||
#include <stdlib.h> | ||
#include <limits.h> | ||
#include <sys/types.h> | ||
#include <sys/socket.h> | ||
#include <sys/param.h> | ||
#include <netinet/in.h> | ||
#include <netdb.h> | ||
#include <errno.h> | ||
#include <pthread.h> | ||
|
||
#include <signal.h> | ||
#include "util.h" | ||
|
||
#define kfootsize ( 2048 * 1024 ) | ||
|
||
struct txconf_s; // forward decl to permit inception | ||
struct rxconf_s; // forward decl to permit inception | ||
|
||
#define preamble_cannon_ul 0xa5a5a5a55a5a5a5a | ||
// opcodes for signaling oob status to remote end | ||
#define feed_more 0xcafe | ||
#define end_of_millipede 0xdead | ||
// the bearer channel header | ||
struct millipacket_s { | ||
unsigned long preamble; // shoudl always be preamble_cannon_ul constant | ||
unsigned long leg_id; // leg_id= ( streampos % leg_size ) is the main sequencer for the whole session | ||
unsigned long size; | ||
int checksum; | ||
int opcode; | ||
|
||
}; | ||
struct txworker_s { | ||
int id; | ||
struct txconf_s *txconf_parent; | ||
char state; | ||
pthread_mutex_t mutex; | ||
pthread_t thread; | ||
int sockfd ; | ||
u_char * buffer; | ||
struct millipacket_s pkt; | ||
int writeremainder; | ||
FILE * pip; | ||
|
||
}; | ||
|
||
struct target_port_s { | ||
char * name; | ||
unsigned short port ; | ||
}; | ||
struct txconf_s { | ||
int worker_count; | ||
struct timespec ticker; | ||
u_long stream_total_bytes; | ||
struct txworker_s workers[16]; //XXX make dynamic??? | ||
int target_port_count; | ||
struct target_port_s target_ports[16]; | ||
pthread_mutex_t mutex; | ||
}; | ||
|
||
|
||
|
||
struct rxworker_s { | ||
int id; | ||
int sockfd; | ||
int socknum; | ||
struct rxconf_s *rxconf_parent; | ||
pthread_mutex_t mutex; | ||
pthread_t thread; | ||
}; | ||
struct rxconf_s { | ||
int workercount; | ||
struct sockaddr_in sa; // reusable bound sa for later accepts | ||
int socknum ; // reusable bound socket number later accepts | ||
unsigned short port; | ||
struct rxworker_s workers[17]; | ||
int next_leg ; // main sequencer to monotonically order legs to stdout | ||
int done_mbox; | ||
} ; | ||
|
||
|
||
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
#!/bin/csh | ||
while ( 1 == 1 ) | ||
make clean all test | ||
sleep 60 | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
all: | ||
cc -g -o viamillipede -lpthread plumbing.c rx.c tx.c viamillipede.c | ||
clean: | ||
rm -f viamillipede viamillipede.core | ||
|
||
test: clean all | ||
sh test.sh |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
#include "worker.h" | ||
#include "util.h" | ||
#include <sys/socket.h> | ||
|
||
// forcefully read utill a bufffer completes or EOF | ||
ssize_t bufferfill ( int fd, u_char * dest, size_t size ) { | ||
int remainder=size; | ||
u_char * dest_cursor = dest; | ||
ssize_t accumulator=0; | ||
ssize_t readsize; | ||
int fuse=1055; // don't spin forever | ||
int sleep_thief =0; | ||
assert ( dest != NULL ); | ||
do { | ||
checkperror ( "bufferfill nuiscance err"); | ||
if ( errno !=0 ) { | ||
whisper (3, "ignoring sig %i\n", errno); | ||
errno = 0 ; | ||
} | ||
//reset nuiscances | ||
readsize = read( fd, dest_cursor, MIN( MAXBSIZE, remainder) ); | ||
if ( errno != 0 ) { | ||
whisper ( 3, "erno: %i readsize %ld requestedsiz: %d fd:%i dest:%p \n", errno , readsize, MIN( MAXBSIZE, remainder), fd, dest_cursor ); | ||
} | ||
checkperror( "bufferfill read err"); | ||
/* | ||
whisper( 20, "txingest: read stdin size %ld offset:%i remaining %i \n", | ||
readsize, | ||
(int) ((u_char*)dest_cursor - (u_char*)dest), | ||
remainder ); | ||
*/ | ||
if ( readsize < 0 ) { | ||
whisper (2, "negative read"); | ||
perror ( "negread"); | ||
break; | ||
} | ||
else { | ||
remainder -= readsize ; | ||
assert ( remainder >= 0); | ||
accumulator += readsize; | ||
dest_cursor += readsize; | ||
if ( readsize < MAXBSIZE) { | ||
// discourage tinygrams - they just beat us up and chew the cpu | ||
// XXX histgram the readsize and use ema to track optimal effort | ||
sleep_thief ++; | ||
} else { | ||
sleep_thief= 0; | ||
} | ||
usleep ( sleep_thief ); | ||
if ( readsize < 1 ) { // short reads are the end | ||
break; | ||
} | ||
} | ||
} while ( (remainder > 0) && ( fuse-- > 0 ) ); | ||
assert ( (fuse > 1 ) && "fuse blown" ); | ||
return ( (fuse < 1 )? -1 : accumulator ); | ||
} | ||
void stopwatch_start (struct timespec * t ) { | ||
assert ( clock_gettime( CLOCK_UPTIME, t ) == 0); | ||
} | ||
int stopwatch_stop ( struct timespec * t , int whisper_channel) { | ||
// stop the timer started at t | ||
struct timespec stoptime; | ||
assert ( clock_gettime( CLOCK_UPTIME, &stoptime ) == 0 ); | ||
time_t secondsdiff = stoptime.tv_sec - t->tv_sec ; | ||
long nanoes = stoptime.tv_nsec - t->tv_nsec; | ||
|
||
if ( nanoes < 0 ) { | ||
// borrow billions place nanoseconds to come up true | ||
nanoes += 1000000000; | ||
secondsdiff --; | ||
} | ||
u_long ret = MIN( ULONG_MAX, (secondsdiff * 1000000 ) + (nanoes/1000)); | ||
if ( whisper_channel > 0 ) { whisper ( whisper_channel, "%li.%03li", secondsdiff, nanoes/1000000); } | ||
return ret; | ||
} | ||
//return a connected socket fd | ||
int tcp_connect ( char * host, int port ) { | ||
int ret_sockfd= -1; | ||
int retcode; | ||
struct hostent *lhostent; | ||
struct addrinfo laddrinfo; | ||
struct sockaddr_in lsockaddr; | ||
//struct sockaddr lsockaddr; | ||
lhostent = gethostbyname ( host ); | ||
if ( h_errno != 0 ) herror ( "gethostenterror" ); | ||
assert ( h_errno == 0 && "hostname fishy"); | ||
lsockaddr.sin_family = AF_INET; | ||
lsockaddr.sin_port = htons( port ); | ||
memcpy(&(lsockaddr.sin_addr), lhostent->h_addr_list [0], sizeof(struct in_addr)); //y u hate c? | ||
ret_sockfd = socket ( AF_INET,SOCK_STREAM, 0 ); | ||
assert ( ret_sockfd > 0 && "socket fishy"); | ||
retcode = connect ( ret_sockfd, (struct sockaddr*) &(lsockaddr), sizeof (struct sockaddr) ); | ||
checkperror ( "socket connect"); | ||
if ( retcode != 0 ) perror ("connect() errrr:"); | ||
assert ( retcode == 0 && "connect fail "); | ||
int sockerr; | ||
u_int sockerrsize = sizeof(sockerr); //uhg | ||
getsockopt ( ret_sockfd , SOL_SOCKET, SO_ERROR, &sockerr, &sockerrsize); | ||
checkperror ( "connect"); | ||
assert ( sockerr == 0); | ||
whisper (8, " connected to %s:%i\n", host, port); | ||
return ( ret_sockfd ); | ||
} | ||
int tcp_recieve_prep (struct sockaddr_in * sa, int * socknum, int inport) { | ||
int one=1; | ||
int retcode; | ||
int lsock = -1; | ||
*socknum = socket (AF_INET, SOCK_STREAM, 0); | ||
sa->sin_family= AF_INET; | ||
sa->sin_addr.s_addr= htons (INADDR_ANY); | ||
sa->sin_port = htons (inport); | ||
whisper (7, "bind sockid: %i\n",*socknum); | ||
setsockopt(*socknum,SOL_SOCKET,SO_REUSEPORT,(char *)&one,sizeof(one)); | ||
retcode = bind (*socknum, (struct sockaddr *) sa,sizeof (struct sockaddr_in) ); | ||
if ( retcode != 0 ) { perror ("bind failed"); assert (0); } | ||
checkperror("bindfail"); | ||
whisper (9, " listen sockid: %i\n",*socknum); | ||
retcode = listen ( *socknum, 6) ; | ||
if ( retcode != 0) { perror ("listen fail:\n"); assert ( 0 );}; | ||
return retcode ; | ||
} | ||
|
||
int tcp_accept(struct sockaddr_in * sa , int socknum ){ | ||
int out_sockfd; | ||
socklen_t socklen = sizeof (struct sockaddr_in ) ; | ||
whisper (17, "accept sockid: %i\n",socknum); | ||
out_sockfd = accept (socknum,(struct sockaddr *)sa,&socklen); | ||
whisper (10, "socket %i accepted to fd:%i \n" , socknum,out_sockfd); | ||
checkperror ("acceptor"); | ||
return (out_sockfd); | ||
} | ||
|
Oops, something went wrong.