Skip to content

Commit

Permalink
Fix incomplete trace on asynchronous exception
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbrasetvik committed Nov 24, 2011
2 parents 214f856 + 48a56cf commit 9d1830c
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 30 deletions.
4 changes: 4 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ Backward-incompatible changes:

instead.

Bug fixes:
- #21: When tracing, if a processor asynchronously raised an exception,
the trace token could be lost, causing the trace to be incomplete.

========================== Release 0.2.0 2011-10-17 ==========================

Features:
Expand Down
79 changes: 53 additions & 26 deletions piped/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,22 +294,28 @@ class _ProcessTracer(object):
"""

def __init__(self):
    """ Set up a tracer with a freshly generated trace token.

    The token name embeds the token value, so every tracer instance looks for
    a distinct key in the frame locals and concurrent traces cannot collide.
    """
    # Trace entries recorded so far (appended to by add_trace_entry).
    self.traced = list()
    self.token_value = uuid.uuid4().get_hex()
    self.token_name = 'trace_token_' + self.token_value

    # Unique sentinel used to distinguish "token not present in f_locals"
    # from any real value, including None.
    self.sentinel = object()
    self.encoder = util.BatonJSONEncoder()
def find_trace_token(self, frame):
""" Returns the value of the trace token if it is in the current stack of frames. """
if self._is_the_trace_token_in_the_current_stack(frame):
def refresh_trace_token(self, frame):
    """ Attempt to find and refresh the trace token in the given stack of frames.

    :return: The trace token value if it was found, otherwise ``None``.
    """
    token_found = self._refresh_trace_token_if_found_in_the_current_stack(frame)
    if not token_found:
        return None
    return self.token_value

def add_trace_entry(self, frame, kind, **state):
""" Add an entry to the trace if the correct trace token is in the current stack of frames. """
if not self._is_the_trace_token_in_the_current_stack(frame):
""" Add an entry to the trace if the correct trace token is in the current stack of frames.
:return: The state added to the trace.
"""
if not self._refresh_trace_token_if_found_in_the_current_stack(frame):
return

if state['source'] is None:
Expand All @@ -324,18 +330,16 @@ def add_trace_entry(self, frame, kind, **state):
elif kind in ('process_source_raised', '_process_consumer_raised', '_process_error_consumer', '_process_error_consumer_raised'):
state['failure'] = self._get_trace_failure()

if kind == 'process_source_raised':
state['destination'] = self._find_nearest_possible_source(frame)

self._append_to_trace(**state)

return state

def _find_nearest_possible_source(self, frame):
""" Returns the first 'self' variable in a stack of frames that is not part of piped.processing """
while frame:
possible_source = frame.f_locals.get('self', None)
if possible_source and not isinstance(possible_source, (TwistedProcessorGraphEvaluator, failure.Failure)):
if possible_source and not isinstance(possible_source, (TwistedProcessorGraphEvaluator, failure.Failure, defer.Deferred)):
return possible_source

frame = frame.f_back

def _get_trace_failure(self):
Expand All @@ -347,14 +351,25 @@ def _get_trace_failure(self):
traceback = reason.getTraceback(elideFrameworkCode=True, detail='brief')
)

def _is_the_trace_token_in_the_current_stack(self, frame):
""" Looks for the trace token in a stack of frames. """
# Look for the sentinel in the stack.
def _refresh_trace_token_if_found_in_the_current_stack(self, frame):
    """ Looks for the trace token in a stack of frames and copies it to all frames
    leading from the frame the token was found in to the given frame.

    :return: True if the token was found, False otherwise.
    """
    # Frames walked past without finding the token; refreshed once it is found.
    pending_refresh = list()

    current = frame
    while current is not None:
        # The token may live in the locals of the frame we are inspecting.
        if current.f_locals.get(self.token_name, self.sentinel) == self.token_value:
            # Found it: propagate the token into every intermediary frame so
            # later lookups from deeper in the stack still locate it.
            for intermediary in pending_refresh:
                intermediary.f_locals[self.token_name] = self.token_value

            return True

        pending_refresh.append(current)
        current = current.f_back

    return False
Expand Down Expand Up @@ -386,23 +401,29 @@ def __init__(self, processor_graph, name=None):
def _add_trace_entry(cls, *a, **kw):
    """ Ask each registered tracer to record a trace entry.

    :return: The state added to the trace by the first tracer that recognized
        the calling stack, or ``None`` if no tracer did.
    """
    # The caller's frame is the same on every iteration, so look it up once
    # instead of once per tracer.
    frame = sys._getframe(1)
    for tracer in cls.tracers:
        entry = tracer.add_trace_entry(frame, *a, **kw)
        if entry:
            return entry


@classmethod
def _refresh_trace_token(cls):
    """ Ask each registered tracer to refresh its trace token in the calling stack.

    :return: The first trace token value that was found, or ``None``.
    """
    # The caller's frame is invariant across the loop, so fetch it once.
    frame = sys._getframe(1)
    for tracer in cls.tracers:
        token = tracer.refresh_trace_token(frame)
        if token:
            return token

@defer.inlineCallbacks
def _process(self, processor, baton, results):
# Get a copy of the trace token so that it is easier to find by the tracer.
# Refresh the trace token, which will copy the trace token as far down the stack
# as possible, so that it is easier to find by the tracer.
# This fixes a corner case where a processor performs some deep magic that would
# change the stack, making it hard (or even impossible) to locate the original
# trace token location.
trace_token = self._find_trace_token()
self._refresh_trace_token()

try:
# Profile the time each processor spends.
Expand Down Expand Up @@ -457,11 +478,13 @@ def process(self, baton):
results = list()

for source_processor in self.processor_graph.sources:
# store the trace entry before processing in order to use it as a destination if the processing raises an Exception
entry = self._add_trace_entry('process_source', source=None, destination=source_processor, baton=baton)
try:
self._add_trace_entry('process_source', source=None, destination=source_processor, baton=baton)
yield self._process(source_processor, baton, results)
except Exception as e:
self._add_trace_entry('process_source_raised', source=source_processor, destination=None, baton=baton)
source = entry['source'] if entry else None
self._add_trace_entry('process_source_raised', source=source_processor, destination=source, baton=baton)
raise

defer.returnValue(results)
Expand All @@ -473,10 +496,15 @@ def process(self, baton):
def traced_process(self, *a, **kw):
""" Traces and processes a baton asynchronously through a processor graph.
.. warning:: The act of tracing introduces a new variable in the locals of
all the traced frames of the form ``trace_token_UUID4``.
.. seealso:: :class:`_ProcessTracer` and :meth:`process`.
"""
trace_token = uuid.uuid4().get_hex()
tracer = _ProcessTracer(token_value=trace_token)
tracer = _ProcessTracer()
# add the token to the current locals, so _refresh_trace_token may find it
locals()[tracer.token_name] = tracer.token_value

try:
self.tracers.append(tracer)
results = yield self.process(*a, **kw)
Expand All @@ -486,7 +514,6 @@ def traced_process(self, *a, **kw):
self.tracers.remove(tracer)
defer.returnValue((results, tracer.traced))


def configure_processors(self, runtime_environment):
# give the processor a reference to its evaluator
for processor in self.processor_graph:
Expand Down
60 changes: 58 additions & 2 deletions piped/test/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
from mock import patch
from twisted.internet import reactor, defer
from twisted.trial import unittest
from twisted.python import failure
from twisted.python import failure, reflect
from zope import interface

from piped import exceptions, processing
from piped import exceptions, processing, util
from piped.processors import base, util_processors, pipeline_processors


Expand Down Expand Up @@ -107,6 +107,17 @@ def process(self, baton):
return baton


class DelayedErrbackProcessor(StubProcessor):
    """ A stub processor that waits asynchronously, then raises a StubException. """

    def __init__(self, delay=0, *a, **kw):
        super(DelayedErrbackProcessor, self).__init__(*a, **kw)
        # Number of seconds to wait before erring back.
        self.delay = delay

    @defer.inlineCallbacks
    def process(self, baton):
        # Yielding a deferred here makes the failure asynchronous, which
        # discards most of the original call stack before the raise.
        yield util.wait(self.delay)
        raise StubException()


class ProcessorGraphTest(unittest.TestCase):

def setUp(self):
Expand Down Expand Up @@ -2023,6 +2034,51 @@ def test_tracing_across_pipelines(self):
dict(source=run_pipeline, destination=None, baton=dict(n=2)),
])

@defer.inlineCallbacks
def test_tracing_across_pipelines_with_delayed_failures(self):
    """ If a target pipeline raises an exception asynchronously, much of the stack is
    lost, but tracing should still work. """
    runtime_environment = processing.RuntimeEnvironment()
    runtime_environment.configure()

    # Nested pipeline: a single processor that errbacks after a reactor turn.
    nested_pg = processing.ProcessorGraph()
    nested_raiser = DelayedErrbackProcessor()
    nested_pg.get_builder().add_processor(nested_raiser)
    nested_evaluator = processing.TwistedProcessorGraphEvaluator(nested_pg, name='test_pipeline2')

    # Outer pipeline: passthrough -> incrementing -> for_each into the nested pipeline.
    pg = processing.ProcessorGraph()
    builder = pg.get_builder()

    passthrough = util_processors.Passthrough()
    incrementing = IncrementingProcessor()
    for_each = pipeline_processors.ForEach(pipeline='test_pipeline2', input_path='for_each_input', output_path=None)

    builder.add_processor(passthrough).add_processor(incrementing).add_processor(for_each)
    evaluator = processing.TwistedProcessorGraphEvaluator(pg, name='test_pipeline')

    evaluator.configure_processors(runtime_environment)
    # Wire the for_each processor directly to the nested evaluator instead of
    # going through the resource manager.
    for_each.pipeline_dependency.is_ready = True
    for_each.pipeline_dependency.on_resource_ready(nested_evaluator)

    results, trace = yield evaluator.traced_process(dict(n=0, for_each_input=[1,2]))

    self.assertEquals(results, [dict(n=1, for_each_input=[1,2])])
    self.assertTraceEquals(trace, [
        dict(source=self, destination=passthrough, baton=dict(n=0, for_each_input=[1,2])),
        dict(source=passthrough, destination=incrementing, baton=dict(n=0, for_each_input=[1,2])),
        dict(source=incrementing, destination=for_each, baton=dict(n=1, for_each_input=[1,2])),

        # for_each -> to the nested pipeline
        dict(source=for_each, destination=nested_raiser, baton=1),
        dict(source=nested_raiser, destination=for_each, baton=1, failure=dict(type=StubException, args=())),

        # the for_each processor ignores the error and processes the second baton:
        dict(source=for_each, destination=nested_raiser, baton=2),
        dict(source=nested_raiser, destination=for_each, baton=2, failure=dict(type=StubException, args=())),

        dict(source=for_each, destination=None, baton=dict(n=1, for_each_input=[1,2])),
    ])

@defer.inlineCallbacks
def test_tracing_across_pipelines_via_proxy_object(self):
""" If any non-pipeline proxy object invokes a pipeline, it should show up
Expand Down
8 changes: 6 additions & 2 deletions piped/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,11 @@ def _process_input(self):
self._waiting_on_processor = None


class NonCleaningFailure(failure.Failure):
# we store a reference to the superclass of NonCleaningFailure here because it may change
# due to the monkey-patching performed when piped is started with the -D argument
NonCleaningFailureSuperClass = failure.Failure

class NonCleaningFailure(NonCleaningFailureSuperClass):
""" A Failure subclass that doesn't replace its traceback with repr'd objects. """

def __init__(self, *a, **kw):
Expand All @@ -581,7 +585,7 @@ def __init__(self, *a, **kw):
if (twisted.version.major, twisted.version.minor) > (11, 0):
kw.setdefault('captureVars', True)

failure.Failure.__init__(self, *a, **kw)
NonCleaningFailureSuperClass.__init__(self, *a, **kw)

def cleanFailure(self):
pass
Expand Down

0 comments on commit 9d1830c

Please sign in to comment.