[Python-modules-commits] [kombu] 01/06: Import kombu_3.0.29.orig.tar.gz

Brian May bam at moszumanska.debian.org
Wed Nov 18 23:06:28 UTC 2015


This is an automated email from the git hooks/post-receive script.

bam pushed a commit to branch master
in repository kombu.

commit d36ec12315555e6fa413edaf2d3c5428cac73dcd
Author: Brian May <bam at debian.org>
Date:   Thu Nov 19 09:22:33 2015 +1100

    Import kombu_3.0.29.orig.tar.gz
---
 AUTHORS                                            |  1 +
 Changelog                                          | 56 +++++++++++++++-
 PKG-INFO                                           |  4 +-
 README.rst                                         |  2 +-
 docs/changelog.rst                                 | 56 +++++++++++++++-
 docs/introduction.rst                              |  2 +-
 extra/release/doc4allmods                          |  2 +
 kombu.egg-info/PKG-INFO                            |  4 +-
 kombu.egg-info/SOURCES.txt                         |  2 +
 kombu/__init__.py                                  |  2 +-
 kombu/entity.py                                    | 11 +++-
 kombu/serialization.py                             |  2 +-
 kombu/tests/case.py                                |  3 +-
 kombu/tests/test_entities.py                       |  8 +++
 kombu/tests/transport/test_SQS.py                  | 16 ++---
 kombu/tests/transport/test_redis.py                | 19 ++++--
 kombu/transport/django/migrations/0001_initial.py  | 22 +++----
 kombu/transport/django/migrations/__init__.py      | 16 +++++
 .../django/south_migrations/0001_initial.py        | 57 ++++++++++++++++
 .../{migrations => south_migrations}/__init__.py   |  0
 kombu/transport/mongodb.py                         | 46 +++++++++----
 kombu/transport/redis.py                           | 77 +++++++++++-----------
 requirements/test-ci3.txt                          |  1 +
 23 files changed, 321 insertions(+), 88 deletions(-)

diff --git a/AUTHORS b/AUTHORS
index 81b70f1..4b3b471 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -42,6 +42,7 @@ Eric Reynolds <ereynolds at opendns.com>
 Eric Wang <eric1990 at gmail.com>
 Fabrice Rabaute <fabrice at expa.com>
 Felix Schwarz <felix.schwarz at oss.schwarz.eu>
+Felix Yan <felixonmars at archlinux.org>
 Fernando Jorge Mota <f.j.mota13 at gmail.com>
 Flavio [FlaPer87] Percoco Premoli <flaper87 at flaper87.org>
 Florian Munz <surf at theflow.de>
diff --git a/Changelog b/Changelog
index afa6f32..e31b733 100644
--- a/Changelog
+++ b/Changelog
@@ -4,11 +4,65 @@
  Change history
 ================
 
