Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for discovery on container start error #22561

Merged
merged 1 commit into from
May 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ func (daemon *Daemon) restore() error {
}
}
}

// Make sure networks are available before starting
daemon.waitForNetworks(c)
if err := daemon.containerStart(c); err != nil {
logrus.Errorf("Failed to start container %s: %s", c.ID, err)
}
Expand Down Expand Up @@ -423,6 +426,33 @@ func (daemon *Daemon) restore() error {
return nil
}

// waitForNetworks is used during daemon initialization when starting up containers
// It ensures that all of a container's networks are available before the daemon tries to start the container.
// In practice it just makes sure the discovery service is available for containers which use a network that require discovery.
func (daemon *Daemon) waitForNetworks(c *container.Container) {
if daemon.discoveryWatcher == nil {
return
}
// Make sure if the container has a network that requires discovery that the discovery service is available before starting
for netName := range c.NetworkSettings.Networks {
// If we get `ErrNoSuchNetwork` here, it can assumed that it is due to discovery not being ready
// Most likely this is because the K/V store used for discovery is in a container and needs to be started
if _, err := daemon.netController.NetworkByName(netName); err != nil {
if _, ok := err.(libnetwork.ErrNoSuchNetwork); !ok {
continue
}
// use a longish timeout here due to some slowdowns in libnetwork if the k/v store is on anything other than --net=host
// FIXME: why is this slow???
logrus.Debugf("Container %s waiting for network to be ready", c.Name)
select {
case <-daemon.discoveryWatcher.ReadyCh():
case <-time.After(60 * time.Second):
}
return
}
}
}

func (daemon *Daemon) mergeAndVerifyConfig(config *containertypes.Config, img *image.Image) error {
if img != nil && img.Config != nil {
if err := merge(config, img.Config); err != nil {
Expand Down
26 changes: 26 additions & 0 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,12 @@ func TestDaemonDiscoveryReload(t *testing.T) {
&discovery.Entry{Host: "127.0.0.1", Port: "3333"},
}

select {
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for discovery")
case <-daemon.discoveryWatcher.ReadyCh():
}

stopCh := make(chan struct{})
defer close(stopCh)
ch, errCh := daemon.discoveryWatcher.Watch(stopCh)
Expand Down Expand Up @@ -414,6 +420,13 @@ func TestDaemonDiscoveryReload(t *testing.T) {
if err := daemon.Reload(newConfig); err != nil {
t.Fatal(err)
}

select {
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for discovery")
case <-daemon.discoveryWatcher.ReadyCh():
}

ch, errCh = daemon.discoveryWatcher.Watch(stopCh)

select {
Expand Down Expand Up @@ -450,6 +463,13 @@ func TestDaemonDiscoveryReloadFromEmptyDiscovery(t *testing.T) {
if err := daemon.Reload(newConfig); err != nil {
t.Fatal(err)
}

select {
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for discovery")
case <-daemon.discoveryWatcher.ReadyCh():
}

stopCh := make(chan struct{})
defer close(stopCh)
ch, errCh := daemon.discoveryWatcher.Watch(stopCh)
Expand Down Expand Up @@ -488,6 +508,12 @@ func TestDaemonDiscoveryReloadOnlyClusterAdvertise(t *testing.T) {
if err := daemon.Reload(newConfig); err != nil {
t.Fatal(err)
}

select {
case <-daemon.discoveryWatcher.ReadyCh():
case <-time.After(10 * time.Second):
t.Fatal("Timeout waiting for discovery")
}
stopCh := make(chan struct{})
defer close(stopCh)
ch, errCh := daemon.discoveryWatcher.Watch(stopCh)
Expand Down
61 changes: 47 additions & 14 deletions daemon/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,24 @@ type discoveryReloader interface {
discovery.Watcher
Stop()
Reload(backend, address string, clusterOpts map[string]string) error
ReadyCh() <-chan struct{}
}

type daemonDiscoveryReloader struct {
backend discovery.Backend
ticker *time.Ticker
term chan bool
readyCh chan struct{}
}

func (d *daemonDiscoveryReloader) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
return d.backend.Watch(stopCh)
}

func (d *daemonDiscoveryReloader) ReadyCh() <-chan struct{} {
return d.readyCh
}

func discoveryOpts(clusterOpts map[string]string) (time.Duration, time.Duration, error) {
var (
heartbeat = defaultDiscoveryHeartbeat
Expand Down Expand Up @@ -87,38 +93,64 @@ func initDiscovery(backendAddress, advertiseAddress string, clusterOpts map[stri
backend: backend,
ticker: time.NewTicker(heartbeat),
term: make(chan bool),
readyCh: make(chan struct{}),
}
// We call Register() on the discovery backend in a loop for the whole lifetime of the daemon,
// but we never actually Watch() for nodes appearing and disappearing for the moment.
reloader.advertise(advertiseAddress)
go reloader.advertiseHeartbeat(advertiseAddress)
return reloader, nil
}

func (d *daemonDiscoveryReloader) advertise(address string) {
d.registerAddr(address)
go d.advertiseHeartbeat(address)
}

func (d *daemonDiscoveryReloader) registerAddr(addr string) {
if err := d.backend.Register(addr); err != nil {
log.Warnf("Registering as %q in discovery failed: %v", addr, err)
}
}

// advertiseHeartbeat registers the current node against the discovery backend using the specified
// address. The function never returns, as registration against the backend comes with a TTL and
// requires regular heartbeats.
func (d *daemonDiscoveryReloader) advertiseHeartbeat(address string) {
var ready bool
if err := d.initHeartbeat(address); err == nil {
ready = true
close(d.readyCh)
}

for {
select {
case <-d.ticker.C:
d.registerAddr(address)
if err := d.backend.Register(address); err != nil {
log.Warnf("Registering as %q in discovery failed: %v", address, err)
} else {
if !ready {
close(d.readyCh)
ready = true
}
}
case <-d.term:
return
}
}
}

// initHeartbeat is used to do the first heartbeat. It uses a tight loop until
// either the timeout period is reached or the heartbeat is successful and returns.
func (d *daemonDiscoveryReloader) initHeartbeat(address string) error {
// Setup a short ticker until the first heartbeat has succeeded
t := time.NewTicker(500 * time.Millisecond)
defer t.Stop()
// timeout makes sure that after a period of time we stop being so aggressive trying to reach the discovery service
timeout := time.After(60 * time.Second)

for {
select {
case <-timeout:
return errors.New("timeout waiting for initial discovery")
case <-d.term:
return errors.New("terminated")
case <-t.C:
if err := d.backend.Register(address); err == nil {
return nil
}
}
}
}

// Reload makes the watcher to stop advertising and reconfigures it to advertise in a new address.
func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string, clusterOpts map[string]string) error {
d.Stop()
Expand All @@ -130,8 +162,9 @@ func (d *daemonDiscoveryReloader) Reload(backendAddress, advertiseAddress string

d.backend = backend
d.ticker = time.NewTicker(heartbeat)
d.readyCh = make(chan struct{})

d.advertise(advertiseAddress)
go d.advertiseHeartbeat(advertiseAddress)
return nil
}

Expand Down