diff --git a/.bumpversion.cfg b/.bumpversion.cfg index b0798009e..cf4d11242 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 5.3.7 +current_version = 5.4.0rc1 commit = True tag = True parse = (?P\d+)\.(?P\d+)\.(?P\d+)(?P[a-z]+)? diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1c9442c6b..1e4137ce3 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/asottile/pyupgrade - rev: v3.15.2 + rev: v3.16.0 hooks: - id: pyupgrade args: ["--py37-plus", "--keep-runtime-typing"] @@ -12,7 +12,7 @@ repos: args: ["--in-place", "--ignore-pass-after-docstring", "--imports"] - repo: https://github.com/PyCQA/flake8 - rev: 7.0.0 + rev: 7.1.0 hooks: - id: flake8 diff --git a/Changelog.rst b/Changelog.rst index 25d391085..fe53774b3 100644 --- a/Changelog.rst +++ b/Changelog.rst @@ -4,6 +4,41 @@ Change history ================ +.. _version-5.4.0rc1: + +5.4.0rc1 +======== +:release-date: 22 June, 2024 +:release-by: Tomer Nosrati + +We want to add a special thanks to contribution #2007 by @awmackowiak for fixing the Redis reconnection bug. +This release candidate aims to allow the community to test the changes and provide feedback. + +Please let us know if Redis is stable again! + +New: #1998, #2016, #2024, #1976 +The rest of the changes are bug fixes and dependency updates. + +Lastly, ``requests`` is limited to <2.32.0 per #2011. + +- Update mypy to 1.10.0 (#1988) +- Update pytest to 8.2.0 (#1990) +- fix: Fanout exchange messages mixed across virtual databases in Redis sentinel (#1986) +- Pin pymongo to latest version 4.7.2 (#1994) +- enable/fix test_etcd.py (resolves #2001) (#2002) +- Bump pytest from 8.2.0 to 8.2.1 (#2005) +- Limit requests<2.32.0 due to docker-py issue 3256 (#2011) +- enhance: allow users to disable broker heartbeats (#1998) +- enhance: allow uses to disable broker heartbeats by not providing a timeout (#1997,#1998) (#2016) +- Pin typing_extensions to latest version 4.12.1 (#2017) +- chore(typing): annotate `utils/debug.py` (#1714) +- Bump pytest from 8.2.1 to 8.2.2 (#2021) +- Bump pymongo from 4.7.2 to 4.7.3 (#2022) +- ConnectionPool can't be used after .resize(..., reset=True) (resolves #2018) (#2024) +- Fix Redis connections after reconnect - consumer starts consuming the tasks after crash. (#2007) +- Update flake8 to 7.1.0 (#2028) +- Add support for mongodb+srv scheme (#1976) + .. _version-5.3.7: 5.3.7 diff --git a/README.rst b/README.rst index a50ba759d..5c9f2aba9 100644 --- a/README.rst +++ b/README.rst @@ -4,7 +4,7 @@ |build-status| |coverage| |license| |wheel| |pyversion| |pyimp| |downloads| -:Version: 5.3.7 +:Version: 5.4.0rc1 :Documentation: https://kombu.readthedocs.io/ :Download: https://pypi.org/project/kombu/ :Source: https://github.com/celery/kombu/ diff --git a/docs/includes/introduction.txt b/docs/includes/introduction.txt index 8296eef4c..cfec93559 100644 --- a/docs/includes/introduction.txt +++ b/docs/includes/introduction.txt @@ -1,4 +1,4 @@ -:Version: 5.3.7 +:Version: 5.4.0rc1 :Web: https://kombu.readthedocs.io/ :Download: https://pypi.org/project/kombu/ :Source: https://github.com/celery/kombu/ diff --git a/kombu/__init__.py b/kombu/__init__.py index bcc1c01ad..0957a93aa 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -8,7 +8,7 @@ from collections import namedtuple from typing import Any, cast -__version__ = '5.3.7' +__version__ = '5.4.0rc1' __author__ = 'Ask Solem' __contact__ = 'auvipy@gmail.com' __homepage__ = 'https://kombu.readthedocs.io' diff --git a/kombu/connection.py b/kombu/connection.py index 13b1e9e59..beec4c738 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -1065,7 +1065,8 @@ def acquire_channel(self, block=False): def setup(self): if self.limit: q = self._resource.queue - while len(q) < self.limit: + # Keep in mind dirty/used resources + while len(q) < self.limit - len(self._dirty): self._resource.put_nowait(lazy(self.new)) def prepare(self, resource): @@ -1091,7 +1092,8 @@ def setup(self): channel = self.new() if self.limit: q = self._resource.queue - while len(q) < self.limit: + # Keep in mind dirty/used resources + while len(q) < self.limit - len(self._dirty): self._resource.put_nowait(lazy(channel)) def prepare(self, channel): diff --git a/kombu/log.py b/kombu/log.py index ed8d0a509..b9bca7f19 100644 --- a/kombu/log.py +++ b/kombu/log.py @@ -7,11 +7,15 @@ import os import sys from logging.handlers import WatchedFileHandler +from typing import TYPE_CHECKING from .utils.encoding import safe_repr, safe_str from .utils.functional import maybe_evaluate from .utils.objects import cached_property +if TYPE_CHECKING: + from logging import Logger + __all__ = ('LogMixin', 'LOG_LEVELS', 'get_loglevel', 'setup_logging') LOG_LEVELS = dict(logging._nameToLevel) @@ -21,7 +25,7 @@ DISABLE_TRACEBACKS = os.environ.get('DISABLE_TRACEBACKS') -def get_logger(logger): +def get_logger(logger: str | Logger): """Get logger by name.""" if isinstance(logger, str): logger = logging.getLogger(logger) diff --git a/kombu/mixins.py b/kombu/mixins.py index 14c1c1b9f..92acee980 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -195,10 +195,11 @@ def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs): try: conn.drain_events(timeout=safety_interval) except socket.timeout: - conn.heartbeat_check() - elapsed += safety_interval - if timeout and elapsed >= timeout: - raise + if timeout: + conn.heartbeat_check() + elapsed += safety_interval + if elapsed >= timeout: + raise except OSError: if not self.should_stop: raise diff --git a/kombu/resource.py b/kombu/resource.py index 9f75eb068..d911587ac 100644 --- a/kombu/resource.py +++ b/kombu/resource.py @@ -144,15 +144,20 @@ def release(self, resource): def collect_resource(self, resource): pass - def force_close_all(self): + def force_close_all(self, close_pool=True): """Close and remove all resources in the pool (also those in use). Used to close resources from parent processes after fork (e.g. sockets/connections). + + Arguments: + --------- + close_pool (bool): If True (default) then the pool is marked + as closed. In case of False the pool can be reused. """ if self._closed: return - self._closed = True + self._closed = close_pool dirty = self._dirty resource = self._resource while 1: # - acquired @@ -188,7 +193,7 @@ def resize(self, limit, force=False, ignore_errors=False, reset=False): self._limit = limit if reset: try: - self.force_close_all() + self.force_close_all(close_pool=False) except Exception: pass self.setup() @@ -211,7 +216,9 @@ def __exit__( resource = self._resource # Items to the left are last recently used, so we remove those first. with getattr(resource, 'mutex', Noop()): - while len(resource.queue) > self.limit: + # keep in mind the dirty resources are not shrinking + while len(resource.queue) and \ + (len(resource.queue) + len(self._dirty)) > self.limit: R = resource.queue.popleft() if collect: self.collect_resource(R) diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index 2ee976afc..ae8da59df 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -284,6 +284,10 @@ def _parse_uri(self, scheme='mongodb://'): client = self.connection.client hostname = client.hostname + if hostname.startswith('srv://'): + scheme = 'mongodb+srv://' + hostname = 'mongodb+' + hostname + if not hostname.startswith(scheme): hostname = scheme + hostname @@ -317,6 +321,9 @@ def _parse_uri(self, scheme='mongodb://'): options.update(parsed['options']) options = self._prepare_client_options(options) + if 'tls' in options: + options.pop('ssl') + return hostname, dbname, options def _prepare_client_options(self, options): diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 912b0fe0e..9311ecf5c 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -722,7 +722,7 @@ def __init__(self, *args, **kwargs): if not self.ack_emulation: # disable visibility timeout self.QoS = virtual.QoS - + self._registered = False self._queue_cycle = cycle_by_name(self.queue_order_strategy)() self.Client = self._get_client() self.ResponseError = self._get_response_error() @@ -747,6 +747,9 @@ def __init__(self, *args, **kwargs): raise self.connection.cycle.add(self) # add to channel poller. + # and set to true after sucessfuly added channel to the poll. + self._registered = True + # copy errors, in case channel closed but threads still # are still waiting for data. self.connection_errors = self.connection.connection_errors @@ -1201,7 +1204,10 @@ def _connparams(self, asynchronous=False): class Connection(connection_cls): def disconnect(self, *args): super().disconnect(*args) - channel._on_connection_disconnect(self) + # We remove the connection from the poller + # only if it has been added properly. + if channel._registered: + channel._on_connection_disconnect(self) connection_cls = Connection connparams['connection_class'] = connection_cls @@ -1433,6 +1439,8 @@ def _sentinel_managed_pool(self, asynchronous=False): ).connection_pool def _get_pool(self, asynchronous=False): + params = self._connparams(asynchronous=asynchronous) + self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db']) return self._sentinel_managed_pool(asynchronous) diff --git a/kombu/utils/debug.py b/kombu/utils/debug.py index bd20948fc..7889ed476 100644 --- a/kombu/utils/debug.py +++ b/kombu/utils/debug.py @@ -3,15 +3,25 @@ from __future__ import annotations import logging +from typing import TYPE_CHECKING from vine.utils import wraps from kombu.log import get_logger +if TYPE_CHECKING: + from logging import Logger + from typing import Any, Callable, Dict, List, Optional + + from kombu.transport.base import Transport + __all__ = ('setup_logging', 'Logwrapped') -def setup_logging(loglevel=logging.DEBUG, loggers=None): +def setup_logging( + loglevel: Optional[int] = logging.DEBUG, + loggers: Optional[List[str]] = None +) -> None: """Setup logging to stdout.""" loggers = ['kombu.connection', 'kombu.channel'] if not loggers else loggers for logger_name in loggers: @@ -25,19 +35,24 @@ class Logwrapped: __ignore = ('__enter__', '__exit__') - def __init__(self, instance, logger=None, ident=None): + def __init__( + self, + instance: Transport, + logger: Optional[Logger] = None, + ident: Optional[str] = None + ): self.instance = instance self.logger = get_logger(logger) self.ident = ident - def __getattr__(self, key): + def __getattr__(self, key: str) -> Callable: meth = getattr(self.instance, key) if not callable(meth) or key in self.__ignore: return meth @wraps(meth) - def __wrapped(*args, **kwargs): + def __wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Callable: info = '' if self.ident: info += self.ident.format(self.instance) @@ -55,8 +70,8 @@ def __wrapped(*args, **kwargs): return __wrapped - def __repr__(self): + def __repr__(self) -> str: return repr(self.instance) - def __dir__(self): + def __dir__(self) -> List[str]: return dir(self.instance) diff --git a/requirements/default.txt b/requirements/default.txt index 8ebd5c9ce..dccb60083 100644 --- a/requirements/default.txt +++ b/requirements/default.txt @@ -1,4 +1,7 @@ -typing_extensions; python_version<"3.10" +typing_extensions==4.12.1; python_version<"3.10" amqp>=5.1.1,<6.0.0 vine backports.zoneinfo[tzdata]>=0.2.1; python_version < '3.9' +# due to this bug https://github.com/docker/docker-py/issues/3256 +# we need to hard-code the version of requests +requests<2.32.0 diff --git a/requirements/funtest.txt b/requirements/funtest.txt index 2976000e5..b5f346c30 100644 --- a/requirements/funtest.txt +++ b/requirements/funtest.txt @@ -2,7 +2,7 @@ redis>=4.5.2,!=5.0.2,!=4.5.5 # MongoDB transport -pymongo +pymongo==4.7.3 # Zookeeper transport kazoo diff --git a/requirements/pkgutils.txt b/requirements/pkgutils.txt index 607d30bde..b89819b4f 100644 --- a/requirements/pkgutils.txt +++ b/requirements/pkgutils.txt @@ -1,8 +1,8 @@ setuptools>=47.0.0 wheel>=0.29.0 -flake8==7.0.0 +flake8==7.1.0 tox>=4.4.8 sphinx2rst>=1.0 bumpversion pydocstyle==6.3.0 -mypy==1.9.0 +mypy==1.10.0 diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt index e3dad2c1e..e9b7227f0 100644 --- a/requirements/test-ci.txt +++ b/requirements/test-ci.txt @@ -15,3 +15,4 @@ urllib3>=1.26.16; sys_platform != 'win32' -r extras/brotli.txt -r extras/zstd.txt -r extras/sqlalchemy.txt +-r extras/etcd.txt diff --git a/requirements/test.txt b/requirements/test.txt index 0eeecf4a8..25883e258 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -2,4 +2,4 @@ hypothesis<7 Pyro4 pytest-freezer pytest-sugar==1.0.0 -pytest==8.1.1 +pytest==8.2.2 diff --git a/setup.cfg b/setup.cfg index c087af811..ced537586 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,6 +25,7 @@ disallow_untyped_defs = True ignore_missing_imports = True files = kombu/abstract.py, + kombu/utils/debug.py, kombu/utils/time.py, kombu/utils/uuid.py, t/unit/utils/test_uuid.py, diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py index e57f4db06..4f1b71d83 100644 --- a/t/unit/test_connection.py +++ b/t/unit/test_connection.py @@ -811,6 +811,93 @@ def test_acquire_no_limit(self): P = self.create_resource(None) P.acquire().release() + def test_acquire_resize_in_use(self): + P = self.create_resource(5) + self.assert_state(P, 5, 0) + chans = [P.acquire() for _ in range(5)] + self.assert_state(P, 0, 5) + with pytest.raises(RuntimeError): + P.resize(4) + [chan.release() for chan in chans] + self.assert_state(P, 5, 0) + + def test_acquire_resize_ignore_err_no_shrink(self): + P = self.create_resource(5) + self.assert_state(P, 5, 0) + chans = [P.acquire() for _ in range(5)] + self.assert_state(P, 0, 5) + P.resize(4, ignore_errors=True) + self.assert_state(P, 0, 5) + [chan.release() for chan in chans] + self.assert_state(P, 5, 0) + + def test_acquire_resize_ignore_err_shrink(self): + P = self.create_resource(5) + self.assert_state(P, 5, 0) + chans = [P.acquire() for _ in range(4)] + self.assert_state(P, 1, 4) + P.resize(4, ignore_errors=True) + self.assert_state(P, 0, 4) + [chan.release() for chan in chans] + self.assert_state(P, 4, 0) + + def test_acquire_resize_larger(self): + P = self.create_resource(1) + self.assert_state(P, 1, 0) + c1 = P.acquire() + self.assert_state(P, 0, 1) + with pytest.raises(P.LimitExceeded): + P.acquire() + P.resize(2) + self.assert_state(P, 1, 1) + c2 = P.acquire() + self.assert_state(P, 0, 2) + c1.release() + c2.release() + self.assert_state(P, 2, 0) + + def test_acquire_resize_force_smaller(self): + P = self.create_resource(2) + self.assert_state(P, 2, 0) + c1 = P.acquire() + c2 = P.acquire() + self.assert_state(P, 0, 2) + with pytest.raises(P.LimitExceeded): + P.acquire() + P.resize(1, force=True) # acts like reset + del c1 + del c2 + self.assert_state(P, 1, 0) + c1 = P.acquire() + self.assert_state(P, 0, 1) + with pytest.raises(P.LimitExceeded): + P.acquire() + c1.release() + self.assert_state(P, 1, 0) + + def test_acquire_resize_reset(self): + P = self.create_resource(2) + self.assert_state(P, 2, 0) + c1 = P.acquire() + c2 = P.acquire() + self.assert_state(P, 0, 2) + with pytest.raises(P.LimitExceeded): + P.acquire() + P.resize(3, reset=True) + del c1 + del c2 + self.assert_state(P, 3, 0) + c1 = P.acquire() + c2 = P.acquire() + c3 = P.acquire() + self.assert_state(P, 0, 3) + with pytest.raises(P.LimitExceeded): + P.acquire() + c1.release() + c2.release() + c3.release() + self.assert_state(P, 3, 0) + def test_replace_when_limit(self): P = self.create_resource(10) r = P.acquire() diff --git a/t/unit/test_mixins.py b/t/unit/test_mixins.py index 7a96091ed..a33412eb9 100644 --- a/t/unit/test_mixins.py +++ b/t/unit/test_mixins.py @@ -89,6 +89,26 @@ def se(*args, **kwargs): c.connection.drain_events.side_effect = se with pytest.raises(socket.error): next(it) + c.connection.heartbeat_check.assert_called() + + def test_consume_drain_no_heartbeat_check(self): + c, Acons, Bcons = self._context() + c.should_stop = False + it = c.consume(no_ack=True, timeout=None) + + def se(*args, **kwargs): + c.should_stop = True + raise socket.timeout() + c.connection.drain_events.side_effect = se + with pytest.raises(StopIteration): + next(it) + c.connection.heartbeat_check.assert_not_called() + + it = c.consume(no_ack=True, timeout=0) + c.connection.drain_events.side_effect = se + with pytest.raises(StopIteration): + next(it) + c.connection.heartbeat_check.assert_not_called() def test_Consumer_context(self): c, Acons, Bcons = self._context() diff --git a/t/unit/transport/test_etcd.py b/t/unit/transport/test_etcd.py index 1eadda519..11039e405 100644 --- a/t/unit/transport/test_etcd.py +++ b/t/unit/transport/test_etcd.py @@ -1,5 +1,6 @@ from __future__ import annotations +from array import array from queue import Empty from unittest.mock import Mock, patch @@ -14,9 +15,11 @@ class test_Etcd: def setup_method(self): self.connection = Mock() + self.connection._used_channel_ids = array('H') + self.connection.channel_max = 65535 self.connection.client.transport_options = {} self.connection.client.port = 2739 - self.client = self.patch('etcd.Client').return_value + self.client = self.patching('etcd.Client').return_value self.channel = Channel(connection=self.connection) def test_driver_version(self): @@ -42,26 +45,26 @@ def test_create_delete_queue(self): queue = 'mynewqueue' with patch('etcd.Lock'): - self.client.write.return_value = self.patch('etcd.EtcdResult') + self.client.write.return_value = self.patching('etcd.EtcdResult') assert self.channel._new_queue(queue) - self.client.delete.return_value = self.patch('etcd.EtcdResult') + self.client.delete.return_value = self.patching('etcd.EtcdResult') self.channel._delete(queue) def test_size(self): with patch('etcd.Lock'): - self.client.read.return_value = self.patch( + self.client.read.return_value = self.patching( 'etcd.EtcdResult', _children=[{}, {}]) assert self.channel._size('q') == 2 def test_get(self): with patch('etcd.Lock'): - self.client.read.return_value = self.patch( + self.client.read.return_value = self.patching( 'etcd.EtcdResult', _children=[{'key': 'myqueue', 'modifyIndex': 1, 'value': '1'}]) assert self.channel._get('myqueue') is not None def test_put(self): with patch('etcd.Lock'): - self.client.write.return_value = self.patch('etcd.EtcdResult') + self.client.write.return_value = self.patching('etcd.EtcdResult') assert self.channel._put('myqueue', 'mydata') is None diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index b471546b6..a2c015ec2 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -346,17 +346,173 @@ class XTransport(Transport): Channel = XChannel conn = Connection(transport=XTransport) + conn.transport.cycle = Mock(name='cycle') client.ping.side_effect = RuntimeError() with pytest.raises(RuntimeError): conn.channel() pool.disconnect.assert_called_with() pool.disconnect.reset_mock() + # Ensure that the channel without ensured connection to Redis + # won't be added to the cycle. + conn.transport.cycle.add.assert_not_called() + assert len(conn.transport.channels) == 0 pool_at_init = [None] with pytest.raises(RuntimeError): conn.channel() pool.disconnect.assert_not_called() + def test_redis_connection_added_to_cycle_if_ping_succeeds(self): + """Test should check the connection is added to the cycle only + if the ping to Redis was finished successfully.""" + # given: mock pool and client + pool = Mock(name='pool') + client = Mock(name='client') + + # override channel class with given mocks + class XChannel(Channel): + def __init__(self, *args, **kwargs): + self._pool = pool + super().__init__(*args, **kwargs) + + def _get_client(self): + return lambda *_, **__: client + + # override Channel in Transport with given channel + class XTransport(Transport): + Channel = XChannel + + # when: create connection with overridden transport + conn = Connection(transport=XTransport) + conn.transport.cycle = Mock(name='cycle') + # create the channel + chan = conn.channel() + # then: check if ping was called + client.ping.assert_called_once() + # the connection was added to the cycle + conn.transport.cycle.add.assert_called_once() + assert len(conn.transport.channels) == 1 + # the channel was flaged as registered into poller + assert chan._registered + + def test_redis_on_disconnect_channel_only_if_was_registered(self): + """Test shoud check if the _on_disconnect method is called only + if the channel was registered into the poller.""" + # given: mock pool and client + pool = Mock(name='pool') + client = Mock( + name='client', + ping=Mock(return_value=True) + ) + + # create RedisConnectionMock class + # for the possibility to run disconnect method + class RedisConnectionMock: + def disconnect(self, *args): + pass + + # override Channel method with given mocks + class XChannel(Channel): + connection_class = RedisConnectionMock + + def __init__(self, *args, **kwargs): + self._pool = pool + # counter to check if the method was called + self.on_disconect_count = 0 + super().__init__(*args, **kwargs) + + def _get_client(self): + return lambda *_, **__: client + + def _on_connection_disconnect(self, connection): + # increment the counter when the method is called + self.on_disconect_count += 1 + + # create the channel + chan = XChannel(Mock( + _used_channel_ids=[], + channel_max=1, + channels=[], + client=Mock( + transport_options={}, + hostname="127.0.0.1", + virtual_host=None))) + # create the _connparams with overriden connection_class + connparams = chan._connparams(asynchronous=True) + # create redis.Connection + redis_connection = connparams['connection_class']() + # the connection was added to the cycle + chan.connection.cycle.add.assert_called_once() + # and the ping was called + client.ping.assert_called_once() + # the channel was registered + assert chan._registered + # than disconnect the Redis connection + redis_connection.disconnect() + # the on_disconnect counter should be incremented + assert chan.on_disconect_count == 1 + + def test_redis__on_disconnect_should_not_be_called_if_not_registered(self): + """Test should check if the _on_disconnect method is not called because + the connection to Redis isn't established properly.""" + # given: mock pool + pool = Mock(name='pool') + # client mock with ping method which return ConnectionError + from redis.exceptions import ConnectionError + client = Mock( + name='client', + ping=Mock(side_effect=ConnectionError()) + ) + + # create RedisConnectionMock + # for the possibility to run disconnect method + class RedisConnectionMock: + def disconnect(self, *args): + pass + + # override Channel method with given mocks + class XChannel(Channel): + connection_class = RedisConnectionMock + + def __init__(self, *args, **kwargs): + self._pool = pool + # counter to check if the method was called + self.on_disconect_count = 0 + super().__init__(*args, **kwargs) + + def _get_client(self): + return lambda *_, **__: client + + def _on_connection_disconnect(self, connection): + # increment the counter when the method is called + self.on_disconect_count += 1 + + # then: exception was risen + with pytest.raises(ConnectionError): + # when: create the channel + chan = XChannel(Mock( + _used_channel_ids=[], + channel_max=1, + channels=[], + client=Mock( + transport_options={}, + hostname="127.0.0.1", + virtual_host=None))) + # create the _connparams with overriden connection_class + connparams = chan._connparams(asynchronous=True) + # create redis.Connection + redis_connection = connparams['connection_class']() + # the connection wasn't added to the cycle + chan.connection.cycle.add.assert_not_called() + # the ping was called once with the exception + client.ping.assert_called_once() + # the channel was not registered + assert not chan._registered + # then: disconnect the Redis connection + redis_connection.disconnect() + # the on_disconnect counter shouldn't be incremented + assert chan.on_disconect_count == 0 + def test_get_redis_ConnectionError(self): from redis.exceptions import ConnectionError @@ -1634,6 +1790,18 @@ def test_method_called(self): connection.channel() p.assert_called() + def test_keyprefix_fanout(self): + from kombu.transport.redis import SentinelChannel + with patch.object(SentinelChannel, '_sentinel_managed_pool'): + connection = Connection( + 'sentinel://localhost:65532/1', + transport_options={ + 'master_name': 'not_important', + }, + ) + channel = connection.channel() + assert channel.keyprefix_fanout == '/1.' + def test_getting_master_from_sentinel(self): with patch('redis.sentinel.Sentinel') as patched: connection = Connection( diff --git a/tox.ini b/tox.ini index 243c85d9e..3f48d5b6a 100644 --- a/tox.ini +++ b/tox.ini @@ -9,7 +9,9 @@ envlist = apicheck pydocstyle -requires = tox-docker>=3.0 +requires = + tox-docker<=4.1 + requests<2.32.0 [gh-actions] python =