+.. _version-3.0.29:
+
+3.0.29
+======
+:release-date: 2015-10-26 11:10 A.M PDT
+:release-by: Ask Solem
+
+- Fixed serialization issue for ``bindings.as_dict()`` (Issue #453).
+
+    Fix contributed by Sergey Tikhonov.
+
+- Json serializer wrongly treated bytes as ``ascii``, not ``utf-8``
+  (Issue #532).
+
+- MongoDB: Now supports pymongo 3.x.
+
+    Contributed by Len Buckens.
+
+- SQS: Tests passing on Python 3.
+
+    Fix contributed by Felix Yan.
+
+.. _version-3.0.28:
+
+3.0.28
+======
+:release-date: 2015-10-12 12:00 P.M PDT
+:release-by: Ask Solem
+
+.. admonition:: Django transport migrations.
+
+    If you're using Django 1.8 and have already created the
+    kombu_transport_django tables, you have to run a fake initial migration:
+
+    .. code-block:: pycon
+
+        python manage.py migrate kombu_transport_django --fake-initial
+
+- No longer compatible with South by default.
+
+    To keep using kombu.transport.django with South migrations
+    you now need to configure a new location for the kombu migrations:
+
+    .. code-block:: python
+
+        SOUTH_MIGRATION_MODULES = {
+            'kombu_transport_django':
+                'kombu.transport.django.south_migrations',
+        }
+
+- Keep old South migrations in ``kombu.transport.django.south_migrations``.
+
+- Now works with Redis < 2.10 again.
+
 .. _version-3.0.27:
 
 3.0.27
 ======
-:release-date: 2015-10-09 3:10 PM PDT
+:release-date: 2015-10-09 3:10 P.M PDT
 :release-by: Ask Solem
 
 - Now depends on :mod:`amqp` 1.4.7.
diff --git a/PKG-INFO b/PKG-INFO
index bb1f69b..4371b75 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: kombu
-Version: 3.0.27
+Version: 3.0.29
 Summary: Messaging library for Python
 Home-page: http://kombu.readthedocs.org
 Author: Ask Solem
@@ -12,7 +12,7 @@ Description: .. _kombu-index:
          kombu - Messaging library for Python
         ========================================
         
-        :Version: 3.0.27
+        :Version: 3.0.29
         
         `Kombu` is a messaging library for Python.
         
diff --git a/README.rst b/README.rst
index acab4b6..7474d2a 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@
  kombu - Messaging library for Python
 ========================================
 
-:Version: 3.0.27
+:Version: 3.0.29
 
 `Kombu` is a messaging library for Python.
 
diff --git a/docs/changelog.rst b/docs/changelog.rst
index afa6f32..e31b733 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -4,11 +4,65 @@
  Change history
 ================
 
+.. _version-3.0.29:
+
+3.0.29
+======
+:release-date: 2015-10-26 11:10 A.M PDT
+:release-by: Ask Solem
+
+- Fixed serialization issue for ``bindings.as_dict()`` (Issue #453).
+
+    Fix contributed by Sergey Tikhonov.
+
+- Json serializer wrongly treated bytes as ``ascii``, not ``utf-8``
+  (Issue #532).
+
+- MongoDB: Now supports pymongo 3.x.
+
+    Contributed by Len Buckens.
+
+- SQS: Tests passing on Python 3.
+
+    Fix contributed by Felix Yan.
+
+.. _version-3.0.28:
+
+3.0.28
+======
+:release-date: 2015-10-12 12:00 P.M PDT
+:release-by: Ask Solem
+
+.. admonition:: Django transport migrations.
+
+    If you're using Django 1.8 and have already created the
+    kombu_transport_django tables, you have to run a fake initial migration:
+
+    .. code-block:: pycon
+
+        python manage.py migrate kombu_transport_django --fake-initial
+
+- No longer compatible with South by default.
+
+    To keep using kombu.transport.django with South migrations
+    you now need to configure a new location for the kombu migrations:
+
+    .. code-block:: python
+
+        SOUTH_MIGRATION_MODULES = {
+            'kombu_transport_django':
+                'kombu.transport.django.south_migrations',
+        }
+
+- Keep old South migrations in ``kombu.transport.django.south_migrations``.
+
+- Now works with Redis < 2.10 again.
+
 .. _version-3.0.27:
 
 3.0.27
 ======
-:release-date: 2015-10-09 3:10 PM PDT
+:release-date: 2015-10-09 3:10 P.M PDT
 :release-by: Ask Solem
 
 - Now depends on :mod:`amqp` 1.4.7.
diff --git a/docs/introduction.rst b/docs/introduction.rst
index acab4b6..7474d2a 100644
--- a/docs/introduction.rst
+++ b/docs/introduction.rst
@@ -4,7 +4,7 @@
  kombu - Messaging library for Python
 ========================================
 
-:Version: 3.0.27
+:Version: 3.0.29
 
 `Kombu` is a messaging library for Python.
 
diff --git a/extra/release/doc4allmods b/extra/release/doc4allmods
index f95fee2..4aedb79 100755
--- a/extra/release/doc4allmods
+++ b/extra/release/doc4allmods
@@ -6,6 +6,8 @@ SKIP_FILES="kombu.entity.rst
             kombu.messaging.rst
             kombu.transport.django.migrations.rst
             kombu.transport.django.migrations.0001_initial.rst
+            kombu.transport.django.south_migrations.rst
+            kombu.transport.django.south_migrations.0001_initial.rst
             kombu.transport.django.management.rst
             kombu.transport.django.management.commands.rst"
 
diff --git a/kombu.egg-info/PKG-INFO b/kombu.egg-info/PKG-INFO
index bb1f69b..4371b75 100644
--- a/kombu.egg-info/PKG-INFO
+++ b/kombu.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: kombu
-Version: 3.0.27
+Version: 3.0.29
 Summary: Messaging library for Python
 Home-page: http://kombu.readthedocs.org
 Author: Ask Solem
@@ -12,7 +12,7 @@ Description: .. _kombu-index:
          kombu - Messaging library for Python
         ========================================
         
-        :Version: 3.0.27
+        :Version: 3.0.29
         
         `Kombu` is a messaging library for Python.
         
diff --git a/kombu.egg-info/SOURCES.txt b/kombu.egg-info/SOURCES.txt
index 51838b3..48fcd61 100644
--- a/kombu.egg-info/SOURCES.txt
+++ b/kombu.egg-info/SOURCES.txt
@@ -229,6 +229,8 @@ kombu/transport/django/management/commands/__init__.py
 kombu/transport/django/management/commands/clean_kombu_messages.py
 kombu/transport/django/migrations/0001_initial.py
 kombu/transport/django/migrations/__init__.py
+kombu/transport/django/south_migrations/0001_initial.py
+kombu/transport/django/south_migrations/__init__.py
 kombu/transport/sqlalchemy/__init__.py
 kombu/transport/sqlalchemy/models.py
 kombu/transport/virtual/__init__.py
diff --git a/kombu/__init__.py b/kombu/__init__.py
index d73f6c3..c7326df 100644
--- a/kombu/__init__.py
+++ b/kombu/__init__.py
@@ -11,7 +11,7 @@ version_info_t = namedtuple(
     'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'),
 )
 
-VERSION = version_info_t(3, 0, 27, '', '')
+VERSION = version_info_t(3, 0, 29, '', '')
 __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
 __author__ = 'Ask Solem'
 __contact__ = 'ask at celeryproject.org'
diff --git a/kombu/entity.py b/kombu/entity.py
index 9825746..066dc53 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -7,7 +7,7 @@ Exchange and Queue declarations.
 """
 from __future__ import absolute_import
 
-from .abstract import MaybeChannelBound
+from .abstract import MaybeChannelBound, Object
 from .exceptions import ContentDisallowed
 from .five import string_t
 from .serialization import prepare_accept_content
@@ -299,7 +299,7 @@ class Exchange(MaybeChannelBound):
         return not self.auto_delete
 
 
-class binding(object):
+class binding(Object):
     """Represents a queue or exchange binding.
 
     :keyword exchange: Exchange to bind to.
@@ -309,6 +309,13 @@ class binding(object):
 
     """
 
+    attrs = (
+        ('exchange', None),
+        ('routing_key', None),
+        ('arguments', None),
+        ('unbind_arguments', None)
+    )
+
     def __init__(self, exchange=None, routing_key='',
                  arguments=None, unbind_arguments=None):
         self.exchange = exchange
diff --git a/kombu/serialization.py b/kombu/serialization.py
index 9958139..85376e7 100644
--- a/kombu/serialization.py
+++ b/kombu/serialization.py
@@ -311,7 +311,7 @@ def register_json():
 
     def _loads(obj):
         if isinstance(obj, bytes_t):
-            obj = obj.decode()
+            obj = obj.decode('utf-8')
         return json_loads(obj)
 
     registry.register('json', json_dumps, _loads,
diff --git a/kombu/tests/case.py b/kombu/tests/case.py
index 435cafd..f2caebd 100644
--- a/kombu/tests/case.py
+++ b/kombu/tests/case.py
@@ -24,6 +24,7 @@ except AttributeError:
 
 PY3 = sys.version_info[0] == 3
 
+MagicMock = mock.MagicMock
 patch = mock.patch
 call = mock.call
 
@@ -50,7 +51,7 @@ class _ContextMock(Mock):
     in the class, not just the instance."""
 
     def __enter__(self):
-        pass
+        return self
 
     def __exit__(self, *exc_info):
         pass
diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py
index ef7591a..21deebd 100644
--- a/kombu/tests/test_entities.py
+++ b/kombu/tests/test_entities.py
@@ -4,6 +4,7 @@ import pickle
 
 from kombu import Connection, Exchange, Producer, Queue, binding
 from kombu.exceptions import NotBoundError
+from kombu.serialization import registry
 
 from .case import Case, Mock, call
 from .mocks import Transport
@@ -360,6 +361,13 @@ class test_Queue(Case):
         d = q.as_dict(recurse=True)
         self.assertEqual(d['exchange']['name'], self.exchange.name)
 
+    def test_queue_dump(self):
+        b = binding(self.exchange, 'rk')
+        q = Queue('foo', self.exchange, 'rk', bindings=[b])
+        d = q.as_dict(recurse=True)
+        self.assertEqual(d['bindings'][0]['routing_key'], 'rk')
+        registry.dumps(d)
+
     def test__repr__(self):
         b = Queue('foo', self.exchange, 'foo')
         self.assertIn('foo', repr(b))
diff --git a/kombu/tests/transport/test_SQS.py b/kombu/tests/transport/test_SQS.py
index 69704d5..0a33fb5 100644
--- a/kombu/tests/transport/test_SQS.py
+++ b/kombu/tests/transport/test_SQS.py
@@ -164,7 +164,7 @@ class test_Channel(Case):
         self.assertEqual(len(results), 1)
 
         # Now test getting many messages
-        for i in xrange(3):
+        for i in range(3):
             message = 'message: {0}'.format(i)
             self.producer.publish(message)
 
@@ -210,11 +210,11 @@ class test_Channel(Case):
         self.assertEqual(message, results)
 
     def test_puts_and_gets(self):
-        for i in xrange(3):
+        for i in range(3):
             message = 'message: %s' % i
             self.producer.publish(message)
 
-        for i in xrange(3):
+        for i in range(3):
             self.assertEqual('message: %s' % i,
                              self.queue(self.channel).get().payload)
 
@@ -233,7 +233,7 @@ class test_Channel(Case):
         self.channel.qos.prefetch_count = 5
 
         # Now, generate all the messages
-        for i in xrange(message_count):
+        for i in range(message_count):
             message = 'message: %s' % i
             self.producer.publish(message)
 
@@ -262,11 +262,11 @@ class test_Channel(Case):
         self.channel.qos.prefetch_count = 5
 
         # Now, generate all the messages
-        for i in xrange(message_count):
+        for i in range(message_count):
             self.producer.publish('message: %s' % i)
 
         # Now drain all the events
-        for i in xrange(message_count):
+        for i in range(message_count):
             self.channel.drain_events()
 
         # How many times was the SQSConnectionMock get_message method called?
@@ -283,11 +283,11 @@ class test_Channel(Case):
         self.channel.qos.prefetch_count = None
 
         # Now, generate all the messages
-        for i in xrange(message_count):
+        for i in range(message_count):
             self.producer.publish('message: %s' % i)
 
         # Now drain all the events
-        for i in xrange(message_count):
+        for i in range(message_count):
             self.channel.drain_events()
 
         # How many times was the SQSConnectionMock get_message method called?
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index b346f21..ae79d52 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -14,7 +14,7 @@ from kombu.transport import virtual
 from kombu.utils import eventio  # patch poll
 
 from kombu.tests.case import (
-    Case, Mock, call, module_exists, skip_if_not_module, patch,
+    Case, ContextMock, Mock, call, module_exists, skip_if_not_module, patch,
 )
 
 
@@ -178,6 +178,12 @@ class Pipeline(object):
         self.client = client
         self.stack = []
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc_info):
+        pass
+
     def __getattr__(self, key):
         if key not in self.__dict__:
 
@@ -342,11 +348,11 @@ class test_Channel(Case):
         with patch('kombu.transport.redis.loads') as loads:
             loads.return_value = 'M', 'EX', 'RK'
             client = self.channel.client = Mock(name='client')
+            client.pipeline = ContextMock()
             restore = self.channel._do_restore_message = Mock(
                 name='_do_restore_message',
             )
-            pipe = Mock(name='pipe')
-            client.pipeline.return_value = pipe
+            pipe = client.pipeline.return_value
             pipe_hget = Mock(name='pipe.hget')
             pipe.hget.return_value = pipe_hget
             pipe_hget_hdel = Mock(name='pipe.hget.hdel')
@@ -371,6 +377,10 @@ class test_Channel(Case):
 
     def test_qos_restore_visible(self):
         client = self.channel.client = Mock(name='client')
+
+        def pipe(*args, **kwargs):
+            return Pipeline(client)
+        client.pipeline = pipe
         client.zrevrangebyscore.return_value = [
             (1, 10),
             (2, 20),
@@ -1202,7 +1212,8 @@ class test_Mutex(Case):
             # Won
             uuid.return_value = lock_id
             client.setnx.return_value = True
-            pipe = client.pipeline.return_value = Mock(name='pipe')
+            client.pipeline = ContextMock()
+            pipe = client.pipeline.return_value
             pipe.get.return_value = lock_id
             held = False
             with redis.Mutex(client, 'foo1', 100):
diff --git a/kombu/transport/django/migrations/0001_initial.py b/kombu/transport/django/migrations/0001_initial.py
index 6a6a32d..0307c28 100644
--- a/kombu/transport/django/migrations/0001_initial.py
+++ b/kombu/transport/django/migrations/0001_initial.py
@@ -1,15 +1,11 @@
 # -*- coding: utf-8 -*-
-# Generated by Django 1.10.dev20151005181923 on 2015-10-05 20:53
 from __future__ import absolute_import, unicode_literals
 
 from django.db import migrations, models
-import django.db.models.deletion
 
 
 class Migration(migrations.Migration):
 
-    initial = True
-
     dependencies = [
     ]
 
@@ -18,11 +14,11 @@ class Migration(migrations.Migration):
             name='Message',
             fields=[
                 ('id', models.AutoField(
-                    auto_created=True, primary_key=True,
-                    serialize=False, verbose_name='ID')),
-                ('visible', models.BooleanField(db_index=True, default=True)),
+                    verbose_name='ID', serialize=False,
+                    auto_created=True, primary_key=True)),
+                ('visible', models.BooleanField(default=True, db_index=True)),
                 ('sent_at', models.DateTimeField(
-                    auto_now_add=True, db_index=True, null=True)),
+                    db_index=True, auto_now_add=True, null=True)),
                 ('payload', models.TextField(verbose_name='payload')),
             ],
             options={
@@ -35,10 +31,10 @@ class Migration(migrations.Migration):
             name='Queue',
             fields=[
                 ('id', models.AutoField(
-                    auto_created=True, primary_key=True,
-                    serialize=False, verbose_name='ID')),
+                    verbose_name='ID', serialize=False,
+                    auto_created=True, primary_key=True)),
                 ('name', models.CharField(
-                    max_length=200, unique=True, verbose_name='name')),
+                    unique=True, max_length=200, verbose_name='name')),
             ],
             options={
                 'db_table': 'djkombu_queue',
@@ -50,8 +46,6 @@ class Migration(migrations.Migration):
             model_name='message',
             name='queue',
             field=models.ForeignKey(
-                on_delete=django.db.models.deletion.CASCADE,
-                related_name='messages',
-                to='kombu_transport_django.Queue'),
+                related_name='messages', to='kombu_transport_django.Queue'),
         ),
     ]
