Skip to content

Commit

Permalink
Use controller methods for handling the encyrption keys from agent
Browse files Browse the repository at this point in the history
instead of the Provider interface methods.

Signed-off-by: Santhosh Manohar <santhosh@docker.com>
  • Loading branch information
Santhosh Manohar committed Jun 5, 2016
1 parent b85caa0 commit c4d5bba
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 80 deletions.
74 changes: 58 additions & 16 deletions libnetwork/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type agent struct {
bindAddr string
epTblCancel func()
driverCancelFuncs map[string][]func()
keys []*types.EncryptionKey
}

func getBindAddr(ifaceName string) (string, error) {
Expand Down Expand Up @@ -72,18 +71,18 @@ func resolveAddr(addrOrInterface string) (string, error) {
return getBindAddr(addrOrInterface)
}

func (c *controller) agentHandleKeys(keys []*types.EncryptionKey) error {
func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
// Find the new key and add it to the key ring
a := c.agent
for _, key := range keys {
same := false
for _, aKey := range a.keys {
if same = aKey.LamportTime == key.LamportTime; same {
for _, cKey := range c.keys {
if same = cKey.LamportTime == key.LamportTime; same {
break
}
}
if !same {
a.keys = append(a.keys, key)
c.keys = append(c.keys, key)
if key.Subsystem == "networking:gossip" {
a.networkDB.SetKey(key.Key)
}
Expand All @@ -93,24 +92,24 @@ func (c *controller) agentHandleKeys(keys []*types.EncryptionKey) error {
// Find the deleted key. If the deleted key was the primary key,
// a new primary key should be set before removing if from keyring.
deleted := []byte{}
for i, aKey := range a.keys {
for i, cKey := range c.keys {
same := false
for _, key := range keys {
if same = key.LamportTime == aKey.LamportTime; same {
if same = key.LamportTime == cKey.LamportTime; same {
break
}
}
if !same {
if aKey.Subsystem == "networking:gossip" {
deleted = aKey.Key
if cKey.Subsystem == "networking:gossip" {
deleted = cKey.Key
}
a.keys = append(a.keys[:i], a.keys[i+1:]...)
c.keys = append(c.keys[:i], c.keys[i+1:]...)
break
}
}

sort.Sort(ByTime(a.keys))
for _, key := range a.keys {
sort.Sort(ByTime(c.keys))
for _, key := range c.keys {
if key.Subsystem == "networking:gossip" {
a.networkDB.SetPrimaryKey(key.Key)
break
Expand All @@ -122,16 +121,60 @@ func (c *controller) agentHandleKeys(keys []*types.EncryptionKey) error {
return nil
}

func (c *controller) agentInit(bindAddrOrInterface string, keys []*types.EncryptionKey) error {
func (c *controller) agentSetup() error {
clusterProvider := c.cfg.Daemon.ClusterProvider

bindAddr, _, _ := net.SplitHostPort(clusterProvider.GetListenAddress())
remote := clusterProvider.GetRemoteAddress()
remoteAddr, _, _ := net.SplitHostPort(remote)

// Determine the BindAddress from RemoteAddress or through best-effort routing
if !isValidClusteringIP(bindAddr) {
if !isValidClusteringIP(remoteAddr) {
remote = "8.8.8.8:53"
}
conn, err := net.Dial("udp", remote)
if err == nil {
bindHostPort := conn.LocalAddr().String()
bindAddr, _, _ = net.SplitHostPort(bindHostPort)
conn.Close()
}
}

if bindAddr != "" && c.agent == nil {
if err := c.agentInit(bindAddr); err != nil {
logrus.Errorf("Error in agentInit : %v", err)
} else {
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
if capability.DataScope == datastore.GlobalScope {
c.agentDriverNotify(driver)
}
return false
})

if c.agent != nil {
close(c.agentInitDone)
}
}
}
if remoteAddr != "" {
if err := c.agentJoin(remoteAddr); err != nil {
logrus.Errorf("Error in agentJoin : %v", err)
}
}
return nil
}

func (c *controller) agentInit(bindAddrOrInterface string) error {
if !c.isAgent() {
return nil
}

// sort the keys by lamport time
sort.Sort(ByTime(keys))
sort.Sort(ByTime(c.keys))

gossipkey := [][]byte{}
for _, key := range keys {
for _, key := range c.keys {
if key.Subsystem == "networking:gossip" {
gossipkey = append(gossipkey, key.Key)
}
Expand Down Expand Up @@ -160,7 +203,6 @@ func (c *controller) agentInit(bindAddrOrInterface string, keys []*types.Encrypt
bindAddr: bindAddr,
epTblCancel: cancel,
driverCancelFuncs: make(map[string][]func()),
keys: keys,
}

go c.handleTableEvents(ch, c.handleEpTableEvent)
Expand Down
4 changes: 0 additions & 4 deletions libnetwork/cluster/provider.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package cluster

import "github.com/docker/libnetwork/types"

// Provider provides clustering config details
type Provider interface {
IsManager() bool
IsAgent() bool
GetListenAddress() string
GetRemoteAddress() string
ListenClusterEvents() <-chan struct{}
GetNetworkKeys() []*types.EncryptionKey
SetNetworkKeys([]*types.EncryptionKey)
}
108 changes: 48 additions & 60 deletions libnetwork/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ type NetworkController interface {

// Wait for agent initialization complete in libnetwork controller
AgentInitWait()

// SetKeys configures the encryption key for gossip and overlay data path
SetKeys(keys []*types.EncryptionKey) error
}

// NetworkWalker is a client provided function which will be used to walk the Networks.
Expand All @@ -130,23 +133,25 @@ type SandboxWalker func(sb Sandbox) bool
type sandboxTable map[string]*sandbox

type controller struct {
id string
drvRegistry *drvregistry.DrvRegistry
sandboxes sandboxTable
cfg *config.Config
stores []datastore.DataStore
discovery hostdiscovery.HostDiscovery
extKeyListener net.Listener
watchCh chan *endpoint
unWatchCh chan *endpoint
svcRecords map[string]svcInfo
nmap map[string]*netWatch
serviceBindings map[string]*service
defOsSbox osl.Sandbox
ingressSandbox *sandbox
sboxOnce sync.Once
agent *agent
agentInitDone chan struct{}
id string
drvRegistry *drvregistry.DrvRegistry
sandboxes sandboxTable
cfg *config.Config
stores []datastore.DataStore
discovery hostdiscovery.HostDiscovery
extKeyListener net.Listener
watchCh chan *endpoint
unWatchCh chan *endpoint
svcRecords map[string]svcInfo
nmap map[string]*netWatch
serviceBindings map[string]*service
defOsSbox osl.Sandbox
ingressSandbox *sandbox
sboxOnce sync.Once
agent *agent
agentInitDone chan struct{}
keys []*types.EncryptionKey
clusterConfigAvailable bool
sync.Mutex
}

Expand Down Expand Up @@ -220,55 +225,38 @@ func isValidClusteringIP(addr string) bool {
return addr != "" && !net.ParseIP(addr).IsLoopback() && !net.ParseIP(addr).IsUnspecified()
}

// libnetwork side of agent depends on the keys. On the first receipt of
// keys setup the agent. For subsequent key set handle the key change
func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
if len(c.keys) == 0 {
c.keys = keys
if c.agent != nil {
return (fmt.Errorf("libnetwork agent setup without keys"))
}
if c.clusterConfigAvailable {
return c.agentSetup()
}
log.Debugf("received encryption keys before cluster config")
return nil
}
if c.agent == nil {
c.keys = keys
return nil
}
return c.handleKeyChange(keys)
}

func (c *controller) clusterAgentInit() {
clusterProvider := c.cfg.Daemon.ClusterProvider
for {
select {
case <-clusterProvider.ListenClusterEvents():
c.clusterConfigAvailable = true
if !c.isDistributedControl() {
keys := clusterProvider.GetNetworkKeys()
// If the agent is already setup this could be a key change notificaiton
if c.agent != nil {
c.agentHandleKeys(keys)
}

bindAddr, _, _ := net.SplitHostPort(clusterProvider.GetListenAddress())
remote := clusterProvider.GetRemoteAddress()
remoteAddr, _, _ := net.SplitHostPort(remote)

// Determine the BindAddress from RemoteAddress or through best-effort routing
if !isValidClusteringIP(bindAddr) {
if !isValidClusteringIP(remoteAddr) {
remote = "8.8.8.8:53"
}
conn, err := net.Dial("udp", remote)
if err == nil {
bindHostPort := conn.LocalAddr().String()
bindAddr, _, _ = net.SplitHostPort(bindHostPort)
conn.Close()
}
}

if bindAddr != "" && len(keys) > 0 && c.agent == nil {
if err := c.agentInit(bindAddr, keys); err != nil {
log.Errorf("Error in agentInit : %v", err)
} else {
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
if capability.DataScope == datastore.GlobalScope {
c.agentDriverNotify(driver)
}
return false
})

if c.agent != nil {
close(c.agentInitDone)
}
}
}
if remoteAddr != "" {
if err := c.agentJoin(remoteAddr); err != nil {
log.Errorf("Error in agentJoin : %v", err)
}
// agent initialization needs encyrption keys and bind/remote IP which
// comes from the daemon cluster events
if len(c.keys) > 0 {
c.agentSetup()
}
} else {
c.agentInitDone = make(chan struct{})
Expand Down

0 comments on commit c4d5bba

Please sign in to comment.