Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock free audio #4230

Closed
wants to merge 40 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
c011722
vs2022 compatible project files
LeonidGoltsblat Oct 20, 2024
81996e6
stack implementation and tests
LeonidGoltsblat Oct 20, 2024
b2ab472
code style fix
LeonidGoltsblat Oct 20, 2024
79062c1
fix to Added more logs (#4098)
LeonidGoltsblat Oct 27, 2024
267e1e0
remove Debug-Dynamic-Client and Release-Dynamic-Client configurations
LeonidGoltsblat Oct 25, 2024
79f47fb
use traditional select ioqueue instead of iocp
LeonidGoltsblat Oct 27, 2024
95f49c1
re-enabled lost stack test
LeonidGoltsblat Oct 27, 2024
97ba6f3
Merge branch 'stack' into pj_stack
LeonidGoltsblat Oct 27, 2024
24c959f
untabify
LeonidGoltsblat Oct 27, 2024
48f47a2
makefiles fixes
LeonidGoltsblat Oct 28, 2024
dd3bdc3
VS projects and solution files changes
LeonidGoltsblat Oct 28, 2024
52e0086
define appropriate PJ_POOL_ALIGNMENT for Windows platform
LeonidGoltsblat Oct 29, 2024
6a331a1
make files fix
LeonidGoltsblat Oct 29, 2024
5fec115
stack implementation files added to aconfugure.*
LeonidGoltsblat Oct 29, 2024
547b72d
merge conflicts resolved
LeonidGoltsblat Nov 24, 2024
61e69ef
merge conflicts resolved
LeonidGoltsblat Nov 24, 2024
722022d
Merge branch 'master' into pj_stack
LeonidGoltsblat Nov 24, 2024
44dd759
added some comments about data alignment and macros to control alignment
LeonidGoltsblat Nov 24, 2024
0588d57
Merge remote-tracking branch 'myfork/pj_stack' into pj_stack
LeonidGoltsblat Nov 24, 2024
f0eb2c4
stack_stress_test() added
LeonidGoltsblat Nov 26, 2024
246a20a
fix pj_stack multithreading for non-windows platforms
LeonidGoltsblat Nov 28, 2024
5db76f6
stack tests
LeonidGoltsblat Nov 29, 2024
1eea899
Merge branch 'pjsip:master' into pj_stack
LeonidGoltsblat Nov 29, 2024
37f4a8a
stack tests optimized
LeonidGoltsblat Nov 29, 2024
d7323fc
Merge branch 'pjsip:master' into pj_stack
LeonidGoltsblat Dec 3, 2024
abbad93
simplified pj_stack impl
LeonidGoltsblat Dec 8, 2024
3b0e0a4
some irreleveant changes in the patch removed
LeonidGoltsblat Dec 10, 2024
23610d6
pjlib\build\pjlib.vcxproj.filters
LeonidGoltsblat Dec 11, 2024
6fefbf5
copyright info added
LeonidGoltsblat Dec 11, 2024
6c83473
copyright 2
LeonidGoltsblat Dec 11, 2024
88e02f3
line ending
LeonidGoltsblat Dec 11, 2024
445f8a1
undone ..
LeonidGoltsblat Dec 11, 2024
9f38c20
line ending
LeonidGoltsblat Dec 12, 2024
40f57cc
rewind some unrelated changes
LeonidGoltsblat Dec 12, 2024
934ba4a
line ending
LeonidGoltsblat Dec 12, 2024
bb1307d
unrelated...
LeonidGoltsblat Dec 12, 2024
9841571
little formal changes
LeonidGoltsblat Dec 13, 2024
a102249
fix
LeonidGoltsblat Dec 13, 2024
a424e39
Merge remote-tracking branch 'github.com/master' into pj_stack
LeonidGoltsblat Dec 18, 2024
fcaa536
lock free
LeonidGoltsblat Dec 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
stack_stress_test() added
  • Loading branch information
LeonidGoltsblat committed Nov 26, 2024
commit f0eb2c4ae3fd43a8aa313d208e5abadc08b66f0d
366 changes: 366 additions & 0 deletions pjlib/src/pjlib-test/stack.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,112 @@

#endif // PJ_WIN32

#define THIS_FILE "stack.c"
#define MAX_THREADS 16
#define MAX_RESERVED 16
#define MAX_SLOTS 100

#define TRACE(log) PJ_LOG(3,log)

/* Node type for the basic pj_stack tests.
 * PJ_SYS_ALIGN_PREFIX/SUFFIX force platform-required alignment for stack
 * nodes (see the alignment note on slot_data below).
 */
typedef struct PJ_SYS_ALIGN_PREFIX stack_node {
PJ_DECL_STACK_MEMBER(struct stack_node);    /* intrusive pj_stack linkage */
int value;                                  /* test payload */
} PJ_SYS_ALIGN_SUFFIX stack_node;


/* Any useful per-slot data; here simply pj_thread_t* - the thread owning
 * this slot.
 * NOTE: on the Windows platform the alignment of array items used as stack
 * nodes must not be less than MEMORY_ALLOCATION_ALIGNMENT, hence the
 * PJ_SYS_ALIGN_PREFIX/SUFFIX decoration.
 */
typedef struct PJ_SYS_ALIGN_PREFIX slot_data
{
PJ_DECL_STACK_MEMBER(struct slot_data);     /* intrusive pj_stack linkage */

pj_thread_t *owner;                         /* thread that reserved this slot, NULL if free */
} PJ_SYS_ALIGN_SUFFIX slot_data;

/* pj_list counterpart of slot_data, used by the list-based reference test.
 * The list member must link nodes of this same type: the original code
 * passed the nonexistent tag `struct list_node`, which only compiled
 * because PJ_DECL_LIST_MEMBER merely declares pointers to an incomplete
 * type; the pointers then had the wrong type for the nodes actually
 * stored.  Fixed to `struct slot_data_list`, matching how slot_data and
 * stack_node declare their members.
 */
typedef struct slot_data_list
{
    PJ_DECL_LIST_MEMBER(struct slot_data_list);

    pj_thread_t* owner;     /* thread that reserved this slot, NULL if free */
} slot_data_list;

typedef struct stack_test_desc stack_test_desc;
typedef struct stack_test_desc {
/* Static test configuration (filled in the tests[] table below). */
struct {
const char *title;
int n_threads; /* number of worker threads */

unsigned repeat; /* number of successful slot reservations on each concurrent thread */

int (*test_init)(stack_test_desc *test);        /* per-test setup, 0 on success */
int (*test_destroy)(stack_test_desc *test);     /* per-test teardown, 0 on success */
//slot_data *(*reserve_slot)(stack_test_desc* test);
//void (*release_slot)(stack_test_desc* test, slot_data* slot);
int (*worker_thread)(stack_test_desc* test);    /* body executed by each worker */

} cfg;

/* Mutable runtime state shared by all worker threads of one test run. */
struct {
pj_pool_t *pool;
pj_stack_type *empty_slot_stack; /**< Empty slots stack. In current implementation each stack item stores a pointer to a slot */
slot_data slots[MAX_SLOTS]; /**< Array of useful information "slots" (file players, for example).*/

pj_list empty_slot_list;
pj_lock_t *list_lock; /* list is not thread safe, lock required */
slot_data_list slots_list[MAX_SLOTS]; /**< Array of useful information "slots" (file players, for example).*/

int retcode; /* test retcode. non-zero will abort. */
} state;
} stack_test_desc;

/* Forward declarations: stack-based and list-based variants of the same
 * stress scenario, plus the common driver and thread entry point. */
static int stack_stress_test_init(stack_test_desc* test);
static int stack_stress_test_destroy(stack_test_desc* test);
static int stack_worker_thread(stack_test_desc* test);

static int list_stress_test_init(stack_test_desc* test);
static int list_stress_test_destroy(stack_test_desc* test);
static int list_worker_thread(stack_test_desc* test);

static int stack_stress_test(stack_test_desc* test);
static int worker_thread(void* p);

static stack_test_desc tests[] = {
{
.cfg.title = "stack (single thread)",
.cfg.n_threads = 0,
.cfg.repeat = 10000,
.cfg.test_init = &stack_stress_test_init,
.cfg.test_destroy = &stack_stress_test_destroy,
.cfg.worker_thread = &stack_worker_thread
},
{
.cfg.title = "list (single thread)",
.cfg.n_threads = 0,
.cfg.repeat = 10000,
.cfg.test_init = &list_stress_test_init,
.cfg.test_destroy = &list_stress_test_destroy,
.cfg.worker_thread = &list_worker_thread
},
{
.cfg.title = "stack (multithread thread)",
.cfg.n_threads = 16,
.cfg.repeat = 10000,
.cfg.test_init = &stack_stress_test_init,
.cfg.test_destroy = &stack_stress_test_destroy,
.cfg.worker_thread = &stack_worker_thread
},
{
.cfg.title = "list (multithread thread)",
.cfg.n_threads = 16,
.cfg.repeat = 10000,
.cfg.test_init = &list_stress_test_init,
.cfg.test_destroy = &list_stress_test_destroy,
.cfg.worker_thread = &list_worker_thread
}
};

int stack_test()
{
pj_stack_type *stack = NULL;
Expand Down Expand Up @@ -139,12 +240,277 @@ int stack_test()
rc = -55;
}

for (i = 0; !rc && i < PJ_ARRAY_SIZE(tests); ++i) {
tests[i].state.pool = pool;
rc = stack_stress_test(&tests[i]);
}

if (pool)
pj_pool_release(pool);

return rc;
}

