Skip to content

Commit

Permalink
改进消息回调机制
Browse files Browse the repository at this point in the history
  • Loading branch information
prudens committed Nov 4, 2017
1 parent 5a69459 commit bd9f50c
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 38 deletions.
3 changes: 2 additions & 1 deletion base/timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ namespace audio_engine
}
if(block)
{
// 这里有待改进,即是否要在退出的时候,把任务执行完。
this->condition.wait_for( lock, sleep_time,[this] { return this->_stop; } );
if(this->_stop && this->_tasks.empty())
if(this->_stop)
return;
}
}
Expand Down
1 change: 0 additions & 1 deletion outsource/real_audio_client/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ int main(int argc, char** argv)
filename = argv[1];
}
std::thread(run_engine, filename, "d:/").join();

return 0;
}

Expand Down
70 changes: 52 additions & 18 deletions outsource/real_audio_client/user_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace audio_engine{
UserManager::~UserManager()
{
_user_service->UnRegisterHandler( this );
_user_service.reset();
}

void UserManager::SetEventCallback( UserEventHandler* handle )
Expand Down Expand Up @@ -86,11 +87,19 @@ namespace audio_engine{
audio_engine::UpdateUserExtend* user_extend = pb->mutable_update_user_extend();
user_extend->set_extend( extend );
user_extend->set_token( _token );
_user_service->SendPacket( pb, 2000ms, [=]( auto pb, auto timeout ){
if(_event_handle)
_user_service->SendPacket( pb, 2000ms, [=]( auto pb, auto ec ){
if(!ec)
{
_event_handle->UpdateUserExtend( _token, extend, ERR_TIME_OUT );
RecvPacket( pb );
}
else
{
if(_event_handle)
{
_event_handle->UpdateUserExtend( _token, extend, ERR_TIME_OUT );
}
}

} );
}

Expand All @@ -101,10 +110,17 @@ namespace audio_engine{
user_state->set_state( state );
user_state->set_src_token( _token );
user_state->set_dst_token( dst_token );
_user_service->SendPacket( pb, 2000ms, [=]( auto, auto ){
if(_event_handle)
_user_service->SendPacket( pb, 2000ms, [=]( auto pb, auto ec ){
if(!ec)
{
_event_handle->UpdateUserState( _token, dst_token, state, ERR_TIME_OUT );
RecvPacket(pb);
}
else
{
if(_event_handle)
{
_event_handle->UpdateUserState( _token, dst_token, state, ERR_TIME_OUT );
}
}
} );
}
Expand All @@ -114,9 +130,17 @@ namespace audio_engine{
auto pb = std::make_shared<RAUserMessage>();
auto logout_req = pb->mutable_request_logout();
logout_req->set_token( _token );
_user_service->SendPacket( pb, 1000ms, [this]( auto, auto ){
Log.w( "logout time out" );
Transform( LS_CONNECTED );
_user_service->SendPacket( pb, 1000ms, [this]( auto pb, auto ec ){
if(!ec)
{
RecvPacket( pb );
}
else
{
Log.w( "logout time out" );
Transform( LS_CONNECTED );
}

} );
Transform( LS_LOGOUT );
}
Expand All @@ -143,8 +167,11 @@ namespace audio_engine{

void UserManager::DisConnectServer()
{
Log.d( "UserManager::DisConnectServer()1\n" );
_user_service->DisconnectServer();
Log.d( "UserManager::DisConnectServer()2\n" );
Transform( LS_INIT );

}

void UserManager::VerifyAccount()
Expand All @@ -156,15 +183,22 @@ namespace audio_engine{
login_req->set_devtype( _device_type );
login_req->set_state( _user_state );
login_req->set_version( "20171028" );
_user_service->SendPacket( pb, 5000ms, [this]( auto, auto ){
if(++_try_login_count > MAX_TRY_LOGIN)
_user_service->SendPacket( pb, 5000ms, [this]( auto pb, auto ec ){
if(!ec)
{
_target_state_internel = LS_NONE;
Transform( LS_NONE );
RecvPacket( pb );
}
else
{
Transform( LS_CONNECTED );
if(++_try_login_count > MAX_TRY_LOGIN)
{
_target_state_internel = LS_NONE;
Transform( LS_NONE );
}
else
{
Transform( LS_CONNECTED );
}
}
} );
Transform( LS_VERIFY_ACCOUNT );
Expand Down Expand Up @@ -304,7 +338,8 @@ namespace audio_engine{

bool UserManager::HandleError( std::error_code ec )
{
if(_user_service->IsConnectServer() || LS_INIT < _cur_state)
Log.d( "UserManager::HandleError()\n" );
if(_target_state_internel > LS_INIT)
{
Log.w( "socket error 读写失败:%s\n", ec.message().c_str() );
if(++_try_login_count > MAX_TRY_LOGIN)
Expand Down Expand Up @@ -439,22 +474,21 @@ namespace audio_engine{
default:
break;
}

// });
}


void UserManager::Update( LoginState state )
{
if(_cur_state != state)
{
_cur_state = state;
_cur_state_time++;
if(_event_handle && ( state % 2 == 0 ))
{
_event_handle->UpdateLoginState( state );
}
}
_cur_state = state;


}
}
18 changes: 8 additions & 10 deletions outsource/real_audio_client/user_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ namespace audio_engine{
{
HandleError( ec );
}

} );
}

Expand Down Expand Up @@ -151,7 +150,7 @@ namespace audio_engine{
{
_task->AddTask( [wait]
{
wait->cb( wait->pb, wait->timeout );
wait->cb( wait->pb, std::make_error_code(std::errc::timed_out) );
} );
it = _req_packet_list.erase( it );
}
Expand Down Expand Up @@ -195,8 +194,8 @@ namespace audio_engine{
{
auto wait = it->second;
_req_packet_list.erase( it );
_task->AddTask( [wait]{
wait->cb( wait->pb, wait->timeout );
_task->AddTask( [=]{
wait->cb( pb, std::error_code() );
} );
return;
}
Expand All @@ -213,23 +212,22 @@ namespace audio_engine{
_lock_handle.unlock();
}

void UserService::SendPacket( RAUserMessagePtr pb, tick_t timeout, std::function<void( RAUserMessagePtr, tick_t )> cb )
void UserService::SendPacket( RAUserMessagePtr pb, tick_t timeout, std::function<void( RAUserMessagePtr, std::error_code )> cb )
{
int sn = 0;
_sns_mutex.lock();
if(++_sn < 0)
++_sn;
if( _sn <= 0)
{
_sn = 1;
}
_sn = _sn;

pb->set_sn( _sn );
WaitRespPacketPtr wait = std::make_shared<WaitRespPacket>();
wait->timeout = timeout;
wait->cb = std::move(cb);
wait->pb = pb;
wait->calltime = TimeStamp();
_req_packet_list[sn] = wait;
_req_packet_list[_sn] = wait;
_sns_mutex.unlock();
auto buf = _proto_packet.Build( pb );
if(buf)
Expand All @@ -240,7 +238,7 @@ namespace audio_engine{

void UserService::SendPacket( RAUserMessagePtr pb )
{
pb->set_sn( _sn );
pb->set_sn( 0 );
auto buf = _proto_packet.Build( pb );
if(buf)
{
Expand Down
4 changes: 2 additions & 2 deletions outsource/real_audio_client/user_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace audio_engine{
RAUserMessagePtr pb;
tick_t timeout;
tick_t calltime;
std::function<void( RAUserMessagePtr, tick_t )> cb;
std::function<void( RAUserMessagePtr, std::error_code )> cb;
};
typedef std::shared_ptr<WaitRespPacket> WaitRespPacketPtr;
class UserService : public std::enable_shared_from_this<UserService>
Expand All @@ -34,7 +34,7 @@ namespace audio_engine{
void RegisterHandler( ProtoPacketizer *p );
void UnRegisterHandler( ProtoPacketizer* p );
void RecvPacket( std::error_code ec, std::shared_ptr<RAUserMessage> pb );
void SendPacket( RAUserMessagePtr pb, tick_t timeout, std::function<void( RAUserMessagePtr, tick_t )> cb );
void SendPacket( RAUserMessagePtr pb, tick_t timeout, std::function<void( RAUserMessagePtr, std::error_code )> cb );
void SendPacket( RAUserMessagePtr pb );
private:
void Read( BufferPtr buf );
Expand Down
10 changes: 6 additions & 4 deletions outsource/real_audio_server/user.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ namespace audio_engine{
if(pb->has_request_login())
{
auto login_req = pb->request_login();
HandleLogin( login_req );
HandleLogin( login_req,pb->sn() );
}
if(pb->has_request_logout())
{
auto logout_req = pb->request_logout();
HandleLogout( logout_req );
HandleLogout( logout_req,pb->sn() );
}

if(pb->has_update_user_extend())
Expand Down Expand Up @@ -169,7 +169,7 @@ namespace audio_engine{

}

void User::HandleLogin( const audio_engine::RequestLogin& login_req )
void User::HandleLogin( const audio_engine::RequestLogin& login_req,int sn )
{
_userid = login_req.userid();
_extend = login_req.extend();
Expand All @@ -180,6 +180,7 @@ namespace audio_engine{
login_res->set_token( _token );
login_res->set_userid( _userid );
login_res->set_error_code( 0 );
pb->set_sn( sn );
auto user_list = pb->mutable_notify_user_list();
auto users = _host->GetUserList();
auto size = users.size();
Expand All @@ -203,7 +204,7 @@ namespace audio_engine{
printf( "用户:%s登陆\n", _userid.c_str() );
}

void User::HandleLogout( const ::audio_engine::RequestLogout& logout_req )
void User::HandleLogout( const ::audio_engine::RequestLogout& logout_req,int sn )
{
auto self = shared_from_this();
auto token = logout_req.token();
Expand All @@ -220,6 +221,7 @@ namespace audio_engine{
auto logout_res = pb->mutable_responed_logout();
logout_res->set_token( _token );
logout_res->set_error_code( 0 );
pb->set_sn(sn);
BufferPtr buf = _proto_packet.Build( pb );
if(buf)
{
Expand Down
4 changes: 2 additions & 2 deletions outsource/real_audio_server/user.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ namespace audio_engine{
void Read();
void Write( int type, BufferPtr buf );
void HandleError( std::error_code ec );
void HandleLogin( const audio_engine::RequestLogin& login_req );
void HandleLogout( const ::audio_engine::RequestLogout& logout_req );
void HandleLogin( const audio_engine::RequestLogin& login_req,int sn );
void HandleLogout( const ::audio_engine::RequestLogout& logout_req,int sn );
std::string _userid;
std::string _extend;
int64_t _token = 0;
Expand Down

0 comments on commit bd9f50c

Please sign in to comment.