Skip to content

Commit

Permalink
Merge pull request #36683 from saad-ali/automated-cherry-pick-of-#34859-
Browse files Browse the repository at this point in the history
#35883-upstream-release-1.4

Automated cherry pick of #34859 #35883 upstream release 1.4
  • Loading branch information
jessfraz authored Nov 12, 2016
2 parents a076c79 + a0714b0 commit b953db2
Show file tree
Hide file tree
Showing 32 changed files with 885 additions and 228 deletions.
162 changes: 91 additions & 71 deletions pkg/cloudprovider/providers/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"io"
"net"
"net/url"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -299,28 +298,31 @@ type Volumes interface {
// Attach the disk to the specified instance
// instanceName can be empty to mean "the instance on which we are running"
// Returns the device (e.g. /dev/xvdf) where we attached the volume
AttachDisk(diskName string, instanceName string, readOnly bool) (string, error)
// Detach the disk from the specified instance
// instanceName can be empty to mean "the instance on which we are running"
AttachDisk(diskName KubernetesVolumeID, nodeName string, readOnly bool) (string, error)
// Detach the disk from the node with the specified NodeName
// nodeName can be empty to mean "the instance on which we are running"
// Returns the device where the volume was attached
DetachDisk(diskName string, instanceName string) (string, error)
DetachDisk(diskName KubernetesVolumeID, nodeName string) (string, error)

// Create a volume with the specified options
CreateDisk(volumeOptions *VolumeOptions) (volumeName string, err error)
CreateDisk(volumeOptions *VolumeOptions) (volumeName KubernetesVolumeID, err error)
// Delete the specified volume
// Returns true iff the volume was deleted
// If the was not found, returns (false, nil)
DeleteDisk(volumeName string) (bool, error)
DeleteDisk(volumeName KubernetesVolumeID) (bool, error)

// Get labels to apply to volume on creation
GetVolumeLabels(volumeName string) (map[string]string, error)
GetVolumeLabels(volumeName KubernetesVolumeID) (map[string]string, error)

// Get volume's disk path from volume name
// return the device path where the volume is attached
GetDiskPath(volumeName string) (string, error)
GetDiskPath(volumeName KubernetesVolumeID) (string, error)

// Check if the volume is already attached to the node with the specified NodeName
DiskIsAttached(diskName KubernetesVolumeID, nodeName string) (bool, error)

// Check if the volume is already attached to the instance
DiskIsAttached(diskName, instanceID string) (bool, error)
// Check if a list of volumes are attached to the node with the specified NodeName
DisksAreAttached(diskNames []KubernetesVolumeID, nodeName string) (map[KubernetesVolumeID]bool, error)
}

// InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups
Expand Down Expand Up @@ -362,7 +364,7 @@ type Cloud struct {
// attached, to avoid a race condition where we assign a device mapping
// and then get a second request before we attach the volume
attachingMutex sync.Mutex
attaching map[ /*nodeName*/ string]map[mountDevice]string
attaching map[string]map[mountDevice]awsVolumeID
}

var _ Volumes = &Cloud{}
Expand Down Expand Up @@ -795,7 +797,7 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
cfg: cfg,
region: regionName,

attaching: make(map[string]map[mountDevice]string),
attaching: make(map[string]map[mountDevice]awsVolumeID),
}

