Skip to content

Commit

Permalink
将tcp server修改为每个线程都监听
Browse files Browse the repository at this point in the history
  • Loading branch information
skyfireitdiy committed Aug 18, 2019
1 parent eef0a17 commit 6f46c54
Show file tree
Hide file tree
Showing 3 changed files with 458 additions and 528 deletions.
214 changes: 105 additions & 109 deletions network/sf_tcp_server_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,128 +13,124 @@

#pragma once

#include "tools/sf_nocopy.h"
#include "core/sf_object.h"
#include "core/sf_stdc++.h"
#include "core/sf_type.h"
#include "sf_tcp_utils.h"
#include "network/sf_net_utils.h"
#include "network/sf_server_socket_filter.h"
#include "core/sf_stdc++.h"
#include "sf_tcp_utils.h"
#include "tools/sf_nocopy.h"

namespace skyfire
{
namespace skyfire {
/**
* @brief tcp服务器
*/
class sf_tcp_server_interface : public sf_nocopy<sf_object> {
/**
* 新连接到来信号
*/
SF_REG_SIGNAL(new_connection, SOCKET);
/**
* 数据包到来信号
*/
SF_REG_SIGNAL(data_coming, SOCKET, const sf_pkg_header_t &,
const byte_array &);
/**
* @brief tcp服务器
* 原始数据到来信号
*/
class sf_tcp_server_interface : public sf_nocopy<sf_object>
{
/**
* 新连接到来信号
*/
SF_REG_SIGNAL(new_connection, SOCKET);
/**
* 数据包到来信号
*/
SF_REG_SIGNAL(data_coming, SOCKET, const sf_pkg_header_t&, const byte_array&);
/**
* 原始数据到来信号
*/
SF_REG_SIGNAL(raw_data_coming, SOCKET, const byte_array&);
/**
* 关闭信号
*/
SF_REG_SIGNAL(closed, SOCKET);

/**
* 写成功信号
*/
SF_REG_SIGNAL(write_finished, SOCKET);

/**
* 写失败信号
*/
SF_REG_SIGNAL(write_error, SOCKET);

private:

std::vector<std::shared_ptr<sf_server_socket_filter>> filters__;

protected:

bool manage_clients__{ true };

void after_raw_recv_filter__(SOCKET sock,byte_array &data);
void after_recv_filter__(SOCKET sock,sf_pkg_header_t &header, byte_array &data);
void before_raw_send_filter__(SOCKET sock,byte_array &data);
void before_send_filter__(SOCKET sock,sf_pkg_header_t &header, byte_array &data);
void new_connection_filter__(SOCKET sock);
void listen_sock_filter__(SOCKET sock);
void disconnect_sock_filter__(SOCKET sock);

public:
SF_REG_SIGNAL(raw_data_coming, SOCKET, const byte_array &);
/**
* 关闭信号
*/
SF_REG_SIGNAL(closed, SOCKET);

/**
* 写成功信号
*/
SF_REG_SIGNAL(write_finished, SOCKET);

/**
* 写失败信号
*/
SF_REG_SIGNAL(write_error, SOCKET);

private:
std::vector<std::shared_ptr<sf_server_socket_filter>> filters__;

protected:
bool manage_clients__{true};

void after_raw_recv_filter__(SOCKET sock, byte_array &data);
void after_recv_filter__(SOCKET sock, sf_pkg_header_t &header,
byte_array &data);
void before_raw_send_filter__(SOCKET sock, byte_array &data);
void before_send_filter__(SOCKET sock, sf_pkg_header_t &header,
byte_array &data);
void new_connection_filter__(SOCKET sock);
void listen_sock_filter__(SOCKET sock);
void disconnect_sock_filter__(SOCKET sock);

public:
#pragma clang diagnostic push
#pragma ide diagnostic ignored "OCUnusedGlobalDeclarationInspection"
void add_server_socket_filter(std::shared_ptr<sf_server_socket_filter>&& filter);
void add_server_socket_filter(
std::shared_ptr<sf_server_socket_filter> &&filter);
#pragma clang diagnostic pop

/**
* 获取原始socket
* @return 原始socket
*/
virtual SOCKET get_raw_socket() = 0;
/**
* 监听端口
* @param ip 本地ip
* @param port 本地端口
* @return 是否监听成功
*/
virtual bool listen(const std::string &ip, unsigned short port) = 0;

/**
* 关闭服务器
*/
virtual void close() = 0;
/**
* 关闭指定连接
* @param sock 指定连接socket
*/
virtual void close(SOCKET sock) = 0;

/**
* 发送数据包
* @param sock socket
* @param type 包类型
* @param data 包数据
* @return 是否发送成功
*/
virtual bool send(SOCKET sock, int type, const byte_array &data) = 0;

/**
* 发送数据
* @param sock scoket
* @param data 数据
* @return 是否发送成功
*/
virtual bool send(SOCKET sock, const byte_array &data) = 0;

/**
* 获取server地址
* @param addr server地址信息
* @return 获取结果
*/
bool get_server_addr(sf_addr_info_t &addr);


virtual bool detach(SOCKET socket) = 0;
/**
* 获取原始socket
* @return 原始socket
*/
virtual SOCKET get_raw_socket() = 0;
/**
* 监听端口
* @param ip 本地ip
* @param port 本地端口
* @return 是否监听成功
*/
virtual bool listen(const std::string &ip, unsigned short port) = 0;

/**
* 关闭服务器
*/
virtual void close() = 0;
/**
* 关闭指定连接
* @param sock 指定连接socket
*/
virtual void close(SOCKET sock) = 0;

/**
* 发送数据包
* @param sock socket
* @param type 包类型
* @param data 包数据
* @return 是否发送成功
*/
virtual bool send(SOCKET sock, int type, const byte_array &data) = 0;

/**
* 发送数据
* @param sock scoket
* @param data 数据
* @return 是否发送成功
*/
virtual bool send(SOCKET sock, const byte_array &data) = 0;

/**
* 获取server地址
* @param addr server地址信息
* @return 获取结果
*/
bool get_server_addr(sf_addr_info_t &addr);

#pragma clang diagnostic push
#pragma ide diagnostic ignored "OCUnusedGlobalDeclarationInspection"
/**
* 设置是否管理客户端连接
* @param flag 是否管理
*/
void set_manage_clients(bool flag);
/**
* 设置是否管理客户端连接
* @param flag 是否管理
*/
void set_manage_clients(bool flag);
#pragma clang diagnostic pop
};
}
};
} // namespace skyfire
96 changes: 45 additions & 51 deletions network/sf_tcp_server_linux.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,86 +13,80 @@

