diff --git a/Makefile b/Makefile index 815456c..fe40777 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ OS = $(shell uname) APP = libevlite -VERSION = 9.6.0 +VERSION = 9.6.1 PREFIX = /usr/local # 主版本号 @@ -21,8 +21,8 @@ endif # 默认选项 LFLAGS = -ggdb -lpthread -CFLAGS = -Wall -Wformat=0 -Iinclude/ -Isrc/ -Itest/ -ggdb -fPIC -O2 -DNDEBUG -D__EVENT_VERSION__=\"$(REALNAME)\" #-DUSE_REUSEPORT #-DUSE_ATOMIC -CXXFLAGS = -Wall -Wformat=0 -Iinclude/ -Isrc/ -Itest/ -ggdb -fPIC -O2 -DNDEBUG -D__EVENT_VERSION__=\"$(REALNAME)\" #-DUSE_REUSEPORT #-DUSE_ATOMIC +CFLAGS = -Wall -Wformat=0 -Iinclude/ -Isrc/ -Itest/ -ggdb -fPIC -O2 -DNDEBUG -D__EVENT_VERSION__=\"$(REALNAME)\" -DUSE_ATOMIC #-DUSE_REUSEPORT +CXXFLAGS = -Wall -Wformat=0 -Iinclude/ -Isrc/ -Itest/ -ggdb -fPIC -O2 -DNDEBUG -D__EVENT_VERSION__=\"$(REALNAME)\" -DUSE_ATOMIC #-DUSE_REUSEPORT # 动态库编译选项 ifeq ($(OS),Darwin) @@ -130,6 +130,7 @@ chatroom_client: io.o chatroom_client.o $(OBJS) clean : rm -rf *.o rm -rf *.log + rm -rf core rm -rf *.core rm -rf core.* rm -rf vgcore.* diff --git a/docs/README b/docs/README index aea4f79..98ecfa6 100644 --- a/docs/README +++ b/docs/README @@ -26,7 +26,7 @@ Makefile.* - Makefile 1000 113MBytes/s 41MBytes/s 1000 110MBytes/s 43MBytes/s 10000 119MBytes/s 20MBytes/s - 10000 110MBytes/s 37MBytes/s + 10000 110MBytes/s 37MBytes/s 2. ���ֵ����⣺ 1) 1000������ʱ����������л��ﵽ50K, ��20000�����ӵ��л���300��;[�ѽ��, vmstatͳ������] @@ -39,8 +39,8 @@ Makefile.* - Makefile 6000 9000,[330~38000] 130MBytes/s 7000 6000,[300~19000] 131MBytes/s 8000 1000,[300~18000] 127MBytes/s - 10000 350 130MBytes/s - + 10000 350 130MBytes/s + 2) ͬ��1000�����ӵ�ʱ��, IO�̵߳ȴ�ʱ��Ϊ0ʱ(IO�߳�������̯ѹ��), �������л��������ͣ������������; �ġ�Accept-Lockģ�͵Ļ��Է����� @@ -51,6 +51,9 @@ Makefile.* - Makefile 2. ���Խ���� 1) accept�¼�������һ���߳��з���, ��û��ԭ����Ƶ�Ч������ѹ�����ԵĽű������������Ľ��Ҳ�ǿ�������ġ� �Ͼ��������Ӷ��ɹ��󣬲ſ�ʼ�������ݵģ�����һ��ʼ���̻߳���û��ʲôѹ���� - + �塢ͬ���Ŀ��������У�centos6.0��freebsd8.2���˽�30%�� +��������������Ƚ�(muduo, Ubuntu 14.04.5, Intel(R) Xeon(R) CPU E5606 @ 2.13GHz, 8��8G) + moduo 1000�����ӣ�4��IO�̣߳�4K�����ݰ���197.515234375 MiB/s throughput + libevlite 1000�����ӣ�4��IO�̣߳�4K�����ݰ���245.58203125 MiB/s throughput diff --git a/docs/benchmark.md b/docs/benchmark.md new file mode 100644 index 0000000..4793bde --- /dev/null +++ b/docs/benchmark.md @@ -0,0 +1,29 @@ +# libevlite性能测试 + + + +### 一、吞吐量测试结果 + +##### 1. 硬件配置 + +2CPU4Core + +CPU型号 Intel(R) Xeon(R) CPU E5606 @ 2.13GHz + +8G内存 + +Ubuntu 14.04.5 + +##### 2. 4K数据包,4个IO线程 + +| 连接数 | 1000 | 2000 | 10000 | +| --------- | ------- | ------- | ------- | +| muduo | 197.515 | 232.825 | 244.675 | +| libevlite | 245.582 | 238.293 | 244.478 | + +##### 3. 16K数据包,4个IO线程 + +| 连接数 | 1000 | 2000 | 10000 | +| --------- | ------- | ------- | ------- | +| muduo | 596.692 | 598.942 | 588.075 | +| libevlite | 630.339 | 598.021 | 571.337 | \ No newline at end of file diff --git a/libevlite.xcodeproj/project.pbxproj b/libevlite.xcodeproj/project.pbxproj index 2d33dde..f69639f 100644 --- a/libevlite.xcodeproj/project.pbxproj +++ b/libevlite.xcodeproj/project.pbxproj @@ -585,7 +585,7 @@ isa = XCBuildConfiguration; buildSettings = { DYLIB_COMPATIBILITY_VERSION = 9; - DYLIB_CURRENT_VERSION = 9.5.2; + DYLIB_CURRENT_VERSION = 9.6.1; EXECUTABLE_PREFIX = ""; GCC_PREPROCESSOR_DEFINITIONS = ( "DEBUG=1", @@ -599,7 +599,7 @@ isa = XCBuildConfiguration; buildSettings = { DYLIB_COMPATIBILITY_VERSION = 9; - DYLIB_CURRENT_VERSION = 9.5.2; + DYLIB_CURRENT_VERSION = 9.6.1; EXECUTABLE_PREFIX = ""; PRODUCT_NAME = "$(TARGET_NAME)"; }; diff --git a/src/channel.c b/src/channel.c index ce04e11..a1f0790 100644 --- a/src/channel.c +++ b/src/channel.c @@ -49,7 +49,6 @@ int32_t _transmit( struct session * session ) int32_t iov_size = 0; struct iovec iov_array[iov_max]; - memset( iov_array, 0, sizeof(iov_array) ); for ( i = 0; i < session_sendqueue_count(session) && iov_size < iov_max; ++i ) { diff --git a/src/iolayer.c b/src/iolayer.c index d9bd753..116eb53 100644 --- a/src/iolayer.c +++ b/src/iolayer.c @@ -438,8 +438,12 @@ int32_t iolayer_send( iolayer_t self, sid_t id, const char * buf, uint32_t nbyte int32_t iolayer_broadcast( iolayer_t self, sid_t * ids, uint32_t count, const char * buf, uint32_t nbytes ) { - uint8_t i = 0; + if ( unlikely( ids == NULL || count == 0 ) ) + { + return 0; + } + uint8_t i = 0; pthread_t threadid = pthread_self(); struct iolayer * layer = (struct iolayer *)self; @@ -594,6 +598,11 @@ int32_t iolayer_shutdown( iolayer_t self, sid_t id ) int32_t iolayer_shutdowns( iolayer_t self, sid_t * ids, uint32_t count ) { + if ( unlikely( ids == NULL || count == 0 ) ) + { + return 0; + } + uint8_t i = 0; struct iolayer * layer = (struct iolayer *)self; diff --git a/src/kqueue.c b/src/kqueue.c index d19dba3..2c1d9b1 100644 --- a/src/kqueue.c +++ b/src/kqueue.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -127,7 +128,7 @@ int32_t kqueue_add( void * arg, struct event * ev ) struct kevent kev; struct kqueuer * poller = (struct kqueuer *)arg; - memset( &kev, 0, sizeof(kev) ); + bzero( &kev, sizeof(kev) ); if ( ev->events & EV_READ ) { @@ -186,7 +187,7 @@ int32_t kqueue_del( void * arg, struct event * ev ) return 0; } - memset( &kev, 0, sizeof(kev) ); + bzero( &kev, sizeof(kev) ); if ( ev->events & EV_READ ) { diff --git a/src/session.c b/src/session.c index 5315b2b..168a3ed 100644 --- a/src/session.c +++ b/src/session.c @@ -554,7 +554,7 @@ struct hashnode * _find_table( struct hashtable * table, sid_t id, int32_t flag struct hashnode * node = NULL; struct hashnode * entries = table->entries + bucket; - do + for ( ; entries != NULL; entries = entries->next ) { if ( entries->session != NULL ) { @@ -568,15 +568,7 @@ struct hashnode * _find_table( struct hashtable * table, sid_t id, int32_t flag { node = entries; } - - if ( entries->next == NULL ) - { - break; - } - - entries = entries->next; } - while ( 1 ); if ( node == NULL && flag != 0 ) { @@ -623,7 +615,7 @@ struct session * _find_session( struct hashtable * table, sid_t id ) { return NULL; } - if ( node->session == NULL ) + else if ( node->session == NULL ) { return NULL; } @@ -640,7 +632,7 @@ int32_t _remove_session( struct hashtable * table, struct session * s ) { return -1; } - if ( node->session == NULL ) + else if ( node->session == NULL ) { return -2; } diff --git a/src/utils.c b/src/utils.c index 2318701..d1a5e14 100644 --- a/src/utils.c +++ b/src/utils.c @@ -1,5 +1,6 @@ #include +#include #include #include #include @@ -105,7 +106,7 @@ int32_t tcp_accept( int32_t fd, char * remotehost, uint16_t * remoteport ) *remoteport = 0; remotehost[0] = 0; - memset( &in_addr, 0, sizeof(in_addr) ); + bzero( &in_addr, sizeof(in_addr) ); cfd = accept( fd, (struct sockaddr *)&in_addr, &len ); if ( cfd != -1 ) @@ -131,7 +132,7 @@ int32_t tcp_listen( const char * host, uint16_t port, void (*options)(int32_t) ) // 对描述符的选项操作 options( fd ); - memset( &addr, 0, sizeof(addr) ); + bzero( &addr, sizeof(addr) ); addr.sin_family = AF_INET; addr.sin_port = htons( port ); if ( host == NULL @@ -184,7 +185,7 @@ int32_t tcp_connect( const char * host, uint16_t port, void (*options)(int32_t) // 对描述符的选项操作 options( fd ); - memset( &addr, 0, sizeof(addr) ); + bzero( &addr, sizeof(addr) ); addr.sin_family = AF_INET; addr.sin_port = htons(port); inet_pton(AF_INET, host, (void *)&(addr.sin_addr.s_addr)); @@ -371,6 +372,7 @@ struct msgqueue * msgqueue_create( uint32_t size ) int32_t rc = -1; int32_t fds[2] = { -1, -1 }; + // TODO: linux2.6直接使用eventfd rc = pipe( fds ); //rc = socketpair( AF_UNIX, SOCK_STREAM, 0, fds ); if ( rc == -1 ) diff --git a/src/utils.h b/src/utils.h index fa11e55..2c29fee 100644 --- a/src/utils.h +++ b/src/utils.h @@ -59,9 +59,9 @@ int32_t tcp_connect( const char * host, uint16_t port, void (*options)(int32_t) uint32_t getpower( uint32_t size ); uint32_t nextpow2( uint32_t size ); -/* - * sidlist - */ +// +// sidlist +// struct sidlist { uint32_t count; @@ -78,6 +78,10 @@ int32_t sidlist_adds( struct sidlist * self, sid_t * ids, uint32_t count ); sid_t sidlist_del( struct sidlist * self, int32_t index ); void sidlist_destroy( struct sidlist * self ); +// +// 消息队列 +// + // 任务类型 enum { @@ -108,10 +112,10 @@ struct task QUEUE_PADDING_HEAD(taskqueue, struct task); QUEUE_PROTOTYPE(taskqueue, struct task) -/* - * 消息队列 - * 线程安全的消息队列, 有通知的功能 - */ +// +// 消息队列 +// 线程安全的消息队列, 有通知的功能 +// struct msgqueue { struct taskqueue queue; diff --git a/test/chatroom_server.cpp b/test/chatroom_server.cpp index 7ed01d0..941af65 100644 --- a/test/chatroom_server.cpp +++ b/test/chatroom_server.cpp @@ -220,14 +220,6 @@ void CChatRoomService::run() std::swap( swapqueue, m_TaskQueue ); pthread_mutex_unlock( &m_TaskLock ); - if ( !m_Perform ) - { - TASK * t = new TASK(); - t->data = 1000; - perform( t, t->clone, t->perform ); - m_Perform = true; - } - for ( std::deque::iterator it = swapqueue.begin(); it != swapqueue.end(); ++it ) { Task * task = &(*it); @@ -237,7 +229,6 @@ void CChatRoomService::run() case 0 : { m_SessionMap.push_back( task->sid ); - perform( task->sid, 1, (void *)(++m_UniqueID) ); } break; diff --git a/test/echoserver.cpp b/test/echoserver.cpp index 5277f4b..009e20f 100644 --- a/test/echoserver.cpp +++ b/test/echoserver.cpp @@ -102,6 +102,8 @@ int main() return -1; } + service->start(); + if ( !service->listen( "127.0.0.1", 9029 ) ) { printf("service start failed \n"); diff --git a/test/io.cpp b/test/io.cpp index fd83999..bafb13c 100644 --- a/test/io.cpp +++ b/test/io.cpp @@ -159,6 +159,11 @@ void IIOService::stop() delete m_ListenContexts[i]; } + for ( size_t i = 0; i < m_ConnectContexts.size(); ++i ) + { + delete m_ConnectContexts[i]; + } + if ( m_IOContextGroup != NULL ) { for ( uint8_t i = 0; i < m_ThreadsCount; ++i ) @@ -171,43 +176,46 @@ void IIOService::stop() delete [] m_IOContextGroup; } + + m_ListenContexts.clear(); + m_ConnectContexts.clear(); } void IIOService::halt() { - SidList sids; - - // 获取所有会话ID - pthread_mutex_lock( &m_Lock ); - for ( size_t i = 0; i < m_RemoteHosts.size(); ++i ) - { - sids.push_back( m_RemoteHosts[i].sid ); - } - pthread_mutex_unlock( &m_Lock ); - if ( m_IOLayer != NULL ) { - // 关闭连接的服务器 - this->shutdown( sids ); // 停止网络库 iolayer_stop( m_IOLayer ); } } -sid_t IIOService::id( const char * host, uint16_t port ) +bool IIOService::isConnecting( const char * host, uint16_t port ) { - sid_t sid = 0; + bool found = false; pthread_mutex_lock( &m_Lock ); - sid = this->getRemoteSid( host, port ); + for ( size_t i = 0; i < m_ConnectContexts.size(); ++i ) + { + if ( m_ConnectContexts[i]->port == port + && m_ConnectContexts[i]->host == std::string(host) ) + { + found = true; + break; + } + } pthread_mutex_unlock( &m_Lock ); - return sid == (sid_t)-1 ? 0 : sid ; + return found; } bool IIOService::listen( const char * host, uint16_t port ) { ListenContext * context = new ListenContext( port, this ); + if ( context == NULL ) + { + return false; + } pthread_mutex_lock( &m_Lock ); m_ListenContexts.push_back( context ); @@ -216,19 +224,28 @@ bool IIOService::listen( const char * host, uint16_t port ) return ( iolayer_listen( m_IOLayer, host, port, onAcceptSession, context ) == 0 ); } -bool IIOService::connect( const char * host, uint16_t port, int32_t seconds, bool isblock ) +sid_t IIOService::connect( const char * host, uint16_t port, int32_t seconds, bool isblock ) { - if ( iolayer_connect( m_IOLayer, host, port, seconds, onConnectSession, this ) != 0 ) + ConnectContext * context = new ConnectContext( host, port, this ); + if ( context == NULL ) { - return false; + return -1; } - if ( !isblock ) + if ( iolayer_connect( m_IOLayer, host, port, seconds, onConnectSession, context ) != 0 ) { - return true; + return -1; } - sid_t connectedsid = 0; + // 异步模式需要加入连接的队列中 + if ( !isblock ) + { + pthread_mutex_lock( &m_Lock ); + m_ConnectContexts.push_back( context ); + pthread_mutex_unlock( &m_Lock ); + + return 0; + } // 计算超时时间 struct timeval now; @@ -237,8 +254,8 @@ bool IIOService::connect( const char * host, uint16_t port, int32_t seconds, boo pthread_mutex_lock( &m_Lock ); for ( ;; ) { - connectedsid = getRemoteSid( host, port ); - if ( connectedsid != 0 ) + if ( context->sid != 0 + && context->sid != (sid_t)-1 ) { break; } @@ -254,7 +271,10 @@ bool IIOService::connect( const char * host, uint16_t port, int32_t seconds, boo } pthread_mutex_unlock( &m_Lock ); - return connectedsid > 0 && connectedsid != (sid_t)-1; + sid_t connectedsid = context->sid; + + delete context; + return connectedsid; } int32_t IIOService::send( sid_t id, const std::string & buffer ) @@ -280,28 +300,46 @@ int32_t IIOService::broadcast( const char * buffer, uint32_t nbytes ) return iolayer_broadcast2( m_IOLayer, buffer, nbytes ); } -int32_t IIOService::broadcast( const std::vector & ids, const std::string & buffer ) +int32_t IIOService::broadcast( const sids_t & ids, const std::string & buffer ) { + if ( ids.empty() ) + { + return 0; + } + uint32_t nbytes = static_cast(buffer.size()); const char * buf = static_cast( buffer.data() ); uint32_t count = (uint32_t)ids.size(); - std::vector::const_iterator start = ids.begin(); + sids_t::const_iterator start = ids.begin(); return iolayer_broadcast( m_IOLayer, const_cast( &(*start) ), count, buf, nbytes ); } -int32_t IIOService::broadcast( const std::vector & ids, const char * buffer, uint32_t nbytes ) +int32_t IIOService::broadcast( const sids_t & ids, const char * buffer, uint32_t nbytes ) { + if ( ids.empty() ) + { + return 0; + } + uint32_t count = (uint32_t)ids.size(); - std::vector::const_iterator start = ids.begin(); + sids_t::const_iterator start = ids.begin(); return iolayer_broadcast( m_IOLayer, const_cast( &(*start) ), count, buffer, nbytes ); } -int32_t IIOService::perform( sid_t sid, int32_t type, void * task ) +int32_t IIOService::perform( sid_t sid, int32_t type, void * task, void (*recycle)(int32_t, void *) ) { - return iolayer_perform( m_IOLayer, sid, type, task ); + int32_t rc = iolayer_perform( + m_IOLayer, sid, type, task, recycle ); + + if ( rc != 0 ) + { + recycle( type, task ); + } + + return rc; } int32_t IIOService::perform( void * task, void * (*clone)( void * ), void (*perform)( void *, void * ) ) @@ -314,9 +352,14 @@ int32_t IIOService::shutdown( sid_t id ) return iolayer_shutdown( m_IOLayer, id ); } -int32_t IIOService::shutdown( const std::vector & ids ) +int32_t IIOService::shutdown( const sids_t & ids ) { - std::vector::const_iterator start = ids.begin(); + if ( ids.empty() ) + { + return 0; + } + + sids_t::const_iterator start = ids.begin(); uint32_t count = (uint32_t)ids.size(); sid_t * idlist = const_cast( &(*start) ); @@ -324,6 +367,39 @@ int32_t IIOService::shutdown( const std::vector & ids ) return iolayer_shutdowns( m_IOLayer, idlist, count ); } +void IIOService::notifyConnectResult( ConnectContext * context, int32_t result, sid_t id, int32_t ack ) +{ + bool isremove = ( result == 0 || ack != 0 ); + + pthread_mutex_lock( &m_Lock ); + + // 会话ID的转换 + context->sid = result != 0 ? -1 : id; + + // 连接成功或者不再重连的情况下 + // 需要从连接队列中删除 + if ( isremove ) + { + ConnectContexts::iterator it = m_ConnectContexts.begin(); + for ( ; it != m_ConnectContexts.end(); ) + { + if ( (*it) != context ) + { + ++it; + } + else + { + it = m_ConnectContexts.erase( it ); + delete context; + break; + } + } + } + + pthread_cond_signal( &m_Cond ); + pthread_mutex_unlock( &m_Lock ); +} + void IIOService::attach( sid_t id, IIOSession * session, void * iocontext, const std::string & host, uint16_t port ) { session->init( id, iocontext, m_IOLayer, host, port ); @@ -340,48 +416,6 @@ void IIOService::attach( sid_t id, IIOSession * session, void * iocontext, const iolayer_set_service( m_IOLayer, id, &ioservice, session ); } -sid_t IIOService::getRemoteSid( const char * host, uint16_t port ) const -{ - for ( size_t i = 0; i < m_RemoteHosts.size(); ++i ) - { - if ( m_RemoteHosts[i].port == port - && m_RemoteHosts[i].host == std::string( host ) ) - { - return m_RemoteHosts[i].sid; - } - } - - return 0; -} - -void IIOService::setRemoteSid( const char * host, uint16_t port, sid_t sid ) -{ - pthread_mutex_lock( &m_Lock ); - - if ( sid != 0 ) - { - bool is_add = true; - - for ( size_t i = 0; i < m_RemoteHosts.size(); ++i ) - { - if ( m_RemoteHosts[i].port == port - && m_RemoteHosts[i].host == std::string( host ) ) - { - is_add = false; - m_RemoteHosts[i].sid = sid; - } - } - - if ( is_add ) - { - m_RemoteHosts.push_back( RemoteHost( host, port, sid ) ); - } - } - pthread_cond_signal( &m_Cond ); - - pthread_mutex_unlock( &m_Lock ); -} - char * IIOService::onTransformService( void * context, const char * buffer, uint32_t * nbytes ) { uint32_t & _nbytes = *nbytes; @@ -407,23 +441,31 @@ int32_t IIOService::onAcceptSession( void * context, void * iocontext, sid_t id, int32_t IIOService::onConnectSession( void * context, void * iocontext, int32_t result, const char * host, uint16_t port, sid_t id ) { - IIOSession * session = NULL; - IIOService * service = static_cast( context ); + ConnectContext * ctx = static_cast(context); - // 通知 - service->setRemoteSid( host, port, result != 0 ? -1 : id ); + int32_t ack = 0; + IIOSession * session = NULL; + IIOService * service = ctx->service; if ( result != 0 ) { - return service->onConnectFailed( result, host, port ) ? 0 : -2; + ack = service->onConnectFailed( result, host, port ) ? 0 : -2; } - - session = service->onConnectSucceed( id, host, port ); - if ( session == NULL ) + else { - return -1; + session = service->onConnectSucceed( id, host, port ); + if ( session == NULL ) + { + ack = -1; + } + else + { + service->attach( id, session, iocontext, host, port ); + } } - service->attach( id, session, iocontext, host, port ); - return 0; + // 通知连接结果 + service->notifyConnectResult( ctx, result, id, ack ); + + return ack; } diff --git a/test/io.h b/test/io.h index 718dcd2..d0e9c5a 100644 --- a/test/io.h +++ b/test/io.h @@ -6,10 +6,10 @@ #include #include -#include "event.h" -#include "network.h" +#include "evlite/event.h" +#include "evlite/network.h" -typedef std::vector SidList; +typedef std::vector sids_t; // // 会话, 非线程安全的 @@ -129,18 +129,30 @@ public : bool start(); void stop(); + // 暂停对外服务(不可恢复) + void halt(); + // 获取版本号 static const char * version(); - // 停止对外服务 - void halt(); + // 监听 + bool listen( const char * host, uint16_t port ); - // 获取连接成功的会话ID - sid_t id( const char * host, uint16_t port ); + // 是否正在异步连接 + bool isConnecting( const char * host, uint16_t port ); - // 连接/监听 - bool listen( const char * host, uint16_t port ); - bool connect( const char * host, uint16_t port, int32_t seconds, bool isblock = false ); + // 连接远程服务器 + // 参数: + // host - 主机地址 + // port - 主机端口 + // seconds - 超时时间 + // isblock - 是否阻塞的连接 + // + // 返回值: + // -1 - 连接失败 + // 0 - 正在连接 + // >0 - 连接成功返回会话ID + sid_t connect( const char * host, uint16_t port, int32_t seconds, bool isblock = false ); // 发送数据 int32_t send( sid_t id, const std::string & buffer ); @@ -149,63 +161,72 @@ public : // 广播数据 int32_t broadcast( const std::string & buffer ); int32_t broadcast( const char * buffer, uint32_t nbytes ); - int32_t broadcast( const std::vector & ids, const std::string & buffer ); - int32_t broadcast( const std::vector & ids, const char * buffer, uint32_t nbytes ); - - // perform - int32_t perform( sid_t sid, int32_t type, void * task ); - int32_t perform( void * task, void * (*clone)( void * ), void (*perform)( void *, void * ) ); + int32_t broadcast( const sids_t & ids, const std::string & buffer ); + int32_t broadcast( const sids_t & ids, const char * buffer, uint32_t nbytes ); // 终止会话 int32_t shutdown( sid_t id ); - int32_t shutdown( const std::vector & ids ); + int32_t shutdown( const sids_t & ids ); + + // 跨线程提交任务 + int32_t perform( sid_t sid, + int32_t type, void * task, void (*recycle)(int32_t, void *) ); + int32_t perform( void * task, void * (*clone)( void * ), void (*perform)( void *, void * ) ); private : - struct RemoteHost + // 监听上下文 + struct ListenContext { - sid_t sid; uint16_t port; - std::string host; + IIOService * service; - RemoteHost() - : sid( 0 ), - port( 0 ) + ListenContext() + : port( 0 ), + service( NULL ) {} - RemoteHost( const char * host, uint16_t port, sid_t sid ) - { - this->sid = sid; - this->host = host; - this->port = port; - } + ListenContext( uint16_t p, IIOService * s ) + : port( p ), + service( s ) + {} }; - struct ListenContext + // 连接上下文 + struct ConnectContext { + sid_t sid; uint16_t port; + std::string host; IIOService * service; - ListenContext() - : port( 0 ), + ConnectContext() + : sid( 0 ), + port( 0 ), service( NULL ) {} - ListenContext( uint16_t p, IIOService * s ) - : port( p ), + ConnectContext( const char * h, uint16_t p, IIOService * s ) + : sid( 0 ), + port( p ), + host( h ), service( s ) {} }; - typedef std::vector RemoteHosts; typedef std::vector ListenContexts; + typedef std::vector ConnectContexts; + +private : + // 通知连接结果 + void notifyConnectResult( + ConnectContext * context, + int32_t result, sid_t id, int32_t ack ); + // 会话ID和会话的绑定 void attach( sid_t id, IIOSession * session, void * iocontext, const std::string & host, uint16_t port ); - sid_t getRemoteSid( const char * host, uint16_t port ) const; - void setRemoteSid( const char * host, uint16_t port, sid_t sid ); - static char * onTransformService( void * context, const char * buffer, uint32_t * nbytes ); static int32_t onAcceptSession( void * context, void * iocontext, sid_t id, const char * host, uint16_t port ); @@ -222,8 +243,8 @@ private : private : pthread_cond_t m_Cond; pthread_mutex_t m_Lock; - RemoteHosts m_RemoteHosts; - ListenContexts m_ListenContexts; + ListenContexts m_ListenContexts; // 正在监听的会话 + ConnectContexts m_ConnectContexts; // 正在连接的会话 }; #endif diff --git a/test/pingpong.c b/test/pingpong.c index 6b40fd3..eb4865c 100644 --- a/test/pingpong.c +++ b/test/pingpong.c @@ -66,11 +66,6 @@ int32_t onProcess( void * context, const char * buf, uint32_t nbytes ) return nprocess; } -char * onTransform( void * context, const char * buf, uint32_t * nbytes ) -{ - return (char *)buf; -} - int32_t onTimeout( void * context ) { return 0; @@ -92,12 +87,9 @@ void onShutdown( void * context, int32_t way ) free( s ); } -void onPerform( void * context, int32_t type, void * task ) -{} - -char * onLayerTransform( void * context, const char * buf, uint32_t * nbytes ) +int32_t onPerform( void * context, int32_t type, void * task ) { - return (char *)buf; + return 0; } int32_t onLayerAccept( void * context, void * local, sid_t id, const char * host, uint16_t port ) @@ -113,7 +105,7 @@ int32_t onLayerAccept( void * context, void * local, sid_t id, const char * host ioservice_t ioservice; ioservice.start = onStart; ioservice.process = onProcess; - ioservice.transform = onTransform; + ioservice.transform = NULL; ioservice.timeout = onTimeout; ioservice.keepalive = onKeepalive; ioservice.error = onError; @@ -154,7 +146,6 @@ int main( int32_t argc, char ** argv ) return -2; } - iolayer_set_transform( layer, onLayerTransform, layer ); iolayer_listen( layer, host, port, onLayerAccept, layer) ; g_Running = 1;