Skip to content

Commit 680adb9

Browse files
author
Michael Crosby
committedJun 27, 2014
Merge pull request moby#6232 from LK4D4/wait_functions_for_state
Wait functions for state
2 parents 3aa7521 + 57d86a5 commit 680adb9

10 files changed

+244
-101
lines changed
 

‎daemon/container.go

+24-51
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type Container struct {
5353
Args []string
5454

5555
Config *runconfig.Config
56-
State State
56+
State *State
5757
Image string
5858

5959
NetworkSettings *NetworkSettings
@@ -74,8 +74,7 @@ type Container struct {
7474
daemon *Daemon
7575
MountLabel, ProcessLabel string
7676

77-
waitLock chan struct{}
78-
Volumes map[string]string
77+
Volumes map[string]string
7978
// Store rw/ro in a separate structure to preserve reverse-compatibility on-disk.
8079
// Easier than migrating older container configs :)
8180
VolumesRW map[string]bool
@@ -284,7 +283,6 @@ func (container *Container) Start() (err error) {
284283
if err := container.startLoggingToDisk(); err != nil {
285284
return err
286285
}
287-
container.waitLock = make(chan struct{})
288286

289287
return container.waitForStart()
290288
}
@@ -293,7 +291,7 @@ func (container *Container) Run() error {
293291
if err := container.Start(); err != nil {
294292
return err
295293
}
296-
container.Wait()
294+
container.State.WaitStop(-1 * time.Second)
297295
return nil
298296
}
299297

@@ -307,7 +305,7 @@ func (container *Container) Output() (output []byte, err error) {
307305
return nil, err
308306
}
309307
output, err = ioutil.ReadAll(pipe)
310-
container.Wait()
308+
container.State.WaitStop(-1 * time.Second)
311309
return output, err
312310
}
313311

@@ -467,6 +465,7 @@ func (container *Container) monitor(callback execdriver.StartCallback) error {
467465
if err != nil {
468466
utils.Errorf("Error running container: %s", err)
469467
}
468+
container.State.SetStopped(exitCode)
470469

471470
// Cleanup
472471
container.cleanup()
@@ -475,28 +474,17 @@ func (container *Container) monitor(callback execdriver.StartCallback) error {
475474
if container.Config.OpenStdin {
476475
container.stdin, container.stdinPipe = io.Pipe()
477476
}
478-
479477
if container.daemon != nil && container.daemon.srv != nil {
480478
container.daemon.srv.LogEvent("die", container.ID, container.daemon.repositories.ImageName(container.Image))
481479
}
482-
483-
close(container.waitLock)
484-
485480
if container.daemon != nil && container.daemon.srv != nil && container.daemon.srv.IsRunning() {
486-
container.State.SetStopped(exitCode)
487-
488-
// FIXME: there is a race condition here which causes this to fail during the unit tests.
489-
// If another goroutine was waiting for Wait() to return before removing the container's root
490-
// from the filesystem... At this point it may already have done so.
491-
// This is because State.setStopped() has already been called, and has caused Wait()
492-
// to return.
493-
// FIXME: why are we serializing running state to disk in the first place?
494-
//log.Printf("%s: Failed to dump configuration to the disk: %s", container.ID, err)
481+
// FIXME: here is race condition between two RUN instructions in Dockerfile
482+
// because they share same runconfig and change image. Must be fixed
483+
// in server/buildfile.go
495484
if err := container.ToDisk(); err != nil {
496-
utils.Errorf("Error dumping container state to disk: %s\n", err)
485+
utils.Errorf("Error dumping container %s state to disk: %s\n", container.ID, err)
497486
}
498487
}
499-
500488
return err
501489
}
502490

@@ -532,6 +520,7 @@ func (container *Container) cleanup() {
532520
}
533521

534522
func (container *Container) KillSig(sig int) error {
523+
utils.Debugf("Sending %d to %s", sig, container.ID)
535524
container.Lock()
536525
defer container.Unlock()
537526

@@ -577,17 +566,17 @@ func (container *Container) Kill() error {
577566
}
578567

579568
// 2. Wait for the process to die, in last resort, try to kill the process directly
580-
if err := container.WaitTimeout(10 * time.Second); err != nil {
569+
if _, err := container.State.WaitStop(10 * time.Second); err != nil {
581570
// Ensure that we don't kill ourselves
582-
if pid := container.State.Pid; pid != 0 {
571+
if pid := container.State.GetPid(); pid != 0 {
583572
log.Printf("Container %s failed to exit within 10 seconds of kill - trying direct SIGKILL", utils.TruncateID(container.ID))
584573
if err := syscall.Kill(pid, 9); err != nil {
585574
return err
586575
}
587576
}
588577
}
589578

590-
container.Wait()
579+
container.State.WaitStop(-1 * time.Second)
591580
return nil
592581
}
593582

@@ -605,11 +594,11 @@ func (container *Container) Stop(seconds int) error {
605594
}
606595

607596
// 2. Wait for the process to exit on its own
608-
if err := container.WaitTimeout(time.Duration(seconds) * time.Second); err != nil {
597+
if _, err := container.State.WaitStop(time.Duration(seconds) * time.Second); err != nil {
609598
log.Printf("Container %v failed to exit within %d seconds of SIGTERM - using the force", container.ID, seconds)
610599
// 3. If it doesn't, then send SIGKILL
611600
if err := container.Kill(); err != nil {
612-
container.Wait()
601+
container.State.WaitStop(-1 * time.Second)
613602
return err
614603
}
615604
}
@@ -630,12 +619,6 @@ func (container *Container) Restart(seconds int) error {
630619
return container.Start()
631620
}
632621

633-
// Wait blocks until the container stops running, then returns its exit code.
634-
func (container *Container) Wait() int {
635-
<-container.waitLock
636-
return container.State.GetExitCode()
637-
}
638-
639622
func (container *Container) Resize(h, w int) error {
640623
return container.command.Terminal.Resize(h, w)
641624
}
@@ -678,21 +661,6 @@ func (container *Container) Export() (archive.Archive, error) {
678661
nil
679662
}
680663

681-
func (container *Container) WaitTimeout(timeout time.Duration) error {
682-
done := make(chan bool, 1)
683-
go func() {
684-
container.Wait()
685-
done <- true
686-
}()
687-
688-
select {
689-
case <-time.After(timeout):
690-
return fmt.Errorf("Timed Out")
691-
case <-done:
692-
return nil
693-
}
694-
}
695-
696664
func (container *Container) Mount() error {
697665
return container.daemon.Mount(container)
698666
}
@@ -1103,9 +1071,7 @@ func (container *Container) startLoggingToDisk() error {
11031071
}
11041072

11051073
func (container *Container) waitForStart() error {
1106-
callbackLock := make(chan struct{})
11071074
callback := func(command *execdriver.Command) {
1108-
container.State.SetRunning(command.Pid())
11091075
if command.Tty {
11101076
// The callback is called after the process Start()
11111077
// so we are in the parent process. In TTY mode, stdin/out/err is the PtySlace
@@ -1117,16 +1083,23 @@ func (container *Container) waitForStart() error {
11171083
if err := container.ToDisk(); err != nil {
11181084
utils.Debugf("%s", err)
11191085
}
1120-
close(callbackLock)
1086+
container.State.SetRunning(command.Pid())
11211087
}
11221088

11231089
// We use a callback here instead of a goroutine and an chan for
11241090
// syncronization purposes
11251091
cErr := utils.Go(func() error { return container.monitor(callback) })
11261092

1093+
waitStart := make(chan struct{})
1094+
1095+
go func() {
1096+
container.State.WaitRunning(-1 * time.Second)
1097+
close(waitStart)
1098+
}()
1099+
11271100
// Start should not return until the process is actually running
11281101
select {
1129-
case <-callbackLock:
1102+
case <-waitStart:
11301103
case err := <-cErr:
11311104
return err
11321105
}

‎daemon/daemon.go

+3-8
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (daemon *Daemon) containerRoot(id string) string {
138138
// Load reads the contents of a container from disk
139139
// This is typically done at startup.
140140
func (daemon *Daemon) load(id string) (*Container, error) {
141-
container := &Container{root: daemon.containerRoot(id)}
141+
container := &Container{root: daemon.containerRoot(id), State: NewState()}
142142
if err := container.FromDisk(); err != nil {
143143
return nil, err
144144
}
@@ -236,12 +236,6 @@ func (daemon *Daemon) register(container *Container, updateSuffixarray bool, con
236236
}
237237
}
238238
}
239-
} else {
240-
// When the container is not running, we still initialize the waitLock
241-
// chan and close it. Receiving on nil chan blocks whereas receiving on a
242-
// closed chan does not. In this case we do not want to block.
243-
container.waitLock = make(chan struct{})
244-
close(container.waitLock)
245239
}
246240
return nil
247241
}
@@ -588,6 +582,7 @@ func (daemon *Daemon) newContainer(name string, config *runconfig.Config, img *i
588582
Name: name,
589583
Driver: daemon.driver.String(),
590584
ExecDriver: daemon.execDriver.Name(),
585+
State: NewState(),
591586
}
592587
container.root = daemon.containerRoot(container.ID)
593588

@@ -900,7 +895,7 @@ func (daemon *Daemon) shutdown() error {
900895
if err := c.KillSig(15); err != nil {
901896
utils.Debugf("kill 15 error for %s - %s", c.ID, err)
902897
}
903-
c.Wait()
898+
c.State.WaitStop(-1 * time.Second)
904899
utils.Debugf("container stopped %s", c.ID)
905900
}()
906901
}

‎daemon/state.go

+92-23
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ type State struct {
1616
ExitCode int
1717
StartedAt time.Time
1818
FinishedAt time.Time
19+
waitChan chan struct{}
20+
}
21+
22+
func NewState() *State {
23+
return &State{
24+
waitChan: make(chan struct{}),
25+
}
1926
}
2027

2128
// String returns a human-readable description of the state
@@ -35,56 +42,118 @@ func (s *State) String() string {
3542
return fmt.Sprintf("Exited (%d) %s ago", s.ExitCode, units.HumanDuration(time.Now().UTC().Sub(s.FinishedAt)))
3643
}
3744

45+
func wait(waitChan <-chan struct{}, timeout time.Duration) error {
46+
if timeout < 0 {
47+
<-waitChan
48+
return nil
49+
}
50+
select {
51+
case <-time.After(timeout):
52+
return fmt.Errorf("Timed out: %v", timeout)
53+
case <-waitChan:
54+
return nil
55+
}
56+
}
57+
58+
// WaitRunning waits until state is running. If state already running it returns
59+
// immediatly. If you want wait forever you must supply negative timeout.
60+
// Returns pid, that was passed to SetRunning
61+
func (s *State) WaitRunning(timeout time.Duration) (int, error) {
62+
s.RLock()
63+
if s.IsRunning() {
64+
pid := s.Pid
65+
s.RUnlock()
66+
return pid, nil
67+
}
68+
waitChan := s.waitChan
69+
s.RUnlock()
70+
if err := wait(waitChan, timeout); err != nil {
71+
return -1, err
72+
}
73+
return s.GetPid(), nil
74+
}
75+
76+
// WaitStop waits until state is stopped. If state already stopped it returns
77+
// immediatly. If you want wait forever you must supply negative timeout.
78+
// Returns exit code, that was passed to SetStopped
79+
func (s *State) WaitStop(timeout time.Duration) (int, error) {
80+
s.RLock()
81+
if !s.Running {
82+
exitCode := s.ExitCode
83+
s.RUnlock()
84+
return exitCode, nil
85+
}
86+
waitChan := s.waitChan
87+
s.RUnlock()
88+
if err := wait(waitChan, timeout); err != nil {
89+
return -1, err
90+
}
91+
return s.GetExitCode(), nil
92+
}
93+
3894
func (s *State) IsRunning() bool {
3995
s.RLock()
40-
defer s.RUnlock()
96+
res := s.Running
97+
s.RUnlock()
98+
return res
99+
}
41100

42-
return s.Running
101+
func (s *State) GetPid() int {
102+
s.RLock()
103+
res := s.Pid
104+
s.RUnlock()
105+
return res
43106
}
44107

45108
func (s *State) GetExitCode() int {
46109
s.RLock()
47-
defer s.RUnlock()
48-
49-
return s.ExitCode
110+
res := s.ExitCode
111+
s.RUnlock()
112+
return res
50113
}
51114

52115
func (s *State) SetRunning(pid int) {
53116
s.Lock()
54-
defer s.Unlock()
55-
56-
s.Running = true
57-
s.Paused = false
58-
s.ExitCode = 0
59-
s.Pid = pid
60-
s.StartedAt = time.Now().UTC()
117+
if !s.Running {
118+
s.Running = true
119+
s.Paused = false
120+
s.ExitCode = 0
121+
s.Pid = pid
122+
s.StartedAt = time.Now().UTC()
123+
close(s.waitChan) // fire waiters for start
124+
s.waitChan = make(chan struct{})
125+
}
126+
s.Unlock()
61127
}
62128

63129
func (s *State) SetStopped(exitCode int) {
64130
s.Lock()
65-
defer s.Unlock()
66-
67-
s.Running = false
68-
s.Pid = 0
69-
s.FinishedAt = time.Now().UTC()
70-
s.ExitCode = exitCode
131+
if s.Running {
132+
s.Running = false
133+
s.Pid = 0
134+
s.FinishedAt = time.Now().UTC()
135+
s.ExitCode = exitCode
136+
close(s.waitChan) // fire waiters for stop
137+
s.waitChan = make(chan struct{})
138+
}
139+
s.Unlock()
71140
}
72141

73142
func (s *State) SetPaused() {
74143
s.Lock()
75-
defer s.Unlock()
76144
s.Paused = true
145+
s.Unlock()
77146
}
78147

79148
func (s *State) SetUnpaused() {
80149
s.Lock()
81-
defer s.Unlock()
82150
s.Paused = false
151+
s.Unlock()
83152
}
84153

85154
func (s *State) IsPaused() bool {
86155
s.RLock()
87-
defer s.RUnlock()
88-
89-
return s.Paused
156+
res := s.Paused
157+
s.RUnlock()
158+
return res
90159
}

0 commit comments

Comments
 (0)