[Python-modules-commits] [kombu] 01/05: Import kombu_3.0.32.orig.tar.gz

Michael Fladischer fladi at moszumanska.debian.org
Sat Jan 2 16:25:45 UTC 2016


This is an automated email from the git hooks/post-receive script.

fladi pushed a commit to branch master
in repository kombu.

commit 9c5b4b686c6d376e02ccf6f1c5229251b673abd3
Author: Michael Fladischer <FladischerMichael at fladi.at>
Date:   Sat Jan 2 16:56:52 2016 +0100

    Import kombu_3.0.32.orig.tar.gz
---
 Changelog                           | 24 +++++++++++-
 PKG-INFO                            |  4 +-
 README.rst                          |  2 +-
 docs/changelog.rst                  | 24 +++++++++++-
 docs/introduction.rst               |  2 +-
 kombu.egg-info/PKG-INFO             |  4 +-
 kombu/__init__.py                   |  2 +-
 kombu/async/hub.py                  |  5 ---
 kombu/tests/transport/test_redis.py | 21 +++++------
 kombu/transport/redis.py            | 75 +++++++++++++++++++++++--------------
 10 files changed, 110 insertions(+), 53 deletions(-)

diff --git a/Changelog b/Changelog
index 36006c1..a5ef237 100644
--- a/Changelog
+++ b/Changelog
@@ -4,11 +4,33 @@
  Change history
 ================
 
