forked from gruntwork-io/terragrunt
-
Notifications
You must be signed in to change notification settings - Fork 2
/
running_module.go
353 lines (301 loc) · 12.9 KB
/
running_module.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
package configstack
import (
"bytes"
"fmt"
"io"
"math"
"strings"
"sync"
"github.com/coveooss/terragrunt/v2/shell"
"github.com/coveooss/terragrunt/v2/tgerrors"
"github.com/coveooss/terragrunt/v2/util"
"github.com/fatih/color"
)
// ModuleStatus represents the status of a module that we are trying to apply as part of the apply-all or destroy-all command.
// The values used in this file are the waiting, running and finished constants.
type ModuleStatus int
// Exit codes used when summarizing the outcome of a group of errors.
const (
// normalExitCode is the starting value used by errMulti.ExitStatus before the individual errors are examined.
normalExitCode = 0
// errorExitCode is not referenced in the visible part of this file.
// NOTE(review): presumably the generic failure exit code - confirm against its callers.
errorExitCode = 1
// undefinedExitCode is returned by errMulti.ExitStatus when the exit code of one of the errors cannot be determined.
undefinedExitCode = -1
)
// Possible values of ModuleStatus for a runningModule.
const (
// waiting is the initial status assigned by newRunningModule.
waiting ModuleStatus = iota
// running is set by runNow, just before the module is executed (or skipped because it is assumed already applied).
running
// finished is set by moduleFinished once the result of the module has been recorded.
finished
)
// CreateMultiErrors is declared as a variable instead of a function so that the function used to compose
// the multi error object can be overridden.
// It is used if a command wants to change the default behavior of the severity analysis that is implemented by default.
// The default implementation wraps the supplied errors in an errMulti.
var CreateMultiErrors = func(errs []error) error {
return errMulti{Errors: errs}
}
// runningModule represents a module we are trying to "run" (i.e. apply or destroy) as part of the apply-all or destroy-all command
type runningModule struct {
// Module is the terraform module being run.
Module *TerraformModule
// Status is the current state of the module (waiting, running or finished).
Status ModuleStatus
// Err is the final error of the module, recorded by moduleFinished (nil on success).
Err error
// DependencyDone receives every module this one depends on as soon as that module has finished.
DependencyDone chan *runningModule
// Dependencies holds the modules that must finish before this one can run, keyed by module path.
// Entries are removed by waitForDependencies as the dependencies complete.
Dependencies map[string]*runningModule
// NotifyWhenDone lists the modules that are sent this module on their DependencyDone channel when it finishes.
NotifyWhenDone []*runningModule
// OutStream captures the output produced while the module runs.
OutStream bytes.Buffer
// Writer is where the captured output is printed once the module has finished.
Writer io.Writer
// Handler is an optional callback invoked by moduleFinished with the module, its output and its error.
Handler ModuleHandler
Mutex *sync.Mutex // A shared mutex pointer to ensure that there is no concurrency problem when jobs finish and report
bufferIndex int // Indicates the position of the buffer that has been flushed to the logger
// workerID is the identifier returned by waitWorker for the worker running this module; it is shown by displayName.
workerID int
}
// displayName returns the label identifying this module in log messages, in the form
// "Worker #<id>: <path>", where <path> is the module path as rendered by
// util.GetPathRelativeToWorkingDirMax with a maximum of 3.
//
// The worker ID is zero-padded to the number of decimal digits of nbWorkers(), so that
// every worker label has the same width.
func (module runningModule) displayName() string {
	// Clamp to at least one worker: the padding width must never drop below one digit, even
	// when no worker has been initialized yet. (The previous code used math.Min here, which
	// pinned the width to 1 and took the logarithm of 0 when there was no worker.)
	workers := int(math.Max(float64(nbWorkers()), 1))

	// Counting the digits of the integer avoids depending on the rounding of a floating-point logarithm.
	width := len(fmt.Sprint(workers))

	return fmt.Sprintf("Worker #%0*d: %s", width, module.workerID, util.GetPathRelativeToWorkingDirMax(module.Module.Path, 3))
}
// dependencyOrder controls in what order dependencies should be enforced between modules.
type dependencyOrder int
const (
// NormalOrder describes the normal path for module execution: a module waits for the modules it depends on.
NormalOrder dependencyOrder = iota
// ReverseOrder is used to execute modules in reverse order: a module waits for the modules that depend on it.
ReverseOrder
)
// ModuleHandler is a function prototype used to inject custom processing when a module completes.
// The function receives a copy of the current module, its captured output and its error as parameters,
// and returns the output and the error that are used from then on.
// Normally, the handler should return the same error as received in parameter, but it is possible to
// alter the normal course of the process by changing the error result.
type ModuleHandler func(TerraformModule, string, error) (string, error)
// newRunningModule builds the runningModule wrapper for the given module, sharing the supplied mutex.
// The module starts in the waiting status and prints its final output to the writer configured in the
// module's TerragruntOptions. Dependencies and NotifyWhenDone are left empty: they are expected to be
// filled afterwards by a function such as crossLinkRunningModulesDependencies.
func newRunningModule(module *TerraformModule, mutex *sync.Mutex) *runningModule {
	created := new(runningModule)
	created.Module = module
	created.Status = waiting
	// The buffer is intentionally huge so that senders are never blocked.
	created.DependencyDone = make(chan *runningModule, 1000)
	created.Dependencies = make(map[string]*runningModule)
	created.NotifyWhenDone = make([]*runningModule, 0)
	created.Writer = module.TerragruntOptions.Writer
	created.Mutex = mutex
	return created
}
// runModules runs the given list of modules. To "run" a module, execute the RunTerragrunt command in its
// TerragruntOptions object. The modules will be executed in an order determined by their inter-dependencies, using
// as much concurrency as possible.
// It delegates to runModulesWithHandler with no handler and NormalOrder, and returns the error reported by it.
func runModules(modules []*TerraformModule) error {
return runModulesWithHandler(modules, nil, NormalOrder)
}
// runModulesReverseOrder runs the given list of modules. To "run" a module, execute the RunTerragrunt command in its
// TerragruntOptions object. The modules will be executed in the reverse order of their inter-dependencies, using
// as much concurrency as possible.
// It delegates to runModulesWithHandler with no handler and ReverseOrder, and returns the error reported by it.
func runModulesReverseOrder(modules []*TerraformModule) error {
return runModulesWithHandler(modules, nil, ReverseOrder)
}
// runModulesWithHandler runs the given list of modules, respecting the requested dependency order, and
// returns the combined error of all modules (nil when every module succeeded).
//
// Each module gets its own goroutine; the actual execution is gated by runModuleWhenReady, which waits
// for the dependencies and for a free worker. The number of workers is taken from the options of the
// first module and capped to the number of modules to run.
//
// The handler (see ModuleHandler) may be nil. When set, it is called once a module has completed,
// whether it succeeded or failed.
func runModulesWithHandler(modules []*TerraformModule, handler ModuleHandler, order dependencyOrder) error {
	runningModules, err := toRunningModules(modules, order)
	if err != nil {
		return err
	}

	if len(modules) > 0 {
		workers := modules[0].TerragruntOptions.NbWorkers
		// We don't gain anything by running on more workers than there are modules
		if total := len(runningModules); workers <= 0 || workers > total {
			workers = total
		}
		initWorkers(workers)
	}

	var pending sync.WaitGroup
	for _, module := range runningModules {
		pending.Add(1)
		module.Handler = handler

		go func(current *runningModule) {
			// completed is shared with the periodic logger started below.
			var completed bool
			defer func() {
				pending.Done()
				completed = true
			}()

			// Redirect everything the module writes into its own buffer.
			capture := current.Module.TerragruntOptions.Logger.Copy().SetStdout(&current.OutStream)
			current.Module.TerragruntOptions.Writer = capture
			current.Module.TerragruntOptions.ErrWriter = capture

			// Flush the output buffers periodically to confirm that the process is still alive
			go current.OutputPeriodicLogs(&completed)
			current.runModuleWhenReady()
		}(module)
	}

	pending.Wait()
	return collectErrors(runningModules)
}
// toRunningModules wraps every module of the list in a runningModule and returns them indexed by module
// path, with their dependencies cross-linked according to dependencyOrder. All the wrappers share a
// single mutex. Nothing is executed here; running the modules is the job of runModules and its variants.
func toRunningModules(modules []*TerraformModule, dependencyOrder dependencyOrder) (map[string]*runningModule, error) {
	sharedMutex := new(sync.Mutex)
	byPath := make(map[string]*runningModule, len(modules))
	for i := range modules {
		byPath[modules[i].Path] = newRunningModule(modules[i], sharedMutex)
	}
	return crossLinkRunningModulesDependencies(byPath, dependencyOrder)
}
// crossLinkRunningModulesDependencies connects the running modules to each other, based on the
// dependencies declared by the underlying modules:
//
//   - With NormalOrder, a module waits for the modules it depends on, and each of those modules
//     notifies it when done.
//   - With any other order (ReverseOrder), the relation is inverted: a dependency waits for the
//     modules that depend on it.
//
// The map received is returned as is. An errDependencyNotFoundWhileCrossLinking error is returned as
// soon as a declared dependency is not part of the map.
func crossLinkRunningModulesDependencies(modules map[string]*runningModule, dependencyOrder dependencyOrder) (map[string]*runningModule, error) {
	reversed := dependencyOrder != NormalOrder

	for _, current := range modules {
		for _, dep := range current.Module.Dependencies {
			linked, found := modules[dep.Path]
			if !found {
				return modules, tgerrors.WithStackTrace(errDependencyNotFoundWhileCrossLinking{current, dep})
			}

			// first is the module that must complete before second is allowed to run.
			first, second := linked, current
			if reversed {
				first, second = current, linked
			}
			second.Dependencies[first.Module.Path] = first
			first.NotifyWhenDone = append(first.NotifyWhenDone, second)
		}
	}
	return modules, nil
}
// collectErrors gathers the error of every module that failed and combines them, through
// CreateMultiErrors, into a single error carrying a stack trace. It returns nil when no module failed.
func collectErrors(modules map[string]*runningModule) error {
	var failures []error
	for _, module := range modules {
		if module.Err == nil {
			continue
		}
		failures = append(failures, module.Err)
	}

	if len(failures) > 0 {
		return tgerrors.WithStackTrace(CreateMultiErrors(failures))
	}
	return nil
}
// dependencies returns the display path (relative to the working directory, see
// util.GetPathRelativeToWorkingDirMax) of every module currently registered in Dependencies.
// The order of the result follows the map iteration and is therefore not specified.
func (module *runningModule) dependencies() []string {
	paths := make([]string, len(module.Dependencies))
	next := 0
	for _, dependency := range module.Dependencies {
		paths[next] = util.GetPathRelativeToWorkingDirMax(dependency.Module.Path, 3)
		next++
	}
	return paths
}
// runModuleWhenReady waits for the dependencies of the module, then waits for a worker and runs the
// module on it. In every case the outcome is reported through moduleFinished. If waiting for the
// dependencies fails, the module is not run and that error is reported instead.
func (module *runningModule) runModuleWhenReady() {
	if err := module.waitForDependencies(); err != nil {
		module.moduleFinished(err)
		return
	}

	module.workerID = waitWorker()
	// The worker is only released once the result of the module has been reported.
	defer func() { freeWorker(module.workerID) }()
	module.moduleFinished(module.runNow())
}
// waitForDependencies blocks until every dependency of this module has reported its completion on
// DependencyDone, removing each of them from Dependencies as it arrives. It returns immediately when
// the module has no dependencies.
//
// When a dependency finished with an error, a dependencyFinishedWithError is returned right away,
// unless IgnoreDependencyErrors is set in the module options, in which case the failure is only
// logged and the wait goes on. It returns nil once no dependency is left.
func (module *runningModule) waitForDependencies() error {
	log := module.Module.TerragruntOptions.Logger

	if len(module.Dependencies) > 0 {
		log.Debugf("Module %s must wait for %s to finish", module.displayName(), strings.Join(module.dependencies(), ", "))
	}

	for len(module.Dependencies) > 0 {
		done := <-module.DependencyDone
		delete(module.Dependencies, done.Module.Path)
		donePath := util.GetPathRelativeToWorkingDirMax(done.Module.Path, 3)

		if done.Err == nil {
			remaining := ""
			if len(module.Dependencies) > 0 {
				remaining = fmt.Sprintf(" Module %s must still wait for %s.", module.displayName(), strings.Join(module.dependencies(), ", "))
			}
			log.Debugf("Dependency %s of module %s just finished successfully.%s", donePath, module.displayName(), remaining)
			continue
		}

		if !module.Module.TerragruntOptions.IgnoreDependencyErrors {
			log.Warningf("Dependency %[1]s of module %[2]s just finished with an error. Module %[2]s will have to return an error too.", donePath, module.displayName())
			return dependencyFinishedWithError{module.Module, done.Module, done.Err}
		}

		// The failure is tolerated: keep waiting for the other dependencies.
		log.Warningf("Dependency %[1]s of module %[2]s just finished with an error. Module %[2]s will have to return an error too. However, because of --terragrunt-ignore-dependency-errors, module %[2]s will run anyway.", donePath, module.displayName())
	}

	return nil
}
// runNow marks the module as running and executes the RunTerragrunt command of its TerragruntOptions,
// returning the error of that command. A module flagged AssumeAlreadyApplied is not executed:
// the skip is logged and nil is returned.
func (module *runningModule) runNow() error {
	module.Status = running
	target := module.Module

	if !target.AssumeAlreadyApplied {
		target.TerragruntOptions.Logger.Debugf("Running module %s now", module.displayName())
		return target.TerragruntOptions.RunTerragrunt(target.TerragruntOptions)
	}

	target.TerragruntOptions.Logger.Debugf("Assuming module %s has already been applied and skipping it", module.displayName())
	return nil
}
// separator is a line of 132 dashes printed by moduleFinished above the name of a module when its captured output is displayed.
var separator = strings.Repeat("-", 132)
// moduleFinished records the outcome of a module and notifies every module registered in
// NotifyWhenDone that this one is done.
//
// The captured output and moduleErr are first passed to the Handler, if there is one, which may
// replace both of them. Then, under the shared mutex (so that the reports of concurrent modules do
// not interleave), the completion is logged, the output is printed to Writer, and Status and Err are
// updated before the notifications are sent.
func (module *runningModule) moduleFinished(moduleErr error) {
	outcome := "successfully!"
	report := module.Module.TerragruntOptions.Logger.Infof
	captured := module.OutStream.String()

	if module.Handler != nil {
		captured, moduleErr = module.Handler(*module.Module, captured, moduleErr)
	}
	if moduleErr != nil {
		outcome = fmt.Sprintf("with an error: %v", moduleErr)
		report = module.Module.TerragruntOptions.Logger.Errorf
	}

	module.Mutex.Lock()
	defer module.Mutex.Unlock()

	report("Module %s has finished %s", module.displayName(), outcome)
	if captured != "" {
		fmt.Fprintln(module.Writer, color.HiGreenString("%s\n%v\n", separator, module.displayName()))
		fmt.Fprintln(module.Writer, captured)
	} else {
		module.Module.TerragruntOptions.Logger.Info("No output")
	}

	module.Status, module.Err = finished, moduleErr
	for _, waiter := range module.NotifyWhenDone {
		waiter.DependencyDone <- module
	}
}
// Custom error types
// dependencyFinishedWithError is the error assigned to a module that could not be processed because
// one of the modules it was waiting for failed.
type dependencyFinishedWithError struct {
	Module     *TerraformModule // The module that could not be processed
	Dependency *TerraformModule // The dependency that failed
	Err        error            // The error reported by the dependency
}

