Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor System functionality into SystemManager #1340

Merged
merged 6 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move System interaction to SystemManager
Signed-off-by: Michael Carroll <michael@openrobotics.org>
  • Loading branch information
mjcarroll committed Feb 25, 2022
commit f885cf5f02ad4511c04945f9b790f1edc16dee86
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ set (sources
ServerPrivate.cc
SimulationRunner.cc
SystemLoader.cc
SystemManager.cc
TestFixture.cc
Util.cc
World.cc
Expand All @@ -86,6 +87,7 @@ set (gtest_sources
Server_TEST.cc
SimulationRunner_TEST.cc
SystemLoader_TEST.cc
SystemManager_TEST.cc
System_TEST.cc
TestFixture_TEST.cc
Util_TEST.cc
Expand Down
154 changes: 52 additions & 102 deletions src/SimulationRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ SimulationRunner::SimulationRunner(const sdf::World *_world,
// Keep world name
this->worldName = _world->Name();

// Keep system loader so plugins can be loaded at runtime
this->systemLoader = _systemLoader;

// Get the physics profile
// TODO(luca): remove duplicated logic in SdfEntityCreator and LevelManager
auto physics = _world->PhysicsByIndex(0);
Expand Down Expand Up @@ -115,6 +112,9 @@ SimulationRunner::SimulationRunner(const sdf::World *_world,
static_cast<int>(this->stepSize.count() / this->desiredRtf));
}

// Create the system manager
this->systemMgr = std::make_unique<SystemManager>(_systemLoader);

this->pauseConn = this->eventMgr.Connect<events::Pause>(
std::bind(&SimulationRunner::SetPaused, this, std::placeholders::_1));

Expand Down Expand Up @@ -173,7 +173,7 @@ SimulationRunner::SimulationRunner(const sdf::World *_world,

// If we have reached this point and no systems have been loaded, then load
// a default set of systems.
if (this->systems.empty() && this->pendingSystems.empty())
if (this->systemMgr->TotalCount() == 0)
{
ignmsg << "No systems loaded from SDF, loading defaults" << std::endl;
bool isPlayback = !this->serverConfig.LogPlaybackPath().empty();
Expand Down Expand Up @@ -455,7 +455,12 @@ void SimulationRunner::AddSystem(const SystemPluginPtr &_system,
std::optional<Entity> _entity,
std::optional<std::shared_ptr<const sdf::Element>> _sdf)
{
this->AddSystemImpl(SystemInternal(_system), _entity, _sdf);
auto entity = _entity.has_value() ? _entity.value()
: worldEntity(this->entityCompMgr);
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
auto sdf = _sdf.has_value() ? _sdf.value() : this->sdfWorld->Element();
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved

this->systemMgr->AddSystem(_system, entity, sdf);
this->systemMgr->ConfigurePendingSystems(this->entityCompMgr, this->eventMgr);
}

//////////////////////////////////////////////////
Expand All @@ -464,104 +469,60 @@ void SimulationRunner::AddSystem(
std::optional<Entity> _entity,
std::optional<std::shared_ptr<const sdf::Element>> _sdf)
{
this->AddSystemImpl(SystemInternal(_system), _entity, _sdf);
}
auto entity = _entity.has_value() ? _entity.value()
: worldEntity(this->entityCompMgr);
auto sdf = _sdf.has_value() ? _sdf.value() : this->sdfWorld->Element();

//////////////////////////////////////////////////
void SimulationRunner::AddSystemImpl(
SystemInternal _system,
std::optional<Entity> _entity,
std::optional<std::shared_ptr<const sdf::Element>> _sdf)
{
// Call configure
if (_system.configure)
{
// Default to world entity and SDF
auto entity = _entity.has_value() ? _entity.value()
: worldEntity(this->entityCompMgr);
auto sdf = _sdf.has_value() ? _sdf.value() : this->sdfWorld->Element();

_system.configure->Configure(
entity, sdf,
this->entityCompMgr,
this->eventMgr);
}

// Update callbacks will be handled later, add to queue
std::lock_guard<std::mutex> lock(this->pendingSystemsMutex);
this->pendingSystems.push_back(_system);
this->systemMgr->AddSystem(_system, entity, sdf);
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
this->systemMgr->ConfigurePendingSystems(this->entityCompMgr, this->eventMgr);
}

/////////////////////////////////////////////////
void SimulationRunner::AddSystemToRunner(SystemInternal _system)
void SimulationRunner::ProcessSystemQueue()
{
this->systems.push_back(_system);
auto pending = this->systemMgr->PendingCount();

if (_system.preupdate)
this->systemsPreupdate.push_back(_system.preupdate);
if (0 == pending)
return;

if (_system.update)
this->systemsUpdate.push_back(_system.update);
// If additional systems are to be added, stop the existing threads.
this->StopWorkerThreads();

if (_system.postupdate)
this->systemsPostupdate.push_back(_system.postupdate);
}
this->systemMgr->ActivatePendingSystems();

/////////////////////////////////////////////////
void SimulationRunner::ProcessSystemQueue()
{
std::lock_guard<std::mutex> lock(this->pendingSystemsMutex);
auto pending = this->pendingSystems.size();

if (pending > 0)
{
// If additional systems are to be added, stop the existing threads.
this->StopWorkerThreads();
}
auto threadCount = this->systemMgr->SystemsPostUpdate().size() + 1u;

for (const auto &system : this->pendingSystems)
{
this->AddSystemToRunner(system);
}
this->pendingSystems.clear();
igndbg << "Creating PostUpdate worker threads: "
<< threadCount << std::endl;

// If additional systems were added, recreate the worker threads.
if (pending > 0)
{
igndbg << "Creating PostUpdate worker threads: "
<< this->systemsPostupdate.size() + 1 << std::endl;
this->postUpdateStartBarrier = std::make_unique<Barrier>(threadCount);
this->postUpdateStopBarrier = std::make_unique<Barrier>(threadCount);

this->postUpdateStartBarrier =
std::make_unique<Barrier>(this->systemsPostupdate.size() + 1u);
this->postUpdateStopBarrier =
std::make_unique<Barrier>(this->systemsPostupdate.size() + 1u);
this->postUpdateThreadsRunning = true;
int id = 0;

this->postUpdateThreadsRunning = true;
int id = 0;
for (auto &system : this->systemMgr->SystemsPostUpdate())
{
igndbg << "Creating postupdate worker thread (" << id << ")" << std::endl;

for (auto &system : this->systemsPostupdate)
this->postUpdateThreads.push_back(std::thread([&, id]()
{
igndbg << "Creating postupdate worker thread (" << id << ")" << std::endl;

this->postUpdateThreads.push_back(std::thread([&, id]()
std::stringstream ss;
ss << "PostUpdateThread: " << id;
IGN_PROFILE_THREAD_NAME(ss.str().c_str());
while (this->postUpdateThreadsRunning)
{
std::stringstream ss;
ss << "PostUpdateThread: " << id;
IGN_PROFILE_THREAD_NAME(ss.str().c_str());
while (this->postUpdateThreadsRunning)
this->postUpdateStartBarrier->Wait();
if (this->postUpdateThreadsRunning)
{
this->postUpdateStartBarrier->Wait();
if (this->postUpdateThreadsRunning)
{
system->PostUpdate(this->currentInfo, this->entityCompMgr);
}
this->postUpdateStopBarrier->Wait();
system->PostUpdate(this->currentInfo, this->entityCompMgr);
}
igndbg << "Exiting postupdate worker thread ("
<< id << ")" << std::endl;
}));
id++;
}
this->postUpdateStopBarrier->Wait();
}
igndbg << "Exiting postupdate worker thread ("
<< id << ")" << std::endl;
}));
id++;
}
}

Expand All @@ -577,13 +538,13 @@ void SimulationRunner::UpdateSystems()

{
IGN_PROFILE("PreUpdate");
for (auto& system : this->systemsPreupdate)
for (auto& system : this->systemMgr->SystemsPreUpdate())
system->PreUpdate(this->currentInfo, this->entityCompMgr);
}

{
IGN_PROFILE("Update");
for (auto& system : this->systemsUpdate)
for (auto& system : this->systemMgr->SystemsUpdate())
system->Update(this->currentInfo, this->entityCompMgr);
}

Expand Down Expand Up @@ -903,19 +864,9 @@ void SimulationRunner::LoadPlugin(const Entity _entity,
const std::string &_name,
const sdf::ElementPtr &_sdf)
{
std::optional<SystemPluginPtr> system;
{
std::lock_guard<std::mutex> lock(this->systemLoaderMutex);
system = this->systemLoader->LoadPlugin(_fname, _name, _sdf);
}

// System correctly loaded from library
if (system)
{
this->AddSystem(system.value(), _entity, _sdf);
igndbg << "Loaded system [" << _name
<< "] for entity [" << _entity << "]" << std::endl;
}
this->systemMgr->LoadPlugin(_entity, _fname, _name, _sdf);
this->systemMgr->ConfigurePendingSystems(
this->entityCompMgr, this->eventMgr);
}

//////////////////////////////////////////////////
Expand Down Expand Up @@ -1085,8 +1036,7 @@ size_t SimulationRunner::EntityCount() const
/////////////////////////////////////////////////
size_t SimulationRunner::SystemCount() const
{
std::lock_guard<std::mutex> lock(this->pendingSystemsMutex);
return this->systems.size() + this->pendingSystems.size();
return this->systemMgr->TotalCount();
}

/////////////////////////////////////////////////
Expand Down
42 changes: 8 additions & 34 deletions src/SimulationRunner.hh
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@

#include "network/NetworkManager.hh"
#include "LevelManager.hh"
#include "SystemManager.hh"
#include "Barrier.hh"
#include "SystemInternal.hh"
#include "WorldControl.hh"

using namespace std::chrono_literals;
Expand Down Expand Up @@ -370,16 +370,6 @@ namespace ignition
/// Physics component of the world, if any.
public: void UpdatePhysicsParams();

/// \brief Implementation for AddSystem functions. This only adds systems
/// to a queue, the actual addition is performed by `AddSystemToRunner` at
/// the appropriate time.
/// \param[in] _system Generic representation of a system.
/// \param[in] _entity Entity received from AddSystem.
/// \param[in] _sdf SDF received from AddSystem.
private: void AddSystemImpl(SystemInternal _system,
std::optional<Entity> _entity = std::nullopt,
std::optional<std::shared_ptr<const sdf::Element>> _sdf = std::nullopt);

/// \brief Process entities with the components::Recreate component.
/// Put in a request to make them as removed
private: void ProcessRecreateEntitiesRemove();
Expand All @@ -399,27 +389,17 @@ namespace ignition
/// server is in the run state.
private: std::atomic<bool> running{false};

/// \brief All the systems.
private: std::vector<SystemInternal> systems;

/// \brief Pending systems to be added to systems.
private: std::vector<SystemInternal> pendingSystems;

/// \brief Mutex to protect pendingSystems
private: mutable std::mutex pendingSystemsMutex;

/// \brief Systems implementing PreUpdate
private: std::vector<ISystemPreUpdate *> systemsPreupdate;

/// \brief Systems implementing Update
private: std::vector<ISystemUpdate *> systemsUpdate;

/// \brief Systems implementing PostUpdate
private: std::vector<ISystemPostUpdate *> systemsPostupdate;
/// \brief Manager of all systems.
private: std::unique_ptr<SystemManager> systemMgr;

/// \brief Manager of all events.
/// Note: must be before EntityComponentManager
private: EventManager eventMgr;

/// \brief Manager of all systems.
/// Note: must be before EntityComponentManager
private: std::unique_ptr<SystemManager> systemMgr;

/// \brief Manager of all components.
private: EntityComponentManager entityCompMgr;

Expand Down Expand Up @@ -449,12 +429,6 @@ namespace ignition
/// \brief List of real times used to compute averages.
private: std::list<std::chrono::steady_clock::duration> realTimes;

/// \brief System loader, for loading system plugins.
private: SystemLoaderPtr systemLoader;

/// \brief Mutex to protect systemLoader
private: std::mutex systemLoaderMutex;

/// \brief Node for communication.
private: std::unique_ptr<transport::Node> node{nullptr};

Expand Down
8 changes: 8 additions & 0 deletions src/SystemInternal.hh
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ namespace ignition
/// Will be nullptr if the System doesn't implement this interface.
public: ISystemPostUpdate *postupdate = nullptr;

/// \brief Cached entity that was used to call `Configure` on the system
/// Useful for if a system needs to be reconfigured at runtime
public: Entity configureEntity = {kNullEntity};

/// \brief Cached sdf that was used to call `Configure` on the system
/// Useful for if a system needs to be reconfigured at runtime
public: std::shared_ptr<const sdf::Element> configureSdf = nullptr;

/// \brief Vector of queries and callbacks
public: std::vector<EntityQueryCallback> updates;
};
Expand Down
Loading