Skip to content

Commit

Permalink
Fix for swarm/libnetwork init race condition
Browse files Browse the repository at this point in the history
This change cleans up the SetClusterProvider method.
Swarm calls the SetClusterProvider to pass to libnetwork the pointer
of the provider from which libnetwork can fetch all the information to
initialize the internal agent.

The method can be and is called multiple times passing the same value,
with the previous logic that was erroneusly spawning multiple go routines that
were making possiblea race between an agentInit and an agentClose.

The new logic aims to disallow it by checking for the provider passed and
ensuring that if the provider is already present there is nothing to do because
there is already an active go routine that is ready to process cluster events.
Moreover a patch on moby side takes care of clearing up the Cluster Events
dispacthing using only 1 channel to handle all the events types.
This will also guarantee in order event handling because now all the events are
piped into one single channel.

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
  • Loading branch information
Flavio Crisciani committed May 4, 2017
1 parent 8c113c7 commit a2bf0b3
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 90 deletions.
61 changes: 26 additions & 35 deletions libnetwork/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events"
"github.com/docker/libnetwork/cluster"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
Expand Down Expand Up @@ -40,7 +41,7 @@ type agent struct {
bindAddr string
advertiseAddr string
dataPathAddr string
epTblCancel func()
coreCancelFuncs []func()
driverCancelFuncs map[string][]func()
sync.Mutex
}
Expand Down Expand Up @@ -192,16 +193,12 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
return nil
}

func (c *controller) agentSetup() error {
c.Lock()
clusterProvider := c.cfg.Daemon.ClusterProvider
agent := c.agent
c.Unlock()
func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
agent := c.getAgent()

if clusterProvider == nil {
msg := "Aborting initialization of Libnetwork Agent because cluster provider is now unset"
logrus.Errorf(msg)
return fmt.Errorf(msg)
// If the agent is already present there is no need to try to initilize it again
if agent != nil {
return nil
}

bindAddr := clusterProvider.GetLocalAddress()
Expand All @@ -221,15 +218,15 @@ func (c *controller) agentSetup() error {
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
if advAddr != "" && agent == nil {
if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
logrus.Errorf("Error in agentInit : %v", err)
} else {
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
if capability.DataScope == datastore.GlobalScope {
c.agentDriverNotify(driver)
}
return false
})
logrus.Errorf("error in agentInit: %v", err)
return err
}
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
if capability.DataScope == datastore.GlobalScope {
c.agentDriverNotify(driver)
}
return false
})
}

if len(remoteAddrList) > 0 {
Expand All @@ -238,14 +235,6 @@ func (c *controller) agentSetup() error {
}
}

c.Lock()
if c.agent != nil && c.agentInitDone != nil {
close(c.agentInitDone)
c.agentInitDone = nil
c.agentStopDone = make(chan struct{})
}
c.Unlock()

return nil
}

Expand Down Expand Up @@ -287,16 +276,12 @@ func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
}

func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
if !c.isAgent() {
return nil
}

bindAddr, err := resolveAddr(bindAddrOrInterface)
if err != nil {
return err
}

keys, tags := c.getKeys(subsysGossip)
keys, _ := c.getKeys(subsysGossip)
hostname, _ := os.Hostname()
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName)
Expand All @@ -312,16 +297,19 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
return err
}

var cancelList []func()
ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
cancelList = append(cancelList, cancel)
nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
cancelList = append(cancelList, cancel)

c.Lock()
c.agent = &agent{
networkDB: nDB,
bindAddr: bindAddr,
advertiseAddr: advertiseAddr,
dataPathAddr: dataPathAddr,
epTblCancel: cancel,
coreCancelFuncs: cancelList,
driverCancelFuncs: make(map[string][]func()),
}
c.Unlock()
Expand All @@ -330,7 +318,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)

drvEnc := discoverapi.DriverEncryptionConfig{}
keys, tags = c.getKeys(subsysIPSec)
keys, tags := c.getKeys(subsysIPSec)
drvEnc.Keys = keys
drvEnc.Tags = tags

