Skip to content

Commit

Permalink
- djm@cvs.openbsd.org 2010/01/26 01:28:35
Browse files Browse the repository at this point in the history
     [channels.c channels.h clientloop.c clientloop.h mux.c nchan.c ssh.c]
     rewrite ssh(1) multiplexing code to a more sensible protocol.

     The new multiplexing code uses channels for the listener and
     accepted control sockets to make the mux master non-blocking, so
     no stalls when processing messages from a slave.

     avoid use of fatal() in mux master protocol parsing so an errant slave
     process cannot take down a running master.

     implement requesting of port-forwards over multiplexed sessions. Any
     port forwards requested by the slave are added to those the master has
     established.

     add support for stdio forwarding ("ssh -W host:port ...") in mux slaves.

     document master/slave mux protocol so that other tools can use it to
     control a running ssh(1). Note: there are no guarantees that this
     protocol won't be incompatibly changed (though it is versioned).

     feedback Salvador Fandino, dtucker@
     channel changes ok markus@
  • Loading branch information
djmdjm committed Jan 26, 2010
1 parent f589fd1 commit e1537f9
Show file tree
Hide file tree
Showing 8 changed files with 1,725 additions and 558 deletions.
23 changes: 23 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,29 @@
[roaming_client.c]
s/long long unsigned/unsigned long long/, from tim via portable
(Id sync only, change already in portable)
- djm@cvs.openbsd.org 2010/01/26 01:28:35
[channels.c channels.h clientloop.c clientloop.h mux.c nchan.c ssh.c]
rewrite ssh(1) multiplexing code to a more sensible protocol.

The new multiplexing code uses channels for the listener and
accepted control sockets to make the mux master non-blocking, so
no stalls when processing messages from a slave.

avoid use of fatal() in mux master protocol parsing so an errant slave
process cannot take down a running master.

implement requesting of port-forwards over multiplexed sessions. Any
port forwards requested by the slave are added to those the master has
established.

add support for stdio forwarding ("ssh -W host:port ...") in mux slaves.

document master/slave mux protocol so that other tools can use it to
control a running ssh(1). Note: there are no guarantees that this
protocol won't be incompatibly changed (though it is versioned).

feedback Salvador Fandino, dtucker@
channel changes ok markus@

20100122
- (tim) [configure.ac] Due to constraints in Windows Sockets in terms of
Expand Down
214 changes: 164 additions & 50 deletions channels.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* $OpenBSD: channels.c,v 1.301 2010/01/11 01:39:46 dtucker Exp $ */
/* $OpenBSD: channels.c,v 1.302 2010/01/26 01:28:35 djm Exp $ */
/*
* Author: Tatu Ylonen <ylo@cs.hut.fi>
* Copyright (c) 1995 Tatu Ylonen <ylo@cs.hut.fi>, Espoo, Finland
Expand Down Expand Up @@ -239,7 +239,6 @@ channel_register_fds(Channel *c, int rfd, int wfd, int efd,
c->rfd = rfd;
c->wfd = wfd;
c->sock = (rfd == wfd) ? rfd : -1;
c->ctl_fd = -1; /* XXX: set elsewhere */
c->efd = efd;
c->extended_usage = extusage;