selfAWSInstance, err := awsCloud.buildSelfAWSInstance()
Expand Down Expand Up @@ -1166,7 +1168,7 @@ func (i *awsInstance) describeInstance() (*ec2.Instance, error) {
// Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice.
// If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true.
// Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false.
func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
func (c *Cloud) getMountDevice(i *awsInstance, volumeID awsVolumeID, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
instanceType := i.getInstanceType()
if instanceType == nil {
return "", false, fmt.Errorf("could not get instance type for instance: %s", i.awsID)
Expand All @@ -1176,7 +1178,7 @@ func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (as
if err != nil {
return "", false, err
}
deviceMappings := map[mountDevice]string{}
deviceMappings := map[mountDevice]awsVolumeID{}
for _, blockDevice := range info.BlockDeviceMappings {
name := aws.StringValue(blockDevice.DeviceName)
if strings.HasPrefix(name, "/dev/sd") {
Expand All @@ -1188,7 +1190,7 @@ func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (as
if len(name) < 1 || len(name) > 2 {
glog.Warningf("Unexpected EBS DeviceName: %q", aws.StringValue(blockDevice.DeviceName))
}
deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId)
deviceMappings[mountDevice(name)] = awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
}

// We lock to prevent concurrent mounts from conflicting
Expand Down Expand Up @@ -1234,7 +1236,7 @@ func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (as

attaching := c.attaching[i.nodeName]
if attaching == nil {
attaching = make(map[mountDevice]string)
attaching = make(map[mountDevice]awsVolumeID)
c.attaching[i.nodeName] = attaching
}
attaching[chosen] = volumeID
Expand All @@ -1245,7 +1247,7 @@ func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (as

// endAttaching removes the entry from the "attachments in progress" map
// It returns true if it was found (and removed), false otherwise
func (c *Cloud) endAttaching(i *awsInstance, volumeID string, mountDevice mountDevice) bool {
func (c *Cloud) endAttaching(i *awsInstance, volumeID awsVolumeID, mountDevice mountDevice) bool {
c.attachingMutex.Lock()
defer c.attachingMutex.Unlock()

Expand All @@ -1266,44 +1268,16 @@ type awsDisk struct {
ec2 EC2

// Name in k8s
name string
name KubernetesVolumeID
// id in AWS
awsID string
awsID awsVolumeID
}

func newAWSDisk(aws *Cloud, name string) (*awsDisk, error) {
// name looks like aws://availability-zone/id

// The original idea of the URL-style name was to put the AZ into the
// host, so we could find the AZ immediately from the name without
// querying the API. But it turns out we don't actually need it for
// multi-AZ clusters, as we put the AZ into the labels on the PV instead.
// However, if in future we want to support multi-AZ cluster
// volume-awareness without using PersistentVolumes, we likely will
// want the AZ in the host.

if !strings.HasPrefix(name, "aws://") {
name = "aws://" + "" + "/" + name
}
url, err := url.Parse(name)
func newAWSDisk(aws *Cloud, name KubernetesVolumeID) (*awsDisk, error) {
awsID, err := name.mapToAWSVolumeID()
if err != nil {
// TODO: Maybe we should pass a URL into the Volume functions
return nil, fmt.Errorf("Invalid disk name (%s): %v", name, err)
}
if url.Scheme != "aws" {
return nil, fmt.Errorf("Invalid scheme for AWS volume (%s)", name)
}

awsID := url.Path
if len(awsID) > 1 && awsID[0] == '/' {
awsID = awsID[1:]
}

// TODO: Regex match?
if strings.Contains(awsID, "/") || !strings.HasPrefix(awsID, "vol-") {
return nil, fmt.Errorf("Invalid format for AWS volume (%s)", name)
return nil, err
}

disk := &awsDisk{ec2: aws.ec2, name: name, awsID: awsID}
return disk, nil
}
Expand All @@ -1313,7 +1287,7 @@ func (d *awsDisk) describeVolume() (*ec2.Volume, error) {
volumeID := d.awsID

request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{&volumeID},
VolumeIds: []*string{volumeID.awsString()},
}

volumes, err := d.ec2.DescribeVolumes(request)
Expand Down Expand Up @@ -1394,7 +1368,7 @@ func (d *awsDisk) waitForAttachmentStatus(status string) (*ec2.VolumeAttachment,

// Deletes the EBS disk
func (d *awsDisk) deleteVolume() (bool, error) {
request := &ec2.DeleteVolumeInput{VolumeId: aws.String(d.awsID)}
request := &ec2.DeleteVolumeInput{VolumeId: d.awsID.awsString()}
_, err := d.ec2.DeleteVolume(request)
if err != nil {
if awsError, ok := err.(awserr.Error); ok {
Expand Down Expand Up @@ -1451,7 +1425,7 @@ func (c *Cloud) getAwsInstance(nodeName string) (*awsInstance, error) {
}

// AttachDisk implements Volumes.AttachDisk
func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool) (string, error) {
func (c *Cloud) AttachDisk(diskName KubernetesVolumeID, instanceName string, readOnly bool) (string, error) {
disk, err := newAWSDisk(c, diskName)
if err != nil {
return "", err
Expand Down Expand Up @@ -1499,7 +1473,7 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
request := &ec2.AttachVolumeInput{
Device: aws.String(ec2Device),
InstanceId: aws.String(awsInstance.awsID),
VolumeId: aws.String(disk.awsID),
VolumeId: disk.awsID.awsString(),
}

attachResponse, err := c.ec2.AttachVolume(request)
Expand Down Expand Up @@ -1538,7 +1512,7 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
}

// DetachDisk implements Volumes.DetachDisk
func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error) {
func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, instanceName string) (string, error) {
disk, err := newAWSDisk(c, diskName)
if err != nil {
return "", err
Expand Down Expand Up @@ -1570,7 +1544,7 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)

request := ec2.DetachVolumeInput{
InstanceId: &awsInstance.awsID,
VolumeId: &disk.awsID,
VolumeId: disk.awsID.awsString(),
}

response, err := c.ec2.DetachVolume(&request)
Expand Down Expand Up @@ -1601,7 +1575,7 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
}

// CreateDisk implements Volumes.CreateDisk
func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, error) {
allZones, err := c.getAllZones()
if err != nil {
return "", fmt.Errorf("error querying for all zones: %v", err)
Expand Down Expand Up @@ -1659,10 +1633,11 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
return "", err
}

az := orEmpty(response.AvailabilityZone)
awsID := orEmpty(response.VolumeId)

volumeName := "aws://" + az + "/" + awsID
awsID := awsVolumeID(aws.StringValue(response.VolumeId))
if awsID == "" {
return "", fmt.Errorf("VolumeID was not returned by CreateVolume")
}
volumeName := KubernetesVolumeID("aws://" + aws.StringValue(response.AvailabilityZone) + "/" + string(awsID))

// apply tags
tags := make(map[string]string)
Expand All @@ -1675,7 +1650,7 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
}

if len(tags) != 0 {
if err := c.createTags(awsID, tags); err != nil {
if err := c.createTags(string(awsID), tags); err != nil {
// delete the volume and hope it succeeds
_, delerr := c.DeleteDisk(volumeName)
if delerr != nil {
Expand All @@ -1689,7 +1664,7 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
}

// DeleteDisk implements Volumes.DeleteDisk
func (c *Cloud) DeleteDisk(volumeName string) (bool, error) {
func (c *Cloud) DeleteDisk(volumeName KubernetesVolumeID) (bool, error) {
awsDisk, err := newAWSDisk(c, volumeName)
if err != nil {
return false, err
Expand All @@ -1698,7 +1673,7 @@ func (c *Cloud) DeleteDisk(volumeName string) (bool, error) {
}

// GetVolumeLabels implements Volumes.GetVolumeLabels
func (c *Cloud) GetVolumeLabels(volumeName string) (map[string]string, error) {
func (c *Cloud) GetVolumeLabels(volumeName KubernetesVolumeID) (map[string]string, error) {
awsDisk, err := newAWSDisk(c, volumeName)
if err != nil {
return nil, err
Expand All @@ -1724,7 +1699,7 @@ func (c *Cloud) GetVolumeLabels(volumeName string) (map[string]string, error) {
}

// GetDiskPath implements Volumes.GetDiskPath
func (c *Cloud) GetDiskPath(volumeName string) (string, error) {
func (c *Cloud) GetDiskPath(volumeName KubernetesVolumeID) (string, error) {
awsDisk, err := newAWSDisk(c, volumeName)
if err != nil {
return "", err
Expand All @@ -1740,34 +1715,79 @@ func (c *Cloud) GetDiskPath(volumeName string) (string, error) {
}

// DiskIsAttached implements Volumes.DiskIsAttached
func (c *Cloud) DiskIsAttached(diskName, instanceID string) (bool, error) {
awsInstance, err := c.getAwsInstance(instanceID)
func (c *Cloud) DiskIsAttached(diskName KubernetesVolumeID, nodeName string) (bool, error) {
awsInstance, err := c.getAwsInstance(nodeName)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Instance %q does not exist. DiskIsAttached will assume disk %q is not attached to it.",
instanceID,
nodeName,
diskName)
return false, nil
}

return false, err
}

diskID, err := diskName.mapToAWSVolumeID()
if err != nil {
return false, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
}

info, err := awsInstance.describeInstance()
if err != nil {
return false, err
}
for _, blockDevice := range info.BlockDeviceMappings {
name := aws.StringValue(blockDevice.Ebs.VolumeId)
if name == diskName {
id := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
if id == diskID {
return true, nil
}
}
return false, nil
}

func (c *Cloud) DisksAreAttached(diskNames []KubernetesVolumeID, nodeName string) (map[KubernetesVolumeID]bool, error) {
idToDiskName := make(map[awsVolumeID]KubernetesVolumeID)
attached := make(map[KubernetesVolumeID]bool)
for _, diskName := range diskNames {
volumeID, err := diskName.mapToAWSVolumeID()
if err != nil {
return nil, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
}
idToDiskName[volumeID] = diskName
attached[diskName] = false
}
awsInstance, err := c.getAwsInstance(nodeName)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Node %q does not exist. DisksAreAttached will assume disks %v are not attached to it.",
nodeName,
diskNames)
return attached, nil
}

return attached, err
}
info, err := awsInstance.describeInstance()
if err != nil {
return attached, err
}
for _, blockDevice := range info.BlockDeviceMappings {
volumeID := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
diskName, found := idToDiskName[volumeID]
if found {
// Disk is still attached to node
attached[diskName] = true
}
}

return attached, nil
}

// Gets the current load balancer state
func (c *Cloud) describeLoadBalancer(name string) (*elb.LoadBalancerDescription, error) {
request := &elb.DescribeLoadBalancersInput{}
Expand Down
8 changes: 4 additions & 4 deletions pkg/cloudprovider/providers/aws/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1162,16 +1162,16 @@ func TestGetVolumeLabels(t *testing.T) {
awsServices := NewFakeAWSServices()
c, err := newAWSCloud(strings.NewReader("[global]"), awsServices)
assert.Nil(t, err, "Error building aws cloud: %v", err)
volumeId := aws.String("vol-VolumeId")
expectedVolumeRequest := &ec2.DescribeVolumesInput{VolumeIds: []*string{volumeId}}
volumeId := awsVolumeID("vol-VolumeId")
expectedVolumeRequest := &ec2.DescribeVolumesInput{VolumeIds: []*string{volumeId.awsString()}}
awsServices.ec2.On("DescribeVolumes", expectedVolumeRequest).Return([]*ec2.Volume{
{
VolumeId: volumeId,
VolumeId: volumeId.awsString(),
AvailabilityZone: aws.String("us-east-1a"),
},
})

labels, err := c.GetVolumeLabels(*volumeId)
labels, err := c.GetVolumeLabels(KubernetesVolumeID("aws:///" + string(volumeId)))

assert.Nil(t, err, "Error creating Volume %v", err)
assert.Equal(t, map[string]string{
Expand Down
Loading

0 comments on commit b953db2

Please sign in to comment.