Expand Down Expand Up @@ -399,14 +387,17 @@ func (c *controller) agentClose() {
cancelList = append(cancelList, cancel)
}
}

// Add also the cancel functions for the network db
for _, cancel := range agent.coreCancelFuncs {
cancelList = append(cancelList, cancel)
}
agent.Unlock()

for _, cancel := range cancelList {
cancel()
}

agent.epTblCancel()

agent.networkDB.Close()
}

Expand Down
16 changes: 15 additions & 1 deletion libnetwork/cluster/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ import (
"golang.org/x/net/context"
)

const (
// EventSocketChange control socket changed
EventSocketChange = iota
// EventNodeReady cluster node in ready state
EventNodeReady
// EventNodeLeave node is leaving the cluster
EventNodeLeave
// EventNetworkKeysAvailable network keys correctly configured in the networking layer
EventNetworkKeysAvailable
)

// ConfigEventType type of the event produced by the cluster
type ConfigEventType uint8

// Provider provides clustering config details
type Provider interface {
IsManager() bool
Expand All @@ -14,7 +28,7 @@ type Provider interface {
GetAdvertiseAddress() string
GetDataPathAddress() string
GetRemoteAddressList() []string
ListenClusterEvents() <-chan struct{}
ListenClusterEvents() <-chan ConfigEventType
AttachNetwork(string, string, []string) (*network.NetworkingConfig, error)
DetachNetwork(string, string) error
UpdateAttachment(string, string, *network.NetworkingConfig) error
Expand Down
9 changes: 5 additions & 4 deletions libnetwork/cmd/dnet/dnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/docker/docker/pkg/term"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/api"
"github.com/docker/libnetwork/cluster"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
Expand Down Expand Up @@ -234,7 +235,7 @@ type dnetConnection struct {
// addr holds the client address.
addr string
Orchestration *NetworkOrchestration
configEvent chan struct{}
configEvent chan cluster.ConfigEventType
}

// NetworkOrchestration exported
Expand Down Expand Up @@ -275,7 +276,7 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error {
controller.SetClusterProvider(d)

if d.Orchestration.Agent || d.Orchestration.Manager {
d.configEvent <- struct{}{}
d.configEvent <- cluster.EventNodeReady
}

createDefaultNetwork(controller)
Expand Down Expand Up @@ -335,7 +336,7 @@ func (d *dnetConnection) GetNetworkKeys() []*types.EncryptionKey {
func (d *dnetConnection) SetNetworkKeys([]*types.EncryptionKey) {
}

func (d *dnetConnection) ListenClusterEvents() <-chan struct{} {
func (d *dnetConnection) ListenClusterEvents() <-chan cluster.ConfigEventType {
return d.configEvent
}

Expand Down Expand Up @@ -438,7 +439,7 @@ func newDnetConnection(val string) (*dnetConnection, error) {
return nil, errors.New("dnet currently only supports tcp transport")
}

return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan struct{}, 10)}, nil
return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan cluster.ConfigEventType, 10)}, nil
}

func (d *dnetConnection) httpCall(method, path string, data interface{}, headers map[string][]string) (io.ReadCloser, http.Header, int, error) {
Expand Down
4 changes: 1 addition & 3 deletions libnetwork/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type DaemonCfg struct {
Labels []string
DriverCfg map[string]interface{}
ClusterProvider cluster.Provider
DisableProvider chan struct{}
}

// ClusterCfg represents cluster configuration
Expand Down Expand Up @@ -74,8 +73,7 @@ func ParseConfig(tomlCfgFile string) (*Config, error) {
func ParseConfigOptions(cfgOptions ...Option) *Config {
cfg := &Config{
Daemon: DaemonCfg{
DriverCfg: make(map[string]interface{}),
DisableProvider: make(chan struct{}, 10),
DriverCfg: make(map[string]interface{}),
},
Scopes: make(map[string]*datastore.ScopeCfg),
}
Expand Down
Loading

0 comments on commit a2bf0b3

Please sign in to comment.