Expand Down Expand Up @@ -328,6 +327,9 @@ channel_new(char *ctype, int type, int rfd, int wfd, int efd,
c->output_filter = NULL;
c->filter_ctx = NULL;
c->filter_cleanup = NULL;
c->ctl_chan = -1;
c->mux_rcb = NULL;
c->mux_ctx = NULL;
c->delayed = 1; /* prevent call to channel_post handler */
TAILQ_INIT(&c->status_confirms);
debug("channel %d: new [%s]", found, remote_name);
Expand Down Expand Up @@ -370,11 +372,10 @@ channel_close_fd(int *fdp)
static void
channel_close_fds(Channel *c)
{
debug3("channel %d: close_fds r %d w %d e %d c %d",
c->self, c->rfd, c->wfd, c->efd, c->ctl_fd);
debug3("channel %d: close_fds r %d w %d e %d",
c->self, c->rfd, c->wfd, c->efd);

channel_close_fd(&c->sock);
channel_close_fd(&c->ctl_fd);
channel_close_fd(&c->rfd);
channel_close_fd(&c->wfd);
channel_close_fd(&c->efd);
Expand All @@ -400,8 +401,6 @@ channel_free(Channel *c)

if (c->sock != -1)
shutdown(c->sock, SHUT_RDWR);
if (c->ctl_fd != -1)
shutdown(c->ctl_fd, SHUT_RDWR);
channel_close_fds(c);
buffer_free(&c->input);
buffer_free(&c->output);
Expand Down Expand Up @@ -523,6 +522,7 @@ channel_still_open(void)
case SSH_CHANNEL_X11_LISTENER:
case SSH_CHANNEL_PORT_LISTENER:
case SSH_CHANNEL_RPORT_LISTENER:
case SSH_CHANNEL_MUX_LISTENER:
case SSH_CHANNEL_CLOSED:
case SSH_CHANNEL_AUTH_SOCKET:
case SSH_CHANNEL_DYNAMIC:
Expand All @@ -536,6 +536,7 @@ channel_still_open(void)
case SSH_CHANNEL_OPENING:
case SSH_CHANNEL_OPEN:
case SSH_CHANNEL_X11_OPEN:
case SSH_CHANNEL_MUX_CLIENT:
return 1;
case SSH_CHANNEL_INPUT_DRAINING:
case SSH_CHANNEL_OUTPUT_DRAINING:
Expand Down Expand Up @@ -567,6 +568,8 @@ channel_find_open(void)
case SSH_CHANNEL_X11_LISTENER:
case SSH_CHANNEL_PORT_LISTENER:
case SSH_CHANNEL_RPORT_LISTENER:
case SSH_CHANNEL_MUX_LISTENER:
case SSH_CHANNEL_MUX_CLIENT:
case SSH_CHANNEL_OPENING:
case SSH_CHANNEL_CONNECTING:
case SSH_CHANNEL_ZOMBIE:
Expand Down Expand Up @@ -617,6 +620,8 @@ channel_open_message(void)
case SSH_CHANNEL_CLOSED:
case SSH_CHANNEL_AUTH_SOCKET:
case SSH_CHANNEL_ZOMBIE:
case SSH_CHANNEL_MUX_CLIENT:
case SSH_CHANNEL_MUX_LISTENER:
continue;
case SSH_CHANNEL_LARVAL:
case SSH_CHANNEL_OPENING:
Expand All @@ -627,12 +632,12 @@ channel_open_message(void)
case SSH_CHANNEL_INPUT_DRAINING:
case SSH_CHANNEL_OUTPUT_DRAINING:
snprintf(buf, sizeof buf,
" #%d %.300s (t%d r%d i%d/%d o%d/%d fd %d/%d cfd %d)\r\n",
" #%d %.300s (t%d r%d i%d/%d o%d/%d fd %d/%d cc %d)\r\n",
c->self, c->remote_name,
c->type, c->remote_id,
c->istate, buffer_len(&c->input),
c->ostate, buffer_len(&c->output),
c->rfd, c->wfd, c->ctl_fd);
c->rfd, c->wfd, c->ctl_chan);
buffer_append(&buffer, buf, strlen(buf));
continue;
default:
Expand Down Expand Up @@ -839,9 +844,6 @@ channel_pre_open(Channel *c, fd_set *readset, fd_set *writeset)
FD_SET(c->efd, readset);
}
/* XXX: What about efd? races? */
if (compat20 && c->ctl_fd != -1 &&
c->istate == CHAN_INPUT_OPEN && c->ostate == CHAN_OUTPUT_OPEN)
FD_SET(c->ctl_fd, readset);
}

/* ARGSUSED */
Expand Down Expand Up @@ -986,6 +988,28 @@ channel_pre_x11_open(Channel *c, fd_set *readset, fd_set *writeset)
}
}