// Error describes which module was blocked, by which dependency, and the error of that dependency.
func (e dependencyFinishedWithError) Error() string {
	const format = "Cannot process module %s because one of its dependencies, %s, finished with an error: %s"
	return fmt.Sprintf(format, e.Module, e.Dependency, e.Err)
}

// ExitStatus returns the exit code extracted from the error of the dependency by shell.GetExitCode.
// When no exit code can be extracted, it returns -1 along with the error itself.
func (e dependencyFinishedWithError) ExitStatus() (int, error) {
	exitCode, err := shell.GetExitCode(e.Err)
	if err != nil {
		return -1, e
	}
	return exitCode, nil
}
// errMulti groups several errors into a single error value.
type errMulti struct {
	Errors []error
}

// Error returns a header line followed by the message of each error, one per line.
func (e errMulti) Error() string {
	messages := make([]string, len(e.Errors))
	for i := range e.Errors {
		messages[i] = e.Errors[i].Error()
	}
	return fmt.Sprintf("Encountered the following errors:\n%s", strings.Join(messages, "\n"))
}
// ExitStatus returns the highest exit code found among the grouped errors, starting from
// normalExitCode. If the exit code of any of the errors cannot be determined by shell.GetExitCode,
// it returns undefinedExitCode along with the errMulti itself.
func (e errMulti) ExitStatus() (int, error) {
	highest := normalExitCode
	for _, failure := range e.Errors {
		code, err := shell.GetExitCode(failure)
		if err != nil {
			return undefinedExitCode, e
		}
		if code > highest {
			highest = code
		}
	}
	return highest, nil
}
// errDependencyNotFoundWhileCrossLinking is returned by crossLinkRunningModulesDependencies when a
// module declares a dependency that is not part of the set of modules being linked.
type errDependencyNotFoundWhileCrossLinking struct {
	Module     *runningModule   // The module declaring the dependency
	Dependency *TerraformModule // The dependency that could not be found
}

// Error reports the module and the missing dependency, both formatted with %v.
func (err errDependencyNotFoundWhileCrossLinking) Error() string {
	const format = "Module %v specifies a dependency on module %v, but could not find that module while cross-linking dependencies. This is most likely a bug in Terragrunt. Please report it."
	return fmt.Sprintf(format, err.Module, err.Dependency)
}