From 891b4429d71ef9ee8ceaa5b4fab1d9cd3423759d Mon Sep 17 00:00:00 2001 From: taymindis Date: Sat, 1 Sep 2018 16:54:20 +0800 Subject: [PATCH] update sample and memory fence --- README.md | 5 ++++- example.c | 45 +++++++++++++++++++++++++-------------------- lfqueue.c | 5 ++--- 3 files changed, 31 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 652afa7..1b353b7 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,14 @@ lock-free FIFO queue by C native built it, easy built cross platform(no extra de ## API ```c -int lfqueue_init(lfqueue_t *lfqueue, int num_concurrent_thread); + +/** if Expandable is true, it double up the queue size **/ +int lfqueue_init(lfqueue_t *lfqueue, size_t queue_size, int expandable); int lfqueue_enq(lfqueue_t *lfqueue, void *value); void *lfqueue_deq(lfqueue_t *lfqueue); void lfqueue_destroy(lfqueue_t *lfqueue); size_t lfqueue_size(lfqueue_t *lfqueue); + ``` diff --git a/example.c b/example.c index 5a04909..0660c64 100644 --- a/example.c +++ b/example.c @@ -11,18 +11,21 @@ struct timeval tv1, tv2; lfqueue_t myq; -#define total_put 5 -int nthreads = 4; //sysconf(_SC_NPROCESSORS_ONLN); // Linux +#define total_put 5000 +int nthreads = 16; //sysconf(_SC_NPROCESSORS_ONLN); // Linux +int one_enq = 1; +int one_deq = 1; /** Worker Keep Consuming at the same time, do not try instensively **/ -void* worker_c(void *); -void* worker_c(void *arg) { +void* worker_deq(void *); +void* worker_deq(void *arg) { int i = 0; int *int_data; - while (i++ < total_put * nthreads) { + int threads = *(int*)arg; + while (i++ < total_put * threads) { /*Dequeue*/ while ((int_data = lfqueue_deq(&myq)) == NULL) { - usleep(1); + // usleep(1); // __sync_synchronize(); } // printf("%d\n", *int_data); @@ -32,12 +35,13 @@ void* worker_c(void *arg) { } /** Worker Keep Sending at the same time, do not try instensively **/ -void* worker_s(void *); -void* worker_s(void *arg) +void* worker_enq(void *); +void* worker_enq(void *arg) { int i = 0; int *int_data; - while (i < total_put) { + int threads = *(int*)arg; + while (i < total_put * threads) { int_data = (int*)malloc(sizeof(int)); assert(int_data != NULL); @@ -53,8 +57,8 @@ void* worker_s(void *arg) } /** Worker Send And Consume at the same time **/ -void* worker_sc(void *); -void* worker_sc(void *arg) +void* worker_enq_deq(void *); +void* worker_enq_deq(void *arg) { int i = 0; int *int_data; @@ -94,7 +98,7 @@ printf("current size= %zu\n", lfqueue_size(&myq) );\ void multi_enq_deq(pthread_t *threads) { int i; for (i = 0; i < nthreads; i++) { - pthread_create(threads + i, NULL, worker_sc, NULL); + pthread_create(threads + i, NULL, worker_enq_deq, NULL); } join_threads; @@ -104,9 +108,9 @@ void multi_enq_deq(pthread_t *threads) { void one_deq_and_multi_enq(pthread_t *threads) { int i; for (i = 0; i < nthreads; i++) - pthread_create(threads + i, NULL, worker_s, NULL); + pthread_create(threads + i, NULL, worker_enq, &nthreads); - worker_c(NULL); + worker_deq(&one_deq); join_threads; // detach_thread_and_loop; @@ -115,9 +119,9 @@ void one_deq_and_multi_enq(pthread_t *threads) { void one_enq_and_multi_deq(pthread_t *threads) { int i; for (i = 0; i < nthreads; i++) - pthread_create(threads + i, NULL, worker_c, NULL); + pthread_create(threads + i, NULL, worker_deq, &nthreads); - worker_s(NULL); + worker_enq(&one_enq); //join_threads; detach_thread_and_loop; @@ -129,7 +133,7 @@ int main(void) int n; for (n = 0; n < total_run; n++) { printf("running count = %d\n", n); - lfqueue_init(&myq, 16, 1); + lfqueue_init(&myq, total_put, 1); /* Spawn threads. */ pthread_t threads[nthreads]; @@ -137,9 +141,9 @@ int main(void) printf("Total requests %d \n", total_put); gettimeofday(&tv1, NULL); - one_enq_and_multi_deq(threads); - //one_deq_and_multi_enq(threads); - // multi_enq_deq(threads); + // one_deq_and_multi_enq(threads); + multi_enq_deq(threads); + // one_enq_and_multi_deq(threads); gettimeofday(&tv2, NULL); printf ("Total time = %f seconds\n", @@ -150,6 +154,7 @@ int main(void) assert ( 0 == lfqueue_size(&myq) && "Error, all queue should be consumed but not"); lfqueue_destroy(&myq); + // sleep(1); } return 0; } diff --git a/lfqueue.c b/lfqueue.c index d7ef4e3..1cea91c 100644 --- a/lfqueue.c +++ b/lfqueue.c @@ -170,7 +170,7 @@ lfqueue_destroy(lfqueue_t *lfqueue) { int lfqueue_enq(lfqueue_t *lfqueue, void *value) { - __LFQ_SYNC_MEMORY(); + __LFQ_LOAD_FENCE(); if (lfqueue->size >= lfqueue->capacity) { // Rest the thread for other enqueue return -1; @@ -186,14 +186,13 @@ lfqueue_enq(lfqueue_t *lfqueue, void *value) { void* lfqueue_deq(lfqueue_t *lfqueue) { void *v; - __LFQ_SYNC_MEMORY(); + __LFQ_LOAD_FENCE(); if (lfqueue->size && (v = dequeue_(lfqueue)) ) { __LFQ_FETCH_AND_ADD(&lfqueue->size, -1); return v; } - __LFQ_YIELD_THREAD(); return NULL; }