diff --git a/kombu/transport/django/migrations/__init__.py b/kombu/transport/django/migrations/__init__.py
index e69de29..6b92838 100644
--- a/kombu/transport/django/migrations/__init__.py
+++ b/kombu/transport/django/migrations/__init__.py
@@ -0,0 +1,16 @@
+from __future__ import absolute_import
+
+SOUTH_ERROR_MESSAGE = """
+For South support, customize the SOUTH_MIGRATION_MODULES setting
+to point to the correct migrations module:
+
+    SOUTH_MIGRATION_MODULES = {
+        'kombu_transport_django': 'kombu.transport.django.south_migrations',
+    }
+"""
+
+try:
+    from django.db import migrations  # noqa
+except ImportError:
+    from django.core.exceptions import ImproperlyConfigured
+    raise ImproperlyConfigured(SOUTH_ERROR_MESSAGE)
diff --git a/kombu/transport/django/south_migrations/0001_initial.py b/kombu/transport/django/south_migrations/0001_initial.py
new file mode 100644
index 0000000..ea1edb0
--- /dev/null
+++ b/kombu/transport/django/south_migrations/0001_initial.py
@@ -0,0 +1,57 @@
+# encoding: utf-8
+from __future__ import absolute_import
+
+# flake8: noqa
+import datetime
+from south.db import db
+from south.v2 import SchemaMigration
+from django.db import models
+
+class Migration(SchemaMigration):
+
+    def forwards(self, orm):
+
+        # Adding model 'Queue'
+        db.create_table('djkombu_queue', (
+            ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
+            ('name', self.gf('django.db.models.fields.CharField')(unique=True, max_length=200)),
+        ))
+        db.send_create_signal('django', ['Queue'])
+
+        # Adding model 'Message'
+        db.create_table('djkombu_message', (
+            ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
+            ('visible', self.gf('django.db.models.fields.BooleanField')(default=True, db_index=True)),
+            ('sent_at', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, null=True, db_index=True, blank=True)),
+            ('payload', self.gf('django.db.models.fields.TextField')()),
+            ('queue', self.gf('django.db.models.fields.related.ForeignKey')(related_name='messages', to=orm['django.Queue'])),
+        ))
+        db.send_create_signal('django', ['Message'])
+
+
+    def backwards(self, orm):
+
+        # Deleting model 'Queue'
+        db.delete_table('djkombu_queue')
+
+        # Deleting model 'Message'
+        db.delete_table('djkombu_message')
+
+
+    models = {
+        'django.message': {
+            'Meta': {'object_name': 'Message', 'db_table': "'djkombu_message'"},
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'payload': ('django.db.models.fields.TextField', [], {}),
+            'queue': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'messages'", 'to': "orm['django.Queue']"}),
+            'sent_at': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'null': 'True', 'db_index': 'True', 'blank': 'True'}),
+            'visible': ('django.db.models.fields.BooleanField', [], {'default': 'True', 'db_index': 'True'})
+        },
+        'django.queue': {
+            'Meta': {'object_name': 'Queue', 'db_table': "'djkombu_queue'"},
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '200'})
+        }
+    }
+
+    complete_apps = ['django']
diff --git a/kombu/transport/django/migrations/__init__.py b/kombu/transport/django/south_migrations/__init__.py
similarity index 100%
copy from kombu/transport/django/migrations/__init__.py
copy to kombu/transport/django/south_migrations/__init__.py
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 2e1f9de..65b4a6d 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -22,6 +22,12 @@ from kombu.utils.encoding import bytes_to_str
 
 from . import virtual
 
