[Python-modules-commits] [kombu] 01/05: Import kombu_3.0.32.orig.tar.gz
Michael Fladischer
fladi at moszumanska.debian.org
Sat Jan 2 16:25:45 UTC 2016
This is an automated email from the git hooks/post-receive script.
fladi pushed a commit to branch master
in repository kombu.
commit 9c5b4b686c6d376e02ccf6f1c5229251b673abd3
Author: Michael Fladischer <FladischerMichael at fladi.at>
Date: Sat Jan 2 16:56:52 2016 +0100
Import kombu_3.0.32.orig.tar.gz
---
Changelog | 24 +++++++++++-
PKG-INFO | 4 +-
README.rst | 2 +-
docs/changelog.rst | 24 +++++++++++-
docs/introduction.rst | 2 +-
kombu.egg-info/PKG-INFO | 4 +-
kombu/__init__.py | 2 +-
kombu/async/hub.py | 5 ---
kombu/tests/transport/test_redis.py | 21 +++++------
kombu/transport/redis.py | 75 +++++++++++++++++++++++--------------
10 files changed, 110 insertions(+), 53 deletions(-)
diff --git a/Changelog b/Changelog
index 36006c1..a5ef237 100644
--- a/Changelog
+++ b/Changelog
@@ -4,11 +4,33 @@
Change history
================
+.. _version-3.0.32:
+
+3.0.32
+======
+:release-date: 2015-12-16 02:29 P.M PST
+:release-by: Ask Solem
+
+- Redis: Fixed bug introduced in 3.0.31 where the redis transport always
+ connects to localhost, regardless of host setting.
+
+.. _version-3.0.31:
+
+3.0.31
+======
+:release-date: 2015-12-16 12:00 P.M PST
+:release-by: Ask Solem
+
+- Redis: Fixed bug introduced in 3.0.30 where socket was prematurely
+ disconnected.
+
+- Hub: Removed debug logging message: "Deregistered fd..." (Issue #549).
+
.. _version-3.0.30:
3.0.30
======
-:release-date: 2015-12-07 12:28 A.M PDT
+:release-date: 2015-12-07 12:28 A.M PST
:release-by: Ask Solem
- Fixes compatiblity with uuid in Python 2.7.11 and 3.5.1.
diff --git a/PKG-INFO b/PKG-INFO
index 0e012ea..5d7ec1f 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: kombu
-Version: 3.0.30
+Version: 3.0.32
Summary: Messaging library for Python
Home-page: http://kombu.readthedocs.org
Author: Ask Solem
@@ -12,7 +12,7 @@ Description: .. _kombu-index:
kombu - Messaging library for Python
========================================
- :Version: 3.0.30
+ :Version: 3.0.32
`Kombu` is a messaging library for Python.
diff --git a/README.rst b/README.rst
index 3371801..642dddd 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@
kombu - Messaging library for Python
========================================
-:Version: 3.0.30
+:Version: 3.0.32
`Kombu` is a messaging library for Python.
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 36006c1..a5ef237 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -4,11 +4,33 @@
Change history
================
+.. _version-3.0.32:
+
+3.0.32
+======
+:release-date: 2015-12-16 02:29 P.M PST
+:release-by: Ask Solem
+
+- Redis: Fixed bug introduced in 3.0.31 where the redis transport always
+ connects to localhost, regardless of host setting.
+
+.. _version-3.0.31:
+
+3.0.31
+======
+:release-date: 2015-12-16 12:00 P.M PST
+:release-by: Ask Solem
+
+- Redis: Fixed bug introduced in 3.0.30 where socket was prematurely
+ disconnected.
+
+- Hub: Removed debug logging message: "Deregistered fd..." (Issue #549).
+
.. _version-3.0.30:
3.0.30
======
-:release-date: 2015-12-07 12:28 A.M PDT
+:release-date: 2015-12-07 12:28 A.M PST
:release-by: Ask Solem
- Fixes compatiblity with uuid in Python 2.7.11 and 3.5.1.
diff --git a/docs/introduction.rst b/docs/introduction.rst
index 3371801..642dddd 100644
--- a/docs/introduction.rst
+++ b/docs/introduction.rst
@@ -4,7 +4,7 @@
kombu - Messaging library for Python
========================================
-:Version: 3.0.30
+:Version: 3.0.32
`Kombu` is a messaging library for Python.
diff --git a/kombu.egg-info/PKG-INFO b/kombu.egg-info/PKG-INFO
index 0e012ea..5d7ec1f 100644
--- a/kombu.egg-info/PKG-INFO
+++ b/kombu.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: kombu
-Version: 3.0.30
+Version: 3.0.32
Summary: Messaging library for Python
Home-page: http://kombu.readthedocs.org
Author: Ask Solem
@@ -12,7 +12,7 @@ Description: .. _kombu-index:
kombu - Messaging library for Python
========================================
- :Version: 3.0.30
+ :Version: 3.0.32
`Kombu` is a messaging library for Python.
diff --git a/kombu/__init__.py b/kombu/__init__.py
index 3e54569..7105102 100644
--- a/kombu/__init__.py
+++ b/kombu/__init__.py
@@ -11,7 +11,7 @@ version_info_t = namedtuple(
'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'),
)
-VERSION = version_info_t(3, 0, 30, '', '')
+VERSION = version_info_t(3, 0, 32, '', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = 'ask at celeryproject.org'
diff --git a/kombu/async/hub.py b/kombu/async/hub.py
index 551f351..66067bf 100644
--- a/kombu/async/hub.py
+++ b/kombu/async/hub.py
@@ -34,10 +34,6 @@ W_UNKNOWN_EVENT = """\
Received unknown event %r for fd %r, please contact support!\
"""
-W_EVENT_UNREGISTERED = """\
-Deregistered fd %r from event loop without registered callbacks.\
-"""
-
class Stop(BaseException):
"""Stops the event loop."""
@@ -324,7 +320,6 @@ class Hub(object):
pass
if cb is None:
- logger.info(W_EVENT_UNREGISTERED, fd)
self.remove(fd)
continue
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index ae79d52..46087a4 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -202,10 +202,10 @@ class Pipeline(object):
class Channel(redis.Channel):
- def _get_client(self):
+ def _get_async_client(self):
return Client
- def _get_pool(self):
+ def _get_pool(self, async=False):
return Mock()
def _get_response_error(self):
@@ -280,7 +280,7 @@ class test_Channel(Case):
self._pool = pool_at_init[0]
super(XChannel, self).__init__(*args, **kwargs)
- def _get_client(self):
+ def _get_async_client(self):
return lambda *_, **__: client
class XTransport(Transport):
@@ -302,9 +302,9 @@ class test_Channel(Case):
self.channel._pool = None
self.channel._after_fork()
- self.channel._pool = Mock(name='pool')
+ pool = self.channel._pool = Mock(name='pool')
self.channel._after_fork()
- self.channel._pool.disconnect.assert_called_with()
+ pool.disconnect.assert_called_with()
def test_next_delivery_tag(self):
self.assertNotEqual(
@@ -662,16 +662,16 @@ class test_Channel(Case):
self.channel._rotate_cycle('elaine')
@skip_if_not_module('redis')
- def test_get_client(self):
+ def test_get_async_client(self):
import redis as R
- KombuRedis = redis.Channel._get_client(self.channel)
+ KombuRedis = redis.Channel._get_async_client(self.channel)
self.assertTrue(KombuRedis)
Rv = getattr(R, 'VERSION', None)
try:
R.VERSION = (2, 4, 0)
with self.assertRaises(VersionMismatch):
- redis.Channel._get_client(self.channel)
+ redis.Channel._get_async_client(self.channel)
finally:
if Rv is not None:
R.VERSION = Rv
@@ -693,11 +693,10 @@ class test_Channel(Case):
self.channel._in_poll = True
self.channel._pool = Mock()
cc = self.channel._create_client = Mock()
- client = cc.return_value = Mock()
+ cc.return_value = Mock()
with self.channel.conn_or_acquire():
pass
- self.channel.pool.release.assert_called_with(client.connection)
cc.assert_called_with()
def test_register_with_event_loop(self):
@@ -908,7 +907,7 @@ class test_Redis(Case):
channel._get('does-not-exist')
channel.close()
- def test_get_client(self):
+ def test_get_async_client(self):
myredis, exceptions = _redis_modules()
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 35e1ca9..5005854 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -407,7 +407,10 @@ class Channel(virtual.Channel):
#: and binding keys (like a topic exchange but using PUB/SUB).
#: This will be enabled by default in a future version.
fanout_patterns = False
+
+ _async_pool = None
_pool = None
+ _disconnecting_pools = False
from_transport_options = (
virtual.Channel.from_transport_options +
@@ -433,7 +436,8 @@ class Channel(virtual.Channel):
self.QoS = virtual.QoS
self._queue_cycle = []
- self.Client = self._get_client()
+ self.AsyncClient = self._get_async_client()
+ self.Client = redis.Redis
self.ResponseError = self._get_response_error()
self.active_fanout_queues = set()
self.auto_delete_queues = set()
@@ -452,8 +456,7 @@ class Channel(virtual.Channel):
try:
self.client.info()
except Exception:
- if self._pool:
- self._pool.disconnect()
+ self._disconnect_pools()
raise
self.connection.cycle.add(self) # add to channel poller.
@@ -464,14 +467,26 @@ class Channel(virtual.Channel):
register_after_fork(self, self._after_fork)
def _after_fork(self):
- if self._pool is not None:
- self._pool.disconnect()
+ self._disconnect_pools()
+
+ def _disconnect_pools(self):
+ if not self._disconnecting_pools:
+ self._disconnecting_pools = True
+ try:
+ if self._async_pool is not None:
+ self._async_pool.disconnect()
+ if self._pool is not None:
+ self._pool.disconnect()
+ self._async_pool = self._pool = None
+ finally:
+ self._disconnecting_pools = False
def _on_connection_disconnect(self, connection):
self._in_poll = False
self._in_listen = False
if self.connection and self.connection.cycle:
self.connection.cycle._on_connection_disconnect(connection)
+ self._disconnect_pools()
if not self._closing:
raise get_redis_ConnectionError()
@@ -749,8 +764,7 @@ class Channel(virtual.Channel):
def close(self):
self._closing = True
- if self._pool:
- self._pool.disconnect()
+ self._disconnect_pools()
if not self.closed:
# remove from channel poller.
self.connection.cycle.discard(self)
@@ -787,7 +801,7 @@ class Channel(virtual.Channel):
))
return vhost
- def _connparams(self):
+ def _connparams(self, async=False):
conninfo = self.connection.client
connparams = {'host': conninfo.hostname or '127.0.0.1',
'port': conninfo.port or DEFAULT_PORT,
@@ -814,38 +828,41 @@ class Channel(virtual.Channel):
redis.Connection
)
- class Connection(connection_cls):
- def disconnect(self):
- channel._on_connection_disconnect(self)
- super(Connection, self).disconnect()
- connparams['connection_class'] = Connection
+ if async:
+ class Connection(connection_cls):
+ def disconnect(self):
+ super(Connection, self).disconnect()
+ channel._on_connection_disconnect(self)
+ connparams['connection_class'] = Connection
return connparams
- def _create_client(self):
+ def _create_client(self, async=False):
+ if async:
+ return self.AsyncClient(connection_pool=self.async_pool)
return self.Client(connection_pool=self.pool)
- def _get_pool(self):
- params = self._connparams()
+ def _get_pool(self, async=False):
+ params = self._connparams(async=async)
self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db'])
return redis.ConnectionPool(**params)
- def _get_client(self):
+ def _get_async_client(self):
if redis.VERSION < (2, 4, 4):
raise VersionMismatch(
'Redis transport requires redis-py versions 2.4.4 or later. '
'You have {0.__version__}'.format(redis))
- # KombuRedis maintains a connection attribute on it's instance and
+ # AsyncRedis maintains a connection attribute on it's instance and
# uses that when executing commands
# This was added after redis-py was changed.
- class KombuRedis(redis.Redis): # pragma: no cover
+ class AsyncRedis(redis.Redis): # pragma: no cover
def __init__(self, *args, **kwargs):
- super(KombuRedis, self).__init__(*args, **kwargs)
+ super(AsyncRedis, self).__init__(*args, **kwargs)
self.connection = self.connection_pool.get_connection('_')
- return KombuRedis
+ return AsyncRedis
@contextmanager
def conn_or_acquire(self, client=None):
@@ -853,11 +870,7 @@ class Channel(virtual.Channel):
yield client
else:
if self._in_poll:
- client = self._create_client()
- try:
- yield client
- finally:
- self.pool.release(client.connection)
+ yield self._create_client()
else:
yield self.client
@@ -867,15 +880,21 @@ class Channel(virtual.Channel):
self._pool = self._get_pool()
return self._pool
+ @property
+ def async_pool(self):
+ if self._async_pool is None:
+ self._async_pool = self._get_pool(async=True)
+ return self._async_pool
+
@cached_property
def client(self):
"""Client used to publish messages, BRPOP etc."""
- return self._create_client()
+ return self._create_client(async=True)
@cached_property
def subclient(self):
"""Pub/Sub connection used to consume fanout queues."""
- client = self._create_client()
+ client = self._create_client(async=True)
pubsub = client.pubsub()
pool = pubsub.connection_pool
pubsub.connection = pool.get_connection('pubsub', pubsub.shard_hint)
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/kombu.git
More information about the Python-modules-commits
mailing list