Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
DataGreed committed Jun 17, 2020
1 parent a5934ad commit 7630be4
Show file tree
Hide file tree
Showing 18 changed files with 492 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions .idea/django-eb-sqs-worker.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 64 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,65 @@
# django-eb-sqs-worker
Django SQS Worker for Amazon Elastic Beanstalk
Django SQS Worker for Amazon Elastic Beanstalk.

Lets you handle async jobs on Elastic Beanstalk Worker Environment sent via SQS and provides methods to send tasks to worker.

You can use the same Django codebase for both your Web Tier and Worker Tier environments and send tasks
from Web environment to Worker environment. Amazon fully manages autoscaling for you.
Tasks are sent via Amazon Simple Queue Service and are delivered to your worker with Elastic Beanstalk's SQS daemon.

Created by Alexey "DataGreed" Strelkov, published under MIT License. See LICENCE file for details.

# Installation

Install using pip (#TODO: publish on pip)

```
pip install django-eb-sqs-worker
```

`#TODO settings`
`#TODO urls`

# Usage

`#TODO`

## Defining Jobs

`#TODO`

## Sending jobs to worker

`#TODO`

## Periodic jobs

`#TODO`

# Additional configuration

`#TODO list all settings`

# Security

`#TODO`

# Tips

`#TODO`

## Delay abstraction

`#TODO`

## Using different cron files for different environments

`#TODO`

# Testing

`#TODO`

# Contributing

`#TODO`
Empty file added eb_sqs_worker/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions eb_sqs_worker/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from django.contrib import admin

# Register your models here.
5 changes: 5 additions & 0 deletions eb_sqs_worker/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from django.apps import AppConfig


class EbSqsConfig(AppConfig):
name = 'eb_sqs_worker'
Empty file.
3 changes: 3 additions & 0 deletions eb_sqs_worker/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from django.db import models

# Create your models here.
175 changes: 175 additions & 0 deletions eb_sqs_worker/sqs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import json
import uuid

import boto3
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.utils.module_loading import import_string
import logging


logger = logging.getLogger(__name__)
try:
AWS_REGION = settings.AWS_EB_DEFAULT_REGION
except AttributeError:
raise ImproperlyConfigured("settings.AWS_EB_DEFAULT_REGION not set, please set it to use eb_sqs django app")

# TODO: make it lazy so we can run tests without setting this settings?
sqs = boto3.resource('sqs',
region_name=AWS_REGION,
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY)


def send_task(task_name, task_kwargs, run_locally=None, queue_name=None):
"""
Sends task to SQS queue to be run asynchronously on worker environment instances.
If settings.AWS_EB_RUN_TASKS_LOCALLY is set to True, does not send the task
to SQS, but instead runs it right away in synchronous mode. May be useful
for testing when no SQS worker is set up.
:param task_name name of the task to run.
:param task_kwargs kwargs that are passed to the task
:param run_locally if set, forces the task to be run locally or sent to SQS
regardless of what settings.AWS_EB_RUN_TASKS_LOCALLY is set to.
:return:
"""

task_data = {
'task': task_name,
'arguments': task_kwargs
}

if run_locally is None:
run_locally = getattr(settings, "AWS_EB_RUN_TASKS_LOCALLY", False)

if run_locally:

task_id = uuid.uuid4().hex

task = SQSTask(task_data)
print(f"[{task_id}] Running task locally in sync mode: {task.get_pretty_info_string()}")

result = task.run_task()
print(f"[{task_id}] Task result: {result}")

else:

if queue_name is None:
try:
queue_name = settings.AWS_EB_DEFAULT_QUEUE_NAME
except AttributeError:
raise ImproperlyConfigured("settings.AWS_EB_DEFAULT_QUEUE_NAME must be set to send task to SQS queue")

# TODO: cache queues instead of looking the up every time
try:
# Get the queue. This returns an SQS.Queue instance
queue = sqs.get_queue_by_name(QueueName=queue_name)
except:
queue = sqs.create_queue(QueueName=queue_name)


# send task to sqs workers
# see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html
response = queue.send_message(MessageBody=json.dumps(task_data))
logger.info(f"Sent message {task_data} to SQS queue {queue_name}. Got response: {response}")

# print(response.get('MessageId'))
# print(response.get('MD5OfMessageBody'))


class SQSTask:

def __init__(self, data, request=None):
"""
:param data: dictionary with parsed request data that is used to populate
task fields.
Expects the following dict format:
{
"task": "job_name", # task name in settings.AWS_EB_ENABLED_TASKS
"arguments": { # arguments passed as kwargs to task function
"someArgument": "someValue",
"otherArgument": 123,
"anotherArgument": [1,"a", 3,4]
}
}
"""

self.data = data
self.task_name = data.get('task')
self.task_kwargs = data.get('arguments', {}) # task may have no args
self.last_result = None
self.scheduled_time = None
self.sender_id = None

# check that the task is specified in body if it's not, try to get it from header as it can be a peridoic job
# see more here https://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers
# .html#worker-periodictasks
if not self.task_name and request is not None:
debug_meta = request.META
debug_headers = request.headers
self.task_name = request.headers.get('X-Aws-Sqsd-Taskname')
self.scheduled_time = request.headers.get('X-Aws-Sqsd-Scheduled-At')
self.sender_id = request.headers.get('X-Aws-Sqsd-Sender-Id')

if not self.task_name:
raise ValueError("SQSTask must have a name either in body, or in X-Aws-Sqsd-Taskname header")

def is_periodic(self):
"""
:return: True if the task was set using amazon's periodic scheduler See
https://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html#worker
-periodictasks
"""
return self.scheduled_time is not None

def run_task(self):
"""
Looks up the function associated with task_name in
settings.AWS_EB_ENABLED_TASKS keys and calls corresponding function
with passed keyword arguments. Be sure that your task functions
all have keyword arguments.
AWS_EB_ENABLED_TASKS must be a dictionary:
{
"task_name": "path.to.task.function"
}
:return:
"""
if not getattr(settings, "AWS_EB_ENABLED_TASKS", None):
raise ImproperlyConfigured(f"settings.AWS_EB_ENABLED_TASKS not set, cannot run task {self.task_name}")

if not isinstance(settings.AWS_EB_ENABLED_TASKS, dict):
raise ImproperlyConfigured(f"settings.AWS_EB_ENABLED_TASKS must be a dict, "
f"not {type(settings.AWS_EB_ENABLED_TASKS)}")

try:
task_method_path = settings.AWS_EB_ENABLED_TASKS[self.task_name]
except KeyError:
raise ImproperlyConfigured(f"Task named {self.task_name} is not defined in settings.AWS_EB_ENABLED_TASKS")

task_method = import_string(task_method_path)

if not callable(task_method):
raise ImproperlyConfigured(f"Tasks defined in AWS_EB_ENABLED_TASKS must be callables. "
f"Object for task f{self.task_name} is not callable, "
f"it's a {type(task_method)}'")

result = task_method(**self.task_kwargs)
self.last_result = result

# TODO: make sure that the returned result is serialized and can be displayed in json correctly

return result

def get_pretty_info_string(self):
periodic_marker = ""
periodic_info = ""
if self.is_periodic():
periodic_marker = "Periodic "
periodic_info = f", scheduled at {self.scheduled_time} by {self.sender_id}"

result = f"{periodic_marker}Task({self.task_name}, kwargs: {self.task_kwargs}{periodic_info})"

return result
8 changes: 8 additions & 0 deletions eb_sqs_worker/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
def test_task(**kwargs):
"""
Test task, echos back all arguments that it receives.
"""

print(f"The test task is being run with kwargs {kwargs} and will echo them back")

return kwargs
Loading

0 comments on commit 7630be4

Please sign in to comment.