From 23b55f3e0774683a49527f9fe4c54e22d1afb465 Mon Sep 17 00:00:00 2001 From: cynico Date: Thu, 28 Dec 2023 18:45:45 +0200 Subject: [PATCH] fix: sending times --- src/CustomMessage_m.h | 7 +++ src/Makefile | 4 +- src/node.cc | 131 +++++++++++++++++++++++++++--------------- src/node.h | 57 ++++++------------ src/util.cc | 42 -------------- src/util.h | 49 +++++++++++++++- 6 files changed, 155 insertions(+), 135 deletions(-) diff --git a/src/CustomMessage_m.h b/src/CustomMessage_m.h index 35838dd..bb5cdda 100644 --- a/src/CustomMessage_m.h +++ b/src/CustomMessage_m.h @@ -68,6 +68,13 @@ #define DATA_FRAME 2 #define COORDINATOR_FRAME 3 +#define DELAY_OFFSET 4 + +#define DELAYED_NACK_FRAME NACK_FRAME + DELAY_OFFSET +#define DELAYED_ACK_FRAME ACK_FRAME + DELAY_OFFSET +#define DELAYED_DATA_FRAME DATA_FRAME + DELAY_OFFSET + + class CustomMessage_Base : public ::omnetpp::cPacket { protected: diff --git a/src/Makefile b/src/Makefile index 1501df8..0dc7409 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,12 +1,12 @@ # -# OMNeT++/OMNEST Makefile for projcet +# OMNeT++/OMNEST Makefile for project # # This file was generated with the command: # opp_makemake -f --deep # # Name of target to be created (-o option) -TARGET = projcet$(D)$(EXE_SUFFIX) +TARGET = project$(D)$(EXE_SUFFIX) TARGET_DIR = . # User interface (uncomment one) (-u option) diff --git a/src/node.cc b/src/node.cc index ad3fd08..652c3b1 100644 --- a/src/node.cc +++ b/src/node.cc @@ -75,7 +75,7 @@ int Node::modifyPayload(vecBitset8 &payload) { return bit; } -bool Node::sendDataFrame(int lineNumber, int dataSequenceNumber, bool errorFree, float &delay) { +bool Node::sendDataFrame(int lineNumber, int dataSequenceNumber, bool errorFree, float &realTime, bool timeout=false) { std::string line; @@ -85,11 +85,12 @@ bool Node::sendDataFrame(int lineNumber, int dataSequenceNumber, bool errorFree, vecBitset8 payload = ConvertStringToBits(line.substr(5), true); CustomMessage_Base* msgToSend = new CustomMessage_Base(); - msgToSend->setFrameType(DATA_FRAME); + msgToSend->setFrameType(DELAYED_DATA_FRAME); msgToSend->setParity(CalculateChecksum(payload)); msgToSend->setDataSequence(dataSequenceNumber); - this->log("At time [%.3f], Node [%d], Introducing channel error with code = [%s]", simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id, line.substr(0,4).c_str()); + realTime = this->getRealTime(&this->senderInfo.delayedMessages); + this->log("At time [%.3f], Node [%d], Introducing channel error with code = [%s]", realTime, this->id, line.substr(0,4).c_str()); bool lost = false, duplicated = false; int modifiedBit = -1; @@ -112,30 +113,32 @@ bool Node::sendDataFrame(int lineNumber, int dataSequenceNumber, bool errorFree, if (errorPrefix.test(0)) errorDelay = Info::errorDelay; } + if (!timeout && errorFree) errorDelay += 0.001; + msgToSend->setPayload(payload); + float processingTime = (int)!r.readFromBuffer * Info::processingTime; - this->senderInfo.offsetFromRealTime += processingTime; + realTime = this->insertIntoDelayed(&this->senderInfo.delayedMessages, msgToSend, lost, processingTime, errorDelay); + + scheduleAt(realTime, msgToSend); // Log output this->log("At time [%.3f], Node [%d] [sent] frame with seq_num = [%d] and payload = [%s] and trailer = [%s], Modified [%d], Lost [%s], Duplicate [%d], Delay [%.3f]", - simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id, dataSequenceNumber, ConvertBitsToString(payload).c_str(), + realTime, this->id, dataSequenceNumber, ConvertBitsToString(payload).c_str(), msgToSend->getParity().to_string().c_str(), modifiedBit, lost? "Yes": "No", (int)duplicated, errorDelay); - msgToSend->setPayload(payload); - delay = this->senderInfo.offsetFromRealTime + Info::transmissionDelay; - // If duplicated, send a duplicate message with the duplication delay. if (duplicated) { - this->log("At time [%.3f], Node [%d] sent frame with seq_num = [%d] and payload = [%s] and trailer = [%s], Modified [%d], Lost [%s], Duplicate [%d], Delay [%.3f]", - simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id, dataSequenceNumber, ConvertBitsToString(payload).c_str(), + CustomMessage_Base* duplicatedMsg = msgToSend->dup(); + this->insertIntoDelayed(&this->senderInfo.delayedMessages, duplicatedMsg, lost, 0, Info::duplicationDelay + errorDelay); + this->log("At time [%.3f], Node [%d] [sent] frame with seq_num = [%d] and payload = [%s] and trailer = [%s], Modified [%d], Lost [%s], Duplicate [%d], Delay [%.3f]", + realTime, this->id, dataSequenceNumber, ConvertBitsToString(payload).c_str(), msgToSend->getParity().to_string().c_str(), modifiedBit, lost? "Yes": "No", 2, errorDelay); - if (!lost) { - CustomMessage_Base* duplicatedMsg = msgToSend->dup(); - sendDelayed(duplicatedMsg, delay + Info::duplicationDelay + errorDelay, "peer$o"); - } + // if (lost) delete duplicatedMsg; + scheduleAt(realTime, duplicatedMsg); } - if (lost) delete msgToSend; - else sendDelayed(msgToSend, delay + errorDelay, "peer$o"); + //if (lost) delete msgToSend; + //else sendDelayed(msgToSend, delay + errorDelay, "peer$o"); return true; } @@ -196,8 +199,6 @@ Timer* Node::deleteTimers(int ackSequence, bool prev) { // prev is true on ack, false on nack void Node::advanceWindowAndSendFrames(int ackSequence, bool prev) { - this->senderInfo.offsetFromRealTime = 0; - int oldWStart = this->senderInfo.wStart; // Updating wCurrent with the (n)ack sequence number. @@ -215,7 +216,7 @@ void Node::advanceWindowAndSendFrames(int ackSequence, bool prev) { this->deleteTimers(startingSeq, prev); // Advancing wCurrent and sending new frames as much as our window can expand. - float delay; + float realTime = 0.0; bool errorFree = prev ? false : true; // errorFree is true for the first message to be sent in case of nack. int framesToSend = Info::windowSize - this->modulus(this->senderInfo.wCurrent - this->senderInfo.wStart); @@ -233,23 +234,20 @@ void Node::advanceWindowAndSendFrames(int ackSequence, bool prev) { int lineNumber = this->senderInfo.currentLineOffset + this->modulus(this->senderInfo.wCurrent - this->senderInfo.wStart); int dataSequenceNumber = this->senderInfo.wCurrent; - if (!this->sendDataFrame(lineNumber, dataSequenceNumber, errorFree, delay)) { - this->log("At time [%.3f], Node [%d] found no more lines to send. Checking if we should terminate now", simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id); + if (!this->sendDataFrame(lineNumber, dataSequenceNumber, errorFree, realTime)) { + this->log("At time [%.3f], Node [%d] found no more lines to send. Checking if we should terminate now", simTime().dbl(), this->id); this->checkTermination(); - this->log("At time [%.3f], Node [%d] is waiting for outstanding acks to terminate.", simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id); + this->log("At time [%.3f], Node [%d] is waiting for outstanding acks to terminate.", simTime().dbl(), this->id); return; } // Setting errorFree to false in case of nack. // Increasing the delay by 0.001 as per the document. - if (errorFree) { - delay += 0.001; - errorFree = false; - } + if (errorFree) errorFree = false; // Create a timer Timer* timer = this->createTimer(dataSequenceNumber); - scheduleAt( simTime() + delay + Info::timeout, timer->msg); + scheduleAt( realTime + Info::timeout, timer->msg); this->senderInfo.wCurrent = this->modulus(this->senderInfo.wCurrent + 1); } @@ -263,8 +261,19 @@ void Node::sender(CustomMessage_Base *msg) { // within the window. if (msg->isSelfMessage()) { + if (msg->getFrameType() == DELAYED_DATA_FRAME) { + DelayedMessage* m = GetElementFromLinkedList(&this->senderInfo.delayedMessages, msg); + + if (!m->lost) { + msg->setFrameType(msg->getFrameType() - DELAY_OFFSET); + sendDelayed(msg, Info::transmissionDelay + m->extraDelay, "peer$o"); + } + DeleteElementFromLinkedList(&this->senderInfo.delayedMessages, msg); + return; + } + int timedoutSequenceNumber = msg->getAckSequence(); - if (timedoutSequenceNumber != -1) this->log("At time [%.3f], Node [%d] timeout event for frame with seq_num = [%d]", simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id, timedoutSequenceNumber); + if (timedoutSequenceNumber != -1) this->log("At time [%.3f], Node [%d] timeout event for frame with seq_num = [%d]", simTime().dbl(), this->id, timedoutSequenceNumber); // Cancel all timers starting from the timer that expired and onwards, i.e: all timers. // Clarification: if a timer expired, it MUST have been the first timer in the window, by the very @@ -272,8 +281,7 @@ void Node::sender(CustomMessage_Base *msg) { if (this->senderInfo.timers) this->deleteTimers(this->senderInfo.timers->msg->getAckSequence(), false); int dataSequenceNumber, lineNumber; - float delay; bool errorFree = false; - this->senderInfo.offsetFromRealTime = 0; + float realTime; bool errorFree = false; for (int i = 0; i < Info::windowSize; i++) { dataSequenceNumber = this->modulus(this->senderInfo.wStart + i); @@ -283,11 +291,11 @@ void Node::sender(CustomMessage_Base *msg) { // If this is the message that caused the timeout, then send it error-free. if (dataSequenceNumber == timedoutSequenceNumber) errorFree = true; - if (!this->sendDataFrame(lineNumber, dataSequenceNumber, errorFree, delay)) + if (!this->sendDataFrame(lineNumber, dataSequenceNumber, errorFree, realTime, true)) return this->checkTermination(); Timer* timer = this->createTimer(dataSequenceNumber); - scheduleAt( simTime() + delay + Info::timeout, timer->msg); + scheduleAt( realTime + Info::timeout, timer->msg); } senderInfo.wCurrent = this->modulus(senderInfo.wStart + Info::windowSize); @@ -317,44 +325,73 @@ void Node::sender(CustomMessage_Base *msg) { void Node::checkTermination() { if (this->senderInfo.wStart == this->senderInfo.wCurrent) { - this->log("At time [%.3f], Node [%d] finished sending and receiving acks.", simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id); + this->log("At time [%.3f], Node [%d] finished sending and receiving acks.", simTime().dbl(), this->id); exit(0); } } +double Node::getRealTime(linkedList *l) { + + // Calculating the supposed delay in processing times + double realTime; + if (!l->end) realTime = simTime().dbl(); + else realTime = simTime().dbl() >= l->end->realTime ? simTime().dbl() : l->end->realTime; + + return realTime; +} + +double Node::insertIntoDelayed(linkedList *l, CustomMessage_Base* msg, bool lost, double processingTime, double extraDelay) { + + // Calculating the supposed delay in processing times + double realTime = this->getRealTime(l) + processingTime; + + InsertAtEndOfLinkedList(l); + l->end->N = msg; + l->end->simTime = simTime().dbl(); + l->end->realTime = realTime; + l->end->extraDelay = extraDelay; + l->end->lost = lost; + + return realTime; +} + void Node::sendAck(int ACK_TYPE, bool LP) { bool lost = false; CustomMessage_Base* ack = new CustomMessage_Base(); - ack->setFrameType(ACK_TYPE); + ack->setFrameType(ACK_TYPE + DELAY_OFFSET); ack->setAckSequence(this->receiverInfo.expectedFrameSequence); if ((uniform(0,1) <= Info::ackLossProb) && LP) lost = true; - else { - - this->receiverInfo.accumulatingProcessingTimes += Info::processingTime; - sendDelayed(ack, this->receiverInfo.accumulatingProcessingTimes + Info::transmissionDelay, "peer$o"); - } + double realTime = this->insertIntoDelayed(&this->receiverInfo.delayedMessages, ack, lost, Info::processingTime); - this->log("At time [%.3f], Node [%d] sending [%s] with number [%d], loss [%s]", simTime().dbl() + this->receiverInfo.accumulatingProcessingTimes, this->id, + this->log("At time [%.3f], Node [%d] sending [%s] with number [%d], loss [%s]", realTime, this->id, ACK_TYPE ? "ACK" : "NACK", ack->getAckSequence(), lost? "Yes" : "No"); if (lost) delete ack; + else scheduleAt(realTime, ack); + } void Node::receiver(CustomMessage_Base *msg) { - // If not an in-order message, drop and send an ack with the sequence of the next expected frame sequence. - if (this->receiverInfo.lastAckSimTime != simTime().dbl()) { - this->receiverInfo.lastAckSimTime = simTime().dbl(); - this->receiverInfo.accumulatingProcessingTimes = 0; + // If this is a delayed ack or nack + if (msg->isSelfMessage()) { + DelayedMessage* m = GetElementFromLinkedList(&this->receiverInfo.delayedMessages, msg); + if (!m->lost) { + msg->setFrameType(msg->getFrameType() - DELAY_OFFSET); + sendDelayed(msg, Info::transmissionDelay + m->extraDelay, "peer$o"); + } + DeleteElementFromLinkedList(&this->receiverInfo.delayedMessages, msg); + return; } + // If not an in-order message, drop and send an ack with the sequence of the next expected frame sequence. if (msg->getDataSequence() != this->receiverInfo.expectedFrameSequence) { - this->log("At time [%.3f], Node [%d] dropped out-of-order frame with seq_num = [%d]. Expecting seq_num = [%d]", simTime().dbl() + this->receiverInfo.accumulatingProcessingTimes, this->id, msg->getDataSequence(), this->receiverInfo.expectedFrameSequence); - cancelAndDelete(msg); + this->log("At time [%.3f], Node [%d] dropped out-of-order frame with seq_num = [%d]. Expecting seq_num = [%d]", this->getRealTime(&this->receiverInfo.delayedMessages), this->id, msg->getDataSequence(), this->receiverInfo.expectedFrameSequence); this->sendAck(ACK_FRAME, false); + cancelAndDelete(msg); return; } @@ -363,7 +400,7 @@ void Node::receiver(CustomMessage_Base *msg) { this->receiverInfo.expectedFrameSequence += valid ? 1 : 0; this->receiverInfo.expectedFrameSequence = this->modulus(this->receiverInfo.expectedFrameSequence); - if (valid) this->log("Uploading payload = \"%s\" and seq_num = [%d] to the network layer", ConvertBitsToString(*(msg->getPayload()), true).c_str(), msg->getDataSequence()); + if (valid) this->log("Uploading payload = \"%s\" At time [%.3f] and seq_num = [%d] to the network layer", ConvertBitsToString(*(msg->getPayload()), true).c_str(), this->getRealTime(&this->receiverInfo.delayedMessages), msg->getDataSequence()); this->sendAck(int(valid)); cancelAndDelete(msg); diff --git a/src/node.h b/src/node.h index 336aa72..ad4d8bc 100644 --- a/src/node.h +++ b/src/node.h @@ -29,6 +29,16 @@ struct Timer { Timer* prev = NULL; }; +struct DelayedMessage { + CustomMessage_Base* N = NULL; + double simTime = 0; + double realTime = 0; + DelayedMessage* next = NULL; + DelayedMessage* prev = NULL; + bool lost = false; + double extraDelay = 0; +}; + struct SenderInfo { // Sequence number of the start of the window. @@ -48,37 +58,7 @@ struct SenderInfo { // This is a vector of self-messages, acting as timeouts. Timer* timers = NULL; - - // Ad-hoc solution coming your way.. - - // Used to calculate the right time to schedule messages. - // Specifically, consider the following example, with the following parameters: - // simTime() = 0, Info::windowSize = 3, Info::processingTime = 0.5 - - // The sender sends: - // First frame: - // Sent at = simTime() + Info::processingTime = 0 + 0.5 = 0.5 - // Given that the sender doesn't either: busy-wait, or schedules a self-message, - // it will proceed to the next frame - - // Second frame: - // Sent at = simTime() + Info::processingTime = 0 + 0.5 = 0.5 => Not accurate - - // Which is the same as the time for the first frame, because it didn't - // account for the time supposedly spent processing the frame. - // We fix this by accumulating the processing times in offsetFromRealTime - // so the way it goes: - - // First frame: - // Sent at = simTime() + offsetFromRealTime + Info::processingTime = 0 + 0 + 0.5 = 0.5 - // offsetFromRealTime = offsetFromRealTime + Info::processingTime = 0 + 0.5 = 0.5 - - // Second frame: - // Sent at = simTime() + offsetFromRealTime + Info::processingTime = 0 + 0.5 + 0.5 = 1 => Accurate - // offsetFromRealTime = offsetFromRealTime + Info::processingTime = 0.5 + 0.5 = 1 - - // After sending all the frames our window can accomodate, we zero out this offsetFromRealTime. - float offsetFromRealTime = 0; + linkedList delayedMessages; }; struct ReceiverInfo { @@ -86,15 +66,7 @@ struct ReceiverInfo { // The next expected frame sequence. int expectedFrameSequence = 0; - // This serves a roughly similar role to senderInfo.offsetFromRealTime described above. - // Specifically in the case when the senders send multiple frames at the same time t. - // This happens when those frames are being resent, either due to a timeout or an NACK, - // thus no processing time at the sender's end. - - // All the frames would thus be received at the same time t`. If we didn't handle it, - // all the ack frames would be sent at (t` + 0.5), which is not logical. - float lastAckSimTime = -1.0; - float accumulatingProcessingTimes = 0.0; + linkedList delayedMessages; }; class Node : public cSimpleModule @@ -122,7 +94,7 @@ class Node : public cSimpleModule // calculates the delay and returns it in the parameter delay. // Returns true if there was more lines to read from the file, and a frame was successfully sent, // false otherwise. - virtual bool sendDataFrame(int lineNumber, int dataSequenceNumber, bool errorFree, float &delay); + virtual bool sendDataFrame(int lineNumber, int dataSequenceNumber, bool errorFree, float &realTime, bool timeout); // Sends an ACK/NACK, applying the ack loss probability if LP = true // @@ -152,6 +124,9 @@ class Node : public cSimpleModule // Logs the given printf-style string and format to omnte++ simulation stdout, // and the log file `output.txt` template void log(const char * f, Args... args); + + virtual double insertIntoDelayed(linkedList *l, CustomMessage_Base* msg, bool lost, double processingTime, double extraDelay=0); + virtual double getRealTime(linkedList *l); private: // The id of the node. diff --git a/src/util.cc b/src/util.cc index 8edef0c..c3bbaf4 100644 --- a/src/util.cc +++ b/src/util.cc @@ -144,45 +144,3 @@ bitset8 CalculateChecksum(vecBitset8 const &bytes) { return result; } -template void DeleteElementFromLinkedList(linkedList* l, N i) { - for (auto it = l->start; it; it = it->next) { - if (it->N == i) { - - T* toDelete = it; - - if (it == l->start) l->start = l->start->next; - if (it == l->end) l->end = l->end->next; - delete toDelete; - - if (l->size != -1) l->availSpace += 1; - return; - } - } -} - -template void InsertAtEndOfLinkedList(linkedList* l) { - - if (l->size != -1) { - // Deleting the oldest element in the list if there is no free space. - if (l->availSpace == 0) DeleteElementFromLinkedList(l, l->start->N); - l->availSpace -= 1; - } - - // Inserting the new element at the end. - if (!l->start) l->start = l->end = new T(); - else { - l->end->next = new T(); - l->end = l->end->next; - } -} - -template T* GetElementFromLinkedList(linkedList* l, N i) { - - for (auto it = l->start; it; it = it->next) { - if (it->N == i) - return it; - } - - return NULL; -} - diff --git a/src/util.h b/src/util.h index 4f8add5..347e4cb 100644 --- a/src/util.h +++ b/src/util.h @@ -32,6 +32,8 @@ struct TextLine { int N; TextLine* next = NULL; + TextLine* prev = NULL; + }; // Result struct for TextFile::ReadNthLine @@ -97,8 +99,49 @@ bitset8 CalculateChecksum(vecBitset8 const &bytes); // Insertion, deletion, and retrieval of elements in a singly-linked list, optionally with size-constrained. // Despite being generic, only currently used in TextFile. // I also realize now I could've just used a ready one from stl -template void InsertAtEndOfLinkedList(linkedList* l); -template T* GetElementFromLinkedList(linkedList* l, N i); -template void DeleteElementFromLinkedList(linkedList* l, N i); +template void DeleteElementFromLinkedList(linkedList* l, N i) { + for (T* it = l->start; it; it = it->next) { + if (it->N == i) { + + if (it->next) it->next->prev = it->prev; + if (it->prev) it->prev->next = it->next; + + if (it == l->start) l->start = l->start->next; + if (it == l->end) l->end = (it->prev) ? it->prev : NULL; + + if (l->size != -1) l->availSpace += 1; + + delete it; + return; + } + } +} + +template void InsertAtEndOfLinkedList(linkedList* l) { + + if (l->size != -1) { + // Deleting the oldest element in the list if there is no free space. + if (l->availSpace == 0) DeleteElementFromLinkedList(l, l->start->N); + l->availSpace -= 1; + } + + // Inserting the new element at the end. + if (!l->start) l->start = l->end = new T(); + else { + T* oldEnd = l->end; + l->end->next = new T(); + l->end = l->end->next; + l->end->prev = oldEnd; + } +} + +template T* GetElementFromLinkedList(linkedList* l, N i) { + + for (T* it = l->start; it; it = it->next) + if (it->N == i) return it; + + return NULL; +} + #endif /* UTIL_H_ */