Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

demos: extend benchmark demo by can and ethernet #164

Merged
merged 11 commits into from
Jan 13, 2025
248 changes: 206 additions & 42 deletions Demos/Benchmark/BenchmarkDemo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
using namespace SilKit::Services::Orchestration;
using namespace SilKit::Config;
using namespace SilKit::Services::PubSub;
using namespace SilKit::Services::Ethernet;
using namespace SilKit::Services::Can;
using namespace std::chrono_literals;

std::chrono::milliseconds stepSize = 1ms;

std::ostream& operator<<(std::ostream& out, std::chrono::nanoseconds timestamp)
{
const auto seconds = std::chrono::duration_cast<std::chrono::duration<double, std::ratio<1, 1>>>(timestamp);
Expand All @@ -70,22 +70,60 @@ void PrintUsage(const std::string& executableName)
<< std::endl
<< "\t--number-participants\tSets the number of simulation participants to NUM. Default: 2" << std::endl
<< "\t--number-simulation-runs\tSets the number of simulation runs to perform to NUM. Default: 4" << std::endl
<< "\t--simulation-step-size\tSets the simulation step size (virtual time) to MILLISECONDS. Default: 1ms"
<< std::endl
<< "\t--simulation-duration\tSets the simulation duration (virtual time) to SECONDS. Default: 1s" << std::endl
<< "\t--service\tSets the service type (pubsub, can or ethernet). Default: pubsub" << std::endl
<< "\t--configuration\tPath and filename of the participant configuration YAML or JSON file. Default: empty"
<< std::endl
<< "\t--write-csv\tPath and filename of csv file with benchmark results. Default: empty" << std::endl;
}

enum class ServiceType
{
PubSub,
Can,
Ethernet
};

std::ostream& operator<<(std::ostream& outStream, const ServiceType& service)
{
switch (service)
{
case ServiceType::PubSub:
outStream << "PubSub";
break;
case ServiceType::Can:
outStream << "CAN";
break;
case ServiceType::Ethernet:
outStream << "Ethernet";
break;
default:
outStream << "Invalid Service Type";
}
return outStream;
}

std::string to_string(const ServiceType& service)
{
std::stringstream outStream;
outStream << service;
return outStream.str();
}

struct BenchmarkConfig
{
uint32_t numberOfSimulationRuns = 4;
std::chrono::seconds simulationDuration = 1s;
std::chrono::milliseconds simulationStepSize = 1ms;
uint32_t numberOfParticipants = 2;
uint32_t messageCount = 50;
uint32_t messageSizeInBytes = 1000;
std::string registryUri = "silkit://localhost:0";
std::string silKitConfigPath = "";
std::string writeCsv = "";
ServiceType service = ServiceType::PubSub;
};

bool Parse(int argc, char** argv, BenchmarkConfig& config)
Expand All @@ -96,6 +134,26 @@ bool Parse(int argc, char** argv, BenchmarkConfig& config)

auto asNum = [](const auto& str) { return static_cast<uint32_t>(std::stoul(str)); };
auto asStr = [](auto& a) { return std::string{a}; };
auto asServiceType = [](const auto& a) {
auto arg = std::string{a};

if (arg == "pubsub")
{
return ServiceType::PubSub;
}
else if (arg == "ethernet")
{
return ServiceType::Ethernet;
}
else if (arg == "can")
{
return ServiceType::Can;
}
else
{
throw std::runtime_error{"Invalid argument \"" + arg + "\"!"};
}
};

// test and remove the flag from args, returns true if flag was present
auto consumeFlag = [&args](const auto& namedOption) {
Expand Down Expand Up @@ -159,7 +217,9 @@ bool Parse(int argc, char** argv, BenchmarkConfig& config)
parseOptional("--message-count", config.messageCount, asNum);
parseOptional("--number-participants", config.numberOfParticipants, asNum);
parseOptional("--number-simulation-runs", config.numberOfSimulationRuns, asNum);
parseOptional("--simulation-step-size", config.simulationStepSize, asNum);
parseOptional("--simulation-duration", config.simulationDuration, asNum);
parseOptional("--service", config.service, asServiceType);
parseOptional("--configuration", config.silKitConfigPath, asStr);
parseOptional("--write-csv", config.writeCsv, asStr);

Expand Down Expand Up @@ -238,6 +298,14 @@ bool Validate(const BenchmarkConfig& config)
return false;
}

if (config.simulationDuration < config.simulationStepSize)
{
std::cout << "Invalid argument: The simulation duration (virtual time) must be greater or equal the simulation "
"step size."
<< std::endl;
return false;
}

if (config.numberOfSimulationRuns < 1)
{
std::cout << "Invalid argument: The number of simulations runs must be at least 1." << std::endl;
Expand Down Expand Up @@ -265,18 +333,55 @@ uint32_t relateParticipant(uint32_t idx, uint32_t numberOfParticipants)
}
}

