Skip to content

Commit

Permalink
Merge pull request #45518 from portworx/px-remote
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 45518, 46127, 46146, 45932, 45003)

Remove requirement to run the Portworx volume driver on master node

**What this PR does / why we need it**:
This change removes requirement to run the Portworx volume driver on Kubernetes master node.

**Special notes for your reviewer**:
Before this pull request, in order to use a Portworx volume, users had to run the Portworx container on the master node. Since it isn't ideal (and impossible on GKE) to schedule any pods on the master node, this PR removes that requirement.
  • Loading branch information
Kubernetes Submit Queue authored May 25, 2017
2 parents 079020f + ad4f21f commit b017a7a
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 44 deletions.
2 changes: 1 addition & 1 deletion pkg/volume/portworx/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/libopenstorage/openstorage/api:go_default_library",
"//vendor/github.com/libopenstorage/openstorage/api/client:go_default_library",
Expand Down
22 changes: 10 additions & 12 deletions pkg/volume/portworx/portworx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ import (
"k8s.io/kubernetes/pkg/util/mount"
kstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)

// This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&portworxVolumePlugin{nil}}
return []volume.VolumePlugin{&portworxVolumePlugin{nil, nil}}
}

type portworxVolumePlugin struct {
host volume.VolumeHost
util *PortworxVolumeUtil
}

