Skip to content

Commit

Permalink
Separate acceptor loop for picov
Browse files Browse the repository at this point in the history
Enable nodejs build
[docker verify]
  • Loading branch information
sumeetchhetri committed Jul 14, 2024
1 parent 2877104 commit 9f84ba6
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-nodejs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ concurrency:
jobs:
build:
runs-on: ubuntu-latest
if: "contains(github.event.head_commit.message, '[build_only_nodejs]')"
if: "!contains(github.event.head_commit.message, '[skip_build]')"
steps:
- uses: actions/checkout@v4
- name: Build ffead-cpp with nodejs backend
Expand Down
8 changes: 3 additions & 5 deletions lang-server-backends/v/pico.v/main.v
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ pub mut:
}

struct C.ffead_request3_t {}

/*
const ffead_request3 *request, int* scode, const char** smsg, size_t *smsg_len,
const char **out_mime, size_t *out_mime_len, const char **out_url, size_t *out_url_len,
Expand All @@ -70,7 +69,6 @@ fn C.ffead_cpp_handle_picov_2_init_sock(int, voidptr) voidptr
fn C.ffead_cpp_handle_picov_2_deinit_sock(int, voidptr)
fn C.ffead_cpp_handle_picov_ext_fd_cb(int, voidptr)
fn C.ffead_cpp_handle_picov_clean_sockets()

fn C.ffead_cpp_resp_cleanup(voidptr)

fn cpy_str_1(dst byteptr, src string) byteptr {
Expand Down Expand Up @@ -295,9 +293,9 @@ fn main() {
println('Bootstrapping ffead-cpp end...')

if is_async {
mut pv = picoev.new(server_port, &callback_async, &open_cb_async, &close_cb_async, &C.ffead_cpp_handle_picov_ext_fd_cb, true)
mut pv = picoev.new(server_port, &callback_async, &open_cb_async, &close_cb_async, &C.ffead_cpp_handle_picov_ext_fd_cb, true, true)
} else {
mut pv = picoev.new(server_port, &callback, &open_cb_async, &close_cb_async, &C.ffead_cpp_handle_picov_ext_fd_cb, false)
mut pv = picoev.new(server_port, &callback, &open_cb_async, &close_cb_async, &C.ffead_cpp_handle_picov_ext_fd_cb, false, true)
}

println('Initializing ffead-cpp start...')
Expand All @@ -319,7 +317,7 @@ fn main() {

println('Initializing ffead-cpp end...')

pv.listen(server_port, is_async)
pv.listen(server_port)
println('Listening on port $server_port ...')
pv.serve()

Expand Down
91 changes: 57 additions & 34 deletions lang-server-backends/v/pico.v/picoev.v
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,15 @@ struct C.picoev_loop {}

struct Picoev {
loop &C.picoev_loop
is_async bool
is_cloop bool
cb fn(req picohttpparser.Request, mut res picohttpparser.Response)
cb1 fn(req picohttpparser.Request, fd int, pv voidptr)
open_cb fn(fd int) voidptr
close_cb fn(fd int, fd_data voidptr)
cb_ext_fd_cb fn(int, voidptr)
pub mut:
cloop &C.picoev_loop
date byteptr
buf byteptr
idx [2048]int
Expand Down Expand Up @@ -213,6 +216,15 @@ fn accept_callback(loop &C.picoev_loop, fd, events int, cb_arg voidptr) {
}
}

fn accept_callback_cloop(loop &C.picoev_loop, fd, events int, cb_arg voidptr) {
newfd := C.accept(fd, 0, 0)
if newfd != -1 {
setup_sock(newfd)
mut p := &Picoev(cb_arg)
C.picoev_add(p.cloop, newfd, C.PICOEV_READ, timeout_secs, rw_callback, cb_arg)
}
}

pub fn external_fd_rw_callback(loop &C.picoev_loop, fd, events int, cb_arg voidptr) {
if (events & C.PICOEV_READ) != 0 {
pv.cb_ext_fd_cb(fd, cb_arg)
Expand Down Expand Up @@ -302,8 +314,18 @@ fn accept_callback_async(loop &C.picoev_loop, fd, events int, cb_arg voidptr) {
}
}

fn accept_callback_async_cloop(loop &C.picoev_loop, fd, events int, cb_arg voidptr) {
newfd := C.accept(fd, 0, 0)
if newfd != -1 {
setup_sock(newfd)
mut p := &Picoev(cb_arg)
p.data[newfd] = p.open_cb(newfd)
C.picoev_add(p.cloop, newfd, C.PICOEV_READ, timeout_secs, rw_callback_async, cb_arg)
}
}

__global pv Picoev
pub fn (pv Picoev) listen(port int, is_async bool) {
pub fn (pv Picoev) listen(port int) {
fd := C.socket(C.AF_INET, C.SOCK_STREAM, 0)
assert fd != -1

Expand Down Expand Up @@ -331,74 +353,75 @@ pub fn (pv Picoev) listen(port int, is_async bool) {

setup_sock(fd)

if !is_async {
C.picoev_add(pv.loop, fd, C.PICOEV_READ, 0, accept_callback, &pv)
if !pv.is_async {
if !pv.is_cloop {
C.picoev_add(pv.loop, fd, C.PICOEV_READ, 0, accept_callback, &pv)
} else {
C.picoev_add(pv.loop, fd, C.PICOEV_READ, 0, accept_callback_cloop, &pv)
}
} else {
C.picoev_add(pv.loop, fd, C.PICOEV_READ, 0, accept_callback_async, &pv)
if !pv.is_cloop {
C.picoev_add(pv.loop, fd, C.PICOEV_READ, 0, accept_callback_async, &pv)
} else {
C.picoev_add(pv.loop, fd, C.PICOEV_READ, 0, accept_callback_async_cloop, &pv)
}
}
}
pub fn new(port int, cb voidptr, open_cb voidptr, close_cb voidptr, cb_ext_fd_cb voidptr, is_async bool) &Picoev {
/*fd := C.socket(C.AF_INET, C.SOCK_STREAM, 0)
assert fd != -1
flag := 1
assert C.setsockopt(fd, C.SOL_SOCKET, C.SO_REUSEADDR, &flag, sizeof(int)) == 0
assert C.setsockopt(fd, C.SOL_SOCKET, C.SO_REUSEPORT, &flag, sizeof(int)) == 0
$if linux {
assert C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_QUICKACK, &flag, sizeof(int)) == 0
timeout := 10
assert C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_DEFER_ACCEPT, &timeout, sizeof(int)) == 0
queue_len := 4096
assert C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_FASTOPEN, &queue_len, sizeof(int)) == 0
}
mut addr := C.sockaddr_in{}
addr.sin_family = C.AF_INET
addr.sin_port = C.htons(port)
addr.sin_addr.s_addr = C.htonl(C.INADDR_ANY)
size := 16 // sizeof(C.sockaddr_in)
bind_res := C.bind(fd, &addr, size)
assert bind_res == 0
listen_res := C.listen(fd, C.SOMAXCONN)
assert listen_res == 0
setup_sock(fd)*/

