Skip to content

Commit

Permalink
Merge pull request #71314 from saad-ali/csi03Compat
Browse files Browse the repository at this point in the history
Reintroduce CSI 0.3.x support in CSI Volume Plugin
  • Loading branch information
k8s-ci-robot authored Nov 22, 2018
2 parents 2b0212d + a7c5582 commit 20e1ab6
Show file tree
Hide file tree
Showing 31 changed files with 6,842 additions and 276 deletions.
1 change: 1 addition & 0 deletions hack/.golint_failures
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ pkg/version/verflag
pkg/volume
pkg/volume/azure_dd
pkg/volume/azure_file
pkg/volume/csi/csiv0
pkg/volume/csi/fake
pkg/volume/git_repo
pkg/volume/host_path
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/cm/devicemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (m *ManagerImpl) GetWatcherHandler() watcher.PluginHandler {
}

// ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource
func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
klog.V(2).Infof("Got Plugin %s at endpoint %s with versions %v", pluginName, endpoint, versions)

if !m.isVersionCompatibleWithPlugin(versions) {
Expand All @@ -263,7 +263,7 @@ func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, version
// RegisterPlugin starts the endpoint and registers it
// TODO: Start the endpoint and wait for the First ListAndWatch call
// before registering the plugin
func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string) error {
func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
klog.V(2).Infof("Registering Plugin %s at endpoint %s", pluginName, endpoint)

e, err := newEndpointImpl(endpoint, pluginName, m.callback)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/devicemanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName
}

func setupPluginWatcher(pluginSocketName string, m Manager) *pluginwatcher.Watcher {
w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName))
w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName), "" /* deprecatedSockDir */)
w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
w.Start()

Expand Down
5 changes: 4 additions & 1 deletion pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
return nil, err
}
if klet.enablePluginsWatcher {
klet.pluginWatcher = pluginwatcher.NewWatcher(klet.getPluginsRegistrationDir())
klet.pluginWatcher = pluginwatcher.NewWatcher(
klet.getPluginsRegistrationDir(), /* sockDir */
klet.getPluginsDir(), /* deprecatedSockDir */
)
}

// If the experimentalMounterPathFlag is set, we do not want to
Expand Down
15 changes: 11 additions & 4 deletions pkg/kubelet/util/pluginwatcher/example_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type exampleHandler struct {

m sync.Mutex
count int

permitDeprecatedDir bool
}

type examplePluginEvent int
Expand All @@ -50,16 +52,21 @@ const (
)

// NewExampleHandler provide a example handler
func NewExampleHandler(supportedVersions []string) *exampleHandler {
func NewExampleHandler(supportedVersions []string, permitDeprecatedDir bool) *exampleHandler {
return &exampleHandler{
SupportedVersions: supportedVersions,
ExpectedNames: make(map[string]int),

eventChans: make(map[string]chan examplePluginEvent),
eventChans: make(map[string]chan examplePluginEvent),
permitDeprecatedDir: permitDeprecatedDir,
}
}

func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
if foundInDeprecatedDir && !p.permitDeprecatedDir {
return fmt.Errorf("device plugin socket was found in a directory that is no longer supported and this test does not permit plugins from deprecated dir")
}

p.SendEvent(pluginName, exampleEventValidate)

n, ok := p.DecreasePluginCount(pluginName)
Expand All @@ -79,7 +86,7 @@ func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, vers
return nil
}

