New example: Distributed task queue #5126
Merged
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,247 @@ | ||
# Example: Distributed task queues with Celery, RabbitMQ and Flower | ||
|
||
## Introduction | ||
|
||
Celery is an asynchronous task queue based on distributed message passing. It is used to create execution units (i.e. tasks) which are then executed on one or more worker nodes, either synchronously or asynchronously. | ||
|
||
Celery is implemented in Python. | ||
|
||
Since Celery is based on message passing, it requires some middleware (to handle translation of the message between sender and receiver) called a _message broker_. RabbitMQ is a message broker often used in conjunction with Celery. | ||
|
||
This example will show you how to use Kubernetes to set up a very basic distributed task queue using Celery as the task queue and RabbitMQ as the message broker. It will also show you how to set up a Flower-based front end to monitor the tasks. | ||
|
||
## Goal | ||
|
||
At the end of the example, we will have: | ||
|
||
* Three pods: | ||
* A Celery task queue | ||
* A RabbitMQ message broker | ||
* A Flower frontend | ||
* A service that provides access to the message broker | ||
* A basic Celery task that can be passed to the worker node | ||
|
||
|
||
## Prerequisites | ||
|
||
You should already have turned up a Kubernetes cluster. To get the most out of this example, ensure that Kubernetes will create more than one minion (e.g. by setting your `NUM_MINIONS` environment variable to 2 or more). | ||
|
||
|
||
## Step 1: Start the RabbitMQ service | ||
|
||
The Celery task queue will need to communicate with the RabbitMQ broker. RabbitMQ will eventually appear on a separate pod, but since pods are ephemeral we need a service that can transparently route requests to RabbitMQ. | ||
|
||
Use the file `examples/celery-rabbitmq/rabbitmq-service.json`: | ||
|
||
```js | ||
{ | ||
"id": "rabbitmq-service", | ||
"kind": "Service", | ||
"apiVersion": "v1beta1", | ||
"port": 5672, | ||
"containerPort": 5672, | ||
"selector": { | ||
"app": "taskQueue" | ||
}, | ||
"labels": { | ||
"name": "rabbitmq-service" | ||
} | ||
} | ||
``` | ||
|
||
To start the service, run: | ||
|
||
```shell | ||
$ kubectl create -f examples/celery-rabbitmq/rabbitmq-service.json | ||
``` | ||
|
||
**NOTE**: If you're running Kubernetes from source, you can use `cluster/kubectl.sh` instead of `kubectl`. | ||
|
||
This service allows other pods to connect to RabbitMQ. To them, the broker appears to be available on port 5672; the service routes that traffic to port 5672 on the container. | ||
|
||
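A quick way to confirm that the service is routing traffic is to attempt a TCP connection to it from inside another pod. The sketch below is a minimal check, not part of this example's images: `broker_reachable` is a hypothetical helper name, and a successful connect only shows that something is listening on the port, not that it is a healthy RabbitMQ broker.

```python
import socket


def broker_reachable(host, port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False
```

Inside a pod, the host argument would typically be the value of the `RABBITMQ_SERVICE_SERVICE_HOST` environment variable described in step 3.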
|
||
## Step 2: Fire up RabbitMQ | ||
|
||
A RabbitMQ broker can be turned up using the file `examples/celery-rabbitmq/rabbitmq-controller.json`: | ||
|
||
```js | ||
{ | ||
"id": "rabbitmq-controller", | ||
"kind": "ReplicationController", | ||
"apiVersion": "v1beta1", | ||
"desiredState": { | ||
"replicas": 1, | ||
"replicaSelector": {"name": "rabbitmq"}, | ||
"podTemplate": { | ||
"desiredState": { | ||
"manifest": { | ||
"version": "v1beta1", | ||
"id": "rabbitmq", | ||
"containers": [{ | ||
"name": "rabbitmq", | ||
"image": "dockerfile/rabbitmq", | ||
"cpu": 100, | ||
"ports": [{"containerPort": 5672, "hostPort": 5672}] | ||
}] | ||
} | ||
}, | ||
"labels": { | ||
"name": "rabbitmq", | ||
"app": "taskQueue" | ||
} | ||
} | ||
}, | ||
"labels": { | ||
"name": "rabbitmq" | ||
} | ||
} | ||
``` | ||
|
||
Running `$ kubectl create -f examples/celery-rabbitmq/rabbitmq-controller.json` brings up a replication controller that ensures that one pod running a RabbitMQ instance always exists. | ||
|
||
Note that bringing up the pod includes pulling down a docker image, which may take a few moments. This applies to all other pods in this example. | ||
|
||
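The controller definitions in this example follow one shape and differ only in name, image, and port. As an illustration only, the sketch below builds that structure in Python and serialises it to JSON. The function name `replication_controller` and its parameters are assumptions made for this sketch; the field names and values are copied from the JSON above.

```python
import json


def replication_controller(name, image, port, app="taskQueue", replicas=1):
    """Build a v1beta1 ReplicationController dict shaped like the ones in this example."""
    return {
        "id": name + "-controller",
        "kind": "ReplicationController",
        "apiVersion": "v1beta1",
        "desiredState": {
            "replicas": replicas,
            # The controller manages pods whose "name" label matches this selector.
            "replicaSelector": {"name": name},
            "podTemplate": {
                "desiredState": {
                    "manifest": {
                        "version": "v1beta1",
                        "id": name,
                        "containers": [{
                            "name": name,
                            "image": image,
                            "cpu": 100,
                            "ports": [{"containerPort": port, "hostPort": port}],
                        }],
                    }
                },
                "labels": {"name": name, "app": app},
            },
        },
        "labels": {"name": name},
    }


if __name__ == "__main__":
    rc = replication_controller("rabbitmq", "dockerfile/rabbitmq", 5672)
    print(json.dumps(rc, indent=2))
```

Writing the output to a file and passing it to `kubectl create -f` should be equivalent to using the hand-written JSON, assuming the same API version.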
|
||
## Step 3: Fire up Celery | ||
|
||
Bringing up the celery worker is done by running `$ kubectl create -f examples/celery-rabbitmq/celery-controller.json`, which contains this: | ||
|
||
```js | ||
{ | ||
"id": "celery-controller", | ||
"kind": "ReplicationController", | ||
"apiVersion": "v1beta1", | ||
"desiredState": { | ||
"replicas": 1, | ||
"replicaSelector": {"name": "celery"}, | ||
"podTemplate": { | ||
"desiredState": { | ||
"manifest": { | ||
"version": "v1beta1", | ||
"id": "celery", | ||
"containers": [{ | ||
"name": "celery", | ||
"image": "endocode/celery-app-add", | ||
"cpu": 100, | ||
"ports": [{"containerPort": 5672, "hostPort": 5672}] | ||
}] | ||
} | ||
}, | ||
"labels": { | ||
"name": "celery", | ||
"app": "taskQueue" | ||
} | ||
} | ||
}, | ||
"labels": { | ||
"name": "celery" | ||
} | ||
} | ||
``` | ||
|
||
There are several things to point out here... | ||
|
||
Like the RabbitMQ controller, this controller ensures that there is always a pod running a Celery worker instance. The celery-app-add Docker image is an extension of the standard Celery image. This is the Dockerfile: | ||
|
||
``` | ||
FROM dockerfile/celery | ||
|
||
ADD celery_conf.py /data/celery_conf.py | ||
ADD run_tasks.py /data/run_tasks.py | ||
ADD run.sh /usr/local/bin/run.sh | ||
|
||
ENV C_FORCE_ROOT 1 | ||
|
||
CMD ["/bin/bash", "/usr/local/bin/run.sh"] | ||
``` | ||
|
||
The celery\_conf.py file contains the definition of a simple Celery task that adds two numbers. The last line of the Dockerfile runs run.sh, which starts the Celery worker. | ||
|
||
**NOTE:** `ENV C_FORCE_ROOT 1` forces Celery to be run as the root user, which is *not* recommended in production! | ||
|
||
The celery\_conf.py file contains the following: | ||
|
||
```python | ||
import os | ||
|
||
from celery import Celery | ||
|
||
# Get Kubernetes-provided address of the broker service | ||
broker_service_host = os.environ.get('RABBITMQ_SERVICE_SERVICE_HOST') | ||
|
||
app = Celery('tasks', broker='amqp://guest@%s//' % broker_service_host, backend='amqp') | ||
|
||
@app.task | ||
def add(x, y): | ||
    return x + y | ||
``` | ||
|
||
If you already know how Celery works, everything here should look familiar, except perhaps the part `os.environ.get('RABBITMQ_SERVICE_SERVICE_HOST')`. This environment variable contains the IP address of the RabbitMQ service we created in step 1. Kubernetes injects this variable into containers when their pod starts, provided the service already exists at that point, which is why the service was created first. The service's selector (in this case the app label "taskQueue") determines which pods receive the service's traffic. In the Python code above, this has the effect of automatically filling in the broker address when the pod is started. | ||
|
||
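The variable name is derived from the service id: the id is upper-cased, dashes become underscores, and `_SERVICE_HOST` is appended, so `rabbitmq-service` yields `RABBITMQ_SERVICE_SERVICE_HOST`. The sketch below shows that mapping together with a broker URL builder. Both function names are hypothetical, and the fallback to `localhost` when the variable is unset is an assumption added here for running outside the cluster; celery\_conf.py itself has no fallback.

```python
import os


def service_host_var(service_id):
    """Map a service id such as 'rabbitmq-service' to its *_SERVICE_HOST variable name."""
    return service_id.upper().replace("-", "_") + "_SERVICE_HOST"


def broker_url(service_id="rabbitmq-service", environ=None, default_host="localhost"):
    """Build the AMQP broker URL, using default_host when the variable is unset or empty."""
    env = os.environ if environ is None else environ
    host = env.get(service_host_var(service_id)) or default_host
    return "amqp://guest@%s//" % host
```

With these helpers, the `Celery(...)` call could take `broker=broker_url()` instead of formatting the URL inline.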
The second Python script (run\_tasks.py) dispatches the `add()` task every 5 seconds with two random numbers. | ||
|
||
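The dispatch loop can be sketched without a broker by passing in the function that submits the task. This is an illustrative variant, not the run\_tasks.py shipped in the image: `dispatch_periodically`, `rounds`, and the injectable `sleep` are assumptions introduced so the loop can be bounded and exercised locally.

```python
import random
import time


def dispatch_periodically(submit, interval=5.0, rounds=None, sleep=time.sleep):
    """Call submit(x, y) with two random operands, pausing between calls.

    submit is any callable taking (x, y); with Celery it would be add.delay.
    rounds=None loops forever; a number bounds the loop.
    Returns the number of tasks dispatched.
    """
    dispatched = 0
    while rounds is None or dispatched < rounds:
        x = random.randint(1, 10)
        y = random.randint(1, 10)
        submit(x, y)
        dispatched += 1
        sleep(interval)
    return dispatched
```

In the container, calling `dispatch_periodically(add.delay)` would approximate the behaviour described above, minus the result fetching.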
The question now is, how do you see what's going on? | ||
|
||
|
||
## Step 4: Put a frontend in place | ||
|
||
Flower is a web-based tool for monitoring and administrating Celery clusters. By connecting to the node that contains Celery, you can see the behaviour of all the workers and their tasks in real-time. | ||
|
||
To bring up the frontend, run `$ kubectl create -f examples/celery-rabbitmq/flower-controller.json`. This controller is defined as follows: | ||
|
||
```js | ||
{ | ||
"id": "flower-controller", | ||
"kind": "ReplicationController", | ||
"apiVersion": "v1beta1", | ||
"desiredState": { | ||
"replicas": 1, | ||
"replicaSelector": {"name": "flower"}, | ||
"podTemplate": { | ||
"desiredState": { | ||
"manifest": { | ||
"version": "v1beta1", | ||
"id": "flower", | ||
"containers": [{ | ||
"name": "flower", | ||
"image": "endocode/flower", | ||
"cpu": 100, | ||
"ports": [{"containerPort": 5555, "hostPort": 5555}] | ||
}] | ||
} | ||
}, | ||
"labels": { | ||
"name": "flower", | ||
"app": "taskQueue" | ||
} | ||
} | ||
}, | ||
"labels": { | ||
"name": "flower" | ||
} | ||
} | ||
``` | ||
|
||
This will bring up a new pod with Flower installed and port 5555 (Flower's default port) exposed. This image uses the following command to start Flower: | ||
|
||
```sh | ||
flower --broker=amqp://guest:guest@${RABBITMQ_SERVICE_SERVICE_HOST:-localhost}:5672// | ||
``` | ||
|
||
Again, it uses the Kubernetes-provided environment variable to obtain the address of the RabbitMQ service. | ||
|
||
Once all pods are up and running, running `kubectl get pods` will display something like this: | ||
|
||
``` | ||
POD IP CONTAINER(S) IMAGE(S) HOST LABELS STATUS | ||
celery-controller-h3x9k 10.246.1.11 celery endocode/celery-app-add 10.245.1.3/10.245.1.3 app=taskQueue,name=celery Running | ||
flower-controller-cegta 10.246.2.17 flower endocode/flower 10.245.1.4/10.245.1.4 app=taskQueue,name=flower Running | ||
kube-dns-fplln 10.246.1.3 etcd quay.io/coreos/etcd:latest 10.245.1.3/10.245.1.3 k8s-app=kube-dns,kubernetes.io/cluster-service=true Running | ||
kube2sky kubernetes/kube2sky:1.0 | ||
skydns kubernetes/skydns:2014-12-23-001 | ||
rabbitmq-controller-pjzb3 10.246.2.16 rabbitmq dockerfile/rabbitmq 10.245.1.4/10.245.1.4 app=taskQueue,name=rabbitmq Running | ||
|
||
``` | ||
|
||
Now that you know on which host Flower is running (in this case, 10.245.1.4), you can open your browser and enter the address (e.g. `http://10.245.1.4:5555`). If you click on the tab called "Tasks", you should see an ever-growing list of tasks called "celery_conf.add" which the run\_tasks.py script is dispatching. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
FROM dockerfile/celery | ||
|
||
ADD celery_conf.py /data/celery_conf.py | ||
ADD run_tasks.py /data/run_tasks.py | ||
ADD run.sh /usr/local/bin/run.sh | ||
|
||
ENV C_FORCE_ROOT 1 | ||
|
||
CMD ["/bin/bash", "/usr/local/bin/run.sh"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import os | ||
|
||
from celery import Celery | ||
|
||
# Get Kubernetes-provided address of the broker service | ||
broker_service_host = os.environ.get('RABBITMQ_SERVICE_SERVICE_HOST') | ||
|
||
app = Celery('tasks', broker='amqp://guest@%s//' % broker_service_host, backend='amqp') | ||
|
||
@app.task | ||
def add(x, y): | ||
    return x + y | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
#!/bin/bash | ||
|
||
# Copyright 2014 Google Inc. All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
# Run the celery worker | ||
/usr/local/bin/celery -A celery_conf worker -f /data/celery.log & | ||
|
||
# Start firing periodic tasks automatically | ||
python /data/run_tasks.py |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import random | ||
import syslog | ||
import time | ||
|
||
from celery_conf import add | ||
|
||
while True: | ||
    x = random.randint(1, 10) | ||
    y = random.randint(1, 10) | ||
    res = add.delay(x, y) | ||
    time.sleep(5) | ||
    if res.ready(): | ||
        res.get() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
{ | ||
"id": "celery-controller", | ||
"kind": "ReplicationController", | ||
"apiVersion": "v1beta1", | ||
"desiredState": { | ||
"replicas": 1, | ||
"replicaSelector": {"name": "celery"}, | ||
"podTemplate": { | ||
"desiredState": { | ||
"manifest": { | ||
"version": "v1beta1", | ||
"id": "celery", | ||
"containers": [{ | ||
"name": "celery", | ||
"image": "endocode/celery-app-add", | ||
"cpu": 100, | ||
"ports": [{"containerPort": 5672, "hostPort": 5672}] | ||
}] | ||
} | ||
}, | ||
"labels": { | ||
"name": "celery", | ||
"app": "taskQueue" | ||
} | ||
} | ||
}, | ||
"labels": { | ||
"name": "celery" | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
{ | ||
"id": "flower-controller", | ||
"kind": "ReplicationController", | ||
"apiVersion": "v1beta1", | ||
"desiredState": { | ||
"replicas": 1, | ||
"replicaSelector": {"name": "flower"}, | ||
"podTemplate": { | ||
"desiredState": { | ||
"manifest": { | ||
"version": "v1beta1", | ||
"id": "flower", | ||
"containers": [{ | ||
"name": "flower", | ||
"image": "endocode/flower", | ||
"cpu": 100, | ||
"ports": [{"containerPort": 5555, "hostPort": 5555}] | ||
}] | ||
} | ||
}, | ||
"labels": { | ||
"name": "flower", | ||
"app": "taskQueue" | ||
} | ||
} | ||
}, | ||
"labels": { | ||
"name": "flower" | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
FROM ubuntu:trusty | ||
|
||
# update the package repository and install python pip | ||
RUN apt-get -y update && apt-get -y install python-dev python-pip | ||
|
||
# install flower | ||
RUN pip install flower | ||
|
||
# Make sure we expose port 5555 so that we can connect to it | ||
EXPOSE 5555 | ||
|
||
ADD run_flower.sh /usr/local/bin/run_flower.sh | ||
|
||
# Running flower | ||
CMD ["/bin/bash", "/usr/local/bin/run_flower.sh"] |
nit: preference for YAML rather than JSON