Skip to content

Commit

Permalink
update sample and memory fence
Browse files Browse the repository at this point in the history
  • Loading branch information
Taymindis committed Sep 1, 2018
1 parent e910ade commit 891b442
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 24 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ lock-free FIFO queue by C native built it, easy built cross platform(no extra de

## API
```c
int lfqueue_init(lfqueue_t *lfqueue, int num_concurrent_thread);

/** if Expandable is true, it double up the queue size **/
int lfqueue_init(lfqueue_t *lfqueue, size_t queue_size, int expandable);
int lfqueue_enq(lfqueue_t *lfqueue, void *value);
void *lfqueue_deq(lfqueue_t *lfqueue);
void lfqueue_destroy(lfqueue_t *lfqueue);
size_t lfqueue_size(lfqueue_t *lfqueue);

```
Expand Down
45 changes: 25 additions & 20 deletions example.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@
struct timeval tv1, tv2;
lfqueue_t myq;

#define total_put 5
int nthreads = 4; //sysconf(_SC_NPROCESSORS_ONLN); // Linux
#define total_put 5000
int nthreads = 16; //sysconf(_SC_NPROCESSORS_ONLN); // Linux
int one_enq = 1;
int one_deq = 1;
/** Worker Keep Consuming at the same time, do not try instensively **/
void* worker_c(void *);
void* worker_c(void *arg) {
void* worker_deq(void *);
void* worker_deq(void *arg) {
int i = 0;
int *int_data;
while (i++ < total_put * nthreads) {
int threads = *(int*)arg;
while (i++ < total_put * threads) {

/*Dequeue*/
while ((int_data = lfqueue_deq(&myq)) == NULL) {
usleep(1);
// usleep(1);
// __sync_synchronize();
}
// printf("%d\n", *int_data);
Expand All @@ -32,12 +35,13 @@ void* worker_c(void *arg) {
}

/** Worker Keep Sending at the same time, do not try instensively **/
void* worker_s(void *);
void* worker_s(void *arg)
void* worker_enq(void *);
void* worker_enq(void *arg)
{
int i = 0;
int *int_data;
while (i < total_put) {
int threads = *(int*)arg;
while (i < total_put * threads) {

int_data = (int*)malloc(sizeof(int));
assert(int_data != NULL);
Expand All @@ -53,8 +57,8 @@ void* worker_s(void *arg)
}

/** Worker Send And Consume at the same time **/
void* worker_sc(void *);
void* worker_sc(void *arg)
void* worker_enq_deq(void *);
void* worker_enq_deq(void *arg)
{
int i = 0;
int *int_data;
Expand Down Expand Up @@ -94,7 +98,7 @@ printf("current size= %zu\n", lfqueue_size(&myq) );\
void multi_enq_deq(pthread_t *threads) {
int i;
for (i = 0; i < nthreads; i++) {
pthread_create(threads + i, NULL, worker_sc, NULL);
pthread_create(threads + i, NULL, worker_enq_deq, NULL);
}

join_threads;
Expand All @@ -104,9 +108,9 @@ void multi_enq_deq(pthread_t *threads) {
void one_deq_and_multi_enq(pthread_t *threads) {
int i;
for (i = 0; i < nthreads; i++)
pthread_create(threads + i, NULL, worker_s, NULL);
pthread_create(threads + i, NULL, worker_enq, &nthreads);

worker_c(NULL);
worker_deq(&one_deq);

join_threads;
// detach_thread_and_loop;
Expand All @@ -115,9 +119,9 @@ void one_deq_and_multi_enq(pthread_t *threads) {
void one_enq_and_multi_deq(pthread_t *threads) {
int i;
for (i = 0; i < nthreads; i++)
pthread_create(threads + i, NULL, worker_c, NULL);
pthread_create(threads + i, NULL, worker_deq, &nthreads);

worker_s(NULL);
worker_enq(&one_enq);

//join_threads;
detach_thread_and_loop;
Expand All @@ -129,17 +133,17 @@ int main(void)
int n;
for (n = 0; n < total_run; n++) {
printf("running count = %d\n", n);
lfqueue_init(&myq, 16, 1);
lfqueue_init(&myq, total_put, 1);

/* Spawn threads. */
pthread_t threads[nthreads];
printf("Using %d thread%s.\n", nthreads, nthreads == 1 ? "" : "s");
printf("Total requests %d \n", total_put);
gettimeofday(&tv1, NULL);

one_enq_and_multi_deq(threads);
//one_deq_and_multi_enq(threads);
// multi_enq_deq(threads);
// one_deq_and_multi_enq(threads);
multi_enq_deq(threads);
// one_enq_and_multi_deq(threads);

gettimeofday(&tv2, NULL);
printf ("Total time = %f seconds\n",
Expand All @@ -150,6 +154,7 @@ int main(void)
assert ( 0 == lfqueue_size(&myq) && "Error, all queue should be consumed but not");

lfqueue_destroy(&myq);
// sleep(1);
}
return 0;
}
5 changes: 2 additions & 3 deletions lfqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ lfqueue_destroy(lfqueue_t *lfqueue) {

int
lfqueue_enq(lfqueue_t *lfqueue, void *value) {
__LFQ_SYNC_MEMORY();
__LFQ_LOAD_FENCE();
if (lfqueue->size >= lfqueue->capacity) {
// Rest the thread for other enqueue
return -1;
Expand All @@ -186,14 +186,13 @@ lfqueue_enq(lfqueue_t *lfqueue, void *value) {
void*
lfqueue_deq(lfqueue_t *lfqueue) {
void *v;
__LFQ_SYNC_MEMORY();
__LFQ_LOAD_FENCE();
if (lfqueue->size
&& (v = dequeue_(lfqueue))
) {
__LFQ_FETCH_AND_ADD(&lfqueue->size, -1);
return v;
}
__LFQ_YIELD_THREAD();
return NULL;
}

Expand Down

0 comments on commit 891b442

Please sign in to comment.