Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle EINTR in usockets #12357

Merged
merged 8 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Handle EINTR in usockets
  • Loading branch information
Jarred-Sumner committed Jul 5, 2024
commit af6d49ae483dd5471f3d72e425ea3effb720c38e
106 changes: 79 additions & 27 deletions packages/bun-usockets/src/bsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#define __APPLE_USE_RFC_3542

#define LIKELY(cond) __builtin_expect((_Bool)(cond), 1)
#define UNLIKELY(cond) __builtin_expect((_Bool)(cond), 0)

#include "libusockets.h"
#include "internal/internal.h"

Expand All @@ -42,6 +45,12 @@
#define HAS_MSGX
#endif

#ifdef _WIN32
#define IS_EINTR(rc) (rc == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
#else
#define IS_EINTR(rc) (rc == -1 && errno == EINTR)
#endif

/* We need to emulate sendmmsg, recvmmsg on platform who don't have it */
int bsd_sendmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_sendbuf* sendbuf, int flags) {
#if defined(_WIN32)// || defined(__APPLE__)
Expand Down Expand Up @@ -397,20 +406,28 @@ int bsd_addr_get_port(struct bsd_addr_t *addr) {
// called by dispatch_ready_poll
LIBUS_SOCKET_DESCRIPTOR bsd_accept_socket(LIBUS_SOCKET_DESCRIPTOR fd, struct bsd_addr_t *addr) {
LIBUS_SOCKET_DESCRIPTOR accepted_fd;
addr->len = sizeof(addr->mem);

while (1) {
addr->len = sizeof(addr->mem);

#if defined(SOCK_CLOEXEC) && defined(SOCK_NONBLOCK)
// Linux, FreeBSD
accepted_fd = accept4(fd, (struct sockaddr *) addr, &addr->len, SOCK_CLOEXEC | SOCK_NONBLOCK);
#else
// Windows, OS X
accepted_fd = accept(fd, (struct sockaddr *) addr, &addr->len);

#endif

/* We cannot rely on addr since it is not initialized if failed */
if (accepted_fd == LIBUS_SOCKET_ERROR) {
return LIBUS_SOCKET_ERROR;
if (UNLIKELY(IS_EINTR(accepted_fd))) {
continue;
}

/* We cannot rely on addr since it is not initialized if failed */
if (accepted_fd == LIBUS_SOCKET_ERROR) {
return LIBUS_SOCKET_ERROR;
}

break;
}

internal_finalize_bsd_addr(addr);
Expand All @@ -423,28 +440,44 @@ LIBUS_SOCKET_DESCRIPTOR bsd_accept_socket(LIBUS_SOCKET_DESCRIPTOR fd, struct bsd
#endif
}

int bsd_recv(LIBUS_SOCKET_DESCRIPTOR fd, void *buf, int length, int flags) {
return recv(fd, buf, length, flags);
ssize_t bsd_recv(LIBUS_SOCKET_DESCRIPTOR fd, void *buf, int length, int flags) {
while (1) {
ssize_t ret = recv(fd, buf, length, flags);

if (UNLIKELY(IS_EINTR(ret))) {
continue;
}

return ret;
}
}

#if !defined(_WIN32)
#include <sys/uio.h>

int bsd_write2(LIBUS_SOCKET_DESCRIPTOR fd, const char *header, int header_length, const char *payload, int payload_length) {
ssize_t bsd_write2(LIBUS_SOCKET_DESCRIPTOR fd, const char *header, int header_length, const char *payload, int payload_length) {
struct iovec chunks[2];

chunks[0].iov_base = (char *)header;
chunks[0].iov_len = header_length;
chunks[1].iov_base = (char *)payload;
chunks[1].iov_len = payload_length;

return writev(fd, chunks, 2);
while (1) {
ssize_t written = writev(fd, chunks, 2);

if (UNLIKELY(IS_EINTR(written))) {
continue;
}

return written;
}
}
#else
int bsd_write2(LIBUS_SOCKET_DESCRIPTOR fd, const char *header, int header_length, const char *payload, int payload_length) {
int written = bsd_send(fd, header, header_length, 0);
ssize_t bsd_write2(LIBUS_SOCKET_DESCRIPTOR fd, const char *header, int header_length, const char *payload, int payload_length) {
ssize_t written = bsd_send(fd, header, header_length, 0);
if (written == header_length) {
int second_write = bsd_send(fd, payload, payload_length, 0);
ssize_t second_write = bsd_send(fd, payload, payload_length, 0);
if (second_write > 0) {
written += second_write;
}
Expand All @@ -453,26 +486,28 @@ int bsd_write2(LIBUS_SOCKET_DESCRIPTOR fd, const char *header, int header_length
}
#endif

int bsd_send(LIBUS_SOCKET_DESCRIPTOR fd, const char *buf, int length, int msg_more) {

ssize_t bsd_send(LIBUS_SOCKET_DESCRIPTOR fd, const char *buf, int length, int msg_more) {
while (1) {
// MSG_MORE (Linux), MSG_PARTIAL (Windows), TCP_NOPUSH (BSD)

#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif

#ifdef MSG_MORE

// for Linux we do not want signals
return send(fd, buf, length, ((msg_more != 0) * MSG_MORE) | MSG_NOSIGNAL | MSG_DONTWAIT);

#else

// use TCP_NOPUSH
#ifdef MSG_MORE
// for Linux we do not want signals
ssize_t rc = send(fd, buf, length, ((msg_more != 0) * MSG_MORE) | MSG_NOSIGNAL | MSG_DONTWAIT);
#else
// use TCP_NOPUSH
ssize_t rc = send(fd, buf, length, MSG_NOSIGNAL | MSG_DONTWAIT);
#endif

return send(fd, buf, length, MSG_NOSIGNAL | MSG_DONTWAIT);
if (UNLIKELY(IS_EINTR(rc))) {
continue;
}

#endif
return rc;
}
}

int bsd_would_block() {
Expand All @@ -483,6 +518,23 @@ int bsd_would_block() {
#endif
}

static int us_internal_bind_and_listen(LIBUS_SOCKET_DESCRIPTOR listenFd, struct sockaddr *listenAddr, socklen_t listenAddrLength, int backlog) {
int result;
do
result = bind(listenFd, listenAddr, listenAddrLength);
while (IS_EINTR(result));

if (result == -1) {
return -1;
}

do
result = listen(listenFd, backlog);
while (IS_EINTR(result));

return 0;
}

inline __attribute__((always_inline)) LIBUS_SOCKET_DESCRIPTOR bsd_bind_listen_fd(
LIBUS_SOCKET_DESCRIPTOR listenFd,
struct addrinfo *listenAddr,
Expand Down Expand Up @@ -512,7 +564,7 @@ inline __attribute__((always_inline)) LIBUS_SOCKET_DESCRIPTOR bsd_bind_listen_fd
setsockopt(listenFd, IPPROTO_IPV6, IPV6_V6ONLY, (void *) &disabled, sizeof(disabled));
#endif

if (bind(listenFd, listenAddr->ai_addr, (socklen_t) listenAddr->ai_addrlen) || listen(listenFd, 512)) {
if (us_internal_bind_and_listen(listenFd, listenAddr->ai_addr, (socklen_t) listenAddr->ai_addrlen, 512)) {
return LIBUS_SOCKET_ERROR;
}

Expand Down Expand Up @@ -690,7 +742,7 @@ static LIBUS_SOCKET_DESCRIPTOR internal_bsd_create_listen_socket_unix(const char
unlink(path);
#endif

if (bind(listenFd, (struct sockaddr *)server_address, addrlen) || listen(listenFd, 512)) {
if (us_internal_bind_and_listen(listenFd, (struct sockaddr *) server_address, (socklen_t) addrlen, 512)) {
#if defined(_WIN32)
int shouldSimulateENOENT = WSAGetLastError() == WSAENETDOWN;
#endif
Expand Down Expand Up @@ -925,7 +977,7 @@ static int bsd_do_connect_raw(LIBUS_SOCKET_DESCRIPTOR fd, struct sockaddr *addr,
do {
errno = 0;
r = connect(fd, (struct sockaddr *)addr, namelen);
} while (r == -1 && errno == EINTR);
} while (IS_EINTR(r));

// connect() can return -1 with an errno of 0.
// the errno is the correct one in that case.
Expand Down
6 changes: 3 additions & 3 deletions packages/bun-usockets/src/internal/networking/bsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ int bsd_addr_get_port(struct bsd_addr_t *addr);
// called by dispatch_ready_poll
LIBUS_SOCKET_DESCRIPTOR bsd_accept_socket(LIBUS_SOCKET_DESCRIPTOR fd, struct bsd_addr_t *addr);

int bsd_recv(LIBUS_SOCKET_DESCRIPTOR fd, void *buf, int length, int flags);
int bsd_send(LIBUS_SOCKET_DESCRIPTOR fd, const char *buf, int length, int msg_more);
int bsd_write2(LIBUS_SOCKET_DESCRIPTOR fd, const char *header, int header_length, const char *payload, int payload_length);
ssize_t bsd_recv(LIBUS_SOCKET_DESCRIPTOR fd, void *buf, int length, int flags);
ssize_t bsd_send(LIBUS_SOCKET_DESCRIPTOR fd, const char *buf, int length, int msg_more);
ssize_t bsd_write2(LIBUS_SOCKET_DESCRIPTOR fd, const char *header, int header_length, const char *payload, int payload_length);
int bsd_would_block();

// return LIBUS_SOCKET_ERROR or the fd that represents listen socket
Expand Down
Loading