Skip to content

Commit

Permalink
Optimize server code (#5456)
Browse files Browse the repository at this point in the history
* optimize code

* optimize code [2]

* Optimize sendfile

* added enable_asan option
  • Loading branch information
matyhtf authored Aug 22, 2024
1 parent e692bad commit 0df5246
Show file tree
Hide file tree
Showing 14 changed files with 59 additions and 39 deletions.
3 changes: 3 additions & 0 deletions .github/WORKFLOW-PARAMETERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ Setting this parameter will cause the test program to be run with `valgrind`.
```shell
git commit -m "commit message --filter=[core][unit] --valgrind"
```

## --asan
Setting this parameter will cause the test program to be run with `ASAN`.
5 changes: 5 additions & 0 deletions .github/workflows/core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ jobs:
sudo apt install -y valgrind
cd core-tests && SWOOLE_VALGRIND=1 ./run.sh
- name: make test with asan
if: "contains(github.event.head_commit.message, '--asan')"
run: |
cd core-tests && SWOOLE_ENABLE_ASAN=1 ./run.sh
- name: make test
run:
cd core-tests && ./run.sh
Expand Down
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ if (DEFINED enable_trace_log)
add_definitions(-DSW_LOG_TRACE_OPEN)
endif()

if (DEFINED enable_asan)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -fno-omit-frame-pointer")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=address -fno-omit-frame-pointer")
endif()

execute_process(COMMAND php-config --includes OUTPUT_VARIABLE PHP_INCLUDES OUTPUT_STRIP_TRAILING_WHITESPACE)
execute_process(COMMAND php-config --extension-dir OUTPUT_VARIABLE PHP_EXTENSION_DIR OUTPUT_STRIP_TRAILING_WHITESPACE)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${PHP_INCLUDES}")
Expand Down
4 changes: 4 additions & 0 deletions core-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ if (DEFINED enable_asan)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -fno-omit-frame-pointer")
endif()

if (DEFINED enable_thread)
add_definitions(-DSW_THREAD)
endif()

# should execute before the add_executable command
link_directories(${core_tests_link_directories})

Expand Down
6 changes: 5 additions & 1 deletion core-tests/run.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#!/bin/bash
cmake . -DSW_THREAD=1
if [ "${SWOOLE_ENABLE_ASAN}" = 1 ]; then
cmake . -D enable_thread=1 -D enable_asan=1
else
cmake . -D enable_thread=1
fi
make -j8
ipcs -q

