From 6dc63f805f8be367f87ca97b34c48a619026cbac Mon Sep 17 00:00:00 2001 From: Alex Mohr Date: Tue, 15 Mar 2016 15:47:32 -0700 Subject: [PATCH] Add a rate limiter to the GCE cloudprovider It will poll for operation completion with at most 10 qps to avoid triggering GCE's rate limits. --- pkg/cloudprovider/providers/gce/gce.go | 56 ++++++++++++++++---------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index bf458ce8e21ae..53f89f816f371 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util" utilerrors "k8s.io/kubernetes/pkg/util/errors" netsets "k8s.io/kubernetes/pkg/util/net/sets" "k8s.io/kubernetes/pkg/util/sets" @@ -72,14 +73,15 @@ const ( // GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine. type GCECloud struct { - service *compute.Service - containerService *container.Service - projectID string - region string - localZone string // The zone in which we are running - managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master) - networkURL string - useMetadataServer bool + service *compute.Service + containerService *container.Service + projectID string + region string + localZone string // The zone in which we are running + managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master) + networkURL string + useMetadataServer bool + operationPollRateLimiter util.RateLimiter } type Config struct { @@ -296,15 +298,18 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo glog.Infof("managing multiple zones: %v", managedZones) } + operationPollRateLimiter := util.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size. + return &GCECloud{ - service: svc, - containerService: containerSvc, - projectID: projectID, - region: region, - localZone: zone, - managedZones: managedZones, - networkURL: networkURL, - useMetadataServer: useMetadataServer, + service: svc, + containerService: containerSvc, + projectID: projectID, + region: region, + localZone: zone, + managedZones: managedZones, + networkURL: networkURL, + useMetadataServer: useMetadataServer, + operationPollRateLimiter: operationPollRateLimiter, }, nil } @@ -373,7 +378,7 @@ func (gce *GCECloud) targetPoolURL(name, region string) string { return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name) } -func waitForOp(op *compute.Operation, getOperation func(operationName string) (*compute.Operation, error)) error { +func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operationName string) (*compute.Operation, error)) error { if op == nil { return fmt.Errorf("operation must not be nil") } @@ -384,9 +389,15 @@ func waitForOp(op *compute.Operation, getOperation func(operationName string) (* opName := op.Name return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) { + start := time.Now() + gce.operationPollRateLimiter.Accept() + duration := time.Now().Sub(start) + if duration > 5*time.Second { + glog.Infof("pollOperation: waited %v for %v", duration, opName) + } pollOp, err := getOperation(opName) if err != nil { - glog.Warningf("GCE poll operation failed: %v", err) + glog.Warningf("GCE poll operation %s failed: pollOp: [%v] err: [%v] getErrorFromOp: [%v]", opName, pollOp, err, getErrorFromOp(pollOp)) } return opIsDone(pollOp), getErrorFromOp(pollOp) }) @@ -410,19 +421,19 @@ func getErrorFromOp(op *compute.Operation) error { } func (gce *GCECloud) waitForGlobalOp(op *compute.Operation) error { - return waitForOp(op, func(operationName string) (*compute.Operation, error) { + return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) { return gce.service.GlobalOperations.Get(gce.projectID, operationName).Do() }) } func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error { - return waitForOp(op, func(operationName string) (*compute.Operation, error) { + return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) { return gce.service.RegionOperations.Get(gce.projectID, region, operationName).Do() }) } func (gce *GCECloud) waitForZoneOp(op *compute.Operation, zone string) error { - return waitForOp(op, func(operationName string) (*compute.Operation, error) { + return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) { return gce.service.ZoneOperations.Get(gce.projectID, zone, operationName).Do() }) } @@ -2366,7 +2377,7 @@ func (gce *GCECloud) getInstanceByName(name string) (*gceInstance, error) { zone := gce.managedZones[0] res, err := gce.service.Instances.Get(gce.projectID, zone, name).Do() if err != nil { - glog.Errorf("Failed to retrieve TargetInstance resource for instance: %s", name) + glog.Errorf("getInstanceByName/single-zone: failed to get instance %s; err: %v", name, err) if isHTTPErrorCode(err, http.StatusNotFound) { return nil, cloudprovider.InstanceNotFound } @@ -2383,6 +2394,7 @@ func (gce *GCECloud) getInstanceByName(name string) (*gceInstance, error) { instances, err := gce.getInstancesByNames([]string{name}) if err != nil { + glog.Errorf("getInstanceByName/multiple-zones: failed to get instance %s; err: %v", name, err) return nil, err } if len(instances) != 1 || instances[0] == nil {