Skip to content

Commit

Permalink
Merge pull request #47028 from makortel/alpakaSynchronize
Browse files Browse the repository at this point in the history
Add central `synchronize` configuration parameter to Alpaka modules
  • Loading branch information
cmsbuild authored Dec 30, 2024
2 parents 9aa0a45 + 5469807 commit 2f2cacb
Show file tree
Hide file tree
Showing 27 changed files with 205 additions and 92 deletions.
28 changes: 25 additions & 3 deletions HeterogeneousCore/AlpakaCore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ The `...` can in principle be any of the module abilities listed in the linked T

New base classes (or other functionality) can be added based on new use cases that come up.

The Alpaka-based ESProducers should use the `ESProducer` base class (`#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h"`). Note that the Alpaka-based ESProducer constructor must pass the argument `edm::ParameterSet` object to the constructor of the `ESProducer` base class.
The Alpaka-based ESProducers should use the `ESProducer` base class (`#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h"`).

Note that both the Alpaka-based EDProducer and ESProducer constructors must pass the argument `edm::ParameterSet` object to the constructor of their base class.

Note that currently Alpaka-based ESSources are not supported. If you need to produce EventSetup data products into a Record for which there is no ESSource yet, use [`EmptyESSource`](https://twiki.cern.ch/twiki/bin/view/CMSPublic/SWGuideEDMParametersForModules#EmptyESSource).

Expand Down Expand Up @@ -237,8 +239,10 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
class ExampleAlpakaProducer : public global::EDProducer<> {
public:
ExampleAlpakaProducer(edm::ParameterSet const& iConfig)
// produces() must not specify the product type, it is deduced from deviceToken_
: deviceToken_{produces()}, size_{iConfig.getParameter<int32_t>("size")} {}
: EDProducer<>(iConfig),
// produces() must not specify the product type, it is deduced from deviceToken_
deviceToken_{produces()},
size_{iConfig.getParameter<int32_t>("size")} {}

// device::Event and device::EventSetup are defined in ALPAKA_ACCELERATOR_NAMESPACE as well
void produce(edm::StreamID sid, device::Event& iEvent, device::EventSetup const& iSetup) const override {
Expand Down Expand Up @@ -479,6 +483,24 @@ process.ProcessAcceleratorAlpaka.setBackend("serial_sync") # or "cuda_async" or
process.options.accelerators = ["cpu"] # or "gpu-nvidia" or "gpu-amd"
```

### Blocking synchronization (for testing)

While the general approach is to favor asynchronous operations with non-blocking synchronization, for testing purposes it can be useful to synchronize the EDModule's `acquire()` / `produce()` or ESProducer's production functions in a blocking way. Such a blocking synchronization can be specified for individual modules via the `alpaka` `PSet` along
```python
process.producer = cms.EDProducer("ExampleAlpakaProducer@alpaka",
...
alpaka = cms.untracked.PSet(
synchronize = cms.untracked.bool(True)
)
)
```

The blocking synchronization can be specified for all Alpaka modules via the `ProcessAcceleratorAlpaka` along
```python
process.ProcessAcceleratorAlpaka.setSynchronize(True)
```
Note that the possible per-module parameter overrides this global setting.


## Unit tests

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
public:
// TODO: WaitingTaskWithArenaHolder not really needed for host synchronous case
// Constructor overload to be called from acquire()
EDMetadataAcquireSentry(edm::StreamID stream, edm::WaitingTaskWithArenaHolder holder);
EDMetadataAcquireSentry(edm::StreamID stream, edm::WaitingTaskWithArenaHolder holder, bool synchronize);

// Constructor overload to be called from registerTransformAsync()
EDMetadataAcquireSentry(Device const& device, edm::WaitingTaskWithArenaHolder holder);
EDMetadataAcquireSentry(Device const& device, edm::WaitingTaskWithArenaHolder holder, bool synchronize = false);

EDMetadataAcquireSentry(EDMetadataAcquireSentry const&) = delete;
EDMetadataAcquireSentry& operator=(EDMetadataAcquireSentry const&) = delete;
Expand All @@ -40,6 +40,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
std::shared_ptr<EDMetadata> metadata_;

edm::WaitingTaskWithArenaHolder waitingTaskHolder_;
bool const synchronize_;
};
} // namespace detail
} // namespace ALPAKA_ACCELERATOR_NAMESPACE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
class EDMetadataSentry {
public:
// For normal module
EDMetadataSentry(edm::StreamID stream);
EDMetadataSentry(edm::StreamID stream, bool synchronize);

// For ExternalWork-module's produce()
EDMetadataSentry(std::shared_ptr<EDMetadata> metadata) : metadata_(std::move(metadata)) {}
EDMetadataSentry(std::shared_ptr<EDMetadata> metadata, bool synchronize)
: metadata_(std::move(metadata)), synchronize_(synchronize) {}

EDMetadataSentry(EDMetadataSentry const&) = delete;
EDMetadataSentry& operator=(EDMetadataSentry const&) = delete;
Expand All @@ -31,6 +32,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {

private:
std::shared_ptr<EDMetadata> metadata_;
bool const synchronize_;
};
} // namespace detail
} // namespace ALPAKA_ACCELERATOR_NAMESPACE
Expand Down
4 changes: 2 additions & 2 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "FWCore/Framework/interface/ESProducer.h"
#include "FWCore/Framework/interface/MakeDataException.h"
#include "FWCore/Framework/interface/produce_helpers.h"
#include "HeterogeneousCore/AlpakaCore/interface/module_backend_config.h"
#include "HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESDeviceProduct.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESDeviceProductType.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/Record.h"
Expand All @@ -30,7 +30,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
public:
static void prevalidate(edm::ConfigurationDescriptions& descriptions) {
Base::prevalidate(descriptions);
cms::alpakatools::module_backend_config(descriptions);
cms::alpakatools::modulePrevalidate(descriptions);
}

protected:
Expand Down
16 changes: 14 additions & 2 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
#include "FWCore/Framework/interface/FrameworkfwdMostUsed.h"
#include "FWCore/Framework/interface/moduleAbilities.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/Utilities/interface/EDPutToken.h"
#include "FWCore/Utilities/interface/Transition.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/DeviceProductType.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadataAcquireSentry.h"
#include "HeterogeneousCore/AlpakaCore/interface/EventCache.h"
#include "HeterogeneousCore/AlpakaCore/interface/QueueCache.h"
#include "HeterogeneousCore/AlpakaCore/interface/module_backend_config.h"
#include "HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h"
#include "HeterogeneousCore/AlpakaInterface/interface/Backend.h"
#include "HeterogeneousCore/AlpakaInterface/interface/CopyToHost.h"

Expand Down Expand Up @@ -46,7 +47,15 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
using Base = BaseT<Args..., edm::Transformer>;

public:
// TODO: default constructor to be removed after all derived classes have been migrated
ProducerBase() : backendToken_(Base::produces("backend")) {}
ProducerBase(edm::ParameterSet const& iConfig)
: backendToken_(Base::produces("backend")),
// The 'synchronize' parameter can be unset in Alpaka
// modules specified with the namespace prefix instead if
// '@alpaka' suffix
synchronize_(iConfig.getUntrackedParameter<edm::ParameterSet>("alpaka").getUntrackedParameter<bool>(
"synchronize", false)) {}

template <edm::Transition Tr = edm::Transition::Event>
[[nodiscard]] auto produces() noexcept {
Expand All @@ -60,16 +69,19 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {

static void prevalidate(edm::ConfigurationDescriptions& descriptions) {
Base::prevalidate(descriptions);
cms::alpakatools::module_backend_config(descriptions);
cms::alpakatools::modulePrevalidate(descriptions);
}

protected:
void putBackend(edm::Event& iEvent) const {
iEvent.emplace(this->backendToken_, static_cast<unsigned short>(kBackend));
}

bool synchronize() const { return synchronize_; }

private:
edm::EDPutTokenT<unsigned short> const backendToken_;
bool const synchronize_ = false;

template <typename TProducer, edm::Transition Tr>
friend class ProducerBaseAdaptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
static_assert(not edm::CheckAbility<edm::module::Abilities::kExternalWork, Args...>::kHasIt,
"ALPAKA_ACCELERATOR_NAMESPACE::global::EDProducer may not be used with ExternalWork ability. "
"Please use ALPAKA_ACCELERATOR_NAMESPACE::stream::SynchronizingEDProducer instead.");
using Base = ProducerBase<edm::global::EDProducer, Args...>;

protected:
EDProducer() = default; // to be removed in the near future
EDProducer(edm::ParameterSet const iConfig) : Base(iConfig) {}

public:
void produce(edm::StreamID sid, edm::Event& iEvent, edm::EventSetup const& iSetup) const final {
detail::EDMetadataSentry sentry(sid);
detail::EDMetadataSentry sentry(sid, this->synchronize());
device::Event ev(iEvent, sentry.metadata());
device::EventSetup const es(iSetup, ev.device());
produce(sid, ev, es);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
static_assert(not edm::CheckAbility<edm::module::Abilities::kExternalWork, Args...>::kHasIt,
"ALPAKA_ACCELERATOR_NAMESPACE::stream::EDProducer may not be used with ExternalWork ability. "
"Please use ALPAKA_ACCELERATOR_NAMESPACE::stream::SynchronizingEDProducer instead.");
using Base = ProducerBase<edm::stream::EDProducer, Args...>;

protected:
EDProducer() = default; // to be removed in the near future
EDProducer(edm::ParameterSet const iConfig) : Base(iConfig) {}

public:
void produce(edm::Event& iEvent, edm::EventSetup const& iSetup) final {
detail::EDMetadataSentry sentry(iEvent.streamID());
detail::EDMetadataSentry sentry(iEvent.streamID(), this->synchronize());
device::Event ev(iEvent, sentry.metadata());
device::EventSetup const es(iSetup, ev.device());
produce(ev, es);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,25 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
not edm::CheckAbility<edm::module::Abilities::kExternalWork, Args...>::kHasIt,
"ExternalWork ability is redundant with ALPAKA_ACCELERATOR_NAMESPACE::stream::SynchronizingEDProducer."
"Please remove it.");
using Base = ProducerBase<edm::stream::EDProducer, edm::ExternalWork, Args...>;

protected:
SynchronizingEDProducer() = default; // to be removed in the near future
SynchronizingEDProducer(edm::ParameterSet const iConfig) : Base(iConfig) {}

public:
void acquire(edm::Event const& iEvent,
edm::EventSetup const& iSetup,
edm::WaitingTaskWithArenaHolder holder) final {
detail::EDMetadataAcquireSentry sentry(iEvent.streamID(), std::move(holder));
detail::EDMetadataAcquireSentry sentry(iEvent.streamID(), std::move(holder), this->synchronize());
device::Event const ev(iEvent, sentry.metadata());
device::EventSetup const es(iSetup, ev.device());
acquire(ev, es);
metadata_ = sentry.finish();
}

void produce(edm::Event& iEvent, edm::EventSetup const& iSetup) final {
detail::EDMetadataSentry sentry(std::move(metadata_));
detail::EDMetadataSentry sentry(std::move(metadata_), this->synchronize());
device::Event ev(iEvent, sentry.metadata());
device::EventSetup const es(iSetup, ev.device());
produce(ev, es);
Expand Down
10 changes: 10 additions & 0 deletions HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#ifndef HeterogeneousCore_AlpakaCore_interface_modulePrevalidate_h
#define HeterogeneousCore_AlpakaCore_interface_modulePrevalidate_h

#include "FWCore/Framework/interface/FrameworkfwdMostUsed.h"

namespace cms::alpakatools {
void modulePrevalidate(edm::ConfigurationDescriptions& iDesc);
} // namespace cms::alpakatools

#endif
10 changes: 0 additions & 10 deletions HeterogeneousCore/AlpakaCore/interface/module_backend_config.h

This file was deleted.

20 changes: 18 additions & 2 deletions HeterogeneousCore/AlpakaCore/python/ProcessAcceleratorAlpaka.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from HeterogeneousCore.Common.PlatformStatus import PlatformStatus

class ModuleTypeResolverAlpaka:
def __init__(self, accelerators, backend):
def __init__(self, accelerators, backend, synchronize):
# first element is used as the default if nothing is set
self._valid_backends = []
if "gpu-nvidia" in accelerators:
Expand All @@ -23,6 +23,7 @@ def __init__(self, accelerators, backend):
if backend != self._valid_backends[0]:
self._valid_backends.remove(backend)
self._valid_backends.insert(0, backend)
self._synchronize = synchronize

def plugin(self):
return "ModuleTypeResolverAlpaka"
Expand All @@ -31,6 +32,11 @@ def setModuleVariant(self, module):
if module.type_().endswith("@alpaka"):
defaultBackend = self._valid_backends[0]
if hasattr(module, "alpaka"):
# Ensure the untrackedness already here, because the
# C++ ModuleTypeResolverAlpaka relies on the
# untrackedness (before the configuration validation)
if module.alpaka.isTracked():
raise cms.EDMException(cms.edm.errors.Configuration, "The 'alpaka' PSet in module '{}' is tracked, but it should be untracked".format(module.label()))
if hasattr(module.alpaka, "backend"):
if module.alpaka.backend == "":
module.alpaka.backend = defaultBackend
Expand All @@ -42,6 +48,12 @@ def setModuleVariant(self, module):
module.alpaka = cms.untracked.PSet(
backend = cms.untracked.string(defaultBackend)
)
isDefaultValue = lambda v: \
isinstance(v, type(cms.optional.untracked.bool)) \
and not v.isTracked() \
and v.isCompatibleCMSType(cms.bool)
if not hasattr(module.alpaka, "synchronize") or isDefaultValue(module.alpaka.synchronize):
module.alpaka.synchronize = cms.untracked.bool(self._synchronize)

class ProcessAcceleratorAlpaka(cms.ProcessAccelerator):
"""ProcessAcceleratorAlpaka itself does not define or inspect
Expand All @@ -53,14 +65,18 @@ class ProcessAcceleratorAlpaka(cms.ProcessAccelerator):
def __init__(self):
super(ProcessAcceleratorAlpaka, self).__init__()
self._backend = None
self._synchronize = False

# User-facing interface
def setBackend(self, backend):
self._backend = backend

def setSynchronize(self, synchronize):
self._synchronize = synchronize

# Framework-facing interface
def moduleTypeResolver(self, accelerators):
return ModuleTypeResolverAlpaka(accelerators, self._backend)
return ModuleTypeResolverAlpaka(accelerators, self._backend, self._synchronize)

def apply(self, process, accelerators):
# Propagate the AlpakaService messages through the MessageLogger
Expand Down
18 changes: 13 additions & 5 deletions HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadataAcquireSentry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@

namespace ALPAKA_ACCELERATOR_NAMESPACE {
namespace detail {
EDMetadataAcquireSentry::EDMetadataAcquireSentry(edm::StreamID streamID, edm::WaitingTaskWithArenaHolder holder)
: EDMetadataAcquireSentry(detail::chooseDevice(streamID), std::move(holder)) {}
EDMetadataAcquireSentry::EDMetadataAcquireSentry(edm::StreamID streamID,
edm::WaitingTaskWithArenaHolder holder,
bool synchronize)
: EDMetadataAcquireSentry(detail::chooseDevice(streamID), std::move(holder), synchronize) {}

EDMetadataAcquireSentry::EDMetadataAcquireSentry(Device const& device, edm::WaitingTaskWithArenaHolder holder)
: waitingTaskHolder_(std::move(holder)) {
EDMetadataAcquireSentry::EDMetadataAcquireSentry(Device const& device,
edm::WaitingTaskWithArenaHolder holder,
bool synchronize)
: waitingTaskHolder_(std::move(holder)), synchronize_(synchronize) {
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
// all synchronous backends
metadata_ = std::make_shared<EDMetadata>(cms::alpakatools::getQueueCache<Queue>().get(device));
Expand All @@ -23,7 +27,11 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
#ifndef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
// all asynchronous backends
std::shared_ptr<EDMetadata> EDMetadataAcquireSentry::finish() {
metadata_->enqueueCallback(std::move(waitingTaskHolder_));
if (synchronize_) {
alpaka::wait(metadata_->queue());
} else {
metadata_->enqueueCallback(std::move(waitingTaskHolder_));
}
return std::move(metadata_);
}
#endif
Expand Down
15 changes: 11 additions & 4 deletions HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadataSentry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace ALPAKA_ACCELERATOR_NAMESPACE {
namespace detail {
EDMetadataSentry::EDMetadataSentry(edm::StreamID streamID) {
EDMetadataSentry::EDMetadataSentry(edm::StreamID streamID, bool synchronize) : synchronize_(synchronize) {
auto const& device = detail::chooseDevice(streamID);
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
metadata_ = std::make_shared<EDMetadata>(cms::alpakatools::getQueueCache<Queue>().get(device));
Expand All @@ -16,12 +16,19 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
}

void EDMetadataSentry::finish(bool launchedAsyncWork) {
if (launchedAsyncWork) {
if constexpr (not std::is_same_v<Queue, alpaka::Queue<Device, alpaka::Blocking>>) {
if (launchedAsyncWork and synchronize_) {
alpaka::wait(metadata_->queue());
}
}

if (launchedAsyncWork and not synchronize_) {
metadata_->recordEvent();
} else {
// If we are certain no asynchronous work was launched (i.e.
// the Queue was not used in any way), there is no need to
// synchronize, and the Event can be discarded.
// the Queue was not used in any way), or a blocking
// synchronization was explicitly requested, there is no need
// to synchronize later, and the Event can be discarded.
metadata_->discardEvent();
}
}
Expand Down
Loading

0 comments on commit 2f2cacb

Please sign in to comment.