Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP INN-4152 explore dump #2024

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix invalid waitForEvent expression not failing run
  • Loading branch information
amh4r committed Dec 11, 2024
commit 06c917209e176842f8144a6767db84b541760527
99 changes: 61 additions & 38 deletions pkg/execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,48 @@ func (e *executor) HandleResponse(ctx context.Context, i *runInstance) error {
"workflow_id", i.md.ID.FunctionID.String(),
)

// This is a success, which means either a generator or a function result.
if i.resp.Err == nil && len(i.resp.Generator) > 0 {
// Handle generator responses then return.
if serr := e.HandleGeneratorResponse(ctx, i, i.resp); serr != nil {

// If this is an error compiling async expressions, fail the function.
shouldFailEarly := errors.Is(serr, &expressions.CompileError{}) || errors.Is(serr, state.ErrStateOverflowed) || errors.Is(serr, state.ErrFunctionOverflowed)

if shouldFailEarly {
var gracefulErr *state.WrappedStandardError
if hasGracefulErr := errors.As(serr, &gracefulErr); hasGracefulErr {
serialized := gracefulErr.Serialize(execution.StateErrorKey)
i.resp.Output = serialized
i.resp.Err = &gracefulErr.StandardError.Name

// Immediately fail the function.
i.resp.NoRetry = true

// This is required to get old history to look correct.
// Without it, the function run will have no output. We can
// probably delete this when we fully remove old history.
i.resp.Generator = []*state.GeneratorOpcode{}
}

for _, e := range e.lifecycles {
go e.OnStepFinished(context.WithoutCancel(ctx), i.md, i.item, i.edge, i.resp, serr)
}

if err := e.finalize(ctx, i.md, i.events, i.f.GetSlug(), e.assignedQueueShard, *i.resp); err != nil {
l.Error("error running finish handler", "error", err)
}

// Can be reached multiple times for parallel discovery steps
for _, e := range e.lifecycles {
go e.OnFunctionFinished(context.WithoutCancel(ctx), i.md, i.item, i.events, *i.resp)
}

return nil
}
}
}

for _, e := range e.lifecycles {
// OnStepFinished handles step success and step errors/failures. It is
// currently the responsibility of the lifecycle manager to handle the differing
Expand Down Expand Up @@ -945,46 +987,16 @@ func (e *executor) HandleResponse(ctx context.Context, i *runInstance) error {
}
}

// This is a success, which means either a generator or a function result.
if len(i.resp.Generator) > 0 {
// Handle generator responses then return.
if serr := e.HandleGeneratorResponse(ctx, i, i.resp); serr != nil {

// If this is an error compiling async expressions, fail the function.
shouldFailEarly := errors.Is(serr, &expressions.CompileError{}) || errors.Is(serr, state.ErrStateOverflowed) || errors.Is(serr, state.ErrFunctionOverflowed)

if shouldFailEarly {
var gracefulErr *state.WrappedStandardError
if hasGracefulErr := errors.As(serr, &gracefulErr); hasGracefulErr {
serialized := gracefulErr.Serialize(execution.StateErrorKey)
i.resp.Output = nil
i.resp.Err = &serialized
}

if err := e.finalize(ctx, i.md, i.events, i.f.GetSlug(), e.assignedQueueShard, *i.resp); err != nil {
l.Error("error running finish handler", "error", err)
}

// Can be reached multiple times for parallel discovery steps
for _, e := range e.lifecycles {
go e.OnFunctionFinished(context.WithoutCancel(ctx), i.md, i.item, i.events, *i.resp)
}

return nil
}
return fmt.Errorf("error handling generator response: %w", serr)
if len(i.resp.Generator) == 0 {
// This is the function result.
if err := e.finalize(ctx, i.md, i.events, i.f.GetSlug(), e.assignedQueueShard, *i.resp); err != nil {
l.Error("error running finish handler", "error", err)
}
return nil
}

// This is the function result.
if err := e.finalize(ctx, i.md, i.events, i.f.GetSlug(), e.assignedQueueShard, *i.resp); err != nil {
l.Error("error running finish handler", "error", err)
}

// Can be reached multiple times for parallel discovery steps
for _, e := range e.lifecycles {
go e.OnFunctionFinished(context.WithoutCancel(ctx), i.md, i.item, i.events, *i.resp)
// Can be reached multiple times for parallel discovery steps
for _, e := range e.lifecycles {
go e.OnFunctionFinished(context.WithoutCancel(ctx), i.md, i.item, i.events, *i.resp)
}
}

return nil
Expand Down Expand Up @@ -2418,6 +2430,17 @@ func (e *executor) handleGeneratorWaitForEvent(ctx context.Context, i *runInstan
if err != nil {
return fmt.Errorf("unable to parse wait for event opts: %w", err)
}

err = expressions.Validate(ctx, *opts.If)
if err != nil {
return state.WrapInStandardError(
err,
"InvalidExpression",
"Wait for event expression is invalid",
err.Error(),
)
}

expires, err := opts.Expires()
if err != nil {
return fmt.Errorf("unable to parse wait for event expires: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/run/trace_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ func (l traceLifecycle) OnStepFinished(

if runErr != nil {
span.SetStatus(codes.Error, runErr.Error())
span.SetStepOutput(runErr.Error())
span.SetStepOutput(runErr)
return
}

Expand Down
11 changes: 7 additions & 4 deletions tests/golang/wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestWaitInvalidExpression(t *testing.T) {
step.WaitForEventOpts{
If: inngestgo.StrPtr("invalid"),
Name: "dummy",
Timeout: time.Second,
Timeout: 30 * time.Second,
},
)

Expand All @@ -331,7 +331,7 @@ func TestWaitInvalidExpression(t *testing.T) {
// Trigger the main function and successfully invoke the other function
_, err := inngestgo.Send(ctx, &event.Event{Name: evtName})
r.NoError(err)
c.WaitForRunStatus(ctx, t, "COMPLETED", &runID)
c.WaitForRunStatus(ctx, t, "FAILED", &runID)
}

func TestWaitInvalidExpressionSyntaxError(t *testing.T) {
Expand Down Expand Up @@ -360,7 +360,7 @@ func TestWaitInvalidExpressionSyntaxError(t *testing.T) {
step.WaitForEventOpts{
If: inngestgo.StrPtr("event.data.userId === async.data.userId"),
Name: "test/continue",
Timeout: time.Second,
Timeout: 30 * time.Second,
},
)

Expand All @@ -375,7 +375,10 @@ func TestWaitInvalidExpressionSyntaxError(t *testing.T) {
_, err := inngestgo.Send(ctx, &event.Event{Name: evtName})
r.NoError(err)
run := c.WaitForRunStatus(ctx, t, "FAILED", &runID)
assert.Equal(t, "{\"error\":{\"error\":\"CompileError: Could not compile expression\",\"name\":\"CompileError\",\"message\":\"Could not compile expression\",\"stack\":\"ERROR: \\u003cinput\\u003e:1:21: Syntax error: token recognition error at: '= '\\n | event.data.userId === async.data.userId\\n | ....................^\"}}", run.Output)
assert.Equal(t,
`{"error":{"error":"InvalidExpression: Wait for event expression is invalid","name":"InvalidExpression","message":"Wait for event expression is invalid","stack":"error validating expression: error compiling expression: ERROR: \u003cinput\u003e:1:21: Syntax error: token recognition error at: '= '\n | event.data.userId === async.data.userId\n | ....................^"}}`,
run.Output,
)
}

func TestManyWaitInvalidExpressions(t *testing.T) {
Expand Down
Loading