[Python-modules-commits] [celery] 01/11: Import celery_3.1.19.orig.tar.gz

Brian May bam at moszumanska.debian.org
Wed Nov 18 22:21:05 UTC 2015


This is an automated email from the git hooks/post-receive script.

bam pushed a commit to branch master
in repository celery.

commit ecfab0e5f18c81b19617e93999463c55955d714a
Author: Brian May <bam at debian.org>
Date:   Thu Nov 19 09:02:21 2015 +1100

    Import celery_3.1.19.orig.tar.gz
---
 CONTRIBUTORS.txt                                 |  15 ++
 Changelog                                        |  81 ++++++++++
 PKG-INFO                                         |   4 +-
 README.rst                                       |   2 +-
 celery.egg-info/PKG-INFO                         |   4 +-
 celery.egg-info/requires.txt                     |   4 +-
 celery/__init__.py                               |   9 +-
 celery/app/base.py                               |   3 +-
 celery/app/control.py                            |   2 +-
 celery/app/task.py                               |  23 +--
 celery/apps/worker.py                            |   3 +-
 celery/backends/__init__.py                      |   7 +-
 celery/backends/amqp.py                          |   3 +-
 celery/backends/base.py                          |  11 +-
 celery/backends/database/__init__.py             |   4 +-
 celery/backends/mongodb.py                       |  41 ++---
 celery/backends/redis.py                         |  48 +++---
 celery/beat.py                                   |  58 +++++---
 celery/bin/amqp.py                               |  11 ++
 celery/bin/beat.py                               |   6 +-
 celery/bin/celery.py                             |   3 +-
 celery/bin/events.py                             |   6 +-
 celery/concurrency/asynpool.py                   |   2 +-
 celery/concurrency/eventlet.py                   |   6 +-
 celery/concurrency/prefork.py                    |   3 +-
 celery/events/cursesmon.py                       |   4 +-
 celery/five.py                                   |  31 ++--
 celery/fixups/django.py                          |  10 +-
 celery/local.py                                  | 182 +++++++++++++++++------
 celery/platforms.py                              |  45 +++++-
 celery/task/trace.py                             |   2 +-
 celery/tests/app/test_app.py                     |  10 +-
 celery/tests/app/test_beat.py                    |   4 +-
 celery/tests/app/test_loaders.py                 |   3 +-
 celery/tests/backends/test_amqp.py               |  63 +++++++-
 celery/tests/backends/test_mongodb.py            |  50 ++++++-
 celery/tests/backends/test_redis.py              |   8 +-
 celery/tests/case.py                             |  19 ++-
 celery/tests/compat_modules/test_compat.py       |   8 +-
 celery/tests/events/test_state.py                |   3 +-
 celery/tests/tasks/test_chord.py                 |   4 +-
 celery/tests/utils/test_mail.py                  |   2 +-
 celery/tests/utils/test_platforms.py             |  14 +-
 celery/tests/utils/test_threads.py               |   4 +-
 celery/tests/worker/test_hub.py                  |   3 +-
 celery/tests/worker/test_loops.py                |  11 +-
 celery/utils/term.py                             |   7 +-
 celery/worker/components.py                      |   2 +-
 celery/worker/job.py                             |   7 +-
 celery/worker/loops.py                           |   6 +
 docs/_ext/applyxrefs.py                          |   2 -
 docs/changelog.rst                               |  81 ++++++++++
 docs/conf.py                                     |  20 +--
 docs/configuration.rst                           | 159 +++++++++++++++-----
 docs/getting-started/first-steps-with-celery.rst |   8 +-
 docs/includes/introduction.txt                   |   2 +-
 docs/userguide/canvas.rst                        |   6 +-
 docs/userguide/monitoring.rst                    |   2 +-
 docs/userguide/periodic-tasks.rst                |   3 +
 docs/userguide/routing.rst                       |   2 +-
 docs/userguide/security.rst                      |   5 +-
 docs/userguide/tasks.rst                         |  41 ++---
 examples/app/myapp.py                            |   5 +-
 examples/django/proj/__init__.py                 |   2 +-
 examples/django/proj/celery.py                   |   2 +-
 examples/django/proj/settings.py                 |   2 +-
 examples/django/proj/urls.py                     |   7 +-
 examples/django/proj/wsgi.py                     |   2 +-
 examples/eventlet/celeryconfig.py                |   2 +-
 examples/gevent/celeryconfig.py                  |   2 +-
 examples/next-steps/setup.py                     |   2 +-
 requirements/default.txt                         |   4 +-
 requirements/docs.txt                            |   2 +-
 requirements/jython.txt                          |   2 +-
 requirements/test-ci.txt                         |   7 +-
 setup.py                                         |  10 +-
 76 files changed, 901 insertions(+), 342 deletions(-)

diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt
index 574f439..38df5c7 100644
--- a/CONTRIBUTORS.txt
+++ b/CONTRIBUTORS.txt
@@ -180,3 +180,18 @@ Bert Vanderbauwhede, 2014/12/18
 John Anderson, 2014/12/27
 Luke Burden, 2015/01/24
 Mickaël Penhard, 2015/02/15
+Mark Parncutt, 2015/02/16
+Samuel Jaillet, 2015/03/24
+Ilya Georgievsky, 2015/03/31
+Fatih Sucu, 2015/04/17
+James Pulec, 2015/04/19
+Alexander Lebedev, 2015/04/25
+Frantisek Holop, 2015/05/21
+Feanil Patel, 2015/05/21
+Jocelyn Delalande, 2015/06/03
+Justin Patrin, 2015/08/06
+Juan Rossi, 2015/08/10
+Piotr Maślanka, 2015/08/24
+Gerald Manipon, 2015/10/19
+Krzysztof Bujniewicz, 2015/10/21
+Sukrit Khera, 2015/10/26
diff --git a/Changelog b/Changelog
index a99b967..423df3f 100644
--- a/Changelog
+++ b/Changelog
@@ -8,6 +8,87 @@ This document contains change notes for bugfix releases in the 3.1.x series
 (Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
 new in Celery 3.1.
 
+.. _version-3.1.19:
+
+3.1.19
+======
+:release-date: 2015-10-26 01:00 P.M UTC
+:release-by: Ask Solem
+
+- **Requirements**
+
+    - Now depends on :ref:`Kombu 3.0.29 <kombu:version-3.0.29>`.
+
+    - Now depends on :mod:`billiard` 3.3.0.21.
+
+-  **Results**: Fixed MongoDB result backend URL parsing problem
+   (Issue celery/kombu#375).
+
+- **Worker**: Task request now properly sets ``priority`` in delivery_info.
+
+    Fix contributed by Gerald Manipon.
+
+- **Beat**: PyPy shelve may raise ``KeyError`` when setting keys
+  (Issue #2862).
+
+- **Programs**: :program:`celery beat --detach` now working on PyPy.
+
+    Fix contributed by Krzysztof Bujniewicz.
+
+- **Results**: Redis result backend now ensures all pipelines are cleaned up.
+
+    Contributed by Justin Patrin.
+
+- **Results**: Redis result backend now allows for timeout to be set in the
+  query portion of the result backend URL.
+
+    E.g. ``CELERY_RESULT_BACKEND = 'redis://?timeout=10'``
+
+    Contributed by Justin Patrin.
+
+- **Results**: ``result.get`` now properly handles failures where the
+  exception value is set to :const:`None` (Issue #2560).
+
+- **Prefork pool**: Fixed attribute error ``proc.dead``.
+
+- **Worker**: Fixed worker hanging when gossip/heartbeat disabled
+  (Issue #1847).
+
+    Fix contributed by Aaron Webber and Bryan Helmig.
+
+- **Results**: MongoDB result backend now supports pymongo 3.x
+  (Issue #2744).
+
+    Fix contributed by Sukrit Khera.
+
+- **Results**: RPC/amqp backends did not deserialize exceptions properly
+  (Issue #2691).
+
+    Fix contributed by Sukrit Khera.
+
+- **Programs**: Fixed problem with :program:`celery amqp`'s
+  ``basic_publish`` (Issue #2013).
+
+- **Worker**: Embedded beat now properly sets app for thread/process
+  (Issue #2594).
+
+- **Documentation**: Many improvements and typos fixed.
+
+    Contributions by:
+
+        Carlos Garcia-Dubus
+        D. Yu
+        jerry
+        Jocelyn Delalande
+        Josh Kupershmidt
+        Juan Rossi
+        kanemra
+        Paul Pearce
+        Pavel Savchenko
+        Sean Wang
+        Seungha Kim
+        Zhaorong Ma
+
 .. _version-3.1.18:
 
 3.1.18
diff --git a/PKG-INFO b/PKG-INFO
index 3eefa90..8ab88c7 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: celery
-Version: 3.1.18
+Version: 3.1.19
 Summary: Distributed Task Queue
 Home-page: http://celeryproject.org
 Author: Ask Solem
@@ -12,7 +12,7 @@ Description: =================================
         
         .. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
         
-        :Version: 3.1.18 (Cipater)
+        :Version: 3.1.19 (Cipater)
         :Web: http://celeryproject.org/
         :Download: http://pypi.python.org/pypi/celery/
         :Source: http://github.com/celery/celery/
diff --git a/README.rst b/README.rst
index 258b8ed..dc22840 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@
 
 .. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
 
-:Version: 3.1.18 (Cipater)
+:Version: 3.1.19 (Cipater)
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/celery/celery/
diff --git a/celery.egg-info/PKG-INFO b/celery.egg-info/PKG-INFO
index 3eefa90..8ab88c7 100644
--- a/celery.egg-info/PKG-INFO
+++ b/celery.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: celery
-Version: 3.1.18
+Version: 3.1.19
 Summary: Distributed Task Queue
 Home-page: http://celeryproject.org
 Author: Ask Solem
@@ -12,7 +12,7 @@ Description: =================================
         
         .. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
         
-        :Version: 3.1.18 (Cipater)
+        :Version: 3.1.19 (Cipater)
         :Web: http://celeryproject.org/
         :Download: http://pypi.python.org/pypi/celery/
         :Source: http://github.com/celery/celery/
diff --git a/celery.egg-info/requires.txt b/celery.egg-info/requires.txt
index da27b3c..09bf4b3 100644
--- a/celery.egg-info/requires.txt
+++ b/celery.egg-info/requires.txt
@@ -1,6 +1,6 @@
 pytz>dev
-billiard>=3.3.0.20,<3.4
-kombu>=3.0.25,<3.1
+billiard>=3.3.0.21,<3.4
+kombu>=3.0.29,<3.1
 
 [zookeeper]
 kazoo>=1.3.1
diff --git a/celery/__init__.py b/celery/__init__.py
index fd0cb0e..2366551 100644
--- a/celery/__init__.py
+++ b/celery/__init__.py
@@ -7,6 +7,9 @@
 
 from __future__ import absolute_import
 
+import os
+import sys
+
 from collections import namedtuple
 
 version_info_t = namedtuple(
@@ -14,7 +17,7 @@ version_info_t = namedtuple(
 )
 
 SERIES = 'Cipater'
-VERSION = version_info_t(3, 1, 18, '', '')
+VERSION = version_info_t(3, 1, 19, '', '')
 __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
 __author__ = 'Ask Solem'
 __contact__ = 'ask at celeryproject.org'
@@ -30,8 +33,6 @@ VERSION_BANNER = '{0} ({1})'.format(__version__, SERIES)
 
 # -eof meta-
 
-import os
-import sys
 if os.environ.get('C_IMPDEBUG'):  # pragma: no cover
     from .five import builtins
     real_import = builtins.__import__
@@ -127,7 +128,7 @@ def maybe_patch_concurrency(argv=sys.argv,
         concurrency.get_implementation(pool)
 
 # Lazy loading
-from celery import five
+from celery import five  # noqa
 
 old_module, new_module = five.recreate_module(  # pragma: no cover
     __name__,
diff --git a/celery/app/base.py b/celery/app/base.py
index d228310..7044cfa 100644
--- a/celery/app/base.py
+++ b/celery/app/base.py
@@ -221,7 +221,8 @@ class Celery(object):
 
             def _create_task_cls(fun):
                 if shared:
-                    cons = lambda app: app._task_from_fun(fun, **opts)
+                    def cons(app):
+                        return app._task_from_fun(fun, **opts)
                     cons.__name__ = fun.__name__
                     connect_on_app_finalize(cons)
                 if self.accept_magic_kwargs:  # compat mode
diff --git a/celery/app/control.py b/celery/app/control.py
index 2845374..10baf59 100644
--- a/celery/app/control.py
+++ b/celery/app/control.py
@@ -263,7 +263,7 @@ class Control(object):
         return self.broadcast('enable_events', {}, destination, **kwargs)
 
     def disable_events(self, destination=None, **kwargs):
-        """Tell all (or specific) workers to enable events."""
+        """Tell all (or specific) workers to disable events."""
         return self.broadcast('disable_events', {}, destination, **kwargs)
 
     def pool_grow(self, n=1, destination=None, **kwargs):
diff --git a/celery/app/task.py b/celery/app/task.py
index ed88ce7..6c55ca5 100644
--- a/celery/app/task.py
+++ b/celery/app/task.py
@@ -483,11 +483,12 @@ class Task(object):
         :keyword retry: If enabled sending of the task message will be retried
                         in the event of connection loss or failure.  Default
                         is taken from the :setting:`CELERY_TASK_PUBLISH_RETRY`
-                        setting.  Note you need to handle the
+                        setting.  Note that you need to handle the
                         producer/connection manually for this to work.
 
         :keyword retry_policy:  Override the retry policy used.  See the
-                                :setting:`CELERY_TASK_PUBLISH_RETRY` setting.
+                                :setting:`CELERY_TASK_PUBLISH_RETRY_POLICY`
+                                setting.
 
         :keyword routing_key: Custom routing key used to route the task to a
                               worker server. If in combination with a
@@ -599,7 +600,12 @@ class Task(object):
         :keyword countdown: Time in seconds to delay the retry for.
         :keyword eta: Explicit time and date to run the retry at
                       (must be a :class:`~datetime.datetime` instance).
-        :keyword max_retries: If set, overrides the default retry limit.
+        :keyword max_retries: If set, overrides the default retry limit for
+            this execution. Changes to this parameter do not propagate to
+            subsequent task retry attempts. A value of :const:`None` means
+            "use the default", so if you want infinite retries you would
+            have to set the :attr:`max_retries` attribute of the task to
+            :const:`None` first.
         :keyword time_limit: If set, overrides the default time limit.
         :keyword soft_time_limit: If set, overrides the default soft
                                   time limit.
@@ -624,14 +630,14 @@ class Task(object):
             >>> from imaginary_twitter_lib import Twitter
             >>> from proj.celery import app
 
-            >>> @app.task()
-            ... def tweet(auth, message):
+            >>> @app.task(bind=True)
+            ... def tweet(self, auth, message):
             ...     twitter = Twitter(oauth=auth)
             ...     try:
             ...         twitter.post_status_update(message)
             ...     except twitter.FailWhale as exc:
             ...         # Retry in 5 minutes.
-            ...         raise tweet.retry(countdown=60 * 5, exc=exc)
+            ...         raise self.retry(countdown=60 * 5, exc=exc)
 
         Although the task will never return above as `retry` raises an
         exception to notify the worker, we use `raise` in front of the retry
@@ -863,9 +869,8 @@ class Task(object):
         :param status: Current task state.
         :param retval: Task return value/exception.
         :param task_id: Unique id of the task.
-        :param args: Original arguments for the task that failed.
-        :param kwargs: Original keyword arguments for the task
-                       that failed.
+        :param args: Original arguments for the task.
+        :param kwargs: Original keyword arguments for the task.
 
         :keyword einfo: :class:`~billiard.einfo.ExceptionInfo`
                         instance, containing the traceback (if any).
diff --git a/celery/apps/worker.py b/celery/apps/worker.py
index d190711..4e55c4a 100644
--- a/celery/apps/worker.py
+++ b/celery/apps/worker.py
@@ -313,7 +313,8 @@ if not is_jython:  # pragma: no cover
         _shutdown_handler, sig='SIGINT', callback=on_SIGINT
     )
 else:  # pragma: no cover
-    install_worker_int_handler = lambda *a, **kw: None
+    def install_worker_int_handler(*a, **kw):
+        pass
 
 
 def _reload_current_worker():
diff --git a/celery/backends/__init__.py b/celery/backends/__init__.py
index fbe8a9c..420ba24 100644
--- a/celery/backends/__init__.py
+++ b/celery/backends/__init__.py
@@ -10,8 +10,6 @@ from __future__ import absolute_import
 
 import sys
 
-from kombu.utils.url import _parse_url
-
 from celery.local import Proxy
 from celery._state import current_app
 from celery.five import reraise
@@ -56,8 +54,9 @@ def get_backend_by_url(backend=None, loader=None):
     url = None
     if backend and '://' in backend:
         url = backend
-        if '+' in url[:url.index('://')]:
+        scheme, _, _ = url.partition('://')
+        if '+' in scheme:
             backend, url = url.split('+', 1)
         else:
-            backend, _, _, _, _, _, _ = _parse_url(url)
+            backend = scheme
     return get_backend_cls(backend, loader), url
diff --git a/celery/backends/amqp.py b/celery/backends/amqp.py
index 5587943..a4a6e84 100644
--- a/celery/backends/amqp.py
+++ b/celery/backends/amqp.py
@@ -180,7 +180,8 @@ class AMQPBackend(BaseBackend):
                 raise self.BacklogLimitExceeded(task_id)
 
             if latest:
-                payload = self._cache[task_id] = latest.payload
+                payload = self._cache[task_id] = \
+                    self.meta_from_decoded(latest.payload)
                 latest.requeue()
                 return payload
             else:
diff --git a/celery/backends/base.py b/celery/backends/base.py
index 68e8b00..f95b704 100644
--- a/celery/backends/base.py
+++ b/celery/backends/base.py
@@ -165,11 +165,12 @@ class BaseBackend(object):
 
     def exception_to_python(self, exc):
         """Convert serialized exception to Python exception."""
-        if self.serializer in EXCEPTION_ABLE_CODECS:
-            return get_pickled_exception(exc)
-        elif not isinstance(exc, BaseException):
-            return create_exception_cls(
-                from_utf8(exc['exc_type']), __name__)(exc['exc_message'])
+        if exc:
+            if self.serializer in EXCEPTION_ABLE_CODECS:
+                return get_pickled_exception(exc)
+            elif not isinstance(exc, BaseException):
+                return create_exception_cls(
+                    from_utf8(exc['exc_type']), __name__)(exc['exc_message'])
         return exc
 
     def prepare_value(self, result):
diff --git a/celery/backends/database/__init__.py b/celery/backends/database/__init__.py
index c52e758..6df9d34 100644
--- a/celery/backends/database/__init__.py
+++ b/celery/backends/database/__init__.py
@@ -37,8 +37,8 @@ def _sqlalchemy_installed():
     return sqlalchemy
 _sqlalchemy_installed()
 
-from sqlalchemy.exc import DatabaseError, InvalidRequestError
-from sqlalchemy.orm.exc import StaleDataError
+from sqlalchemy.exc import DatabaseError, InvalidRequestError  # noqa
+from sqlalchemy.orm.exc import StaleDataError  # noqa
 
 
 @contextmanager
diff --git a/celery/backends/mongodb.py b/celery/backends/mongodb.py
index ef1a97c..ba2761d 100644
--- a/celery/backends/mongodb.py
+++ b/celery/backends/mongodb.py
@@ -10,6 +10,16 @@ from __future__ import absolute_import
 
 from datetime import datetime
 
+from kombu.syn import detect_environment
+from kombu.utils import cached_property
+
+from celery import states
+from celery.exceptions import ImproperlyConfigured
+from celery.five import items, string_t
+from celery.utils.timeutils import maybe_timedelta
+
+from .base import BaseBackend
+
 try:
     import pymongo
 except ImportError:  # pragma: no cover
@@ -23,16 +33,6 @@ if pymongo:
 else:                                       # pragma: no cover
     Binary = None                           # noqa
 
-from kombu.syn import detect_environment
-from kombu.utils import cached_property
-
-from celery import states
-from celery.exceptions import ImproperlyConfigured
-from celery.five import string_t
-from celery.utils.timeutils import maybe_timedelta
-
-from .base import BaseBackend
-
 __all__ = ['MongoBackend']
 
 
@@ -92,17 +92,26 @@ class MongoBackend(BaseBackend):
             self.options = dict(config, **config.pop('options', None) or {})
 
             # Set option defaults
-            if pymongo.version_tuple >= (3, ):
-                self.options.setdefault('maxPoolSize', self.max_pool_size)
-            else:
-                self.options.setdefault('max_pool_size', self.max_pool_size)
-                self.options.setdefault('auto_start_request', False)
+            for key, value in items(self._prepare_client_options()):
+                self.options.setdefault(key, value)
 
         self.url = url
         if self.url:
             # Specifying backend as an URL
             self.host = self.url
 
+    def _prepare_client_options(self):
+            if pymongo.version_tuple >= (3, ):
+                return {'maxPoolSize': self.max_pool_size}
+            else:  # pragma: no cover
+                options = {
+                    'max_pool_size': self.max_pool_size,
+                    'auto_start_request': False
+                }
+                if detect_environment() != 'default':
+                    options['use_greenlets'] = True
+                return options
+
     def _get_connection(self):
         """Connect to the MongoDB server."""
         if self._connection is None:
@@ -120,8 +129,6 @@ class MongoBackend(BaseBackend):
                 url = 'mongodb://{0}:{1}'.format(url, self.port)
             if url == 'mongodb://':
                 url = url + 'localhost'
-            if detect_environment() != 'default':
-                self.options['use_greenlets'] = True
             self._connection = MongoClient(host=url, **self.options)
 
         return self._connection
diff --git a/celery/backends/redis.py b/celery/backends/redis.py
index 5acdf37..14885ee 100644
--- a/celery/backends/redis.py
+++ b/celery/backends/redis.py
@@ -160,13 +160,13 @@ class RedisBackend(KeyValueStoreBackend):
         return self.ensure(self._set, (key, value), **retry_policy)
 
     def _set(self, key, value):
-        pipe = self.client.pipeline()
-        if self.expires:
-            pipe.setex(key, value, self.expires)
-        else:
-            pipe.set(key, value)
-        pipe.publish(key, value)
-        pipe.execute()
+        with self.client.pipeline() as pipe:
+            if self.expires:
+                pipe.setex(key, value, self.expires)
+            else:
+                pipe.set(key, value)
+            pipe.publish(key, value)
+            pipe.execute()
 
     def delete(self, key):
         self.client.delete(key)
@@ -205,21 +205,23 @@ class RedisBackend(KeyValueStoreBackend):
         client = self.client
         jkey = self.get_key_for_group(gid, '.j')
         result = self.encode_result(result, state)
-        _, readycount, _ = client.pipeline()                            \
-            .rpush(jkey, self.encode([1, tid, state, result]))          \
-            .llen(jkey)                                                 \
-            .expire(jkey, 86400)                                        \
-            .execute()
+        with client.pipeline() as pipe:
+            _, readycount, _ = pipe                                         \
+                .rpush(jkey, self.encode([1, tid, state, result]))          \
+                .llen(jkey)                                                 \
+                .expire(jkey, 86400)                                        \
+                .execute()
 
         try:
             callback = maybe_signature(request.chord, app=app)
             total = callback['chord_size']
             if readycount == total:
                 decode, unpack = self.decode, self._unpack_chord_result
-                resl, _ = client.pipeline()     \
-                    .lrange(jkey, 0, total)     \
-                    .delete(jkey)               \
-                    .execute()
+                with client.pipeline() as pipe:
+                    resl, _, = pipe                 \
+                        .lrange(jkey, 0, total)     \
+                        .delete(jkey)               \
+                        .execute()
                 try:
                     callback.delay([unpack(tup, decode) for tup in resl])
                 except Exception as exc:
@@ -240,6 +242,16 @@ class RedisBackend(KeyValueStoreBackend):
                 callback.id, exc=ChordError('Join error: {0!r}'.format(exc)),
             )
 
+    def _create_client(self, socket_timeout=None, socket_connect_timeout=None,
+                       **params):
+        return self.redis.Redis(
+            connection_pool=self.ConnectionPool(
+                socket_timeout=socket_timeout and float(socket_timeout),
+                socket_connect_timeout=socket_connect_timeout and float(
+                    socket_connect_timeout),
+                **params),
+        )
+
     @property
     def ConnectionPool(self):
         if self._ConnectionPool is None:
@@ -248,9 +260,7 @@ class RedisBackend(KeyValueStoreBackend):
 
     @cached_property
     def client(self):
-        return self.redis.Redis(
-            connection_pool=self.ConnectionPool(**self.connparams),
-        )
+        return self._create_client(**self.connparams)
 
     def __reduce__(self, args=(), kwargs={}):
         return super(RedisBackend, self).__reduce__(
diff --git a/celery/beat.py b/celery/beat.py
index 13de9ee..7b14777 100644
--- a/celery/beat.py
+++ b/celery/beat.py
@@ -174,9 +174,9 @@ class Scheduler(object):
                  Publisher=None, lazy=False, sync_every_tasks=None, **kwargs):
         self.app = app
         self.data = maybe_evaluate({} if schedule is None else schedule)
-        self.max_interval = (max_interval
-                             or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
-                             or self.max_interval)
+        self.max_interval = (max_interval or
+                             app.conf.CELERYBEAT_MAX_LOOP_INTERVAL or
+                             self.max_interval)
         self.sync_every_tasks = (
             app.conf.CELERYBEAT_SYNC_EVERY if sync_every_tasks is None
             else sync_every_tasks)
@@ -362,22 +362,31 @@ class PersistentScheduler(Scheduler):
             with platforms.ignore_errno(errno.ENOENT):
                 os.remove(self.schedule_filename + suffix)
 
+    def _open_schedule(self):
+        return self.persistence.open(self.schedule_filename, writeback=True)
+
+    def _destroy_open_corrupted_schedule(self, exc):
+        error('Removing corrupted schedule file %r: %r',
+              self.schedule_filename, exc, exc_info=True)
+        self._remove_db()
+        return self._open_schedule()
+
     def setup_schedule(self):
         try:
-            self._store = self.persistence.open(self.schedule_filename,
-                                                writeback=True)
+            self._store = self._open_schedule()
         except Exception as exc:
-            error('Removing corrupted schedule file %r: %r',
-                  self.schedule_filename, exc, exc_info=True)
-            self._remove_db()
-            self._store = self.persistence.open(self.schedule_filename,
-                                                writeback=True)
-        else:
+            self._store = self._destroy_open_corrupted_schedule(exc)
+
+        for _ in (1, 2):
             try:
                 self._store['entries']
             except KeyError:
                 # new schedule db
-                self._store['entries'] = {}
+                try:
+                    self._store['entries'] = {}
+                except KeyError as exc:
+                    self._store = self._destroy_open_corrupted_schedule(exc)
+                    continue
             else:
                 if '__version__' not in self._store:
                     warning('DB Reset: Account for new __version__ field')
@@ -388,6 +397,7 @@ class PersistentScheduler(Scheduler):
                 elif 'utc_enabled' not in self._store:
                     warning('DB Reset: Account for new utc_enabled field')
                     self._store.clear()   # remove schedule at 3.0.9 upgrade
+            break
 
         tz = self.app.conf.CELERY_TIMEZONE
         stored_tz = self._store.get('tz')
@@ -435,8 +445,8 @@ class Service(object):
     def __init__(self, app, max_interval=None, schedule_filename=None,
                  scheduler_cls=None):
         self.app = app
-        self.max_interval = (max_interval
-                             or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)
+        self.max_interval = (max_interval or
+                             app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)
         self.scheduler_cls = scheduler_cls or self.scheduler_cls
         self.schedule_filename = (
             schedule_filename or app.conf.CELERYBEAT_SCHEDULE_FILENAME)
@@ -497,13 +507,15 @@ class Service(object):
 class _Threaded(Thread):
     """Embedded task scheduler using threading."""
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, app, **kwargs):
         super(_Threaded, self).__init__()
-        self.service = Service(*args, **kwargs)
+        self.app = app
+        self.service = Service(app, **kwargs)
         self.daemon = True
         self.name = 'Beat'
 
     def run(self):
+        self.app.set_current()
         self.service.start()
 
     def stop(self):
@@ -517,9 +529,10 @@ except NotImplementedError:     # pragma: no cover
 else:
     class _Process(Process):    # noqa
 
-        def __init__(self, *args, **kwargs):
+        def __init__(self, app, **kwargs):
             super(_Process, self).__init__()
-            self.service = Service(*args, **kwargs)
+            self.app = app
+            self.service = Service(app, **kwargs)
             self.name = 'Beat'
 
         def run(self):
@@ -527,6 +540,8 @@ else:
             platforms.close_open_fds([
                 sys.__stdin__, sys.__stdout__, sys.__stderr__,
             ] + list(iter_open_logger_fds()))
+            self.app.set_default()
+            self.app.set_current()
             self.service.start(embedded_process=True)
 
         def stop(self):
@@ -534,7 +549,7 @@ else:
             self.terminate()
 
 
-def EmbeddedService(*args, **kwargs):
+def EmbeddedService(app, max_interval=None, **kwargs):
     """Return embedded clock service.
 
     :keyword thread: Run threaded instead of as a separate process.
@@ -544,6 +559,5 @@ def EmbeddedService(*args, **kwargs):
     if kwargs.pop('thread', False) or _Process is None:
         # Need short max interval to be able to stop thread
         # in reasonable time.
-        kwargs.setdefault('max_interval', 1)
-        return _Threaded(*args, **kwargs)
-    return _Process(*args, **kwargs)
+        return _Threaded(app, max_interval=1, **kwargs)
+    return _Process(app, max_interval=max_interval, **kwargs)
diff --git a/celery/bin/amqp.py b/celery/bin/amqp.py
index 4dab152..ce3b351 100644
--- a/celery/bin/amqp.py
+++ b/celery/bin/amqp.py
@@ -182,6 +182,16 @@ class AMQShell(cmd.Cmd):
         'basic.ack': Spec(('delivery_tag', int)),
     }
 
+    def _prepare_spec(self, conn):
+        # XXX Hack to fix Issue #2013
+        from amqp import Connection, Message
+        if isinstance(conn.connection, Connection):
+            self.amqp['basic.publish'] = Spec(('msg', Message),
+                                              ('exchange', str),
+                                              ('routing_key', str),
+                                              ('mandatory', bool, 'no'),
+                                              ('immediate', bool, 'no'))
+
     def __init__(self, *args, **kwargs):
         self.connect = kwargs.pop('connect')
         self.silent = kwargs.pop('silent', False)
@@ -296,6 +306,7 @@ class AMQShell(cmd.Cmd):
     def _reconnect(self):
         """Re-establish connection to the AMQP server."""
         self.conn = self.connect(self.conn)
+        self._prepare_spec(self.conn)
         self.chan = self.conn.default_channel
         self.needs_reconnect = False
 
diff --git a/celery/bin/beat.py b/celery/bin/beat.py
index 6b5b734..4bcbc62 100644
--- a/celery/bin/beat.py
+++ b/celery/bin/beat.py
@@ -87,9 +87,9 @@ class beat(Command):
                     default=c.CELERYBEAT_SCHEDULE_FILENAME),
              Option('--max-interval', type='float'),
              Option('-S', '--scheduler', dest='scheduler_cls'),
-             Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL))
-            + daemon_options(default_pidfile='celerybeat.pid')
-            + tuple(self.app.user_options['beat'])
+             Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL)) +
+            daemon_options(default_pidfile='celerybeat.pid') +
+            tuple(self.app.user_options['beat'])
         )
 
 
diff --git a/celery/bin/celery.py b/celery/bin/celery.py
index 10d7c03..3af8afa 100644
--- a/celery/bin/celery.py
+++ b/celery/bin/celery.py
@@ -115,7 +115,8 @@ class list_(Command):
         except NotImplementedError:
             raise self.Error('Your transport cannot list bindings.')
 
-        fmt = lambda q, e, r: self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
+        def fmt(q, e, r):
+            return self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
         fmt('Queue', 'Exchange', 'Routing Key')
         fmt('-' * 16, '-' * 16, '-' * 16)
         for b in bindings:
diff --git a/celery/bin/events.py b/celery/bin/events.py
index d987505..8cc61b6 100644
--- a/celery/bin/events.py
+++ b/celery/bin/events.py
@@ -125,9 +125,9 @@ class events(Command):
              Option('-F', '--frequency', '--freq',
                     type='float', default=1.0),
              Option('-r', '--maxrate'),
-             Option('-l', '--loglevel', default='INFO'))
-            + daemon_options(default_pidfile='celeryev.pid')
-            + tuple(self.app.user_options['events'])
+             Option('-l', '--loglevel', default='INFO')) +
+            daemon_options(default_pidfile='celeryev.pid') +
+            tuple(self.app.user_options['events'])
         )
 
 
diff --git a/celery/concurrency/asynpool.py b/celery/concurrency/asynpool.py
index 33e2bf6..0d5dfb0 100644
--- a/celery/concurrency/asynpool.py
+++ b/celery/concurrency/asynpool.py
@@ -576,7 +576,7 @@ class AsynPool(_pool.Pool):
 
         def on_process_down(proc):
             """Called when a worker process exits."""
-            if proc.dead:
+            if getattr(proc, 'dead', None):
                 return
             process_flush_queues(proc)
             _remove_from_index(
diff --git a/celery/concurrency/eventlet.py b/celery/concurrency/eventlet.py
index 00082dd..98bddcf 100644
--- a/celery/concurrency/eventlet.py
+++ b/celery/concurrency/eventlet.py
@@ -29,10 +29,10 @@ for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
             warnings.warn(RuntimeWarning(W_RACE % side))
 
 
-from celery import signals
-from celery.utils import timer2
+from celery import signals  # noqa
+from celery.utils import timer2  # noqa
 
-from . import base
+from . import base  # noqa
 
 
 def apply_target(target, args=(), kwargs={}, callback=None,
diff --git a/celery/concurrency/prefork.py b/celery/concurrency/prefork.py
index b579d0e..b71eae3 100644
--- a/celery/concurrency/prefork.py
+++ b/celery/concurrency/prefork.py
@@ -159,7 +159,8 @@ class TaskPool(BasePool):
         try:
             write_stats = self._pool.human_write_stats
         except AttributeError:
-            write_stats = lambda: 'N/A'  # only supported by asynpool
+            def write_stats():
+                return 'N/A'  # only supported by asynpool
         return {
             'max-concurrency': self.limit,
             'processes': [p.pid for p in self._pool._pool],
diff --git a/celery/events/cursesmon.py b/celery/events/cursesmon.py
index 796565f..dfeff2d 100644
--- a/celery/events/cursesmon.py
+++ b/celery/events/cursesmon.py
@@ -318,8 +318,8 @@ class CursesMonitor(object):  # pragma: no cover
         def alert_callback(my, mx, xs):
             y = count(xs)
             task = self.state.tasks[self.selected_task]
-            result = (getattr(task, 'result', None)
-                      or getattr(task, 'exception', None))
+            result = (getattr(task, 'result', None) or
+                      getattr(task, 'exception', None))
             for line in wrap(result, mx - 2):
                 self.win.addstr(next(y), 3, line)
 
diff --git a/celery/five.py b/celery/five.py
index f7817d0..2406920 100644
--- a/celery/five.py
+++ b/celery/five.py
@@ -10,15 +10,14 @@
 """
 from __future__ import absolute_import
 
-__all__ = ['Counter', 'reload', 'UserList', 'UserDict', 'Queue', 'Empty',
-           'zip_longest', 'map', 'string', 'string_t',
-           'long_t', 'text_t', 'range', 'int_types', 'items', 'keys', 'values',
-           'nextfun', 'reraise', 'WhateverIO', 'with_metaclass',
-           'OrderedDict', 'THREAD_TIMEOUT_MAX', 'format_d',
-           'class_property', 'reclassmethod', 'create_module',
-           'recreate_module', 'monotonic']
-
 import io
+import operator
+import sys
+
+from importlib import import_module
+from types import ModuleType
+
+from kombu.five import monotonic
 
 try:
     from collections import Counter
@@ -28,8 +27,15 @@ except ImportError:  # pragma: no cover
     def Counter():  # noqa
         return defaultdict(int)
 
+__all__ = ['Counter', 'reload', 'UserList', 'UserDict', 'Queue', 'Empty',
+           'zip_longest', 'map', 'string', 'string_t',
+           'long_t', 'text_t', 'range', 'int_types', 'items', 'keys', 'values',
+           'nextfun', 'reraise', 'WhateverIO', 'with_metaclass',
+           'OrderedDict', 'THREAD_TIMEOUT_MAX', 'format_d',
+           'class_property', 'reclassmethod', 'create_module',
+           'recreate_module', 'monotonic']
+
 # ############# py3k #########################################################
-import sys
 PY3 = sys.version_info[0] == 3
 
 try:
@@ -48,8 +54,6 @@ except ImportError:                         # pragma: no cover
     from collections import UserDict        # noqa
 
 
-from kombu.five import monotonic
-
 if PY3:  # pragma: no cover
     import builtins
 
@@ -175,8 +179,6 @@ else:  # pragma: no cover
 # recreate modules, either for lazy loading or
 # to create old modules at runtime instead of
 # having them litter the source tree.
-import operator
-import sys
 
 # import fails in python 2.5. fallback to reduce in stdlib
 try:
@@ -184,9 +186,6 @@ try:
 except ImportError:
     pass
 
-from importlib import import_module
-from types import ModuleType
-
 MODULE_DEPRECATED = """
 The module %s is deprecated and will be removed in a future version.
 """
diff --git a/celery/fixups/django.py b/celery/fixups/django.py
index 05c68d0..73c5c28 100644
--- a/celery/fixups/django.py
... 1702 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/celery.git



More information about the Python-modules-commits mailing list