Skip to content

Commit

Permalink
Fix flatten() Operator With Infinite Sources (nv-morpheus#117)
Browse files Browse the repository at this point in the history
* Reverting to the old flatten

* Getting tests to pass
  • Loading branch information
mdemoret-nv authored Jul 5, 2022
1 parent b2cc5b1 commit 2c8b1c6
Showing 1 changed file with 64 additions and 24 deletions.
88 changes: 64 additions & 24 deletions python/srf/_pysrf/src/operators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <pybind11/pytypes.h>
#include <rxcpp/rx.hpp>

#include <exception>
#include <functional>
#include <iterator>
#include <memory>
Expand Down Expand Up @@ -65,27 +66,62 @@ PythonOperator OperatorsProxy::flatten()
{
// Build and return the map operator
return PythonOperator("flatten", [=](PyObjectObservable source) {
return source.concat_map([=](PyHolder data_object) {
AcquireGIL gil;

// Convert to a vector to allow releasing the GIL
std::vector<PyHolder> obj_list;

{
// Convert to C++ vector while we have the GIL. The list will go out of scope in this block
py::list l = py::object(std::move(data_object));
return rxcpp::observable<>::create<PyHolder>([=](PyObjectSubscriber sink) {
source.subscribe(
sink,
[sink](PyHolder data_object) {
try
{
AcquireGIL gil;

// Convert to a vector to allow releasing the GIL
std::vector<PyHolder> obj_list;

{
// Convert to C++ vector while we have the GIL. The list will go out of scope in this block
py::list l = py::object(std::move(data_object));

for (const auto& item : l)
{
// This increases the ref count by one but thats fine since the list will go out of
// scope and deref all its elements
obj_list.emplace_back(std::move(py::reinterpret_borrow<py::object>(item)));
}
}

if (sink.is_subscribed())
{
// Release the GIL before calling on_next
gil.release();

// Loop over the list
for (auto& i : obj_list)
{
sink.on_next(std::move(i));
}
}
} catch (py::error_already_set& err)
{
// Need the GIL here
AcquireGIL gil;

for (const auto& item : l)
{
// This increases the ref count by one but thats fine since the list will go out of
// scope and deref all its elements
obj_list.emplace_back(std::move(py::reinterpret_borrow<py::object>(item)));
}
}
py::print("Python error in callback hit!");
py::print(err.what());

gil.release();
// Release before calling on_error
gil.release();

return rxcpp::observable<>::iterate(obj_list);
sink.on_error(std::current_exception());
}
},
[sink](std::exception_ptr ex) {
// Forward
sink.on_error(std::move(ex));
},
[sink]() {
// Forward
sink.on_completed();
});
});
});
}
Expand All @@ -107,23 +143,27 @@ PythonOperator OperatorsProxy::on_completed(std::function<py::object()> finally_
{
return PythonOperator("on_completed", [=](PyObjectObservable source) {
// Make a new observable
return rxcpp::observable<>::create<PyHolder>([=](PyObjectSubscriber sub) {
return rxcpp::observable<>::create<PyHolder>([=](PyObjectSubscriber sink) {
source.subscribe(rxcpp::make_observer_dynamic<PyHolder>(
[=](PyHolder x) {
[sink](PyHolder x) {
// Forward
sink.on_next(std::move(x));
},
[sink](std::exception_ptr ex) {
// Forward
sub.on_next(std::move(x));
sink.on_error(std::move(ex));
},
[=]() {
[sink, finally_fn]() {
// In finally function, call the wrapped function
auto ret_val = finally_fn();

if (ret_val && !ret_val.is_none())
{
sub.on_next(std::move(ret_val));
sink.on_next(std::move(ret_val));
}

// Call on_completed
sub.on_completed();
sink.on_completed();
}));
});
});
Expand Down

0 comments on commit 2c8b1c6

Please sign in to comment.