Expand Down
2 changes: 1 addition & 1 deletion core-tests/src/memory/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ TEST(buffer, append_iov) {

while (!buf_for_offset.empty()) {
auto chunk = buf_for_offset.front();
str.append(chunk->value.ptr, chunk->length);
str.append(chunk->value.str, chunk->length);
buf_for_offset.pop();
}

Expand Down
10 changes: 4 additions & 6 deletions include/swoole_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ struct BufferChunk {
uint32_t length = 0;
uint32_t offset = 0;
union {
char *ptr;
void *object;
struct {
uint32_t val1;
uint32_t val2;
} data;
char *str;
void *ptr;
uint32_t u32;
uint64_t u64;
} value{};
uint32_t size = 0;

Expand Down
5 changes: 4 additions & 1 deletion include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,10 @@ class Server {
*/
std::function<int(Server *, EventData *)> onTask;
std::function<int(Server *, EventData *)> onFinish;

/**
* for MessageBus
*/
std::function<uint64_t(void)> msg_id_generator;
/**
* Hook
*/
Expand Down
8 changes: 4 additions & 4 deletions src/memory/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ namespace swoole {

BufferChunk::BufferChunk(Type type, uint32_t size) : type(type), size(size) {
if (type == TYPE_DATA && size > 0) {
value.ptr = new char[size];
value.str = new char[size];
}
}

BufferChunk::~BufferChunk() {
if (type == TYPE_DATA) {
delete[] value.ptr;
delete[] value.str;
}
if (destroy) {
destroy(this);
Expand Down Expand Up @@ -72,7 +72,7 @@ void Buffer::append(const void *data, uint32_t size) {

total_length += _n;

memcpy(chunk->value.ptr, _pos, _n);
memcpy(chunk->value.str, _pos, _n);
chunk->length = _n;

swoole_trace_log(SW_TRACE_BUFFER, "chunk_n=%lu|size=%u|chunk_len=%u|chunk=%p", count(), _n, chunk->length, chunk);
Expand Down Expand Up @@ -121,7 +121,7 @@ void Buffer::append(const struct iovec *iov, size_t iovcnt, off_t offset) {
}

size_t _n = std::min(iov_remain_len, chunk_remain_len);
memcpy(chunk->value.ptr + chunk->length, pos, _n);
memcpy(chunk->value.str + chunk->length, pos, _n);
total_length += _n;
_length -= _n;

Expand Down
8 changes: 4 additions & 4 deletions src/network/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ int Socket::handle_sendfile() {
int ret;
Buffer *buffer = out_buffer;
BufferChunk *chunk = buffer->front();
SendfileRequest *task = (SendfileRequest *) chunk->value.object;
SendfileRequest *task = (SendfileRequest *) chunk->value.ptr;

if (task->offset == 0) {
cork();
Expand Down Expand Up @@ -613,7 +613,7 @@ int Socket::handle_send() {
return SW_OK;
}

ssize_t ret = send(chunk->value.ptr + chunk->offset, sendn, 0);
ssize_t ret = send(chunk->value.str + chunk->offset, sendn, 0);
if (ret < 0) {
switch (catch_write_error(errno)) {
case SW_ERROR:
Expand Down Expand Up @@ -645,7 +645,7 @@ int Socket::handle_send() {
}

static void Socket_sendfile_destructor(BufferChunk *chunk) {
SendfileRequest *task = (SendfileRequest *) chunk->value.object;
SendfileRequest *task = (SendfileRequest *) chunk->value.ptr;
delete task;
}

Expand Down Expand Up @@ -685,7 +685,7 @@ int Socket::sendfile(const char *filename, off_t offset, size_t length) {
}

BufferChunk *chunk = out_buffer->alloc(BufferChunk::TYPE_SENDFILE, 0);
chunk->value.object = task.release();
chunk->value.ptr = task.release();
chunk->destroy = Socket_sendfile_destructor;

return SW_OK;
Expand Down
10 changes: 5 additions & 5 deletions src/server/base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,18 @@ bool BaseFactory::end(SessionId session_id, int flags) {
conn->closing = 0;
conn->closed = 1;
conn->close_errno = 0;
network::Socket *_socket = conn->socket;

if (conn->socket == nullptr) {
if (_socket == nullptr) {
swoole_warning("session#%ld->socket is nullptr", session_id);
return false;
}

if (Buffer::empty(conn->socket->out_buffer) || (conn->close_reset || conn->peer_closed || conn->close_force)) {
if (Buffer::empty(_socket->out_buffer) || (conn->close_reset || conn->peer_closed || conn->close_force)) {
Reactor *reactor = SwooleTG.reactor;
return Server::close_connection(reactor, conn->socket) == SW_OK;
return Server::close_connection(reactor, _socket) == SW_OK;
} else {
BufferChunk *chunk = conn->socket->out_buffer->alloc(BufferChunk::TYPE_CLOSE, 0);
chunk->value.data.val1 = _send.info.type;
_socket->out_buffer->alloc(BufferChunk::TYPE_CLOSE, 0);
conn->close_queued = 1;
return true;
}
Expand Down
7 changes: 3 additions & 4 deletions src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,8 @@ Server::Server(enum Mode _mode) {
gs->pipe_packet_msg_id = 1;
gs->max_concurrency = UINT_MAX;

message_bus.set_id_generator([this]() { return sw_atomic_fetch_add(&gs->pipe_packet_msg_id, 1); });
msg_id_generator = [this]() { return sw_atomic_fetch_add(&gs->pipe_packet_msg_id, 1); };
message_bus.set_id_generator(msg_id_generator);
worker_thread_start = [](const WorkerFn &fn) { fn(); };

g_server_instance = this;
Expand Down Expand Up @@ -1448,11 +1449,9 @@ int Server::send_to_connection(SendData *_send) {
}
}

BufferChunk *chunk;
// close connection
if (_send->info.type == SW_SERVER_EVENT_CLOSE) {
chunk = _socket->out_buffer->alloc(BufferChunk::TYPE_CLOSE, 0);
chunk->value.data.val1 = _send->info.type;
_socket->out_buffer->alloc(BufferChunk::TYPE_CLOSE, 0);
conn->close_queued = 1;
}
// sendfile to client
Expand Down
6 changes: 3 additions & 3 deletions src/server/reactor_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ static int ReactorThread_onPipeWrite(Reactor *reactor, Event *ev) {

while (!Buffer::empty(buffer)) {
BufferChunk *chunk = buffer->front();
EventData *send_data = (EventData *) chunk->value.ptr;
EventData *send_data = (EventData *) chunk->value.str;

// server actively closed connection, should discard the data
if (Server::is_stream_event(send_data->info.type)) {
Expand All @@ -460,7 +460,7 @@ static int ReactorThread_onPipeWrite(Reactor *reactor, Event *ev) {
}
}

ret = ev->socket->send(chunk->value.ptr, chunk->length, 0);
ret = ev->socket->send(chunk->value.str, chunk->length, 0);
if (ret < 0) {
return (ev->socket->catch_write_error(errno) == SW_WAIT) ? SW_OK : SW_ERR;
} else {
Expand Down Expand Up @@ -767,7 +767,7 @@ int ReactorThread::init(Server *serv, Reactor *reactor, uint16_t reactor_id) {
pipe_command->buffer_size = UINT_MAX;
}

message_bus.set_id_generator([serv]() { return sw_atomic_fetch_add(&serv->gs->pipe_packet_msg_id, 1); });
message_bus.set_id_generator(serv->msg_id_generator);
message_bus.set_buffer_size(serv->ipc_max_size);
message_bus.set_always_chunked_transfer();
if (!message_bus.alloc_buffer()) {
Expand Down
19 changes: 9 additions & 10 deletions src/server/thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void Server::destroy_thread_factory() {
}

ThreadFactory::ThreadFactory(Server *server) : BaseFactory(server) {
threads_.resize(server_->task_worker_num + server_->worker_num + server_->get_user_worker_num() + 1);
threads_.resize(server_->get_all_worker_num() + 1);
}

bool ThreadFactory::start() {
Expand Down Expand Up @@ -77,9 +77,8 @@ void ThreadFactory::at_thread_exit(Worker *worker) {

void ThreadFactory::create_message_bus() {
auto mb = new MessageBus();
auto server = server_;
mb->set_id_generator([server]() { return sw_atomic_fetch_add(&server->gs->pipe_packet_msg_id, 1); });
mb->set_buffer_size(server->ipc_max_size);
mb->set_id_generator(server_->msg_id_generator);
mb->set_buffer_size(server_->ipc_max_size);
mb->set_always_chunked_transfer();
if (!mb->alloc_buffer()) {
throw std::bad_alloc();
Expand Down Expand Up @@ -209,15 +208,12 @@ void ThreadFactory::wait() {
}

int Server::start_worker_threads() {
/**
* heartbeat thread
*/
ThreadFactory *_factory = dynamic_cast<ThreadFactory *>(factory);

if (heartbeat_check_interval > 0) {
start_heartbeat_thread();
}

ThreadFactory *_factory = dynamic_cast<ThreadFactory *>(factory);

if (task_worker_num > 0) {
SW_LOOP_N(task_worker_num) {
_factory->spawn_task_worker(worker_num + i);
Expand All @@ -234,12 +230,13 @@ int Server::start_worker_threads() {
}
}

int manager_thread_id = task_worker_num + worker_num + get_user_worker_num();
int manager_thread_id = get_all_worker_num();
_factory->spawn_manager_thread(manager_thread_id);

if (swoole_event_init(0) < 0) {
return SW_ERR;
}

Reactor *reactor = sw_reactor();
for (auto iter = ports.begin(); iter != ports.end(); iter++) {
auto port = *iter;
Expand All @@ -252,8 +249,10 @@ int Server::start_worker_threads() {
}
reactor->add(port->socket, SW_EVENT_READ);
}

SwooleTG.id = reactor->id = manager_thread_id + 1;
store_listen_socket();

return start_master_thread(reactor);
}

Expand Down

0 comments on commit 0df5246

Please sign in to comment.