Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Morpheus Shutdown Behavior On Exception #478

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ set(CMAKE_CXX_EXTENSIONS ON)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_POSITION_INDEPENDENT_CODE TRUE)
set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
set(CMAKE_BUILD_RPATH_USE_ORIGIN TRUE)
set(CMAKE_INSTALL_RPATH "$ORIGIN")

enable_testing()
Expand Down
3 changes: 2 additions & 1 deletion ci/scripts/github/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ ninja --version

rapids-logger "Configuring cmake for Morpheus"
cmake -B build -G Ninja ${CMAKE_BUILD_ALL_FEATURES} \
-DCCACHE_PROGRAM_PATH=$(which sccache) .
-DCCACHE_PROGRAM_PATH=$(which sccache) \
-DCMAKE_BUILD_RPATH_USE_ORIGIN=ON .

rapids-logger "Building Morpheus"
cmake --build build --parallel ${PARALLEL_LEVEL}
Expand Down
36 changes: 22 additions & 14 deletions morpheus/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,20 +247,28 @@ def stop(self):

async def join(self):

await self._srf_executor.join_async()

# First wait for all sources to stop. This only occurs after all messages have been processed fully
for s in list(self._sources):
await s.join()

# Now that there is no more data, call stop on all stages to ensure shutdown (i.e., for stages that have their
# own worker loop thread)
for s in list(self._stages):
s.stop()

# Now call join on all stages
for s in list(self._stages):
await s.join()
try:
await self._srf_executor.join_async()
except Exception:
logger.exception("Exception occurred in pipeline. Rethrowing")
raise
finally:
# Make sure these are always shut down even if there was an error
for s in list(self._sources):
s.stop()

# First wait for all sources to stop. This only occurs after all messages have been processed fully
for s in list(self._sources):
await s.join()

# Now that there is no more data, call stop on all stages to ensure shutdown (i.e., for stages that have
# their own worker loop thread)
for s in list(self._stages):
s.stop()

# Now call join on all stages
for s in list(self._stages):
await s.join()

async def build_and_start(self):

Expand Down