var _ volume.VolumePlugin = &portworxVolumePlugin{}
Expand All @@ -56,6 +56,7 @@ func getPath(uid types.UID, volName string, host volume.VolumeHost) string {

func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error {
plugin.host = host
plugin.util = &PortworxVolumeUtil{}
return nil
}

Expand Down Expand Up @@ -89,7 +90,7 @@ func (plugin *portworxVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccess
}

func (plugin *portworxVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
return plugin.newMounterInternal(spec, pod.UID, &PortworxVolumeUtil{}, plugin.host.GetMounter())
return plugin.newMounterInternal(spec, pod.UID, plugin.util, plugin.host.GetMounter())
}

func (plugin *portworxVolumePlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager portworxManager, mounter mount.Interface) (volume.Mounter, error) {
Expand Down Expand Up @@ -117,10 +118,11 @@ func (plugin *portworxVolumePlugin) newMounterInternal(spec *volume.Spec, podUID
}

func (plugin *portworxVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
return plugin.newUnmounterInternal(volName, podUID, &PortworxVolumeUtil{}, plugin.host.GetMounter())
return plugin.newUnmounterInternal(volName, podUID, plugin.util, plugin.host.GetMounter())
}

func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, manager portworxManager, mounter mount.Interface) (volume.Unmounter, error) {
func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, manager portworxManager,
mounter mount.Interface) (volume.Unmounter, error) {
return &portworxVolumeUnmounter{
&portworxVolume{
podUID: podUID,
Expand All @@ -133,13 +135,14 @@ func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID
}

func (plugin *portworxVolumePlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
return plugin.newDeleterInternal(spec, &PortworxVolumeUtil{})
return plugin.newDeleterInternal(spec, plugin.util)
}

func (plugin *portworxVolumePlugin) newDeleterInternal(spec *volume.Spec, manager portworxManager) (volume.Deleter, error) {
if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.PortworxVolume == nil {
return nil, fmt.Errorf("spec.PersistentVolumeSource.PortworxVolume is nil")
}

return &portworxVolumeDeleter{
portworxVolume: &portworxVolume{
volName: spec.Name(),
Expand All @@ -150,7 +153,7 @@ func (plugin *portworxVolumePlugin) newDeleterInternal(spec *volume.Spec, manage
}

func (plugin *portworxVolumePlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
return plugin.newProvisionerInternal(options, &PortworxVolumeUtil{})
return plugin.newProvisionerInternal(options, plugin.util)
}

func (plugin *portworxVolumePlugin) newProvisionerInternal(options volume.VolumeOptions, manager portworxManager) (volume.Provisioner, error) {
Expand Down Expand Up @@ -311,11 +314,6 @@ func (c *portworxVolumeUnmounter) TearDown() error {
// resource was the last reference to that disk on the kubelet.
func (c *portworxVolumeUnmounter) TearDownAt(dir string) error {
glog.V(4).Infof("Portworx Volume TearDown of %s", dir)
// Unmount the bind mount inside the pod
if err := util.UnmountPath(dir, c.mounter); err != nil {
return err
}

// Call Portworx Unmount for Portworx's book-keeping.
if err := c.manager.UnmountVolume(c, dir); err != nil {
return err
Expand Down
121 changes: 90 additions & 31 deletions pkg/volume/portworx/portworx_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
osdclient "github.com/libopenstorage/openstorage/api/client"
volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
osdspec "github.com/libopenstorage/openstorage/api/spec"
osdvolume "github.com/libopenstorage/openstorage/volume"
volumeapi "github.com/libopenstorage/openstorage/volume"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/volume"
)
Expand All @@ -31,8 +33,8 @@ const (
osdMgmtPort = "9001"
osdDriverVersion = "v1"
pxdDriverName = "pxd"
pwxSockName = "pwx"
pvcClaimLabel = "pvc"
pxServiceName = "portworx-service"
)

type PortworxVolumeUtil struct {
Expand All @@ -41,9 +43,9 @@ type PortworxVolumeUtil struct {

// CreateVolume creates a Portworx volume.
func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int, map[string]string, error) {
hostname := p.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(p.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return "", 0, nil, err
}

Expand All @@ -64,7 +66,7 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri
// Add claim Name as a part of Portworx Volume Labels
locator.VolumeLabels = make(map[string]string)
locator.VolumeLabels[pvcClaimLabel] = p.options.PVC.Name
volumeID, err := client.Create(&locator, &source, spec)
volumeID, err := driver.Create(&locator, &source, spec)
if err != nil {
glog.V(2).Infof("Error creating Portworx Volume : %v", err)
}
Expand All @@ -73,13 +75,13 @@ func (util *PortworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (stri

// DeleteVolume deletes a Portworx volume
func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
hostname := d.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(d.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err
}

err = client.Delete(d.volumeID)
err = driver.Delete(d.volumeID)
if err != nil {
glog.V(2).Infof("Error deleting Portworx Volume (%v): %v", d.volName, err)
return err
Expand All @@ -89,13 +91,13 @@ func (util *PortworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {

// AttachVolume attaches a Portworx Volume
func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string, error) {
hostname := m.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(m.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return "", err
}

devicePath, err := client.Attach(m.volName)
devicePath, err := driver.Attach(m.volName)
if err != nil {
glog.V(2).Infof("Error attaching Portworx Volume (%v): %v", m.volName, err)
return "", err
Expand All @@ -105,13 +107,13 @@ func (util *PortworxVolumeUtil) AttachVolume(m *portworxVolumeMounter) (string,

// DetachVolume detaches a Portworx Volume
func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
hostname := u.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(u.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err
}

err = client.Detach(u.volName)
err = driver.Detach(u.volName)
if err != nil {
glog.V(2).Infof("Error detaching Portworx Volume (%v): %v", u.volName, err)
return err
Expand All @@ -121,13 +123,13 @@ func (util *PortworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {

// MountVolume mounts a Portworx Volume on the specified mountPath
func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
hostname := m.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(m.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err
}

err = client.Mount(m.volName, mountPath)
err = driver.Mount(m.volName, mountPath)
if err != nil {
glog.V(2).Infof("Error mounting Portworx Volume (%v) on Path (%v): %v", m.volName, mountPath, err)
return err
Expand All @@ -137,28 +139,85 @@ func (util *PortworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath

// UnmountVolume unmounts a Portworx Volume
func (util *PortworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
hostname := u.plugin.host.GetHostName()
client, err := util.osdClient(hostname)
if err != nil {
driver, err := util.getPortworxDriver(u.plugin.host)
if err != nil || driver == nil {
glog.Errorf("Failed to get portworx driver. Err: %v", err)
return err
}

err = client.Unmount(u.volName, mountPath)
err = driver.Unmount(u.volName, mountPath)
if err != nil {
glog.V(2).Infof("Error unmounting Portworx Volume (%v) on Path (%v): %v", u.volName, mountPath, err)
return err
}
return nil
}

func (util *PortworxVolumeUtil) osdClient(hostname string) (osdvolume.VolumeDriver, error) {
osdEndpoint := "http://" + hostname + ":" + osdMgmtPort
if util.portworxClient == nil {
driverClient, err := volumeclient.NewDriverClient(osdEndpoint, pxdDriverName, osdDriverVersion)
func isClientValid(client *osdclient.Client) (bool, error) {
if client == nil {
return false, nil
}

_, err := client.Versions(osdapi.OsdVolumePath)
if err != nil {
glog.Errorf("portworx client failed driver versions check. Err: %v", err)
return false, err
}

return true, nil
}

func createDriverClient(hostname string) (*osdclient.Client, error) {
client, err := volumeclient.NewDriverClient("http://"+hostname+":"+osdMgmtPort,
pxdDriverName, osdDriverVersion)
if err != nil {
return nil, err
}

if isValid, err := isClientValid(client); isValid {
return client, nil
} else {
return nil, err
}
}

func (util *PortworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
if isValid, _ := isClientValid(util.portworxClient); isValid {
return volumeclient.VolumeDriver(util.portworxClient), nil
}

// create new client
var err error
util.portworxClient, err = createDriverClient(volumeHost.GetHostName()) // for backward compatibility
if err != nil || util.portworxClient == nil {
// Create client from portworx service
kubeClient := volumeHost.GetKubeClient()
if kubeClient == nil {
glog.Error("Failed to get kubeclient when creating portworx client")
return nil, nil
}

opts := metav1.GetOptions{}
svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(pxServiceName, opts)
if err != nil {
glog.Errorf("Failed to get service. Err: %v", err)
return nil, err
}
util.portworxClient = driverClient

if svc == nil {
glog.Errorf("Service: %v not found. Consult Portworx docs to deploy it.", pxServiceName)
return nil, err
}

util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP)
if err != nil || util.portworxClient == nil {
glog.Errorf("Failed to connect to portworx service. Err: %v", err)
return nil, err
}

glog.Infof("Using portworx service at: %v as api endpoint", svc.Spec.ClusterIP)
} else {
glog.Infof("Using portworx service at: %v as api endpoint", volumeHost.GetHostName())
}

return volumeclient.VolumeDriver(util.portworxClient), nil
Expand Down

0 comments on commit b017a7a

Please sign in to comment.