func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string) error {
func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
p.SendEvent(pluginName, exampleEventRegister)

// Verifies the grpcServer is ready to serve services.
Expand Down
69 changes: 58 additions & 11 deletions pkg/kubelet/util/pluginwatcher/plugin_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"
Expand All @@ -36,11 +37,12 @@ import (

// Watcher is the plugin watcher
type Watcher struct {
path string
stopCh chan interface{}
fs utilfs.Filesystem
fsWatcher *fsnotify.Watcher
wg sync.WaitGroup
path string
deprecatedPath string
stopCh chan interface{}
fs utilfs.Filesystem
fsWatcher *fsnotify.Watcher
wg sync.WaitGroup

mutex sync.Mutex
handlers map[string]PluginHandler
Expand All @@ -54,10 +56,13 @@ type pathInfo struct {
}

// NewWatcher provides a new watcher
func NewWatcher(sockDir string) *Watcher {
// deprecatedSockDir refers to a pre-GA directory that was used by older plugins
// for socket registration. New plugins should not use this directory.
func NewWatcher(sockDir string, deprecatedSockDir string) *Watcher {
return &Watcher{
path: sockDir,
fs: &utilfs.DefaultFs{},
path: sockDir,
deprecatedPath: deprecatedSockDir,
fs: &utilfs.DefaultFs{},

handlers: make(map[string]PluginHandler),
plugins: make(map[string]pathInfo),
Expand Down Expand Up @@ -137,7 +142,15 @@ func (w *Watcher) Start() error {
// Traverse plugin dir after starting the plugin processing goroutine
if err := w.traversePluginDir(w.path); err != nil {
w.Stop()
return fmt.Errorf("failed to traverse plugin socket path, err: %v", err)
return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
}

// Traverse deprecated plugin dir, if specified.
if len(w.deprecatedPath) != 0 {
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
w.Stop()
return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
}
}

return nil
Expand Down Expand Up @@ -190,6 +203,10 @@ func (w *Watcher) traversePluginDir(dir string) error {

switch mode := info.Mode(); {
case mode.IsDir():
if w.containsBlacklistedDir(path) {
return filepath.SkipDir
}

if err := w.fsWatcher.Add(path); err != nil {
return fmt.Errorf("failed to watch %s, err: %v", path, err)
}
Expand All @@ -216,6 +233,10 @@ func (w *Watcher) traversePluginDir(dir string) error {
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
klog.V(6).Infof("Handling create event: %v", event)

if w.containsBlacklistedDir(event.Name) {
return nil
}

fi, err := os.Stat(event.Name)
if err != nil {
return fmt.Errorf("stat file %s failed: %v", event.Name, err)
Expand Down Expand Up @@ -271,16 +292,18 @@ func (w *Watcher) handlePluginRegistration(socketPath string) error {
infoResp.Endpoint = socketPath
}

foundInDeprecatedDir := w.foundInDeprecatedDir(socketPath)

// calls handler callback to verify registration request
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, foundInDeprecatedDir); err != nil {
return w.notifyPlugin(client, false, fmt.Sprintf("plugin validation failed with err: %v", err))
}

// We add the plugin to the pluginwatcher's map before calling a plugin consumer's Register handle
// so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call.
w.registerPlugin(socketPath, infoResp.Type, infoResp.Name)

if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint); err != nil {
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
return w.notifyPlugin(client, false, fmt.Sprintf("plugin registration failed with err: %v", err))
}

Expand Down Expand Up @@ -417,3 +440,27 @@ func dial(unixSocketPath string, timeout time.Duration) (registerapi.Registratio

return registerapi.NewRegistrationClient(c), c, nil
}

// While deprecated dir is supported, to add extra protection around #69015
// we will explicitly blacklist kubernetes.io directory.
func (w *Watcher) containsBlacklistedDir(path string) bool {
return strings.HasPrefix(path, w.deprecatedPath+"/kubernetes.io/") ||
path == w.deprecatedPath+"/kubernetes.io"
}

func (w *Watcher) foundInDeprecatedDir(socketPath string) bool {
if len(w.deprecatedPath) != 0 {
if socketPath == w.deprecatedPath {
return true
}

deprecatedPath := w.deprecatedPath
if !strings.HasSuffix(deprecatedPath, "/") {
deprecatedPath = deprecatedPath + "/"
}
if strings.HasPrefix(socketPath, deprecatedPath) {
return true
}
}
return false
}
Loading

0 comments on commit 20e1ab6

Please sign in to comment.