/*
 * This test illustrates:
 * 1) a multi-threaded use case for the pj_stack API
 * 2) a useful idea: reserving an empty slot in a large array without having to lock the entire array
 * 3) pj_stack performance on Windows is 2-3x higher than pj_list with pj_simple_mutex_lock
 *
 * Driver: runs test->cfg.worker_thread on this thread (n_threads == 0) or on
 * cfg.n_threads workers plus this thread.  Returns 0 on success, negative on
 * error.  Fixes over the original: n_threads is clamped to MAX_THREADS (the
 * threads[] bound), created threads are joined/destroyed on every error path
 * (a nonzero state.retcode makes workers exit promptly, see the worker
 * loops), and the log indent is restored on early returns.
 */
static int stack_stress_test(stack_test_desc* test) {
    unsigned i;
    pj_status_t status;
    pj_timestamp t1, t2;
    int rc;
    int ident;

    TRACE((THIS_FILE, "%s", test->cfg.title));
    ident = pj_log_get_indent(); /* worker_thread changes indent on this thread */
    pj_log_push_indent();

    rc = (*test->cfg.test_init)(test);
    if (rc) {
        pj_log_set_indent(ident);
        return rc;
    }

    pj_get_timestamp(&t1);

    if (test->cfg.n_threads == 0) {
        worker_thread(test);
    } else {
        unsigned n_threads = test->cfg.n_threads;
        unsigned n_created = 0;
        pj_thread_t* threads[MAX_THREADS];

        if (n_threads > MAX_THREADS)    /* guard the threads[] array bound */
            n_threads = MAX_THREADS;

        for (i = 0; i < n_threads; ++i) {
            status = pj_thread_create(test->state.pool, "stack_stress_test",
                                      &worker_thread, test,
                                      0, PJ_THREAD_SUSPENDED,
                                      &threads[i]);
            if (status != PJ_SUCCESS) {
                PJ_PERROR(1, (THIS_FILE, status, "Unable to create thread"));
                /* nonzero retcode makes already-resumed workers exit */
                test->state.retcode = -70;
                break;
            }
            ++n_created;
        }

        /* Resume whatever was created; workers notice a nonzero retcode and
         * return quickly, so the joins below cannot block indefinitely. */
        for (i = 0; i < n_created; ++i) {
            status = pj_thread_resume(threads[i]);
            if (status != PJ_SUCCESS) {
                PJ_PERROR(1, (THIS_FILE, status, "Unable to resume thread"));
                test->state.retcode = -75;
            }
        }

        if (test->state.retcode == 0)
            worker_thread(test);

        for (i = 0; i < n_created; ++i) {
            pj_thread_join(threads[i]);
            pj_thread_destroy(threads[i]);
        }
    }

    rc = (*test->cfg.test_destroy)(test);
    if (rc) {
        pj_log_set_indent(ident);
        return rc;
    }

    pj_get_timestamp(&t2);

    TRACE((THIS_FILE, "time: %d ms", pj_elapsed_msec(&t1, &t2)));

    pj_log_set_indent(ident); /* restore indent changed by worker_thread() instead of pj_log_pop_indent() */
    return test->state.retcode;

}

