Skip to content

Commit

Permalink
Refactor System functionality into SystemManager (#1340)
Browse files Browse the repository at this point in the history
SimulationRunner is getting long/crowded, so I'm refactoring out a bit of the System-oriented functionality into it's own Manager class as a peer to things like EventsManager and LevelManager. This should make it a bit easier to test, as well as reduce the clutter in SimulationRunner

Signed-off-by: Michael Carroll <michael@openrobotics.org>
  • Loading branch information
mjcarroll authored Feb 25, 2022
1 parent ac989f8 commit d03a77e
Show file tree
Hide file tree
Showing 8 changed files with 736 additions and 225 deletions.
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ set (sources
ServerPrivate.cc
SimulationRunner.cc
SystemLoader.cc
SystemManager.cc
TestFixture.cc
Util.cc
World.cc
Expand All @@ -86,6 +87,7 @@ set (gtest_sources
Server_TEST.cc
SimulationRunner_TEST.cc
SystemLoader_TEST.cc
SystemManager_TEST.cc
System_TEST.cc
TestFixture_TEST.cc
Util_TEST.cc
Expand Down
149 changes: 46 additions & 103 deletions src/SimulationRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ SimulationRunner::SimulationRunner(const sdf::World *_world,
// Keep world name
this->worldName = _world->Name();

// Keep system loader so plugins can be loaded at runtime
this->systemLoader = _systemLoader;

// Get the physics profile
// TODO(luca): remove duplicated logic in SdfEntityCreator and LevelManager
auto physics = _world->PhysicsByIndex(0);
Expand Down Expand Up @@ -115,6 +112,10 @@ SimulationRunner::SimulationRunner(const sdf::World *_world,
static_cast<int>(this->stepSize.count() / this->desiredRtf));
}

// Create the system manager
this->systemMgr = std::make_unique<SystemManager>(_systemLoader,
&this->entityCompMgr, &this->eventMgr);

this->pauseConn = this->eventMgr.Connect<events::Pause>(
std::bind(&SimulationRunner::SetPaused, this, std::placeholders::_1));

Expand Down Expand Up @@ -173,7 +174,7 @@ SimulationRunner::SimulationRunner(const sdf::World *_world,

// If we have reached this point and no systems have been loaded, then load
// a default set of systems.
if (this->systems.empty() && this->pendingSystems.empty())
if (this->systemMgr->TotalCount() == 0)
{
ignmsg << "No systems loaded from SDF, loading defaults" << std::endl;
bool isPlayback = !this->serverConfig.LogPlaybackPath().empty();
Expand Down Expand Up @@ -455,7 +456,9 @@ void SimulationRunner::AddSystem(const SystemPluginPtr &_system,
std::optional<Entity> _entity,
std::optional<std::shared_ptr<const sdf::Element>> _sdf)
{
this->AddSystemImpl(SystemInternal(_system), _entity, _sdf);
auto entity = _entity.value_or(worldEntity(this->entityCompMgr));
auto sdf = _sdf.value_or(this->sdfWorld->Element());
this->systemMgr->AddSystem(_system, entity, sdf);
}

//////////////////////////////////////////////////
Expand All @@ -464,104 +467,57 @@ void SimulationRunner::AddSystem(
std::optional<Entity> _entity,
std::optional<std::shared_ptr<const sdf::Element>> _sdf)
{
this->AddSystemImpl(SystemInternal(_system), _entity, _sdf);
}

//////////////////////////////////////////////////
void SimulationRunner::AddSystemImpl(
SystemInternal _system,
std::optional<Entity> _entity,
std::optional<std::shared_ptr<const sdf::Element>> _sdf)
{
// Call configure
if (_system.configure)
{
// Default to world entity and SDF
auto entity = _entity.has_value() ? _entity.value()
: worldEntity(this->entityCompMgr);
auto sdf = _sdf.has_value() ? _sdf.value() : this->sdfWorld->Element();

_system.configure->Configure(
entity, sdf,
this->entityCompMgr,
this->eventMgr);
}

// Update callbacks will be handled later, add to queue
std::lock_guard<std::mutex> lock(this->pendingSystemsMutex);
this->pendingSystems.push_back(_system);
auto entity = _entity.value_or(worldEntity(this->entityCompMgr));
auto sdf = _sdf.value_or(this->sdfWorld->Element());
this->systemMgr->AddSystem(_system, entity, sdf);
}

/////////////////////////////////////////////////
void SimulationRunner::AddSystemToRunner(SystemInternal _system)
void SimulationRunner::ProcessSystemQueue()
{
this->systems.push_back(_system);
auto pending = this->systemMgr->PendingCount();

if (_system.preupdate)
this->systemsPreupdate.push_back(_system.preupdate);
if (0 == pending)
return;

if (_system.update)
this->systemsUpdate.push_back(_system.update);
// If additional systems are to be added, stop the existing threads.
this->StopWorkerThreads();

if (_system.postupdate)
this->systemsPostupdate.push_back(_system.postupdate);
}
this->systemMgr->ActivatePendingSystems();

/////////////////////////////////////////////////
void SimulationRunner::ProcessSystemQueue()
{
std::lock_guard<std::mutex> lock(this->pendingSystemsMutex);
auto pending = this->pendingSystems.size();

if (pending > 0)
{
// If additional systems are to be added, stop the existing threads.
this->StopWorkerThreads();
}
auto threadCount = this->systemMgr->SystemsPostUpdate().size() + 1u;

for (const auto &system : this->pendingSystems)
{
this->AddSystemToRunner(system);
}
this->pendingSystems.clear();
igndbg << "Creating PostUpdate worker threads: "
<< threadCount << std::endl;

// If additional systems were added, recreate the worker threads.
if (pending > 0)
{
igndbg << "Creating PostUpdate worker threads: "
<< this->systemsPostupdate.size() + 1 << std::endl;
this->postUpdateStartBarrier = std::make_unique<Barrier>(threadCount);
this->postUpdateStopBarrier = std::make_unique<Barrier>(threadCount);

this->postUpdateStartBarrier =
std::make_unique<Barrier>(this->systemsPostupdate.size() + 1u);
this->postUpdateStopBarrier =
std::make_unique<Barrier>(this->systemsPostupdate.size() + 1u);
this->postUpdateThreadsRunning = true;
int id = 0;

this->postUpdateThreadsRunning = true;
int id = 0;
for (auto &system : this->systemMgr->SystemsPostUpdate())
{
igndbg << "Creating postupdate worker thread (" << id << ")" << std::endl;

for (auto &system : this->systemsPostupdate)
this->postUpdateThreads.push_back(std::thread([&, id]()
{
igndbg << "Creating postupdate worker thread (" << id << ")" << std::endl;

this->postUpdateThreads.push_back(std::thread([&, id]()
std::stringstream ss;
ss << "PostUpdateThread: " << id;
IGN_PROFILE_THREAD_NAME(ss.str().c_str());
while (this->postUpdateThreadsRunning)
{
std::stringstream ss;
ss << "PostUpdateThread: " << id;
IGN_PROFILE_THREAD_NAME(ss.str().c_str());
while (this->postUpdateThreadsRunning)
this->postUpdateStartBarrier->Wait();
if (this->postUpdateThreadsRunning)
{
this->postUpdateStartBarrier->Wait();
if (this->postUpdateThreadsRunning)
{
system->PostUpdate(this->currentInfo, this->entityCompMgr);
}
this->postUpdateStopBarrier->Wait();
system->PostUpdate(this->currentInfo, this->entityCompMgr);
}
igndbg << "Exiting postupdate worker thread ("
<< id << ")" << std::endl;
}));
id++;
}
this->postUpdateStopBarrier->Wait();
}
igndbg << "Exiting postupdate worker thread ("
<< id << ")" << std::endl;
}));
id++;
}
}

Expand All @@ -577,13 +533,13 @@ void SimulationRunner::UpdateSystems()

{
IGN_PROFILE("PreUpdate");
for (auto& system : this->systemsPreupdate)
for (auto& system : this->systemMgr->SystemsPreUpdate())
system->PreUpdate(this->currentInfo, this->entityCompMgr);
}

{
IGN_PROFILE("Update");
for (auto& system : this->systemsUpdate)
for (auto& system : this->systemMgr->SystemsUpdate())
system->Update(this->currentInfo, this->entityCompMgr);
}

Expand Down Expand Up @@ -903,19 +859,7 @@ void SimulationRunner::LoadPlugin(const Entity _entity,
const std::string &_name,
const sdf::ElementPtr &_sdf)
{
std::optional<SystemPluginPtr> system;
{
std::lock_guard<std::mutex> lock(this->systemLoaderMutex);
system = this->systemLoader->LoadPlugin(_fname, _name, _sdf);
}

// System correctly loaded from library
if (system)
{
this->AddSystem(system.value(), _entity, _sdf);
igndbg << "Loaded system [" << _name
<< "] for entity [" << _entity << "]" << std::endl;
}
this->systemMgr->LoadPlugin(_entity, _fname, _name, _sdf);
}

//////////////////////////////////////////////////
Expand Down Expand Up @@ -1085,8 +1029,7 @@ size_t SimulationRunner::EntityCount() const
/////////////////////////////////////////////////
size_t SimulationRunner::SystemCount() const
{
std::lock_guard<std::mutex> lock(this->pendingSystemsMutex);
return this->systems.size() + this->pendingSystems.size();
return this->systemMgr->TotalCount();
}

/////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit d03a77e

Please sign in to comment.