Skip to content

Commit

Permalink
fix(turborepo): properly tag errors when running in GH Actions (verce…
Browse files Browse the repository at this point in the history
…l#5435)

### Description

- log grouping now streams log lines to a single buffer, each line is
tagged on whether it came from stdout or stderr
- log lines are replayed in approximately the correct order (there is
always a natural race condition since they are separate streams)
 - command failures use github prefixing when running on github actions
 - failures _should_ appear inside the correct grouping in the output

### Testing Instructions

 - added a new integration test for a failing run in github actions

---------

Co-authored-by: Greg Soltis <Greg Soltis>
  • Loading branch information
Greg Soltis authored Jul 7, 2023
1 parent 6bd45fe commit 9d5a094
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 41 deletions.
122 changes: 86 additions & 36 deletions cli/internal/run/real_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,68 @@ func (tsob *threadsafeOutputBuffer) Bytes() []byte {
return tsob.buf.Bytes()
}

type logLine struct {
isStdout bool
line []byte
}

// logBuffer holds the log lines for a task, tagged to stdout or stderr
type logBuffer struct {
mu sync.Mutex
lines []logLine
}

// LogLine appends a log line to the log buffer
func (lb *logBuffer) LogLine(line []byte, isStdout bool) {
lb.mu.Lock()
defer lb.mu.Unlock()
lb.lines = append(lb.lines, logLine{isStdout, line})
}

// Drain writes the contents of the logBuffer to the appropriate output stream
func (lb *logBuffer) Drain(stdout io.Writer, stderr io.Writer) error {
for _, line := range lb.lines {
if line.isStdout {
if _, err := stdout.Write(line.line); err != nil {
return err
}
} else {
if _, err := stderr.Write(line.line); err != nil {
return err
}
}
}
return nil
}

// StdoutWriter returns a writer tagged to stdout
func (lb *logBuffer) StdoutWriter() *logBufferWriter {
return &logBufferWriter{
isStdout: true,
logBuffer: lb,
}
}

// StderrWriter returns a writer tagged to stderr
func (lb *logBuffer) StderrWriter() *logBufferWriter {
return &logBufferWriter{
isStdout: false,
logBuffer: lb,
}
}

type logBufferWriter struct {
isStdout bool
logBuffer *logBuffer
}

// Write implements io.Writer.Write for logBufferWriter
func (lbw *logBufferWriter) Write(bytes []byte) (int, error) {
n := len(bytes)
lbw.logBuffer.LogLine(bytes, lbw.isStdout)
return n, nil
}

// RealRun executes a set of tasks
func RealRun(
ctx gocontext.Context,
Expand Down Expand Up @@ -124,25 +186,19 @@ func RealRun(
}

taskCount := len(engine.TaskGraph.Vertices())
logChan := make(chan taskLogContext, taskCount)
logChan := make(chan *logBuffer, taskCount)
logWaitGroup := sync.WaitGroup{}
isGrouped := rs.Opts.runOpts.LogOrder == "grouped"

if isGrouped {
logWaitGroup.Add(1)
go func() {
for logContext := range logChan {

outBytes := logContext.outBuf.Bytes()
errBytes := logContext.errBuf.Bytes()
for logBuffer := range logChan {

_, errOut := os.Stdout.Write(outBytes)
_, errErr := os.Stderr.Write(errBytes)

if errOut != nil || errErr != nil {
ec.ui.Error("Failed to output some of the logs.")
err := logBuffer.Drain(os.Stdout, os.Stderr)
if err != nil {
ec.ui.Error(fmt.Sprintf("Failed to output some of the logs: %v", err))
}

}
logWaitGroup.Done()
}()
Expand All @@ -151,15 +207,14 @@ func RealRun(
taskSummaryMutex := sync.Mutex{}
taskSummaries := []*runsummary.TaskSummary{}
execFunc := func(ctx gocontext.Context, packageTask *nodes.PackageTask, taskSummary *runsummary.TaskSummary) error {
outBuf := &bytes.Buffer{}
errBuf := &bytes.Buffer{}

var outWriter io.Writer = os.Stdout
var errWriter io.Writer = os.Stderr

logBuffer := &logBuffer{}

if isGrouped {
outWriter = outBuf
errWriter = errBuf
outWriter = logBuffer.StdoutWriter()
errWriter = logBuffer.StderrWriter()
}

var spacesLogBuffer *threadsafeOutputBuffer
Expand Down Expand Up @@ -194,10 +249,7 @@ func RealRun(
runSummary.CloseTask(taskSummary, logBytes)
}
if isGrouped {
logChan <- taskLogContext{
outBuf: outBuf,
errBuf: errBuf,
}
logChan <- logBuffer
}

// Return the error when there is one
Expand All @@ -214,6 +266,10 @@ func RealRun(

visitorFn := g.GetPackageTaskVisitor(ctx, engine.TaskGraph, rs.Opts.runOpts.FrameworkInference, globalEnvMode, getArgs, base.Logger, execFunc)
errs := engine.Execute(visitorFn, execOpts)
if isGrouped {
close(logChan)
logWaitGroup.Wait()
}

// Track if we saw any child with a non-zero exit code
exitCode := 0
Expand Down Expand Up @@ -257,11 +313,6 @@ func RealRun(
}
}

if isGrouped {
close(logChan)
logWaitGroup.Wait()
}

if err := runSummary.Close(ctx, exitCode, g.WorkspaceInfos, base.UI); err != nil {
// We don't need to throw an error, but we can warn on this.
// Note: this method doesn't actually return an error for Real Runs at the time of writing.
Expand All @@ -276,11 +327,6 @@ func RealRun(
return nil
}

type taskLogContext struct {
outBuf *bytes.Buffer
errBuf *bytes.Buffer
}

type execContext struct {
colorCache *colorcache.ColorCache
runSummary runsummary.Meta
Expand Down Expand Up @@ -357,6 +403,13 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas

if ec.rs.Opts.runOpts.IsGithubActions {
ui.Output(fmt.Sprintf("::group::%s", packageTask.OutputPrefix(ec.isSinglePackage)))
prefixedUI.WarnPrefix = "::warn::"
prefixedUI.ErrorPrefix = "::error::"
defer func() {
// We don't use the prefixedUI here because the prefix in this case would include
// the ::group::<taskID>, and we explicitly want to close the github group
ui.Output("::endgroup::")
}()
}

cacheStatus, err := taskCache.RestoreOutputs(ctx, prefixedUI, progressLogger)
Expand Down Expand Up @@ -447,11 +500,6 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas

closeOutputs := func() error {
var closeErrors []error
if ec.rs.Opts.runOpts.IsGithubActions {
// We don't use the prefixedUI here because the prefix in this case would include
// the ::group::<taskID>, and we explicitly want to close the github group
ui.Output("::endgroup::")
}

if err := logStreamerOut.Close(); err != nil {
closeErrors = append(closeErrors, errors.Wrap(err, "log stdout"))
Expand All @@ -475,7 +523,9 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas

// Run the command
if err := ec.processes.Exec(cmd); err != nil {
// close off our outputs. We errored, so we mostly don't care if we fail to close
// ensure we close off our outputs. We errored, so we mostly don't care if we fail to close
// We don't close them directly because we're potentially going to output some errors or
// warnings that we want grouped with the task output.
_ = closeOutputs()
// if we already know we're in the process of exiting,
// we don't need to record an error to that effect.
Expand Down
102 changes: 101 additions & 1 deletion cli/internal/ui/ui_factory.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
package ui

import (
"bufio"
"errors"
"fmt"
"io"
"os"
"os/signal"
"strings"

"github.com/bgentry/speakeasy"
"github.com/fatih/color"
"github.com/mattn/go-isatty"
"github.com/mitchellh/cli"
)

Expand All @@ -16,9 +24,101 @@ type Factory interface {
type BasicUIFactory struct {
}

// basicUI is an implementation of Ui that just outputs to the given
// writer. This UI is not threadsafe by default, but you can wrap it
// in a ConcurrentUi to make it safe.
//
// Inlined from cli.Ui to fuse newlines to lines being logged. This is
// probably not the optimal way to do it, but it works for now.
type basicUI struct {
Reader io.Reader
Writer io.Writer
ErrorWriter io.Writer
}

// Ask implements ui.Cli.Ask for BasicUi
func (u *basicUI) Ask(query string) (string, error) {
return u.ask(query, false)
}

// AskSecret implements ui.Cli.AskSecret for BasicUi
func (u *basicUI) AskSecret(query string) (string, error) {
return u.ask(query, true)
}

func (u *basicUI) ask(query string, secret bool) (string, error) {
if _, err := fmt.Fprint(u.Writer, query+" "); err != nil {
return "", err
}

// Register for interrupts so that we can catch it and immediately
// return...
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
defer signal.Stop(sigCh)

// Ask for input in a go-routine so that we can ignore it.
errCh := make(chan error, 1)
lineCh := make(chan string, 1)
go func() {
var line string
var err error
if secret && isatty.IsTerminal(os.Stdin.Fd()) {
line, err = speakeasy.Ask("")
} else {
r := bufio.NewReader(u.Reader)
line, err = r.ReadString('\n')
}
if err != nil {
errCh <- err
return
}

lineCh <- strings.TrimRight(line, "\r\n")
}()

select {
case err := <-errCh:
return "", err
case line := <-lineCh:
return line, nil
case <-sigCh:
// Print a newline so that any further output starts properly
// on a new line.
fmt.Fprintln(u.Writer)

return "", errors.New("interrupted")
}
}

// Error implements ui.Cli.Error for BasicUi
func (u *basicUI) Error(message string) {
w := u.Writer
if u.ErrorWriter != nil {
w = u.ErrorWriter
}

fmt.Fprintf(w, "%v\n", message)
}

// Info implements ui.Cli.Info for BasicUi
func (u *basicUI) Info(message string) {
u.Output(message)
}

// Output implements ui.Cli.Output for BasicUi
func (u *basicUI) Output(message string) {
fmt.Fprintf(u.Writer, "%v\n", message)
}

// Warn implements ui.Cli.Warn for BasicUi
func (u *basicUI) Warn(message string) {
u.Error(message)
}

// Build builds a cli.BasicUi from input, output and error IOs
func (factory *BasicUIFactory) Build(in io.Reader, out io.Writer, err io.Writer) cli.Ui {
return &cli.BasicUi{
return &basicUI{
Reader: in,
Writer: out,
ErrorWriter: err,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"name": "util",
"scripts": {
"build": "sleep 0.5 && echo 'building' && sleep 1 && echo 'completed'"
"build": "sleep 0.5 && echo 'building' && sleep 1 && echo 'completed'",
"fail": "echo 'failing'; exit 1"
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"$schema": "https://turbo.build/schema.json",
"pipeline": {
"build": {
}
"build": {},
"fail": {}
}
}
32 changes: 31 additions & 1 deletion turborepo-tests/integration/tests/ordered/github.t
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ because otherwise prysk interprets them as multiline commands
done
::endgroup::
::group::util:build
cache bypass, force executing 0bc36a8a234e31d4
cache bypass, force executing 90d7154e362e3386

>\sbuild (re)
>\ssleep 0.5 && echo 'building' && sleep 1 && echo 'completed' (re)
Expand All @@ -33,3 +33,33 @@ because otherwise prysk interprets them as multiline commands
Cached: 0 cached, 2 total
Time:\s*[\.0-9]+m?s (re)

Verify that errors are grouped properly
$ ${TURBO} run fail
\xe2\x80\xa2 Packages in scope: my-app, util (esc)
\xe2\x80\xa2 Running fail in 2 packages (esc)
\xe2\x80\xa2 Remote caching disabled (esc)
::group::util:fail
cache miss, executing 9bf2c727d81cf834

\> fail (re)
\> echo 'failing'; exit 1 (re)

failing
npm ERR! Lifecycle script `fail` failed with error:
npm ERR! Error: command failed
npm ERR! in workspace: util
npm ERR\! at location: (.*)/packages/util (re)
::error::ERROR: command finished with error: command \((.*)/packages/util\) npm run fail exited \(1\) (re)
::endgroup::
command \(.*/packages/util\) npm run fail exited \(1\) (re)

Tasks: 0 successful, 1 total
Cached: 0 cached, 1 total
Time:\s*[\.0-9]+m?s (re)
Failed: util#fail

ERROR run failed: command exited (1)
[1]



0 comments on commit 9d5a094

Please sign in to comment.