[Python-modules-commits] [celery] 01/11: Import celery_3.1.19.orig.tar.gz
Brian May
bam at moszumanska.debian.org
Wed Nov 18 22:21:05 UTC 2015
This is an automated email from the git hooks/post-receive script.
bam pushed a commit to branch master
in repository celery.
commit ecfab0e5f18c81b19617e93999463c55955d714a
Author: Brian May <bam at debian.org>
Date: Thu Nov 19 09:02:21 2015 +1100
Import celery_3.1.19.orig.tar.gz
---
CONTRIBUTORS.txt | 15 ++
Changelog | 81 ++++++++++
PKG-INFO | 4 +-
README.rst | 2 +-
celery.egg-info/PKG-INFO | 4 +-
celery.egg-info/requires.txt | 4 +-
celery/__init__.py | 9 +-
celery/app/base.py | 3 +-
celery/app/control.py | 2 +-
celery/app/task.py | 23 +--
celery/apps/worker.py | 3 +-
celery/backends/__init__.py | 7 +-
celery/backends/amqp.py | 3 +-
celery/backends/base.py | 11 +-
celery/backends/database/__init__.py | 4 +-
celery/backends/mongodb.py | 41 ++---
celery/backends/redis.py | 48 +++---
celery/beat.py | 58 +++++---
celery/bin/amqp.py | 11 ++
celery/bin/beat.py | 6 +-
celery/bin/celery.py | 3 +-
celery/bin/events.py | 6 +-
celery/concurrency/asynpool.py | 2 +-
celery/concurrency/eventlet.py | 6 +-
celery/concurrency/prefork.py | 3 +-
celery/events/cursesmon.py | 4 +-
celery/five.py | 31 ++--
celery/fixups/django.py | 10 +-
celery/local.py | 182 +++++++++++++++++------
celery/platforms.py | 45 +++++-
celery/task/trace.py | 2 +-
celery/tests/app/test_app.py | 10 +-
celery/tests/app/test_beat.py | 4 +-
celery/tests/app/test_loaders.py | 3 +-
celery/tests/backends/test_amqp.py | 63 +++++++-
celery/tests/backends/test_mongodb.py | 50 ++++++-
celery/tests/backends/test_redis.py | 8 +-
celery/tests/case.py | 19 ++-
celery/tests/compat_modules/test_compat.py | 8 +-
celery/tests/events/test_state.py | 3 +-
celery/tests/tasks/test_chord.py | 4 +-
celery/tests/utils/test_mail.py | 2 +-
celery/tests/utils/test_platforms.py | 14 +-
celery/tests/utils/test_threads.py | 4 +-
celery/tests/worker/test_hub.py | 3 +-
celery/tests/worker/test_loops.py | 11 +-
celery/utils/term.py | 7 +-
celery/worker/components.py | 2 +-
celery/worker/job.py | 7 +-
celery/worker/loops.py | 6 +
docs/_ext/applyxrefs.py | 2 -
docs/changelog.rst | 81 ++++++++++
docs/conf.py | 20 +--
docs/configuration.rst | 159 +++++++++++++++-----
docs/getting-started/first-steps-with-celery.rst | 8 +-
docs/includes/introduction.txt | 2 +-
docs/userguide/canvas.rst | 6 +-
docs/userguide/monitoring.rst | 2 +-
docs/userguide/periodic-tasks.rst | 3 +
docs/userguide/routing.rst | 2 +-
docs/userguide/security.rst | 5 +-
docs/userguide/tasks.rst | 41 ++---
examples/app/myapp.py | 5 +-
examples/django/proj/__init__.py | 2 +-
examples/django/proj/celery.py | 2 +-
examples/django/proj/settings.py | 2 +-
examples/django/proj/urls.py | 7 +-
examples/django/proj/wsgi.py | 2 +-
examples/eventlet/celeryconfig.py | 2 +-
examples/gevent/celeryconfig.py | 2 +-
examples/next-steps/setup.py | 2 +-
requirements/default.txt | 4 +-
requirements/docs.txt | 2 +-
requirements/jython.txt | 2 +-
requirements/test-ci.txt | 7 +-
setup.py | 10 +-
76 files changed, 901 insertions(+), 342 deletions(-)
diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt
index 574f439..38df5c7 100644
--- a/CONTRIBUTORS.txt
+++ b/CONTRIBUTORS.txt
@@ -180,3 +180,18 @@ Bert Vanderbauwhede, 2014/12/18
John Anderson, 2014/12/27
Luke Burden, 2015/01/24
Mickaël Penhard, 2015/02/15
+Mark Parncutt, 2015/02/16
+Samuel Jaillet, 2015/03/24
+Ilya Georgievsky, 2015/03/31
+Fatih Sucu, 2015/04/17
+James Pulec, 2015/04/19
+Alexander Lebedev, 2015/04/25
+Frantisek Holop, 2015/05/21
+Feanil Patel, 2015/05/21
+Jocelyn Delalande, 2015/06/03
+Justin Patrin, 2015/08/06
+Juan Rossi, 2015/08/10
+Piotr Maślanka, 2015/08/24
+Gerald Manipon, 2015/10/19
+Krzysztof Bujniewicz, 2015/10/21
+Sukrit Khera, 2015/10/26
diff --git a/Changelog b/Changelog
index a99b967..423df3f 100644
--- a/Changelog
+++ b/Changelog
@@ -8,6 +8,87 @@ This document contains change notes for bugfix releases in the 3.1.x series
(Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
new in Celery 3.1.
+.. _version-3.1.19:
+
+3.1.19
+======
+:release-date: 2015-10-26 01:00 p.m. UTC
+:release-by: Ask Solem
+
+- **Requirements**
+
+ - Now depends on :ref:`Kombu 3.0.29 <kombu:version-3.0.29>`.
+
+ - Now depends on :mod:`billiard` 3.3.0.21.
+
+- **Results**: Fixed MongoDB result backend URL parsing problem
+ (Issue celery/kombu#375).
+
+- **Worker**: Task request now properly sets ``priority`` in delivery_info.
+
+ Fix contributed by Gerald Manipon.
+
+- **Beat**: PyPy shelve may raise ``KeyError`` when setting keys
+ (Issue #2862).
+
+- **Programs**: :program:`celery beat --detached` now working on PyPy.
+
+ Fix contributed by Krzysztof Bujniewicz.
+
+- **Results**: Redis result backend now ensures all pipelines are cleaned up.
+
+ Contributed by Justin Patrin.
+
+- **Results**: Redis result backend now allows for timeout to be set in the
+ query portion of the result backend URL.
+
+ E.g. ``CELERY_RESULT_BACKEND = 'redis://?timeout=10'``
+
+ Contributed by Justin Patrin.
+
+- **Results**: ``result.get`` now properly handles failures where the
+ exception value is set to :const:`None` (Issue #2560).
+
+- **Prefork pool**: Fixed attribute error ``proc.dead``.
+
+- **Worker**: Fixed worker hanging when gossip/heartbeat disabled
+ (Issue #1847).
+
+ Fix contributed by Aaron Webber and Bryan Helmig.
+
+- **Results**: MongoDB result backend now supports pymongo 3.x
+ (Issue #2744).
+
+ Fix contributed by Sukrit Khera.
+
+- **Results**: RPC/amqp backends did not deserialize exceptions properly
+ (Issue #2691).
+
+ Fix contributed by Sukrit Khera.
+
+- **Programs**: Fixed problem with :program:`celery amqp`'s
+ ``basic_publish`` (Issue #2013).
+
+- **Worker**: Embedded beat now properly sets app for thread/process
+ (Issue #2594).
+
+- **Documentation**: Many improvements and typos fixed.
+
+ Contributions by:
+
+ Carlos Garcia-Dubus
+ D. Yu
+ jerry
+ Jocelyn Delalande
+ Josh Kupershmidt
+ Juan Rossi
+ kanemra
+ Paul Pearce
+ Pavel Savchenko
+ Sean Wang
+ Seungha Kim
+ Zhaorong Ma
+
.. _version-3.1.18:
3.1.18
diff --git a/PKG-INFO b/PKG-INFO
index 3eefa90..8ab88c7 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: celery
-Version: 3.1.18
+Version: 3.1.19
Summary: Distributed Task Queue
Home-page: http://celeryproject.org
Author: Ask Solem
@@ -12,7 +12,7 @@ Description: =================================
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
- :Version: 3.1.18 (Cipater)
+ :Version: 3.1.19 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
diff --git a/README.rst b/README.rst
index 258b8ed..dc22840 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
-:Version: 3.1.18 (Cipater)
+:Version: 3.1.19 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
diff --git a/celery.egg-info/PKG-INFO b/celery.egg-info/PKG-INFO
index 3eefa90..8ab88c7 100644
--- a/celery.egg-info/PKG-INFO
+++ b/celery.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: celery
-Version: 3.1.18
+Version: 3.1.19
Summary: Distributed Task Queue
Home-page: http://celeryproject.org
Author: Ask Solem
@@ -12,7 +12,7 @@ Description: =================================
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
- :Version: 3.1.18 (Cipater)
+ :Version: 3.1.19 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
diff --git a/celery.egg-info/requires.txt b/celery.egg-info/requires.txt
index da27b3c..09bf4b3 100644
--- a/celery.egg-info/requires.txt
+++ b/celery.egg-info/requires.txt
@@ -1,6 +1,6 @@
pytz>dev
-billiard>=3.3.0.20,<3.4
-kombu>=3.0.25,<3.1
+billiard>=3.3.0.21,<3.4
+kombu>=3.0.29,<3.1
[zookeeper]
kazoo>=1.3.1
diff --git a/celery/__init__.py b/celery/__init__.py
index fd0cb0e..2366551 100644
--- a/celery/__init__.py
+++ b/celery/__init__.py
@@ -7,6 +7,9 @@
from __future__ import absolute_import
+import os
+import sys
+
from collections import namedtuple
version_info_t = namedtuple(
@@ -14,7 +17,7 @@ version_info_t = namedtuple(
)
SERIES = 'Cipater'
-VERSION = version_info_t(3, 1, 18, '', '')
+VERSION = version_info_t(3, 1, 19, '', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = 'ask at celeryproject.org'
@@ -30,8 +33,6 @@ VERSION_BANNER = '{0} ({1})'.format(__version__, SERIES)
# -eof meta-
-import os
-import sys
if os.environ.get('C_IMPDEBUG'): # pragma: no cover
from .five import builtins
real_import = builtins.__import__
@@ -127,7 +128,7 @@ def maybe_patch_concurrency(argv=sys.argv,
concurrency.get_implementation(pool)
# Lazy loading
-from celery import five
+from celery import five # noqa
old_module, new_module = five.recreate_module( # pragma: no cover
__name__,
diff --git a/celery/app/base.py b/celery/app/base.py
index d228310..7044cfa 100644
--- a/celery/app/base.py
+++ b/celery/app/base.py
@@ -221,7 +221,8 @@ class Celery(object):
def _create_task_cls(fun):
if shared:
- cons = lambda app: app._task_from_fun(fun, **opts)
+ def cons(app):
+ return app._task_from_fun(fun, **opts)
cons.__name__ = fun.__name__
connect_on_app_finalize(cons)
if self.accept_magic_kwargs: # compat mode
diff --git a/celery/app/control.py b/celery/app/control.py
index 2845374..10baf59 100644
--- a/celery/app/control.py
+++ b/celery/app/control.py
@@ -263,7 +263,7 @@ class Control(object):
return self.broadcast('enable_events', {}, destination, **kwargs)
def disable_events(self, destination=None, **kwargs):
- """Tell all (or specific) workers to enable events."""
+ """Tell all (or specific) workers to disable events."""
return self.broadcast('disable_events', {}, destination, **kwargs)
def pool_grow(self, n=1, destination=None, **kwargs):
diff --git a/celery/app/task.py b/celery/app/task.py
index ed88ce7..6c55ca5 100644
--- a/celery/app/task.py
+++ b/celery/app/task.py
@@ -483,11 +483,12 @@ class Task(object):
:keyword retry: If enabled sending of the task message will be retried
in the event of connection loss or failure. Default
is taken from the :setting:`CELERY_TASK_PUBLISH_RETRY`
- setting. Note you need to handle the
+ setting. Note that you need to handle the
producer/connection manually for this to work.
:keyword retry_policy: Override the retry policy used. See the
- :setting:`CELERY_TASK_PUBLISH_RETRY` setting.
+ :setting:`CELERY_TASK_PUBLISH_RETRY_POLICY`
+ setting.
:keyword routing_key: Custom routing key used to route the task to a
worker server. If in combination with a
@@ -599,7 +600,12 @@ class Task(object):
:keyword countdown: Time in seconds to delay the retry for.
:keyword eta: Explicit time and date to run the retry at
(must be a :class:`~datetime.datetime` instance).
- :keyword max_retries: If set, overrides the default retry limit.
+ :keyword max_retries: If set, overrides the default retry limit for
+ this execution. Changes to this parameter do not propagate to
+ subsequent task retry attempts. A value of :const:`None`, means
+ "use the default", so if you want infinite retries you would
+ have to set the :attr:`max_retries` attribute of the task to
+ :const:`None` first.
:keyword time_limit: If set, overrides the default time limit.
:keyword soft_time_limit: If set, overrides the default soft
time limit.
@@ -624,14 +630,14 @@ class Task(object):
>>> from imaginary_twitter_lib import Twitter
>>> from proj.celery import app
- >>> @app.task()
- ... def tweet(auth, message):
+ >>> @app.task(bind=True)
+ ... def tweet(self, auth, message):
... twitter = Twitter(oauth=auth)
... try:
... twitter.post_status_update(message)
... except twitter.FailWhale as exc:
... # Retry in 5 minutes.
- ... raise tweet.retry(countdown=60 * 5, exc=exc)
+ ... raise self.retry(countdown=60 * 5, exc=exc)
Although the task will never return above as `retry` raises an
exception to notify the worker, we use `raise` in front of the retry
@@ -863,9 +869,8 @@ class Task(object):
:param status: Current task state.
:param retval: Task return value/exception.
:param task_id: Unique id of the task.
- :param args: Original arguments for the task that failed.
- :param kwargs: Original keyword arguments for the task
- that failed.
+ :param args: Original arguments for the task.
+ :param kwargs: Original keyword arguments for the task.
:keyword einfo: :class:`~billiard.einfo.ExceptionInfo`
instance, containing the traceback (if any).
diff --git a/celery/apps/worker.py b/celery/apps/worker.py
index d190711..4e55c4a 100644
--- a/celery/apps/worker.py
+++ b/celery/apps/worker.py
@@ -313,7 +313,8 @@ if not is_jython: # pragma: no cover
_shutdown_handler, sig='SIGINT', callback=on_SIGINT
)
else: # pragma: no cover
- install_worker_int_handler = lambda *a, **kw: None
+ def install_worker_int_handler(*a, **kw):
+ pass
def _reload_current_worker():
diff --git a/celery/backends/__init__.py b/celery/backends/__init__.py
index fbe8a9c..420ba24 100644
--- a/celery/backends/__init__.py
+++ b/celery/backends/__init__.py
@@ -10,8 +10,6 @@ from __future__ import absolute_import
import sys
-from kombu.utils.url import _parse_url
-
from celery.local import Proxy
from celery._state import current_app
from celery.five import reraise
@@ -56,8 +54,9 @@ def get_backend_by_url(backend=None, loader=None):
url = None
if backend and '://' in backend:
url = backend
- if '+' in url[:url.index('://')]:
+ scheme, _, _ = url.partition('://')
+ if '+' in scheme:
backend, url = url.split('+', 1)
else:
- backend, _, _, _, _, _, _ = _parse_url(url)
+ backend = scheme
return get_backend_cls(backend, loader), url
diff --git a/celery/backends/amqp.py b/celery/backends/amqp.py
index 5587943..a4a6e84 100644
--- a/celery/backends/amqp.py
+++ b/celery/backends/amqp.py
@@ -180,7 +180,8 @@ class AMQPBackend(BaseBackend):
raise self.BacklogLimitExceeded(task_id)
if latest:
- payload = self._cache[task_id] = latest.payload
+ payload = self._cache[task_id] = \
+ self.meta_from_decoded(latest.payload)
latest.requeue()
return payload
else:
diff --git a/celery/backends/base.py b/celery/backends/base.py
index 68e8b00..f95b704 100644
--- a/celery/backends/base.py
+++ b/celery/backends/base.py
@@ -165,11 +165,12 @@ class BaseBackend(object):
def exception_to_python(self, exc):
"""Convert serialized exception to Python exception."""
- if self.serializer in EXCEPTION_ABLE_CODECS:
- return get_pickled_exception(exc)
- elif not isinstance(exc, BaseException):
- return create_exception_cls(
- from_utf8(exc['exc_type']), __name__)(exc['exc_message'])
+ if exc:
+ if self.serializer in EXCEPTION_ABLE_CODECS:
+ return get_pickled_exception(exc)
+ elif not isinstance(exc, BaseException):
+ return create_exception_cls(
+ from_utf8(exc['exc_type']), __name__)(exc['exc_message'])
return exc
def prepare_value(self, result):
diff --git a/celery/backends/database/__init__.py b/celery/backends/database/__init__.py
index c52e758..6df9d34 100644
--- a/celery/backends/database/__init__.py
+++ b/celery/backends/database/__init__.py
@@ -37,8 +37,8 @@ def _sqlalchemy_installed():
return sqlalchemy
_sqlalchemy_installed()
-from sqlalchemy.exc import DatabaseError, InvalidRequestError
-from sqlalchemy.orm.exc import StaleDataError
+from sqlalchemy.exc import DatabaseError, InvalidRequestError # noqa
+from sqlalchemy.orm.exc import StaleDataError # noqa
@contextmanager
diff --git a/celery/backends/mongodb.py b/celery/backends/mongodb.py
index ef1a97c..ba2761d 100644
--- a/celery/backends/mongodb.py
+++ b/celery/backends/mongodb.py
@@ -10,6 +10,16 @@ from __future__ import absolute_import
from datetime import datetime
+from kombu.syn import detect_environment
+from kombu.utils import cached_property
+
+from celery import states
+from celery.exceptions import ImproperlyConfigured
+from celery.five import items, string_t
+from celery.utils.timeutils import maybe_timedelta
+
+from .base import BaseBackend
+
try:
import pymongo
except ImportError: # pragma: no cover
@@ -23,16 +33,6 @@ if pymongo:
else: # pragma: no cover
Binary = None # noqa
-from kombu.syn import detect_environment
-from kombu.utils import cached_property
-
-from celery import states
-from celery.exceptions import ImproperlyConfigured
-from celery.five import string_t
-from celery.utils.timeutils import maybe_timedelta
-
-from .base import BaseBackend
-
__all__ = ['MongoBackend']
@@ -92,17 +92,26 @@ class MongoBackend(BaseBackend):
self.options = dict(config, **config.pop('options', None) or {})
# Set option defaults
- if pymongo.version_tuple >= (3, ):
- self.options.setdefault('maxPoolSize', self.max_pool_size)
- else:
- self.options.setdefault('max_pool_size', self.max_pool_size)
- self.options.setdefault('auto_start_request', False)
+ for key, value in items(self._prepare_client_options()):
+ self.options.setdefault(key, value)
self.url = url
if self.url:
# Specifying backend as an URL
self.host = self.url
+ def _prepare_client_options(self):
+ if pymongo.version_tuple >= (3, ):
+ return {'maxPoolSize': self.max_pool_size}
+ else: # pragma: no cover
+ options = {
+ 'max_pool_size': self.max_pool_size,
+ 'auto_start_request': False
+ }
+ if detect_environment() != 'default':
+ options['use_greenlets'] = True
+ return options
+
def _get_connection(self):
"""Connect to the MongoDB server."""
if self._connection is None:
@@ -120,8 +129,6 @@ class MongoBackend(BaseBackend):
url = 'mongodb://{0}:{1}'.format(url, self.port)
if url == 'mongodb://':
url = url + 'localhost'
- if detect_environment() != 'default':
- self.options['use_greenlets'] = True
self._connection = MongoClient(host=url, **self.options)
return self._connection
diff --git a/celery/backends/redis.py b/celery/backends/redis.py
index 5acdf37..14885ee 100644
--- a/celery/backends/redis.py
+++ b/celery/backends/redis.py
@@ -160,13 +160,13 @@ class RedisBackend(KeyValueStoreBackend):
return self.ensure(self._set, (key, value), **retry_policy)
def _set(self, key, value):
- pipe = self.client.pipeline()
- if self.expires:
- pipe.setex(key, value, self.expires)
- else:
- pipe.set(key, value)
- pipe.publish(key, value)
- pipe.execute()
+ with self.client.pipeline() as pipe:
+ if self.expires:
+ pipe.setex(key, value, self.expires)
+ else:
+ pipe.set(key, value)
+ pipe.publish(key, value)
+ pipe.execute()
def delete(self, key):
self.client.delete(key)
@@ -205,21 +205,23 @@ class RedisBackend(KeyValueStoreBackend):
client = self.client
jkey = self.get_key_for_group(gid, '.j')
result = self.encode_result(result, state)
- _, readycount, _ = client.pipeline() \
- .rpush(jkey, self.encode([1, tid, state, result])) \
- .llen(jkey) \
- .expire(jkey, 86400) \
- .execute()
+ with client.pipeline() as pipe:
+ _, readycount, _ = pipe \
+ .rpush(jkey, self.encode([1, tid, state, result])) \
+ .llen(jkey) \
+ .expire(jkey, 86400) \
+ .execute()
try:
callback = maybe_signature(request.chord, app=app)
total = callback['chord_size']
if readycount == total:
decode, unpack = self.decode, self._unpack_chord_result
- resl, _ = client.pipeline() \
- .lrange(jkey, 0, total) \
- .delete(jkey) \
- .execute()
+ with client.pipeline() as pipe:
+ resl, _, = pipe \
+ .lrange(jkey, 0, total) \
+ .delete(jkey) \
+ .execute()
try:
callback.delay([unpack(tup, decode) for tup in resl])
except Exception as exc:
@@ -240,6 +242,16 @@ class RedisBackend(KeyValueStoreBackend):
callback.id, exc=ChordError('Join error: {0!r}'.format(exc)),
)
+ def _create_client(self, socket_timeout=None, socket_connect_timeout=None,
+ **params):
+ return self.redis.Redis(
+ connection_pool=self.ConnectionPool(
+ socket_timeout=socket_timeout and float(socket_timeout),
+ socket_connect_timeout=socket_connect_timeout and float(
+ socket_connect_timeout),
+ **params),
+ )
+
@property
def ConnectionPool(self):
if self._ConnectionPool is None:
@@ -248,9 +260,7 @@ class RedisBackend(KeyValueStoreBackend):
@cached_property
def client(self):
- return self.redis.Redis(
- connection_pool=self.ConnectionPool(**self.connparams),
- )
+ return self._create_client(**self.connparams)
def __reduce__(self, args=(), kwargs={}):
return super(RedisBackend, self).__reduce__(
diff --git a/celery/beat.py b/celery/beat.py
index 13de9ee..7b14777 100644
--- a/celery/beat.py
+++ b/celery/beat.py
@@ -174,9 +174,9 @@ class Scheduler(object):
Publisher=None, lazy=False, sync_every_tasks=None, **kwargs):
self.app = app
self.data = maybe_evaluate({} if schedule is None else schedule)
- self.max_interval = (max_interval
- or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
- or self.max_interval)
+ self.max_interval = (max_interval or
+ app.conf.CELERYBEAT_MAX_LOOP_INTERVAL or
+ self.max_interval)
self.sync_every_tasks = (
app.conf.CELERYBEAT_SYNC_EVERY if sync_every_tasks is None
else sync_every_tasks)
@@ -362,22 +362,31 @@ class PersistentScheduler(Scheduler):
with platforms.ignore_errno(errno.ENOENT):
os.remove(self.schedule_filename + suffix)
+ def _open_schedule(self):
+ return self.persistence.open(self.schedule_filename, writeback=True)
+
+ def _destroy_open_corrupted_schedule(self, exc):
+ error('Removing corrupted schedule file %r: %r',
+ self.schedule_filename, exc, exc_info=True)
+ self._remove_db()
+ return self._open_schedule()
+
def setup_schedule(self):
try:
- self._store = self.persistence.open(self.schedule_filename,
- writeback=True)
+ self._store = self._open_schedule()
except Exception as exc:
- error('Removing corrupted schedule file %r: %r',
- self.schedule_filename, exc, exc_info=True)
- self._remove_db()
- self._store = self.persistence.open(self.schedule_filename,
- writeback=True)
- else:
+ self._store = self._destroy_open_corrupted_schedule(exc)
+
+ for _ in (1, 2):
try:
self._store['entries']
except KeyError:
# new schedule db
- self._store['entries'] = {}
+ try:
+ self._store['entries'] = {}
+ except KeyError as exc:
+ self._store = self._destroy_open_corrupted_schedule(exc)
+ continue
else:
if '__version__' not in self._store:
warning('DB Reset: Account for new __version__ field')
@@ -388,6 +397,7 @@ class PersistentScheduler(Scheduler):
elif 'utc_enabled' not in self._store:
warning('DB Reset: Account for new utc_enabled field')
self._store.clear() # remove schedule at 3.0.9 upgrade
+ break
tz = self.app.conf.CELERY_TIMEZONE
stored_tz = self._store.get('tz')
@@ -435,8 +445,8 @@ class Service(object):
def __init__(self, app, max_interval=None, schedule_filename=None,
scheduler_cls=None):
self.app = app
- self.max_interval = (max_interval
- or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)
+ self.max_interval = (max_interval or
+ app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)
self.scheduler_cls = scheduler_cls or self.scheduler_cls
self.schedule_filename = (
schedule_filename or app.conf.CELERYBEAT_SCHEDULE_FILENAME)
@@ -497,13 +507,15 @@ class Service(object):
class _Threaded(Thread):
"""Embedded task scheduler using threading."""
- def __init__(self, *args, **kwargs):
+ def __init__(self, app, **kwargs):
super(_Threaded, self).__init__()
- self.service = Service(*args, **kwargs)
+ self.app = app
+ self.service = Service(app, **kwargs)
self.daemon = True
self.name = 'Beat'
def run(self):
+ self.app.set_current()
self.service.start()
def stop(self):
@@ -517,9 +529,10 @@ except NotImplementedError: # pragma: no cover
else:
class _Process(Process): # noqa
- def __init__(self, *args, **kwargs):
+ def __init__(self, app, **kwargs):
super(_Process, self).__init__()
- self.service = Service(*args, **kwargs)
+ self.app = app
+ self.service = Service(app, **kwargs)
self.name = 'Beat'
def run(self):
@@ -527,6 +540,8 @@ else:
platforms.close_open_fds([
sys.__stdin__, sys.__stdout__, sys.__stderr__,
] + list(iter_open_logger_fds()))
+ self.app.set_default()
+ self.app.set_current()
self.service.start(embedded_process=True)
def stop(self):
@@ -534,7 +549,7 @@ else:
self.terminate()
-def EmbeddedService(*args, **kwargs):
+def EmbeddedService(app, max_interval=None, **kwargs):
"""Return embedded clock service.
:keyword thread: Run threaded instead of as a separate process.
@@ -544,6 +559,5 @@ def EmbeddedService(*args, **kwargs):
if kwargs.pop('thread', False) or _Process is None:
# Need short max interval to be able to stop thread
# in reasonable time.
- kwargs.setdefault('max_interval', 1)
- return _Threaded(*args, **kwargs)
- return _Process(*args, **kwargs)
+ return _Threaded(app, max_interval=1, **kwargs)
+ return _Process(app, max_interval=max_interval, **kwargs)
diff --git a/celery/bin/amqp.py b/celery/bin/amqp.py
index 4dab152..ce3b351 100644
--- a/celery/bin/amqp.py
+++ b/celery/bin/amqp.py
@@ -182,6 +182,16 @@ class AMQShell(cmd.Cmd):
'basic.ack': Spec(('delivery_tag', int)),
}
+ def _prepare_spec(self, conn):
+ # XXX Hack to fix Issue #2013
+ from amqp import Connection, Message
+ if isinstance(conn.connection, Connection):
+ self.amqp['basic.publish'] = Spec(('msg', Message),
+ ('exchange', str),
+ ('routing_key', str),
+ ('mandatory', bool, 'no'),
+ ('immediate', bool, 'no'))
+
def __init__(self, *args, **kwargs):
self.connect = kwargs.pop('connect')
self.silent = kwargs.pop('silent', False)
@@ -296,6 +306,7 @@ class AMQShell(cmd.Cmd):
def _reconnect(self):
"""Re-establish connection to the AMQP server."""
self.conn = self.connect(self.conn)
+ self._prepare_spec(self.conn)
self.chan = self.conn.default_channel
self.needs_reconnect = False
diff --git a/celery/bin/beat.py b/celery/bin/beat.py
index 6b5b734..4bcbc62 100644
--- a/celery/bin/beat.py
+++ b/celery/bin/beat.py
@@ -87,9 +87,9 @@ class beat(Command):
default=c.CELERYBEAT_SCHEDULE_FILENAME),
Option('--max-interval', type='float'),
Option('-S', '--scheduler', dest='scheduler_cls'),
- Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL))
- + daemon_options(default_pidfile='celerybeat.pid')
- + tuple(self.app.user_options['beat'])
+ Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL)) +
+ daemon_options(default_pidfile='celerybeat.pid') +
+ tuple(self.app.user_options['beat'])
)
diff --git a/celery/bin/celery.py b/celery/bin/celery.py
index 10d7c03..3af8afa 100644
--- a/celery/bin/celery.py
+++ b/celery/bin/celery.py
@@ -115,7 +115,8 @@ class list_(Command):
except NotImplementedError:
raise self.Error('Your transport cannot list bindings.')
- fmt = lambda q, e, r: self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
+ def fmt(q, e, r):
+ return self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
fmt('Queue', 'Exchange', 'Routing Key')
fmt('-' * 16, '-' * 16, '-' * 16)
for b in bindings:
diff --git a/celery/bin/events.py b/celery/bin/events.py
index d987505..8cc61b6 100644
--- a/celery/bin/events.py
+++ b/celery/bin/events.py
@@ -125,9 +125,9 @@ class events(Command):
Option('-F', '--frequency', '--freq',
type='float', default=1.0),
Option('-r', '--maxrate'),
- Option('-l', '--loglevel', default='INFO'))
- + daemon_options(default_pidfile='celeryev.pid')
- + tuple(self.app.user_options['events'])
+ Option('-l', '--loglevel', default='INFO')) +
+ daemon_options(default_pidfile='celeryev.pid') +
+ tuple(self.app.user_options['events'])
)
diff --git a/celery/concurrency/asynpool.py b/celery/concurrency/asynpool.py
index 33e2bf6..0d5dfb0 100644
--- a/celery/concurrency/asynpool.py
+++ b/celery/concurrency/asynpool.py
@@ -576,7 +576,7 @@ class AsynPool(_pool.Pool):
def on_process_down(proc):
"""Called when a worker process exits."""
- if proc.dead:
+ if getattr(proc, 'dead', None):
return
process_flush_queues(proc)
_remove_from_index(
diff --git a/celery/concurrency/eventlet.py b/celery/concurrency/eventlet.py
index 00082dd..98bddcf 100644
--- a/celery/concurrency/eventlet.py
+++ b/celery/concurrency/eventlet.py
@@ -29,10 +29,10 @@ for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
warnings.warn(RuntimeWarning(W_RACE % side))
-from celery import signals
-from celery.utils import timer2
+from celery import signals # noqa
+from celery.utils import timer2 # noqa
-from . import base
+from . import base # noqa
def apply_target(target, args=(), kwargs={}, callback=None,
diff --git a/celery/concurrency/prefork.py b/celery/concurrency/prefork.py
index b579d0e..b71eae3 100644
--- a/celery/concurrency/prefork.py
+++ b/celery/concurrency/prefork.py
@@ -159,7 +159,8 @@ class TaskPool(BasePool):
try:
write_stats = self._pool.human_write_stats
except AttributeError:
- write_stats = lambda: 'N/A' # only supported by asynpool
+ def write_stats():
+ return 'N/A' # only supported by asynpool
return {
'max-concurrency': self.limit,
'processes': [p.pid for p in self._pool._pool],
diff --git a/celery/events/cursesmon.py b/celery/events/cursesmon.py
index 796565f..dfeff2d 100644
--- a/celery/events/cursesmon.py
+++ b/celery/events/cursesmon.py
@@ -318,8 +318,8 @@ class CursesMonitor(object): # pragma: no cover
def alert_callback(my, mx, xs):
y = count(xs)
task = self.state.tasks[self.selected_task]
- result = (getattr(task, 'result', None)
- or getattr(task, 'exception', None))
+ result = (getattr(task, 'result', None) or
+ getattr(task, 'exception', None))
for line in wrap(result, mx - 2):
self.win.addstr(next(y), 3, line)
diff --git a/celery/five.py b/celery/five.py
index f7817d0..2406920 100644
--- a/celery/five.py
+++ b/celery/five.py
@@ -10,15 +10,14 @@
"""
from __future__ import absolute_import
-__all__ = ['Counter', 'reload', 'UserList', 'UserDict', 'Queue', 'Empty',
- 'zip_longest', 'map', 'string', 'string_t',
- 'long_t', 'text_t', 'range', 'int_types', 'items', 'keys', 'values',
- 'nextfun', 'reraise', 'WhateverIO', 'with_metaclass',
- 'OrderedDict', 'THREAD_TIMEOUT_MAX', 'format_d',
- 'class_property', 'reclassmethod', 'create_module',
- 'recreate_module', 'monotonic']
-
import io
+import operator
+import sys
+
+from importlib import import_module
+from types import ModuleType
+
+from kombu.five import monotonic
try:
from collections import Counter
@@ -28,8 +27,15 @@ except ImportError: # pragma: no cover
def Counter(): # noqa
return defaultdict(int)
+__all__ = ['Counter', 'reload', 'UserList', 'UserDict', 'Queue', 'Empty',
+ 'zip_longest', 'map', 'string', 'string_t',
+ 'long_t', 'text_t', 'range', 'int_types', 'items', 'keys', 'values',
+ 'nextfun', 'reraise', 'WhateverIO', 'with_metaclass',
+ 'OrderedDict', 'THREAD_TIMEOUT_MAX', 'format_d',
+ 'class_property', 'reclassmethod', 'create_module',
+ 'recreate_module', 'monotonic']
+
# ############# py3k #########################################################
-import sys
PY3 = sys.version_info[0] == 3
try:
@@ -48,8 +54,6 @@ except ImportError: # pragma: no cover
from collections import UserDict # noqa
-from kombu.five import monotonic
-
if PY3: # pragma: no cover
import builtins
@@ -175,8 +179,6 @@ else: # pragma: no cover
# recreate modules, either for lazy loading or
# to create old modules at runtime instead of
# having them litter the source tree.
-import operator
-import sys
# import fails in python 2.5. fallback to reduce in stdlib
try:
@@ -184,9 +186,6 @@ try:
except ImportError:
pass
-from importlib import import_module
-from types import ModuleType
-
MODULE_DEPRECATED = """
The module %s is deprecated and will be removed in a future version.
"""
diff --git a/celery/fixups/django.py b/celery/fixups/django.py
index 05c68d0..73c5c28 100644
--- a/celery/fixups/django.py
... 1702 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/celery.git
More information about the Python-modules-commits
mailing list