+try:
+    from pymongo.cursor import CursorType
+except ImportError:
+    class CursorType(object):  # noqa
+        pass
+
 DEFAULT_HOST = '127.0.0.1'
 DEFAULT_PORT = 27017
 
@@ -172,15 +178,26 @@ class Channel(virtual.Channel):
 
         return hostname, dbname, options
 
+    def _prepare_client_options(self, options):
+        if pymongo.version_tuple >= (3, ):
+            options.pop('auto_start_request', None)
+        return options
+
     def _open(self, scheme='mongodb://'):
         hostname, dbname, options = self._parse_uri(scheme=scheme)
 
-        mongoconn = MongoClient(
-            host=hostname, ssl=options['ssl'],
-            auto_start_request=options['auto_start_request'],
-            connectTimeoutMS=options['connectTimeoutMS'],
-            use_greenlets=_detect_environment() != 'default',
-        )
+        conf = self._prepare_client_options(options)
+        conf['host'] = hostname
+
+        env = _detect_environment()
+        if env == 'gevent':
+            from gevent import monkey
+            monkey.patch_all()
+        elif env == 'eventlet':
+            from eventlet import monkey_patch
+            monkey_patch()
+
+        mongoconn = MongoClient(**conf)
         database = mongoconn[dbname]
 
         version = mongoconn.server_info()['version']
