Skip to content

Commit

Permalink
further simplify wait thread
Browse files Browse the repository at this point in the history
(cherry picked from commit 5d06805)
  • Loading branch information
shikokuchuo committed Nov 13, 2024
1 parent 9923f82 commit 8822b36
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 38 deletions.
1 change: 0 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ void (*eln2)(void (*)(void *), void *, double, int) = NULL;

uint8_t special_bit = 0;

extern int nano_wait_thread_created;
extern nng_thread *nano_wait_thr;
extern nng_aio *nano_shared_aio;
extern nng_mtx *nano_wait_mtx;
Expand Down
62 changes: 25 additions & 37 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@

// threads callable and messenger ----------------------------------------------

int nano_wait_thread_created = 0;
nng_thread *nano_wait_thr;
nng_aio *nano_shared_aio;

nng_mtx *nano_wait_mtx;
nng_cv *nano_wait_cv;
nng_thread *nano_wait_thr = NULL;
nng_aio *nano_shared_aio = NULL;
nng_mtx *nano_wait_mtx = NULL;
nng_cv *nano_wait_cv = NULL;
int nano_wait_condition = 0;

// # nocov start
Expand Down Expand Up @@ -144,18 +142,18 @@ SEXP rnng_messenger(SEXP url) {
SEXP socket, con;

if ((xc = nng_pair0_open(sock)))
goto exitlevel1;
goto fail;
lp = R_Calloc(1, nng_listener);
if ((xc = nng_listen(*sock, up, lp, 0))) {
if (xc != 10 && xc != 15) {
R_Free(lp);
goto exitlevel1;
goto fail;
}
R_Free(lp);
dp = R_Calloc(1, nng_dialer);
if ((xc = nng_dial(*sock, up, dp, 0))) {
R_Free(dp);
goto exitlevel1;
goto fail;
}
dialer = 1;
}
Expand All @@ -174,7 +172,7 @@ SEXP rnng_messenger(SEXP url) {
UNPROTECT(2);
return socket;

exitlevel1:
fail:
R_Free(sock);
ERROR_OUT(xc);

Expand Down Expand Up @@ -309,21 +307,21 @@ void single_wait_thread_create(SEXP x) {
nano_cv *ncv = R_Calloc(1, nano_cv);
taio->aio = aiop->aio;
taio->cv = ncv;
nng_mtx *mtx;
nng_cv *cv;
nng_mtx *mtx = NULL;
nng_cv *cv = NULL;
int xc, signalled;

if ((xc = nng_mtx_alloc(&mtx)))
goto exitlevel1;
goto fail;

if ((xc = nng_cv_alloc(&cv, mtx)))
goto exitlevel2;
goto fail;

ncv->mtx = mtx;
ncv->cv = cv;

if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio)))
goto exitlevel3;
goto fail;

SEXP xptr;
PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue));
Expand All @@ -350,11 +348,9 @@ void single_wait_thread_create(SEXP x) {

return;

exitlevel3:
nng_cv_free(cv);
exitlevel2:
nng_mtx_free(mtx);
exitlevel1:
fail:
if (cv) nng_cv_free(cv);
if (mtx) nng_mtx_free(mtx);
ERROR_OUT(xc);

}
Expand All @@ -372,17 +368,11 @@ SEXP rnng_wait_thread_create(SEXP x) {

int xc, signalled;

if (!nano_wait_thread_created) {
if ((xc = nng_mtx_alloc(&nano_wait_mtx)))
goto exitlevel1;

if ((xc = nng_cv_alloc(&nano_wait_cv, nano_wait_mtx)))
goto exitlevel2;

if ((xc = nng_thread_create(&nano_wait_thr, rnng_wait_thread, NULL)))
goto exitlevel3;

nano_wait_thread_created = 1;
if (!nano_wait_thr) {
if ((xc = nng_mtx_alloc(&nano_wait_mtx)) ||
(xc = nng_cv_alloc(&nano_wait_cv, nano_wait_mtx)) ||
(xc = nng_thread_create(&nano_wait_thr, rnng_wait_thread, NULL)))
goto fail;
}

int thread_required = 0;
Expand Down Expand Up @@ -443,11 +433,9 @@ SEXP rnng_wait_thread_create(SEXP x) {

return x;

exitlevel3:
nng_cv_free(nano_wait_cv);
exitlevel2:
nng_mtx_free(nano_wait_mtx);
exitlevel1:
fail:
if (nano_wait_cv) nng_cv_free(nano_wait_cv);
if (nano_wait_mtx) nng_mtx_free(nano_wait_mtx);
ERROR_OUT(xc);

} else if (typ == VECSXP) {
Expand All @@ -464,7 +452,7 @@ SEXP rnng_wait_thread_create(SEXP x) {
}

SEXP rnng_thread_shutdown(void) {
if (nano_wait_thread_created) {
if (nano_wait_thr) {
if (nano_shared_aio != NULL)
nng_aio_stop(nano_shared_aio);
nng_mtx_lock(nano_wait_mtx);
Expand Down

0 comments on commit 8822b36

Please sign in to comment.