Skip to content

Commit

Permalink
Servicebus docs updates (#4180)
Browse files Browse the repository at this point in the history
* Docs updates

* Some docs cleanup

* More docs fixes

* Fixed docs indentation

* More docs updates

* Added Example tags

* More docstrings

* Fixed return type

* Update CODEOWNERS

* Test fixes
  • Loading branch information
annatisch committed Jan 17, 2019
1 parent 5189c87 commit fda31ea
Show file tree
Hide file tree
Showing 20 changed files with 2,081 additions and 554 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@
/azure-mgmt-servicefabric/ @QingChenmsft
/azure-mgmt-sql/ @jaredmoo
/azure-mgmt-web/ @yugangw-msft
/azure-servicebus/ @annatisch
/azure-servicefabric/ @samedder
1 change: 1 addition & 0 deletions azure-servicebus/azure/servicebus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
'Message',
'BatchMessage',
'PeekMessage',
'AutoLockRenew',
'DeferredMessage',
'ServiceBusClient',
'QueueClient',
Expand Down
65 changes: 2 additions & 63 deletions azure-servicebus/azure/servicebus/aio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
# license information.
#--------------------------------------------------------------------------

import asyncio
import datetime
import logging

from azure.servicebus.common.errors import (
ServiceBusError,
ServiceBusResourceNotFound,
Expand All @@ -24,13 +20,14 @@
AutoLockRenewTimeout)
from azure.servicebus.common.constants import ReceiveSettleMode, NEXT_AVAILABLE
from azure.servicebus.common.message import BatchMessage, PeekMessage
from azure.servicebus.common.utils import renewable_start_time, get_running_loop
from .async_message import Message, DeferredMessage
from .async_client import ServiceBusClient, QueueClient, TopicClient, SubscriptionClient
from .async_utils import AutoLockRenew


__all__ = [
'Message',
'AutoLockRenew',
'BatchMessage',
'PeekMessage',
'DeferredMessage',
Expand All @@ -53,61 +50,3 @@
'SessionLockExpired',
'AutoLockRenewFailed',
'AutoLockRenewTimeout']


_log = logging.getLogger(__name__)


class AutoLockRenew:

def __init__(self, loop=None):
self._shutdown = asyncio.Event()
self._futures = []
self.loop = loop or get_running_loop()
self.sleep_time = 1
self.renew_period = 10

def __aenter__(self):
return self

async def __aexit__(self, *args):
await self.shutdown()

def _renewable(self, renewable):
if self._shutdown.is_set():
return False
if hasattr(renewable, 'settled') and renewable.settled:
return False
if renewable.expired:
return False
return True

async def _auto_lock_renew(self, renewable, starttime, timeout):
_log.debug("Running async lock auto-renew for %r seconds", timeout)
try:
while self._renewable(renewable):
if (datetime.datetime.now() - starttime) >= datetime.timedelta(seconds=timeout):
_log.debug("Reached auto lock renew timeout - letting lock expire.")
raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout))
if (renewable.locked_until - datetime.datetime.now()) <= datetime.timedelta(seconds=self.renew_period):
_log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period)
await renewable.renew_lock()
await asyncio.sleep(self.sleep_time)
except AutoLockRenewTimeout as e:
renewable.auto_renew_error = e
except Exception as e: # pylint: disable=broad-except
_log.debug("Failed to auto-renew lock: %r. Closing thread.", e)
error = AutoLockRenewFailed(
"Failed to auto-renew lock",
inner_exception=e)
renewable.auto_renew_error = error

def register(self, renewable, timeout=300):
starttime = renewable_start_time(renewable)
renew_future = asyncio.ensure_future(self._auto_lock_renew(renewable, starttime, timeout), loop=self.loop)
self._futures.append(renew_future)

async def shutdown(self, wait=True):
self._shutdown.set()
if wait:
await asyncio.wait(self._futures)
32 changes: 17 additions & 15 deletions azure-servicebus/azure/servicebus/aio/async_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@

from azure.servicebus.common.utils import create_properties, get_running_loop
from azure.servicebus.common.errors import (
_ServiceBusErrorPolicy,
ServiceBusError,
ServiceBusConnectionError,
InvalidHandlerState,
ServiceBusAuthorizationError,
ServiceBusErrorPolicy)
ServiceBusAuthorizationError)


_log = logging.getLogger(__name__)
Expand All @@ -45,7 +45,7 @@ def __init__(self, endpoint, auth_config, *, loop=None, connection=None, encodin
if not self.error_policy:
max_retries = kwargs.pop('max_message_retries', 3)
is_session = hasattr(self, 'session_id')
self.error_policy = ServiceBusErrorPolicy(max_retries=max_retries, is_session=is_session)
self.error_policy = _ServiceBusErrorPolicy(max_retries=max_retries, is_session=is_session)
self._handler = None
self._build_handler()

Expand Down Expand Up @@ -142,12 +142,13 @@ async def open(self):
.. note:: This operation is not thread-safe.
.. literalinclude:: ../../examples/async_examples/test_examples_async.py
:start-after: [START open_close_sender_directly]
:end-before: [END open_close_sender_directly]
:language: python
:dedent: 4
:caption: Explicitly open and close a Sender.
Example:
.. literalinclude:: ../examples/async_examples/test_examples_async.py
:start-after: [START open_close_sender_directly]
:end-before: [END open_close_sender_directly]
:language: python
:dedent: 4
:caption: Explicitly open and close a Sender.
"""
if self.running:
Expand Down Expand Up @@ -178,12 +179,13 @@ async def close(self, exception=None):
due to an error.
:type exception: Exception
.. literalinclude:: ../../examples/async_examples/test_examples_async.py
:start-after: [START open_close_sender_directly]
:end-before: [END open_close_sender_directly]
:language: python
:dedent: 4
:caption: Explicitly open and close a Sender.
Example:
.. literalinclude:: ../examples/async_examples/test_examples_async.py
:start-after: [START open_close_sender_directly]
:end-before: [END open_close_sender_directly]
:language: python
:dedent: 4
:caption: Explicitly open and close a Sender.
"""
self.running = False
Expand Down
Loading

0 comments on commit fda31ea

Please sign in to comment.