void CheckAndTruncateMessageSize(BenchmarkConfig& benchmark)
{
const uint32_t maxSizeEth = 1500;
const uint32_t maxSizeCan = 8;

if (benchmark.service == ServiceType::Ethernet && benchmark.messageSizeInBytes > maxSizeEth)
{
benchmark.messageSizeInBytes = maxSizeEth;
std::cout << std::endl
<< "CAUTION: Ethernet frame size is truncated to " << benchmark.messageSizeInBytes << " Byte."
<< std::endl;
}
else if (benchmark.service == ServiceType::Can && benchmark.messageSizeInBytes > maxSizeCan)
{
benchmark.messageSizeInBytes = maxSizeCan;
std::cout << std::endl
<< "CAUTION: CAN frame payload size is truncated to " << benchmark.messageSizeInBytes << " Byte."
<< std::endl;
}
}

void PublishMessages(IDataPublisher* publisher, uint32_t messageCount, uint32_t messageSizeInBytes)
{
std::vector<uint8_t> data(messageSizeInBytes, '*');
for (uint32_t i = 0; i < messageCount; i++)
{
std::vector<uint8_t> data(messageSizeInBytes, '*');
publisher->Publish(std::move(data));
{
publisher->Publish(SilKit::Util::ToSpan(data));
}
}

void ReceiveMessage(IDataSubscriber* /*subscriber*/, const std::vector<uint8_t>& /*data*/)
void SendEthernetFrames(IEthernetController* ethernetController, uint32_t messageCount, uint32_t messageSizeInBytes)
{
// do nothing
std::vector<uint8_t> frameData(messageSizeInBytes, '*');
for (uint32_t i = 0; i < messageCount; i++)
{
ethernetController->SendFrame(EthernetFrame{SilKit::Util::ToSpan(frameData)});
}
}

void SendCanFrames(ICanController* canController, uint32_t messageCount, uint32_t messageSizeInBytes)
{
std::vector<uint8_t> frameData(messageSizeInBytes, '*');
for (uint32_t i = 0; i < messageCount; i++)
{
CanFrame canFrame{};
canFrame.dataField = SilKit::Util::ToSpan(frameData);

canController->SendFrame(std::move(canFrame));
}
}

void ParticipantsThread(std::shared_ptr<SilKit::Config::IParticipantConfiguration> config,
Expand All @@ -287,19 +392,55 @@ void ParticipantsThread(std::shared_ptr<SilKit::Config::IParticipantConfiguratio
auto* lifecycleService = participant->CreateLifecycleService({OperationMode::Coordinated});
auto* timeSyncService = lifecycleService->CreateTimeSyncService();

const std::string topicPub = "Topic" + std::to_string(participantIndex);
const std::string topicSub =
"Topic" + std::to_string(relateParticipant(participantIndex, benchmark.numberOfParticipants));
SilKit::Services::PubSub::PubSubSpec dataSpec{topicPub, {}};
SilKit::Services::PubSub::PubSubSpec matchingDataSpec{topicSub, {}};
auto publisher = participant->CreateDataPublisher("PubCtrl1", dataSpec, 0);
participant->CreateDataSubscriber("SubCtrl1", matchingDataSpec, [&messageCounter](auto*, auto&) {
// this is handled in I/O thread, so no data races on counter.
messageCounter++;
});
IDataPublisher* publisher = nullptr;
IEthernetController* ethernetController = nullptr;
ICanController* canController = nullptr;

switch (benchmark.service)
{
case ServiceType::PubSub:
{
const std::string topicPub = "Topic" + std::to_string(participantIndex);
const std::string topicSub =
"Topic" + std::to_string(relateParticipant(participantIndex, benchmark.numberOfParticipants));
SilKit::Services::PubSub::PubSubSpec dataSpec{topicPub, {}};
SilKit::Services::PubSub::PubSubSpec matchingDataSpec{topicSub, {}};
publisher = participant->CreateDataPublisher("PubCtrl1", dataSpec, 0);
participant->CreateDataSubscriber("SubCtrl1", matchingDataSpec, [&messageCounter](auto*, auto&) {
// this is handled in I/O thread, so no data races on counter.
messageCounter++;
});
VJonasHolley marked this conversation as resolved.
Show resolved Hide resolved
break;
}
case ServiceType::Ethernet:
{
ethernetController = participant->CreateEthernetController("Eth1", "Eth1");

ethernetController->AddFrameHandler(
[&messageCounter](IEthernetController* /*controller*/, const EthernetFrameEvent& /*frameEvent*/) {
messageCounter++;
});

lifecycleService->SetCommunicationReadyHandler([ethernetController]() { ethernetController->Activate(); });
VJonasHolley marked this conversation as resolved.
Show resolved Hide resolved
break;
}
case ServiceType::Can:
{
canController = participant->CreateCanController("CAN1", "CAN1");

canController->AddFrameHandler(
[&messageCounter](ICanController* /*ctrl*/, const CanFrameEvent& /*frameEvent*/) { messageCounter++; });

lifecycleService->SetCommunicationReadyHandler([canController]() {
canController->SetBaudRate(10'000, 1'000'000, 2'000'000);
canController->Start();
});
VJonasHolley marked this conversation as resolved.
Show resolved Hide resolved
break;
}
}

const auto isVerbose = participantIndex == 0;
timeSyncService->SetSimulationStepHandler([=, &publisher](std::chrono::nanoseconds now, const auto /*duration*/) {
timeSyncService->SetSimulationStepHandler([=](std::chrono::nanoseconds now, const auto /*duration*/) {
if (now > benchmark.simulationDuration)
{
lifecycleService->Stop("Simulation done");
Expand All @@ -311,13 +452,31 @@ void ParticipantsThread(std::shared_ptr<SilKit::Config::IParticipantConfiguratio
std::chrono::duration_cast<std::chrono::nanoseconds>(benchmark.simulationDuration);
const auto durationOfOneSimulationPercentile = simulationDurationInNs / 20;

if (now % durationOfOneSimulationPercentile < stepSize)
if (now % durationOfOneSimulationPercentile < benchmark.simulationStepSize)
{
std::cout << ".";
}
}
PublishMessages(publisher, benchmark.messageCount, benchmark.messageSizeInBytes);
}, stepSize);

switch (benchmark.service)
{
case ServiceType::PubSub:
{
PublishMessages(publisher, benchmark.messageCount, benchmark.messageSizeInBytes);
VJonasHolley marked this conversation as resolved.
Show resolved Hide resolved
break;
}
case ServiceType::Ethernet:
{
SendEthernetFrames(ethernetController, benchmark.messageCount, benchmark.messageSizeInBytes);
VJonasHolley marked this conversation as resolved.
Show resolved Hide resolved
break;
}
case ServiceType::Can:
{
SendCanFrames(canController, benchmark.messageCount, benchmark.messageSizeInBytes);
VJonasHolley marked this conversation as resolved.
Show resolved Hide resolved
break;
}
}
}, benchmark.simulationStepSize);

auto lifecycleFuture = lifecycleService->StartLifecycle();
lifecycleFuture.get();
Expand All @@ -335,25 +494,27 @@ void PrintParameters(BenchmarkConfig benchmark)
std::cout << std::endl
<< "This benchmark demo produces timings of a configurable simulation setup." << std::endl
<< "<N> participants exchange <M> messages of <B> bytes per simulation step" << std::endl
<< "with a fixed period of 1ms and run for <S> seconds (virtual time)." << std::endl
<< "with a period of <T> ms and run for <S> seconds (virtual time)." << std::endl
<< "This simulation run is repeated <K> times and averages over all runs are calculated." << std::endl
<< "The demo uses PubSub controllers with the same topic for the message exchange," << std::endl
<< "so each participant broadcasts the messages to all other participants." << std::endl
<< "The demo can be run using PubSub, CAN or Ethernet controllers for the message exchange," << std::endl
<< "where each participant broadcasts the messages to all other participants." << std::endl
<< std::endl
<< "Running simulations with the following parameters:" << std::endl
<< std::endl
<< std::left << std::setw(38) << "- Number of simulation runs: " << benchmark.numberOfSimulationRuns
<< std::left << std::setw(39) << "- Communication service type: " << benchmark.service << std::endl
<< std::left << std::setw(39) << "- Number of simulation runs: " << benchmark.numberOfSimulationRuns
<< std::endl
<< std::left << std::setw(38) << "- Simulation duration (virtual time): " << benchmark.simulationDuration
<< std::left << std::setw(39) << "- Simulation step size (virtual time): " << benchmark.simulationStepSize
<< std::endl
<< std::left << std::setw(38) << "- Number of participants: " << benchmark.numberOfParticipants
<< std::left << std::setw(39) << "- Simulation duration (virtual time): " << benchmark.simulationDuration
<< std::endl
<< std::left << std::setw(38) << "- Messages per simulation step (1ms): " << benchmark.messageCount
<< std::left << std::setw(39) << "- Number of participants: " << benchmark.numberOfParticipants
<< std::endl
<< std::left << std::setw(38) << "- Message size (bytes): " << benchmark.messageSizeInBytes << std::endl
<< std::left << std::setw(38) << "- Registry URI: " << benchmark.registryUri << std::endl
<< std::left << std::setw(38) << "- Configuration: " << benchmark.silKitConfigPath << std::endl
<< std::left << std::setw(38) << "- CSV output: " << benchmark.writeCsv << std::endl;
<< std::left << std::setw(39) << "- Messages per simulation step: " << benchmark.messageCount << std::endl
<< std::left << std::setw(39) << "- Message size (bytes): " << benchmark.messageSizeInBytes << std::endl
<< std::left << std::setw(39) << "- Registry URI: " << benchmark.registryUri << std::endl
<< std::left << std::setw(39) << "- Configuration: " << benchmark.silKitConfigPath << std::endl
<< std::left << std::setw(39) << "- CSV output: " << benchmark.writeCsv << std::endl;
}

template <typename T>
Expand Down Expand Up @@ -385,6 +546,8 @@ int main(int argc, char** argv)
return -1;
}

CheckAndTruncateMessageSize(benchmark);

PrintParameters(benchmark);

try
Expand Down Expand Up @@ -448,7 +611,7 @@ int main(int argc, char** argv)
measuredRealDurations.emplace_back(endTimestamp - startTimestamp);
auto totalCount = std::accumulate(counters.begin(), counters.end(), size_t{0});
messageCounts.emplace_back(totalCount);
std::cout << " " << measuredRealDurations.back() << std::endl;
std::cout << " " << measuredRealDurations.back() << std::endl;
}

std::vector<double> measuredRealDurationsSeconds(measuredRealDurations.size());
Expand Down Expand Up @@ -497,20 +660,20 @@ int main(int argc, char** argv)
std::ostringstream averageMsgRateWithUnit;
averageMsgRateWithUnit << static_cast<int>(averageMsgRate.first) << " 1/s";

std::cout << std::setw(38) << "- Realtime duration (runtime): " << std::setw(13)
std::cout << std::setw(39) << "- Realtime duration (runtime): " << std::setw(13)
<< averageDurationWithUnit.str() << " +/- " << averageDuration.second << "s" << std::endl

<< std::setw(38) << "- Speedup (virtual time/runtime): " << std::setw(13) << averageSpeedup.first
<< std::setw(39) << "- Speedup (virtual time/runtime): " << std::setw(13) << averageSpeedup.first
<< " +/- " << averageSpeedup.second << std::endl

<< std::setw(38) << "- Throughput (data size/runtime): " << std::setw(13)
<< std::setw(39) << "- Throughput (data size/runtime): " << std::setw(13)
<< averageThroughputWithUnit.str() << " +/- " << averageThroughput.second << " MiB/s" << std::endl

<< std::setw(38) << "- Message rate (count/runtime): " << std::setw(13)
<< std::setw(39) << "- Message rate (count/runtime): " << std::setw(13)
<< averageMsgRateWithUnit.str() << " +/- " << static_cast<int>(averageMsgRate.second) << " 1/s"
<< std::endl

<< std::left << std::setw(38) << "- Total number of messages: " << averageNumberMessages << std::endl
<< std::left << std::setw(39) << "- Total number of messages: " << averageNumberMessages << std::endl

<< std::endl
<< std::endl;
Expand All @@ -520,6 +683,7 @@ int main(int argc, char** argv)
std::stringstream csvHeader;
csvHeader << "# SilKitBenchmarkDemo, SIL Kit Version " << SilKit::Version::String();
const auto csvColumns = "numRuns; participants; messageSize; messageCount; duration(virtual time, s); "
"stepSize(virtual time, ms); "
"numberMessageSent; runtime(s); runtime_err; throughput(MiB/s); "
"throughput_err; speedup; speedup_err; messageRate(1/s); messageRate_err";
std::fstream csvFile;
Expand Down Expand Up @@ -550,10 +714,10 @@ int main(int argc, char** argv)
csvFile.seekp(0, std::ios_base::end);
csvFile << benchmark.numberOfSimulationRuns << ";" << benchmark.numberOfParticipants << ";"
<< benchmark.messageSizeInBytes << ";" << benchmark.messageCount << ";"
<< benchmark.simulationDuration.count() << ";" << averageNumberMessages << ";"
<< averageDuration.first << ";" << averageDuration.second << ";" << averageThroughput.first
<< ";" << averageThroughput.second << ";" << averageSpeedup.first << ";"
<< averageSpeedup.second << ";" << averageMsgRate.first << ";" << averageMsgRate.second
<< benchmark.simulationDuration.count() << ";" << benchmark.simulationStepSize.count() << ";"
<< averageNumberMessages << ";" << averageDuration.first << ";" << averageDuration.second << ";"
<< averageThroughput.first << ";" << averageThroughput.second << ";" << averageSpeedup.first
<< ";" << averageSpeedup.second << ";" << averageMsgRate.first << ";" << averageMsgRate.second
<< std::endl;
}
csvFile.close();
Expand Down
Loading