
Commit

Create python scripts for deploying Kubeflow on GCP via deployment manager.

* The scripts replace our bash commands.
* For teardown we want to add retries (sketched below) to better handle the
  INTERNAL_ERROR failures from deployment manager that are causing the test
  to be flaky.
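
The retries themselves are not implemented in this commit. A minimal sketch of the intended approach (the `delete_deployment_with_retries` helper, the retry counts, and the exact error matching are assumptions; only the `deploymentmanager` discovery client and `deploy_utils.wait_for_operation` come from these scripts):

```python
import logging
import time

from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

from testing import deploy_utils

def delete_deployment_with_retries(project, name, retries=3, wait_seconds=30):
  """Hypothetical teardown helper: retry deletes that fail with INTERNAL_ERROR."""
  credentials = GoogleCredentials.get_application_default()
  deploy = discovery.build("deploymentmanager", "v2", credentials=credentials)
  for attempt in range(1, retries + 1):
    op = deploy.deployments().delete(project=project, deployment=name).execute()
    final = deploy_utils.wait_for_operation(deploy, project, op["name"])
    # Deployment manager reports flaky failures as INTERNAL_ERROR entries in
    # the operation's error list; retry only when every error is of that kind.
    op_errors = final.get("error", {}).get("errors", [])
    if not op_errors:
      return final
    if not all(e.get("code") == "INTERNAL_ERROR" for e in op_errors):
      raise RuntimeError("Delete failed with non-retryable errors: %s" % op_errors)
    logging.info("Attempt %d hit INTERNAL_ERROR; retrying in %ds.", attempt,
                 wait_seconds)
    time.sleep(wait_seconds)
  raise RuntimeError("Delete of %s failed after %d attempts." % (name, retries))
```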

Related to #836: verify Kubeflow deployed correctly with deployment manager.

* Fix resource_not_found errors in delete (#833)

* The not-found error was due to the type providers for K8s resources
  being deleted before the corresponding K8s resources, so the subsequent
  delete of a K8s resource would fail because its type provider no longer
  existed.

* We fix this by using a $ref to refer to the type provider in the type field
  of K8s resources, as shown below.
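
The before/after in `docs/gke/configs/cluster.jinja` (also visible in the diff below) illustrates the fix for the namespace resource:

```jinja
{# Before: the type provider is named via a plain string, so deployment
   manager sees no dependency between the resource and its type provider. #}
type: {{ CLUSTER_TYPE_API_V1 }}:{{ NAMESPACE_COLLECTION }}

{# After: $(ref...) makes the dependency explicit, so the resource is created
   after, and deleted before, its type provider. #}
type: {{ env['project'] }}/$(ref.{{ TYPE_NAME }}.name):{{ NAMESPACE_COLLECTION }}
```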
jlewi committed May 24, 2018
1 parent 01ccdff commit c5834bc
Showing 7 changed files with 374 additions and 63 deletions.
47 changes: 23 additions & 24 deletions docs/gke/configs/cluster.jinja
@@ -30,10 +30,6 @@ limitations under the License.
#}
{% set K8S_ENDPOINTS = {'': 'api/v1', '-v1beta1-extensions': 'apis/extensions/v1beta1', '-rbac-v1': 'apis/rbac.authorization.k8s.io/v1', '-apps-v1': 'apis/apps/v1/'} %}

{% set CLUSTER_TYPE_API_V1 = env['project'] + '/' + TYPE_NAME %}
{% set RBAC_TYPE = env['project'] + '/' + RBAC_TYPE_NAME %}
{% set APPS_TYPE = env['project'] + '/' + APPS_TYPE_NAME %}

{% set COLLECTION_PREFIX = '/api/v1/namespaces/{namespace}/' %}
{% set NAMESPACE_COLLECTION = '/api/v1/namespaces' %}
{% set CM_COLLECTION = COLLECTION_PREFIX + 'configmaps' %}
@@ -48,6 +44,9 @@ limitations under the License.
We also set deletePolicy to ABANDON on the project APIs because otherwise it tries to deactivate them
which causes errors.
TODO(jlewi): I don't think this is needed. I think the bug was that we weren't using references in K8s types
so we weren't ensuring the type providers were deleted after the corresponding resources.
#}
resources:
- name: {{ CLUSTER_NAME }}
@@ -180,6 +179,11 @@ e.g. creating namespaces, service accounts, stateful set to run the bootstrapper
        value: >
          $.concat("Bearer ", $.googleOauth2AccessToken())
    descriptorUrl: https://$(ref.{{ CLUSTER_NAME }}.endpoint)/swaggerapi/{{ endpoint }}


  metadata:
    {# Set policy to abandon to avoid RESOURCE_NOT_FOUND_ERRORS on delete. #}
    deletePolicy: ABANDON
{% endfor %}

{# Enable the resource manager API. This is needed below to get IAM policy.
@@ -234,29 +238,30 @@ e.g. creating namespaces, service accounts, stateful set to run the bootstrapper
    runtimePolicy:
    - UPDATE_ALWAYS

{# A note about K8s resources.
The type value should be defined using a reference to the corresponding type provider.
Using references will ensure the K8s resource has
an explicit dependency on the type provider. This will ensure resources aren't created
until their type provider is created and that the resources are deleted before
the corresponding type provider.
#}

{# Namespace for bootstrapper. #}
- name: admin-namespace
  type: {{ CLUSTER_TYPE_API_V1 }}:{{ NAMESPACE_COLLECTION }}
  type: {{ env['project'] }}/$(ref.{{ TYPE_NAME }}.name):{{ NAMESPACE_COLLECTION }}
  properties:
    apiVersion: v1
    kind: Namespace
    metadata:
      name: kubeflow-admin
    spec:

  metadata:
    dependsOn:
    # Wait for the type provider to be created.
    - {{ TYPE_NAME }}

    deletePolicy: ABANDON

{# The deployment manager uses the cloudservices account. We need to create
a cluster role binding making the cloudservices account cluster admin
so that we can then create other cluster role bindings.
#}
- name: dm-rbac
  type: {{ RBAC_TYPE }}:{{ CLUSTER_ROLE_BINDING_COLLECTION }}
  type: {{ env['project'] }}/$(ref.{{ RBAC_TYPE_NAME }}.name):{{ CLUSTER_ROLE_BINDING_COLLECTION }}
  properties:
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
@@ -271,15 +276,13 @@ e.g. creating namespaces, service accounts, stateful set to run the bootstrapper
      apiGroup: rbac.authorization.k8s.io
  metadata:
    dependsOn:
    - {{ RBAC_TYPE_NAME }}
    - admin-namespace
    deletePolicy: ABANDON

{# Make the default service account in the kubeflow-admin namespace a cluster admin.
Cluster admin privileges are needed by the bootstrapper.
#}
- name: bootstrap-rbac
  type: {{ RBAC_TYPE }}:{{ CLUSTER_ROLE_BINDING_COLLECTION }}
  type: {{ env['project'] }}/$(ref.{{ RBAC_TYPE_NAME }}.name):{{ CLUSTER_ROLE_BINDING_COLLECTION }}
  properties:
    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
@@ -295,15 +298,13 @@ e.g. creating namespaces, service accounts, stateful set to run the bootstrapper
      apiGroup: rbac.authorization.k8s.io
  metadata:
    dependsOn:
    - {{ RBAC_TYPE_NAME }}
    - admin-namespace
    - dm-rbac
    deletePolicy: ABANDON

{# Create a persistent volume to store the ksonnet app.
#}
- name: bootstrap-pvc
  type: {{ CLUSTER_TYPE_API_V1 }}:{{ PVC_COLLECTION }}
  type: {{ env['project'] }}/$(ref.{{ TYPE_NAME }}.name):{{ PVC_COLLECTION }}
  properties:
    apiVersion: v1
    kind: PersistentVolumeClaim
@@ -324,11 +325,10 @@ e.g. creating namespaces, service accounts, stateful set to run the bootstrapper
  metadata:
    dependsOn:
    - admin-namespace
    deletePolicy: ABANDON

{# ConfigMap for the bootstrapper #}
- name: bootstrap-configmap
  type: {{ CLUSTER_TYPE_API_V1 }}:{{ CM_COLLECTION }}
  type: {{ env['project'] }}/$(ref.{{ TYPE_NAME }}.name):{{ CM_COLLECTION }}
  properties:
    apiVersion: v1
    {# Namespace is a property because it's used by deployment manager in
@@ -345,11 +345,10 @@ e.g. creating namespaces, service accounts, stateful set to run the bootstrapper
  metadata:
    dependsOn:
    - admin-namespace
    deletePolicy: ABANDON

{# Stateful set for the bootstrapper #}
- name: bootstrap-statefulset
  type: {{ APPS_TYPE }}:{{ STATEFULSETS_COLLECTION }}
  type: {{ env['project'] }}/$(ref.{{ APPS_TYPE_NAME }}.name):{{ STATEFULSETS_COLLECTION }}
  properties:
    apiVersion: apps/v1
    {# Namespace is a property because it's used by deployment manager in
@@ -400,7 +399,7 @@ e.g. creating namespaces, service accounts, stateful set to run the bootstrapper
  metadata:
    dependsOn:
    - admin-namespace
    deletePolicy: ABANDON

outputs:
{% for typeSuffix, endpoint in K8S_ENDPOINTS.iteritems() %}
- name: clusterType{{ typeSuffix }}
3 changes: 3 additions & 0 deletions testing/deploy_kubeflow.py
@@ -4,6 +4,9 @@

import yaml
from kubernetes.config import kube_config
# TODO(jlewi): We should be using absolute imports always.
# So it should be from testing import deploy_utils because testing
# is the top level python package.
from . import deploy_utils
from kubeflow.testing import test_helper
from kubeflow.testing import util # pylint: disable=no-name-in-module
129 changes: 129 additions & 0 deletions testing/deploy_kubeflow_gcp.py
@@ -0,0 +1,129 @@
"""Deploy Kubeflow on GCP using deployment manager and the bootstrapper."""
import argparse
import datetime
import json
import logging
import os
import requests
import time

from googleapiclient import discovery
from googleapiclient import errors
from oauth2client.client import GoogleCredentials

from testing import deploy_utils
from kubeflow.testing import test_helper

def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"--project", required=True, type=str,
help="The project to deploy in.")

parser.add_argument(
"--name", required=True, type=str,
help="The name for the deployment.")

parser.add_argument(
"--config", required=True, type=str,
help="The path to the YAML file for the deployment config to use.")

parser.add_argument(
"--imports", default="", type=str,
help=("Comma separated list of files to import as part of the deployment "
"manager manifest"))

args, _ = parser.parse_known_args()
return args

def deploy_kubeflow_gcp(_):
"""Deploy Kubeflow."""
args = parse_args()
project = args.project
deployment_name = args.name
credentials = GoogleCredentials.get_application_default()
deploy = discovery.build("deploymentmanager", "v2", credentials=credentials)

deployments = deploy.deployments()

import_files = []

if args.imports:
import_files = args.imports.split(",")

imports = []

with open(args.config) as hf:
content = hf.read()

for f in import_files:
with open(f) as hf:
name = os.path.basename(f)
imports.append({
"name": name,
"content": hf.read(),
})


body = {
"name": deployment_name,
"target": {
"config": {
"content": content,
},
"imports": imports,
},
}

response = None
try:
logging.info("Creating deployment %s in project %s", deployment_name,
project)
response = deployments.insert(project=project, body=body).execute()
except errors.HttpError as e:
logging.error("Got exception %s", e)
if not e.content:
raise

try:
content = json.loads(e.content)
except ValueError:
logging.error("Could not parse content %s as json", e.content)

code = content.get("error", {}).get("code")
if code == requests.codes.CONFLICT:
logging.info("Deployment %s already exists", deployment_name)
else:
raise

if response:
op_id = response["name"]

else:
# Get the deployment and make sure its up
d = deployments.get(project=project, deployment=deployment_name).execute()
op_id = d.get("operation", {}).get("name")
if not op_id:
raise ValueError("Could not get operation name.")

logging.info("Wait for deployment; operation %s", op_id)
final_status = deploy_utils.wait_for_operation(deploy, project, op_id)

logging.info("Deployment status\n%s:", json.dumps(final_status,
sort_keys=True,
indent=2,
separators=(',', ': ')))

if final_status.get("status") != "DONE":
logging.error("Deployment operation isn't done.")
raise RuntimeError("Deployment operation isn't done.")

def main():
test_case = test_helper.TestCase(
name='deploy_kubeflow_gcp', test_func=deploy_kubeflow_gcp)
test_suite = test_helper.init(
name='deploy_kubeflow_gcp', test_cases=[test_case])
test_suite.run()

if __name__ == "__main__":
main()
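
Assuming the script is run from the repository root (the flag values below are illustrative, not part of this commit), invocation looks like:

```bash
python -m testing.deploy_kubeflow_gcp \
  --project=my-gcp-project \
  --name=kubeflow-test \
  --config=docs/gke/configs/cluster-kubeflow.yaml \
  --imports=docs/gke/configs/cluster.jinja
```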
49 changes: 49 additions & 0 deletions testing/deploy_utils.py
@@ -128,3 +128,52 @@ def setup_kubeflow_ks_app(dir, namespace, github_token, api_client):
    os.symlink(source, target_dir)

  return app_dir

def log_operation_status(operation):
  """A callback to use with wait_for_operation."""
  name = operation.get("name", "")
  status = operation.get("status", "")
  logging.info("Operation %s status %s", name, status)

def wait_for_operation(client,
                       project,
                       op_id,
                       timeout=datetime.timedelta(hours=1),
                       polling_interval=datetime.timedelta(seconds=5),
                       status_callback=log_operation_status):
  """Wait for the specified operation to complete.

  Args:
    client: Client for the API that owns the operation.
    project: The project that owns the operation.
    op_id: Operation id.
    timeout: A datetime.timedelta expressing the amount of time to wait before
      giving up.
    polling_interval: A datetime.timedelta to represent the amount of time to
      wait between requests polling for the operation status.
    status_callback: Optional callback invoked with the operation after each
      poll.

  Returns:
    op: The final operation.

  Raises:
    TimeoutError: if we timeout waiting for the operation to complete.
  """
  endtime = datetime.datetime.now() + timeout
  while True:
    op = client.operations().get(
      project=project, operation=op_id).execute()

    if status_callback:
      status_callback(op)

    status = op.get("status", "")
    # Need to handle other statuses.
    if status == "DONE":
      return op
    if datetime.datetime.now() > endtime:
      raise TimeoutError(
        "Timed out waiting for op: {0} to complete.".format(op_id))
    time.sleep(polling_interval.total_seconds())

  # Linter complains if we don't have a return here even though it's unreachable.
  return None
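
For reference, the deployment script calls this helper as `deploy_utils.wait_for_operation(deploy, project, op_id)`. A caller can also swap in a custom `status_callback`; a minimal sketch (the callback body is illustrative, and `deploy`, `project`, and `op_id` are assumed to be set up as in `deploy_kubeflow_gcp.py`):

```python
import logging

from testing import deploy_utils

def quiet_callback(operation):
  # Only surface failures instead of logging every poll.
  if operation.get("error"):
    logging.error("Operation %s failed: %s", operation.get("name", ""),
                  operation["error"])

# deploy, project, and op_id as in deploy_kubeflow_gcp.deploy_kubeflow_gcp.
final = deploy_utils.wait_for_operation(deploy, project, op_id,
                                        status_callback=quiet_callback)
```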