/* Create the empty-slot stack and push every slot onto it.
 * Returns 0 on success, negative error code otherwise.
 * The original iterated with `p > test->state.slots - 1`: computing a
 * pointer one element BEFORE the first element of an array is undefined
 * behavior in C (only pointers within the array or one past the end are
 * valid), so the loop was rewritten with an index counting down.
 */
static int stack_stress_test_init(stack_test_desc* test) {
    pj_status_t status;
    unsigned i;

    status = pj_stack_create(test->state.pool, &test->state.empty_slot_stack);
    if (status != PJ_SUCCESS) {
        PJ_PERROR(1, (THIS_FILE, status, "Unable to create stack"));
        return -60;
    }
    /* Push in reverse order so slots[0] ends up on top of the stack. */
    for (i = PJ_ARRAY_SIZE(test->state.slots); i-- > 0; ) {
        status = pj_stack_push(test->state.empty_slot_stack,
                               &test->state.slots[i]);
        if (status != PJ_SUCCESS) {
            PJ_PERROR(1, (THIS_FILE, status, "Unable to init stack"));
            return -65;
        }
    }
    return 0;
}

/* Tear down the empty-slot stack created by stack_stress_test_init().
 * Returns 0 on success, -80 if pj_stack_destroy() fails.
 */
static int stack_stress_test_destroy(stack_test_desc* test) {
    pj_status_t rc = pj_stack_destroy(test->state.empty_slot_stack);
    if (rc == PJ_SUCCESS)
        return 0;
    PJ_PERROR(1, (THIS_FILE, rc, "Unable to destroy stack"));
    return -80;
}

