Skip to content

Commit

Permalink
[memory][config] Add events proccesors config
Browse files Browse the repository at this point in the history
  • Loading branch information
NikitaZotov committed Apr 14, 2023
1 parent 66e1ffa commit ab37902
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 53 deletions.
1 change: 1 addition & 0 deletions config/sc-machine.ini
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[sc-memory]
max_loaded_segments = 1000
max_threads = 32
events_processors = 1

save_period = 3600
update_period = 1800
Expand Down
9 changes: 7 additions & 2 deletions sc-memory/sc-core/sc-store/sc_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,12 +415,17 @@ void sc_event_unlock(sc_event * evt)
}

// --------
sc_bool sc_events_initialize()
sc_bool sc_events_initialize_ext(sc_int32 const events_processors)
{
event_queue = sc_event_queue_new();
event_queue = sc_event_queue_new_ext(events_processors);
return SC_TRUE;
}

sc_bool sc_events_initialize()
{
return sc_events_initialize_ext((sc_int32)g_get_num_processors());
}

void sc_events_shutdown()
{
sc_event_queue_destroy_wait(event_queue);
Expand Down
3 changes: 3 additions & 0 deletions sc-memory/sc-core/sc-store/sc_event/sc_event_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ struct _sc_event
sc_access_levels access_levels;
};

//! Function to initialize sc-events module with user processors number
sc_bool sc_events_initialize_ext(sc_int32 events_processors);

//! Function to initialize sc-events module
sc_bool sc_events_initialize();

Expand Down
35 changes: 15 additions & 20 deletions sc-memory/sc-core/sc-store/sc_event/sc_event_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "sc_event_private.h"

#include "../sc-base/sc_allocator.h"
#include "../sc-base/sc_assert_utils.h"
#include "../sc-base/sc_message.h"

