[Python-modules-commits] [kombu] 01/05: Import kombu_3.0.33.orig.tar.gz
Brian May
bam at moszumanska.debian.org
Mon Jan 18 07:34:52 UTC 2016
This is an automated email from the git hooks/post-receive script.
bam pushed a commit to branch master
in repository kombu.
commit 5bdc5a4c59a815f0c189f1276b53bf0f8c5d1092
Author: Brian May <bam at debian.org>
Date: Mon Jan 18 16:27:28 2016 +1100
Import kombu_3.0.33.orig.tar.gz
---
Changelog | 15 +++
PKG-INFO | 4 +-
README.rst | 2 +-
docs/changelog.rst | 15 +++
docs/introduction.rst | 2 +-
kombu.egg-info/PKG-INFO | 4 +-
kombu.egg-info/requires.txt | 2 +-
kombu/__init__.py | 2 +-
kombu/five.py | 9 +-
kombu/tests/transport/test_qpid.py | 227 +++++-------------------------------
kombu/tests/transport/test_redis.py | 84 +++++++------
kombu/transport/qpid.py | 105 ++---------------
kombu/transport/redis.py | 5 +-
requirements/default.txt | 2 +-
14 files changed, 136 insertions(+), 342 deletions(-)
diff --git a/Changelog b/Changelog
index a5ef237..637c612 100644
--- a/Changelog
+++ b/Changelog
@@ -4,6 +4,21 @@
Change history
================
+.. _version-3.0.33:
+
+3.0.33
+======
+:release-date: 2016-01-08 06:36 P.M PST
+:release-by: Ask Solem
+
+- Now depends on :mod:`amqp` 1.4.9.
+
+- Redis: Fixed problem with auxilliary connections causing the main
+ consumer connection to be closed (Issue #550).
+
+- Qpid: No longer uses threads to operate, to ensure compatibility with
+ all environments (Issue #531).
+
.. _version-3.0.32:
3.0.32
diff --git a/PKG-INFO b/PKG-INFO
index 5d7ec1f..0963536 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: kombu
-Version: 3.0.32
+Version: 3.0.33
Summary: Messaging library for Python
Home-page: http://kombu.readthedocs.org
Author: Ask Solem
@@ -12,7 +12,7 @@ Description: .. _kombu-index:
kombu - Messaging library for Python
========================================
- :Version: 3.0.32
+ :Version: 3.0.33
`Kombu` is a messaging library for Python.
diff --git a/README.rst b/README.rst
index 642dddd..891984b 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@
kombu - Messaging library for Python
========================================
-:Version: 3.0.32
+:Version: 3.0.33
`Kombu` is a messaging library for Python.
diff --git a/docs/changelog.rst b/docs/changelog.rst
index a5ef237..637c612 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -4,6 +4,21 @@
Change history
================
+.. _version-3.0.33:
+
+3.0.33
+======
+:release-date: 2016-01-08 06:36 P.M PST
+:release-by: Ask Solem
+
+- Now depends on :mod:`amqp` 1.4.9.
+
+- Redis: Fixed problem with auxilliary connections causing the main
+ consumer connection to be closed (Issue #550).
+
+- Qpid: No longer uses threads to operate, to ensure compatibility with
+ all environments (Issue #531).
+
.. _version-3.0.32:
3.0.32
diff --git a/docs/introduction.rst b/docs/introduction.rst
index 642dddd..891984b 100644
--- a/docs/introduction.rst
+++ b/docs/introduction.rst
@@ -4,7 +4,7 @@
kombu - Messaging library for Python
========================================
-:Version: 3.0.32
+:Version: 3.0.33
`Kombu` is a messaging library for Python.
diff --git a/kombu.egg-info/PKG-INFO b/kombu.egg-info/PKG-INFO
index 5d7ec1f..0963536 100644
--- a/kombu.egg-info/PKG-INFO
+++ b/kombu.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: kombu
-Version: 3.0.32
+Version: 3.0.33
Summary: Messaging library for Python
Home-page: http://kombu.readthedocs.org
Author: Ask Solem
@@ -12,7 +12,7 @@ Description: .. _kombu-index:
kombu - Messaging library for Python
========================================
- :Version: 3.0.32
+ :Version: 3.0.33
`Kombu` is a messaging library for Python.
diff --git a/kombu.egg-info/requires.txt b/kombu.egg-info/requires.txt
index b2f1e7a..a51256f 100644
--- a/kombu.egg-info/requires.txt
+++ b/kombu.egg-info/requires.txt
@@ -1,5 +1,5 @@
anyjson>=0.3.3
-amqp>=1.4.7,<2.0
+amqp>=1.4.9,<2.0
[sqlalchemy]
sqlalchemy
diff --git a/kombu/__init__.py b/kombu/__init__.py
index 7105102..42a8590 100644
--- a/kombu/__init__.py
+++ b/kombu/__init__.py
@@ -11,7 +11,7 @@ version_info_t = namedtuple(
'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'),
)
-VERSION = version_info_t(3, 0, 32, '', '')
+VERSION = version_info_t(3, 0, 33, '', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = 'ask at celeryproject.org'
diff --git a/kombu/five.py b/kombu/five.py
index 06cbd91..d36f252 100644
--- a/kombu/five.py
+++ b/kombu/five.py
@@ -41,13 +41,12 @@ if sys.version_info < (3, 3):
import platform
SYSTEM = platform.system()
- has_ctypes = True
try:
import ctypes
- except ImportError:
- has_ctypes = False
+ except ImportError: # pragma: no cover
+ ctypes = None # noqa
- if SYSTEM == 'Darwin' and has_ctypes:
+ if SYSTEM == 'Darwin' and ctypes is not None:
from ctypes.util import find_library
libSystem = ctypes.CDLL(find_library('libSystem.dylib'))
CoreServices = ctypes.CDLL(find_library('CoreServices'),
@@ -61,7 +60,7 @@ if sys.version_info < (3, 3):
def _monotonic():
return absolute_to_nanoseconds(mach_absolute_time()) * 1e-9
- elif SYSTEM == 'Linux' and has_ctypes:
+ elif SYSTEM == 'Linux' and ctypes is not None:
# from stackoverflow:
# questions/1205722/how-do-i-get-monotonic-time-durations-in-python
import os
diff --git a/kombu/tests/transport/test_qpid.py b/kombu/tests/transport/test_qpid.py
index 38a8f51..4131193 100644
--- a/kombu/tests/transport/test_qpid.py
+++ b/kombu/tests/transport/test_qpid.py
@@ -4,7 +4,6 @@ import select
import ssl
import socket
import sys
-import threading
import time
from collections import Callable
@@ -16,7 +15,7 @@ from mock import call
from kombu.five import Empty, keys, range, monotonic
from kombu.transport.qpid import (AuthenticationFailure, Channel, Connection,
ConnectionError, Message, NotFound, QoS,
- ReceiversMonitor, Transport)
+ Transport)
from kombu.transport.virtual import Base64
from kombu.tests.case import Case, Mock, case_no_pypy, case_no_python3
from kombu.tests.case import patch
@@ -855,9 +854,9 @@ class TestChannelQueueDelete(ChannelTestBase):
self.mock_queue = Mock()
def tearDown(self):
- self.mock__has_queue.stop()
- self.mock__size.stop()
- self.mock__delete.stop()
+ self.patch__has_queue.stop()
+ self.patch__size.stop()
+ self.patch__delete.stop()
super(TestChannelQueueDelete, self).tearDown()
def test_checks_if_queue_exists(self):
@@ -1397,164 +1396,6 @@ class TestChannel(ExtraAssertionsMixin, Case):
@case_no_python3
@case_no_pypy
-class ReceiversMonitorTestBase(Case):
-
- def setUp(self):
- self.mock_session = Mock()
- self.mock_w = Mock()
- self.monitor = ReceiversMonitor(self.mock_session, self.mock_w)
-
-
- at case_no_python3
- at case_no_pypy
-class TestReceiversMonitorType(ReceiversMonitorTestBase):
-
- def test_qpid_messaging_receivers_monitor_subclass_of_threading(self):
- self.assertIsInstance(self.monitor, threading.Thread)
-
-
- at case_no_python3
- at case_no_pypy
-class TestReceiversMonitorInit(ReceiversMonitorTestBase):
-
- def setUp(self):
- thread___init___str = QPID_MODULE + '.threading.Thread.__init__'
- self.patch_parent___init__ = patch(thread___init___str)
- self.mock_Thread___init__ = self.patch_parent___init__.start()
- super(TestReceiversMonitorInit, self).setUp()
-
- def tearDown(self):
- self.patch_parent___init__.stop()
-
- def test_qpid_messaging_receivers_monitor_init_saves_session(self):
- self.assertIs(self.monitor._session, self.mock_session)
-
- def test_qpid_messaging_receivers_monitor_init_saves_fd(self):
- self.assertIs(self.monitor._w_fd, self.mock_w)
-
- def test_qpid_messaging_Receivers_monitor_init_calls_parent__init__(self):
- self.mock_Thread___init__.assert_called_once_with()
-
-
- at case_no_python3
- at case_no_pypy
-class TestReceiversMonitorRun(ReceiversMonitorTestBase):
-
- @patch.object(ReceiversMonitor, 'monitor_receivers')
- @patch(QPID_MODULE + '.time.sleep')
- def test_receivers_monitor_run_calls_monitor_receivers(
- self, mock_sleep, mock_monitor_receivers):
- mock_sleep.side_effect = BreakOutException()
- with self.assertRaises(BreakOutException):
- self.monitor.run()
- mock_monitor_receivers.assert_called_once_with()
-
- @patch(QPID_MODULE + '.SessionClosed', new=QpidException)
- @patch.object(ReceiversMonitor, 'monitor_receivers')
- @patch(QPID_MODULE + '.time.sleep')
- def test_receivers_monitor_run_exits_on_session_closed(
- self, mock_sleep, mock_monitor_receivers):
- mock_monitor_receivers.side_effect = QpidException()
- try:
- self.monitor.run()
- except Exception:
- self.fail('No exception should be raised here')
- mock_monitor_receivers.assert_called_once_with()
- mock_sleep.has_calls([])
-
- @patch.object(Transport, 'connection_errors', new=(BreakOutException, ))
- @patch.object(ReceiversMonitor, 'monitor_receivers')
- @patch(QPID_MODULE + '.time.sleep')
- @patch(QPID_MODULE + '.logger')
- def test_receivers_monitors_run_calls_logs_exception_and_sleeps(
- self, mock_logger, mock_sleep, mock_monitor_receivers):
- exc_to_raise = IOError()
- mock_monitor_receivers.side_effect = exc_to_raise
- mock_sleep.side_effect = BreakOutException()
- with self.assertRaises(BreakOutException):
- self.monitor.run()
- mock_logger.error.assert_called_once_with(exc_to_raise, exc_info=1)
- mock_sleep.assert_called_once_with(10)
-
- @patch.object(ReceiversMonitor, 'monitor_receivers')
- @patch(QPID_MODULE + '.time.sleep')
- def test_receivers_monitor_run_loops_when_exception_is_raised(
- self, mock_sleep, mock_monitor_receivers):
- def return_once_raise_on_second_call(*args):
- mock_sleep.side_effect = BreakOutException()
- mock_sleep.side_effect = return_once_raise_on_second_call
- with self.assertRaises(BreakOutException):
- self.monitor.run()
- mock_monitor_receivers.has_calls([call(), call()])
-
- @patch.object(Transport, 'recoverable_connection_errors',
- new=(QpidException, ))
- @patch.object(ReceiversMonitor, 'monitor_receivers')
- @patch(QPID_MODULE + '.time.sleep')
- @patch(QPID_MODULE + '.logger')
- @patch(QPID_MODULE + '.os.write')
- @patch(QPID_MODULE + '.sys.exc_info')
- def test_receivers_monitor_exits_when_recoverable_exception_raised(
- self, mock_sys_exc_info, mock_os_write, mock_logger, mock_sleep,
- mock_monitor_receivers):
- mock_monitor_receivers.side_effect = QpidException()
- mock_sleep.side_effect = BreakOutException()
- self.monitor.run()
- self.assertFalse(mock_logger.error.called)
-
- @patch.object(Transport, 'recoverable_connection_errors',
- new=(QpidException, ))
- @patch.object(ReceiversMonitor, 'monitor_receivers')
- @patch(QPID_MODULE + '.time.sleep')
- @patch(QPID_MODULE + '.logger')
- @patch(QPID_MODULE + '.os.write')
- def test_receivers_monitor_saves_exception_when_recoverable_exc_raised(
- self, mock_os_write, mock_logger, mock_sleep,
- mock_monitor_receivers):
- mock_monitor_receivers.side_effect = QpidException()
- mock_sleep.side_effect = BreakOutException()
- self.monitor.run()
- self.assertIs(
- self.mock_session.saved_exception,
- mock_monitor_receivers.side_effect,
- )
-
- @patch.object(Transport, 'recoverable_connection_errors',
- new=(QpidException, ))
- @patch.object(ReceiversMonitor, 'monitor_receivers')
- @patch(QPID_MODULE + '.time.sleep')
- @patch(QPID_MODULE + '.logger')
- @patch(QPID_MODULE + '.os.write')
- @patch(QPID_MODULE + '.sys.exc_info')
- def test_receivers_monitor_writes_e_to_pipe_when_recoverable_exc_raised(
- self, mock_sys_exc_info, mock_os_write, mock_logger, mock_sleep,
- mock_monitor_receivers):
- mock_monitor_receivers.side_effect = QpidException()
- mock_sleep.side_effect = BreakOutException()
- self.monitor.run()
- mock_os_write.assert_called_once_with(self.mock_w, 'e')
-
-
- at case_no_python3
- at case_no_pypy
-class TestReceiversMonitorMonitorReceivers(ReceiversMonitorTestBase):
-
- def test_receivers_monitor_monitor_receivers_calls_next_receivers(self):
- self.mock_session.next_receiver.side_effect = BreakOutException()
- with self.assertRaises(BreakOutException):
- self.monitor.monitor_receivers()
- self.mock_session.next_receiver.assert_called_once_with()
-
- def test_receivers_monitor_monitor_receivers_writes_to_fd(self):
- with patch(QPID_MODULE + '.os.write') as mock_os_write:
- mock_os_write.side_effect = BreakOutException()
- with self.assertRaises(BreakOutException):
- self.monitor.monitor_receivers()
- mock_os_write.assert_called_once_with(self.mock_w, '0')
-
-
- at case_no_python3
- at case_no_pypy
@disable_runtime_dependency_check
class TestTransportInit(Case):
@@ -1699,12 +1540,6 @@ class TestTransportEstablishConnection(Case):
self.transport = Transport(self.client)
self.mock_conn = Mock()
self.transport.Connection = self.mock_conn
- path_to_mock = QPID_MODULE + '.ReceiversMonitor'
- self.patcher = patch(path_to_mock)
- self.mock_ReceiverMonitor = self.patcher.start()
-
- def tearDown(self):
- self.patcher.stop()
def test_transport_establish_conn_new_option_overwrites_default(self):
self.client.userid = 'new-userid'
@@ -1863,22 +1698,7 @@ class TestTransportEstablishConnection(Case):
new_conn = self.transport.establish_connection()
self.assertIs(new_conn, self.mock_conn.return_value)
- def test_transport_establish_conn_creates_ReceiversMonitor(self):
- self.transport.establish_connection()
- self.mock_ReceiverMonitor.assert_called_once_with(
- self.transport.session, self.transport._w,
- )
-
- def test_transport_establish_conn_daemonizes_thread(self):
- self.transport.establish_connection()
- self.assertTrue(self.mock_ReceiverMonitor.return_value.daemon)
-
- def test_transport_establish_conn_starts_thread(self):
- self.transport.establish_connection()
- new_receiver_monitor = self.mock_ReceiverMonitor.return_value
- new_receiver_monitor.start.assert_called_once_with()
-
- def test_transport_establish_conn_ignores_hostname_if_not_localhost(self):
+ def test_transport_establish_conn_uses_hostname_if_not_default(self):
self.client.hostname = 'some_other_hostname'
self.transport.establish_connection()
self.mock_conn.assert_called_once_with(
@@ -1889,6 +1709,14 @@ class TestTransportEstablishConnection(Case):
transport='tcp',
)
+ def test_transport_sets_qpid_message_received_handler(self):
+ self.transport.establish_connection()
+ qpid_conn = self.mock_conn.return_value.get_qpid_connection
+ new_mock_session = qpid_conn.return_value.session.return_value
+ mock_set_callback = new_mock_session.set_message_received_handler
+ expected_callback = self.transport._qpid_session_ready
+ mock_set_callback.assert_called_once_with(expected_callback)
+
@case_no_python3
@case_no_pypy
@@ -1941,6 +1769,24 @@ class TestTransportRegisterWithEventLoop(Case):
@case_no_python3
@case_no_pypy
@disable_runtime_dependency_check
+class TestTransportQpidSessionReady(Case):
+
+ def setUp(self):
+ self.patch_a = patch(QPID_MODULE + '.os.write')
+ self.mock_os_write = self.patch_a.start()
+ self.transport = Transport(Mock())
+
+ def tearDown(self):
+ self.patch_a.stop()
+
+ def test_transport__qpid_session_ready_writes_symbol_to_fd(self):
+ self.transport._qpid_session_ready()
+ self.mock_os_write.assert_called_once_with(self.transport._w, '0')
+
+
+ at case_no_python3
+ at case_no_pypy
+ at disable_runtime_dependency_check
class TestTransportOnReadable(Case):
def setUp(self):
@@ -1952,6 +1798,7 @@ class TestTransportOnReadable(Case):
def tearDown(self):
self.patch_a.stop()
+ self.patch_b.stop()
def test_transport_on_readable_reads_symbol_from_fd(self):
self.transport.on_readable(Mock(), Mock())
@@ -1971,16 +1818,6 @@ class TestTransportOnReadable(Case):
with self.assertRaises(IOError):
self.transport.on_readable(Mock(), Mock())
- def test_transport_on_readable_reads_e_off_of_pipe_raises_exc_info(self):
- self.transport.session = Mock()
- try:
- raise IOError()
- except Exception as error:
- self.transport.session.saved_exception = error
- self.mock_os_read.return_value = 'e'
- with self.assertRaises(IOError):
- self.transport.on_readable(Mock(), Mock())
-
@case_no_python3
@case_no_pypy
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index 46087a4..5d3aab9 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -5,6 +5,7 @@ import types
from anyjson import dumps, loads
from collections import defaultdict
+from contextlib import contextmanager
from itertools import count
from kombu import Connection, Exchange, Queue, Consumer, Producer
@@ -18,6 +19,23 @@ from kombu.tests.case import (
)
+class JSONEqual(object):
+ # The order in which a dict is serialized to json depends on the hashseed
+ # so we have this to support json in .assert_has_call*.
+
+ def __init__(self, expected):
+ self.expected = expected
+
+ def __eq__(self, other):
+ return loads(other) == loads(self.expected)
+
+ def __str__(self):
+ return self.expected
+
+ def __repr__(self):
+ return '(json)%r' % (self.expected,)
+
+
class _poll(eventio._select):
def register(self, fd, flags):
@@ -171,6 +189,9 @@ class Client(object):
return self
+ def __repr__(self):
+ return '<MockClient: %r' % (id(self),)
+
class Pipeline(object):
@@ -201,13 +222,21 @@ class Pipeline(object):
class Channel(redis.Channel):
+ Client = Client
def _get_async_client(self):
return Client
+ def _create_client(self, async=False):
+ return Client()
+
def _get_pool(self, async=False):
return Mock()
+ @contextmanager
+ def conn_or_acquire(self, client=None):
+ yield client if client is not None else self._create_client()
+
def _get_response_error(self):
return ResponseError
@@ -280,8 +309,8 @@ class test_Channel(Case):
self._pool = pool_at_init[0]
super(XChannel, self).__init__(*args, **kwargs)
- def _get_async_client(self):
- return lambda *_, **__: client
+ def _create_client(self, async=False):
+ return client
class XTransport(Transport):
Channel = XChannel
@@ -332,8 +361,10 @@ class test_Channel(Case):
self.channel._do_restore_message(
pl2, 'ex', 'rkey', client,
)
+
client.rpush.assert_has_calls([
- call('george', spl2), call('elaine', spl2),
+ call('george', JSONEqual(spl2)),
+ call('elaine', JSONEqual(spl2)),
])
client.rpush.side_effect = KeyError()
@@ -347,7 +378,8 @@ class test_Channel(Case):
message = Mock(name='message')
with patch('kombu.transport.redis.loads') as loads:
loads.return_value = 'M', 'EX', 'RK'
- client = self.channel.client = Mock(name='client')
+ client = self.channel._create_client = Mock(name='client')
+ client = client()
client.pipeline = ContextMock()
restore = self.channel._do_restore_message = Mock(
name='_do_restore_message',
@@ -376,7 +408,8 @@ class test_Channel(Case):
restore.assert_called_with('M', 'EX', 'RK', client, False)
def test_qos_restore_visible(self):
- client = self.channel.client = Mock(name='client')
+ client = self.channel._create_client = Mock(name='client')
+ client = client()
def pipe(*args, **kwargs):
return Pipeline(client)
@@ -556,36 +589,37 @@ class test_Channel(Case):
def test_put_fanout(self):
self.channel._in_poll = False
- c = self.channel.client = Mock()
+ c = self.channel._create_client = Mock()
body = {'hello': 'world'}
self.channel._put_fanout('exchange', body, '')
- c.publish.assert_called_with('exchange', dumps(body))
+ c().publish.assert_called_with('exchange', JSONEqual(dumps(body)))
def test_put_priority(self):
- client = self.channel.client = Mock(name='client')
+ client = self.channel._create_client = Mock(name='client')
msg1 = {'properties': {'delivery_info': {'priority': 3}}}
self.channel._put('george', msg1)
- client.lpush.assert_called_with(
- self.channel._q_for_pri('george', 3), dumps(msg1),
+ client().lpush.assert_called_with(
+ self.channel._q_for_pri('george', 3), JSONEqual(dumps(msg1)),
)
msg2 = {'properties': {'delivery_info': {'priority': 313}}}
self.channel._put('george', msg2)
- client.lpush.assert_called_with(
- self.channel._q_for_pri('george', 9), dumps(msg2),
+ client().lpush.assert_called_with(
+ self.channel._q_for_pri('george', 9), JSONEqual(dumps(msg2)),
)
msg3 = {'properties': {'delivery_info': {}}}
self.channel._put('george', msg3)
- client.lpush.assert_called_with(
- self.channel._q_for_pri('george', 0), dumps(msg3),
+ client().lpush.assert_called_with(
+ self.channel._q_for_pri('george', 0), JSONEqual(dumps(msg3)),
)
def test_delete(self):
x = self.channel
- self.channel._in_poll = False
+ x._create_client = Mock()
+ x._create_client.return_value = x.client
delete = x.client.delete = Mock()
srem = x.client.srem = Mock()
@@ -597,7 +631,8 @@ class test_Channel(Case):
)
def test_has_queue(self):
- self.channel._in_poll = False
+ self.channel._create_client = Mock()
+ self.channel._create_client.return_value = self.channel.client
exists = self.channel.client.exists = Mock()
exists.return_value = True
self.assertTrue(self.channel._has_queue('foo'))
@@ -682,23 +717,6 @@ class test_Channel(Case):
self.assertIs(redis.Channel._get_response_error(self.channel),
ResponseError)
- def test_avail_client_when_not_in_poll(self):
- self.channel._in_poll = False
- c = self.channel.client = Mock()
-
- with self.channel.conn_or_acquire() as client:
- self.assertIs(client, c)
-
- def test_avail_client_when_in_poll(self):
- self.channel._in_poll = True
- self.channel._pool = Mock()
- cc = self.channel._create_client = Mock()
- cc.return_value = Mock()
-
- with self.channel.conn_or_acquire():
- pass
- cc.assert_called_with()
-
def test_register_with_event_loop(self):
transport = self.connection.transport
transport.cycle = Mock(name='cycle')
diff --git a/kombu/transport/qpid.py b/kombu/transport/qpid.py
index a5a0d12..639d837 100644
--- a/kombu/transport/qpid.py
+++ b/kombu/transport/qpid.py
@@ -8,7 +8,6 @@ broker management.
.. _`Qpid`: http://qpid.apache.org/
.. _`qpid-python`: http://pypi.python.org/pypi/qpid-python/
.. _`qpid-tools`: http://pypi.python.org/pypi/qpid-tools/
-.. _`Issue 2199`: https://github.com/celery/celery/issues/2199
The use this transport you must install the necessary dependencies. These
dependencies are available via PyPI and can be installed using the pip
@@ -22,11 +21,6 @@ command:
to underlying dependencies not being compatible. This version is
tested and works with with Python 2.7.
-.. admonition:: Potential Deadlock
-
- This transport should be used with caution due to a known
- potential deadlock. See `Issue 2199`_ for more details.
-
Authentication
==============
@@ -84,7 +78,6 @@ import select
import socket
import ssl
import sys
-import threading
import time
from itertools import count
@@ -1331,81 +1324,6 @@ class Connection(object):
channel.connection = None
-class ReceiversMonitor(threading.Thread):
- """A monitoring thread that reads and handles messages from all receivers.
-
- A single instance of ReceiversMonitor is expected to be created by
- :class:`Transport`.
-
- In :meth:`monitor_receivers`, the thread monitors all receivers
- associated with the session created by the Transport using the blocking
- call to session.next_receiver(). When any receiver has messages
- available, a symbol '0' is written to the self._w_fd file descriptor. The
- :meth:`monitor_receivers` is designed not to exit, and loops over
- session.next_receiver() forever.
-
- The entry point of the thread is :meth:`run` which calls
- :meth:`monitor_receivers`.
-
- The thread is designed to be daemonized, and will be forcefully killed
- when all non-daemon threads have already exited.
- """
-
- def __init__(self, session, w):
- """Instantiate a ReceiversMonitor object
-
- :param session: The session which needs all of its receivers
- monitored.
- :type session: :class:`qpid.messaging.endpoints.Session`
- :param w: The file descriptor to write the '0' into when
- next_receiver unblocks.
- :type w: int
- """
- super(ReceiversMonitor, self).__init__()
- self._session = session
- self._w_fd = w
-
- def run(self):
- """Thread entry point for ReceiversMonitor
-
- Calls :meth:`monitor_receivers` with a log-and-reenter behavior for
- non connection errors. This guards against unexpected exceptions
- which could cause this thread to exit unexpectedly.
-
- A :class:`qpid.messaging.exceptions.SessionClosed` exception should
- cause this thread to exit. This is a normal exit condition and the
- thread is no longer needed.
-
- If a connection error occurs, the exception needs to be propagated
- to MainThread where the kombu exception handler can properly handle
- it. The exception is stored as saved_exception on the self._session
- object. The character 'e' is then written to the self.w_fd file
- descriptor and then this thread exits.
- """
- while True:
- try:
- self.monitor_receivers()
- except Transport.recoverable_connection_errors as exc:
- self._session.saved_exception = exc
- os.write(self._w_fd, 'e')
- break
- except SessionClosed:
- break
- except Exception as exc:
- logger.error(exc, exc_info=1)
- time.sleep(10)
-
- def monitor_receivers(self):
- """Monitor all receivers, and write to _w_fd when a message is ready.
-
- The call to next_receiver() blocks until a message is ready. Once a
- message is ready, write a '0' to _w_fd.
- """
- while True:
- self._session.next_receiver()
- os.write(self._w_fd, '0')
-
-
class Transport(base.Transport):
"""Kombu native transport for a Qpid broker.
@@ -1523,24 +1441,23 @@ class Transport(base.Transport):
'with your package manager. You can also try `pip install '
'qpid-python`.')
+ def _qpid_session_ready(self):
+ os.write(self._w, '0')
+
def on_readable(self, connection, loop):
"""Handle any messages associated with this Transport.
This method clears a single message from the externally monitored
file descriptor by issuing a read call to the self.r file descriptor
which removes a single '0' character that was placed into the pipe
- by :class:`ReceiversMonitor`. Once a '0' is read, all available
- events are drained through a call to :meth:`drain_events`.
+ by the Qpid session message callback handler. Once a '0' is read,
+ all available events are drained through a call to
+ :meth:`drain_events`.
The behavior of self.r is adjusted in __init__ to be non-blocking,
ensuring that an accidental call to this method when no more messages
will arrive will not cause indefinite blocking.
- If the self.r file descriptor receives the character 'e', an error
- occurred in the background thread, and this thread should raise the
- saved exception. The exception is stored as saved_exception on the
- session object.
-
Nothing is expected to be returned from :meth:`drain_events` because
:meth:`drain_events` handles messages by calling callbacks that are
maintained on the :class:`Connection` object. When
@@ -1575,9 +1492,7 @@ class Transport(base.Transport):
functionality.
:type loop: kombu.async.Hub
"""
- symbol = os.read(self.r, 1)
- if symbol == 'e':
- raise self.session.saved_exception
+ os.read(self.r, 1)
try:
self.drain_events(connection)
except socket.timeout:
@@ -1589,7 +1504,7 @@ class Transport(base.Transport):
Register the callback self.on_readable to be called when an
external epoll loop sees that the file descriptor registered is
ready for reading. The file descriptor is created by this Transport,
- and is updated by the ReceiversMonitor thread.
+ and is written to when a message is available.
Because supports_ev == True, Celery expects to call this method to
give the Transport an opportunity to register a read file descriptor
@@ -1685,9 +1600,7 @@ class Transport(base.Transport):
conn = self.Connection(**opts)
conn.client = self.client
self.session = conn.get_qpid_connection().session()
- monitor_thread = ReceiversMonitor(self.session, self._w)
- monitor_thread.daemon = True
- monitor_thread.start()
+ self.session.set_message_received_handler(self._qpid_session_ready)
return conn
def close_connection(self, connection):
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 5005854..573bdd7 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -869,10 +869,7 @@ class Channel(virtual.Channel):
if client:
yield client
else:
- if self._in_poll:
- yield self._create_client()
- else:
- yield self.client
+ yield self._create_client()
@property
def pool(self):
diff --git a/requirements/default.txt b/requirements/default.txt
index ae9bbd2..13bd65e 100644
--- a/requirements/default.txt
+++ b/requirements/default.txt
@@ -1,2 +1,2 @@
anyjson>=0.3.3
-amqp>=1.4.7,<2.0
+amqp>=1.4.9,<2.0
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/kombu.git
More information about the Python-modules-commits
mailing list