/* Initialize the list-based variant: fill the empty-slot list and create
 * the mutex that protects it.  Returns 0 on success, negative otherwise.
 * The original iterated with `p > test->state.slots_list - 1`: forming a
 * pointer one element before the start of an array is undefined behavior
 * in C, so the loop was rewritten with a descending index.
 */
static int list_stress_test_init(stack_test_desc* test) {
    pj_status_t status;
    unsigned i;

    pj_list_init(&test->state.empty_slot_list);

    /* Push in reverse order so slots_list[0] ends up at the list head. */
    for (i = PJ_ARRAY_SIZE(test->state.slots_list); i-- > 0; ) {
        pj_list_push_back(&test->state.empty_slot_list,
                          &test->state.slots_list[i]);
    }
    status = pj_lock_create_simple_mutex(test->state.pool,
                                         "stress_test_list_lock",
                                         &test->state.list_lock);
    if (status != PJ_SUCCESS) {
        PJ_PERROR(1, (THIS_FILE, status, "Unable to create simple mutex lock"));
        return -67;
    }
    return 0;
}

/* Tear down the list lock created by list_stress_test_init().
 * Returns 0 on success, -77 if pj_lock_destroy() fails.
 */
static int list_stress_test_destroy(stack_test_desc* test) {
    pj_status_t rc = pj_lock_destroy(test->state.list_lock);
    if (rc == PJ_SUCCESS)
        return 0;
    PJ_PERROR(1, (THIS_FILE, rc, "Unable to destroy simple mutex lock"));
    return -77;
}

/* Thread entry point: forwards to the per-test worker callback. */
static int worker_thread(void* p) {
    stack_test_desc* desc = (stack_test_desc*)p;
    return desc->cfg.worker_thread(desc);
}

/* Stack-based worker: repeatedly reserves empty slots by popping them from
 * the lock-free empty-slot stack, stamps them with this thread's identity,
 * and releases them in batches of up to MAX_RESERVED.  A slot whose owner
 * field changes unexpectedly indicates a race in pj_stack and aborts the
 * test via state.retcode.  Fix over the original: the misspelled log
 * messages ("Anothed") were corrected.
 */
static int stack_worker_thread(stack_test_desc* test)
{
    unsigned n_events = 0;
    unsigned reserved_slots[MAX_RESERVED];
    unsigned reserved_count = 0;
    unsigned slot_id;

    pj_bzero(reserved_slots, sizeof(reserved_slots));

    /* log indent is not propagated to other threads,
     * so we set it explicitly here
     */
    pj_log_set_indent(3);

    while (test->state.retcode == 0 && n_events < test->cfg.repeat) {
        slot_data* slot = pj_stack_pop(test->state.empty_slot_stack);
        if (slot != NULL) { /* we have got an empty slot */
            if (slot->owner != NULL) {
                PJ_LOG(1, (THIS_FILE, "Reserved slot is not empty"));
                test->state.retcode = -90;
                break;
            }
            else {
                slot->owner = pj_thread_this(); /* slot reserved successfully */
                slot_id = slot - test->state.slots;
                reserved_slots[reserved_count++] = slot_id;
                ++n_events;
            }
        }
        if (slot == NULL ||                     /* no empty slots at all or */
            reserved_count >= MAX_RESERVED) {   /* this thread has reserved the maximum number of slots allowed */
            while (reserved_count) {            /* clear slots reserved here */
                slot_id = reserved_slots[--reserved_count];
                slot = &test->state.slots[slot_id];
                if (slot->owner != pj_thread_this()) {
                    PJ_LOG(1, (THIS_FILE, "Another thread has reserved this thread's slot"));
                    test->state.retcode = -85;
                }
                else if (slot->owner == NULL) {
                    PJ_LOG(1, (THIS_FILE, "Another thread has freed up this thread's slot"));
                    test->state.retcode = -95;
                }
                else {
                    slot->owner = NULL; /* free up slot before returning */
                    pj_stack_push(test->state.empty_slot_stack, slot); /* slot returned to empty slot's stack */
                }
            }
        }

    }

    TRACE((THIS_FILE, "thread exiting, n_events=%d", n_events));
    return 0;
}