typedef struct
{
Expand All @@ -30,16 +30,13 @@ sc_event_pool_worker_data * sc_event_pool_worker_data_new(sc_event * evt, sc_add

void sc_event_pool_worker_data_destroy(sc_event_pool_worker_data * data)
{
sc_assert(data != null_ptr);
sc_mem_free(data);
}

void sc_event_pool_worker(gpointer data, gpointer user_data)
{
sc_assert(data != null_ptr);
sc_event_pool_worker_data * work_data = (sc_event_pool_worker_data *)data;

sc_assert(work_data->evt != null_ptr);
if (work_data->evt->callback != null_ptr)
{
work_data->evt->callback(work_data->evt, work_data->edge_addr);
Expand All @@ -53,21 +50,31 @@ void sc_event_pool_worker(gpointer data, gpointer user_data)
sc_event_pool_worker_data_destroy(work_data);
}

sc_event_queue * sc_event_queue_new()
sc_event_queue * sc_event_queue_new_ext(sc_int32 events_processors)
{
sc_event_queue * queue = sc_mem_new(sc_event_queue, 1);
queue->running = SC_TRUE;
g_mutex_init(&queue->mutex);

events_processors = sc_max(1, sc_min(events_processors, (sc_int32)g_get_num_processors()));
{
sc_message("Sc-events configuration:");
sc_message("\tprocessors: %d", events_processors);
}

queue->thread_pool =
g_thread_pool_new(sc_event_pool_worker, null_ptr, (sc_int32)g_get_num_processors(), SC_FALSE, null_ptr);
g_thread_pool_new(sc_event_pool_worker, null_ptr, events_processors, SC_FALSE, null_ptr);

return queue;
}

void sc_event_queue_stop_processing(sc_event_queue * queue)
sc_event_queue * sc_event_queue_new()
{
sc_assert(queue != null_ptr);
return sc_event_queue_new_ext((sc_int32)g_get_num_processors());
}

void sc_event_queue_stop_processing(sc_event_queue * queue)
{
sc_bool is_running = SC_FALSE;

g_mutex_lock(&queue->mutex);
Expand All @@ -84,8 +91,6 @@ void sc_event_queue_stop_processing(sc_event_queue * queue)

void sc_event_queue_destroy_wait(sc_event_queue * queue)
{
sc_assert(queue != null_ptr);

if (queue->thread_pool)
{
g_mutex_lock(&queue->mutex);
Expand All @@ -99,8 +104,6 @@ void sc_event_queue_destroy_wait(sc_event_queue * queue)

void sc_event_queue_append(sc_event_queue * queue, sc_event * evt, sc_addr edge, sc_addr other_el)
{
sc_assert(queue != null_ptr);

g_mutex_lock(&queue->mutex);
if (queue->running == SC_TRUE)
{
Expand All @@ -109,11 +112,3 @@ void sc_event_queue_append(sc_event_queue * queue, sc_event * evt, sc_addr edge,
}
g_mutex_unlock(&queue->mutex);
}

void sc_event_queue_remove(sc_event_queue * queue, sc_event * event)
{
sc_assert(queue != null_ptr);
g_mutex_lock(&queue->mutex);

g_mutex_unlock(&queue->mutex);
}
12 changes: 3 additions & 9 deletions sc-memory/sc-core/sc-store/sc_event/sc_event_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,13 @@ struct _sc_event_queue
{
GMutex mutex;
sc_bool running; // flag that determine if queue is running
GRecMutex proc_mutex; // mutex to lock event for process
GThreadPool * thread_pool; // thread pool that used for a workers
};

struct _sc_event_queue_item
{
sc_event * event;
sc_addr edge;
sc_addr other_el;
};

typedef struct _sc_event_queue sc_event_queue;
typedef struct _sc_event_queue_item sc_event_queue_item;

//! Create new sc-event queue with user processors number
sc_event_queue * sc_event_queue_new_ext(sc_int32 events_processors);

//! Create new sc-event queue
sc_event_queue * sc_event_queue_new();
Expand Down
2 changes: 1 addition & 1 deletion sc-memory/sc-core/sc_memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ sc_memory_context * sc_memory_initialize(const sc_memory_params * params)
goto error;
sc_memory_context_free(helper_ctx);

if (sc_events_initialize() == SC_FALSE)
if (sc_events_initialize_ext(params->events_processors) == SC_FALSE)
{
sc_error("Error while initialize events module");
goto error;
Expand Down
9 changes: 1 addition & 8 deletions sc-memory/sc-core/sc_memory_params.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,6 @@

#include "sc_memory_params.h"

#define DEFAULT_SAVE_PERIOD 32000
#define DEFAULT_UPDATE_PERIOD 16000
#define DEFAULT_MAX_THREADS 1 //32
#define DEFAULT_MAX_LOADED_SEGMENTS 1000
#define DEFAULT_LOG_TYPE "Console"
#define DEFAULT_LOG_FILE ""
#define DEFAULT_LOG_LEVEL "Info"

void sc_memory_params_clear(sc_memory_params * params)
{
params->version = (sc_char const *)null_ptr;
Expand All @@ -30,6 +22,7 @@ void sc_memory_params_clear(sc_memory_params * params)

params->max_loaded_segments = DEFAULT_MAX_LOADED_SEGMENTS;
params->max_threads = DEFAULT_MAX_THREADS;
params->events_processors = DEFAULT_EVENTS_PROCESSORS;

params->init_memory_generated_structure = (sc_char const *)null_ptr;
params->init_memory_generated_upload = SC_FALSE;
Expand Down
10 changes: 10 additions & 0 deletions sc-memory/sc-core/sc_memory_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@

#include "sc-store/sc_types.h"

#define DEFAULT_SAVE_PERIOD 32000
#define DEFAULT_UPDATE_PERIOD 16000
#define DEFAULT_MAX_THREADS 32
#define DEFAULT_EVENTS_PROCESSORS 32
#define DEFAULT_MAX_LOADED_SEGMENTS 1000
#define DEFAULT_LOG_TYPE "Console"
#define DEFAULT_LOG_FILE ""
#define DEFAULT_LOG_LEVEL "Info"

typedef struct _sc_memory_params
{
const sc_char * version;
Expand All @@ -25,6 +34,7 @@ typedef struct _sc_memory_params

sc_uint32 max_loaded_segments;
sc_uint8 max_threads;
sc_int32 events_processors;

const sc_char * init_memory_generated_structure;
sc_bool init_memory_generated_upload;
Expand Down
11 changes: 2 additions & 9 deletions sc-tools/sc-config-utils/sc_memory_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@
#include "sc-core/sc_memory_params.h"
#include "sc-core/sc_memory_version.h"

#define DEFAULT_SAVE_PERIOD 32000
#define DEFAULT_UPDATE_PERIOD 16000
#define DEFAULT_MAX_THREADS 32
#define DEFAULT_MAX_LOADED_SEGMENTS 1000
#define DEFAULT_LOG_TYPE "Console"
#define DEFAULT_LOG_FILE ""
#define DEFAULT_LOG_LEVEL "Info"

class ScParams
{
public:
Expand Down Expand Up @@ -130,8 +122,9 @@ class ScMemoryConfig
m_memoryParams.ext_path = GetStringByKey("extensions_path");
m_memoryParams.enabled_exts = nullptr;

m_memoryParams.max_threads = GetIntByKey("max_threads", DEFAULT_MAX_THREADS);
m_memoryParams.max_loaded_segments = GetIntByKey("max_loaded_segments", DEFAULT_MAX_LOADED_SEGMENTS);
m_memoryParams.max_threads = GetIntByKey("max_threads", DEFAULT_MAX_THREADS);
m_memoryParams.events_processors = GetIntByKey("events_processors", DEFAULT_EVENTS_PROCESSORS);

m_memoryParams.save_period = GetIntByKey("save_period", DEFAULT_SAVE_PERIOD);
m_memoryParams.update_period = GetIntByKey("update_period", DEFAULT_UPDATE_PERIOD);
Expand Down
4 changes: 0 additions & 4 deletions sc-tools/sc-server/src/sc-server-impl/sc_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
#define DEFAULT_HOST "127.0.0.1"
#define DEFAULT_PORT 8090

#define DEFAULT_LOG_TYPE "Console"
#define DEFAULT_LOG_FILE ""
#define DEFAULT_LOG_LEVEL "Debug"

ScServerImpl::ScServerImpl(sc_memory_params const & params)
: ScServerImpl(DEFAULT_HOST, DEFAULT_PORT, DEFAULT_LOG_TYPE, DEFAULT_LOG_FILE, DEFAULT_LOG_LEVEL, SC_FALSE, params)
{
Expand Down

0 comments on commit ab37902

Please sign in to comment.