Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add psycopg2 wrapper #350

Merged
merged 6 commits into from
Aug 16, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add psycopg2 wrapper
Closes #344
  • Loading branch information
kolanos committed Aug 14, 2019
commit 8d2505c8b82c2bfc8c3278983bd4d21d37e6e94f
75 changes: 75 additions & 0 deletions iopipe/contrib/trace/auto_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
import uuid
import wrapt

from .dbapi import AdapterProxy, ConnectionProxy, CursorProxy
from .util import ensure_utf8

Request = collections.namedtuple(
"Request", ["command", "key", "hostname", "port", "connectionName", "db", "table"]
)


def collect_psycopg2_metrics(context, trace, instance):
pass


def collect_pymongo_metrics(context, trace, instance, response):
from pymongo.cursor import Cursor
from pymongo.results import (
Expand Down Expand Up @@ -126,6 +131,69 @@ def wrapper(wrapped, instance, args, kwargs):
)


def patch_psycopg2(context):
"""
Monkey patches psycopg2 client, if available. Overloads the
execute method to add tracing and metrics collection.
"""

class PGCursorProxy(CursorProxy):
def execute(self, *args, **kwargs):
if not hasattr(context, "iopipe") or not hasattr(
context.iopipe, "mark"
): # pragma: no cover
self.__wrapped__.execute(*args, **kwargs)

id = ensure_utf8(str(uuid.uuid4()))
with context.iopipe.mark(id):
self.__wrapped__.execute(*args, **kwargs)
trace = context.iopipe.mark.measure(id)
context.iopipe.mark.delete(id)
collect_psycopg2_metrics(context, trace, self)

class PGConnectionProxy(ConnectionProxy):
def cursor(self, *args, **kwargs):
cursor = self.__wrapped__.cursor(*args, **kwargs)
return PGCursorProxy(cursor, self)

def adapt_wrapper(wrapped, instance, args, kwargs):
adapter = wrapped(*args, **kwargs)
return AdapterProxy(adapter) if hasattr(adapter, "prepare") else adapter

def connect_wrapper(wrapped, instance, args, kwargs):
connection = wrapped(*args, **kwargs)
return PGConnectionProxy(connection, args, kwargs)

def register_type_wrapper(wrapped, instance, args, kwargs):
def _extract_arguments(obj, scope=None):
return obj, scope

obj, scope = _extract_arguments(*args, **kwargs)

if scope is not None:
if isinstance(scope, wrapt.ObjectProxy):
scope = scope.__wrapped__
return wrapped(obj, scope)

return wrapped(obj)

try:
wrapt.wrap_function_wrapper("psycopg2", "connect", connect_wrapper)
except Exception: # pragma: no cover
pass
else:
wrapt.wrap_function_wrapper("psycopg2.extensions", "adapt", adapt_wrapper)
wrapt.wrap_function_wrapper(
"psycopg2.extensions", "register_type", register_type_wrapper
)
wrapt.wrap_function_wrapper(
"psycopg2._psycopg", "register_type", register_type_wrapper
)
wrapt.wrap_function_wrapper(
"psycopg2._json", "register_type", register_type_wrapper
)


def patch_redis(context):
"""
Monkey patches redis client, if available. Overloads the
Expand Down Expand Up @@ -179,6 +247,11 @@ def pipeline_wrapper(wrapped, instance, args, kwargs): # pragma: no cover
wrapt.wrap_function_wrapper(module_name, class_method, pipeline_wrapper)


def restore_psycopg2():
"""Restores psycopg2"""
pass


def restore_pymongo():
"""Restores pymongo"""
try:
Expand Down Expand Up @@ -224,10 +297,12 @@ def patch_db_requests(context):
if not hasattr(context, "iopipe"):
return

patch_psycopg2(context)
patch_pymongo(context)
patch_redis(context)


def restore_db_requests():
restore_psycopg2()
restore_pymongo()
restore_redis()
48 changes: 48 additions & 0 deletions iopipe/contrib/trace/dbapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import wrapt


class CursorProxy(wrapt.ObjectProxy):
def __init__(self, cursor, connection_proxy):
super(CursorProxy, self).__init__(cursor)

self._self_connection = connection_proxy

@property
def connection_proxy(self):
return self._self_connection

def execute(self, *args, **kwargs):
self.__wrapped__.execute(*args, **kwargs)


class ConnectionProxy(wrapt.ObjectProxy):
def __init__(self, connection, args, kwargs):
super(ConnectionProxy, self).__init__(connection)

self._self_args = args
self._self_kwargs = kwargs

def cursor(self, *args, **kwargs):
cursor = self.__wrapped__.cursor(*args, **kwargs)
return CursorProxy(cursor, self)

@property
def extract_hostname(self):
return self._self_kwargs.get("host", "localhost")

@property
def extract_dbname(self):
return self._self_kwargs.get("db", self._self_kwargs.get("database", ""))


class AdapterProxy(wrapt.ObjectProxy):
def prepare(self, *args, **kwargs):
if not args:
return self.__wrapped__.prepare(*args, **kwargs)

connection = args[0]

if isinstance(connection, wrapt.ObjectProxy):
connection = connection.__wrapped__

return self.__wrapped__.prepare(connection, *args[1:], **kwargs)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"mock",
"mongomock==3.17.0",
"more-itertools<6.0.0",
"psycopg2==2.8.3",
"pymongo==3.8.0",
"pytest==4.1.0",
"pytest-benchmark==3.2.0",
Expand Down