Skip to content

Commit

Permalink
Merge pull request #151 from csoler/v0.6-Msgs
Browse files Browse the repository at this point in the history
Bug fixing in message IDs that would causes all sorts of errors
  • Loading branch information
csoler authored Sep 23, 2024
2 parents c54e026 + 3b38f1b commit 5e3d142
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 14 deletions.
188 changes: 175 additions & 13 deletions src/services/p3msgservice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ p3MsgService::p3MsgService( p3ServiceControl *sc, p3IdService *id_serv,
p3GxsTrans& gxsMS )
: p3Service(), p3Config(),
gxsOngoingMutex("p3MsgService Gxs Outgoing Mutex"), mIdService(id_serv),
mServiceCtrl(sc), mMsgMtx("p3MsgService"), mMsgUniqueId(0),
mServiceCtrl(sc), mMsgMtx("p3MsgService"),
recentlyReceivedMutex("p3MsgService recently received hash mutex"),
mGxsTransServ(gxsMS)
{
Expand All @@ -130,7 +130,6 @@ p3MsgService::p3MsgService( p3ServiceControl *sc, p3IdService *id_serv,

/* MsgIds are not transmitted, but only used locally as a storage index.
* As such, thay do not need to be different at friends nodes. */
mMsgUniqueId = 1;

mShouldEnableDistantMessaging = true;
mDistantMessagingEnabled = false;
Expand Down Expand Up @@ -171,7 +170,13 @@ p3MsgService::~p3MsgService()
uint32_t p3MsgService::getNewUniqueMsgId()
{
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
return mMsgUniqueId++;

uint32_t res;

do { res = RsRandom::random_u32(); } while(mAllMessageIds.find(res)!= mAllMessageIds.end());

mAllMessageIds.insert(res);
return res;
}

int p3MsgService::tick()
Expand Down Expand Up @@ -647,8 +652,6 @@ bool p3MsgService::parseList_backwardCompatibility(std::list<RsItem*>& load)
auto msi = new RsMailStorageItem();
msi->msg = *mitem;
msg_map[mitem->msgId] = msi;

mMsgUniqueId = std::max(mMsgUniqueId,mitem->msgId+1);
}
else if(nullptr != (mti = dynamic_cast<RsMsgTags *>(it)))
msg_tags.push_back(mti);
Expand Down Expand Up @@ -789,7 +792,9 @@ bool p3MsgService::parseList_backwardCompatibility(std::list<RsItem*>& load)

bool p3MsgService::loadList(std::list<RsItem*>& load)
{
auto gxsmIt = load.begin();
RS_STACK_MUTEX(mMsgMtx); // lock ere, because we need to load, then check for duplicates, and this needs to be done in the same lock.

auto gxsmIt = load.begin();
RsMsgGRouterMap* gxsmailmap = dynamic_cast<RsMsgGRouterMap*>(*gxsmIt);
if(gxsmailmap)
{
Expand All @@ -801,7 +806,6 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
}

std::list<RsItem*> unhandled_items;
uint32_t max_msg_id = 0 ;

// load items and calculate next unique msgId
for(auto it = load.begin(); it != load.end(); ++it)
Expand Down Expand Up @@ -887,13 +891,9 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
{
RsErr() << "Loaded msg with msg.to=" << msi->to ;

if(msi->msg.msgId > max_msg_id)
max_msg_id = msi->msg.msgId ;

/* STORE MsgID */
if (msi->msg.msgId != 0)
{
RS_STACK_MUTEX(mMsgMtx);

/* switch depending on the PENDING
* flags
Expand Down Expand Up @@ -923,7 +923,6 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
else
unhandled_items.push_back(*it);
}
mMsgUniqueId = max_msg_id+1;

parseList_backwardCompatibility(unhandled_items);

Expand All @@ -934,9 +933,173 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)

load.clear();

#ifdef MSG_DEBUG
// list all the msg Ids
auto print_msgids = [](const std::map<uint32_t,RsMailStorageItem*>& mp,const std::string& name) {
std::cerr << "Message ids in box " << name << " : " << std::endl;
for(auto it:mp)
std::cerr << " " << it.first << " " << it.second->msg.msgId << std::endl;
};
print_msgids(mSentMessages,"Sent");
print_msgids(mTrashMessages,"Trash");
print_msgids(mDraftMessages,"Drafts");
print_msgids(mReceivedMessages,"Received");

std::cerr << "Outgoing messages: " << std::endl;
for(auto m:msgOutgoing)
{
std::cerr << " parent " << m.first << " : " << std::endl;
for(auto p:m.second)
std::cerr << " " << p.first << std::endl;
}
#endif

// This was added on Sept 20, 2024. It is here to fix errors following a bug that caused duplication of
// some message ids. This should be kept because it also creates the list that is stored in mAllMessageIds,
// that is further used by getNewUniqueId() to create unique message Ids in a more robust way than before.

locked_checkForDuplicates();
return true;
}

// Two generic methods to replace elements in a map, when the first (resp. second) element matches an id to substitute.

template<class T> void replace_first(std::map<uint32_t,T>& mp,uint32_t old_id,uint32_t new_id)
{
auto tt = mp.find(old_id);
if(tt == mp.end())
return;

auto sec = tt->second;
mp.erase(tt);
mp[new_id] = sec;
}

template<class T> void replace_second(std::map<T,uint32_t>& mp,uint32_t old_id,uint32_t new_id)
{
for(auto& it:mp)
if(it.second == old_id)
it.second = new_id;
}
void p3MsgService::locked_checkForDuplicates()
{
std::set<uint32_t> already_known_ids;
std::set<RsMailMessageId> changed_msg_ids;

auto replace_parent = [](std::map<uint32_t,RsMailStorageItem*>& mp,uint32_t old_id,uint32_t new_id)
{
for(auto& it:mp)
if(it.second->parentId == old_id)
{
RsWarn() << "Replacing parent ID " << old_id << " of message " << it.first << " with new parent " << new_id << std::endl;
it.second->parentId = new_id;
}
};

auto check = [&already_known_ids,&changed_msg_ids,this,replace_parent](std::map<uint32_t,RsMailStorageItem*>& mp,const std::string& name)
{
std::map<uint32_t,RsMailStorageItem*> new_mp;

for(std::map<uint32_t,RsMailStorageItem*>::iterator it(mp.begin());it!=mp.end();)
{
if(already_known_ids.find(it->first)!=already_known_ids.end())
{
// generate a new ID
uint32_t old_id = it->first;
uint32_t new_id;
do { new_id = RsRandom::random_u32() ; } while(already_known_ids.find(new_id)!=already_known_ids.end());

already_known_ids.insert(new_id);
changed_msg_ids.insert(std::to_string(new_id));

RsWarn() << "Duplicate ID " << it->first << " found in message box " << name << ". Will be replaced by new ID " << new_id << std::endl;

// replace the old ID by the new, everywhere

// 1 - in the map itself

it->second->msg.msgId = new_id;
new_mp[new_id] = it->second; // put the modified item in a new map, so as not to have the same item visited twice in this loop.

auto tmp = it; // remove the item from the map
tmp++;
mp.erase(it);
it = tmp;

// Next, we replace the old id by the new, everywhere it is mentionned. Of course this may not be correct, since
// the actual old id may be mentionned on purpose. Still, there is absolutely no way to know which is the right one.

// 2 - everywhere it is designated as parent

replace_parent(mTrashMessages, old_id,new_id);
replace_parent(mSentMessages, old_id,new_id);
replace_parent(mDraftMessages, old_id,new_id);
replace_parent(mReceivedMessages,old_id,new_id);

// 3 - mMsgOutgoing refers to original msg in Sent, so the substitution must happen there too

replace_first(msgOutgoing,old_id,new_id);

// 4 - in GRouter and GxsTrans correspondance maps, and recently received messages

replace_second(_grouter_ongoing_messages ,old_id,new_id);
replace_second(gxsOngoingMessages ,old_id,new_id);
replace_second(mRecentlyReceivedMessageHashes,old_id,new_id);

// 6 - in mParentId correspondance map

replace_first(mParentId,old_id,new_id);
}
else
++it;

already_known_ids.insert(it->first);
}
mp.insert(new_mp.begin(),new_mp.end()); // merge back the new list in the modified one
};

check(mTrashMessages,"mTrashMessages");
check(mSentMessages,"mSentMessages");
check(mDraftMessages,"mDraftMessages");
check(mReceivedMessages,"mReceivedMessages");

// now check msgOutgoing. The first element refers to an element in mSentMessages, so it's already been treated

for(auto& it:msgOutgoing)
{
std::map<uint32_t,uint32_t> to_switch;

for(auto sit:it.second)
if(already_known_ids.find(sit.first) != already_known_ids.end())
{
uint32_t new_id;
do { new_id = RsRandom::random_u32() ; } while(already_known_ids.find(new_id)!=already_known_ids.end());

RsWarn() << "Duplicate ID " << sit.first << " found in msgOutgoing. Will be replaced by new ID " << new_id << std::endl;

to_switch[sit.first] = new_id;
changed_msg_ids.insert(std::to_string(new_id));
already_known_ids.insert(new_id);
}
else
already_known_ids.insert(sit.first);

for(auto sit:to_switch)
replace_first(it.second,sit.first,sit.second);
}

mAllMessageIds = already_known_ids;

if(!changed_msg_ids.empty())
{
IndicateConfigChanged(RsConfigMgr::CheckPriority::SAVE_NOW);

auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEventCode::MESSAGE_CHANGED;
pEvent->mChangedMsgIds = changed_msg_ids;
rsEvents->postEvent(pEvent);
}
}
void p3MsgService::loadWelcomeMsg()
{
/* Load Welcome Message */
Expand Down Expand Up @@ -2698,7 +2861,6 @@ RsMsgItem *p3MsgService::createOutgoingMessageItem(const RsMailStorageItem& msi,
void p3MsgService::debug_dump()
{
std::cerr << "Dump of p3MsgService data:" << std::endl;
std::cerr << " mMsgUniqueId: " << mMsgUniqueId << std::endl;
auto display_box = [=](const std::map<uint32_t,RsMailStorageItem*>& msgs,const std::string& box_name) {
std::cerr << " " + box_name + ":" << std::endl;
for(auto msg:msgs)
Expand Down
5 changes: 4 additions & 1 deletion src/services/p3msgservice.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class p3MsgService :
private:
void locked_sendDistantMsgItem(RsMsgItem *msgitem, const RsGxsId &from, uint32_t msgId);
bool locked_getMessageTag(const std::string &msgId, Rs::Msgs::MsgTagInfo& info);
void locked_checkForDuplicates();
RsMailStorageItem *locked_getMessageData(uint32_t mid) const;

/** This contains the ongoing tunnel handling contacts.
Expand Down Expand Up @@ -242,7 +243,9 @@ class p3MsgService :

std::map<uint32_t, RsMsgTagType*> mTags;

uint32_t mMsgUniqueId;
// Set of messages ids used. Any new msg id generated will be checked against this set and added to it.
std::set<uint32_t> mAllMessageIds;

std::map<Sha1CheckSum, uint32_t> mRecentlyReceivedMessageHashes;
RsMutex recentlyReceivedMutex;

Expand Down

0 comments on commit 5e3d142

Please sign in to comment.