Merge pull request #27553 from justinsb/pvc_zone_spreading_2
Automatic merge from submit-queue

AWS/GCE: Spread PetSet volume creation across zones, create GCE volumes in non-master zones

Long term we plan to integrate this into the scheduler, but in the
short term we use the volume name to place the volume in a zone.

We hash the volume name so we don't bias toward the first few zones.

If the volume name "looks like" a PetSet volume name (ending with
-<number>), we use that number as an offset. In that case we hash
only the base name.
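
For illustration, a minimal usage sketch of the new helper, volume.ChooseZoneForVolume (added in pkg/volume/util.go below). The zone and claim names are hypothetical; the three PetSet-style claims land in consecutive entries of the sorted zone list, so they end up in distinct zones.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/volume"
)

func main() {
	// Hypothetical zones and PetSet-style PVC names, for illustration only.
	zones := sets.NewString("us-east-1a", "us-east-1b", "us-east-1c")
	for _, pvcName := range []string{"www-web-0", "www-web-1", "www-web-2"} {
		// The base name "www-web" is hashed once; the trailing ordinal offsets
		// the result, so the claims round-robin across the sorted zone list.
		fmt.Printf("%s -> %s\n", pvcName, volume.ChooseZoneForVolume(zones, pvcName))
	}
}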
k8s-merge-robot authored Jun 22, 2016
2 parents 8d7d0cd + dd94997 commit 07471cf
Showing 9 changed files with 198 additions and 15 deletions.
56 changes: 52 additions & 4 deletions pkg/cloudprovider/providers/aws/aws.go
@@ -48,6 +48,7 @@ import (
	aws_credentials "k8s.io/kubernetes/pkg/credentialprovider/aws"
	"k8s.io/kubernetes/pkg/types"
	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/volume"
)

const ProviderName = "aws"
@@ -197,6 +198,7 @@ type EC2Metadata interface {
type VolumeOptions struct {
	CapacityGB int
	Tags map[string]string
	PVCName string
}

// Volumes is an interface for managing cloud-provisioned volumes
@@ -913,6 +915,49 @@ func (aws *AWSCloud) List(filter string) ([]string, error) {
	return aws.getInstancesByRegex(filter)
}

// getAllZones retrieves a list of all the zones in which nodes are running
// It currently involves querying all instances
func (c *AWSCloud) getAllZones() (sets.String, error) {
	// We don't currently cache this; it is currently used only in volume
	// creation which is expected to be a comparatively rare occurrence.

	// TODO: Caching / expose api.Nodes to the cloud provider?
	// TODO: We could also query for subnets, I think

	filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")}
	filters = c.addFilters(filters)
	request := &ec2.DescribeInstancesInput{
		Filters: filters,
	}

	instances, err := c.ec2.DescribeInstances(request)
	if err != nil {
		return nil, err
	}
	if len(instances) == 0 {
		return nil, fmt.Errorf("no instances returned")
	}

	zones := sets.NewString()

	for _, instance := range instances {
		// Only return fully-ready instances when listing instances
		// (vs a query by name, where we will return it if we find it)
		if orEmpty(instance.State.Name) == "pending" {
			glog.V(2).Infof("Skipping EC2 instance (pending): %s", *instance.InstanceId)
			continue
		}

		if instance.Placement != nil {
			zone := aws.StringValue(instance.Placement.AvailabilityZone)
			zones.Insert(zone)
		}
	}

	glog.V(2).Infof("Found instances in zones %s", zones)
	return zones, nil
}

// GetZone implements Zones.GetZone
func (c *AWSCloud) GetZone() (cloudprovider.Zone, error) {
	return cloudprovider.Zone{
@@ -1383,11 +1428,14 @@ func (aws *AWSCloud) DetachDisk(diskName string, instanceName string) (string, e
	return hostDevicePath, err
}

// Implements Volumes.CreateVolume
// CreateDisk implements Volumes.CreateDisk
func (s *AWSCloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
	// Default to creating in the current zone
	// TODO: Spread across zones?
	createAZ := s.selfAWSInstance.availabilityZone
	allZones, err := s.getAllZones()
	if err != nil {
		return "", fmt.Errorf("error querying for all zones: %v", err)
	}

	createAZ := volume.ChooseZoneForVolume(allZones, volumeOptions.PVCName)

	// TODO: Should we tag this with the cluster id (so it gets deleted when the cluster does?)
	request := &ec2.CreateVolumeInput{}
76 changes: 70 additions & 6 deletions pkg/cloudprovider/providers/gce/gce.go
@@ -123,7 +123,9 @@ type Disks interface {

	// GetAutoLabelsForPD returns labels to apply to PersistentVolume
	// representing this PD, namely failure domain and zone.
	GetAutoLabelsForPD(name string) (map[string]string, error)
	// zone can be provided to specify the zone for the PD,
	// if empty all managed zones will be searched.
	GetAutoLabelsForPD(name string, zone string) (map[string]string, error)
}

func init() {
@@ -2069,6 +2071,48 @@ func (gce *GCECloud) List(filter string) ([]string, error) {
	return instances, nil
}

// GetAllZones returns all the zones in which nodes are running
func (gce *GCECloud) GetAllZones() (sets.String, error) {
	// Fast-path for non-multizone
	if len(gce.managedZones) == 1 {
		return sets.NewString(gce.managedZones...), nil
	}

	// TODO: Caching, but this is currently only called when we are creating a volume,
	// which is a relatively infrequent operation, and this is only # zones API calls
	zones := sets.NewString()

	// TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically)
	for _, zone := range gce.managedZones {
		// We only retrieve one page in each zone - we only care about existence
		listCall := gce.service.Instances.List(gce.projectID, zone)

		// No filter: We assume that a zone is either used or unused
		// We could only consider running nodes (like we do in List above),
		// but probably if instances are starting we still want to consider them.
		// I think we should wait until we have a reason to make the
		// call one way or the other; we generally can't guarantee correct
		// volume spreading if the set of zones is changing
		// (and volume spreading is currently only a heuristic).
		// Long term we want to replace GetAllZones (which primarily supports volume
		// spreading) with a scheduler policy that is able to see the global state of
		// volumes and the health of zones.

		// Just a minimal set of fields - we only care about existence
		listCall = listCall.Fields("items(name)")

		res, err := listCall.Do()
		if err != nil {
			return nil, err
		}
		if len(res.Items) != 0 {
			zones.Insert(zone)
		}
	}

	return zones, nil
}

func getMetadataValue(metadata *compute.Metadata, key string) (string, bool) {
	for _, item := range metadata.Items {
		if item.Key == key {
@@ -2221,13 +2265,33 @@ func (gce *GCECloud) DeleteDisk(diskToDelete string) error {
// Builds the labels that should be automatically added to a PersistentVolume backed by a GCE PD
// Specifically, this builds FailureDomain (zone) and Region labels.
// The PersistentVolumeLabel admission controller calls this and adds the labels when a PV is created.
func (gce *GCECloud) GetAutoLabelsForPD(name string) (map[string]string, error) {
	disk, err := gce.getDiskByNameUnknownZone(name)
	if err != nil {
		return nil, err
// If zone is specified, the volume will only be found in the specified zone,
// otherwise all managed zones will be searched.
func (gce *GCECloud) GetAutoLabelsForPD(name string, zone string) (map[string]string, error) {
	var disk *gceDisk
	var err error
	if zone == "" {
		// We would like as far as possible to avoid this case,
		// because GCE doesn't guarantee that volumes are uniquely named per region,
		// just per zone. However, creation of GCE PDs was originally done only
		// by name, so we have to continue to support that.
		// However, wherever possible the zone should be passed (and it is passed
		// for most cases that we can control, e.g. dynamic volume provisioning)
		disk, err = gce.getDiskByNameUnknownZone(name)
		if err != nil {
			return nil, err
		}
		zone = disk.Zone
	} else {
		// We could assume the disk exists; we have all the information we need
		// However it is more consistent to ensure the disk exists,
		// and in future we may gather additional information (e.g. disk type, IOPS etc)
		disk, err = gce.getDiskByName(name, zone)
		if err != nil {
			return nil, err
		}
	}

	zone := disk.Zone
	region, err := GetGCERegion(zone)
	if err != nil {
		return nil, err
1 change: 1 addition & 0 deletions pkg/controller/persistentvolume/controller.go
@@ -1131,6 +1131,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa
		CloudTags: &tags,
		ClusterName: ctrl.clusterName,
		PVName: pvName,
		PVCName: claim.Name,
	}

	// Provision the volume
1 change: 1 addition & 0 deletions pkg/volume/aws_ebs/aws_util.go
@@ -82,6 +82,7 @@ func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (strin
	volumeOptions := &aws.VolumeOptions{
		CapacityGB: requestGB,
		Tags: tags,
		PVCName: c.options.PVCName,
	}

	name, err := cloud.CreateDisk(volumeOptions)
2 changes: 1 addition & 1 deletion pkg/volume/gce_pd/attacher_test.go
@@ -359,6 +359,6 @@ func (testcase *testcase) DeleteDisk(diskToDelete string) error {
	return errors.New("Not implemented")
}

func (testcase *testcase) GetAutoLabelsForPD(name string) (map[string]string, error) {
func (testcase *testcase) GetAutoLabelsForPD(name string, zone string) (map[string]string, error) {
	return map[string]string{}, errors.New("Not implemented")
}
8 changes: 5 additions & 3 deletions pkg/volume/gce_pd/gce_util.go
@@ -82,20 +82,22 @@ func (gceutil *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner) (strin

	// The disk will be created in the zone in which this code is currently running
	// TODO: We should support auto-provisioning volumes in multiple/specified zones
	zone, err := cloud.GetZone()
	zones, err := cloud.GetAllZones()
	if err != nil {
		glog.V(2).Infof("error getting zone information from GCE: %v", err)
		return "", 0, nil, err
	}

	err = cloud.CreateDisk(name, zone.FailureDomain, int64(requestGB), *c.options.CloudTags)
	zone := volume.ChooseZoneForVolume(zones, c.options.PVCName)

	err = cloud.CreateDisk(name, zone, int64(requestGB), *c.options.CloudTags)
	if err != nil {
		glog.V(2).Infof("Error creating GCE PD volume: %v", err)
		return "", 0, nil, err
	}
	glog.V(2).Infof("Successfully created GCE PD volume %s", name)

	labels, err := cloud.GetAutoLabelsForPD(name)
	labels, err := cloud.GetAutoLabelsForPD(name, zone)
	if err != nil {
		// We don't really want to leak the volume here...
		glog.Errorf("error getting labels for volume %q: %v", name, err)
2 changes: 2 additions & 0 deletions pkg/volume/plugins.go
@@ -49,6 +49,8 @@ type VolumeOptions struct {
	// PV.Name of the appropriate PersistentVolume. Used to generate cloud
	// volume name.
	PVName string
	// PVC.Name of the PersistentVolumeClaim; only set during dynamic provisioning.
	PVCName string
	// Unique name of Kubernetes cluster.
	ClusterName string
	// Tags to attach to the real volume in the cloud provider - e.g. AWS EBS
61 changes: 61 additions & 0 deletions pkg/volume/util.go
@@ -28,8 +28,13 @@ import (
	"k8s.io/kubernetes/pkg/watch"

	"github.com/golang/glog"
	"hash/fnv"
	"k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/resource"
	"k8s.io/kubernetes/pkg/util/sets"
	"math/rand"
	"strconv"
	"strings"
)

// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume
@@ -187,3 +192,59 @@ func GenerateVolumeName(clusterName, pvName string, maxLength int) string {
	}
	return prefix + "-" + pvName
}

// ChooseZone implements our heuristics for choosing a zone for volume creation based on the volume name
// Volumes are generally round-robin-ed across all active zones, using the hash of the PVC Name.
// However, if the PVCName ends with `-<integer>`, we will hash the prefix, and then add the integer to the hash.
// This means that a PetSet's volumes (`claimname-petsetname-id`) will spread across available zones,
// assuming the id values are consecutive.
func ChooseZoneForVolume(zones sets.String, pvcName string) string {
	// We create the volume in a zone determined by the name
	// Eventually the scheduler will coordinate placement into an available zone
	var hash uint32
	var index uint32

	if pvcName == "" {
		// We should always be called with a name; this shouldn't happen
		glog.Warningf("No name defined during volume create; choosing random zone")

		hash = rand.Uint32()
	} else {
		hashString := pvcName

		// Heuristic to make sure that volumes in a PetSet are spread across zones
		// PetSet PVCs are (currently) named ClaimName-PetSetName-Id,
		// where Id is an integer index
		lastDash := strings.LastIndexByte(pvcName, '-')
		if lastDash != -1 {
			petIDString := pvcName[lastDash+1:]
			petID, err := strconv.ParseUint(petIDString, 10, 32)
			if err == nil {
				// Offset by the pet id, so we round-robin across zones
				index = uint32(petID)
				// We still hash the volume name, but only the base
				hashString = pvcName[:lastDash]
				glog.V(2).Infof("Detected PetSet-style volume name %q; index=%d", pvcName, index)
			}
		}

		// We hash the (base) volume name, so we don't bias towards the first N zones
		h := fnv.New32()
		h.Write([]byte(hashString))
		hash = h.Sum32()
	}

	// Zones.List returns zones in a consistent order (sorted)
	// We do have a potential failure case where volumes will not be properly spread,
	// if the set of zones changes during PetSet volume creation. However, this is
	// probably relatively unlikely because we expect the set of zones to be essentially
	// static for clusters.
	// Hopefully we can address this problem if/when we do full scheduler integration of
	// PVC placement (which could also e.g. avoid putting volumes in overloaded or
	// unhealthy zones)
	zoneSlice := zones.List()
	zone := zoneSlice[(hash+index)%uint32(len(zoneSlice))]

	glog.V(2).Infof("Creating volume for PVC %q; chose zone=%q from zones=%q", pvcName, zone, zoneSlice)
	return zone
}
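
Not part of the commit: a minimal test-style sketch of the spreading property the comments above describe. Because the base name is hashed once and the trailing ordinal is used as an offset, consecutive PetSet ordinals always land in distinct zones when enough zones exist. The zone and claim names are illustrative, and the concrete zone each claim maps to (which depends on the FNV hash) is deliberately not asserted.

package volume

import (
	"testing"

	"k8s.io/kubernetes/pkg/util/sets"
)

func TestPetSetVolumeSpreading(t *testing.T) {
	zones := sets.NewString("zone-a", "zone-b", "zone-c") // illustrative zone names
	chosen := sets.NewString()
	for _, pvcName := range []string{"data-db-0", "data-db-1", "data-db-2"} {
		chosen.Insert(ChooseZoneForVolume(zones, pvcName))
	}
	// Three consecutive ordinals across three zones must yield three distinct zones.
	if chosen.Len() != 3 {
		t.Errorf("expected 3 distinct zones, got %v", chosen.List())
	}
}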
6 changes: 5 additions & 1 deletion plugin/pkg/admission/persistentvolume/label/admission.go
@@ -25,6 +25,7 @@ import (

	"k8s.io/kubernetes/pkg/admission"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/cloudprovider"
	"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
@@ -159,7 +160,10 @@ func (l *persistentVolumeLabel) findGCEPDLabels(volume *api.PersistentVolume) (m
		return nil, fmt.Errorf("unable to build GCE cloud provider for PD")
	}

	labels, err := provider.GetAutoLabelsForPD(volume.Spec.GCEPersistentDisk.PDName)
	// If the zone is already labeled, honor the hint
	zone := volume.Labels[unversioned.LabelZoneFailureDomain]

	labels, err := provider.GetAutoLabelsForPD(volume.Spec.GCEPersistentDisk.PDName, zone)
	if err != nil {
		return nil, err
	}
