diff --git a/docs/user-guide/jobs.md b/docs/user-guide/jobs.md index b3535e7c034d5..0bd41f5952841 100644 --- a/docs/user-guide/jobs.md +++ b/docs/user-guide/jobs.md @@ -42,12 +42,13 @@ Documentation for other releases can be found at - [Writing a Job Spec](#writing-a-job-spec) - [Pod Template](#pod-template) - [Pod Selector](#pod-selector) - - [Multiple Completions](#multiple-completions) - - [Parallelism](#parallelism) + - [Parallelism and Completions](#parallelism-and-completions) - [Handling Pod and Container Failures](#handling-pod-and-container-failures) - - [Alternatives to Job](#alternatives-to-job) + - [Job Patterns](#job-patterns) + - [Alternatives](#alternatives) - [Bare Pods](#bare-pods) - [Replication Controller](#replication-controller) + - [Single Job starts Controller Pod](#single-job-starts-controller-pod) - [Caveats](#caveats) - [Future work](#future-work) @@ -61,6 +62,9 @@ of successful completions is reached, the job itself is complete. Deleting a Jo pods it created. A simple case is to create 1 Job object in order to reliably run one Pod to completion. +The Job object will start a new Pod if the first pod fails or is deleted (for example +due to a node hardware failure or a node reboot). + A Job can also be used to run multiple pods in parallel. ## Running an example Job @@ -179,30 +183,28 @@ Also you should not normally create any pods whose labels match this selector, e via another Job, or via another controller such as ReplicationController. Otherwise, the Job will think that those pods were created by it. Kubernetes will not stop you from doing this. -### Multiple Completions - -By default, a Job is complete when one Pod runs to successful completion. You can also specify that -this needs to happen multiple times by specifying `.spec.completions` with a value greater than 1. -When multiple completions are requested, each Pod created by the Job controller has an identical -[`spec`](../devel/api-conventions.md#spec-and-status). 
In particular, all pods will have -the same command line and the same image, the same volumes, and mostly the same environment -variables. It is up to the user to arrange for the pods to do work on different things. For -example, the pods might all access a shared work queue service to acquire work units. +### Parallelism and Completions -To create multiple pods which are similar, but have slightly different arguments, environment -variables or images, use multiple Jobs. +By default, a Job is complete when one Pod runs to successful completion. -### Parallelism +A single Job object can also be used to control multiple pods running in +parallel. There are several different [patterns for running parallel +jobs](#job-patterns). -You can suggest how many pods should run concurrently by setting `.spec.parallelism` to the number -of pods you would like to have running concurrently. This number is a suggestion. The number -running concurrently may be lower or higher for a variety of reasons. For example, it may be lower -if the number of remaining completions is less, or as the controller is ramping up, or if it is -throttling the job due to excessive failures. It may be higher for example if a pod is gracefully -shutdown, and the replacement starts early. +With some of these patterns, you can suggest how many pods should run +concurrently by setting `.spec.parallelism` to the number of pods you would +like to have running concurrently. This number is a suggestion. The number +running concurrently may be lower or higher for a variety of reasons. For +example, it may be lower if the number of remaining completions is less, or as +the controller is ramping up, or if it is throttling the job due to excessive +failures. It may be higher for example if a pod is gracefully shutdown, and +the replacement starts early. If you do not specify `.spec.parallelism`, then it defaults to `.spec.completions`. 
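To make this concrete, here is a minimal sketch of a Job spec that asks for six completions with a suggestion of at most two pods running at a time. The object name, image, and command are illustrative only:

```
apiVersion: extensions/v1beta1
kind: Job
metadata:
  name: example-parallel-job   # illustrative name
spec:
  completions: 6    # the Job is complete after 6 pods succeed
  parallelism: 2    # suggest at most 2 pods running concurrently
  template:
    metadata:
      name: example-parallel-job
    spec:
      containers:
      - name: worker
        image: busybox
        command: ["sh", "-c", "echo working && sleep 5"]
      restartPolicy: OnFailure
```

With this spec, the Job controller would keep roughly two pods running until six have completed successfully.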
+Depending on the pattern you are using, you will either set `.spec.completions`
+to 1 or to the number of units of work (see [Job Patterns](#job-patterns) for an explanation).
+
## Handling Pod and Container Failures

A Container in a Pod may fail for a number of reasons, such as because the process in it exited with
@@ -226,7 +228,61 @@ sometimes be started twice.

If you do specify `.spec.parallelism` and `.spec.completions` both greater than 1, then there may be
multiple pods running at once. Therefore, your pods must also be tolerant of concurrency.

-## Alternatives to Job
+## Job Patterns
+
+The Job object can be used to support reliable parallel execution of Pods. The Job object is not
+designed to support closely-communicating parallel processes, as commonly found in scientific
+computing. It does support parallel processing of a set of independent but related *work items*.
+These might be emails to be sent, frames to be rendered, files to be transcoded, ranges of keys in a
+NoSQL database to scan, and so on.
+
+In a complex system, there may be multiple different sets of work items. Here we are just
+considering one set of work items that the user wants to manage together — a *batch job*.
+
+There are several different patterns for parallel computation, each with strengths and weaknesses.
+The tradeoffs are:
+
+- One Job object for each work item, vs a single Job object for all work items. The latter is
+  better for large numbers of work items. The former creates some overhead for the user and for the
+  system to manage large numbers of Job objects. Also, with the latter, the resource usage of the job
+  (number of concurrently running pods) can be easily adjusted using the `kubectl scale` command.
+- Number of pods created equals number of work items, vs each pod can process multiple work items.
+  The former typically requires less modification to existing code and containers.
The latter
+  is better for large numbers of work items, for similar reasons to the previous bullet.
+- Several approaches use a work queue. This requires running a queue service,
+  and modifications to the existing program or container to make it use the work queue.
+  Other approaches are easier to adapt to an existing containerised application.
+
+
+The tradeoffs are summarized here, with columns 2 to 4 corresponding to the above tradeoffs.
+The pattern names are also links to examples and more detailed descriptions.
+
+| Pattern | Single Job object | Fewer pods than work items? | Use app unmodified? | Works in Kube 1.1? |
+| -------------------------------------------------------------------------- |:-----------------:|:---------------------------:|:-------------------:|:-------------------:|
+| [Job Template Expansion](../../examples/job/expansions/README.md) | | | ✓ | ✓ |
+| [Queue with Pod Per Work Item](../../examples/job/work-queue-1/README.md) | ✓ | | sometimes | ✓ |
+| [Queue with Variable Pod Count](../../examples/job/work-queue-2/README.md) | ✓ | ✓ | | ✓ |
+| Single Job with Static Work Assignment | ✓ | | ✓ | |
+
+When you specify completions with `.spec.completions`, each Pod created by the Job controller
+has an identical [`spec`](../devel/api-conventions.md#spec-and-status). This means that
+all pods will have the same command line and the same
+image, the same volumes, and (almost) the same environment variables. These patterns
+are different ways to arrange for pods to work on different things.
+
+This table shows the required settings for `.spec.parallelism` and `.spec.completions` for each of the patterns.
+Here, `W` is the number of work items.
+| Pattern | `.spec.completions` | `.spec.parallelism` |
+| -------------------------------------------------------------------------- |:-------------------:|:--------------------:|
+| [Job Template Expansion](../../examples/job/expansions/README.md) | 1 | should be 1 |
+| [Queue with Pod Per Work Item](../../examples/job/work-queue-1/README.md) | W | any |
+| [Queue with Variable Pod Count](../../examples/job/work-queue-2/README.md) | 1 | any |
+| Single Job with Static Work Assignment | W | any |
+
+
+## Alternatives

### Bare Pods

@@ -245,6 +301,19 @@ As discussed in [life of a pod](pod-states.md), `Job` is *only* appropriate for
`RestartPolicy` equal to `OnFailure` or `Never`. (Note: If `RestartPolicy` is not
set, the default value is `Always`.)

+### Single Job starts Controller Pod
+
+Another pattern is for a single Job to create a pod which then creates other pods, acting as a sort
+of custom controller for those pods. This allows the most flexibility, but may be somewhat
+complicated to get started with and offers less integration with Kubernetes.
+
+One example of this pattern would be a Job which starts a Pod which runs a script that in turn
+starts a Spark master controller (see [spark example](../../examples/spark/README.md)), runs a Spark
+driver, and then cleans up.
+
+An advantage of this approach is that the overall process gets the completion guarantee of a Job
+object, while retaining complete control over what pods are created and how work is assigned to them.
+
## Caveats

Job objects are in the [`extensions` API Group](../api.md#api-groups).
diff --git a/examples/examples_test.go b/examples/examples_test.go index 77fdb623050b3..336db7dd29298 100644 --- a/examples/examples_test.go +++ b/examples/examples_test.go @@ -392,6 +392,14 @@ func TestExampleObjectSchemas(t *testing.T) { "javaweb": &api.Pod{}, "javaweb-2": &api.Pod{}, }, + "../examples/job/work-queue-1": { + "job": &extensions.Job{}, + }, + "../examples/job/work-queue-2": { + "redis-pod": &api.Pod{}, + "redis-service": &api.Service{}, + "job": &extensions.Job{}, + }, } capabilities.SetForTests(capabilities.Capabilities{ diff --git a/examples/job/expansions/README.md b/examples/job/expansions/README.md new file mode 100644 index 0000000000000..e6b34bdf14b9a --- /dev/null +++ b/examples/job/expansions/README.md @@ -0,0 +1,255 @@ + + + + +WARNING +WARNING +WARNING +WARNING +WARNING + +

PLEASE NOTE: This document applies to the HEAD of the source tree

+ +If you are using a released version of Kubernetes, you should +refer to the docs that go with that version. + + +The latest release of this document can be found +[here](http://releases.k8s.io/release-1.1/examples/job/expansions/README.md). + +Documentation for other releases can be found at +[releases.k8s.io](http://releases.k8s.io). + +-- + + + + + +# Example: Multiple Job Objects from Template Expansion + +In this example, we will run multiple Kubernetes Jobs created from +a common template. You may want to be familiar with the basic, +non-parallel, use of [Job](../../../docs/user-guide/jobs.md) first. + +## Basic Template Expansion + +First, create a template of a Job object: + + + +``` +apiVersion: extensions/v1beta1 +kind: Job +metadata: + name: process-item-$ITEM +spec: + selector: + matchLabels: + app: jobexample + item: $ITEM + template: + metadata: + name: jobexample + labels: + app: jobexample + item: $ITEM + spec: + containers: + - name: c + image: busybox + command: ["sh", "-c", "echo Processing item $ITEM && sleep 5"] + restartPolicy: Never +``` + +[Download example](job.yaml.txt?raw=true) + + +Unlike a *pod template*, our *job template* is not a Kubernetes API type. It is just +a yaml representation of a Job object that has some placeholders that need to be filled +in before it can be used. The `$ITEM` syntax is not meaningful to Kubernetes. + +In this example, the only processing the container does is to `echo` a string and sleep for a bit. +In a real use case, the processing would be some substantial computation, such as rendering a frame +of a movie, or processing a range of rows in a database. The "$ITEM" parameter would specify for +example, the frame number or the row range. + +This Job has two labels. The first label, `app=jobexample`, distinguishes this group of jobs from +other groups of jobs (these are not shown, but there might be other ones). This label +makes it convenient to operate on all the jobs in the group at once. 
The second label, with
key `item`, distinguishes individual jobs in the group. Each Job object needs to have
a unique label that no other job has; here, the `item` label serves that purpose.
Neither of these label keys is special to kubernetes -- you can pick your own label scheme.

Next, expand the template into multiple files, one for each item to be processed.

```console
# Expand files into a temporary directory
$ mkdir ./jobs
$ for i in apple banana cherry
do
  cat job.yaml.txt | sed "s/\$ITEM/$i/" > ./jobs/job-$i.yaml
done
$ ls jobs/
job-apple.yaml
job-banana.yaml
job-cherry.yaml
```

Here, we used `sed` to replace the string `$ITEM` with the loop variable.
You could use any type of template language (jinja2, erb) or write a program
to generate the Job objects.

Next, create all the jobs with one kubectl command:

```console
$ kubectl create -f ./jobs
job "process-item-apple" created
job "process-item-banana" created
job "process-item-cherry" created
```

Now, check on the jobs:

```console
$ kubectl get jobs -l app=jobexample -L item
JOB                   CONTAINER(S)   IMAGE(S)   SELECTOR                               SUCCESSFUL   ITEM
process-item-apple    c              busybox    app in (jobexample),item in (apple)    1            apple
process-item-banana   c              busybox    app in (jobexample),item in (banana)   1            banana
process-item-cherry   c              busybox    app in (jobexample),item in (cherry)   1            cherry
```

Here we use the `-l` option to select all jobs that are part of this
group of jobs. (There might be other unrelated jobs in the system that we
do not care to see.)

The `-L` option adds an extra column with just the `item` label value.
We can check on the pods as well using the same label selector:

```console
$ kubectl get pods -l app=jobexample -L item
NAME                        READY     STATUS      RESTARTS   AGE       ITEM
process-item-apple-kixwv    0/1       Completed   0          4m        apple
process-item-banana-wrsf7   0/1       Completed   0          4m        banana
process-item-cherry-dnfu9   0/1       Completed   0          4m        cherry
```

There is not a single command to check on the output of all jobs at once,
but looping over all the pods is pretty easy:

```console
$ for p in $(kubectl get pods -l app=jobexample -o name)
do
  kubectl logs $p
done
Processing item apple
Processing item banana
Processing item cherry
```

## Multiple Template Parameters

In the first example, each instance of the template had one parameter, and that parameter was also
used as a label. However, label keys are limited in [what characters they can
contain](labels.md#syntax-and-character-set).

This slightly more complex example uses the jinja2 template language to generate our objects.
We will use a one-line python script to convert the template to a file.
First, download or paste the following template file to a file called `job.yaml.jinja2`:

```
{%- set params = [{ "name": "apple", "url": "http://www.orangepippin.com/apples", },
                  { "name": "banana", "url": "https://en.wikipedia.org/wiki/Banana", },
                  { "name": "raspberry", "url": "https://www.raspberrypi.org/" }]
%}
{%- for p in params %}
{%- set name = p["name"] %}
{%- set url = p["url"] %}
apiVersion: extensions/v1beta1
kind: Job
metadata:
  name: jobexample-{{ name }}
spec:
  selector:
    matchLabels:
      app: jobexample
      item: {{ name }}
  template:
    metadata:
      name: jobexample
      labels:
        app: jobexample
        item: {{ name }}
    spec:
      containers:
      - name: c
        image: busybox
        command: ["sh", "-c", "echo Processing URL {{ url }} && sleep 5"]
      restartPolicy: Never
---
{%- endfor %}
```

[Download example](job.yaml.jinja2?raw=true)

The above template defines parameters for each job object using a list of
python dicts (lines 1-4). A for loop then emits one job yaml object
for each set of parameters (remaining lines).
We take advantage of the fact that multiple yaml documents can be concatenated
with the `---` separator (second to last line). We can pipe the output directly to kubectl to
create the objects.

You will need the jinja2 package if you do not already have it: `pip install --user jinja2`.
Now, use this one-line python program to expand the template:

```
$ alias render_template='python -c "from jinja2 import Template; import sys; print(Template(sys.stdin.read()).render());"'
```

The output can be saved to a file, like this:

```
$ cat job.yaml.jinja2 | render_template > jobs.yaml
```

or sent directly to kubectl, like this:

```
$ cat job.yaml.jinja2 | render_template | kubectl create -f -
```

## Alternatives

If you have a large number of job objects, you may find that:

- even using labels, managing so many Job objects is cumbersome.
+- you exceed resource quota when creating all the Jobs at once, + and do not want to wait to create them incrementally. +- you need a way to easily scale the number of pods running + concurrently. One reason would be to avoid using too many + compute resources. Another would be to limit the number of + concurrent requests to a shared resource, such as a database, + used by all the pods in the job. +- very large numbers of jobs created at once overload the + kubernetes apiserver, controller, or scheduler. + +In this case, you can consider one of the +other [job patterns](../../../docs/user-guide/jobs.md#job-patterns). + + + +[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/examples/job/expansions/README.md?pixel)]() + diff --git a/examples/job/expansions/job.yaml.jinja2 b/examples/job/expansions/job.yaml.jinja2 new file mode 100644 index 0000000000000..962505a522acd --- /dev/null +++ b/examples/job/expansions/job.yaml.jinja2 @@ -0,0 +1,30 @@ +{%- set params = [{ "name": "apple", "url": "http://www.orangepippin.com/apples", }, + { "name": "banana", "url": "https://en.wikipedia.org/wiki/Banana", }, + { "name": "raspberry", "url": "https://www.raspberrypi.org/" }] +%} +{%- for p in params %} +{%- set name = p["name"] %} +{%- set url = p["url"] %} +apiVersion: extensions/v1beta1 +kind: Job +metadata: + name: jobexample-{{ name }} +spec: + selector: + matchLabels: + app: jobexample + item: {{ name }} + template: + metadata: + name: jobexample + labels: + app: jobexample + item: {{ name }} + spec: + containers: + - name: c + image: busybox + command: ["sh", "-c", "echo Processing URL {{ url }} && sleep 5"] + restartPolicy: Never +--- +{%- endfor %} diff --git a/examples/job/expansions/job.yaml.txt b/examples/job/expansions/job.yaml.txt new file mode 100644 index 0000000000000..318c61b59968a --- /dev/null +++ b/examples/job/expansions/job.yaml.txt @@ -0,0 +1,21 @@ +apiVersion: extensions/v1beta1 +kind: Job +metadata: + name: process-item-$ITEM 
+spec: + selector: + matchLabels: + app: jobexample + item: $ITEM + template: + metadata: + name: jobexample + labels: + app: jobexample + item: $ITEM + spec: + containers: + - name: c + image: busybox + command: ["sh", "-c", "echo Processing item $ITEM && sleep 5"] + restartPolicy: Never diff --git a/examples/job/work-queue-1/Dockerfile b/examples/job/work-queue-1/Dockerfile new file mode 100644 index 0000000000000..cbd36bb620303 --- /dev/null +++ b/examples/job/work-queue-1/Dockerfile @@ -0,0 +1,10 @@ +# Specify BROKER_URL and QUEUE when running +FROM ubuntu:14.04 + +RUN apt-get update && \ + apt-get install -y curl ca-certificates amqp-tools python \ + --no-install-recommends \ + && rm -rf /var/lib/apt/lists/* +COPY ./worker.py /worker.py + +CMD /usr/bin/amqp-consume --url=$BROKER_URL -q $QUEUE -c 1 /worker.py diff --git a/examples/job/work-queue-1/README.md b/examples/job/work-queue-1/README.md new file mode 100644 index 0000000000000..66c0ff3e7963a --- /dev/null +++ b/examples/job/work-queue-1/README.md @@ -0,0 +1,374 @@ + + + + +WARNING +WARNING +WARNING +WARNING +WARNING + +

PLEASE NOTE: This document applies to the HEAD of the source tree

+ +If you are using a released version of Kubernetes, you should +refer to the docs that go with that version. + + +The latest release of this document can be found +[here](http://releases.k8s.io/release-1.1/examples/job/work-queue-1/README.md). + +Documentation for other releases can be found at +[releases.k8s.io](http://releases.k8s.io). + +-- + + + + + +# Example: Job with Work Queue with Pod Per Work Item + +In this example, we will run a Kubernetes Job with multiple parallel +worker processes. You may want to be familiar with the basic, +non-parallel, use of [Job](../../../docs/user-guide/jobs.md) first. + +In this example, as each pod is created, it picks up one unit of work +from a task queue, completes it, deletes it from the queue, and exits. + + +Here is an overview of the steps in this example: + +1. **Start a message queue service.** In this example, we use RabbitMQ, but you could use another + one. In practice you would set up a message queue service once and reuse it for many jobs. +1. **Create a queue, and fill it with messages.** Each message represents one task to be done. In + this example, a message is just an integer that we will do a lengthy computation on. +1. **Start a Job that works on tasks from the queue**. The Job starts several pods. Each pod takes + one task from the message queue, processes it, and repeats until the end of the queue is reached. + +## Starting a message queue service + +This example uses RabbitMQ, but it should be easy to adapt to another AMQP-type message service. + +In practice you could set up a message queue service once in a +cluster and reuse it for many jobs, as well as for long-running services. 
Start RabbitMQ as follows:

```console
$ kubectl create -f examples/celery-rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
$ kubectl create -f examples/celery-rabbitmq/rabbitmq-controller.yaml
replicationController "rabbitmq-controller" created
```

We will only use the rabbitmq part from the celery-rabbitmq example.

## Testing the message queue service

Now, we can experiment with accessing the message queue. We will
create a temporary interactive pod, install some tools on it,
and experiment with queues.

First create a temporary interactive Pod.

```console
# Create a temporary interactive container
$ kubectl run -i --tty temp --image ubuntu:14.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...
```

Note that your pod name and command prompt will be different.

Next install the `amqp-tools` so we can work with message queues.

```console
# Install some tools
root@temp-loe07:/# apt-get update
.... [ lots of output ] ....
root@temp-loe07:/# apt-get install -y curl ca-certificates amqp-tools python dnsutils
.... [ lots of output ] ....
```

Later, we will make a docker image that includes these packages.

Next, we will check that we can discover the rabbitmq service:

```
# Note the rabbitmq-service has a DNS name, provided by Kubernetes:

root@temp-loe07:/# nslookup rabbitmq-service
Server:    10.0.0.10
Address:   10.0.0.10#53

Name:      rabbitmq-service.default.svc.cluster.local
Address:   10.0.147.152

# Your address will vary.
```

If Kube-DNS is not set up correctly, the previous step may not work for you.
You can also find the service IP in an env var:

```
# env | grep RABBIT | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152
# Your address will vary.
```

Next we will verify we can create a queue, and publish and consume messages.
```console

# In the next line, rabbitmq-service is the hostname where the rabbitmq-service
# can be reached. 5672 is the standard port for rabbitmq.

root@temp-loe07:/# BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
# If you could not resolve "rabbitmq-service" in the previous step,
# then use this command instead:
# root@temp-loe07:/# BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672

# Now create a queue:

root@temp-loe07:/# /usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo

# Publish one message to it:

root@temp-loe07:/# /usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello

# And get it back.

root@temp-loe07:/# /usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo
Hello
root@temp-loe07:/#

```

In the last command, the `amqp-consume` tool takes one message (`-c 1`)
from the queue, and passes that message to the standard input of
an arbitrary command. In this case, the program `cat` just prints
out what it gets on the standard input, and the echo just adds a carriage
return so the example is readable.

## Filling the Queue with tasks

Now let's fill the queue with some "tasks". In our example, our tasks are just strings to be
printed.

In practice, the content of the messages might be:

- names of files that need to be processed
- extra flags to the program
- ranges of keys in a database table
- configuration parameters to a simulation
- frame numbers of a scene to be rendered

In practice, if there is large data that is needed in a read-only mode by all pods
of the Job, you will typically put that in a shared file system like NFS and mount
that readonly on all the pods, or the program in the pod will natively read data from
a cluster file system like HDFS.

For our example, we will create the queue and fill it using the amqp command line tools.
In practice, you might write a program to fill the queue using an amqp client library.
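Such a filler program might look like the following sketch, which uses the third-party `pika` AMQP client. The library choice, the function names, and reading `BROKER_URL` from the environment are our own conventions for this sketch, not part of the example itself:

```python
# Sketch: fill the "job1" queue from a program instead of the amqp
# command line tools. Assumes the pika AMQP client (pip install pika);
# BROKER_URL should be set as in the shell examples above.
import os


def work_items():
    # The same eight example tasks pushed onto the queue by the shell loop.
    return ["apple", "banana", "cherry", "date",
            "fig", "grape", "lemon", "melon"]


def fill_queue(broker_url, queue="job1"):
    import pika  # imported here so work_items() is usable without a broker
    connection = pika.BlockingConnection(pika.URLParameters(broker_url))
    channel = connection.channel()
    # Durable queue, like amqp-declare-queue -d.
    channel.queue_declare(queue=queue, durable=True)
    for item in work_items():
        # Persistent message, like amqp-publish -p.
        channel.basic_publish(
            exchange="", routing_key=queue, body=item,
            properties=pika.BasicProperties(delivery_mode=2))
    connection.close()


if __name__ == "__main__" and os.environ.get("BROKER_URL"):
    fill_queue(os.environ["BROKER_URL"])
```

Run with the same `BROKER_URL` as above; it publishes the same eight messages as the shell loop that follows.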
```console

$ /usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1 -d
job1
$ for f in apple banana cherry date fig grape lemon melon
do
  /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done
```

So, we filled the queue with 8 messages.

## Create an Image

Now we are ready to create an image that we will run as a job.

We will use the `amqp-consume` utility to read the message
from the queue and run our actual program. Here is a very simple
example program:

```
#!/usr/bin/env python

# Copyright 2015 The Kubernetes Authors All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Reads one line from standard input, prints it, and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.readline())
time.sleep(10)
```

[Download example](worker.py?raw=true)

Now, build an image. If you are working in the source
tree, then change directory to `examples/job/work-queue-1`.
Otherwise, make a temporary directory, change to it,
download the [Dockerfile](Dockerfile?raw=true),
and [worker.py](worker.py?raw=true). In either case,
build the image with this command:

```console
$ docker build -t job-wq-1 .
```

For the [Docker Hub](https://hub.docker.com/), tag your app image with
your username and push to the Hub with the below commands. Replace
`<username>` with your Hub username.
+ +``` +docker tag job-wq-1 /job-wq-1 +docker push /job-wq-1 +``` + +If you are using [Google Container +Registry](https://cloud.google.com/tools/container-registry/), tag +your app image with your project ID, and push to GCR. Replace +`` with your project ID. + +``` +docker tag job-wq-1 gcr.io//job-wq-1 +gcloud docker push gcr.io//job-wq-1 +``` + +## Defining a Job + +Here is a job definition. You'll need to make a copy of the Job and edit the +image to match the name you used, and call it `./job.yaml`. + + + + +```yaml +apiVersion: extensions/v1beta1 +kind: Job +metadata: + name: job-wq-1 +spec: + selector: + matchLabels: + app: job-wq-1 + completions: 8 + parallelism: 2 + template: + metadata: + name: job-wq-1 + labels: + app: job-wq-1 + spec: + containers: + - name: c + image: gcr.io//job-wq-1 + restartPolicy: OnFailure +``` + +[Download example](job.yaml?raw=true) + + +In this example, each pod works on one item from the queue and then exits. +So, the completion count of the Job corresponds to the number of work items +done. So we set, `.spec.completions: 8` for the example, since we put 8 items in the queue. + +## Running the Job + +So, now run the Job: + +```console +$ kubectl create -f ./job.yaml +``` + +Now wait a bit, then check on the job. + +```console +$ ./kubectl describe jobs/job-wq-1 +Name: job-wq-1 +Namespace: default +Image(s): gcr.io/causal-jigsaw-637/job-wq-1 +Selector: app in (job-wq-1) +Parallelism: 4 +Completions: 8 +Labels: app=job-wq-1 +Pods Statuses: 0 Running / 8 Succeeded / 0 Failed +No volumes. 
Events:
  FirstSeen	LastSeen	Count	From	SubobjectPath	Reason			Message
  ─────────	────────	─────	────	─────────────	──────			───────
  27s		27s		1	{job }			SuccessfulCreate	Created pod: job-wq-1-hcobb
  27s		27s		1	{job }			SuccessfulCreate	Created pod: job-wq-1-weytj
  27s		27s		1	{job }			SuccessfulCreate	Created pod: job-wq-1-qaam5
  27s		27s		1	{job }			SuccessfulCreate	Created pod: job-wq-1-b67sr
  26s		26s		1	{job }			SuccessfulCreate	Created pod: job-wq-1-xe5hj
  15s		15s		1	{job }			SuccessfulCreate	Created pod: job-wq-1-w2zqe
  14s		14s		1	{job }			SuccessfulCreate	Created pod: job-wq-1-d6ppa
  14s		14s		1	{job }			SuccessfulCreate	Created pod: job-wq-1-p17e0
```

All our pods succeeded. Yay.


## Alternatives

This approach has the advantage that you
do not need to modify your "worker" program to be aware that there is a work queue.

It does require that you run a message queue service.
If running a queue service is inconvenient, you may
want to consider one of the other [job patterns](../../../docs/user-guide/jobs.md#job-patterns).

This approach creates a pod for every work item. If your work items only take a few seconds,
though, creating a Pod for every work item may add a lot of overhead. Consider another
[example](../work-queue-2/README.md) that executes multiple work items per Pod.

In this example, we used the `amqp-consume` utility to read the message
from the queue and run our actual program. This has the advantage that you
do not need to modify your program to be aware of the queue.
A [different example](../work-queue-2/README.md) shows how to
communicate with the work queue using a client library.

## Caveats

If the number of completions is set to less than the number of items in the queue, then
not all items will be processed.

If the number of completions is set to more than the number of items in the queue,
then the Job will not appear to be completed, even though all items in the queue
have been processed.
It will start additional pods which will block waiting
for a message.

There is an unlikely race with this pattern. If the container is killed in between the time
that the message is acknowledged by the amqp-consume command and the time that the container
exits with success, or if the node crashes before the kubelet is able to post the success of the pod
back to the api-server, then the Job will not appear to be complete, even though all items
in the queue have been processed.


[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/examples/job/work-queue-1/README.md?pixel)]()

diff --git a/examples/job/work-queue-1/job.yaml b/examples/job/work-queue-1/job.yaml
new file mode 100644
index 0000000000000..50c1730f398da
--- /dev/null
+++ b/examples/job/work-queue-1/job.yaml
@@ -0,0 +1,20 @@
+apiVersion: extensions/v1beta1
+kind: Job
+metadata:
+  name: job-wq-1
+spec:
+  selector:
+    matchLabels:
+      app: job-wq-1
+  completions: 8
+  parallelism: 2
+  template:
+    metadata:
+      name: job-wq-1
+      labels:
+        app: job-wq-1
+    spec:
+      containers:
+      - name: c
+        image: gcr.io/<project>/job-wq-1
+      restartPolicy: OnFailure
diff --git a/examples/job/work-queue-1/worker.py b/examples/job/work-queue-1/worker.py
new file mode 100755
index 0000000000000..22e52a858570e
--- /dev/null
+++ b/examples/job/work-queue-1/worker.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python
+
+# Copyright 2015 The Kubernetes Authors All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Reads a work item from standard in, prints it, and sleeps for 10 seconds.
+import sys
+import time
+print("Processing " + sys.stdin.read())
+time.sleep(10)
diff --git a/examples/job/work-queue-2/Dockerfile b/examples/job/work-queue-2/Dockerfile
new file mode 100644
index 0000000000000..2de23b3c98340
--- /dev/null
+++ b/examples/job/work-queue-2/Dockerfile
@@ -0,0 +1,6 @@
+FROM python
+RUN pip install redis
+COPY ./worker.py /worker.py
+COPY ./rediswq.py /rediswq.py
+
+CMD python worker.py
diff --git a/examples/job/work-queue-2/README.md b/examples/job/work-queue-2/README.md
new file mode 100644
index 0000000000000..e5a6d4309d089
--- /dev/null
+++ b/examples/job/work-queue-2/README.md
@@ -0,0 +1,305 @@
+
+
+
+
+WARNING
+WARNING
+WARNING
+WARNING
+WARNING
+

PLEASE NOTE: This document applies to the HEAD of the source tree

+
+If you are using a released version of Kubernetes, you should
+refer to the docs that go with that version.
+
+
+The latest release of this document can be found
+[here](http://releases.k8s.io/release-1.1/examples/job/work-queue-2/README.md).
+
+Documentation for other releases can be found at
+[releases.k8s.io](http://releases.k8s.io).
+
+--
+
+
+
+
+
+# Example: Job with Work Queue with Multiple Work Items Per Pod
+
+In this example, we will run a Kubernetes Job with multiple parallel
+worker processes. You may want to be familiar with the basic,
+non-parallel, use of [Job](../../../docs/user-guide/jobs.md) first.
+
+In this example, as each pod is created, it repeatedly picks up one unit of
+work from a task queue, processes it, and removes it from the queue, exiting
+only when the queue is empty.
+
+
+Here is an overview of the steps in this example:
+
+1. **Start a storage service to hold the work queue.** In this example, we use Redis to store
+   our work items; the previous example used RabbitMQ. We use Redis and a custom work-queue
+   client library here because AMQP does not provide a good way for clients to
+   detect when a finite-length work queue is empty. In practice you would set up a store such
+   as Redis once and reuse it for the work queues of many jobs, and for other purposes.
+1. **Create a queue, and fill it with messages.** Each message represents one task to be done. In
+   this example, a message is just a string naming a work item that we will do simulated work on.
+1. **Start a Job that works on tasks from the queue.** The Job starts several pods. Each pod takes
+   one task from the message queue, processes it, and repeats until the end of the queue is reached.
+
+
+## Starting Redis
+
+For this example, for simplicity, we will start a single instance of Redis.
+See the [Redis Example](../../../examples/redis/README.md) for an example
+of deploying Redis scalably and redundantly.
+
+Start a temporary Pod running Redis and a service so we can find it.
+
+```console
+$ kubectl create -f examples/job/work-queue-2/redis-pod.yaml
+pod "redis-master" created
+$ kubectl create -f examples/job/work-queue-2/redis-service.yaml
+service "redis" created
+```
+
+## Filling the Queue with tasks
+
+Now let's fill the queue with some "tasks". In our example, our tasks are just strings to be
+printed.
+
+Start a temporary interactive pod for running the Redis CLI:
+
+```console
+$ kubectl run -i --tty temp --image redis --command "/bin/sh"
+Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
+Hit enter for command prompt
+```
+
+Now hit enter, start the redis CLI, and create a list with some work items in it.
+
+```
+# redis-cli -h redis
+redis:6379> rpush job2 "apple"
+(integer) 1
+redis:6379> rpush job2 "banana"
+(integer) 2
+redis:6379> rpush job2 "cherry"
+(integer) 3
+redis:6379> rpush job2 "date"
+(integer) 4
+redis:6379> rpush job2 "fig"
+(integer) 5
+redis:6379> rpush job2 "grape"
+(integer) 6
+redis:6379> rpush job2 "lemon"
+(integer) 7
+redis:6379> rpush job2 "melon"
+(integer) 8
+redis:6379> rpush job2 "orange"
+(integer) 9
+redis:6379> lrange job2 0 -1
+1) "apple"
+2) "banana"
+3) "cherry"
+4) "date"
+5) "fig"
+6) "grape"
+7) "lemon"
+8) "melon"
+9) "orange"
+```
+
+So, the list with key `job2` will be our work queue.
+
+Note: if you do not have Kube DNS set up correctly, you may need to change
+the first step of the above block to `redis-cli -h $REDIS_SERVICE_HOST`.
+
+
+## Create an Image
+
+Now we are ready to create an image that we will run.
+
+We will use a Python worker program with a Redis client to read
+the messages from the message queue.
+
+A simple Redis work queue client library is provided,
+called rediswq.py ([Download](rediswq.py?raw=true)).
+
+The "worker" program in each Pod of the Job uses the work queue
+client library to get work. Here it is:
+
+
+
+```
+#!/usr/bin/env python
+
+# Copyright 2015 The Kubernetes Authors All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import rediswq
+
+host="redis"
+# Uncomment next two lines if you do not have Kube-DNS working.
+# import os
+# host = os.getenv("REDIS_SERVICE_HOST")
+
+q = rediswq.RedisWQ(name="job2", host=host)
+print("Worker with sessionID: " + q.sessionID())
+print("Initial queue state: empty=" + str(q.empty()))
+while not q.empty():
+  item = q.lease(lease_secs=10, block=True, timeout=2)
+  if item is not None:
+    itemstr = item.decode("utf-8")
+    print("Working on " + itemstr)
+    time.sleep(10) # Put your actual work here instead of sleep.
+    q.complete(item)
+  else:
+    print("Waiting for work")
+print("Queue empty, exiting")
+```
+
+[Download example](worker.py?raw=true)
+
+
+If you are working from the source tree,
+change directory to the `examples/job/work-queue-2` directory.
+Otherwise, download [`worker.py`](worker.py?raw=true), [`rediswq.py`](rediswq.py?raw=true), and [`Dockerfile`](Dockerfile?raw=true)
+using the above links. Then build the image:
+
+```console
+$ docker build -t job-wq-2 .
+```
+
+For the [Docker Hub](https://hub.docker.com/), tag your app image with
+your username and push to the Hub with the below commands. Replace
+`` with your Hub username.
+
+```
+docker tag job-wq-2 /job-wq-2
+docker push /job-wq-2
+```
+
+If you are using [Google Container
+Registry](https://cloud.google.com/tools/container-registry/), tag
+your app image with your project ID, and push to GCR.
Replace
+`` with your project ID.
+
+```
+docker tag job-wq-2 gcr.io//job-wq-2
+gcloud docker push gcr.io//job-wq-2
+```
+
+## Defining a Job
+
+Here is the job definition:
+
+
+
+
+```yaml
+apiVersion: extensions/v1beta1
+kind: Job
+metadata:
+  name: job-wq-2
+spec:
+  selector:
+    matchLabels:
+      app: job-wq-2
+  completions: 1
+  parallelism: 2
+  template:
+    metadata:
+      name: job-wq-2
+      labels:
+        app: job-wq-2
+    spec:
+      containers:
+      - name: c
+        image: gcr.io/myproject/job-wq-2
+      restartPolicy: OnFailure
+```
+
+[Download example](job.yaml?raw=true)
+
+
+Be sure to edit the job template to
+change `gcr.io/myproject` to your own path.
+
+In this example, each pod works on several items from the queue and then exits when there are no more items.
+Since the workers themselves detect when the workqueue is empty, and the Job controller does not
+know about the workqueue, it relies on the workers to signal when they are done working.
+The workers signal that the queue is empty by exiting with success. So, as soon as any worker
+exits with success, the controller knows the work is done, and the Pods will exit soon.
+We therefore set the completion count of the Job to 1. The job controller will wait for the other
+pods to complete too.
+
+
+## Running the Job
+
+Now run the Job:
+
+```console
+$ kubectl create -f ./job.yaml
+```
+
+Now wait a bit, then check on the job.
+
+```console
+$ kubectl describe jobs/job-wq-2
+Name:           job-wq-2
+Namespace:      default
+Image(s):       gcr.io/causal-jigsaw-637/job-wq-2
+Selector:       app in (job-wq-2)
+Parallelism:    2
+Completions:    1
+Labels:         app=job-wq-2
+Pods Statuses:  0 Running / 1 Succeeded / 0 Failed
+No volumes.
+Events:
+  FirstSeen  LastSeen  Count  From    SubobjectPath  Reason            Message
+  ─────────  ────────  ─────  ────    ─────────────  ──────            ───────
+  1m         1m        1      {job }                 SuccessfulCreate  Created pod: job-wq-2-7r7b2
+
+$ kubectl logs pods/job-wq-2-7r7b2
+Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
+Initial queue state: empty=False
+Working on banana
+Working on date
+Working on lemon
+```
+
+As you can see, one of our pods worked on several work units.
+
+## Alternatives
+
+If running a queue service or modifying your containers to use a work queue is inconvenient, you may
+want to consider one of the other [job patterns](../../../docs/user-guide/jobs.md#job-patterns).
+
+If you have a continuous stream of background processing work to run, then
+consider running your background workers with a ReplicationController instead,
+and consider running a background processing library such as
+https://github.com/resque/resque.
+
+
+
+[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/examples/job/work-queue-2/README.md?pixel)]()
+
diff --git a/examples/job/work-queue-2/job.yaml b/examples/job/work-queue-2/job.yaml
new file mode 100644
index 0000000000000..c1abf856ffaef
--- /dev/null
+++ b/examples/job/work-queue-2/job.yaml
@@ -0,0 +1,20 @@
+apiVersion: extensions/v1beta1
+kind: Job
+metadata:
+  name: job-wq-2
+spec:
+  selector:
+    matchLabels:
+      app: job-wq-2
+  completions: 1
+  parallelism: 2
+  template:
+    metadata:
+      name: job-wq-2
+      labels:
+        app: job-wq-2
+    spec:
+      containers:
+      - name: c
+        image: gcr.io/myproject/job-wq-2
+      restartPolicy: OnFailure
diff --git a/examples/job/work-queue-2/redis-pod.yaml b/examples/job/work-queue-2/redis-pod.yaml
new file mode 100644
index 0000000000000..ae0c43a793570
--- /dev/null
+++ b/examples/job/work-queue-2/redis-pod.yaml
@@ -0,0 +1,15 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  name: redis-master
+  labels:
+    app: redis
+spec:
+  containers:
+  - name: master
+    image: redis
+    env:
+    - name: MASTER
+      value: "true"
+    ports:
+ - containerPort: 6379 diff --git a/examples/job/work-queue-2/redis-service.yaml b/examples/job/work-queue-2/redis-service.yaml new file mode 100644 index 0000000000000..85f2ca2271d0b --- /dev/null +++ b/examples/job/work-queue-2/redis-service.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: Service +metadata: + name: redis +spec: + ports: + - port: 6379 + targetPort: 6379 + selector: + app: redis diff --git a/examples/job/work-queue-2/rediswq.py b/examples/job/work-queue-2/rediswq.py new file mode 100644 index 0000000000000..7135a903c7e4b --- /dev/null +++ b/examples/job/work-queue-2/rediswq.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python + +# Copyright 2015 The Kubernetes Authors All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Based on http://peter-hoffmann.com/2012/python-simple-queue-redis-queue.html +# and the suggestion in the redis documentation for RPOPLPUSH, at +# http://redis.io/commands/rpoplpush, which suggests how to implement a work-queue. + + +import redis +import uuid +import hashlib + +class RedisWQ(object): + """Simple Finite Work Queue with Redis Backend + + This work queue is finite: as long as no more work is added + after workers start, the workers can detect when the queue + is completely empty. + + The items in the work queue are assumed to have unique values. + + This object is not intended to be used by multiple threads + concurrently. 
+    """
+    def __init__(self, name, **redis_kwargs):
+        """The default connection parameters are: host='localhost', port=6379, db=0
+
+        The work queue is identified by "name".  The library may create other
+        keys with "name" as a prefix.
+        """
+        self._db = redis.StrictRedis(**redis_kwargs)
+        # The session ID will uniquely identify this "worker".
+        self._session = str(uuid.uuid4())
+        # Work queue is implemented as two queues: main, and processing.
+        # Work is initially in main, and moved to processing when a client picks it up.
+        self._main_q_key = name
+        self._processing_q_key = name + ":processing"
+        self._lease_key_prefix = name + ":leased_by_session:"
+
+    def sessionID(self):
+        """Return the ID for this session."""
+        return self._session
+
+    def _main_qsize(self):
+        """Return the size of the main queue."""
+        return self._db.llen(self._main_q_key)
+
+    def _processing_qsize(self):
+        """Return the size of the processing queue."""
+        return self._db.llen(self._processing_q_key)
+
+    def empty(self):
+        """Return True if the queue is empty, including work being done, False otherwise.
+
+        False does not necessarily mean that there is work available to work on right now:
+        all remaining items may be leased by other workers.
+        """
+        return self._main_qsize() == 0 and self._processing_qsize() == 0
+
+# TODO: implement this
+#    def check_expired_leases(self):
+#        """Return expired leased items to the main queue."""
+#        # Processing list should not be _too_ long since it is approximately as long
+#        # as the number of active and recently active workers.
+#        processing = self._db.lrange(self._processing_q_key, 0, -1)
+#        for item in processing:
+#            # If the lease key is not present for an item (it expired or was
+#            # never created because the client crashed before creating it)
+#            # then move the item back to the main queue so others can work on it.
+#            if not self._lease_exists(item):
+#                TODO: transactionally move the key from the processing queue
+#                to the main queue, while detecting if a new lease is created
+#                or if either queue is modified.
+
+    def _itemkey(self, item):
+        """Returns a string that uniquely identifies an item (bytes)."""
+        return hashlib.sha224(item).hexdigest()
+
+    def _lease_exists(self, item):
+        """True if a lease on 'item' exists."""
+        return self._db.exists(self._lease_key_prefix + self._itemkey(item))
+
+    def lease(self, lease_secs=60, block=True, timeout=None):
+        """Begin working on an item from the work queue.
+
+        Lease the item for lease_secs.  After that time, other
+        workers may consider this client to have crashed or stalled
+        and pick up the item instead.
+
+        If optional args block is true and timeout is None (the default), block
+        if necessary until an item is available."""
+        if block:
+            item = self._db.brpoplpush(self._main_q_key, self._processing_q_key, timeout=timeout)
+        else:
+            item = self._db.rpoplpush(self._main_q_key, self._processing_q_key)
+        if item:
+            # Record that we (this session id) are working on a key.  Expire that
+            # note after the lease timeout.
+            # Note: if we crash at this line of the program, then GC will see no lease
+            # for this item and will later return it to the main queue.
+            itemkey = self._itemkey(item)
+            self._db.setex(self._lease_key_prefix + itemkey, lease_secs, self._session)
+        return item
+
+    def complete(self, value):
+        """Complete working on the item with 'value'.
+
+        If the lease expired, the item may not have completed, and some
+        other worker may have picked it up.  There is no indication
+        of what happened.
+        """
+        self._db.lrem(self._processing_q_key, 0, value)
+        # If we crash here, then the GC code will try to move the value, but it will
+        # not be here, which is fine.  So this does not need to be a transaction.
+        itemkey = self._itemkey(value)
+        self._db.delete(self._lease_key_prefix + itemkey)
+
+# TODO: add functions to clean up all keys associated with "name" when
+# processing is complete.
+
+# TODO: add a function to add an item to the queue.  Atomically
+# check if the queue is empty and if so fail to add the item
+# since other workers might think work is done and be in the process
+# of exiting.
+
+# TODO(etune): move to my own github for hosting, e.g. github.com/erictune/rediswq-py and
+# make it so it can be pip installed by anyone (see
+# http://stackoverflow.com/questions/8247605/configuring-so-that-pip-install-can-work-from-github)
+
+# TODO(etune): finish code to GC expired leases, and call periodically
+# e.g. each time lease times out.
+
diff --git a/examples/job/work-queue-2/worker.py b/examples/job/work-queue-2/worker.py
new file mode 100755
index 0000000000000..ba42414757705
--- /dev/null
+++ b/examples/job/work-queue-2/worker.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+
+# Copyright 2015 The Kubernetes Authors All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import rediswq
+
+host="redis"
+# Uncomment next two lines if you do not have Kube-DNS working.
+# import os
+# host = os.getenv("REDIS_SERVICE_HOST")
+
+q = rediswq.RedisWQ(name="job2", host=host)
+print("Worker with sessionID: " + q.sessionID())
+print("Initial queue state: empty=" + str(q.empty()))
+while not q.empty():
+  item = q.lease(lease_secs=10, block=True, timeout=2)
+  if item is not None:
+    itemstr = item.decode("utf-8")
+    print("Working on " + itemstr)
+    time.sleep(10) # Put your actual work here instead of sleep.
+    q.complete(item)
+  else:
+    print("Waiting for work")
+print("Queue empty, exiting")
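The two-queue `RPOPLPUSH` idiom that `rediswq.py` builds on can be illustrated without a Redis server. Below is a pure-Python sketch of the same reliable-queue idea; the names `lease` and `complete` echo the library's methods, but the sketch itself is illustrative, not part of the example's code:

```python
from collections import deque

# Reliable-queue idiom behind rediswq.py: a leased item moves from the
# main queue to a processing queue (RPOPLPUSH), and is removed from the
# processing queue only on completion (LREM). A crashed worker leaves
# its item visible in the processing queue, where lease-expiry GC could
# later return it to the main queue.
main = deque(["apple", "banana"])   # RPUSH appends on the right
processing = deque()

def lease():
    item = main.pop()               # pop from the right of main...
    processing.appendleft(item)     # ...push onto the left of processing
    return item

def complete(item):
    processing.remove(item)         # drop the finished item

item = lease()      # "banana" is leased and parked in processing
complete(item)      # and removed once the work is done
```

Because an item is never outside both queues at once, a worker crash between `lease` and `complete` loses no work.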