#pragma once

#include <unistd.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <errno.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/sysinfo.h>
#include <sys/types.h>
#include <unistd.h>
#include <memory>
#include <string>
#include "sf_tcp_utils.hpp"
#include "tools/sf_nocopy.h"
#include "core/sf_object.hpp"
#include "core/sf_type.hpp"
#include "sf_tcp_server_interface.h"
#include "sf_tcp_utils.hpp"
#include "tools/sf_json.hpp"
#include "tools/sf_logger.hpp"
#include "sf_tcp_server_interface.h"
#include <sys/sysinfo.h>
#include "tools/sf_nocopy.h"

#include "tools/sf_utils.h"

namespace skyfire {

namespace skyfire
{

struct sock_data_context_t{
epoll_event ev;
byte_array data_buffer_in;
std::deque<byte_array> data_buffer_out;
};

struct epoll_context_t{
int epoll_fd;
std::shared_ptr<std::recursive_mutex> mu_sock_context__ = std::make_shared<std::recursive_mutex>();
std::unordered_map<SOCKET, sock_data_context_t> sock_context__;
};

class sf_tcp_server : public sf_make_instance_t<sf_tcp_server ,sf_tcp_server_interface>
{
private:
int listen_fd__ = -1;
bool raw__ = false;
int thread_count__ = std::thread::hardware_concurrency() * 2 + 2;

std::vector<epoll_context_t> epoll_data__;
struct sock_data_context_t {
epoll_event ev{};
byte_array data_buffer_in{};
std::shared_ptr<std::shared_mutex> mu_data_buffer_out =
std::make_shared<std::shared_mutex>();
std::deque<byte_array> data_buffer_out{};
};

std::vector<std::thread> thread_vec__;
struct epoll_context_t {
int epoll_fd{};
std::unordered_map<SOCKET, sock_data_context_t> sock_context__{};
};

void work_thread(int index, bool listen_thread = false, SOCKET listen_fd = -1);
class sf_tcp_server
: public sf_make_instance_t<sf_tcp_server, sf_tcp_server_interface> {
private:
int listen_fd__ = -1;
bool raw__ = false;
int thread_count__ = std::thread::hardware_concurrency() * 2 + 2;

bool in_dispatch(SOCKET fd);
std::vector<std::thread> thread_vec__;

int find_fd_epoll_index(SOCKET fd);
void work_thread__(bool listen_thread = false, SOCKET listen_fd = -1);

bool handle_accept(int index);
bool in_dispatch__(SOCKET fd);

void handle_read(int index,const epoll_event &ev);
bool handle_accept__();

void handle_write(int index, const epoll_event &ev);
void handle_read__(const epoll_event &ev);

public:
SOCKET get_raw_socket() override;
void handle_write__(const epoll_event &ev);

explicit sf_tcp_server(bool raw = false);
epoll_context_t &epoll_data__() const;

~sf_tcp_server() override;
public:
SOCKET get_raw_socket() override;

bool listen(const std::string &ip, unsigned short port) override;
explicit sf_tcp_server(bool raw = false);

void close() override;
~sf_tcp_server() override;

void close(SOCKET sock) override;
bool listen(const std::string &ip, unsigned short port) override;

bool send(SOCKET sock, int type, const byte_array &data) override;
void close() override;

bool send(SOCKET sock, const byte_array &data) override;
void close(SOCKET sock) override;

bool detach(SOCKET sock) override ;
bool send(SOCKET sock, int type, const byte_array &data) override;

};
bool send(SOCKET sock, const byte_array &data) override;
};

}
} // namespace skyfire
Loading

0 comments on commit 6f46c54

Please sign in to comment.