From 6f46c546dc5471107672b1c742f54b69705f1c14 Mon Sep 17 00:00:00 2001 From: skyfire Date: Sun, 18 Aug 2019 17:28:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86tcp=20server=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E6=AF=8F=E4=B8=AA=E7=BA=BF=E7=A8=8B=E9=83=BD=E7=9B=91?= =?UTF-8?q?=E5=90=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/sf_tcp_server_interface.h | 214 +++++----- network/sf_tcp_server_linux.h | 96 ++--- network/sf_tcp_server_linux.hpp | 676 ++++++++++++++---------------- 3 files changed, 458 insertions(+), 528 deletions(-) diff --git a/network/sf_tcp_server_interface.h b/network/sf_tcp_server_interface.h index f39b125e..29c54cf1 100644 --- a/network/sf_tcp_server_interface.h +++ b/network/sf_tcp_server_interface.h @@ -13,128 +13,124 @@ #pragma once -#include "tools/sf_nocopy.h" #include "core/sf_object.h" +#include "core/sf_stdc++.h" #include "core/sf_type.h" -#include "sf_tcp_utils.h" #include "network/sf_net_utils.h" #include "network/sf_server_socket_filter.h" -#include "core/sf_stdc++.h" +#include "sf_tcp_utils.h" +#include "tools/sf_nocopy.h" -namespace skyfire -{ +namespace skyfire { +/** + * @brief tcp服务器 + */ +class sf_tcp_server_interface : public sf_nocopy { + /** + * 新连接到来信号 + */ + SF_REG_SIGNAL(new_connection, SOCKET); + /** + * 数据包到来信号 + */ + SF_REG_SIGNAL(data_coming, SOCKET, const sf_pkg_header_t &, + const byte_array &); /** - * @brief tcp服务器 + * 原始数据到来信号 */ - class sf_tcp_server_interface : public sf_nocopy - { - /** - * 新连接到来信号 - */ - SF_REG_SIGNAL(new_connection, SOCKET); - /** - * 数据包到来信号 - */ - SF_REG_SIGNAL(data_coming, SOCKET, const sf_pkg_header_t&, const byte_array&); - /** - * 原始数据到来信号 - */ - SF_REG_SIGNAL(raw_data_coming, SOCKET, const byte_array&); - /** - * 关闭信号 - */ - SF_REG_SIGNAL(closed, SOCKET); - - /** - * 写成功信号 - */ - SF_REG_SIGNAL(write_finished, SOCKET); - - /** - * 写失败信号 - */ - SF_REG_SIGNAL(write_error, SOCKET); - - private: - - std::vector> filters__; - - protected: - - bool manage_clients__{ true }; - - void after_raw_recv_filter__(SOCKET sock,byte_array &data); - void after_recv_filter__(SOCKET sock,sf_pkg_header_t &header, byte_array &data); - void before_raw_send_filter__(SOCKET sock,byte_array &data); - void before_send_filter__(SOCKET sock,sf_pkg_header_t &header, byte_array &data); - void new_connection_filter__(SOCKET sock); - void listen_sock_filter__(SOCKET sock); - void disconnect_sock_filter__(SOCKET sock); - - public: + SF_REG_SIGNAL(raw_data_coming, SOCKET, const byte_array &); + /** + * 关闭信号 + */ + SF_REG_SIGNAL(closed, SOCKET); + + /** + * 写成功信号 + */ + SF_REG_SIGNAL(write_finished, SOCKET); + + /** + * 写失败信号 + */ + SF_REG_SIGNAL(write_error, SOCKET); + + private: + std::vector> filters__; + + protected: + bool manage_clients__{true}; + void after_raw_recv_filter__(SOCKET sock, byte_array &data); + void after_recv_filter__(SOCKET sock, sf_pkg_header_t &header, + byte_array &data); + void before_raw_send_filter__(SOCKET sock, byte_array &data); + void before_send_filter__(SOCKET sock, sf_pkg_header_t &header, + byte_array &data); + void new_connection_filter__(SOCKET sock); + void listen_sock_filter__(SOCKET sock); + void disconnect_sock_filter__(SOCKET sock); + + public: #pragma clang diagnostic push #pragma ide diagnostic ignored "OCUnusedGlobalDeclarationInspection" - void add_server_socket_filter(std::shared_ptr&& filter); + void add_server_socket_filter( + std::shared_ptr &&filter); #pragma clang diagnostic pop - /** - * 获取原始socket - * @return 原始socket - */ - virtual SOCKET get_raw_socket() = 0; - /** - * 监听端口 - * @param ip 本地ip - * @param port 本地端口 - * @return 是否监听成功 - */ - virtual bool listen(const std::string &ip, unsigned short port) = 0; - - /** - * 关闭服务器 - */ - virtual void close() = 0; - /** - * 关闭指定连接 - * @param sock 指定连接socket - */ - virtual void close(SOCKET sock) = 0; - - /** - * 发送数据包 - * @param sock socket - * @param type 包类型 - * @param data 包数据 - * @return 是否发送成功 - */ - virtual bool send(SOCKET sock, int type, const byte_array &data) = 0; - - /** - * 发送数据 - * @param sock scoket - * @param data 数据 - * @return 是否发送成功 - */ - virtual bool send(SOCKET sock, const byte_array &data) = 0; - - /** - * 获取server地址 - * @param addr server地址信息 - * @return 获取结果 - */ - bool get_server_addr(sf_addr_info_t &addr); - - - virtual bool detach(SOCKET socket) = 0; + /** + * 获取原始socket + * @return 原始socket + */ + virtual SOCKET get_raw_socket() = 0; + /** + * 监听端口 + * @param ip 本地ip + * @param port 本地端口 + * @return 是否监听成功 + */ + virtual bool listen(const std::string &ip, unsigned short port) = 0; + + /** + * 关闭服务器 + */ + virtual void close() = 0; + /** + * 关闭指定连接 + * @param sock 指定连接socket + */ + virtual void close(SOCKET sock) = 0; + + /** + * 发送数据包 + * @param sock socket + * @param type 包类型 + * @param data 包数据 + * @return 是否发送成功 + */ + virtual bool send(SOCKET sock, int type, const byte_array &data) = 0; + + /** + * 发送数据 + * @param sock scoket + * @param data 数据 + * @return 是否发送成功 + */ + virtual bool send(SOCKET sock, const byte_array &data) = 0; + + /** + * 获取server地址 + * @param addr server地址信息 + * @return 获取结果 + */ + bool get_server_addr(sf_addr_info_t &addr); #pragma clang diagnostic push #pragma ide diagnostic ignored "OCUnusedGlobalDeclarationInspection" - /** - * 设置是否管理客户端连接 - * @param flag 是否管理 - */ - void set_manage_clients(bool flag); + /** + * 设置是否管理客户端连接 + * @param flag 是否管理 + */ + void set_manage_clients(bool flag); #pragma clang diagnostic pop - }; -} \ No newline at end of file +}; +} // namespace skyfire \ No newline at end of file diff --git a/network/sf_tcp_server_linux.h b/network/sf_tcp_server_linux.h index db732a45..ceab8cc7 100644 --- a/network/sf_tcp_server_linux.h +++ b/network/sf_tcp_server_linux.h @@ -13,86 +13,80 @@ #pragma once -#include -#include +#include #include -#include +#include #include -#include #include -#include #include +#include +#include +#include +#include #include #include -#include "sf_tcp_utils.hpp" -#include "tools/sf_nocopy.h" #include "core/sf_object.hpp" #include "core/sf_type.hpp" +#include "sf_tcp_server_interface.h" +#include "sf_tcp_utils.hpp" #include "tools/sf_json.hpp" #include "tools/sf_logger.hpp" -#include "sf_tcp_server_interface.h" -#include +#include "tools/sf_nocopy.h" #include "tools/sf_utils.h" +namespace skyfire { -namespace skyfire -{ - - struct sock_data_context_t{ - epoll_event ev; - byte_array data_buffer_in; - std::deque data_buffer_out; - }; - - struct epoll_context_t{ - int epoll_fd; - std::shared_ptr mu_sock_context__ = std::make_shared(); - std::unordered_map sock_context__; - }; - - class sf_tcp_server : public sf_make_instance_t - { - private: - int listen_fd__ = -1; - bool raw__ = false; - int thread_count__ = std::thread::hardware_concurrency() * 2 + 2; - - std::vector epoll_data__; +struct sock_data_context_t { + epoll_event ev{}; + byte_array data_buffer_in{}; + std::shared_ptr mu_data_buffer_out = + std::make_shared(); + std::deque data_buffer_out{}; +}; - std::vector thread_vec__; +struct epoll_context_t { + int epoll_fd{}; + std::unordered_map sock_context__{}; +}; - void work_thread(int index, bool listen_thread = false, SOCKET listen_fd = -1); +class sf_tcp_server + : public sf_make_instance_t { + private: + int listen_fd__ = -1; + bool raw__ = false; + int thread_count__ = std::thread::hardware_concurrency() * 2 + 2; - bool in_dispatch(SOCKET fd); + std::vector thread_vec__; - int find_fd_epoll_index(SOCKET fd); + void work_thread__(bool listen_thread = false, SOCKET listen_fd = -1); - bool handle_accept(int index); + bool in_dispatch__(SOCKET fd); - void handle_read(int index,const epoll_event &ev); + bool handle_accept__(); - void handle_write(int index, const epoll_event &ev); + void handle_read__(const epoll_event &ev); - public: - SOCKET get_raw_socket() override; + void handle_write__(const epoll_event &ev); - explicit sf_tcp_server(bool raw = false); + epoll_context_t &epoll_data__() const; - ~sf_tcp_server() override; + public: + SOCKET get_raw_socket() override; - bool listen(const std::string &ip, unsigned short port) override; + explicit sf_tcp_server(bool raw = false); - void close() override; + ~sf_tcp_server() override; - void close(SOCKET sock) override; + bool listen(const std::string &ip, unsigned short port) override; - bool send(SOCKET sock, int type, const byte_array &data) override; + void close() override; - bool send(SOCKET sock, const byte_array &data) override; + void close(SOCKET sock) override; - bool detach(SOCKET sock) override ; + bool send(SOCKET sock, int type, const byte_array &data) override; - }; + bool send(SOCKET sock, const byte_array &data) override; +}; -} \ No newline at end of file +} // namespace skyfire \ No newline at end of file diff --git a/network/sf_tcp_server_linux.hpp b/network/sf_tcp_server_linux.hpp index 496af776..c781e630 100644 --- a/network/sf_tcp_server_linux.hpp +++ b/network/sf_tcp_server_linux.hpp @@ -11,462 +11,402 @@ * 发布日期:2018-10-22 */ - #pragma once -#include "sf_tcp_server_linux.h" #include "core/sf_define.h" #include "network/sf_net_utils.hpp" #include "sf_tcp_server_interface.hpp" +#include "sf_tcp_server_linux.h" namespace skyfire { - inline sf_tcp_server::sf_tcp_server(const bool raw) { - raw__ = raw; - } +inline sf_tcp_server::sf_tcp_server(const bool raw) { raw__ = raw; } - inline SOCKET sf_tcp_server::get_raw_socket() { - return listen_fd__; - } - - inline bool sf_tcp_server::send(int sock, const byte_array &data) { - auto index = find_fd_epoll_index(sock); - if (index == -1) { - sf_debug("not found fd", sock); - return false; - } +inline SOCKET sf_tcp_server::get_raw_socket() { return listen_fd__; } - std::lock_guard lock_context(*epoll_data__[index].mu_sock_context__); - auto &sock_context__ = epoll_data__[index].sock_context__; +inline bool sf_tcp_server::send(int sock, const byte_array &data) { + auto &sock_context__ = epoll_data__().sock_context__; + auto send_data = data; + before_raw_send_filter__(sock, send_data); - sf_debug("find index", index); + sf_debug("index", index, "sock", sock, "push data"); - auto send_data = data; - before_raw_send_filter__(sock, send_data); - - sf_debug("index", index, "sock", sock, "push data"); - sf_debug(sock, "before", sock_context__[sock].data_buffer_out.size()); + { + std::lock_guard lck( + *sock_context__[sock].mu_data_buffer_out); sock_context__[sock].data_buffer_out.push_back(send_data); - sf_debug(sock, "after", sock_context__[sock].data_buffer_out.size()); - - sock_context__[sock].ev.events |= EPOLLOUT; - - return epoll_ctl(epoll_data__[index].epoll_fd, EPOLL_CTL_MOD, sock, &sock_context__[sock].ev) != -1; - - } - inline bool sf_tcp_server::send(int sock, int type, const byte_array &data) { - auto index = find_fd_epoll_index(sock); - if (index == -1) { - sf_debug("not found fd", sock); - return false; - } + sock_context__[sock].ev.events |= EPOLLOUT; - std::lock_guard lock_context(*epoll_data__[index].mu_sock_context__); + return epoll_ctl(epoll_data__().epoll_fd, EPOLL_CTL_MOD, sock, + &sock_context__[sock].ev) != -1; +} - auto &sock_context__ = epoll_data__[index].sock_context__; +inline bool sf_tcp_server::send(int sock, int type, const byte_array &data) { + auto &sock_context__ = epoll_data__().sock_context__; - sf_debug("find index", index); + sf_debug("find index", index); - sf_pkg_header_t header{}; - header.type = type; - header.length = data.size(); - make_header_checksum(header); - auto tmp_data = data; - before_send_filter__(sock, header, tmp_data); - auto send_data = make_pkg(header) + tmp_data; + sf_pkg_header_t header{}; + header.type = type; + header.length = data.size(); + make_header_checksum(header); + auto tmp_data = data; + before_send_filter__(sock, header, tmp_data); + auto send_data = make_pkg(header) + tmp_data; - before_raw_send_filter__(sock, send_data); + before_raw_send_filter__(sock, send_data); - sf_debug(sock, "before", sock_context__[sock].data_buffer_out.size()); + { + std::shared_lock lck( + *sock_context__[sock].mu_data_buffer_out); sock_context__[sock].data_buffer_out.push_back(send_data); - sf_debug(sock, "after", sock_context__[sock].data_buffer_out.size()); - - sock_context__[sock].ev.events |= EPOLLOUT; + } - return epoll_ctl(epoll_data__[index].epoll_fd, EPOLL_CTL_MOD, sock, &sock_context__[sock].ev) != -1; + sock_context__[sock].ev.events |= EPOLLOUT; - } + return epoll_ctl(epoll_data__().epoll_fd, EPOLL_CTL_MOD, sock, + &sock_context__[sock].ev) != -1; +} - inline void sf_tcp_server::close(SOCKET sock) { - ::shutdown(sock, SHUT_RDWR); - ::close(sock); - } +inline void sf_tcp_server::close(SOCKET sock) { + ::shutdown(sock, SHUT_RDWR); + ::close(sock); +} - inline void sf_tcp_server::close() { - shutdown(listen_fd__, SHUT_RDWR); - ::close(listen_fd__); - listen_fd__ = -1; +inline void sf_tcp_server::close() { + shutdown(listen_fd__, SHUT_RDWR); + ::close(listen_fd__); + listen_fd__ = -1; - for (auto &t:epoll_data__) { - std::lock_guard lock_context(*t.mu_sock_context__); - for (auto &p:t.sock_context__) { - epoll_ctl(t.epoll_fd, EPOLL_CTL_DEL, p.first, - &p.second.ev); - } - t.sock_context__.clear(); - } - for (auto &p:thread_vec__) { - p.join(); - } + for (auto &p : thread_vec__) { + p.join(); } +} - inline bool sf_tcp_server::listen(const std::string &ip, unsigned short port) { - listen_fd__ = socket(AF_INET, SOCK_STREAM, 0); - if (listen_fd__ == -1) { - return false; - } +inline bool sf_tcp_server::listen(const std::string &ip, unsigned short port) { + listen_fd__ = socket(AF_INET, SOCK_STREAM, 0); + if (listen_fd__ == -1) { + return false; + } - listen_sock_filter__(listen_fd__); + listen_sock_filter__(listen_fd__); - if (fcntl(listen_fd__, F_SETFL, fcntl(listen_fd__, F_GETFD, 0) | O_NONBLOCK) == -1) { - return false; - } + if (fcntl(listen_fd__, F_SETFL, + fcntl(listen_fd__, F_GETFD, 0) | O_NONBLOCK) == -1) { + return false; + } - int opt = 1; - if (-1 == setsockopt(listen_fd__, SOL_SOCKET, SO_REUSEADDR, - reinterpret_cast(&opt), sizeof(opt))) { - return false; - } + int opt = 1; + if (-1 == setsockopt(listen_fd__, SOL_SOCKET, SO_REUSEADDR, + reinterpret_cast(&opt), sizeof(opt))) { + return false; + } - sockaddr_in internet_addr{}; - internet_addr.sin_family = AF_INET; - internet_addr.sin_addr.s_addr = inet_addr(ip.c_str()); - internet_addr.sin_port = htons(port); + if (-1 == setsockopt(listen_fd__, SOL_SOCKET, SO_REUSEPORT, + reinterpret_cast(&opt), sizeof(opt))) { + return false; + } - if (::bind(listen_fd__, reinterpret_cast(&internet_addr), sizeof(sockaddr_in)) == -1) { - return false; - } + sockaddr_in internet_addr{}; + internet_addr.sin_family = AF_INET; + internet_addr.sin_addr.s_addr = inet_addr(ip.c_str()); + internet_addr.sin_port = htons(port); - if (::listen(listen_fd__, max_tcp_connection) == -1) { - return false; - } + if (::bind(listen_fd__, reinterpret_cast(&internet_addr), + sizeof(sockaddr_in)) == -1) { + return false; + } - epoll_data__.assign(static_cast(thread_count__), epoll_context_t{}); + if (::listen(listen_fd__, max_tcp_connection) == -1) { + return false; + } - thread_vec__.emplace_back(std::thread(&sf_tcp_server::work_thread, this, 0, true, listen_fd__)); + thread_vec__.emplace_back( + std::thread(&sf_tcp_server::work_thread__, this, true, listen_fd__)); - if (manage_clients__) { - for (int i = 1; i < thread_count__; ++i) { - thread_vec__.emplace_back(std::thread(&sf_tcp_server::work_thread, this, i, false, -1)); - } + if (manage_clients__) { + for (int i = 1; i < thread_count__; ++i) { + thread_vec__.emplace_back(std::thread(&sf_tcp_server::work_thread__, + this, true, listen_fd__)); } - return true; } + return true; +} - inline void sf_tcp_server::work_thread(int index, bool listen_thread, SOCKET listen_fd) { - sf_debug("start thread", index); - epoll_data__[index].epoll_fd = epoll_create(max_tcp_connection); +inline void sf_tcp_server::work_thread__(bool listen_thread, SOCKET listen_fd) { + sf_debug("start thread", index); + epoll_data__().epoll_fd = epoll_create(max_tcp_connection); - std::unique_lock lock_context(*epoll_data__[index].mu_sock_context__); - auto &sock_context__ = epoll_data__[index].sock_context__; + auto &sock_context__ = epoll_data__().sock_context__; - if (listen_thread) { - sock_context__[listen_fd] = sock_data_context_t{}; - sock_context__[listen_fd].ev.events = EPOLLIN | EPOLLET; - sock_context__[listen_fd].ev.data.fd = listen_fd; + if (listen_thread) { + sock_context__[listen_fd] = sock_data_context_t{}; + sock_context__[listen_fd].ev.events = EPOLLIN | EPOLLET; + sock_context__[listen_fd].ev.data.fd = listen_fd; - if (epoll_ctl(epoll_data__[index].epoll_fd, EPOLL_CTL_ADD, listen_fd, - &sock_context__[listen_fd].ev) < - 0) { - sf_debug("add to epoll error"); - close(listen_fd); - return; - } + if (epoll_ctl(epoll_data__().epoll_fd, EPOLL_CTL_ADD, listen_fd, + &sock_context__[listen_fd].ev) < 0) { + sf_debug("add to epoll error"); + close(listen_fd); + return; } - lock_context.unlock(); - - std::vector evs(max_tcp_connection); - - while (true) { - int wait_fds = 0; - if ((wait_fds = epoll_wait(epoll_data__[index].epoll_fd, evs.data(), max_tcp_connection, -1)) == -1) { - if (errno == EINTR) { - continue; - } - break; - } + } + std::vector evs(max_tcp_connection); - if (wait_fds == 0) { + while (true) { + int wait_fds = 0; + if ((wait_fds = epoll_wait(epoll_data__().epoll_fd, evs.data(), + max_tcp_connection, -1)) == -1) { + if (errno == EINTR) { continue; } + break; + } - sf_debug("new epoll event", wait_fds, index); - - auto listen_err = false; - - for (auto i = 0; i < wait_fds; ++i) { - if (evs[i].data.fd == listen_fd__ && (evs[i].events & EPOLLIN)) { - if(!handle_accept(index)) - { - listen_err = true; - } - } else if (evs[i].events & EPOLLIN) { - handle_read(index, evs[i]); - } else if (evs[i].events & EPOLLOUT) { - handle_write(index, evs[i]); - } - } - if(listen_err) - { - sf_debug("listen error"); - break; - } + if (wait_fds == 0) { + continue; } - } + sf_debug("new epoll event", wait_fds, index); - inline sf_tcp_server::~sf_tcp_server() { - close(); - } + auto listen_err = false; - inline bool sf_tcp_server::in_dispatch(SOCKET fd) { - int index = 0; - std::unique_lock lock_context(*epoll_data__[index].mu_sock_context__); - auto min_fd_count = epoll_data__[0].sock_context__.size(); - lock_context.unlock(); - for (int i = 0; i < epoll_data__.size(); ++i) { - std::lock_guard lock_context2(*epoll_data__[i].mu_sock_context__); - auto ct = epoll_data__[i].sock_context__.size(); - sf_debug("index", i, "count", ct); - if (min_fd_count > ct) { - min_fd_count = ct; - index = i; + for (auto i = 0; i < wait_fds; ++i) { + if (evs[i].data.fd == listen_fd__ && (evs[i].events & EPOLLIN)) { + if (!handle_accept__()) { + listen_err = true; + } + } else if (evs[i].events & EPOLLIN) { + handle_read__(evs[i]); + } else if (evs[i].events & EPOLLOUT) { + handle_write__(evs[i]); } } + if (listen_err) { + sf_debug("listen error"); + break; + } + } +} +inline sf_tcp_server::~sf_tcp_server() { close(); } - sf_debug("add new fd", fd, index); +inline bool sf_tcp_server::in_dispatch__(SOCKET fd) { + sf_debug("add new fd", fd); - std::lock_guard lock_context3(*epoll_data__[index].mu_sock_context__); - auto &sock_context__ = epoll_data__[index].sock_context__; - sock_context__[fd] = sock_data_context_t{}; - sock_context__[fd].ev.events = EPOLLIN | EPOLLET; - sock_context__[fd].ev.data.fd = fd; + auto &sock_context__ = epoll_data__().sock_context__; + sock_context__[fd] = sock_data_context_t{}; + sock_context__[fd].ev.events = EPOLLIN | EPOLLET; + sock_context__[fd].ev.data.fd = fd; - if (epoll_ctl(epoll_data__[index].epoll_fd, EPOLL_CTL_ADD, fd, - &sock_context__[fd].ev) < - 0) { - sf_debug("add to epoll error"); - close(fd); - return false; - } - return true; + if (epoll_ctl(epoll_data__().epoll_fd, EPOLL_CTL_ADD, fd, + &sock_context__[fd].ev) < 0) { + sf_debug("add to epoll error"); + close(fd); + return false; } - - inline int sf_tcp_server::find_fd_epoll_index(SOCKET fd) { - for (int i = 0; i < epoll_data__.size(); ++i) { - std::lock_guard lck(*epoll_data__[i].mu_sock_context__); - if (epoll_data__[i].sock_context__.count(fd) != 0) { - return i; + return true; +} + +inline bool sf_tcp_server::handle_accept__() { + int conn_fd = 0; + sockaddr_in client_addr{}; + socklen_t len = sizeof(client_addr); + while ((conn_fd = accept(listen_fd__, (struct sockaddr *)&client_addr, + &len)) > 0) { + new_connection_filter__(conn_fd); + if (manage_clients__) { + if (fcntl(conn_fd, F_SETFL, + fcntl(conn_fd, F_GETFD, 0) | O_NONBLOCK) == -1) { + sf_debug("set no block error"); + close(conn_fd); + return true; + } + if (!in_dispatch__(conn_fd)) { + return true; } } - return -1; + sf_debug("new connection", conn_fd, index); + new_connection(conn_fd); } - inline bool sf_tcp_server::handle_accept(int index) { - - int conn_fd = 0; - sockaddr_in client_addr{}; - socklen_t len = sizeof(client_addr); - while ((conn_fd = accept(listen_fd__, (struct sockaddr *) &client_addr, &len)) > - 0) { - new_connection_filter__(conn_fd); - if (manage_clients__) { - if (fcntl(conn_fd, F_SETFL, fcntl(conn_fd, F_GETFD, 0) | O_NONBLOCK) == -1) { - sf_debug("set no block error"); - close(conn_fd); - return true; - } - if (!in_dispatch(conn_fd)) { - return true; + if (errno == EAGAIN || errno == EINTR) { + sf_debug("accept finished", index); + return true; + } else { + sf_debug("accept error"); + { + epoll_ctl(epoll_data__().epoll_fd, EPOLL_CTL_DEL, listen_fd__, + &epoll_data__().sock_context__[listen_fd__].ev); + epoll_data__().sock_context__.erase(listen_fd__); + } + return false; + } +} + +inline void sf_tcp_server::handle_read__(const epoll_event &ev) { + byte_array recv_buf(sf_default_buffer_size); + sf_pkg_header_t header{}; + auto &sock_context__ = epoll_data__().sock_context__; + while (true) { + sf_debug("start read"); + auto count_read = static_cast( + recv(ev.data.fd, recv_buf.data(), sf_default_buffer_size, 0)); + sf_debug("read", count_read); + if (count_read <= 0) { + sf_debug("errno", errno); + // EWOULDBLOCK == EAGAIN + if ((errno == EAGAIN || + errno == EINTR /* || errno == EWOULDBLOCK */) && + count_read < 0) { + sf_debug("read finished", errno); + break; + } else { + disconnect_sock_filter__(ev.data.fd); + { + epoll_ctl(epoll_data__().epoll_fd, EPOLL_CTL_DEL, + ev.data.fd, &sock_context__[ev.data.fd].ev); + sock_context__.erase(ev.data.fd); } + closed(ev.data.fd); + close(ev.data.fd); + sf_debug("read error / connection closed"); + break; } - sf_debug("new connection", conn_fd, index); - new_connection(conn_fd); } - - if (errno == EAGAIN || errno == EINTR) { - sf_debug("accept finished", index); - return true; + recv_buf.resize(static_cast(count_read)); + after_raw_recv_filter__(ev.data.fd, recv_buf); + if (raw__) { + sf_debug("raw data", recv_buf.size()); + raw_data_coming(ev.data.fd, recv_buf); + sf_debug("after resolve"); } else { - sf_debug("accept error"); - { - std::lock_guard lock_context(*epoll_data__[index].mu_sock_context__); - epoll_ctl(epoll_data__[index].epoll_fd, EPOLL_CTL_DEL, listen_fd__, - &epoll_data__[index].sock_context__[listen_fd__].ev); - epoll_data__[index].sock_context__.erase(listen_fd__); - } - return false; - } - } - - inline void sf_tcp_server::handle_read(int index, const epoll_event &ev) { - - byte_array recv_buf(sf_default_buffer_size); - sf_pkg_header_t header{}; - std::lock_guard lock_context(*epoll_data__[index].mu_sock_context__); - auto &sock_context__ = epoll_data__[index].sock_context__; - while (true) { - sf_debug("start read"); - auto count_read = static_cast(recv(ev.data.fd, recv_buf.data(), - sf_default_buffer_size, 0)); - sf_debug("read", count_read); - if (count_read <= 0) { - sf_debug("errno", errno); - // EWOULDBLOCK == EAGAIN - if ((errno == EAGAIN || errno == EINTR /* || errno == EWOULDBLOCK */) && count_read < 0) { - sf_debug("read finished", errno); - break; - } else { + sock_context__[ev.data.fd].data_buffer_in.insert( + sock_context__[ev.data.fd].data_buffer_in.end(), + recv_buf.begin(), recv_buf.end()); + size_t read_pos = 0; + while (sock_context__[ev.data.fd].data_buffer_in.size() - + read_pos >= + sizeof(sf_pkg_header_t)) { + memmove( + &header, + sock_context__[ev.data.fd].data_buffer_in.data() + read_pos, + sizeof(header)); + if (!check_header_checksum(header)) { disconnect_sock_filter__(ev.data.fd); - { - epoll_ctl(epoll_data__[index].epoll_fd, EPOLL_CTL_DEL, ev.data.fd, - &sock_context__[ev.data.fd].ev); - sock_context__.erase(ev.data.fd); - } - closed(ev.data.fd); - close(ev.data.fd); - sf_debug("read error / connection closed"); + close(ev.data.fd); + closed(ev.data.fd); break; } - } - recv_buf.resize(static_cast(count_read)); - after_raw_recv_filter__(ev.data.fd, recv_buf); - if (raw__) { - sf_debug("raw data", recv_buf.size()); - raw_data_coming(ev.data.fd, recv_buf); - sf_debug("after resolve"); - } else { + if (sock_context__[ev.data.fd].data_buffer_in.size() - + read_pos - sizeof(header) >= + header.length) { + byte_array data = {byte_array( + sock_context__[ev.data.fd].data_buffer_in.begin() + + read_pos + sizeof(header), + sock_context__[ev.data.fd].data_buffer_in.begin() + + read_pos + sizeof(header) + header.length)}; + read_pos += sizeof(header) + header.length; + + after_recv_filter__(ev.data.fd, header, data); + data_coming(ev.data.fd, header, data); - sock_context__[ev.data.fd].data_buffer_in.insert( - sock_context__[ev.data.fd].data_buffer_in.end(), - recv_buf.begin(), - recv_buf.end()); - size_t read_pos = 0; - while (sock_context__[ev.data.fd].data_buffer_in.size() - read_pos >= - sizeof(sf_pkg_header_t)) { - memmove(&header, - sock_context__[ev.data.fd].data_buffer_in.data() + read_pos, - sizeof(header)); - if (!check_header_checksum(header)) { - disconnect_sock_filter__(ev.data.fd); - close(ev.data.fd); - closed(ev.data.fd); - break; - } - if (sock_context__[ev.data.fd].data_buffer_in.size() - read_pos - - sizeof(header) >= - header.length) { - byte_array data = {byte_array( - sock_context__[ev.data.fd].data_buffer_in.begin() + - read_pos + - sizeof(header), - sock_context__[ev.data.fd].data_buffer_in.begin() + - read_pos + - sizeof(header) + header.length)}; - read_pos += sizeof(header) + header.length; - - after_recv_filter__(ev.data.fd, header, data); - data_coming( - ev.data.fd, header, - data - ); - - } else { - break; - } - } - if (read_pos != 0) { - sock_context__[ev.data.fd].data_buffer_in.erase( - sock_context__[ev.data.fd].data_buffer_in.begin(), - sock_context__[ev.data.fd].data_buffer_in.begin() + read_pos); + } else { + break; } } + if (read_pos != 0) { + sock_context__[ev.data.fd].data_buffer_in.erase( + sock_context__[ev.data.fd].data_buffer_in.begin(), + sock_context__[ev.data.fd].data_buffer_in.begin() + + read_pos); + } } } - - inline void sf_tcp_server::handle_write(int index, const epoll_event &ev) { - std::lock_guard lock_context(*epoll_data__[index].mu_sock_context__); - auto &sock_context__ = epoll_data__[index].sock_context__; - SOCKET fd = ev.data.fd; - sf_debug(fd, "ready write", sock_context__[fd].data_buffer_out.size()); +} + +inline void sf_tcp_server::handle_write__(const epoll_event &ev) { + auto &sock_context__ = epoll_data__().sock_context__; + SOCKET fd = ev.data.fd; + { + std::shared_lock lck( + *sock_context__[fd].mu_data_buffer_out); if (sock_context__[fd].data_buffer_out.empty()) { - sf_debug("index", index, "sock", fd, "empty"); + sf_debug("sock", fd, "empty"); return; } - sf_debug("pendding pkg count:", sock_context__[fd].data_buffer_out.size()); - while (true) { + } + while (true) { + byte_array p; + { + std::shared_lock lck( + *sock_context__[fd].mu_data_buffer_out); if (sock_context__[fd].data_buffer_out.empty()) { write_finished(fd); sock_context__[fd].ev.events &= ~EPOLLOUT; - epoll_ctl(epoll_data__[index].epoll_fd, EPOLL_CTL_MOD, fd, &sock_context__[fd].ev); - sf_debug(fd, "write_finished", sock_context__[fd].data_buffer_out.size()); + epoll_ctl(epoll_data__().epoll_fd, EPOLL_CTL_MOD, fd, + &sock_context__[fd].ev); + sf_debug(fd, "write_finished", + sock_context__[fd].data_buffer_out.size()); break; } - auto p = sock_context__[fd].data_buffer_out.front(); - sf_debug("pkg size:", p.size()); - auto data_size = p.size(); - auto n = data_size; - decltype(n) tmp_write; - bool error_flag = false; - while (n > 0) { - tmp_write = static_cast(write(fd, p.data() + data_size - n, n)); - if (tmp_write < n) { - if (tmp_write == -1 && errno != EAGAIN) { - write_error(fd); - error_flag = true; - } - if(tmp_write > 0){ - n -= tmp_write; - } - break; + p = sock_context__[fd].data_buffer_out.front(); + } + sf_debug("pkg size:", p.size()); + auto data_size = p.size(); + auto n = data_size; + decltype(n) tmp_write; + bool error_flag = false; + while (n > 0) { + tmp_write = static_cast( + write(fd, p.data() + data_size - n, n)); + if (tmp_write < n) { + if (tmp_write == -1 && errno != EAGAIN) { + write_error(fd); + error_flag = true; + } + if (tmp_write > 0) { + n -= tmp_write; } - n -= tmp_write; - } - if (error_flag) { - sf_debug("write error"); - disconnect_sock_filter__(fd); - epoll_ctl(epoll_data__[index].epoll_fd, EPOLL_CTL_DEL, fd, &sock_context__[fd].ev); - close(fd); - closed(fd); - sock_context__.erase(fd); break; + } + n -= tmp_write; + } + if (error_flag) { + sf_debug("write error"); + disconnect_sock_filter__(fd); + epoll_ctl(epoll_data__().epoll_fd, EPOLL_CTL_DEL, fd, + &sock_context__[fd].ev); + close(fd); + closed(fd); + sock_context__.erase(fd); + break; + } else { + std::lock_guard lck( + *sock_context__[fd].mu_data_buffer_out); + if (n == 0) { + sf_debug("pop front"); + sf_debug(fd, "before", + sock_context__[fd].data_buffer_out.size()); + sock_context__[fd].data_buffer_out.pop_front(); + sf_debug(fd, "after", + sock_context__[fd].data_buffer_out.size()); } else { - if (n == 0) { - sf_debug("pop front"); - sf_debug(fd, "before", sock_context__[fd].data_buffer_out.size()); - sock_context__[fd].data_buffer_out.pop_front(); - sf_debug(fd, "after", sock_context__[fd].data_buffer_out.size()); - } else { - sock_context__[fd].data_buffer_out.front() = { - sock_context__[fd].data_buffer_out.front().begin() + n, - sock_context__[fd].data_buffer_out.front().end()}; - sf_debug(fd, "write pendding", sock_context__[fd].data_buffer_out.size()); - break; - } + sock_context__[fd].data_buffer_out.front() = { + sock_context__[fd].data_buffer_out.front().begin() + n, + sock_context__[fd].data_buffer_out.front().end()}; + sf_debug(fd, "write pendding", + sock_context__[fd].data_buffer_out.size()); + break; } } } +} - inline bool sf_tcp_server::detach(SOCKET sock) { - auto index = find_fd_epoll_index(sock); - if(index == -1){ - return false; - } - std::lock_guard lock_context(*epoll_data__[index].mu_sock_context__); - if (epoll_ctl(epoll_data__[index].epoll_fd, EPOLL_CTL_DEL, sock, - &epoll_data__[index].sock_context__[sock].ev) != -1) - { - return false; - } - epoll_data__[index].sock_context__.erase(sock); - - if (fcntl(sock, F_SETFL, fcntl(sock, F_GETFD, 0) & ~O_NONBLOCK) == -1) { - sf_debug("detach error"); - return false; - } +inline epoll_context_t &sf_tcp_server::epoll_data__() const { + thread_local static epoll_context_t d; + return d; +} - return true; - } -} \ No newline at end of file +} // namespace skyfire \ No newline at end of file