[Python-modules-commits] [kombu] 01/05: Import kombu_3.0.33.orig.tar.gz

Brian May bam at moszumanska.debian.org
Mon Jan 18 07:34:52 UTC 2016


This is an automated email from the git hooks/post-receive script.

bam pushed a commit to branch master
in repository kombu.

commit 5bdc5a4c59a815f0c189f1276b53bf0f8c5d1092
Author: Brian May <bam at debian.org>
Date:   Mon Jan 18 16:27:28 2016 +1100

    Import kombu_3.0.33.orig.tar.gz
---
 Changelog                           |  15 +++
 PKG-INFO                            |   4 +-
 README.rst                          |   2 +-
 docs/changelog.rst                  |  15 +++
 docs/introduction.rst               |   2 +-
 kombu.egg-info/PKG-INFO             |   4 +-
 kombu.egg-info/requires.txt         |   2 +-
 kombu/__init__.py                   |   2 +-
 kombu/five.py                       |   9 +-
 kombu/tests/transport/test_qpid.py  | 227 +++++-------------------------------
 kombu/tests/transport/test_redis.py |  84 +++++++------
 kombu/transport/qpid.py             | 105 ++---------------
 kombu/transport/redis.py            |   5 +-
 requirements/default.txt            |   2 +-
 14 files changed, 136 insertions(+), 342 deletions(-)

diff --git a/Changelog b/Changelog
index a5ef237..637c612 100644
--- a/Changelog
+++ b/Changelog
@@ -4,6 +4,21 @@
  Change history
 ================
 
