[Python-modules-commits] [kombu] 01/06: Import kombu_3.0.29.orig.tar.gz
Brian May
bam at moszumanska.debian.org
Wed Nov 18 23:06:28 UTC 2015
This is an automated email from the git hooks/post-receive script.
bam pushed a commit to branch master
in repository kombu.
commit d36ec12315555e6fa413edaf2d3c5428cac73dcd
Author: Brian May <bam at debian.org>
Date: Thu Nov 19 09:22:33 2015 +1100
Import kombu_3.0.29.orig.tar.gz
---
AUTHORS | 1 +
Changelog | 56 +++++++++++++++-
PKG-INFO | 4 +-
README.rst | 2 +-
docs/changelog.rst | 56 +++++++++++++++-
docs/introduction.rst | 2 +-
extra/release/doc4allmods | 2 +
kombu.egg-info/PKG-INFO | 4 +-
kombu.egg-info/SOURCES.txt | 2 +
kombu/__init__.py | 2 +-
kombu/entity.py | 11 +++-
kombu/serialization.py | 2 +-
kombu/tests/case.py | 3 +-
kombu/tests/test_entities.py | 8 +++
kombu/tests/transport/test_SQS.py | 16 ++---
kombu/tests/transport/test_redis.py | 19 ++++--
kombu/transport/django/migrations/0001_initial.py | 22 +++----
kombu/transport/django/migrations/__init__.py | 16 +++++
.../django/south_migrations/0001_initial.py | 57 ++++++++++++++++
.../{migrations => south_migrations}/__init__.py | 0
kombu/transport/mongodb.py | 46 +++++++++----
kombu/transport/redis.py | 77 +++++++++++-----------
requirements/test-ci3.txt | 1 +
23 files changed, 321 insertions(+), 88 deletions(-)
diff --git a/AUTHORS b/AUTHORS
index 81b70f1..4b3b471 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -42,6 +42,7 @@ Eric Reynolds <ereynolds at opendns.com>
Eric Wang <eric1990 at gmail.com>
Fabrice Rabaute <fabrice at expa.com>
Felix Schwarz <felix.schwarz at oss.schwarz.eu>
+Felix Yan <felixonmars at archlinux.org>
Fernando Jorge Mota <f.j.mota13 at gmail.com>
Flavio [FlaPer87] Percoco Premoli <flaper87 at flaper87.org>
Florian Munz <surf at theflow.de>
diff --git a/Changelog b/Changelog
index afa6f32..e31b733 100644
--- a/Changelog
+++ b/Changelog
@@ -4,11 +4,65 @@
Change history
================
+.. _version-3.0.29:
+
+3.0.29
+======
+:release-date: 2015-10-26 11:10 A.M PDT
+:release-by: Ask Solem
+
+- Fixed serialization issue for ``bindings.as_dict()`` (Issue #453).
+
+ Fix contributed by Sergey Tikhonov.
+
+- Json serializer wrongly treated bytes as ``ascii``, not ``utf-8``
+ (Issue #532).
+
+- MongoDB: Now supports pymongo 3.x.
+
+ Contributed by Len Buckens.
+
+- SQS: Tests passing on Python 3.
+
+ Fix contributed by Felix Yan
+
+.. _version-3.0.28:
+
+3.0.28
+======
+:release-date: 2015-10-12 12:00 P.M PDT
+:release-by: Ask Solem
+
+.. admonition:: Django transport migrations.
+
+ If you're using Django 1.8 and have already created the
+ kombu_transport_django tables, you have to run a fake initial migration:
+
+ .. code-block:: pycon
+
+ python manage.py migrate kombu_transport_django --fake-initial
+
+- No longer compatible with South by default.
+
+ To keep using kombu.transport.django with South migrations
+ you now need to configure a new location for the kombu migrations:
+
+ .. code-block:: python
+
+ SOUTH_MIGRATION_MODULES = {
+ 'kombu_transport_django':
+ 'kombu.transport.django.south_migrations',
+ }
+
+- Keep old South migrations in ``kombu.transport.django.south_migrations``.
+
+- Now works with Redis < 2.10 again.
+
.. _version-3.0.27:
3.0.27
======
-:release-date: 2015-10-09 3:10 PM PDT
+:release-date: 2015-10-09 3:10 P.M PDT
:release-by: Ask Solem
- Now depends on :mod:`amqp` 1.4.7.
diff --git a/PKG-INFO b/PKG-INFO
index bb1f69b..4371b75 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: kombu
-Version: 3.0.27
+Version: 3.0.29
Summary: Messaging library for Python
Home-page: http://kombu.readthedocs.org
Author: Ask Solem
@@ -12,7 +12,7 @@ Description: .. _kombu-index:
kombu - Messaging library for Python
========================================
- :Version: 3.0.27
+ :Version: 3.0.29
`Kombu` is a messaging library for Python.
diff --git a/README.rst b/README.rst
index acab4b6..7474d2a 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@
kombu - Messaging library for Python
========================================
-:Version: 3.0.27
+:Version: 3.0.29
`Kombu` is a messaging library for Python.
diff --git a/docs/changelog.rst b/docs/changelog.rst
index afa6f32..e31b733 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -4,11 +4,65 @@
Change history
================
+.. _version-3.0.29:
+
+3.0.29
+======
+:release-date: 2015-10-26 11:10 A.M PDT
+:release-by: Ask Solem
+
+- Fixed serialization issue for ``bindings.as_dict()`` (Issue #453).
+
+ Fix contributed by Sergey Tikhonov.
+
+- Json serializer wrongly treated bytes as ``ascii``, not ``utf-8``
+ (Issue #532).
+
+- MongoDB: Now supports pymongo 3.x.
+
+ Contributed by Len Buckens.
+
+- SQS: Tests passing on Python 3.
+
+ Fix contributed by Felix Yan
+
+.. _version-3.0.28:
+
+3.0.28
+======
+:release-date: 2015-10-12 12:00 P.M PDT
+:release-by: Ask Solem
+
+.. admonition:: Django transport migrations.
+
+ If you're using Django 1.8 and have already created the
+ kombu_transport_django tables, you have to run a fake initial migration:
+
+ .. code-block:: pycon
+
+ python manage.py migrate kombu_transport_django --fake-initial
+
+- No longer compatible with South by default.
+
+ To keep using kombu.transport.django with South migrations
+ you now need to configure a new location for the kombu migrations:
+
+ .. code-block:: python
+
+ SOUTH_MIGRATION_MODULES = {
+ 'kombu_transport_django':
+ 'kombu.transport.django.south_migrations',
+ }
+
+- Keep old South migrations in ``kombu.transport.django.south_migrations``.
+
+- Now works with Redis < 2.10 again.
+
.. _version-3.0.27:
3.0.27
======
-:release-date: 2015-10-09 3:10 PM PDT
+:release-date: 2015-10-09 3:10 P.M PDT
:release-by: Ask Solem
- Now depends on :mod:`amqp` 1.4.7.
diff --git a/docs/introduction.rst b/docs/introduction.rst
index acab4b6..7474d2a 100644
--- a/docs/introduction.rst
+++ b/docs/introduction.rst
@@ -4,7 +4,7 @@
kombu - Messaging library for Python
========================================
-:Version: 3.0.27
+:Version: 3.0.29
`Kombu` is a messaging library for Python.
diff --git a/extra/release/doc4allmods b/extra/release/doc4allmods
index f95fee2..4aedb79 100755
--- a/extra/release/doc4allmods
+++ b/extra/release/doc4allmods
@@ -6,6 +6,8 @@ SKIP_FILES="kombu.entity.rst
kombu.messaging.rst
kombu.transport.django.migrations.rst
kombu.transport.django.migrations.0001_initial.rst
+ kombu.transport.django.south_migrations.rst
+ kombu.transport.django.south_migrations.0001_initial.rst
kombu.transport.django.management.rst
kombu.transport.django.management.commands.rst"
diff --git a/kombu.egg-info/PKG-INFO b/kombu.egg-info/PKG-INFO
index bb1f69b..4371b75 100644
--- a/kombu.egg-info/PKG-INFO
+++ b/kombu.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: kombu
-Version: 3.0.27
+Version: 3.0.29
Summary: Messaging library for Python
Home-page: http://kombu.readthedocs.org
Author: Ask Solem
@@ -12,7 +12,7 @@ Description: .. _kombu-index:
kombu - Messaging library for Python
========================================
- :Version: 3.0.27
+ :Version: 3.0.29
`Kombu` is a messaging library for Python.
diff --git a/kombu.egg-info/SOURCES.txt b/kombu.egg-info/SOURCES.txt
index 51838b3..48fcd61 100644
--- a/kombu.egg-info/SOURCES.txt
+++ b/kombu.egg-info/SOURCES.txt
@@ -229,6 +229,8 @@ kombu/transport/django/management/commands/__init__.py
kombu/transport/django/management/commands/clean_kombu_messages.py
kombu/transport/django/migrations/0001_initial.py
kombu/transport/django/migrations/__init__.py
+kombu/transport/django/south_migrations/0001_initial.py
+kombu/transport/django/south_migrations/__init__.py
kombu/transport/sqlalchemy/__init__.py
kombu/transport/sqlalchemy/models.py
kombu/transport/virtual/__init__.py
diff --git a/kombu/__init__.py b/kombu/__init__.py
index d73f6c3..c7326df 100644
--- a/kombu/__init__.py
+++ b/kombu/__init__.py
@@ -11,7 +11,7 @@ version_info_t = namedtuple(
'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'),
)
-VERSION = version_info_t(3, 0, 27, '', '')
+VERSION = version_info_t(3, 0, 29, '', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = 'ask at celeryproject.org'
diff --git a/kombu/entity.py b/kombu/entity.py
index 9825746..066dc53 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -7,7 +7,7 @@ Exchange and Queue declarations.
"""
from __future__ import absolute_import
-from .abstract import MaybeChannelBound
+from .abstract import MaybeChannelBound, Object
from .exceptions import ContentDisallowed
from .five import string_t
from .serialization import prepare_accept_content
@@ -299,7 +299,7 @@ class Exchange(MaybeChannelBound):
return not self.auto_delete
-class binding(object):
+class binding(Object):
"""Represents a queue or exchange binding.
:keyword exchange: Exchange to bind to.
@@ -309,6 +309,13 @@ class binding(object):
"""
+ attrs = (
+ ('exchange', None),
+ ('routing_key', None),
+ ('arguments', None),
+ ('unbind_arguments', None)
+ )
+
def __init__(self, exchange=None, routing_key='',
arguments=None, unbind_arguments=None):
self.exchange = exchange
diff --git a/kombu/serialization.py b/kombu/serialization.py
index 9958139..85376e7 100644
--- a/kombu/serialization.py
+++ b/kombu/serialization.py
@@ -311,7 +311,7 @@ def register_json():
def _loads(obj):
if isinstance(obj, bytes_t):
- obj = obj.decode()
+ obj = obj.decode('utf-8')
return json_loads(obj)
registry.register('json', json_dumps, _loads,
diff --git a/kombu/tests/case.py b/kombu/tests/case.py
index 435cafd..f2caebd 100644
--- a/kombu/tests/case.py
+++ b/kombu/tests/case.py
@@ -24,6 +24,7 @@ except AttributeError:
PY3 = sys.version_info[0] == 3
+MagicMock = mock.MagicMock
patch = mock.patch
call = mock.call
@@ -50,7 +51,7 @@ class _ContextMock(Mock):
in the class, not just the instance."""
def __enter__(self):
- pass
+ return self
def __exit__(self, *exc_info):
pass
diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py
index ef7591a..21deebd 100644
--- a/kombu/tests/test_entities.py
+++ b/kombu/tests/test_entities.py
@@ -4,6 +4,7 @@ import pickle
from kombu import Connection, Exchange, Producer, Queue, binding
from kombu.exceptions import NotBoundError
+from kombu.serialization import registry
from .case import Case, Mock, call
from .mocks import Transport
@@ -360,6 +361,13 @@ class test_Queue(Case):
d = q.as_dict(recurse=True)
self.assertEqual(d['exchange']['name'], self.exchange.name)
+ def test_queue_dump(self):
+ b = binding(self.exchange, 'rk')
+ q = Queue('foo', self.exchange, 'rk', bindings=[b])
+ d = q.as_dict(recurse=True)
+ self.assertEqual(d['bindings'][0]['routing_key'], 'rk')
+ registry.dumps(d)
+
def test__repr__(self):
b = Queue('foo', self.exchange, 'foo')
self.assertIn('foo', repr(b))
diff --git a/kombu/tests/transport/test_SQS.py b/kombu/tests/transport/test_SQS.py
index 69704d5..0a33fb5 100644
--- a/kombu/tests/transport/test_SQS.py
+++ b/kombu/tests/transport/test_SQS.py
@@ -164,7 +164,7 @@ class test_Channel(Case):
self.assertEqual(len(results), 1)
# Now test getting many messages
- for i in xrange(3):
+ for i in range(3):
message = 'message: {0}'.format(i)
self.producer.publish(message)
@@ -210,11 +210,11 @@ class test_Channel(Case):
self.assertEqual(message, results)
def test_puts_and_gets(self):
- for i in xrange(3):
+ for i in range(3):
message = 'message: %s' % i
self.producer.publish(message)
- for i in xrange(3):
+ for i in range(3):
self.assertEqual('message: %s' % i,
self.queue(self.channel).get().payload)
@@ -233,7 +233,7 @@ class test_Channel(Case):
self.channel.qos.prefetch_count = 5
# Now, generate all the messages
- for i in xrange(message_count):
+ for i in range(message_count):
message = 'message: %s' % i
self.producer.publish(message)
@@ -262,11 +262,11 @@ class test_Channel(Case):
self.channel.qos.prefetch_count = 5
# Now, generate all the messages
- for i in xrange(message_count):
+ for i in range(message_count):
self.producer.publish('message: %s' % i)
# Now drain all the events
- for i in xrange(message_count):
+ for i in range(message_count):
self.channel.drain_events()
# How many times was the SQSConnectionMock get_message method called?
@@ -283,11 +283,11 @@ class test_Channel(Case):
self.channel.qos.prefetch_count = None
# Now, generate all the messages
- for i in xrange(message_count):
+ for i in range(message_count):
self.producer.publish('message: %s' % i)
# Now drain all the events
- for i in xrange(message_count):
+ for i in range(message_count):
self.channel.drain_events()
# How many times was the SQSConnectionMock get_message method called?
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index b346f21..ae79d52 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -14,7 +14,7 @@ from kombu.transport import virtual
from kombu.utils import eventio # patch poll
from kombu.tests.case import (
- Case, Mock, call, module_exists, skip_if_not_module, patch,
+ Case, ContextMock, Mock, call, module_exists, skip_if_not_module, patch,
)
@@ -178,6 +178,12 @@ class Pipeline(object):
self.client = client
self.stack = []
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *exc_info):
+ pass
+
def __getattr__(self, key):
if key not in self.__dict__:
@@ -342,11 +348,11 @@ class test_Channel(Case):
with patch('kombu.transport.redis.loads') as loads:
loads.return_value = 'M', 'EX', 'RK'
client = self.channel.client = Mock(name='client')
+ client.pipeline = ContextMock()
restore = self.channel._do_restore_message = Mock(
name='_do_restore_message',
)
- pipe = Mock(name='pipe')
- client.pipeline.return_value = pipe
+ pipe = client.pipeline.return_value
pipe_hget = Mock(name='pipe.hget')
pipe.hget.return_value = pipe_hget
pipe_hget_hdel = Mock(name='pipe.hget.hdel')
@@ -371,6 +377,10 @@ class test_Channel(Case):
def test_qos_restore_visible(self):
client = self.channel.client = Mock(name='client')
+
+ def pipe(*args, **kwargs):
+ return Pipeline(client)
+ client.pipeline = pipe
client.zrevrangebyscore.return_value = [
(1, 10),
(2, 20),
@@ -1202,7 +1212,8 @@ class test_Mutex(Case):
# Won
uuid.return_value = lock_id
client.setnx.return_value = True
- pipe = client.pipeline.return_value = Mock(name='pipe')
+ client.pipeline = ContextMock()
+ pipe = client.pipeline.return_value
pipe.get.return_value = lock_id
held = False
with redis.Mutex(client, 'foo1', 100):
diff --git a/kombu/transport/django/migrations/0001_initial.py b/kombu/transport/django/migrations/0001_initial.py
index 6a6a32d..0307c28 100644
--- a/kombu/transport/django/migrations/0001_initial.py
+++ b/kombu/transport/django/migrations/0001_initial.py
@@ -1,15 +1,11 @@
# -*- coding: utf-8 -*-
-# Generated by Django 1.10.dev20151005181923 on 2015-10-05 20:53
from __future__ import absolute_import, unicode_literals
from django.db import migrations, models
-import django.db.models.deletion
class Migration(migrations.Migration):
- initial = True
-
dependencies = [
]
@@ -18,11 +14,11 @@ class Migration(migrations.Migration):
name='Message',
fields=[
('id', models.AutoField(
- auto_created=True, primary_key=True,
- serialize=False, verbose_name='ID')),
- ('visible', models.BooleanField(db_index=True, default=True)),
+ verbose_name='ID', serialize=False,
+ auto_created=True, primary_key=True)),
+ ('visible', models.BooleanField(default=True, db_index=True)),
('sent_at', models.DateTimeField(
- auto_now_add=True, db_index=True, null=True)),
+ db_index=True, auto_now_add=True, null=True)),
('payload', models.TextField(verbose_name='payload')),
],
options={
@@ -35,10 +31,10 @@ class Migration(migrations.Migration):
name='Queue',
fields=[
('id', models.AutoField(
- auto_created=True, primary_key=True,
- serialize=False, verbose_name='ID')),
+ verbose_name='ID', serialize=False,
+ auto_created=True, primary_key=True)),
('name', models.CharField(
- max_length=200, unique=True, verbose_name='name')),
+ unique=True, max_length=200, verbose_name='name')),
],
options={
'db_table': 'djkombu_queue',
@@ -50,8 +46,6 @@ class Migration(migrations.Migration):
model_name='message',
name='queue',
field=models.ForeignKey(
- on_delete=django.db.models.deletion.CASCADE,
- related_name='messages',
- to='kombu_transport_django.Queue'),
+ related_name='messages', to='kombu_transport_django.Queue'),
),
]
diff --git a/kombu/transport/django/migrations/__init__.py b/kombu/transport/django/migrations/__init__.py
index e69de29..6b92838 100644
--- a/kombu/transport/django/migrations/__init__.py
+++ b/kombu/transport/django/migrations/__init__.py
@@ -0,0 +1,16 @@
+from __future__ import absolute_import
+
+SOUTH_ERROR_MESSAGE = """
+For South support, customize the SOUTH_MIGRATION_MODULES setting
+to point to the correct migrations module:
+
+ SOUTH_MIGRATION_MODULES = {
+ 'kombu_transport_django': 'kombu.transport.django.south_migrations',
+ }
+"""
+
+try:
+ from django.db import migrations # noqa
+except ImportError:
+ from django.core.exceptions import ImproperlyConfigured
+ raise ImproperlyConfigured(SOUTH_ERROR_MESSAGE)
diff --git a/kombu/transport/django/south_migrations/0001_initial.py b/kombu/transport/django/south_migrations/0001_initial.py
new file mode 100644
index 0000000..ea1edb0
--- /dev/null
+++ b/kombu/transport/django/south_migrations/0001_initial.py
@@ -0,0 +1,57 @@
+# encoding: utf-8
+from __future__ import absolute_import
+
+# flake8: noqa
+import datetime
+from south.db import db
+from south.v2 import SchemaMigration
+from django.db import models
+
+class Migration(SchemaMigration):
+
+ def forwards(self, orm):
+
+ # Adding model 'Queue'
+ db.create_table('djkombu_queue', (
+ ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
+ ('name', self.gf('django.db.models.fields.CharField')(unique=True, max_length=200)),
+ ))
+ db.send_create_signal('django', ['Queue'])
+
+ # Adding model 'Message'
+ db.create_table('djkombu_message', (
+ ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
+ ('visible', self.gf('django.db.models.fields.BooleanField')(default=True, db_index=True)),
+ ('sent_at', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, null=True, db_index=True, blank=True)),
+ ('payload', self.gf('django.db.models.fields.TextField')()),
+ ('queue', self.gf('django.db.models.fields.related.ForeignKey')(related_name='messages', to=orm['django.Queue'])),
+ ))
+ db.send_create_signal('django', ['Message'])
+
+
+ def backwards(self, orm):
+
+ # Deleting model 'Queue'
+ db.delete_table('djkombu_queue')
+
+ # Deleting model 'Message'
+ db.delete_table('djkombu_message')
+
+
+ models = {
+ 'django.message': {
+ 'Meta': {'object_name': 'Message', 'db_table': "'djkombu_message'"},
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'payload': ('django.db.models.fields.TextField', [], {}),
+ 'queue': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'messages'", 'to': "orm['django.Queue']"}),
+ 'sent_at': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'null': 'True', 'db_index': 'True', 'blank': 'True'}),
+ 'visible': ('django.db.models.fields.BooleanField', [], {'default': 'True', 'db_index': 'True'})
+ },
+ 'django.queue': {
+ 'Meta': {'object_name': 'Queue', 'db_table': "'djkombu_queue'"},
+ 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '200'})
+ }
+ }
+
+ complete_apps = ['django']
diff --git a/kombu/transport/django/migrations/__init__.py b/kombu/transport/django/south_migrations/__init__.py
similarity index 100%
copy from kombu/transport/django/migrations/__init__.py
copy to kombu/transport/django/south_migrations/__init__.py
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 2e1f9de..65b4a6d 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -22,6 +22,12 @@ from kombu.utils.encoding import bytes_to_str
from . import virtual
+try:
+ from pymongo.cursor import CursorType
+except ImportError:
+ class CursorType(object): # noqa
+ pass
+
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 27017
@@ -172,15 +178,26 @@ class Channel(virtual.Channel):
return hostname, dbname, options
+ def _prepare_client_options(self, options):
+ if pymongo.version_tuple >= (3, ):
+ options.pop('auto_start_request', None)
+ return options
+
def _open(self, scheme='mongodb://'):
hostname, dbname, options = self._parse_uri(scheme=scheme)
- mongoconn = MongoClient(
- host=hostname, ssl=options['ssl'],
- auto_start_request=options['auto_start_request'],
- connectTimeoutMS=options['connectTimeoutMS'],
- use_greenlets=_detect_environment() != 'default',
- )
+ conf = self._prepare_client_options(options)
+ conf['host'] = hostname
+
+ env = _detect_environment()
+ if env == 'gevent':
+ from gevent import monkey
+ monkey.patch_all()
+ elif env == 'eventlet':
+ from eventlet import monkey_patch
+ monkey_patch()
+
+ mongoconn = MongoClient(**conf)
database = mongoconn[dbname]
version = mongoconn.server_info()['version']
@@ -284,11 +301,18 @@ class Channel(virtual.Channel):
)
def create_broadcast_cursor(self, exchange, routing_key, pattern, queue):
- cursor = self.get_broadcast().find(
- query={'queue': exchange},
- sort=[('$natural', 1)],
- tailable=True,
- )
+ if pymongo.version_tuple >= (3, ):
+ query = dict(filter={'queue': exchange},
+ sort=[('$natural', 1)],
+ cursor_type=CursorType.TAILABLE
+ )
+ else:
+ query = dict(query={'queue': exchange},
+ sort=[('$natural', 1)],
+ tailable=True
+ )
+
+ cursor = self.get_broadcast().find(**query)
ret = self._broadcast_cursors[queue] = BroadcastCursor(cursor)
return ret
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 260ce83..5a3ad30 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -89,7 +89,7 @@ def get_redis_error_classes():
OSError,
exceptions.ConnectionError,
exceptions.AuthenticationError,
- getattr(exceptions, 'TimeoutError', None))))),
+ getattr(exceptions, 'TimeoutError', None))))),
(virtual.Transport.channel_errors + (
DataError,
exceptions.InvalidResponse,
@@ -115,14 +115,14 @@ def Mutex(client, name, expire):
raise MutexHeld()
finally:
if i_won:
- pipe = client.pipeline(True)
try:
- pipe.watch(name)
- if pipe.get(name) == lock_id:
- pipe.multi()
- pipe.delete(name)
- pipe.execute()
- pipe.unwatch()
+ with client.pipeline(True) as pipe:
+ pipe.watch(name)
+ if pipe.get(name) == lock_id:
+ pipe.multi()
+ pipe.delete(name)
+ pipe.execute()
+ pipe.unwatch()
except redis.WatchError:
pass
@@ -160,11 +160,11 @@ class QoS(virtual.QoS):
self.ack(delivery_tag)
@contextmanager
- def pipe_or_acquire(self, pipe=None):
+ def pipe_or_acquire(self, pipe=None, client=None):
if pipe:
yield pipe
else:
- with self.channel.conn_or_acquire() as client:
+ with self.channel.conn_or_acquire(client) as client:
yield client.pipeline()
def _remove_from_indices(self, delivery_tag, pipe=None):
@@ -191,8 +191,9 @@ class QoS(virtual.QoS):
def restore_by_tag(self, tag, client=None, leftmost=False):
with self.channel.conn_or_acquire(client) as client:
- p, _, _ = self._remove_from_indices(
- tag, client.pipeline().hget(self.unacked_key, tag)).execute()
+ with client.pipeline() as pipe:
+ p, _, _ = self._remove_from_indices(
+ tag, pipe.hget(self.unacked_key, tag)).execute()
if p:
M, EX, RK = loads(bytes_to_str(p)) # json is unicode
self.channel._do_restore_message(M, EX, RK, client, leftmost)
@@ -484,10 +485,10 @@ class Channel(virtual.Channel):
return super(Channel, self)._restore(message)
tag = message.delivery_tag
with self.conn_or_acquire() as client:
- P, _ = client.pipeline() \
- .hget(self.unacked_key, tag) \
- .hdel(self.unacked_key, tag) \
- .execute()
+ with client.pipeline() as pipe:
+ P, _ = pipe.hget(self.unacked_key, tag) \
+ .hdel(self.unacked_key, tag) \
+ .execute()
if P:
M, EX, RK = loads(bytes_to_str(P)) # json is unicode
self._do_restore_message(M, EX, RK, client, leftmost)
@@ -651,12 +652,12 @@ class Channel(virtual.Channel):
def _size(self, queue):
with self.conn_or_acquire() as client:
- cmds = client.pipeline()
- for pri in PRIORITY_STEPS:
- cmds = cmds.llen(self._q_for_pri(queue, pri))
- sizes = cmds.execute()
- return sum(size for size in sizes
- if isinstance(size, numbers.Integral))
+ with client.pipeline() as pipe:
+ for pri in PRIORITY_STEPS:
+ pipe = pipe.llen(self._q_for_pri(queue, pri))
+ sizes = pipe.execute()
+ return sum(size for size in sizes
+ if isinstance(size, numbers.Integral))
def _q_for_pri(self, queue, pri):
pri = self.priority(pri)
@@ -707,17 +708,17 @@ class Channel(virtual.Channel):
self.sep.join([routing_key or '',
pattern or '',
queue or '']))
- cmds = client.pipeline()
- for pri in PRIORITY_STEPS:
- cmds = cmds.delete(self._q_for_pri(queue, pri))
- cmds.execute()
+ with client.pipeline() as pipe:
+ for pri in PRIORITY_STEPS:
+ pipe = pipe.delete(self._q_for_pri(queue, pri))
+ pipe.execute()
def _has_queue(self, queue, **kwargs):
with self.conn_or_acquire() as client:
- cmds = client.pipeline()
- for pri in PRIORITY_STEPS:
- cmds = cmds.exists(self._q_for_pri(queue, pri))
- return any(cmds.execute())
+ with client.pipeline() as pipe:
+ for pri in PRIORITY_STEPS:
+ pipe = pipe.exists(self._q_for_pri(queue, pri))
+ return any(pipe.execute())
def get_table(self, exchange):
key = self.keyprefix_queue % exchange
@@ -729,12 +730,12 @@ class Channel(virtual.Channel):
def _purge(self, queue):
with self.conn_or_acquire() as client:
- cmds = client.pipeline()
- for pri in PRIORITY_STEPS:
- priq = self._q_for_pri(queue, pri)
- cmds = cmds.llen(priq).delete(priq)
- sizes = cmds.execute()
- return sum(sizes[::2])
+ with client.pipeline() as pipe:
+ for pri in PRIORITY_STEPS:
+ priq = self._q_for_pri(queue, pri)
+ pipe = pipe.llen(priq).delete(priq)
+ sizes = pipe.execute()
+ return sum(sizes[::2])
def close(self):
if self._pool:
@@ -819,9 +820,9 @@ class Channel(virtual.Channel):
return redis.ConnectionPool(**params)
def _get_client(self):
- if redis.VERSION < (2, 10, 0):
+ if redis.VERSION < (2, 4, 4):
raise VersionMismatch(
- 'Redis transport requires redis-py versions 2.10.0 or later. '
+ 'Redis transport requires redis-py versions 2.4.4 or later. '
'You have {0.__version__}'.format(redis))
# KombuRedis maintains a connection attribute on its instance and
diff --git a/requirements/test-ci3.txt b/requirements/test-ci3.txt
index c5617bb..3d5ce77 100644
--- a/requirements/test-ci3.txt
+++ b/requirements/test-ci3.txt
@@ -3,3 +3,4 @@ coveralls
redis
PyYAML
msgpack-python>0.2.0 # 0.2.0 dropped 2.5 support
+boto
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/kombu.git
More information about the Python-modules-commits
mailing list