@@ -284,11 +301,18 @@ class Channel(virtual.Channel):
             )
 
     def create_broadcast_cursor(self, exchange, routing_key, pattern, queue):
-        cursor = self.get_broadcast().find(
-            query={'queue': exchange},
-            sort=[('$natural', 1)],
-            tailable=True,
-        )
+        if pymongo.version_tuple >= (3, ):
+            query = dict(filter={'queue': exchange},
+                         sort=[('$natural', 1)],
+                         cursor_type=CursorType.TAILABLE
+                         )
+        else:
+            query = dict(query={'queue': exchange},
+                         sort=[('$natural', 1)],
+                         tailable=True
+                         )
+
+        cursor = self.get_broadcast().find(**query)
         ret = self._broadcast_cursors[queue] = BroadcastCursor(cursor)
         return ret
 
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 260ce83..5a3ad30 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -89,7 +89,7 @@ def get_redis_error_classes():
             OSError,
             exceptions.ConnectionError,
             exceptions.AuthenticationError,
-            getattr(exceptions, 'TimeoutError', None))))),
+            getattr(exceptions, 'TimeoutError', None))))),
         (virtual.Transport.channel_errors + (
             DataError,
             exceptions.InvalidResponse,
@@ -115,14 +115,14 @@ def Mutex(client, name, expire):
             raise MutexHeld()
     finally:
         if i_won:
-            pipe = client.pipeline(True)
             try:
-                pipe.watch(name)
-                if pipe.get(name) == lock_id:
-                    pipe.multi()
-                    pipe.delete(name)
-                    pipe.execute()
-                pipe.unwatch()
+                with client.pipeline(True) as pipe:
+                    pipe.watch(name)
+                    if pipe.get(name) == lock_id:
+                        pipe.multi()
+                        pipe.delete(name)
+                        pipe.execute()
+                    pipe.unwatch()
             except redis.WatchError:
                 pass
 
@@ -160,11 +160,11 @@ class QoS(virtual.QoS):
         self.ack(delivery_tag)
 
     @contextmanager
-    def pipe_or_acquire(self, pipe=None):
+    def pipe_or_acquire(self, pipe=None, client=None):
         if pipe:
             yield pipe
         else:
-            with self.channel.conn_or_acquire() as client:
+            with self.channel.conn_or_acquire(client) as client:
                 yield client.pipeline()
 
     def _remove_from_indices(self, delivery_tag, pipe=None):
@@ -191,8 +191,9 @@ class QoS(virtual.QoS):
 
     def restore_by_tag(self, tag, client=None, leftmost=False):
         with self.channel.conn_or_acquire(client) as client:
-            p, _, _ = self._remove_from_indices(
-                tag, client.pipeline().hget(self.unacked_key, tag)).execute()
+            with client.pipeline() as pipe:
+                p, _, _ = self._remove_from_indices(
+                    tag, pipe.hget(self.unacked_key, tag)).execute()
             if p:
                 M, EX, RK = loads(bytes_to_str(p))  # json is unicode
                 self.channel._do_restore_message(M, EX, RK, client, leftmost)
@@ -484,10 +485,10 @@ class Channel(virtual.Channel):
             return super(Channel, self)._restore(message)
         tag = message.delivery_tag
         with self.conn_or_acquire() as client:
-            P, _ = client.pipeline() \
-                .hget(self.unacked_key, tag) \
-                .hdel(self.unacked_key, tag) \
-                .execute()
+            with client.pipeline() as pipe:
+                P, _ = pipe.hget(self.unacked_key, tag) \
+                           .hdel(self.unacked_key, tag) \
+                           .execute()
             if P:
                 M, EX, RK = loads(bytes_to_str(P))  # json is unicode
                 self._do_restore_message(M, EX, RK, client, leftmost)
@@ -651,12 +652,12 @@ class Channel(virtual.Channel):
 
     def _size(self, queue):
         with self.conn_or_acquire() as client:
-            cmds = client.pipeline()
-            for pri in PRIORITY_STEPS:
-                cmds = cmds.llen(self._q_for_pri(queue, pri))
-            sizes = cmds.execute()
-            return sum(size for size in sizes
-                       if isinstance(size, numbers.Integral))
+            with client.pipeline() as pipe:
+                for pri in PRIORITY_STEPS:
+                    pipe = pipe.llen(self._q_for_pri(queue, pri))
+                sizes = pipe.execute()
+                return sum(size for size in sizes
+                           if isinstance(size, numbers.Integral))
 
     def _q_for_pri(self, queue, pri):
         pri = self.priority(pri)
@@ -707,17 +708,17 @@ class Channel(virtual.Channel):
                         self.sep.join([routing_key or '',
                                        pattern or '',
                                        queue or '']))
-            cmds = client.pipeline()
-            for pri in PRIORITY_STEPS:
-                cmds = cmds.delete(self._q_for_pri(queue, pri))
-            cmds.execute()
+            with client.pipeline() as pipe:
+                for pri in PRIORITY_STEPS:
+                    pipe = pipe.delete(self._q_for_pri(queue, pri))
+                pipe.execute()
 
     def _has_queue(self, queue, **kwargs):
         with self.conn_or_acquire() as client:
-            cmds = client.pipeline()
-            for pri in PRIORITY_STEPS:
-                cmds = cmds.exists(self._q_for_pri(queue, pri))
-            return any(cmds.execute())
+            with client.pipeline() as pipe:
+                for pri in PRIORITY_STEPS:
+                    pipe = pipe.exists(self._q_for_pri(queue, pri))
+                return any(pipe.execute())
 
     def get_table(self, exchange):
         key = self.keyprefix_queue % exchange
@@ -729,12 +730,12 @@ class Channel(virtual.Channel):
 
     def _purge(self, queue):
         with self.conn_or_acquire() as client:
-            cmds = client.pipeline()
-            for pri in PRIORITY_STEPS:
-                priq = self._q_for_pri(queue, pri)
-                cmds = cmds.llen(priq).delete(priq)
-            sizes = cmds.execute()
-            return sum(sizes[::2])
+            with client.pipeline() as pipe:
+                for pri in PRIORITY_STEPS:
+                    priq = self._q_for_pri(queue, pri)
+                    pipe = pipe.llen(priq).delete(priq)
+                sizes = pipe.execute()
+                return sum(sizes[::2])
 
     def close(self):
         if self._pool:
@@ -819,9 +820,9 @@ class Channel(virtual.Channel):
         return redis.ConnectionPool(**params)
 
     def _get_client(self):
-        if redis.VERSION < (2, 10, 0):
+        if redis.VERSION < (2, 4, 4):
             raise VersionMismatch(
-                'Redis transport requires redis-py versions 2.10.0 or later. '
+                'Redis transport requires redis-py versions 2.4.4 or later. '
                 'You have {0.__version__}'.format(redis))
 
         # KombuRedis maintains a connection attribute on it's instance and
diff --git a/requirements/test-ci3.txt b/requirements/test-ci3.txt
index c5617bb..3d5ce77 100644
--- a/requirements/test-ci3.txt
+++ b/requirements/test-ci3.txt
@@ -3,3 +3,4 @@ coveralls
 redis
 PyYAML
 msgpack-python>0.2.0  # 0.2.0 dropped 2.5 support
+boto

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/kombu.git



More information about the Python-modules-commits mailing list