Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix amop subscribe multiple topics in-once failed #2134

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bcos-gateway/bcos-gateway/libamop/TopicManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@ void TopicManager::removeTopics(
{
m_client2TopicItems[_client].erase(topic);
}
TOPIC_LOG(INFO) << LOG_BADGE("removeTopics") << LOG_KV("client", _client)
<< LOG_KV("topicSeq", topicSeq()) << LOG_KV("topic", topic);
}
incTopicSeq();
}
TOPIC_LOG(INFO) << LOG_BADGE("removeTopics") << LOG_KV("client", _client)
<< LOG_KV("topicSeq", topicSeq());
}

void TopicManager::removeTopicsByClients(const std::vector<std::string>& _clients)
Expand Down
29 changes: 14 additions & 15 deletions bcos-rpc/bcos-rpc/amop/AMOPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,6 @@ bool AMOPClient::updateTopicInfos(
m_topicToSessions[item.topicName()][_session->endPoint()] = _session;
}
}
{
if (topicItems.size() > 0)
{
// record the topicInfo for re-subscribe to the gateway upon it become activate again
auto item = *(topicItems.begin());
m_topicInfos[item.topicName()] = _topicInfo;
}
}
return true;
}
/**
Expand Down Expand Up @@ -340,10 +332,10 @@ void AMOPClient::broadcastAMOPMessage(std::string const& _topic, std::shared_ptr
}
std::shared_ptr<WsSession> AMOPClient::randomChooseSession(std::string const& _topic)
{
ReadGuard l(x_topicToSessions);
AMOP_CLIENT_LOG(DEBUG) << LOG_DESC("randomChooseSession:")
<< LOG_KV("sessionSize", m_topicToSessions.size())
<< LOG_KV("topic", _topic);
ReadGuard l(x_topicToSessions);
if (!m_topicToSessions.count(_topic))
{
return nullptr;
Expand Down Expand Up @@ -387,8 +379,6 @@ void AMOPClient::onClientDisconnect(std::shared_ptr<WsSession> _session)
if (sessions.size() == 0)
{
topicsToRemove.emplace_back(it->first);
// remove the topicInfo
removeTopicInfo(it->first);
it = m_topicToSessions.erase(it);
continue;
}
Expand Down Expand Up @@ -451,7 +441,7 @@ void AMOPClient::removeTopicFromAllNodes(std::vector<std::string> const& topicsT
<< LOG_DESC("asyncRemoveTopic") << LOG_KV("gateway", endPointStr)
<< LOG_KV("removedSize", topicsToRemove.size())
<< LOG_KV("code", _error ? _error->errorCode() : 0)
<< LOG_KV("msg", _error ? _error->errorMessage() : "");
<< LOG_KV("msg", _error ? _error->errorMessage() : "success");
});
}
}
Expand Down Expand Up @@ -523,10 +513,19 @@ std::string AMOPClient::generateTopicInfo()
{
Json::Value topicInfo;
Json::Value topicItems(Json::arrayValue);
ReadGuard l(x_topicInfos);
for (auto const& it : m_topicInfos)
std::set<std::string> topicList;
ReadGuard l(x_topicToSessions);
for (auto const& it : m_topicToSessions)
{
if (topicList.count(it.first))
{
continue;
}
topicList.insert(it.first);
}
for (auto const& topicName : topicList)
{
topicItems.append(it.first);
topicItems.append(topicName);
}
topicInfo["topics"] = topicItems;
return topicInfo.toStyledString();
Expand Down
15 changes: 0 additions & 15 deletions bcos-rpc/bcos-rpc/amop/AMOPClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,6 @@ class AMOPClient : public std::enable_shared_from_this<AMOPClient>

virtual void pingGatewayAndNotifyTopics();

virtual void removeTopicInfo(std::string const& _topicName)
{
UpgradableGuard l(x_topicInfos);
if (m_topicInfos.count(_topicName))
{
UpgradeGuard ul(l);
m_topicInfos.erase(_topicName);
}
}

virtual bool onGatewayInactivated(std::shared_ptr<boostssl::ws::WsMessage> _msg,
std::shared_ptr<boostssl::ws::WsSession> _session);
std::string generateTopicInfo();
Expand All @@ -199,11 +189,6 @@ class AMOPClient : public std::enable_shared_from_this<AMOPClient>
m_topicToSessions;
mutable SharedMutex x_topicToSessions;

// for re-subscribe topics
// [topicName, topicInfos]
std::map<std::string, std::string> m_topicInfos;
mutable SharedMutex x_topicInfos;

std::shared_ptr<Timer> m_gatewayStatusDetector;
std::atomic_bool m_gatewayActivated = {true};
};
Expand Down