Skip to content

Commit

Permalink
Resolves issue-32, re-add stats watchers to Rx and Python nodes (nv-m…
Browse files Browse the repository at this point in the history
…orpheus#130)

Closes nv-morpheus#32 

Fixes benchmarking and stat collection unit tests.

Authors:
  - Devin Robison (https://github.com/drobison00)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: nv-morpheus#130
  • Loading branch information
drobison00 authored Jul 13, 2022
1 parent 96b0a9c commit f4da153
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 271 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/jenkins/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ set +e
# * test_srf_private - https://github.com/nv-morpheus/SRF/issues/33
# * nvrpc - https://github.com/nv-morpheus/SRF/issues/34
ctest --output-on-failure \
--exclude-regex "test_srf_benchmarking|test_srf_private|nvrpc" \
--exclude-regex "test_srf_private|nvrpc" \
--output-junit ${REPORTS_DIR}/report_ctest.xml

CTEST_RESULTS=$?
Expand Down
6 changes: 3 additions & 3 deletions cmake/deps/Configure_boost.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ function(find_and_configure_boost_true_cmake version)

rapids_cpm_find(Boost ${version}
GLOBAL_TARGETS
Boost::context Boost::fiber Boost::filesystem Boost::system
Boost::context Boost::fiber Boost::hana Boost::filesystem Boost::system
BUILD_EXPORT_SET
${PROJECT_NAME}-core-exports
INSTALL_EXPORT_SET
Expand All @@ -55,7 +55,7 @@ function(find_and_configure_boost_boost_cmake version)

rapids_cpm_find(Boost ${version}
GLOBAL_TARGETS
Boost::context Boost::fiber Boost::filesystem Boost::system
Boost::context Boost::fiber Boost::hana Boost::filesystem Boost::system
BUILD_EXPORT_SET
${PROJECT_NAME}-core-exports
INSTALL_EXPORT_SET
Expand All @@ -72,7 +72,7 @@ function(find_and_configure_boost_boost_cmake version)
# Now add it to the list of packages to install
rapids_export_package(INSTALL Boost
${PROJECT_NAME}-core-exports
GLOBAL_TARGETS Boost::context Boost::fiber Boost::filesystem Boost::system
GLOBAL_TARGETS Boost::context Boost::fiber Boost::hana Boost::filesystem Boost::system
)

# Overwrite the default package contents
Expand Down
81 changes: 66 additions & 15 deletions include/srf/segment/builder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include "srf/benchmarking/trace_statistics.hpp"
#include "srf/engine/segment/ibuilder.hpp"
#include "srf/exceptions/runtime_error.hpp"
#include "srf/node/edge_builder.hpp"
Expand All @@ -34,6 +35,7 @@
#include "srf/segment/runnable.hpp"
#include "srf/utils/macros.hpp"

#include <boost/hana.hpp>
#include <glog/logging.h>
#include <rxcpp/rx-observable.hpp>
#include <rxcpp/rx-observer.hpp>
Expand All @@ -45,6 +47,44 @@
#include <string>
#include <utility>

namespace {
namespace hana = boost::hana;

template <typename T>
auto has_source_add_watcher =
hana::is_valid([](auto&& thing) -> decltype(std::forward<decltype(thing)>(thing).source_add_watcher(
std::declval<std::shared_ptr<srf::WatcherInterface>>())) {});

template <typename T>
auto has_sink_add_watcher =
hana::is_valid([](auto&& thing) -> decltype(std::forward<decltype(thing)>(thing).sink_add_watcher(
std::declval<std::shared_ptr<srf::WatcherInterface>>())) {});

template <typename T>
void add_stats_watcher_if_rx_source(T& thing, std::string name)
{
return hana::if_(
has_source_add_watcher<T>(thing),
[name](auto&& object) {
auto trace_stats = srf::benchmarking::TraceStatistics::get_or_create(name);
std::forward<decltype(object)>(object).source_add_watcher(trace_stats);
},
[name](auto&&) {})(thing);
}

template <typename T>
void add_stats_watcher_if_rx_sink(T& thing, std::string name)
{
return hana::if_(
has_sink_add_watcher<T>(thing),
[name](auto&& object) {
auto trace_stats = srf::benchmarking::TraceStatistics::get_or_create(name);
std::forward<decltype(object)>(object).sink_add_watcher(trace_stats);
},
[name](auto&&) {})(thing);
}
} // namespace

namespace srf::segment {

class Builder final
Expand All @@ -67,37 +107,47 @@ class Builder final
template <typename ObjectT, typename... ArgsT>
std::shared_ptr<Object<ObjectT>> construct_object(std::string name, ArgsT&&... args)
{
return make_object(std::move(name), std::make_unique<ObjectT>(std::forward<ArgsT>(args)...));
auto uptr = std::make_unique<ObjectT>(std::forward<ArgsT>(args)...);

::add_stats_watcher_if_rx_source(*uptr, name);
::add_stats_watcher_if_rx_sink(*uptr, name);

return make_object(std::move(name), std::move(uptr));
}

template <typename SourceTypeT, typename CreateFnT>
template <typename SourceTypeT,
template <class, class = srf::runnable::Context> class NodeTypeT = node::RxSource,
typename CreateFnT>
auto make_source(std::string name, CreateFnT&& create_fn)
{
return make_object(std::move(name),
std::make_unique<node::RxSource<SourceTypeT>>(
rxcpp::observable<>::create<SourceTypeT>(std::forward<CreateFnT>(create_fn))));
return construct_object<NodeTypeT<SourceTypeT>>(
name, rxcpp::observable<>::create<SourceTypeT>(std::forward<CreateFnT>(create_fn)));
}

template <typename SinkTypeT, typename... ArgsT>
template <typename SinkTypeT,
template <class, class = srf::runnable::Context> class NodeTypeT = node::RxSink,
typename... ArgsT>
auto make_sink(std::string name, ArgsT&&... ops)
{
return make_object(std::move(name),
std::make_unique<node::RxSink<SinkTypeT>>(
rxcpp::make_observer_dynamic<SinkTypeT>(std::forward<ArgsT>(ops)...)));
return construct_object<NodeTypeT<SinkTypeT>>(name,
rxcpp::make_observer<SinkTypeT>(std::forward<ArgsT>(ops)...));
}

template <typename SinkTypeT, typename... ArgsT>
template <typename SinkTypeT,
template <class, class, class = srf::runnable::Context> class NodeTypeT = node::RxNode,
typename... ArgsT>
auto make_node(std::string name, ArgsT&&... ops)
{
return make_object(std::move(name),
std::make_unique<node::RxNode<SinkTypeT, SinkTypeT>>(std::forward<ArgsT>(ops)...));
return construct_object<NodeTypeT<SinkTypeT, SinkTypeT>>(name, std::forward<ArgsT>(ops)...);
}

template <typename SinkTypeT, typename SourceTypeT, typename... ArgsT>
template <typename SinkTypeT,
typename SourceTypeT,
template <class, class, class = srf::runnable::Context> class NodeTypeT = node::RxNode,
typename... ArgsT>
auto make_node(std::string name, ArgsT&&... ops)
{
return make_object(std::move(name),
std::make_unique<node::RxNode<SinkTypeT, SourceTypeT>>(std::forward<ArgsT>(ops)...));
return construct_object<NodeTypeT<SinkTypeT, SourceTypeT>>(name, std::forward<ArgsT>(ops)...);
}

template <typename SourceNodeTypeT, typename SinkNodeTypeT>
Expand Down Expand Up @@ -188,6 +238,7 @@ std::shared_ptr<Object<ObjectT>> Builder::make_object(std::string name, std::uni
{
auto segment_name = m_backend.name() + "/" + name;
auto segment_node = std::make_shared<Runnable<ObjectT>>(segment_name, std::move(node));

m_backend.add_runnable(name, segment_node);
m_backend.add_object(name, segment_node);
segment_object = segment_node;
Expand Down
12 changes: 6 additions & 6 deletions python/srf/_pysrf/include/pysrf/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ namespace pysrf {
// Export everything in the srf::pysrf namespace by default since we compile with -fvisibility=hidden
#pragma GCC visibility push(default)

template <typename InputT>
class PythonSink : public node::RxSink<InputT>,
template <typename InputT, typename ContextT = srf::runnable::Context>
class PythonSink : public node::RxSink<InputT, ContextT>,
public pysrf::AutoRegSinkAdapter<InputT>
{
using base_t = node::RxSink<InputT>;
Expand All @@ -183,8 +183,8 @@ class PythonSink : public node::RxSink<InputT>,
using node::RxSink<InputT>::RxSink;
};

template <typename InputT, typename OutputT>
class PythonNode : public node::RxNode<InputT, OutputT>,
template <typename InputT, typename OutputT, typename ContextT = srf::runnable::Context>
class PythonNode : public node::RxNode<InputT, OutputT, ContextT>,
public pysrf::AutoRegSourceAdapter<OutputT>,
public pysrf::AutoRegSinkAdapter<InputT>
{
Expand Down Expand Up @@ -226,8 +226,8 @@ class PythonNode : public node::RxNode<InputT, OutputT>,
}
};

template <typename OutputT>
class PythonSource : public node::RxSource<OutputT>,
template <typename OutputT, typename ContextT = srf::runnable::Context>
class PythonSource : public node::RxSource<OutputT, ContextT>,
public pysrf::AutoRegSourceAdapter<OutputT>
{
using base_t = node::RxSource<OutputT>;
Expand Down
9 changes: 3 additions & 6 deletions python/srf/_pysrf/src/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ std::shared_ptr<srf::segment::ObjectProperties> SegmentProxy::make_sink(srf::seg
on_completed();
};

return self.construct_object<PythonSink<PyHolder>>(
name, rxcpp::make_observer<PyHolder>(on_next_w, on_error_w, on_completed_w));
return self.make_sink<PyHolder, PythonSink>(name, on_next_w, on_error_w, on_completed_w);
}

std::shared_ptr<srf::segment::ObjectProperties> SegmentProxy::get_ingress(
Expand All @@ -199,7 +198,7 @@ std::shared_ptr<srf::segment::ObjectProperties> SegmentProxy::get_egress(
std::shared_ptr<srf::segment::ObjectProperties> SegmentProxy::make_node(
srf::segment::Builder& self, const std::string& name, std::function<pybind11::object(pybind11::object object)> map_f)
{
auto node = self.construct_object<PythonNode<PyHolder, PyHolder>>(
return self.make_node<PyHolder, PyHolder, PythonNode>(
name, rxcpp::operators::map([map_f](PyHolder data_object) -> PyHolder {
try
{
Expand All @@ -220,16 +219,14 @@ std::shared_ptr<srf::segment::ObjectProperties> SegmentProxy::make_node(
// caught by python output.on_error(std::current_exception());
}
}));

return node;
}

std::shared_ptr<srf::segment::ObjectProperties> SegmentProxy::make_node_full(
srf::segment::Builder& self,
const std::string& name,
std::function<void(const pysrf::PyObjectObservable& obs, pysrf::PyObjectSubscriber& sub)> sub_fn)
{
auto node = self.construct_object<PythonNode<PyHolder, PyHolder>>(name);
auto node = self.make_node<PyHolder, PyHolder, PythonNode>(name);

node->object().make_stream([sub_fn](const PyObjectObservable& input) -> PyObjectObservable {
return rxcpp::observable<>::create<PyHolder>([input, sub_fn](pysrf::PyObjectSubscriber output) {
Expand Down
83 changes: 42 additions & 41 deletions python/tests/test_benchmarking.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,51 +14,50 @@
# limitations under the License.
# ===========================================================================

# import pathlib
# import random
import pathlib
import random

# import pytest
import pytest

# import srf
# #import srf.benchmarking
# from srf.core.options import PlacementStrategy
import srf
import srf.benchmarking
from srf.core.options import PlacementStrategy

# whereami = pathlib.Path(__file__).parent.resolve()
whereami = pathlib.Path(__file__).parent.resolve()

# # Add a little jitter to testing
# TEST_ITERATIONS = random.randint(10, 250)
# Add a little jitter to testing
TEST_ITERATIONS = random.randint(10, 250)

# def tracer_test_f(x):
# return x
def tracer_test_f(x):
return x

# @pytest.mark.xfail # issue#161
# def init_tracer_segment(seg: srf.Builder, watcher: srf.benchmarking.LatencyWatcher):
# ## CXX double source with heterogesrfus segment node composition
# ## print("Made it into init_tracer_segment\n", flush=True)
# python_tracer_source = watcher.make_tracer_source(seg, "tracer_source", False)
# python_node_1 = watcher.make_traced_node(seg, "python_traced_1", tracer_test_f)
# seg.make_edge(python_tracer_source, python_node_1)

# python_tracer_sink = watcher.make_tracer_sink(seg, "tracer_sink", lambda x: x)
# seg.make_edge(python_node_1, python_tracer_sink)

# @pytest.mark.xfail # issue#161
# def test_tracer_creation():
# options = srf.Options()
# options.placement.cpu_strategy = PlacementStrategy.PerMachine
# executor = srf.Executor(options)
#@pytest.mark.xfail # issue#161
def init_tracer_segment(seg: srf.Builder, watcher: srf.benchmarking.LatencyWatcher):
## CXX double source with heterogesrfus segment node composition
## print("Made it into init_tracer_segment\n", flush=True)
python_tracer_source = watcher.make_tracer_source(seg, "tracer_source", False)
python_node_1 = watcher.make_traced_node(seg, "python_traced_1", tracer_test_f)
seg.make_edge(python_tracer_source, python_node_1)

# latency_watcher = srf.benchmarking.LatencyWatcher(executor)
# latency_watcher.make_segment("tracer_segment", init_tracer_segment)
python_tracer_sink = watcher.make_tracer_sink(seg, "tracer_sink", lambda x: x)
seg.make_edge(python_node_1, python_tracer_sink)

# latency_watcher.tracer_count(TEST_ITERATIONS)
# latency_watcher.trace_until_notified()
# latency_watcher.shutdown()

# tracer_metrics = latency_watcher.aggregate_tracers()
# # print(json.dumps(tracer_metrics, indent=2))
# metadata = tracer_metrics["metadata"]
# assert (metadata["tracer_count"] == TEST_ITERATIONS)
#@pytest.mark.xfail # issue#161
#def test_tracer_creation():
# options = srf.Options()
# executor = srf.Executor(options)
#
# latency_watcher = srf.benchmarking.LatencyWatcher(executor)
# latency_watcher.make_segment("tracer_segment", init_tracer_segment)
#
# latency_watcher.tracer_count(TEST_ITERATIONS)
# latency_watcher.trace_until_notified()
# latency_watcher.shutdown()
#
# tracer_metrics = latency_watcher.aggregate_tracers()
# # print(json.dumps(tracer_metrics, indent=2))
# metadata = tracer_metrics["metadata"]
# assert (metadata["tracer_count"] == TEST_ITERATIONS)

# @pytest.mark.xfail # issue#161
# def test_latency_tracer_counts_match_framework_stats():
Expand Down Expand Up @@ -170,7 +169,9 @@

# srf.benchmarking.reset_tracing_stats()

# if (__name__ in ("__main__", )):
# test_tracer_creation()
# test_latency_tracer_counts_match_framework_stats()
# test_throughput_tracer_counts_match_framework_stats()
if (__name__ in ("__main__", )):
#init_tracer_segment()
#test_tracer_creation()
#test_latency_tracer_counts_match_framework_stats()
#test_throughput_tracer_counts_match_framework_stats()
pass
Loading

0 comments on commit f4da153

Please sign in to comment.