Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove requirement to run the Portworx volume driver on master node #45518

Merged
merged 5 commits into from
May 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/volume/portworx/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/libopenstorage/openstorage/api:go_default_library",
"//vendor/github.com/libopenstorage/openstorage/api/client:go_default_library",
Expand Down
22 changes: 10 additions & 12 deletions pkg/volume/portworx/portworx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ import (
"k8s.io/kubernetes/pkg/util/mount"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)

// This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&portworxVolumePlugin{nil}}
return []volume.VolumePlugin{&portworxVolumePlugin{nil, nil}}
}

type portworxVolumePlugin struct {
host volume.VolumeHost
util *PortworxVolumeUtil
}

var _ volume.VolumePlugin = &portworxVolumePlugin{}
Expand All @@ -56,6 +56,7 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string {

func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error {
plugin.host = host
plugin.util = &PortworxVolumeUtil{}
return nil
}

Expand Down Expand Up @@ -89,7 +90,7 @@ func (plugin *portworxVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccess
}

func (plugin *portworxVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
return plugin.newMounterInternal(spec, pod.UID, &PortworxVolumeUtil{}, plugin.host.GetMounter())
return plugin.newMounterInternal(spec, pod.UID, plugin.util, plugin.host.GetMounter())
}

func (plugin *portworxVolumePlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager portworxManager, mounter mount.Interface) (volume.Mounter, error) {
Expand Down Expand Up @@ -117,10 +118,11 @@ func (plugin *portworxVolumePlugin) newMounterInternal(spec *volume.Spec, podUID
}

func (plugin *portworxVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
return plugin.newUnmounterInternal(volName, podUID, &PortworxVolumeUtil{}, plugin.host.GetMounter())
return plugin.newUnmounterInternal(volName, podUID, plugin.util, plugin.host.GetMounter())
}

func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, manager portworxManager, mounter mount.Interface) (volume.Unmounter, error) {
func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, manager portworxManager,
mounter mount.Interface) (volume.Unmounter, error) {
return &portworxVolumeUnmounter{
&portworxVolume{
podUID: podUID,
Expand All @@ -133,13 +135,14 @@ func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID
}

func (plugin *portworxVolumePlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
return plugin.newDeleterInternal(spec, &PortworxVolumeUtil{})
return plugin.newDeleterInternal(spec, plugin.util)
}

func (plugin *portworxVolumePlugin) newDeleterInternal(spec *volume.Spec, manager portworxManager) (volume.Deleter, error) {
if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.PortworxVolume == nil {
return nil, fmt.Errorf("spec.PersistentVolumeSource.PortworxVolume is nil")
}

return &portworxVolumeDeleter{
portworxVolume: &portworxVolume{
volName: spec.Name(),
Expand All @@ -150,7 +153,7 @@ func (plugin *portworxVolumePlugin) newDeleterInternal(spec *volume.Spec, manage
}

func (plugin *portworxVolumePlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
return plugin.newProvisionerInternal(options, &PortworxVolumeUtil{})
return plugin.newProvisionerInternal(options, plugin.util)
}

func (plugin *portworxVolumePlugin) newProvisionerInternal(options volume.VolumeOptions, manager portworxManager) (volume.Provisioner, error) {
Expand Down Expand Up @@ -311,11 +314,6 @@ func (c *portworxVolumeUnmounter) TearDown() error {
// resource was the last reference to that disk on the kubelet.
func (c *portworxVolumeUnmounter) TearDownAt(dir string) error {
glog.V(4).Infof("Portworx Volume TearDown of %s", dir)
// Unmount the bind mount inside the pod
if err := util.UnmountPath(dir, c.mounter); err != nil {
return err
}

// Call Portworx Unmount for Portworx's book-keeping.
if err := c.manager.UnmountVolume(c, dir); err != nil {
return err
Expand Down
121 changes: 90 additions & 31 deletions pkg/volume/portworx/portworx_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
osdclient "github.com/libopenstorage/openstorage/api/client"
volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
osdspec "github.com/libopenstorage/openstorage/api/spec"
osdvolume "github.com/libopenstorage/openstorage/volume"
volumeapi "github.com/libopenstorage/openstorage/volume"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/volume"
)
Expand All @@ -31,8 +33,8 @@ const (
osdMgmtPort = "9001"
osdDriverVersion = "v1"
pxdDriverName = "pxd"
pwxSockName = "pwx"
pvcClaimLabel = "pvc"
pxServiceName = "portworx-service"
)

type PortworxVolumeUtil struct {
Expand All @@ -41,9 +43,9 @@ type PortworxVolumeUtil struct {

// CreateVolume creates a Portworx volume.
func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int, map[string]string, error) {
hostname := p.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(p.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return "", 0, nil, err
}

Expand All @@ -64,7 +66,7 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri
// Add claim Name as a part of Portworx Volume Labels
locator.VolumeLabels = make(map[string]string)
locator.VolumeLabels[pvcClaimLabel] = p.options.PVC.Name
volumeID, err := client.Create(&locator, &source, spec)
volumeID, err := driver.Create(&locator, &source, spec)
if err != nil {
glog.V(2).Infof("Error creating Portworx Volume : %v", err)
}
Expand All @@ -73,13 +75,13 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri

// DeleteVolume deletes a Portworx volume
func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
hostname := d.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(d.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err
}

err = client.Delete(d.volumeID)
err = driver.Delete(d.volumeID)
if err != nil {
glog.V(2).Infof("Error deleting Portworx Volume (%v): %v", d.volName, err)
return err
Expand All @@ -89,13 +91,13 @@ func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {

// AttachVolume attaches a Portworx Volume
func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string, error) {
hostname := m.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(m.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return "", err
}

devicePath, err := client.Attach(m.volName)
devicePath, err := driver.Attach(m.volName)
if err != nil {
glog.V(2).Infof("Error attaching Portworx Volume (%v): %v", m.volName, err)
return "", err
Expand All @@ -105,13 +107,13 @@ func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string,

// DetachVolume detaches a Portworx Volume
func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
hostname := u.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(u.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err
}

err = client.Detach(u.volName)
err = driver.Detach(u.volName)
if err != nil {
glog.V(2).Infof("Error detaching Portworx Volume (%v): %v", u.volName, err)
return err
Expand All @@ -121,13 +123,13 @@ func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {

// MountVolume mounts a Portworx Volume on the specified mountPath
func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
hostname := m.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(m.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err
}

err = client.Mount(m.volName, mountPath)
err = driver.Mount(m.volName, mountPath)
if err != nil {
glog.V(2).Infof("Error mounting Portworx Volume (%v) on Path (%v): %v", m.volName, mountPath, err)
return err
Expand All @@ -137,28 +139,85 @@ func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath

// UnmountVolume unmounts a Portworx Volume
func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
hostname := u.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(u.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err
}

err = client.Unmount(u.volName, mountPath)
err = driver.Unmount(u.volName, mountPath)
if err != nil {
glog.V(2).Infof("Error unmounting Portworx Volume (%v) on Path (%v): %v", u.volName, mountPath, err)
return err
}
return nil
}

func (util *PortworxVolumeUtil) osdClient(hostname string) (osdvolume.VolumeDriver, error) {
osdEndpoint := "http://" + hostname + ":" + osdMgmtPort
if util.portworxClient == nil {
driverClient, err := volumeclient.NewDriverClient(osdEndpoint, pxdDriverName, osdDriverVersion)
func isClientValid(client *osdclient.Client) (bool, error) {
if client == nil {
return false, nil
}

_, err := client.Versions(osdapi.OsdVolumePath)
if err != nil {
glog.Errorf("portworx client failed driver versions check. Err: %v", err)
return false, err
}

return true, nil
}

func createDriverClient(hostname string) (*osdclient.Client, error) {
client, err := volumeclient.NewDriverClient("http://"+hostname+":"+osdMgmtPort,
pxdDriverName, osdDriverVersion)
if err != nil {
return nil, err
}

if isValid, err := isClientValid(client); isValid {
return client, nil
} else {
return nil, err
}
}

func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
if isValid, _ := isClientValid(util.portworxClient); isValid {
return volumeclient.VolumeDriver(util.portworxClient), nil
}

// create new client
var err error
util.portworxClient, err = createDriverClient(volumeHost.GetHostName()) // for backward compatibility
if err != nil || util.portworxClient == nil {
// Create client from portworx service
kubeClient := volumeHost.GetKubeClient()
if kubeClient == nil {
glog.Error("Failed to get kubeclient when creating portworx client")
return nil, nil
}

opts := metav1.GetOptions{}
svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(pxServiceName, opts)
if err != nil {
glog.Errorf("Failed to get service. Err: %v", err)
return nil, err
}
util.portworxClient = driverClient

if svc == nil {
glog.Errorf("Service: %v not found. Consult Portworx docs to deploy it.", pxServiceName)
return nil, err
}

util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP)
if err != nil || util.portworxClient == nil {
glog.Errorf("Failed to connect to portworx service. Err: %v", err)
return nil, err
}

glog.Infof("Using portworx service at: %v as api endpoint", svc.Spec.ClusterIP)
} else {
glog.Infof("Using portworx service at: %v as api endpoint", volumeHost.GetHostName())
}

return volumeclient.VolumeDriver(util.portworxClient), nil
Expand Down