+.. _version-3.0.32:
+
+3.0.32
+======
+:release-date: 2015-12-16 02:29 P.M PST
+:release-by: Ask Solem
+
+- Redis: Fixed bug introduced in 3.0.31 where the redis transport always
+  connects to localhost, regardless of host setting.
+
+.. _version-3.0.31:
+
+3.0.31
+======
+:release-date: 2015-12-16 12:00 P.M PST
+:release-by: Ask Solem
+
+- Redis: Fixed bug introduced in 3.0.30 where socket was prematurely
+  disconnected.
+
+- Hub: Removed debug logging message: "Deregistered fd..." (Issue #549).
+
 .. _version-3.0.30:
 
 3.0.30
 ======
-:release-date: 2015-12-07 12:28 A.M PDT
+:release-date: 2015-12-07 12:28 A.M PST
 :release-by: Ask Solem
 
 - Fixes compatibility with uuid in Python 2.7.11 and 3.5.1.
diff --git a/PKG-INFO b/PKG-INFO
index 0e012ea..5d7ec1f 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: kombu
-Version: 3.0.30
+Version: 3.0.32
 Summary: Messaging library for Python
 Home-page: http://kombu.readthedocs.org
 Author: Ask Solem
@@ -12,7 +12,7 @@ Description: .. _kombu-index:
          kombu - Messaging library for Python
         ========================================
         
-        :Version: 3.0.30
+        :Version: 3.0.32
         
         `Kombu` is a messaging library for Python.
         
diff --git a/README.rst b/README.rst
index 3371801..642dddd 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@
  kombu - Messaging library for Python
 ========================================
 
-:Version: 3.0.30
+:Version: 3.0.32
 
 `Kombu` is a messaging library for Python.
 
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 36006c1..a5ef237 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -4,11 +4,33 @@
  Change history
 ================
 
+.. _version-3.0.32:
+
+3.0.32
+======
+:release-date: 2015-12-16 02:29 P.M PST
+:release-by: Ask Solem
+
+- Redis: Fixed bug introduced in 3.0.31 where the redis transport always
+  connects to localhost, regardless of host setting.
+
+.. _version-3.0.31:
+
+3.0.31
+======
+:release-date: 2015-12-16 12:00 P.M PST
+:release-by: Ask Solem
+
+- Redis: Fixed bug introduced in 3.0.30 where socket was prematurely
+  disconnected.
+
+- Hub: Removed debug logging message: "Deregistered fd..." (Issue #549).
+
 .. _version-3.0.30:
 
 3.0.30
 ======
-:release-date: 2015-12-07 12:28 A.M PDT
+:release-date: 2015-12-07 12:28 A.M PST
 :release-by: Ask Solem
 
 - Fixes compatibility with uuid in Python 2.7.11 and 3.5.1.
diff --git a/docs/introduction.rst b/docs/introduction.rst
index 3371801..642dddd 100644
--- a/docs/introduction.rst
+++ b/docs/introduction.rst
@@ -4,7 +4,7 @@
  kombu - Messaging library for Python
 ========================================
 
-:Version: 3.0.30
+:Version: 3.0.32
 
 `Kombu` is a messaging library for Python.
 
diff --git a/kombu.egg-info/PKG-INFO b/kombu.egg-info/PKG-INFO
index 0e012ea..5d7ec1f 100644
--- a/kombu.egg-info/PKG-INFO
+++ b/kombu.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: kombu
-Version: 3.0.30
+Version: 3.0.32
 Summary: Messaging library for Python
 Home-page: http://kombu.readthedocs.org
 Author: Ask Solem
@@ -12,7 +12,7 @@ Description: .. _kombu-index:
          kombu - Messaging library for Python
         ========================================
         
-        :Version: 3.0.30
+        :Version: 3.0.32
         
         `Kombu` is a messaging library for Python.
         
diff --git a/kombu/__init__.py b/kombu/__init__.py
index 3e54569..7105102 100644
--- a/kombu/__init__.py
+++ b/kombu/__init__.py
@@ -11,7 +11,7 @@ version_info_t = namedtuple(
     'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'),
 )
 
-VERSION = version_info_t(3, 0, 30, '', '')
+VERSION = version_info_t(3, 0, 32, '', '')
 __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
 __author__ = 'Ask Solem'
 __contact__ = 'ask at celeryproject.org'
diff --git a/kombu/async/hub.py b/kombu/async/hub.py
index 551f351..66067bf 100644
--- a/kombu/async/hub.py
+++ b/kombu/async/hub.py
@@ -34,10 +34,6 @@ W_UNKNOWN_EVENT = """\
 Received unknown event %r for fd %r, please contact support!\
 """
 
-W_EVENT_UNREGISTERED = """\
-Deregistered fd %r from event loop without registered callbacks.\
-"""
-
 
 class Stop(BaseException):
     """Stops the event loop."""
@@ -324,7 +320,6 @@ class Hub(object):
                             pass
 
                     if cb is None:
-                        logger.info(W_EVENT_UNREGISTERED, fd)
                         self.remove(fd)
                         continue
 
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index ae79d52..46087a4 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -202,10 +202,10 @@ class Pipeline(object):
 
 class Channel(redis.Channel):
 
-    def _get_client(self):
+    def _get_async_client(self):
         return Client
 
-    def _get_pool(self):
+    def _get_pool(self, async=False):
         return Mock()
 
     def _get_response_error(self):
@@ -280,7 +280,7 @@ class test_Channel(Case):
                 self._pool = pool_at_init[0]
                 super(XChannel, self).__init__(*args, **kwargs)
 
-            def _get_client(self):
+            def _get_async_client(self):
                 return lambda *_, **__: client
 
         class XTransport(Transport):
@@ -302,9 +302,9 @@ class test_Channel(Case):
         self.channel._pool = None
         self.channel._after_fork()
 
-        self.channel._pool = Mock(name='pool')
+        pool = self.channel._pool = Mock(name='pool')
         self.channel._after_fork()
-        self.channel._pool.disconnect.assert_called_with()
+        pool.disconnect.assert_called_with()
 
     def test_next_delivery_tag(self):
         self.assertNotEqual(
@@ -662,16 +662,16 @@ class test_Channel(Case):
         self.channel._rotate_cycle('elaine')
 
     @skip_if_not_module('redis')
-    def test_get_client(self):
+    def test_get_async_client(self):
         import redis as R
-        KombuRedis = redis.Channel._get_client(self.channel)
+        KombuRedis = redis.Channel._get_async_client(self.channel)
         self.assertTrue(KombuRedis)
 
         Rv = getattr(R, 'VERSION', None)
         try:
             R.VERSION = (2, 4, 0)
             with self.assertRaises(VersionMismatch):
-                redis.Channel._get_client(self.channel)
+                redis.Channel._get_async_client(self.channel)
         finally:
             if Rv is not None:
                 R.VERSION = Rv
@@ -693,11 +693,10 @@ class test_Channel(Case):
         self.channel._in_poll = True
         self.channel._pool = Mock()
         cc = self.channel._create_client = Mock()
-        client = cc.return_value = Mock()
+        cc.return_value = Mock()
 
         with self.channel.conn_or_acquire():
             pass
-        self.channel.pool.release.assert_called_with(client.connection)
         cc.assert_called_with()
 
     def test_register_with_event_loop(self):
@@ -908,7 +907,7 @@ class test_Redis(Case):
             channel._get('does-not-exist')
         channel.close()
 
-    def test_get_client(self):
+    def test_get_async_client(self):
 
         myredis, exceptions = _redis_modules()
 
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 35e1ca9..5005854 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -407,7 +407,10 @@ class Channel(virtual.Channel):
     #: and binding keys (like a topic exchange but using PUB/SUB).
     #: This will be enabled by default in a future version.
     fanout_patterns = False
+
+    _async_pool = None
     _pool = None
+    _disconnecting_pools = False
 
     from_transport_options = (
         virtual.Channel.from_transport_options +
@@ -433,7 +436,8 @@ class Channel(virtual.Channel):
             self.QoS = virtual.QoS
 
         self._queue_cycle = []
-        self.Client = self._get_client()
+        self.AsyncClient = self._get_async_client()
+        self.Client = redis.Redis
         self.ResponseError = self._get_response_error()
         self.active_fanout_queues = set()
         self.auto_delete_queues = set()
@@ -452,8 +456,7 @@ class Channel(virtual.Channel):
         try:
             self.client.info()
         except Exception:
-            if self._pool:
-                self._pool.disconnect()
+            self._disconnect_pools()
             raise
 
         self.connection.cycle.add(self)  # add to channel poller.
@@ -464,14 +467,26 @@ class Channel(virtual.Channel):
         register_after_fork(self, self._after_fork)
 
     def _after_fork(self):
-        if self._pool is not None:
-            self._pool.disconnect()
+        self._disconnect_pools()
+
+    def _disconnect_pools(self):
+        if not self._disconnecting_pools:
+            self._disconnecting_pools = True
+            try:
+                if self._async_pool is not None:
+                    self._async_pool.disconnect()
+                if self._pool is not None:
+                    self._pool.disconnect()
+                self._async_pool = self._pool = None
+            finally:
+                self._disconnecting_pools = False
 
     def _on_connection_disconnect(self, connection):
         self._in_poll = False
         self._in_listen = False
         if self.connection and self.connection.cycle:
             self.connection.cycle._on_connection_disconnect(connection)
+        self._disconnect_pools()
         if not self._closing:
             raise get_redis_ConnectionError()
 
@@ -749,8 +764,7 @@ class Channel(virtual.Channel):
 
     def close(self):
         self._closing = True
-        if self._pool:
-            self._pool.disconnect()
+        self._disconnect_pools()
         if not self.closed:
             # remove from channel poller.
             self.connection.cycle.discard(self)
@@ -787,7 +801,7 @@ class Channel(virtual.Channel):
                     ))
         return vhost
 
-    def _connparams(self):
+    def _connparams(self, async=False):
         conninfo = self.connection.client
         connparams = {'host': conninfo.hostname or '127.0.0.1',
                       'port': conninfo.port or DEFAULT_PORT,
@@ -814,38 +828,41 @@ class Channel(virtual.Channel):
             redis.Connection
         )
 
-        class Connection(connection_cls):
-            def disconnect(self):
-                channel._on_connection_disconnect(self)
-                super(Connection, self).disconnect()
-        connparams['connection_class'] = Connection
+        if async:
+            class Connection(connection_cls):
+                def disconnect(self):
+                    super(Connection, self).disconnect()
+                    channel._on_connection_disconnect(self)
+            connparams['connection_class'] = Connection
 
         return connparams
 
-    def _create_client(self):
+    def _create_client(self, async=False):
+        if async:
+            return self.AsyncClient(connection_pool=self.async_pool)
         return self.Client(connection_pool=self.pool)
 
-    def _get_pool(self):
-        params = self._connparams()
+    def _get_pool(self, async=False):
+        params = self._connparams(async=async)
         self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db'])
         return redis.ConnectionPool(**params)
 
-    def _get_client(self):
+    def _get_async_client(self):
         if redis.VERSION < (2, 4, 4):
             raise VersionMismatch(
                 'Redis transport requires redis-py versions 2.4.4 or later. '
                 'You have {0.__version__}'.format(redis))
 
-        # KombuRedis maintains a connection attribute on it's instance and
+        # AsyncRedis maintains a connection attribute on its instance and
         # uses that when executing commands
         # This was added after redis-py was changed.
-        class KombuRedis(redis.Redis):  # pragma: no cover
+        class AsyncRedis(redis.Redis):  # pragma: no cover
 
             def __init__(self, *args, **kwargs):
-                super(KombuRedis, self).__init__(*args, **kwargs)
+                super(AsyncRedis, self).__init__(*args, **kwargs)
                 self.connection = self.connection_pool.get_connection('_')
 
-        return KombuRedis
+        return AsyncRedis
 
     @contextmanager
     def conn_or_acquire(self, client=None):
@@ -853,11 +870,7 @@ class Channel(virtual.Channel):
             yield client
         else:
             if self._in_poll:
-                client = self._create_client()
-                try:
-                    yield client
-                finally:
-                    self.pool.release(client.connection)
+                yield self._create_client()
             else:
                 yield self.client
 
@@ -867,15 +880,21 @@ class Channel(virtual.Channel):
             self._pool = self._get_pool()
         return self._pool
 
+    @property
+    def async_pool(self):
+        if self._async_pool is None:
+            self._async_pool = self._get_pool(async=True)
+        return self._async_pool
+
     @cached_property
     def client(self):
         """Client used to publish messages, BRPOP etc."""
-        return self._create_client()
+        return self._create_client(async=True)
 
     @cached_property
     def subclient(self):
         """Pub/Sub connection used to consume fanout queues."""
-        client = self._create_client()
+        client = self._create_client(async=True)
         pubsub = client.pubsub()
         pool = pubsub.connection_pool
         pubsub.connection = pool.get_connection('pubsub', pubsub.shard_hint)

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/kombu.git



More information about the Python-modules-commits mailing list