Skip to content

Commit

Permalink
fix: sending times
Browse files Browse the repository at this point in the history
  • Loading branch information
cynico committed Dec 28, 2023
1 parent b7b22ff commit 23b55f3
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 135 deletions.
7 changes: 7 additions & 0 deletions src/CustomMessage_m.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@
#define DATA_FRAME 2
#define COORDINATOR_FRAME 3

#define DELAY_OFFSET 4

#define DELAYED_NACK_FRAME NACK_FRAME + DELAY_OFFSET
#define DELAYED_ACK_FRAME ACK_FRAME + DELAY_OFFSET
#define DELAYED_DATA_FRAME DATA_FRAME + DELAY_OFFSET


class CustomMessage_Base : public ::omnetpp::cPacket
{
protected:
Expand Down
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#
# OMNeT++/OMNEST Makefile for projcet
# OMNeT++/OMNEST Makefile for project
#
# This file was generated with the command:
# opp_makemake -f --deep
#

# Name of target to be created (-o option)
TARGET = projcet$(D)$(EXE_SUFFIX)
TARGET = project$(D)$(EXE_SUFFIX)
TARGET_DIR = .

# User interface (uncomment one) (-u option)
Expand Down
131 changes: 84 additions & 47 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ int Node::modifyPayload(vecBitset8 &payload) {
return bit;
}

bool Node::sendDataFrame(int lineNumber, int dataSequenceNumber, bool errorFree, float &delay) {
bool Node::sendDataFrame(int lineNumber, int dataSequenceNumber, bool errorFree, float &realTime, bool timeout=false) {

std::string line;

Expand All @@ -85,11 +85,12 @@ bool Node::sendDataFrame(int lineNumber, int dataSequenceNumber, bool errorFree,
vecBitset8 payload = ConvertStringToBits(line.substr(5), true);

CustomMessage_Base* msgToSend = new CustomMessage_Base();
msgToSend->setFrameType(DATA_FRAME);
msgToSend->setFrameType(DELAYED_DATA_FRAME);
msgToSend->setParity(CalculateChecksum(payload));
msgToSend->setDataSequence(dataSequenceNumber);

this->log("At time [%.3f], Node [%d], Introducing channel error with code = [%s]", simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id, line.substr(0,4).c_str());
realTime = this->getRealTime(&this->senderInfo.delayedMessages);
this->log("At time [%.3f], Node [%d], Introducing channel error with code = [%s]", realTime, this->id, line.substr(0,4).c_str());

bool lost = false, duplicated = false;
int modifiedBit = -1;
Expand All @@ -112,30 +113,32 @@ bool Node::sendDataFrame(int lineNumber, int dataSequenceNumber, bool errorFree,
if (errorPrefix.test(0)) errorDelay = Info::errorDelay;
}

if (!timeout && errorFree) errorDelay += 0.001;
msgToSend->setPayload(payload);

float processingTime = (int)!r.readFromBuffer * Info::processingTime;
this->senderInfo.offsetFromRealTime += processingTime;
realTime = this->insertIntoDelayed(&this->senderInfo.delayedMessages, msgToSend, lost, processingTime, errorDelay);

scheduleAt(realTime, msgToSend);

// Log output
this->log("At time [%.3f], Node [%d] [sent] frame with seq_num = [%d] and payload = [%s] and trailer = [%s], Modified [%d], Lost [%s], Duplicate [%d], Delay [%.3f]",
simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id, dataSequenceNumber, ConvertBitsToString(payload).c_str(),
realTime, this->id, dataSequenceNumber, ConvertBitsToString(payload).c_str(),
msgToSend->getParity().to_string().c_str(), modifiedBit, lost? "Yes": "No", (int)duplicated, errorDelay);

msgToSend->setPayload(payload);
delay = this->senderInfo.offsetFromRealTime + Info::transmissionDelay;

// If duplicated, send a duplicate message with the duplication delay.
if (duplicated) {
this->log("At time [%.3f], Node [%d] sent frame with seq_num = [%d] and payload = [%s] and trailer = [%s], Modified [%d], Lost [%s], Duplicate [%d], Delay [%.3f]",
simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id, dataSequenceNumber, ConvertBitsToString(payload).c_str(),
CustomMessage_Base* duplicatedMsg = msgToSend->dup();
this->insertIntoDelayed(&this->senderInfo.delayedMessages, duplicatedMsg, lost, 0, Info::duplicationDelay + errorDelay);
this->log("At time [%.3f], Node [%d] [sent] frame with seq_num = [%d] and payload = [%s] and trailer = [%s], Modified [%d], Lost [%s], Duplicate [%d], Delay [%.3f]",
realTime, this->id, dataSequenceNumber, ConvertBitsToString(payload).c_str(),
msgToSend->getParity().to_string().c_str(), modifiedBit, lost? "Yes": "No", 2, errorDelay);
if (!lost) {
CustomMessage_Base* duplicatedMsg = msgToSend->dup();
sendDelayed(duplicatedMsg, delay + Info::duplicationDelay + errorDelay, "peer$o");
}
// if (lost) delete duplicatedMsg;
scheduleAt(realTime, duplicatedMsg);
}

if (lost) delete msgToSend;
else sendDelayed(msgToSend, delay + errorDelay, "peer$o");
//if (lost) delete msgToSend;
//else sendDelayed(msgToSend, delay + errorDelay, "peer$o");

return true;
}
Expand Down Expand Up @@ -196,8 +199,6 @@ Timer* Node::deleteTimers(int ackSequence, bool prev) {
// prev is true on ack, false on nack
void Node::advanceWindowAndSendFrames(int ackSequence, bool prev) {

this->senderInfo.offsetFromRealTime = 0;

int oldWStart = this->senderInfo.wStart;

// Updating wCurrent with the (n)ack sequence number.
Expand All @@ -215,7 +216,7 @@ void Node::advanceWindowAndSendFrames(int ackSequence, bool prev) {
this->deleteTimers(startingSeq, prev);

// Advancing wCurrent and sending new frames as much as our window can expand.
float delay;
float realTime = 0.0;
bool errorFree = prev ? false : true; // errorFree is true for the first message to be sent in case of nack.

int framesToSend = Info::windowSize - this->modulus(this->senderInfo.wCurrent - this->senderInfo.wStart);
Expand All @@ -233,23 +234,20 @@ void Node::advanceWindowAndSendFrames(int ackSequence, bool prev) {

int lineNumber = this->senderInfo.currentLineOffset + this->modulus(this->senderInfo.wCurrent - this->senderInfo.wStart);
int dataSequenceNumber = this->senderInfo.wCurrent;
if (!this->sendDataFrame(lineNumber, dataSequenceNumber, errorFree, delay)) {
this->log("At time [%.3f], Node [%d] found no more lines to send. Checking if we should terminate now", simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id);
if (!this->sendDataFrame(lineNumber, dataSequenceNumber, errorFree, realTime)) {
this->log("At time [%.3f], Node [%d] found no more lines to send. Checking if we should terminate now", simTime().dbl(), this->id);
this->checkTermination();
this->log("At time [%.3f], Node [%d] is waiting for outstanding acks to terminate.", simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id);
this->log("At time [%.3f], Node [%d] is waiting for outstanding acks to terminate.", simTime().dbl(), this->id);
return;
}

// Setting errorFree to false in case of nack.
// Increasing the delay by 0.001 as per the document.
if (errorFree) {
delay += 0.001;
errorFree = false;
}
if (errorFree) errorFree = false;

// Create a timer
Timer* timer = this->createTimer(dataSequenceNumber);
scheduleAt( simTime() + delay + Info::timeout, timer->msg);
scheduleAt( realTime + Info::timeout, timer->msg);
this->senderInfo.wCurrent = this->modulus(this->senderInfo.wCurrent + 1);
}

Expand All @@ -263,17 +261,27 @@ void Node::sender(CustomMessage_Base *msg) {
// within the window.
if (msg->isSelfMessage()) {

if (msg->getFrameType() == DELAYED_DATA_FRAME) {
DelayedMessage* m = GetElementFromLinkedList<DelayedMessage, CustomMessage_Base*>(&this->senderInfo.delayedMessages, msg);

if (!m->lost) {
msg->setFrameType(msg->getFrameType() - DELAY_OFFSET);
sendDelayed(msg, Info::transmissionDelay + m->extraDelay, "peer$o");
}
DeleteElementFromLinkedList<DelayedMessage, CustomMessage_Base*>(&this->senderInfo.delayedMessages, msg);
return;
}

int timedoutSequenceNumber = msg->getAckSequence();
if (timedoutSequenceNumber != -1) this->log("At time [%.3f], Node [%d] timeout event for frame with seq_num = [%d]", simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id, timedoutSequenceNumber);
if (timedoutSequenceNumber != -1) this->log("At time [%.3f], Node [%d] timeout event for frame with seq_num = [%d]", simTime().dbl(), this->id, timedoutSequenceNumber);

// Cancel all timers starting from the timer that expired and onwards, i.e: all timers.
// Clarification: if a timer expired, it MUST have been the first timer in the window, by the very
// structure of the way acknowledgements are serially handled in the code.
if (this->senderInfo.timers) this->deleteTimers(this->senderInfo.timers->msg->getAckSequence(), false);

int dataSequenceNumber, lineNumber;
float delay; bool errorFree = false;
this->senderInfo.offsetFromRealTime = 0;
float realTime; bool errorFree = false;
for (int i = 0; i < Info::windowSize; i++) {

dataSequenceNumber = this->modulus(this->senderInfo.wStart + i);
Expand All @@ -283,11 +291,11 @@ void Node::sender(CustomMessage_Base *msg) {
// If this is the message that caused the timeout, then send it error-free.
if (dataSequenceNumber == timedoutSequenceNumber) errorFree = true;

if (!this->sendDataFrame(lineNumber, dataSequenceNumber, errorFree, delay))
if (!this->sendDataFrame(lineNumber, dataSequenceNumber, errorFree, realTime, true))
return this->checkTermination();

Timer* timer = this->createTimer(dataSequenceNumber);
scheduleAt( simTime() + delay + Info::timeout, timer->msg);
scheduleAt( realTime + Info::timeout, timer->msg);
}

senderInfo.wCurrent = this->modulus(senderInfo.wStart + Info::windowSize);
Expand Down Expand Up @@ -317,44 +325,73 @@ void Node::sender(CustomMessage_Base *msg) {

void Node::checkTermination() {
if (this->senderInfo.wStart == this->senderInfo.wCurrent) {
this->log("At time [%.3f], Node [%d] finished sending and receiving acks.", simTime().dbl() + this->senderInfo.offsetFromRealTime, this->id);
this->log("At time [%.3f], Node [%d] finished sending and receiving acks.", simTime().dbl(), this->id);
exit(0);
}
}

double Node::getRealTime(linkedList<DelayedMessage> *l) {

// Calculating the supposed delay in processing times
double realTime;
if (!l->end) realTime = simTime().dbl();
else realTime = simTime().dbl() >= l->end->realTime ? simTime().dbl() : l->end->realTime;

return realTime;
}

double Node::insertIntoDelayed(linkedList<DelayedMessage> *l, CustomMessage_Base* msg, bool lost, double processingTime, double extraDelay) {

// Calculating the supposed delay in processing times
double realTime = this->getRealTime(l) + processingTime;

InsertAtEndOfLinkedList<DelayedMessage, CustomMessage_Base*>(l);
l->end->N = msg;
l->end->simTime = simTime().dbl();
l->end->realTime = realTime;
l->end->extraDelay = extraDelay;
l->end->lost = lost;

return realTime;
}

void Node::sendAck(int ACK_TYPE, bool LP) {

bool lost = false;

CustomMessage_Base* ack = new CustomMessage_Base();
ack->setFrameType(ACK_TYPE);
ack->setFrameType(ACK_TYPE + DELAY_OFFSET);
ack->setAckSequence(this->receiverInfo.expectedFrameSequence);

if ((uniform(0,1) <= Info::ackLossProb) && LP) lost = true;
else {

this->receiverInfo.accumulatingProcessingTimes += Info::processingTime;
sendDelayed(ack, this->receiverInfo.accumulatingProcessingTimes + Info::transmissionDelay, "peer$o");
}
double realTime = this->insertIntoDelayed(&this->receiverInfo.delayedMessages, ack, lost, Info::processingTime);

this->log("At time [%.3f], Node [%d] sending [%s] with number [%d], loss [%s]", simTime().dbl() + this->receiverInfo.accumulatingProcessingTimes, this->id,
this->log("At time [%.3f], Node [%d] sending [%s] with number [%d], loss [%s]", realTime, this->id,
ACK_TYPE ? "ACK" : "NACK", ack->getAckSequence(), lost? "Yes" : "No");

if (lost) delete ack;
else scheduleAt(realTime, ack);

}

void Node::receiver(CustomMessage_Base *msg) {

// If not an in-order message, drop and send an ack with the sequence of the next expected frame sequence.
if (this->receiverInfo.lastAckSimTime != simTime().dbl()) {
this->receiverInfo.lastAckSimTime = simTime().dbl();
this->receiverInfo.accumulatingProcessingTimes = 0;
// If this is a delayed ack or nack
if (msg->isSelfMessage()) {
DelayedMessage* m = GetElementFromLinkedList<DelayedMessage, CustomMessage_Base*>(&this->receiverInfo.delayedMessages, msg);
if (!m->lost) {
msg->setFrameType(msg->getFrameType() - DELAY_OFFSET);
sendDelayed(msg, Info::transmissionDelay + m->extraDelay, "peer$o");
}
DeleteElementFromLinkedList<DelayedMessage, CustomMessage_Base*>(&this->receiverInfo.delayedMessages, msg);
return;
}

// If not an in-order message, drop and send an ack with the sequence of the next expected frame sequence.
if (msg->getDataSequence() != this->receiverInfo.expectedFrameSequence) {
this->log("At time [%.3f], Node [%d] dropped out-of-order frame with seq_num = [%d]. Expecting seq_num = [%d]", simTime().dbl() + this->receiverInfo.accumulatingProcessingTimes, this->id, msg->getDataSequence(), this->receiverInfo.expectedFrameSequence);
cancelAndDelete(msg);
this->log("At time [%.3f], Node [%d] dropped out-of-order frame with seq_num = [%d]. Expecting seq_num = [%d]", this->getRealTime(&this->receiverInfo.delayedMessages), this->id, msg->getDataSequence(), this->receiverInfo.expectedFrameSequence);
this->sendAck(ACK_FRAME, false);
cancelAndDelete(msg);
return;
}

Expand All @@ -363,7 +400,7 @@ void Node::receiver(CustomMessage_Base *msg) {
this->receiverInfo.expectedFrameSequence += valid ? 1 : 0;
this->receiverInfo.expectedFrameSequence = this->modulus(this->receiverInfo.expectedFrameSequence);

if (valid) this->log("Uploading payload = \"%s\" and seq_num = [%d] to the network layer", ConvertBitsToString(*(msg->getPayload()), true).c_str(), msg->getDataSequence());
if (valid) this->log("Uploading payload = \"%s\" At time [%.3f] and seq_num = [%d] to the network layer", ConvertBitsToString(*(msg->getPayload()), true).c_str(), this->getRealTime(&this->receiverInfo.delayedMessages), msg->getDataSequence());
this->sendAck(int(valid));

cancelAndDelete(msg);
Expand Down
57 changes: 16 additions & 41 deletions src/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ struct Timer {
Timer* prev = NULL;
};

struct DelayedMessage {
CustomMessage_Base* N = NULL;
double simTime = 0;
double realTime = 0;
DelayedMessage* next = NULL;
DelayedMessage* prev = NULL;
bool lost = false;
double extraDelay = 0;
};

struct SenderInfo {

// Sequence number of the start of the window.
Expand All @@ -48,53 +58,15 @@ struct SenderInfo {
// This is a vector of self-messages, acting as timeouts.
Timer* timers = NULL;


// Ad-hoc solution coming your way..

// Used to calculate the right time to schedule messages.
// Specifically, consider the following example, with the following parameters:
// simTime() = 0, Info::windowSize = 3, Info::processingTime = 0.5

// The sender sends:
// First frame:
// Sent at = simTime() + Info::processingTime = 0 + 0.5 = 0.5
// Given that the sender doesn't either: busy-wait, or schedules a self-message,
// it will proceed to the next frame

// Second frame:
// Sent at = simTime() + Info::processingTime = 0 + 0.5 = 0.5 => Not accurate

// Which is the same as the time for the first frame, because it didn't
// account for the time supposedly spent processing the frame.
// We fix this by accumulating the processing times in offsetFromRealTime
// so the way it goes:

// First frame:
// Sent at = simTime() + offsetFromRealTime + Info::processingTime = 0 + 0 + 0.5 = 0.5
// offsetFromRealTime = offsetFromRealTime + Info::processingTime = 0 + 0.5 = 0.5

// Second frame:
// Sent at = simTime() + offsetFromRealTime + Info::processingTime = 0 + 0.5 + 0.5 = 1 => Accurate
// offsetFromRealTime = offsetFromRealTime + Info::processingTime = 0.5 + 0.5 = 1

// After sending all the frames our window can accomodate, we zero out this offsetFromRealTime.
float offsetFromRealTime = 0;
linkedList<DelayedMessage> delayedMessages;
};

struct ReceiverInfo {

// The next expected frame sequence.
int expectedFrameSequence = 0;

// This serves a roughly similar role to senderInfo.offsetFromRealTime described above.
// Specifically in the case when the senders send multiple frames at the same time t.
// This happens when those frames are being resent, either due to a timeout or an NACK,
// thus no processing time at the sender's end.

// All the frames would thus be received at the same time t`. If we didn't handle it,
// all the ack frames would be sent at (t` + 0.5), which is not logical.
float lastAckSimTime = -1.0;
float accumulatingProcessingTimes = 0.0;
linkedList<DelayedMessage> delayedMessages;
};

class Node : public cSimpleModule
Expand Down Expand Up @@ -122,7 +94,7 @@ class Node : public cSimpleModule
// calculates the delay and returns it in the parameter delay.
// Returns true if there was more lines to read from the file, and a frame was successfully sent,
// false otherwise.
virtual bool sendDataFrame(int lineNumber, int dataSequenceNumber, bool errorFree, float &delay);
virtual bool sendDataFrame(int lineNumber, int dataSequenceNumber, bool errorFree, float &realTime, bool timeout);

// Sends an ACK/NACK, applying the ack loss probability if LP = true
//
Expand Down Expand Up @@ -152,6 +124,9 @@ class Node : public cSimpleModule
// Logs the given printf-style string and format to omnte++ simulation stdout,
// and the log file `output.txt`
template<typename... Args> void log(const char * f, Args... args);

virtual double insertIntoDelayed(linkedList<DelayedMessage> *l, CustomMessage_Base* msg, bool lost, double processingTime, double extraDelay=0);
virtual double getRealTime(linkedList<DelayedMessage> *l);
private:

// The id of the node.
Expand Down
Loading

0 comments on commit 23b55f3

Please sign in to comment.