-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
13 changed files
with
472 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -131,7 +131,6 @@ dmypy.json | |
.vscode | ||
.idea | ||
config | ||
.gitlab-ci.yml | ||
openshift | ||
coverage_html_report | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
from celery import Celery | ||
from flask import Blueprint, Flask, redirect | ||
from flask_cors import CORS | ||
from werkzeug.middleware.proxy_fix import ProxyFix | ||
|
||
from captcha_api.database import db | ||
from captcha_api.log_utils import configure_logging | ||
from captcha_api.rest import api | ||
|
||
index_bp = Blueprint('index', __name__) | ||
|
||
celery = Celery() | ||
|
||
|
||
@index_bp.route("/") | ||
def index(): | ||
return redirect("/swagger-ui") | ||
|
||
|
||
def read_env_config(app: Flask): | ||
try: | ||
app.config.from_envvar("CAPTCHA_API_CONFIG") | ||
except Exception as e: | ||
app.logger.error(e) | ||
|
||
|
||
def setup_api(app: Flask): | ||
api.version = app.config['API_VERSION'] | ||
api.prefix = f"/api/{api.version}" | ||
api.init_app(app) | ||
|
||
|
||
def make_celery(app): | ||
""" | ||
Sets up Celery as a background task runner for the application | ||
""" | ||
if app.config.get('USE_CELERY', False): | ||
celery.conf.broker_url = app.config['CELERY_BROKER_URL'] | ||
celery.conf.result_backend = app.config['CELERY_RESULT_BACKEND'] | ||
celery.conf.update(app.config) | ||
|
||
class ContextTask(celery.Task): | ||
def __call__(self, *args, **kwargs): | ||
with app.app_context(): | ||
return self.run(*args, **kwargs) | ||
|
||
celery.Task = ContextTask | ||
else: | ||
app.logger.warn('Celery is disabled!') | ||
|
||
|
||
def create_app() -> Flask: | ||
app = Flask(__name__) | ||
app.wsgi_app = ProxyFix(app.wsgi_app) | ||
CORS(app) | ||
app.url_map.strict_slashes = False | ||
app.config.from_object("captcha_api.config") | ||
app.logger = configure_logging() | ||
read_env_config(app) | ||
|
||
# Create a Celery connection | ||
make_celery(app) | ||
|
||
# DB initialization | ||
with app.app_context(): | ||
db.init_app(app) | ||
db.create_all() | ||
|
||
setup_api(app) | ||
|
||
# Blueprints | ||
app.register_blueprint(index_bp) | ||
|
||
return app |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
from io import BytesIO | ||
from random import randint | ||
from typing import Tuple | ||
|
||
from PIL import Image, ImageDraw, ImageFont, ImageOps | ||
|
||
|
||
def _get_random_color(): | ||
# random color rgb | ||
return randint(120, 200), randint(120, 200), randint(120, 200) | ||
|
||
|
||
def _get_random_code(): | ||
# random characters | ||
codes = [[chr(i) for i in range(49, 58)], [chr(i) | ||
for i in range(65, 91)], [chr(i) for i in range(97, 123)]] | ||
codes = codes[randint(0, 2)] | ||
return codes[randint(0, len(codes) - 1)] | ||
|
||
|
||
def _generate_rotated_char(c, font): | ||
txt = Image.new('L', font.getsize(c)) | ||
blank_image = ImageDraw.Draw(txt) | ||
blank_image.text((0, 0), c, font=font, fill=255) | ||
rotated_text = txt.rotate(randint(-50, 50), expand=1) | ||
return rotated_text | ||
|
||
|
||
class CaptchaGenerator: | ||
""" | ||
Generates captcha images based on the parameters | ||
""" | ||
|
||
def __init__(self, fontname="DejaVuSerif.ttf", width=250, height=60): | ||
self.width = width | ||
self.height = height | ||
self.font = ImageFont.truetype(fontname, size=36) | ||
|
||
def generate_captcha(self, length=6) -> Tuple[BytesIO, str]: | ||
""" | ||
Generate a captcha image | ||
:return: A tuple consisting of the image bytes and the text | ||
""" | ||
img = Image.new("RGB", (self.width, self.height), (250, 250, 250)) | ||
draw = ImageDraw.Draw(img) | ||
# captcha text | ||
text = "" | ||
for i in range(length): | ||
char = _get_random_code() | ||
text += char | ||
|
||
rotated = _generate_rotated_char(char, self.font) | ||
colorized = ImageOps.colorize( | ||
rotated, (0, 0, 0), _get_random_color()) | ||
img.paste(colorized, (int(self.width * 0.13 * (i + 1)), | ||
int(self.height * 0.2)), rotated) | ||
# add interference line | ||
for i in range(15): | ||
x_1 = randint(0, self.width) | ||
y_1 = randint(0, self.height) | ||
x_2 = randint(0, self.width) | ||
y_2 = randint(0, self.height) | ||
draw.line((x_1, y_1, x_2, y_2), fill=_get_random_color()) | ||
# add interference point | ||
for i in range(16): | ||
draw.point((randint(0, self.width), randint( | ||
0, self.height)), fill=_get_random_color()) | ||
# save the picture | ||
img_byte_array = BytesIO() | ||
img.save(img_byte_array, format='jpeg') | ||
return img_byte_array, text |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import os | ||
|
||
from celery.schedules import crontab | ||
|
||
from captcha_api.app_factory import celery, create_app | ||
from captcha_api.tasks import delete_old_captchas | ||
|
||
app = create_app() | ||
|
||
|
||
@celery.on_after_configure.connect | ||
def setup_periodic_tasks(sender, **kwargs): | ||
# Executes every hour the delete old captchas task | ||
sender.add_periodic_task( | ||
crontab(hour="*/1"), | ||
delete_old_captchas.s(), | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
API_VERSION = "v1.0" | ||
|
||
# Local database | ||
SQLALCHEMY_DATABASE_URI = "sqlite:///test.db" | ||
DEFAULT_CAPTCHA_FONT = "DejaVuSerif.ttf" | ||
|
||
# Set to True for Celery background tasks functionality | ||
USE_CELERY = False | ||
|
||
CELERY_BROKER_URL = "redis://localhost:6379" | ||
CELERY_RESULT_BACKEND = "redis://localhost:6379" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
import datetime | ||
|
||
from flask_sqlalchemy import SQLAlchemy | ||
|
||
db = SQLAlchemy() | ||
|
||
|
||
class Captcha(db.Model): | ||
id = db.Column(db.String(36), primary_key=True) | ||
answer = db.Column(db.String(120), nullable=False) | ||
creation_time = db.Column(db.DateTime, default=datetime.datetime.utcnow, nullable=False) | ||
|
||
def __repr__(self): | ||
return '<Captcha %r>' % self.id |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import logging | ||
import sys | ||
|
||
|
||
def configure_logging(): | ||
"""Logging setup | ||
""" | ||
logger = logging.getLogger(__name__) | ||
logger.setLevel(logging.DEBUG) | ||
formatter = logging.Formatter("%(asctime)s %(levelname)s %(filename)s:%(lineno)d - %(message)s") | ||
|
||
# adds console handler to logger instance the first time this code is called | ||
# avoids adding extra handlers to the instance, which causes duplicate logs msgs | ||
if not len(logger.handlers): | ||
console = logging.StreamHandler(sys.stdout) | ||
console.setFormatter(formatter) | ||
logger.addHandler(console) | ||
|
||
# Requests logs some stuff at INFO that we don't want | ||
# unless we have DEBUG | ||
requests_log = logging.getLogger("requests") | ||
requests_log.setLevel(logging.ERROR) | ||
return logger |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
import logging | ||
import re | ||
from base64 import b64encode | ||
from copy import deepcopy | ||
from datetime import datetime, timedelta | ||
from urllib.parse import urlparse | ||
from uuid import uuid4 | ||
|
||
from flask import current_app, jsonify, request | ||
from flask_restx import Api, Resource, fields | ||
|
||
from captcha_api.captcha_generator import CaptchaGenerator | ||
from captcha_api.database import Captcha, db | ||
|
||
api = Api( | ||
title="CAPTCHA API", | ||
description="A simple API for handling CAPTCHA", | ||
security={'oauth2': ['api']}, | ||
doc="/swagger-ui", | ||
) | ||
|
||
|
||
captcha_ns = api.namespace( | ||
"captcha", description="Utilities for validating and generating CAPTCHA") | ||
|
||
|
||
captcha_model = captcha_ns.model( | ||
"CaptchaAnswer", | ||
{ | ||
"answer": fields.String, | ||
"id": fields.String | ||
} | ||
) | ||
|
||
|
||
def get_request_data(request): | ||
""" | ||
Gets the data from the request | ||
""" | ||
# https://stackoverflow.com/questions/10434599/how-to-get-data-received-in-flask-request/25268170 | ||
data = request.form.to_dict() if request.form else request.get_json() | ||
if not data: | ||
return {} | ||
return data | ||
|
||
|
||
@captcha_ns.route("/") | ||
class CaptchaResource(Resource): | ||
""" | ||
Handling captchas | ||
""" | ||
|
||
def __init__(self, api=None, *args, **kwargs): | ||
super().__init__(api=api, *args, **kwargs) | ||
self.generator = CaptchaGenerator( | ||
fontname=api.app.config['DEFAULT_CAPTCHA_FONT']) | ||
|
||
def get(self): | ||
""" | ||
Generate a new captcha text | ||
""" | ||
img_array, answer = self.generator.generate_captcha() | ||
captcha_id = str(uuid4()) | ||
new_captcha = Captcha(id=captcha_id, answer=answer) | ||
db.session.add(new_captcha) | ||
db.session.commit() | ||
return { | ||
"id": captcha_id, | ||
"img": "data:image/jpeg;base64," + b64encode(img_array.getvalue()).decode() | ||
} | ||
|
||
@captcha_ns.doc(body=captcha_model) | ||
def post(self): | ||
""" | ||
Solve a captcha and match it with the database thing | ||
""" | ||
data = get_request_data(request) | ||
|
||
existing = Captcha.query.filter_by(id=data["id"]).first() | ||
if not existing: | ||
return {"message": "Not found"}, 404 | ||
|
||
time_difference = datetime.utcnow() - existing.creation_time | ||
if time_difference > timedelta(minutes=1): | ||
db.session.delete(existing) | ||
db.session.commit() | ||
return {"message": "You did not answer fast enough!"}, 400 | ||
|
||
if data["answer"].casefold() != existing.answer.casefold(): | ||
db.session.delete(existing) | ||
db.session.commit() | ||
return {"message": "Invalid answer"}, 400 | ||
|
||
db.session.delete(existing) | ||
db.session.commit() | ||
return {"message": "Valid"} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from datetime import datetime, timedelta | ||
|
||
from captcha_api.app_factory import celery | ||
from captcha_api.database import Captcha, db | ||
|
||
|
||
@celery.task | ||
def delete_old_captchas(): | ||
one_hour_ago = datetime.utcnow() - timedelta(hours=1) | ||
old_captchas = Captcha.query.filter( | ||
Captcha.creation_time <= one_hour_ago) | ||
old_captchas.delete() | ||
db.session.commit() |
Oops, something went wrong.