pub fn new(port int, cb voidptr, open_cb voidptr, close_cb voidptr, cb_ext_fd_cb voidptr, is_async bool, is_cloop bool) &Picoev {
C.picoev_init(max_fds)
loop := C.picoev_create_loop(max_timeout)
if !is_async {
mut pv := &Picoev{
loop: loop
cloop: 0
is_async: is_async
is_cloop: is_cloop
cb: cb
date: C.get_date()
buf: malloc(max_fds * max_read + 1)
out: malloc(max_fds * max_write + 1)
}
//C.picoev_add(loop, fd, C.PICOEV_READ, 0, accept_callback, pv)
if is_cloop {
pv.cloop = C.picoev_create_loop(max_timeout)
}
go update_date(mut pv)
return pv
} else {
mut pv := &Picoev{
loop: loop
cloop: 0
cb1: cb
is_async: is_async
is_cloop: is_cloop
open_cb: open_cb
close_cb: close_cb
cb_ext_fd_cb: cb_ext_fd_cb
buf: malloc(max_fds * max_read + 1)
out: malloc(max_fds * max_write + 1)
}
//C.picoev_add(loop, fd, C.PICOEV_READ, 0, accept_callback_async, pv)
if is_cloop {
pv.cloop = C.picoev_create_loop(max_timeout)
}
return pv
}
}

pub fn (p Picoev) serve_c() {
for {
C.picoev_loop_once(p.cloop, 1)
}
}

pub fn (p Picoev) serve() {
if p.is_cloop {
go p.serve_c()
}
for {
C.picoev_loop_once(p.loop, 1)
}
}


fn update_date(mut p Picoev) {
for {
p.date = C.get_date()
Expand Down

0 comments on commit 9f84ba6

Please sign in to comment.