static void
channel_pre_mux_client(Channel *c, fd_set *readset, fd_set *writeset)
{
if (c->istate == CHAN_INPUT_OPEN &&
buffer_check_alloc(&c->input, CHAN_RBUF))
FD_SET(c->rfd, readset);
if (c->istate == CHAN_INPUT_WAIT_DRAIN) {
/* clear buffer immediately (discard any partial packet) */
buffer_clear(&c->input);
chan_ibuf_empty(c);
/* Start output drain. XXX just kill chan? */
chan_rcvd_oclose(c);
}
if (c->ostate == CHAN_OUTPUT_OPEN ||
c->ostate == CHAN_OUTPUT_WAIT_DRAIN) {
if (buffer_len(&c->output) > 0)
FD_SET(c->wfd, writeset);
else if (c->ostate == CHAN_OUTPUT_WAIT_DRAIN)
chan_obuf_empty(c);
}
}

/* try to decode a socks4 header */
/* ARGSUSED */
static int
Expand Down Expand Up @@ -1218,19 +1242,14 @@ channel_decode_socks5(Channel *c, fd_set *readset, fd_set *writeset)
}

Channel *
channel_connect_stdio_fwd(const char *host_to_connect, u_short port_to_connect)
channel_connect_stdio_fwd(const char *host_to_connect, u_short port_to_connect,
int in, int out)
{
Channel *c;
int in, out;

debug("channel_connect_stdio_fwd %s:%d", host_to_connect,
port_to_connect);

in = dup(STDIN_FILENO);
out = dup(STDOUT_FILENO);
if (in < 0 || out < 0)
fatal("channel_connect_stdio_fwd: dup() in/out failed");

c = channel_new("stdio-forward", SSH_CHANNEL_OPENING, in, out,
-1, CHAN_TCP_WINDOW_DEFAULT, CHAN_TCP_PACKET_DEFAULT,
0, "stdio-forward", /*nonblock*/0);
Expand Down Expand Up @@ -1749,36 +1768,6 @@ channel_handle_efd(Channel *c, fd_set *readset, fd_set *writeset)
return 1;
}