+.. _version-3.0.33:
+
+3.0.33
+======
+:release-date: 2016-01-08 06:36 P.M PST
+:release-by: Ask Solem
+
+- Now depends on :mod:`amqp` 1.4.9.
+
+- Redis: Fixed problem with auxiliary connections causing the main
+  consumer connection to be closed (Issue #550).
+
+- Qpid: No longer uses threads to operate, to ensure compatibility with
+  all environments (Issue #531).
+
 .. _version-3.0.32:
 
 3.0.32
diff --git a/PKG-INFO b/PKG-INFO
index 5d7ec1f..0963536 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: kombu
-Version: 3.0.32
+Version: 3.0.33
 Summary: Messaging library for Python
 Home-page: http://kombu.readthedocs.org
 Author: Ask Solem
@@ -12,7 +12,7 @@ Description: .. _kombu-index:
          kombu - Messaging library for Python
         ========================================
         
-        :Version: 3.0.32
+        :Version: 3.0.33
         
         `Kombu` is a messaging library for Python.
         
diff --git a/README.rst b/README.rst
index 642dddd..891984b 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@
  kombu - Messaging library for Python
 ========================================
 
-:Version: 3.0.32
+:Version: 3.0.33
 
 `Kombu` is a messaging library for Python.
 
diff --git a/docs/changelog.rst b/docs/changelog.rst
index a5ef237..637c612 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -4,6 +4,21 @@
  Change history
 ================
 
+.. _version-3.0.33:
+
+3.0.33
+======
+:release-date: 2016-01-08 06:36 P.M PST
+:release-by: Ask Solem
+
+- Now depends on :mod:`amqp` 1.4.9.
+
+- Redis: Fixed problem with auxiliary connections causing the main
+  consumer connection to be closed (Issue #550).
+
+- Qpid: No longer uses threads to operate, to ensure compatibility with
+  all environments (Issue #531).
+
 .. _version-3.0.32:
 
 3.0.32
diff --git a/docs/introduction.rst b/docs/introduction.rst
index 642dddd..891984b 100644
--- a/docs/introduction.rst
+++ b/docs/introduction.rst
@@ -4,7 +4,7 @@
  kombu - Messaging library for Python
 ========================================
 
-:Version: 3.0.32
+:Version: 3.0.33
 
 `Kombu` is a messaging library for Python.
 
diff --git a/kombu.egg-info/PKG-INFO b/kombu.egg-info/PKG-INFO
index 5d7ec1f..0963536 100644
--- a/kombu.egg-info/PKG-INFO
+++ b/kombu.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: kombu
-Version: 3.0.32
+Version: 3.0.33
 Summary: Messaging library for Python
 Home-page: http://kombu.readthedocs.org
 Author: Ask Solem
@@ -12,7 +12,7 @@ Description: .. _kombu-index:
          kombu - Messaging library for Python
         ========================================
         
-        :Version: 3.0.32
+        :Version: 3.0.33
         
         `Kombu` is a messaging library for Python.
         
diff --git a/kombu.egg-info/requires.txt b/kombu.egg-info/requires.txt
index b2f1e7a..a51256f 100644
--- a/kombu.egg-info/requires.txt
+++ b/kombu.egg-info/requires.txt
@@ -1,5 +1,5 @@
 anyjson>=0.3.3
-amqp>=1.4.7,<2.0
+amqp>=1.4.9,<2.0
 
 [sqlalchemy]
 sqlalchemy
diff --git a/kombu/__init__.py b/kombu/__init__.py
index 7105102..42a8590 100644
--- a/kombu/__init__.py
+++ b/kombu/__init__.py
@@ -11,7 +11,7 @@ version_info_t = namedtuple(
     'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'),
 )
 
-VERSION = version_info_t(3, 0, 32, '', '')
+VERSION = version_info_t(3, 0, 33, '', '')
 __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
 __author__ = 'Ask Solem'
 __contact__ = 'ask@celeryproject.org'
diff --git a/kombu/five.py b/kombu/five.py
index 06cbd91..d36f252 100644
--- a/kombu/five.py
+++ b/kombu/five.py
@@ -41,13 +41,12 @@ if sys.version_info < (3, 3):
     import platform
     SYSTEM = platform.system()
 
-    has_ctypes = True
     try:
         import ctypes
-    except ImportError:
-        has_ctypes = False
+    except ImportError:  # pragma: no cover
+        ctypes = None  # noqa
 
-    if SYSTEM == 'Darwin' and has_ctypes:
+    if SYSTEM == 'Darwin' and ctypes is not None:
         from ctypes.util import find_library
         libSystem = ctypes.CDLL(find_library('libSystem.dylib'))
         CoreServices = ctypes.CDLL(find_library('CoreServices'),
@@ -61,7 +60,7 @@ if sys.version_info < (3, 3):
         def _monotonic():
             return absolute_to_nanoseconds(mach_absolute_time()) * 1e-9
 
-    elif SYSTEM == 'Linux' and has_ctypes:
+    elif SYSTEM == 'Linux' and ctypes is not None:
         # from stackoverflow:
         # questions/1205722/how-do-i-get-monotonic-time-durations-in-python
         import os
diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py
index 38a8f51..4131193 100644
--- a/kombu/tests/transport/test_qpid.py
+++ b/kombu/tests/transport/test_qpid.py
@@ -4,7 +4,6 @@ import select
 import ssl
 import socket
 import sys
-import threading
 import time
 
 from collections import Callable
@@ -16,7 +15,7 @@ from mock import call
 from kombu.five import Empty, keys, range, monotonic
 from kombu.transport.qpid import (AuthenticationFailure, Channel, Connection,
                                   ConnectionError, Message, NotFound, QoS,
-                                  ReceiversMonitor, Transport)
+                                  Transport)
 from kombu.transport.virtual import Base64
 from kombu.tests.case import Case, Mock, case_no_pypy, case_no_python3
 from kombu.tests.case import patch
@@ -855,9 +854,9 @@ class TestChannelQueueDelete(ChannelTestBase):
         self.mock_queue = Mock()
 
     def tearDown(self):
-        self.mock__has_queue.stop()
-        self.mock__size.stop()
-        self.mock__delete.stop()
+        self.patch__has_queue.stop()
+        self.patch__size.stop()
+        self.patch__delete.stop()
         super(TestChannelQueueDelete, self).tearDown()
 
     def test_checks_if_queue_exists(self):
@@ -1397,164 +1396,6 @@ class TestChannel(ExtraAssertionsMixin, Case):
 
 @case_no_python3
 @case_no_pypy
-class ReceiversMonitorTestBase(Case):
-
-    def setUp(self):
-        self.mock_session = Mock()
-        self.mock_w = Mock()
-        self.monitor = ReceiversMonitor(self.mock_session, self.mock_w)
-
-
-@case_no_python3
-@case_no_pypy
-class TestReceiversMonitorType(ReceiversMonitorTestBase):
-
-    def test_qpid_messaging_receivers_monitor_subclass_of_threading(self):
-        self.assertIsInstance(self.monitor, threading.Thread)
-
-
-@case_no_python3
-@case_no_pypy
-class TestReceiversMonitorInit(ReceiversMonitorTestBase):
-
-    def setUp(self):
-        thread___init___str = QPID_MODULE + '.threading.Thread.__init__'
-        self.patch_parent___init__ = patch(thread___init___str)
-        self.mock_Thread___init__ = self.patch_parent___init__.start()
-        super(TestReceiversMonitorInit, self).setUp()
-
-    def tearDown(self):
-        self.patch_parent___init__.stop()
-
-    def test_qpid_messaging_receivers_monitor_init_saves_session(self):
-        self.assertIs(self.monitor._session, self.mock_session)
-
-    def test_qpid_messaging_receivers_monitor_init_saves_fd(self):
-        self.assertIs(self.monitor._w_fd, self.mock_w)
-
-    def test_qpid_messaging_Receivers_monitor_init_calls_parent__init__(self):
-        self.mock_Thread___init__.assert_called_once_with()
-
-
-@case_no_python3
-@case_no_pypy
-class TestReceiversMonitorRun(ReceiversMonitorTestBase):
-
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    def test_receivers_monitor_run_calls_monitor_receivers(
-            self, mock_sleep, mock_monitor_receivers):
-        mock_sleep.side_effect = BreakOutException()
-        with self.assertRaises(BreakOutException):
-            self.monitor.run()
-        mock_monitor_receivers.assert_called_once_with()
-
-    @patch(QPID_MODULE + '.SessionClosed', new=QpidException)
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    def test_receivers_monitor_run_exits_on_session_closed(
-            self, mock_sleep, mock_monitor_receivers):
-        mock_monitor_receivers.side_effect = QpidException()
-        try:
-            self.monitor.run()
-        except Exception:
-            self.fail('No exception should be raised here')
-        mock_monitor_receivers.assert_called_once_with()
-        mock_sleep.has_calls([])
-
-    @patch.object(Transport, 'connection_errors', new=(BreakOutException, ))
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    @patch(QPID_MODULE + '.logger')
-    def test_receivers_monitors_run_calls_logs_exception_and_sleeps(
-            self, mock_logger, mock_sleep, mock_monitor_receivers):
-        exc_to_raise = IOError()
-        mock_monitor_receivers.side_effect = exc_to_raise
-        mock_sleep.side_effect = BreakOutException()
-        with self.assertRaises(BreakOutException):
-            self.monitor.run()
-        mock_logger.error.assert_called_once_with(exc_to_raise, exc_info=1)
-        mock_sleep.assert_called_once_with(10)
-
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    def test_receivers_monitor_run_loops_when_exception_is_raised(
-            self, mock_sleep, mock_monitor_receivers):
-        def return_once_raise_on_second_call(*args):
-            mock_sleep.side_effect = BreakOutException()
-        mock_sleep.side_effect = return_once_raise_on_second_call
-        with self.assertRaises(BreakOutException):
-            self.monitor.run()
-        mock_monitor_receivers.has_calls([call(), call()])
-
-    @patch.object(Transport, 'recoverable_connection_errors',
-                  new=(QpidException, ))
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    @patch(QPID_MODULE + '.logger')
-    @patch(QPID_MODULE + '.os.write')
-    @patch(QPID_MODULE + '.sys.exc_info')
-    def test_receivers_monitor_exits_when_recoverable_exception_raised(
-            self, mock_sys_exc_info, mock_os_write, mock_logger, mock_sleep,
-            mock_monitor_receivers):
-        mock_monitor_receivers.side_effect = QpidException()
-        mock_sleep.side_effect = BreakOutException()
-        self.monitor.run()
-        self.assertFalse(mock_logger.error.called)
-
-    @patch.object(Transport, 'recoverable_connection_errors',
-                  new=(QpidException, ))
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    @patch(QPID_MODULE + '.logger')
-    @patch(QPID_MODULE + '.os.write')
-    def test_receivers_monitor_saves_exception_when_recoverable_exc_raised(
-            self, mock_os_write, mock_logger, mock_sleep,
-            mock_monitor_receivers):
-        mock_monitor_receivers.side_effect = QpidException()
-        mock_sleep.side_effect = BreakOutException()
-        self.monitor.run()
-        self.assertIs(
-            self.mock_session.saved_exception,
-            mock_monitor_receivers.side_effect,
-        )
-
-    @patch.object(Transport, 'recoverable_connection_errors',
-                  new=(QpidException, ))
-    @patch.object(ReceiversMonitor, 'monitor_receivers')
-    @patch(QPID_MODULE + '.time.sleep')
-    @patch(QPID_MODULE + '.logger')
-    @patch(QPID_MODULE + '.os.write')
-    @patch(QPID_MODULE + '.sys.exc_info')
-    def test_receivers_monitor_writes_e_to_pipe_when_recoverable_exc_raised(
-            self, mock_sys_exc_info, mock_os_write, mock_logger, mock_sleep,
-            mock_monitor_receivers):
-        mock_monitor_receivers.side_effect = QpidException()
-        mock_sleep.side_effect = BreakOutException()
-        self.monitor.run()
-        mock_os_write.assert_called_once_with(self.mock_w, 'e')
-
-
-@case_no_python3
-@case_no_pypy
-class TestReceiversMonitorMonitorReceivers(ReceiversMonitorTestBase):
-
-    def test_receivers_monitor_monitor_receivers_calls_next_receivers(self):
-        self.mock_session.next_receiver.side_effect = BreakOutException()
-        with self.assertRaises(BreakOutException):
-            self.monitor.monitor_receivers()
-        self.mock_session.next_receiver.assert_called_once_with()
-
-    def test_receivers_monitor_monitor_receivers_writes_to_fd(self):
-        with patch(QPID_MODULE + '.os.write') as mock_os_write:
-            mock_os_write.side_effect = BreakOutException()
-            with self.assertRaises(BreakOutException):
-                self.monitor.monitor_receivers()
-            mock_os_write.assert_called_once_with(self.mock_w, '0')
-
-
-@case_no_python3
-@case_no_pypy
 @disable_runtime_dependency_check
 class TestTransportInit(Case):
 
@@ -1699,12 +1540,6 @@ class TestTransportEstablishConnection(Case):
         self.transport = Transport(self.client)
         self.mock_conn = Mock()
         self.transport.Connection = self.mock_conn
-        path_to_mock = QPID_MODULE + '.ReceiversMonitor'
-        self.patcher = patch(path_to_mock)
-        self.mock_ReceiverMonitor = self.patcher.start()
-
-    def tearDown(self):
-        self.patcher.stop()
 
     def test_transport_establish_conn_new_option_overwrites_default(self):
         self.client.userid = 'new-userid'
@@ -1863,22 +1698,7 @@ class TestTransportEstablishConnection(Case):
         new_conn = self.transport.establish_connection()
         self.assertIs(new_conn, self.mock_conn.return_value)
 
-    def test_transport_establish_conn_creates_ReceiversMonitor(self):
-        self.transport.establish_connection()
-        self.mock_ReceiverMonitor.assert_called_once_with(
-            self.transport.session, self.transport._w,
-        )
-
-    def test_transport_establish_conn_daemonizes_thread(self):
-        self.transport.establish_connection()
-        self.assertTrue(self.mock_ReceiverMonitor.return_value.daemon)
-
-    def test_transport_establish_conn_starts_thread(self):
-        self.transport.establish_connection()
-        new_receiver_monitor = self.mock_ReceiverMonitor.return_value
-        new_receiver_monitor.start.assert_called_once_with()
-
-    def test_transport_establish_conn_ignores_hostname_if_not_localhost(self):
+    def test_transport_establish_conn_uses_hostname_if_not_default(self):
         self.client.hostname = 'some_other_hostname'
         self.transport.establish_connection()
         self.mock_conn.assert_called_once_with(
@@ -1889,6 +1709,14 @@ class TestTransportEstablishConnection(Case):
             transport='tcp',
         )
 
+    def test_transport_sets_qpid_message_received_handler(self):
+        self.transport.establish_connection()
+        qpid_conn = self.mock_conn.return_value.get_qpid_connection
+        new_mock_session = qpid_conn.return_value.session.return_value
+        mock_set_callback = new_mock_session.set_message_received_handler
+        expected_callback = self.transport._qpid_session_ready
+        mock_set_callback.assert_called_once_with(expected_callback)
+
 
 @case_no_python3
 @case_no_pypy
@@ -1941,6 +1769,24 @@ class TestTransportRegisterWithEventLoop(Case):
 @case_no_python3
 @case_no_pypy
 @disable_runtime_dependency_check
+class TestTransportQpidSessionReady(Case):
+
+    def setUp(self):
+        self.patch_a = patch(QPID_MODULE + '.os.write')
+        self.mock_os_write = self.patch_a.start()
+        self.transport = Transport(Mock())
+
+    def tearDown(self):
+        self.patch_a.stop()
+
+    def test_transport__qpid_session_ready_writes_symbol_to_fd(self):
+        self.transport._qpid_session_ready()
+        self.mock_os_write.assert_called_once_with(self.transport._w, '0')
+
+
+@case_no_python3
+@case_no_pypy
+@disable_runtime_dependency_check
 class TestTransportOnReadable(Case):
 
     def setUp(self):
@@ -1952,6 +1798,7 @@ class TestTransportOnReadable(Case):
 
     def tearDown(self):
         self.patch_a.stop()
+        self.patch_b.stop()
 
     def test_transport_on_readable_reads_symbol_from_fd(self):
         self.transport.on_readable(Mock(), Mock())
@@ -1971,16 +1818,6 @@ class TestTransportOnReadable(Case):
         with self.assertRaises(IOError):
             self.transport.on_readable(Mock(), Mock())
 
-    def test_transport_on_readable_reads_e_off_of_pipe_raises_exc_info(self):
-        self.transport.session = Mock()
-        try:
-            raise IOError()
-        except Exception as error:
-            self.transport.session.saved_exception = error
-        self.mock_os_read.return_value = 'e'
-        with self.assertRaises(IOError):
-            self.transport.on_readable(Mock(), Mock())
-
 
 @case_no_python3
 @case_no_pypy
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index 46087a4..5d3aab9 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -5,6 +5,7 @@ import types
 
 from anyjson import dumps, loads
 from collections import defaultdict
+from contextlib import contextmanager
 from itertools import count
 
 from kombu import Connection, Exchange, Queue, Consumer, Producer
@@ -18,6 +19,23 @@ from kombu.tests.case import (
 )
 
 
+class JSONEqual(object):
+    # The order in which a dict is serialized to json depends on the hashseed
+    # so we have this to support json in .assert_has_call*.
+
+    def __init__(self, expected):
+        self.expected = expected
+
+    def __eq__(self, other):
+        return loads(other) == loads(self.expected)
+
+    def __str__(self):
+        return self.expected
+
+    def __repr__(self):
+        return '(json)%r' % (self.expected,)
+
+
 class _poll(eventio._select):
 
     def register(self, fd, flags):
@@ -171,6 +189,9 @@ class Client(object):
 
         return self
 
+    def __repr__(self):
+        return '<MockClient: %r' % (id(self),)
+
 
 class Pipeline(object):
 
@@ -201,13 +222,21 @@ class Pipeline(object):
 
 
 class Channel(redis.Channel):
+    Client = Client
 
     def _get_async_client(self):
         return Client
 
+    def _create_client(self, async=False):
+        return Client()
+
     def _get_pool(self, async=False):
         return Mock()
 
+    @contextmanager
+    def conn_or_acquire(self, client=None):
+        yield client if client is not None else self._create_client()
+
     def _get_response_error(self):
         return ResponseError
 
@@ -280,8 +309,8 @@ class test_Channel(Case):
                 self._pool = pool_at_init[0]
                 super(XChannel, self).__init__(*args, **kwargs)
 
-            def _get_async_client(self):
-                return lambda *_, **__: client
+            def _create_client(self, async=False):
+                return client
 
         class XTransport(Transport):
             Channel = XChannel
@@ -332,8 +361,10 @@ class test_Channel(Case):
         self.channel._do_restore_message(
             pl2, 'ex', 'rkey', client,
         )
+
         client.rpush.assert_has_calls([
-            call('george', spl2), call('elaine', spl2),
+            call('george', JSONEqual(spl2)),
+            call('elaine', JSONEqual(spl2)),
         ])
 
         client.rpush.side_effect = KeyError()
@@ -347,7 +378,8 @@ class test_Channel(Case):
         message = Mock(name='message')
         with patch('kombu.transport.redis.loads') as loads:
             loads.return_value = 'M', 'EX', 'RK'
-            client = self.channel.client = Mock(name='client')
+            client = self.channel._create_client = Mock(name='client')
+            client = client()
             client.pipeline = ContextMock()
             restore = self.channel._do_restore_message = Mock(
                 name='_do_restore_message',
@@ -376,7 +408,8 @@ class test_Channel(Case):
             restore.assert_called_with('M', 'EX', 'RK', client, False)
 
     def test_qos_restore_visible(self):
-        client = self.channel.client = Mock(name='client')
+        client = self.channel._create_client = Mock(name='client')
+        client = client()
 
         def pipe(*args, **kwargs):
             return Pipeline(client)
@@ -556,36 +589,37 @@ class test_Channel(Case):
 
     def test_put_fanout(self):
         self.channel._in_poll = False
-        c = self.channel.client = Mock()
+        c = self.channel._create_client = Mock()
 
         body = {'hello': 'world'}
         self.channel._put_fanout('exchange', body, '')
-        c.publish.assert_called_with('exchange', dumps(body))
+        c().publish.assert_called_with('exchange', JSONEqual(dumps(body)))
 
     def test_put_priority(self):
-        client = self.channel.client = Mock(name='client')
+        client = self.channel._create_client = Mock(name='client')
         msg1 = {'properties': {'delivery_info': {'priority': 3}}}
 
         self.channel._put('george', msg1)
-        client.lpush.assert_called_with(
-            self.channel._q_for_pri('george', 3), dumps(msg1),
+        client().lpush.assert_called_with(
+            self.channel._q_for_pri('george', 3), JSONEqual(dumps(msg1)),
         )
 
         msg2 = {'properties': {'delivery_info': {'priority': 313}}}
         self.channel._put('george', msg2)
-        client.lpush.assert_called_with(
-            self.channel._q_for_pri('george', 9), dumps(msg2),
+        client().lpush.assert_called_with(
+            self.channel._q_for_pri('george', 9), JSONEqual(dumps(msg2)),
         )
 
         msg3 = {'properties': {'delivery_info': {}}}
         self.channel._put('george', msg3)
-        client.lpush.assert_called_with(
-            self.channel._q_for_pri('george', 0), dumps(msg3),
+        client().lpush.assert_called_with(
+            self.channel._q_for_pri('george', 0), JSONEqual(dumps(msg3)),
         )
 
     def test_delete(self):
         x = self.channel
-        self.channel._in_poll = False
+        x._create_client = Mock()
+        x._create_client.return_value = x.client
         delete = x.client.delete = Mock()
         srem = x.client.srem = Mock()
 
@@ -597,7 +631,8 @@ class test_Channel(Case):
         )
 
     def test_has_queue(self):
-        self.channel._in_poll = False
+        self.channel._create_client = Mock()
+        self.channel._create_client.return_value = self.channel.client
         exists = self.channel.client.exists = Mock()
         exists.return_value = True
         self.assertTrue(self.channel._has_queue('foo'))
@@ -682,23 +717,6 @@ class test_Channel(Case):
         self.assertIs(redis.Channel._get_response_error(self.channel),
                       ResponseError)
 
-    def test_avail_client_when_not_in_poll(self):
-        self.channel._in_poll = False
-        c = self.channel.client = Mock()
-
-        with self.channel.conn_or_acquire() as client:
-            self.assertIs(client, c)
-
-    def test_avail_client_when_in_poll(self):
-        self.channel._in_poll = True
-        self.channel._pool = Mock()
-        cc = self.channel._create_client = Mock()
-        cc.return_value = Mock()
-
-        with self.channel.conn_or_acquire():
-            pass
-        cc.assert_called_with()
-
     def test_register_with_event_loop(self):
         transport = self.connection.transport
         transport.cycle = Mock(name='cycle')
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index a5a0d12..639d837 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -8,7 +8,6 @@ broker management.
 .. _`Qpid`: http://qpid.apache.org/
 .. _`qpid-python`: http://pypi.python.org/pypi/qpid-python/
 .. _`qpid-tools`: http://pypi.python.org/pypi/qpid-tools/
-.. _`Issue 2199`: https://github.com/celery/celery/issues/2199
 
 The use this transport you must install the necessary dependencies. These
 dependencies are available via PyPI and can be installed using the pip
@@ -22,11 +21,6 @@ command:
     to underlying dependencies not being compatible. This version is
     tested and works with with Python 2.7.
 
-.. admonition:: Potential Deadlock
-
-    This transport should be used with caution due to a known
-    potential deadlock. See `Issue 2199`_ for more details.
-
 Authentication
 ==============
 
@@ -84,7 +78,6 @@ import select
 import socket
 import ssl
 import sys
-import threading
 import time
 
 from itertools import count
@@ -1331,81 +1324,6 @@ class Connection(object):
             channel.connection = None
 
 
-class ReceiversMonitor(threading.Thread):
-    """A monitoring thread that reads and handles messages from all receivers.
-
-    A single instance of ReceiversMonitor is expected to be created by
-    :class:`Transport`.
-
-    In :meth:`monitor_receivers`, the thread monitors all receivers
-    associated with the session created by the Transport using the blocking
-    call to session.next_receiver(). When any receiver has messages
-    available, a symbol '0' is written to the self._w_fd file descriptor. The
-    :meth:`monitor_receivers` is designed not to exit, and loops over
-    session.next_receiver() forever.
-
-    The entry point of the thread is :meth:`run` which calls
-    :meth:`monitor_receivers`.
-
-    The thread is designed to be daemonized, and will be forcefully killed
-    when all non-daemon threads have already exited.
-    """
-
-    def __init__(self, session, w):
-        """Instantiate a ReceiversMonitor object
-
-        :param session: The session which needs all of its receivers
-            monitored.
-        :type session: :class:`qpid.messaging.endpoints.Session`
-        :param w: The file descriptor to write the '0' into when
-            next_receiver unblocks.
-        :type w: int
-        """
-        super(ReceiversMonitor, self).__init__()
-        self._session = session
-        self._w_fd = w
-
-    def run(self):
-        """Thread entry point for ReceiversMonitor
-
-        Calls :meth:`monitor_receivers` with a log-and-reenter behavior for
-        non connection errors. This guards against unexpected exceptions
-        which could cause this thread to exit unexpectedly.
-
-        A :class:`qpid.messaging.exceptions.SessionClosed` exception should
-        cause this thread to exit. This is a normal exit condition and the
-        thread is no longer needed.
-
-        If a connection error occurs, the exception needs to be propagated
-        to MainThread where the kombu exception handler can properly handle
-        it. The exception is stored as saved_exception on the self._session
-        object. The character 'e' is then written to the self.w_fd file
-        descriptor and then this thread exits.
-        """
-        while True:
-            try:
-                self.monitor_receivers()
-            except Transport.recoverable_connection_errors as exc:
-                self._session.saved_exception = exc
-                os.write(self._w_fd, 'e')
-                break
-            except SessionClosed:
-                break
-            except Exception as exc:
-                logger.error(exc, exc_info=1)
-            time.sleep(10)
-
-    def monitor_receivers(self):
-        """Monitor all receivers, and write to _w_fd when a message is ready.
-
-        The call to next_receiver() blocks until a message is ready. Once a
-        message is ready, write a '0' to _w_fd.
-        """
-        while True:
-            self._session.next_receiver()
-            os.write(self._w_fd, '0')
-
-
 class Transport(base.Transport):
     """Kombu native transport for a Qpid broker.
 
@@ -1523,24 +1441,23 @@ class Transport(base.Transport):
                 'with your package manager. You can also try `pip install '
                 'qpid-python`.')
 
+    def _qpid_session_ready(self):
+        os.write(self._w, '0')
+
     def on_readable(self, connection, loop):
         """Handle any messages associated with this Transport.
 
         This method clears a single message from the externally monitored
         file descriptor by issuing a read call to the self.r file descriptor
         which removes a single '0' character that was placed into the pipe
-        by :class:`ReceiversMonitor`. Once a '0' is read, all available
-        events are drained through a call to :meth:`drain_events`.
+        by the Qpid session message callback handler. Once a '0' is read,
+        all available events are drained through a call to
+        :meth:`drain_events`.
 
         The behavior of self.r is adjusted in __init__ to be non-blocking,
         ensuring that an accidental call to this method when no more messages
         will arrive will not cause indefinite blocking.
 
-        If the self.r file descriptor receives the character 'e', an error
-        occurred in the background thread, and this thread should raise the
-        saved exception. The exception is stored as saved_exception on the
-        session object.
-
         Nothing is expected to be returned from :meth:`drain_events` because
         :meth:`drain_events` handles messages by calling callbacks that are
         maintained on the :class:`Connection` object. When
@@ -1575,9 +1492,7 @@ class Transport(base.Transport):
             functionality.
         :type loop: kombu.async.Hub
         """
-        symbol = os.read(self.r, 1)
-        if symbol == 'e':
-            raise self.session.saved_exception
+        os.read(self.r, 1)
         try:
             self.drain_events(connection)
         except socket.timeout:
@@ -1589,7 +1504,7 @@ class Transport(base.Transport):
         Register the callback self.on_readable to be called when an
         external epoll loop sees that the file descriptor registered is
         ready for reading. The file descriptor is created by this Transport,
-        and is updated by the ReceiversMonitor thread.
+        and is written to when a message is available.
 
         Because supports_ev == True, Celery expects to call this method to
         give the Transport an opportunity to register a read file descriptor
@@ -1685,9 +1600,7 @@ class Transport(base.Transport):
         conn = self.Connection(**opts)
         conn.client = self.client
         self.session = conn.get_qpid_connection().session()
-        monitor_thread = ReceiversMonitor(self.session, self._w)
-        monitor_thread.daemon = True
-        monitor_thread.start()
+        self.session.set_message_received_handler(self._qpid_session_ready)
         return conn
 
     def close_connection(self, connection):
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 5005854..573bdd7 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -869,10 +869,7 @@ class Channel(virtual.Channel):
         if client:
             yield client
         else:
-            if self._in_poll:
-                yield self._create_client()
-            else:
-                yield self.client
+            yield self._create_client()
 
     @property
     def pool(self):
diff --git a/requirements/default.txt b/requirements/default.txt
index ae9bbd2..13bd65e 100644
--- a/requirements/default.txt
+++ b/requirements/default.txt
@@ -1,2 +1,2 @@
 anyjson>=0.3.3
-amqp>=1.4.7,<2.0
+amqp>=1.4.9,<2.0

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/kombu.git



More information about the Python-modules-commits mailing list