Skip to content

Commit

Permalink
refactor wait thread logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Nov 8, 2024
1 parent 08e77f4 commit e9fc6a1
Showing 1 changed file with 61 additions and 48 deletions.
109 changes: 61 additions & 48 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,63 @@ static void rnng_wait_thread_single(void *args) {

}

void single_wait_thread_create(SEXP x) {

nano_aio *aiop = (nano_aio *) NANO_PTR(x);
nano_thread_aio *taio = R_Calloc(1, nano_thread_aio);
nano_cv *ncv = R_Calloc(1, nano_cv);
taio->aio = aiop->aio;
taio->cv = ncv;
nng_mtx *mtx;
nng_cv *cv;
int xc, signalled;

if ((xc = nng_mtx_alloc(&mtx)))
goto exitlevel1;

if ((xc = nng_cv_alloc(&cv, mtx)))
goto exitlevel2;

ncv->mtx = mtx;
ncv->cv = cv;

if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio)))
goto exitlevel3;

SEXP xptr;
PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue));
R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE);
R_MakeWeakRef(x, xptr, R_NilValue, TRUE);
UNPROTECT(1);

nng_time time = nng_clock();

while (1) {
time = time + 400;
signalled = 1;
nng_mtx_lock(mtx);
while (ncv->condition == 0) {
if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) {
signalled = 0;
break;
}
}
nng_mtx_unlock(mtx);
if (signalled) break;
R_CheckUserInterrupt();
}

return;

exitlevel3:
nng_cv_free(cv);
exitlevel2:
nng_mtx_free(mtx);
exitlevel1:
ERROR_OUT(xc);

}

SEXP rnng_wait_thread_create(SEXP x) {

const SEXPTYPE typ = TYPEOF(x);
Expand Down Expand Up @@ -335,60 +392,16 @@ SEXP rnng_wait_thread_create(SEXP x) {
nano_shared_aio = aiop->aio;
nano_wait_condition = 1;
nng_cv_wake(nano_wait_cv);
} else if (nano_shared_aio != aiop->aio) {
thread_required = 1;
} else {
thread_required = nano_shared_aio != aiop->aio;
}
nng_mtx_unlock(nano_wait_mtx);

if (thread_required) {

PROTECT(coreaio);
nano_thread_aio *taio = R_Calloc(1, nano_thread_aio);
nano_cv *ncv = R_Calloc(1, nano_cv);
taio->aio = aiop->aio;
taio->cv = ncv;
nng_mtx *mtx;
nng_cv *cv;

if ((xc = nng_mtx_alloc(&mtx)))
ERROR_OUT(xc);

if ((xc = nng_cv_alloc(&cv, mtx))) {
nng_mtx_free(mtx);
ERROR_OUT(xc);
}

ncv->mtx = mtx;
ncv->cv = cv;

if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio))) {
nng_cv_free(cv);
nng_mtx_free(mtx);
ERROR_OUT(xc);
}

SEXP xptr;
PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue));
R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE);
R_MakeWeakRef(coreaio, xptr, R_NilValue, TRUE);
UNPROTECT(2);

nng_time time = nng_clock();

while (1) {
time = time + 400;
signalled = 1;
nng_mtx_lock(mtx);
while (ncv->condition == 0) {
if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) {
signalled = 0;
break;
}
}
nng_mtx_unlock(mtx);
if (signalled) break;
R_CheckUserInterrupt();
}
single_wait_thread_create(coreaio);
UNPROTECT(1);

} else {

Expand Down

0 comments on commit e9fc6a1

Please sign in to comment.