/* List-based reference worker: same scenario as stack_worker_thread but
 * using a pj_list protected by a mutex.
 *
 * Fixes over the original:
 * - pj_list is a CIRCULAR list: when it is empty, list.next points back to
 *   the list head itself, never NULL.  The original unconditionally took
 *   list.next and erased it, so on an empty list it "erased" the head and
 *   treated it as a slot_data_list (memory corruption), and the subsequent
 *   `slot != NULL` guard could never fire.  Emptiness is now checked
 *   explicitly under the lock with pj_list_empty().
 * - The misspelled log messages ("Anothed") were corrected.
 */
static int list_worker_thread(stack_test_desc* test) {
    unsigned n_events = 0;
    unsigned reserved_slots[MAX_RESERVED];
    unsigned reserved_count = 0;
    unsigned slot_id;
    pj_status_t status;

    pj_bzero(reserved_slots, sizeof(reserved_slots));

    /* log indent is not propagated to other threads,
     * so we set it explicitly here
     */
    pj_log_set_indent(3);

    while (test->state.retcode == 0 && n_events < test->cfg.repeat) {
        slot_data_list* slot = NULL;

        status = pj_lock_acquire(test->state.list_lock);
        if (status != PJ_SUCCESS) {
            PJ_PERROR(1, (THIS_FILE, status, "Unable to acquire lock"));
            return -78;
        }
        if (!pj_list_empty(&test->state.empty_slot_list)) {
            slot = test->state.empty_slot_list.next;
            pj_list_erase(slot);
        }
        status = pj_lock_release(test->state.list_lock);
        if (status != PJ_SUCCESS) {
            PJ_PERROR(1, (THIS_FILE, status, "Unable to release lock"));
            return -79;
        }

        if (slot != NULL) { /* we have got an empty slot */
            if (slot->owner != NULL) {
                PJ_LOG(1, (THIS_FILE, "Reserved slot is not empty"));
                test->state.retcode = -86;
                break;
            }
            else {
                slot->owner = pj_thread_this(); /* slot reserved successfully */
                slot_id = slot - test->state.slots_list;
                reserved_slots[reserved_count++] = slot_id;
                ++n_events;
            }
        }
        if (slot == NULL ||                     /* no empty slots at all or */
            reserved_count >= MAX_RESERVED) {   /* this thread has reserved the maximum number of slots allowed */
            while (reserved_count) {            /* clear slots reserved here */
                slot_id = reserved_slots[--reserved_count];
                slot = &test->state.slots_list[slot_id];
                if (slot->owner != pj_thread_this()) {
                    PJ_LOG(1, (THIS_FILE, "Another thread has reserved this thread's slot"));
                    test->state.retcode = -84;
                }
                else if (slot->owner == NULL) {
                    PJ_LOG(1, (THIS_FILE, "Another thread has freed up this thread's slot"));
                    test->state.retcode = -83;
                }
                else {
                    slot->owner = NULL; /* free up slot before returning */
                    status = pj_lock_acquire(test->state.list_lock);
                    if (status != PJ_SUCCESS) {
                        PJ_PERROR(1, (THIS_FILE, status, "Unable to acquire lock"));
                        return -81;
                    }
                    pj_list_push_back(&test->state.empty_slot_list, slot); /* slot returned to empty slot's list */
                    status = pj_lock_release(test->state.list_lock);
                    if (status != PJ_SUCCESS) {
                        PJ_PERROR(1, (THIS_FILE, status, "Unable to release lock"));
                        return -82;
                    }

                }
            }
        }

    }

    TRACE((THIS_FILE, "thread exiting, n_events=%d", n_events));
    return 0;
}

#else
/* To prevent warning about "translation unit is empty"
* when this test is disabled.
Expand Down
Loading