Skip to content

Commit

Permalink
feat(provider/gce): Limit GCE batch sizes. (spinnaker#3125)
Browse files Browse the repository at this point in the history
* feat(provider/gce): Limits GCE batch sizes.

Adds a new class to limit GCE batch sizes, adds a hook
to PaginatedRequest using the new batch class, and
exercises both using the regional server group caching
agent as the guinea pig.

* feat(provider/gce): Adds new GoogleBatchRequest to all callsites.

* chore(provider/gce): Adds instrumentation to new batches.

* chore(provider/gce): Update specs for new batching mechanism.

* refactor(provider/gce): Switch to local threadpool for new batches.
  • Loading branch information
jtk54 committed Nov 13, 2018
1 parent 92a6d37 commit a439b8f
Show file tree
Hide file tree
Showing 23 changed files with 318 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class AppengineUtils {
task.updateStatus phase, "Querying all versions for project $project..."
def services = queryAllServices(project, credentials, task, phase)

// TODO(jacobkiefer): Consider limiting batch sizes.
// https://github.com/spinnaker/spinnaker/issues/3564.
BatchRequest batch = credentials.appengine.batch()
def allVersions = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class AppengineServerGroupCachingAgent extends AbstractAppengineCachingAgent imp
Map<Service, List<Version>> loadServerGroups() {
def project = credentials.project
def loadBalancers = credentials.appengine.apps().services().list(project).execute().getServices() ?: []
BatchRequest batch = credentials.appengine.batch()
BatchRequest batch = credentials.appengine.batch() // TODO(jacobkiefer): Consider limiting batch sizes. https://github.com/spinnaker/spinnaker/issues/3564.
Map<Service, List<Version>> serverGroupsByLoadBalancer = [:].withDefault { [] }

loadBalancers.each { loadBalancer ->
Expand Down
1 change: 1 addition & 0 deletions clouddriver-google-common/clouddriver-google-common.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ dependencies {
spinnaker.group('google')
spinnaker.group('fiat')
compile spinnaker.dependency('spectatorApi')
compile spinnaker.dependency('lombok')
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,16 @@
*/
package com.netflix.spinnaker.clouddriver.googlecommon

import com.google.api.client.googleapis.batch.BatchRequest
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest
import com.google.api.client.http.HttpResponseException
import com.netflix.spectator.api.Clock
import com.netflix.spectator.api.Id
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.clouddriver.googlecommon.batch.GoogleBatchRequest
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component

import java.util.concurrent.TimeUnit
import javax.annotation.PostConstruct
import java.util.concurrent.TimeUnit

/**
* Provides a static-ish means to wrap API execution calls with spectator metrics.
Expand Down Expand Up @@ -75,12 +74,11 @@ class GoogleExecutor {
final static String TAG_REGION = "region"
final static String TAG_SCOPE = "scope"
final static String TAG_ZONE = "zone"
final static String SCOPE_BATCH = "batch"
final static String SCOPE_GLOBAL = "global"
final static String SCOPE_REGIONAL = "regional"
final static String SCOPE_ZONAL = "zonal"

public static <T> T timeExecuteBatch(Registry spectator_registry, BatchRequest batch, String batchContext, String... tags) throws IOException {
public static <T> T timeExecuteBatch(Registry spectator_registry, GoogleBatchRequest batch, String batchContext, String... tags) throws IOException {
def batchSize = batch.size()
def success = "false"
Clock clock = spectator_registry.clock()
Expand Down Expand Up @@ -125,4 +123,4 @@ class GoogleExecutor {
}
return result
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2018 Google, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.clouddriver.googlecommon.batch;

import com.google.api.client.googleapis.batch.BatchRequest;
import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.compute.Compute;
import com.google.api.services.compute.ComputeRequest;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

/**
* Helper class for sending batch requests to GCE.
*/
@Slf4j
public class GoogleBatchRequest {

private static final int MAX_BATCH_SIZE = 100; // Platform specified max to not overwhelm batch backends.
private static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = (int) TimeUnit.MINUTES.toMillis(2);
private static final int DEFAULT_READ_TIMEOUT_MILLIS = (int) TimeUnit.MINUTES.toMillis(2);

private List<QueuedRequest> queuedRequests;
private String clouddriverUserAgentApplicationName;
private Compute compute;

public GoogleBatchRequest(Compute compute, String clouddriverUserAgentApplicationName) {
this.compute = compute;
this.clouddriverUserAgentApplicationName = clouddriverUserAgentApplicationName;
this.queuedRequests = new ArrayList<>();
}

public void execute() {
if (queuedRequests.size() == 0) {
log.debug("No requests queued in batch, exiting.");
return;
}

List<BatchRequest> queuedBatches = new ArrayList<>();
List<List<QueuedRequest>> requestPartitions = Lists.partition(queuedRequests, MAX_BATCH_SIZE);
requestPartitions.forEach(requestPart -> {
BatchRequest newBatch = newBatch();
requestPart.forEach(qr -> {
try {
qr.getRequest().queue(newBatch, qr.getCallback());
} catch (Exception ioe) {
log.error("Queueing request {} in batch failed.", qr, ioe);
}
});
queuedBatches.add(newBatch);
});

ExecutorService threadPool = new ForkJoinPool(10);
try {
threadPool.submit(() -> queuedBatches.stream().parallel().forEach(this::executeInternalBatch)).get();
} catch (Exception e) {
log.error("Executing queued batches failed.", e);
}
threadPool.shutdown();
}

private void executeInternalBatch(BatchRequest b) {
try {
b.execute();
} catch (Exception e) {
log.error("Executing batch {} failed.", b, e);
}
}

private BatchRequest newBatch() {
return compute.batch(
new HttpRequestInitializer() {
@Override
public void initialize(HttpRequest request) {
request.getHeaders().setUserAgent(clouddriverUserAgentApplicationName);
request.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS);
request.setReadTimeout(DEFAULT_READ_TIMEOUT_MILLIS);
}
}
);
}

public void queue(ComputeRequest request, JsonBatchCallback callback) {
queuedRequests.add(new QueuedRequest(request, callback));
}

public Integer size() {
return queuedRequests.size();
}

@Data
@AllArgsConstructor
private static class QueuedRequest {
private ComputeRequest request;
private JsonBatchCallback callback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,29 @@
*/
package com.netflix.spinnaker.clouddriver.google

import com.google.api.client.googleapis.batch.BatchRequest
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest
import com.google.api.client.http.HttpResponseException
import com.netflix.spinnaker.clouddriver.googlecommon.GoogleExecutor
import com.netflix.spinnaker.clouddriver.google.security.AccountForClient

import com.netflix.spectator.api.Clock
import com.netflix.spectator.api.Id
import com.netflix.spectator.api.Registry

import java.util.concurrent.TimeUnit

import com.netflix.spinnaker.clouddriver.google.security.AccountForClient
import com.netflix.spinnaker.clouddriver.googlecommon.GoogleExecutor
import com.netflix.spinnaker.clouddriver.googlecommon.batch.GoogleBatchRequest

/**
* This class is syntactic sugar atop the static GoogleExecutor.
* By making it a traite, we can wrap the calls with less in-line syntax.
* By making it a trait, we can wrap the calls with less in-line syntax.
*/
trait GoogleExecutorTraits {
final String TAG_BATCH_CONTEXT = GoogleExecutor.TAG_BATCH_CONTEXT
final String TAG_REGION = GoogleExecutor.TAG_REGION
final String TAG_SCOPE = GoogleExecutor.TAG_SCOPE
final String TAG_ZONE = GoogleExecutor.TAG_ZONE
final String SCOPE_BATCH = GoogleExecutor.SCOPE_BATCH
final String SCOPE_GLOBAL = GoogleExecutor.SCOPE_GLOBAL
final String SCOPE_REGIONAL = GoogleExecutor.SCOPE_REGIONAL
final String SCOPE_ZONAL = GoogleExecutor.SCOPE_ZONAL

abstract Registry getRegistry()

public <T> T timeExecuteBatch(BatchRequest batch, String batchContext, String... tags) throws IOException {
return GoogleExecutor.timeExecuteBatch(getRegistry(), batch, batchContext, tags)
public <T> T timeExecuteBatch(GoogleBatchRequest googleBatchRequest, String batchContext, String... tags) throws IOException {
return GoogleExecutor.timeExecuteBatch(getRegistry(), googleBatchRequest, batchContext, tags)
}

public <T> T timeExecute(AbstractGoogleClientRequest<T> request, String api, String... tags) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package com.netflix.spinnaker.clouddriver.google.deploy

import com.fasterxml.jackson.databind.ObjectMapper
import com.google.api.client.googleapis.batch.BatchRequest
import com.google.api.client.googleapis.batch.json.JsonBatchCallback
import com.google.api.client.googleapis.json.GoogleJsonError
import com.google.api.client.googleapis.json.GoogleJsonResponseException
import com.google.api.client.http.*
import com.google.api.client.http.GenericUrl
import com.google.api.client.http.HttpHeaders
import com.google.api.client.http.HttpResponse
import com.google.api.client.json.JsonObjectParser
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.compute.Compute
Expand All @@ -44,6 +45,7 @@ import com.netflix.spinnaker.clouddriver.google.model.callbacks.Utils
import com.netflix.spinnaker.clouddriver.google.model.health.GoogleInstanceHealth
import com.netflix.spinnaker.clouddriver.google.model.health.GoogleLoadBalancerHealth
import com.netflix.spinnaker.clouddriver.google.model.loadbalancing.*
import com.netflix.spinnaker.clouddriver.googlecommon.batch.GoogleBatchRequest
import com.netflix.spinnaker.clouddriver.google.provider.view.GoogleClusterProvider
import com.netflix.spinnaker.clouddriver.google.provider.view.GoogleLoadBalancerProvider
import com.netflix.spinnaker.clouddriver.google.provider.view.GoogleNetworkProvider
Expand Down Expand Up @@ -90,7 +92,7 @@ class GCEUtil {
def imageProjects = [projectName] + credentials?.imageProjects + baseImageProjects - null
def sourceImage = null

def imageListBatch = buildBatchRequest(compute, clouddriverUserAgentApplicationName)
def imageListBatch = new GoogleBatchRequest(compute, clouddriverUserAgentApplicationName)
def imageListCallback = new JsonBatchCallback<ImageList>() {
@Override
void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
Expand All @@ -108,7 +110,7 @@ class GCEUtil {
imageProjects.each { imageProject ->
def imagesList = compute.images().list(imageProject)
imagesList.setFilter(filter)
imagesList.queue(imageListBatch, imageListCallback)
imageListBatch.queue(imagesList, imageListCallback)
}

executor.timeExecuteBatch(imageListBatch, "findImage")
Expand Down Expand Up @@ -180,17 +182,6 @@ class GCEUtil {
}
}

private static BatchRequest buildBatchRequest(def compute, String clouddriverUserAgentApplicationName) {
return compute.batch(
new HttpRequestInitializer() {
@Override
void initialize(HttpRequest request) throws IOException {
request.headers.setUserAgent(clouddriverUserAgentApplicationName);
}
}
)
}

static GoogleNetwork queryNetwork(String accountName, String networkName, Task task, String phase, GoogleNetworkProvider googleNetworkProvider) {
task.updateStatus phase, "Looking up network $networkName..."

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ package com.netflix.spinnaker.clouddriver.google.provider.agent

import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.api.client.googleapis.batch.BatchRequest
import com.google.api.client.http.HttpRequest
import com.google.api.client.http.HttpRequestInitializer
import com.google.api.services.compute.Compute
import com.google.common.annotations.VisibleForTesting
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.cats.agent.AccountAware
import com.netflix.spinnaker.cats.agent.CachingAgent
import com.netflix.spinnaker.clouddriver.google.GoogleExecutorTraits
import com.netflix.spinnaker.clouddriver.google.provider.GoogleInfrastructureProvider
import com.netflix.spinnaker.clouddriver.googlecommon.batch.GoogleBatchRequest
import com.netflix.spinnaker.clouddriver.google.security.GoogleNamedAccountCredentials

abstract class AbstractGoogleCachingAgent implements CachingAgent, AccountAware, GoogleExecutorTraits {
Expand Down Expand Up @@ -70,22 +68,13 @@ abstract class AbstractGoogleCachingAgent implements CachingAgent, AccountAware,
credentials?.name
}

def executeIfRequestsAreQueued(BatchRequest batch, String instrumentationContext) {
if (batch.size()) {
timeExecuteBatch(batch, instrumentationContext)
}
GoogleBatchRequest buildGoogleBatchRequest() {
return new GoogleBatchRequest(compute, clouddriverUserAgentApplicationName)
}

BatchRequest buildBatchRequest() {
return compute.batch(
new HttpRequestInitializer() {
@Override
void initialize(HttpRequest request) throws IOException {
request.headers.setUserAgent(clouddriverUserAgentApplicationName);
request.setConnectTimeout(2 * 60000) // 2 minutes connect timeout
request.setReadTimeout(2 * 60000) // 2 minutes read timeout
}
}
)
def executeIfRequestsAreQueued(GoogleBatchRequest googleBatchRequest, String instrumentationContext) {
if (googleBatchRequest.size()) {
timeExecuteBatch(googleBatchRequest, instrumentationContext)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.netflix.spinnaker.clouddriver.google.provider.agent

import com.fasterxml.jackson.databind.ObjectMapper
import com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest
import com.google.api.services.compute.ComputeRequest
import com.google.api.services.compute.model.BackendService
import com.google.api.services.compute.model.BackendServiceList
import com.netflix.spectator.api.Registry
Expand Down Expand Up @@ -66,7 +66,7 @@ class GoogleBackendServiceCachingAgent extends AbstractGoogleCachingAgent {
GoogleBackendServiceCachingAgent cachingAgent = this
List<BackendService> globalBackendServices = new PaginatedRequest<BackendServiceList>(cachingAgent) {
@Override
protected AbstractGoogleJsonClientRequest<BackendServiceList> request (String pageToken) {
protected ComputeRequest<BackendServiceList> request (String pageToken) {
return compute.backendServices().list(project).setPageToken(pageToken)
}

Expand All @@ -87,7 +87,7 @@ class GoogleBackendServiceCachingAgent extends AbstractGoogleCachingAgent {
credentials.regions.collect { it.name }.each { String region ->
List<BackendService> regionBackendServices = new PaginatedRequest<BackendServiceList>(cachingAgent) {
@Override
protected AbstractGoogleJsonClientRequest<BackendServiceList> request (String pageToken) {
protected ComputeRequest<BackendServiceList> request (String pageToken) {
return compute.regionBackendServices().list(project, region).setPageToken(pageToken)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.netflix.spinnaker.clouddriver.google.provider.agent

import com.fasterxml.jackson.databind.ObjectMapper
import com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest
import com.google.api.services.compute.ComputeRequest
import com.google.api.services.compute.model.Address
import com.google.api.services.compute.model.AddressList
import com.netflix.spectator.api.Registry
Expand Down Expand Up @@ -61,7 +61,7 @@ class GoogleGlobalAddressCachingAgent extends AbstractGoogleCachingAgent {
List<Address> loadAddresses() {
List<Address> globalAddresses = new PaginatedRequest<AddressList>(this) {
@Override
protected AbstractGoogleJsonClientRequest<AddressList> request (String pageToken) {
protected ComputeRequest<AddressList> request (String pageToken) {
return compute.globalAddresses().list(project).setPageToken(pageToken)
}

Expand Down
Loading

0 comments on commit a439b8f

Please sign in to comment.