/* ARGSUSED */
static int
channel_handle_ctl(Channel *c, fd_set *readset, fd_set *writeset)
{
char buf[16];
int len;

/* Monitor control fd to detect if the slave client exits */
if (c->ctl_fd != -1 && FD_ISSET(c->ctl_fd, readset)) {
len = read(c->ctl_fd, buf, sizeof(buf));
if (len < 0 &&
(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
return 1;
if (len <= 0) {
debug2("channel %d: ctl read<=0", c->self);
if (c->type != SSH_CHANNEL_OPEN) {
debug2("channel %d: not open", c->self);
chan_mark_dead(c);
return -1;
} else {
chan_read_failed(c);
chan_write_failed(c);
}
return -1;
} else
fatal("%s: unexpected data on ctl fd", __func__);
}
return 1;
}

static int
channel_check_window(Channel *c)
{
Expand Down Expand Up @@ -1809,10 +1798,131 @@ channel_post_open(Channel *c, fd_set *readset, fd_set *writeset)
if (!compat20)
return;
channel_handle_efd(c, readset, writeset);
channel_handle_ctl(c, readset, writeset);
channel_check_window(c);
}

static u_int
read_mux(Channel *c, u_int need)
{
char buf[CHAN_RBUF];
int len;
u_int rlen;

if (buffer_len(&c->input) < need) {
rlen = need - buffer_len(&c->input);
len = read(c->rfd, buf, MIN(rlen, CHAN_RBUF));
if (len <= 0) {
if (errno != EINTR && errno != EAGAIN) {
debug2("channel %d: ctl read<=0 rfd %d len %d",
c->self, c->rfd, len);
chan_read_failed(c);
return 0;
}
} else
buffer_append(&c->input, buf, len);
}
return buffer_len(&c->input);
}

static void
channel_post_mux_client(Channel *c, fd_set *readset, fd_set *writeset)
{
u_int need;
ssize_t len;

if (!compat20)
fatal("%s: entered with !compat20", __func__);

if (c->rfd != -1 && FD_ISSET(c->rfd, readset) &&
(c->istate == CHAN_INPUT_OPEN ||
c->istate == CHAN_INPUT_WAIT_DRAIN)) {
/*
* Don't not read past the precise end of packets to
* avoid disrupting fd passing.
*/
if (read_mux(c, 4) < 4) /* read header */
return;
need = get_u32(buffer_ptr(&c->input));
#define CHANNEL_MUX_MAX_PACKET (256 * 1024)
if (need > CHANNEL_MUX_MAX_PACKET) {
debug2("channel %d: packet too big %u > %u",
c->self, CHANNEL_MUX_MAX_PACKET, need);
chan_rcvd_oclose(c);
return;
}
if (read_mux(c, need + 4) < need + 4) /* read body */
return;
if (c->mux_rcb(c) != 0) {
debug("channel %d: mux_rcb failed", c->self);
chan_mark_dead(c);
return;
}
}

if (c->wfd != -1 && FD_ISSET(c->wfd, writeset) &&
buffer_len(&c->output) > 0) {
len = write(c->wfd, buffer_ptr(&c->output),
buffer_len(&c->output));
if (len < 0 && (errno == EINTR || errno == EAGAIN))
return;
if (len <= 0) {
chan_mark_dead(c);
return;
}
buffer_consume(&c->output, len);
}
}

static void
channel_post_mux_listener(Channel *c, fd_set *readset, fd_set *writeset)
{
Channel *nc;
struct sockaddr_storage addr;
socklen_t addrlen;
int newsock;
uid_t euid;
gid_t egid;

if (!FD_ISSET(c->sock, readset))
return;

debug("multiplexing control connection");

/*
* Accept connection on control socket
*/
memset(&addr, 0, sizeof(addr));
addrlen = sizeof(addr);
if ((newsock = accept(c->sock, (struct sockaddr*)&addr,
&addrlen)) == -1) {
error("%s accept: %s", __func__, strerror(errno));
return;
}

if (getpeereid(newsock, &euid, &egid) < 0) {
error("%s getpeereid failed: %s", __func__,
strerror(errno));
close(newsock);
return;
}
if ((euid != 0) && (getuid() != euid)) {
error("multiplex uid mismatch: peer euid %u != uid %u",
(u_int)euid, (u_int)getuid());
close(newsock);
return;
}
nc = channel_new("multiplex client", SSH_CHANNEL_MUX_CLIENT,
newsock, newsock, -1, c->local_window_max,
c->local_maxpacket, 0, "mux-control", 1);
nc->mux_rcb = c->mux_rcb;
debug3("%s: new mux channel %d fd %d", __func__,
nc->self, nc->sock);
/* establish state */
nc->mux_rcb(nc);
/* mux state transitions must not elicit protocol messages */
nc->flags |= CHAN_LOCAL;
}

/* ARGSUSED */
static void
channel_post_output_drain_13(Channel *c, fd_set *readset, fd_set *writeset)
Expand Down Expand Up @@ -1841,6 +1951,8 @@ channel_handler_init_20(void)
channel_pre[SSH_CHANNEL_AUTH_SOCKET] = &channel_pre_listener;
channel_pre[SSH_CHANNEL_CONNECTING] = &channel_pre_connecting;
channel_pre[SSH_CHANNEL_DYNAMIC] = &channel_pre_dynamic;
channel_pre[SSH_CHANNEL_MUX_LISTENER] = &channel_pre_listener;
channel_pre[SSH_CHANNEL_MUX_CLIENT] = &channel_pre_mux_client;

channel_post[SSH_CHANNEL_OPEN] = &channel_post_open;
channel_post[SSH_CHANNEL_PORT_LISTENER] = &channel_post_port_listener;
Expand All @@ -1849,6 +1961,8 @@ channel_handler_init_20(void)
channel_post[SSH_CHANNEL_AUTH_SOCKET] = &channel_post_auth_listener;
channel_post[SSH_CHANNEL_CONNECTING] = &channel_post_connecting;
channel_post[SSH_CHANNEL_DYNAMIC] = &channel_post_open;
channel_post[SSH_CHANNEL_MUX_LISTENER] = &channel_post_mux_listener;
channel_post[SSH_CHANNEL_MUX_CLIENT] = &channel_post_mux_client;
}

static void
Expand Down
Loading

0 comments on commit e1537f9

Please sign in to comment.