[Python-modules-commits] [celery] 02/11: Import celery_3.1.23.orig.tar.gz
Brian May
bam at moszumanska.debian.org
Sun Apr 17 10:23:47 UTC 2016
This is an automated email from the git hooks/post-receive script.
bam pushed a commit to branch master
in repository celery.
commit 6753277798b7b79fec667e858b4cbf459c1c277c
Author: Brian May <bam at debian.org>
Date: Sun Apr 17 19:16:20 2016 +1000
Import celery_3.1.23.orig.tar.gz
---
Changelog | 137 +++++++++++++++++++++++++-----
PKG-INFO | 4 +-
README.rst | 2 +-
celery.egg-info/PKG-INFO | 4 +-
celery.egg-info/entry_points.txt | 2 +-
celery.egg-info/requires.txt | 70 +++++++--------
celery/__init__.py | 2 +-
celery/app/amqp.py | 10 +++
celery/app/control.py | 2 +-
celery/apps/worker.py | 5 +-
celery/backends/amqp.py | 5 +-
celery/backends/base.py | 23 ++++-
celery/backends/cache.py | 10 +++
celery/backends/cassandra.py | 3 +
celery/backends/couchbase.py | 14 +--
celery/backends/database/__init__.py | 8 +-
celery/backends/mongodb.py | 39 ++++++---
celery/backends/rpc.py | 3 +
celery/bin/celery.py | 35 ++++++--
celery/bin/celeryd_detach.py | 12 ++-
celery/concurrency/asynpool.py | 62 +++++++++-----
celery/concurrency/eventlet.py | 4 +-
celery/concurrency/gevent.py | 2 +-
celery/concurrency/prefork.py | 2 +
celery/contrib/batches.py | 2 +-
celery/datastructures.py | 2 +-
celery/tests/app/test_control.py | 18 ++--
celery/tests/backends/test_base.py | 18 ++++
celery/tests/backends/test_cache.py | 33 ++++++-
celery/tests/backends/test_mongodb.py | 62 ++++++++------
celery/tests/bin/test_celeryd_detach.py | 13 +--
celery/tests/bin/test_worker.py | 23 +----
celery/tests/case.py | 16 +++-
celery/tests/utils/test_datastructures.py | 2 +
celery/tests/worker/test_control.py | 6 +-
celery/tests/worker/test_heartbeat.py | 4 +-
celery/utils/__init__.py | 12 ++-
celery/worker/consumer.py | 4 +-
celery/worker/pidbox.py | 2 +-
celery/worker/state.py | 8 ++
docs/changelog.rst | 137 +++++++++++++++++++++++++-----
docs/configuration.rst | 7 +-
docs/includes/introduction.txt | 2 +-
docs/userguide/extending.rst | 16 ++--
docs/userguide/optimizing.rst | 25 ++++--
docs/userguide/workers.rst | 4 +-
docs/whatsnew-3.1.rst | 16 ++--
requirements/default.txt | 4 +-
setup.cfg | 4 +-
49 files changed, 640 insertions(+), 260 deletions(-)
diff --git a/Changelog b/Changelog
index a2130b1..44f2622 100644
--- a/Changelog
+++ b/Changelog
@@ -8,8 +8,99 @@ This document contains change notes for bugfix releases in the 3.1.x series
(Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
new in Celery 3.1.
+.. _version-3.1.23:
+
+3.1.23
+======
+:release-date: 2016-03-09 06:00 P.M PST
+:release-by: Ask Solem
+
+- **Programs**: Last release broke support for the ``--hostnmame`` argument
+ to :program:`celery multi` and :program:`celery worker --detach`
+ (Issue #3103).
+
+- **Results**: MongoDB result backend could crash the worker at startup
+ if not configured using an URL.
+
+.. _version-3.1.22:
+
+3.1.22
+======
+:release-date: 2016-03-07 01:30 P.M PST
+:release-by: Ask Solem
+
+- **Programs**: The worker would crash immediately on startup on
+ ``backend.as_uri()`` when using some result backends (Issue #3094).
+
+- **Programs**: :program:`celery multi`/:program:`celery worker --detach`
+ would create an extraneous logfile including literal formats (e.g. ``%I``)
+ in the filename (Issue #3096).
+
+.. _version-3.1.21:
+
+3.1.21
+======
+:release-date: 2016-03-04 11:16 A.M PST
+:release-by: Ask Solem
+
+- **Requirements**
+
+ - Now depends on :ref:`Kombu 3.0.34 <kombu:version-3.0.34>`.
+
+ - Now depends on :mod:`billiard` 3.3.0.23.
+
+- **Prefork pool**: Fixes 100% CPU loop on Linux epoll (Issue #1845).
+
+ Also potential fix for: Issue #2142, Issue #2606
+
+- **Prefork pool**: Fixes memory leak related to processes exiting
+ (Issue #2927).
+
+- **Worker**: Fixes crash at startup when trying to censor passwords
+ in MongoDB and Cache result backend URLs (Issue #3079, Issue #3045,
+ Issue #3049, Issue #3068, Issue #3073).
+
+ Fix contributed by Maxime Verger.
+
+- **Task**: An exception is now raised if countdown/expires is less
+ than -2147483648 (Issue #3078).
+
+- **Programs**: :program:`celery shell --ipython` now compatible with newer
+ IPython versions.
+
+- **Programs**: The DuplicateNodeName warning emitted by inspect/control
+ now includes a list of the node names returned.
+
+ Contributed by Sebastian Kalinowski.
+
+- **Utils**: The ``.discard(item)`` method of
+ :class:`~celery.datastructures.LimitedSet` did not actually remove the item
+ (Issue #3087).
+
+ Fix contributed by Dave Smith.
+
+- **Worker**: Node name formatting now emits less confusing error message
+ for unmatched format keys (Issue #3016).
+
+- **Results**: amqp/rpc backends: Fixed deserialization of JSON exceptions
+ (Issue #2518).
+
+ Fix contributed by Allard Hoeve.
+
+- **Prefork pool**: The `process inqueue damaged` error message now includes
+ the original exception raised.
+
+- **Documentation**: Includes improvements by:
+
+ - Jeff Widman.
+
.. _version-3.1.20:
+3.1.20
+======
+:release-date: 2016-01-22 06:50 P.M UTC
+:release-by: Ask Solem
+
- **Requirements**
- Now depends on :ref:`Kombu 3.0.33 <kombu:version-3.0.33>`.
@@ -90,29 +181,29 @@ new in Celery 3.1.
- **Documentation**: Includes improvements by:
- Bryson
- Caleb Mingle
- Christopher Martin
- Dieter Adriaenssens
- Jason Veatch
- Jeremy Cline
- Juan Rossi
- Kevin Harvey
- Kevin McCarthy
- Kirill Pavlov
- Marco Buttu
- Mayflower
- Mher Movsisyan
- Michael Floering
- michael-k
- Nathaniel Varona
- Rudy Attias
- Ryan Luckie
- Steven Parker
- squfrans
- Tadej Janež
- TakesxiSximada
- Tom S
+ - Bryson
+ - Caleb Mingle
+ - Christopher Martin
+ - Dieter Adriaenssens
+ - Jason Veatch
+ - Jeremy Cline
+ - Juan Rossi
+ - Kevin Harvey
+ - Kevin McCarthy
+ - Kirill Pavlov
+ - Marco Buttu
+ - Mayflower
+ - Mher Movsisyan
+ - Michael Floering
+ - michael-k
+ - Nathaniel Varona
+ - Rudy Attias
+ - Ryan Luckie
+ - Steven Parker
+ - squfrans
+ - Tadej Janež
+ - TakesxiSximada
+ - Tom S
.. _version-3.1.19:
diff --git a/PKG-INFO b/PKG-INFO
index aaa9bc1..56ea82b 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: celery
-Version: 3.1.20
+Version: 3.1.23
Summary: Distributed Task Queue
Home-page: http://celeryproject.org
Author: Ask Solem
@@ -12,7 +12,7 @@ Description: =================================
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
- :Version: 3.1.20 (Cipater)
+ :Version: 3.1.23 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
diff --git a/README.rst b/README.rst
index 8eb854b..e629b9f 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
-:Version: 3.1.20 (Cipater)
+:Version: 3.1.23 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
diff --git a/celery.egg-info/PKG-INFO b/celery.egg-info/PKG-INFO
index aaa9bc1..56ea82b 100644
--- a/celery.egg-info/PKG-INFO
+++ b/celery.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: celery
-Version: 3.1.20
+Version: 3.1.23
Summary: Distributed Task Queue
Home-page: http://celeryproject.org
Author: Ask Solem
@@ -12,7 +12,7 @@ Description: =================================
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
- :Version: 3.1.20 (Cipater)
+ :Version: 3.1.23 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
diff --git a/celery.egg-info/entry_points.txt b/celery.egg-info/entry_points.txt
index 18210e3..26ac737 100644
--- a/celery.egg-info/entry_points.txt
+++ b/celery.egg-info/entry_points.txt
@@ -1,6 +1,6 @@
[console_scripts]
celery = celery.__main__:main
+celerybeat = celery.__main__:_compat_beat
celeryd = celery.__main__:_compat_worker
celeryd-multi = celery.__main__:_compat_multi
-celerybeat = celery.__main__:_compat_beat
diff --git a/celery.egg-info/requires.txt b/celery.egg-info/requires.txt
index e52f257..8c16787 100644
--- a/celery.egg-info/requires.txt
+++ b/celery.egg-info/requires.txt
@@ -1,57 +1,48 @@
pytz>dev
-billiard>=3.3.0.22,<3.4
-kombu>=3.0.33,<3.1
+billiard>=3.3.0.23,<3.4
+kombu>=3.0.34,<3.1
-[zookeeper]
-kazoo>=1.3.1
-
-[msgpack]
-msgpack-python>=0.3.0
-
-[librabbitmq]
-librabbitmq>=1.6.1
+[auth]
+pyOpenSSL
-[slmq]
-softlayer_messaging>=1.0.3
+[beanstalk]
+beanstalkc
-[eventlet]
-eventlet
+[cassandra]
+pycassa
[couchbase]
couchbase
-[redis]
-redis>=2.8.0
-
[couchdb]
couchdb
+[eventlet]
+eventlet
+
[gevent]
gevent
-[auth]
-pyOpenSSL
-
-[pyro]
-pyro4
-
-[yaml]
-PyYAML>=3.10
-
-[beanstalk]
-beanstalkc
+[librabbitmq]
+librabbitmq>=1.6.1
[memcache]
pylibmc
-[threads]
-threadpool
-
[mongodb]
pymongo>=2.6.2
-[zeromq]
-pyzmq>=13.1.0
+[msgpack]
+msgpack-python>=0.3.0
+
+[pyro]
+pyro4
+
+[redis]
+redis>=2.8.0
+
+[slmq]
+softlayer_messaging>=1.0.3
[sqlalchemy]
sqlalchemy
@@ -59,5 +50,14 @@ sqlalchemy
[sqs]
boto>=2.13.3
-[cassandra]
-pycassa
\ No newline at end of file
+[threads]
+threadpool
+
+[yaml]
+PyYAML>=3.10
+
+[zeromq]
+pyzmq>=13.1.0
+
+[zookeeper]
+kazoo>=1.3.1
diff --git a/celery/__init__.py b/celery/__init__.py
index 8918e1c..c2837a2 100644
--- a/celery/__init__.py
+++ b/celery/__init__.py
@@ -19,7 +19,7 @@ version_info_t = namedtuple(
)
SERIES = 'Cipater'
-VERSION = version_info_t(3, 1, 20, '', '')
+VERSION = version_info_t(3, 1, 23, '', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = 'ask at celeryproject.org'
diff --git a/celery/app/amqp.py b/celery/app/amqp.py
index 1d65841..27838c2 100644
--- a/celery/app/amqp.py
+++ b/celery/app/amqp.py
@@ -30,6 +30,9 @@ from . import routes as _routes
__all__ = ['AMQP', 'Queues', 'TaskProducer', 'TaskConsumer']
+#: earliest date supported by time.mktime.
+INT_MIN = -2147483648
+
#: Human readable queue declaration.
QUEUE_FORMAT = """
.> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) \
@@ -253,11 +256,13 @@ class TaskProducer(Producer):
if not isinstance(task_kwargs, dict):
raise ValueError('task kwargs must be a dictionary')
if countdown: # Convert countdown to ETA.
+ self._verify_seconds(countdown, 'countdown')
now = now or self.app.now()
eta = now + timedelta(seconds=countdown)
if self.utc:
eta = to_utc(eta).astimezone(self.app.timezone)
if isinstance(expires, numbers.Real):
+ self._verify_seconds(expires, 'expires')
now = now or self.app.now()
expires = now + timedelta(seconds=expires)
if self.utc:
@@ -338,6 +343,11 @@ class TaskProducer(Producer):
return task_id
delay_task = publish_task # XXX Compat
+ def _verify_seconds(self, s, what):
+ if s < INT_MIN:
+ raise ValueError('%s is out of range: %r' % (what, s))
+ return s
+
@cached_property
def event_dispatcher(self):
# We call Dispatcher.publish with a custom producer
diff --git a/celery/app/control.py b/celery/app/control.py
index 7058025..7258dd6 100644
--- a/celery/app/control.py
+++ b/celery/app/control.py
@@ -20,7 +20,7 @@ from celery.utils.text import pluralize
__all__ = ['Inspect', 'Control', 'flatten_reply']
W_DUPNODE = """\
-Received multiple replies from node name: {0!r}.
+Received multiple replies from node {0}: {1}.
Please make sure you give each node a unique nodename using the `-n` option.\
"""
diff --git a/celery/apps/worker.py b/celery/apps/worker.py
index 2859c6d..637a082 100644
--- a/celery/apps/worker.py
+++ b/celery/apps/worker.py
@@ -22,7 +22,6 @@ from functools import partial
from billiard import current_process
from kombu.utils.encoding import safe_str
-from kombu.utils.url import maybe_sanitize_url
from celery import VERSION_BANNER, platforms, signals
from celery.app import trace
@@ -228,9 +227,7 @@ class Worker(WorkController):
hostname=safe_str(self.hostname),
version=VERSION_BANNER,
conninfo=self.app.connection().as_uri(),
- results=maybe_sanitize_url(
- self.app.conf.CELERY_RESULT_BACKEND or 'disabled',
- ),
+ results=self.app.backend.as_uri(),
concurrency=concurrency,
platform=safe_str(_platform.platform()),
events=events,
diff --git a/celery/backends/amqp.py b/celery/backends/amqp.py
index a4a6e84..6e7f778 100644
--- a/celery/backends/amqp.py
+++ b/celery/backends/amqp.py
@@ -200,7 +200,7 @@ class AMQPBackend(BaseBackend):
def callback(meta, message):
if meta['status'] in states.READY_STATES:
- results[meta['task_id']] = meta
+ results[meta['task_id']] = self.meta_from_decoded(meta)
consumer.callbacks[:] = [callback]
time_start = now()
@@ -301,6 +301,9 @@ class AMQPBackend(BaseBackend):
raise NotImplementedError(
'delete_group is not supported by this backend.')
+ def as_uri(self, include_password=True):
+ return 'amqp://'
+
def __reduce__(self, args=(), kwargs={}):
kwargs.update(
connection=self._connection,
diff --git a/celery/backends/base.py b/celery/backends/base.py
index b11ecb4..03b6909 100644
--- a/celery/backends/base.py
+++ b/celery/backends/base.py
@@ -24,6 +24,7 @@ from kombu.serialization import (
registry as serializer_registry,
)
from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
+from kombu.utils.url import maybe_sanitize_url
from celery import states
from celery import current_app, maybe_signature
@@ -92,8 +93,9 @@ class BaseBackend(object):
'interval_max': 1,
}
- def __init__(self, app, serializer=None,
- max_cached_results=None, accept=None, **kwargs):
+ def __init__(self, app,
+ serializer=None, max_cached_results=None, accept=None,
+ url=None, **kwargs):
self.app = app
conf = self.app.conf
self.serializer = serializer or conf.CELERY_RESULT_SERIALIZER
@@ -105,6 +107,16 @@ class BaseBackend(object):
self.accept = prepare_accept_content(
conf.CELERY_ACCEPT_CONTENT if accept is None else accept,
)
+ self.url = url
+
+ def as_uri(self, include_password=False):
+ """Return the backend as an URI, sanitizing the password or not"""
+ # when using maybe_sanitize_url(), "/" is added
+ # we're stripping it for consistency
+ if include_password:
+ return self.url
+ url = maybe_sanitize_url(self.url or '')
+ return url[:-1] if url.endswith(':///') else url
def mark_as_started(self, task_id, **meta):
"""Mark a task as started"""
@@ -603,4 +615,9 @@ class DisabledBackend(BaseBackend):
raise NotImplementedError(
'No result backend configured. '
'Please see the documentation for more information.')
- wait_for = get_status = get_result = get_traceback = _is_disabled
+
+ def as_uri(self, *args, **kwargs):
+ return 'disabled://'
+
+ get_state = get_status = get_result = get_traceback = _is_disabled
+ wait_for = get_many = _is_disabled
diff --git a/celery/backends/cache.py b/celery/backends/cache.py
index cfc9171..3c8230c 100644
--- a/celery/backends/cache.py
+++ b/celery/backends/cache.py
@@ -100,6 +100,7 @@ class CacheBackend(KeyValueStoreBackend):
def __init__(self, app, expires=None, backend=None,
options={}, url=None, **kwargs):
super(CacheBackend, self).__init__(app, **kwargs)
+ self.url = url
self.options = dict(self.app.conf.CELERY_CACHE_BACKEND_OPTIONS,
**options)
@@ -149,3 +150,12 @@ class CacheBackend(KeyValueStoreBackend):
expires=self.expires,
options=self.options))
return super(CacheBackend, self).__reduce__(args, kwargs)
+
+ def as_uri(self, *args, **kwargs):
+ """Return the backend as an URI.
+
+ This properly handles the case of multiple servers.
+
+ """
+ servers = ';'.join(self.servers)
+ return '{0}://{1}/'.format(self.backend, servers)
diff --git a/celery/backends/cassandra.py b/celery/backends/cassandra.py
index 663aade..79f17ee 100644
--- a/celery/backends/cassandra.py
+++ b/celery/backends/cassandra.py
@@ -158,6 +158,9 @@ class CassandraBackend(BaseBackend):
return self._retry_on_error(_do_store)
+ def as_uri(self, include_password=True):
+ return 'cassandra://'
+
def _get_task_meta_for(self, task_id):
"""Get task metadata for a task by id."""
diff --git a/celery/backends/couchbase.py b/celery/backends/couchbase.py
index 2d51b80..cd7555e 100644
--- a/celery/backends/couchbase.py
+++ b/celery/backends/couchbase.py
@@ -28,6 +28,12 @@ __all__ = ['CouchBaseBackend']
class CouchBaseBackend(KeyValueStoreBackend):
+ """CouchBase backend.
+
+ :raises celery.exceptions.ImproperlyConfigured: if
+ module :mod:`couchbase` is not available.
+
+ """
bucket = 'default'
host = 'localhost'
port = 8091
@@ -38,16 +44,10 @@ class CouchBaseBackend(KeyValueStoreBackend):
unlock_gil = True
timeout = 2.5
transcoder = None
- # supports_autoexpire = False
def __init__(self, url=None, *args, **kwargs):
- """Initialize CouchBase backend instance.
-
- :raises celery.exceptions.ImproperlyConfigured: if
- module :mod:`couchbase` is not available.
-
- """
super(CouchBaseBackend, self).__init__(*args, **kwargs)
+ self.url = url
self.expires = kwargs.get('expires') or maybe_timedelta(
self.app.conf.CELERY_TASK_RESULT_EXPIRES)
diff --git a/celery/backends/database/__init__.py b/celery/backends/database/__init__.py
index 2e08164..f47fdd5 100644
--- a/celery/backends/database/__init__.py
+++ b/celery/backends/database/__init__.py
@@ -86,7 +86,7 @@ class DatabaseBackend(BaseBackend):
super(DatabaseBackend, self).__init__(**kwargs)
conf = self.app.conf
self.expires = maybe_timedelta(self.prepare_expires(expires))
- self.dburi = url or dburi or conf.CELERY_RESULT_DBURI
+ self.url = url or dburi or conf.CELERY_RESULT_DBURI
self.engine_options = dict(
engine_options or {},
**conf.CELERY_RESULT_ENGINE_OPTIONS or {})
@@ -99,14 +99,14 @@ class DatabaseBackend(BaseBackend):
Task.__table__.name = tablenames.get('task', 'celery_taskmeta')
TaskSet.__table__.name = tablenames.get('group', 'celery_tasksetmeta')
- if not self.dburi:
+ if not self.url:
raise ImproperlyConfigured(
'Missing connection string! Do you have '
'CELERY_RESULT_DBURI set to a real value?')
def ResultSession(self, session_manager=SessionManager()):
return session_manager.session_factory(
- dburi=self.dburi,
+ dburi=self.url,
short_lived_sessions=self.short_lived_sessions,
**self.engine_options
)
@@ -195,7 +195,7 @@ class DatabaseBackend(BaseBackend):
def __reduce__(self, args=(), kwargs={}):
kwargs.update(
- dict(dburi=self.dburi,
+ dict(dburi=self.url,
expires=self.expires,
engine_options=self.engine_options))
return super(DatabaseBackend, self).__reduce__(args, kwargs)
diff --git a/celery/backends/mongodb.py b/celery/backends/mongodb.py
index 2bc8e0e..281c38c 100644
--- a/celery/backends/mongodb.py
+++ b/celery/backends/mongodb.py
@@ -12,6 +12,7 @@ from datetime import datetime
from kombu.syn import detect_environment
from kombu.utils import cached_property
+from kombu.utils.url import maybe_sanitize_url
from celery import states
from celery.exceptions import ImproperlyConfigured
@@ -37,6 +38,13 @@ __all__ = ['MongoBackend']
class MongoBackend(BaseBackend):
+ """MongoDB result backend.
+
+ :raises celery.exceptions.ImproperlyConfigured: if
+ module :mod:`pymongo` is not available.
+
+ """
+
host = 'localhost'
port = 27017
user = None
@@ -51,12 +59,6 @@ class MongoBackend(BaseBackend):
_connection = None
def __init__(self, app=None, url=None, **kwargs):
- """Initialize MongoDB backend instance.
-
- :raises celery.exceptions.ImproperlyConfigured: if
- module :mod:`pymongo` is not available.
-
- """
self.options = {}
super(MongoBackend, self).__init__(app, **kwargs)
self.expires = kwargs.get('expires') or maybe_timedelta(
@@ -196,11 +198,11 @@ class MongoBackend(BaseBackend):
self.collection.remove({'_id': group_id})
def _forget(self, task_id):
- """
- Remove result from MongoDB.
+ """Remove result from MongoDB.
+
+ :raises celery.exceptions.OperationsError:
+ if the task_id could not be removed.
- :raises celery.exceptions.OperationsError: if the task_id could not be
- removed.
"""
# By using safe=True, this will wait until it receives a response from
# the server. Likewise, it will raise an OperationsError if the
@@ -243,3 +245,20 @@ class MongoBackend(BaseBackend):
# in the background. Once completed cleanup will be much faster
collection.ensure_index('date_done', background='true')
return collection
+
+ def as_uri(self, include_password=False):
+ """Return the backend as an URI.
+
+ :keyword include_password: Censor passwords.
+
+ """
+ if not self.url:
+ return 'mongodb://'
+ if include_password:
+ return self.url
+
+ if ',' not in self.url:
+ return maybe_sanitize_url(self.url)
+
+ uri1, remainder = self.url.split(',', 1)
+ return ','.join([maybe_sanitize_url(uri1), remainder])
diff --git a/celery/backends/rpc.py b/celery/backends/rpc.py
index 28d5426..92bcc61 100644
--- a/celery/backends/rpc.py
+++ b/celery/backends/rpc.py
@@ -54,6 +54,9 @@ class RPCBackend(amqp.AMQPBackend):
def on_reply_declare(self, task_id):
pass
+ def as_uri(self, include_password=True):
+ return 'rpc://'
+
@property
def binding(self):
return self.Queue(self.oid, self.exchange, self.oid,
diff --git a/celery/bin/celery.py b/celery/bin/celery.py
index 3af8afa..4676b30 100644
--- a/celery/bin/celery.py
+++ b/celery/bin/celery.py
@@ -613,12 +613,35 @@ class shell(Command): # pragma: no cover
code.interact(local=self.locals)
def invoke_ipython_shell(self):
- try:
- from IPython.terminal import embed
- embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
- except ImportError: # ipython < 0.11
- from IPython.Shell import IPShell
- IPShell(argv=[], user_ns=self.locals).mainloop()
+ for ip in (self._ipython, self._ipython_pre_10,
+ self._ipython_terminal, self._ipython_010,
+ self._no_ipython):
+ try:
+ return ip()
+ except ImportError:
+ pass
+
+ def _ipython(self):
+ from IPython import start_ipython
+ start_ipython(argv=[], user_ns=self.locals)
+
+ def _ipython_pre_10(self): # pragma: no cover
+ from IPython.frontend.terminal.ipapp import TerminalIPythonApp
+ app = TerminalIPythonApp.instance()
+ app.initialize(argv=[])
+ app.shell.user_ns.update(self.locals)
+ app.start()
+
+ def _ipython_terminal(self): # pragma: no cover
+ from IPython.terminal import embed
+ embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
+
+ def _ipython_010(self): # pragma: no cover
+ from IPython.Shell import IPShell
+ IPShell(argv=[], user_ns=self.locals).mainloop()
+
+ def _no_ipython(self): # pragma: no cover
+ raise ImportError("no suitable ipython found")
def invoke_bpython_shell(self):
import bpython
diff --git a/celery/bin/celeryd_detach.py b/celery/bin/celeryd_detach.py
index d9d6141..4d37d5f 100644
--- a/celery/bin/celeryd_detach.py
+++ b/celery/bin/celeryd_detach.py
@@ -19,6 +19,7 @@ import sys
from optparse import OptionParser, BadOptionError
from celery.platforms import EX_FAILURE, detached
+from celery.utils import default_nodename, node_format
from celery.utils.log import get_logger
from celery.bin.base import daemon_options, Option
@@ -31,6 +32,7 @@ C_FAKEFORK = os.environ.get('C_FAKEFORK')
OPTION_LIST = daemon_options(default_pidfile='celeryd.pid') + (
Option('--workdir', default=None, dest='working_directory'),
+ Option('-n', '--hostname'),
Option('--fake',
default=False, action='store_true', dest='fake',
help="Don't fork (for debugging purposes)"),
@@ -39,7 +41,10 @@ OPTION_LIST = daemon_options(default_pidfile='celeryd.pid') + (
def detach(path, argv, logfile=None, pidfile=None, uid=None,
gid=None, umask=None, working_directory=None, fake=False, app=None,
- executable=None):
+ executable=None, hostname=None):
+ hostname = default_nodename(hostname)
+ logfile = node_format(logfile, hostname)
+ pidfile = node_format(pidfile, hostname)
fake = 1 if C_FAKEFORK else fake
with detached(logfile, pidfile, uid, gid, umask, working_directory, fake,
after_forkers=False):
@@ -51,7 +56,8 @@ def detach(path, argv, logfile=None, pidfile=None, uid=None,
if app is None:
from celery import current_app
app = current_app
- app.log.setup_logging_subsystem('ERROR', logfile)
+ app.log.setup_logging_subsystem(
+ 'ERROR', logfile, hostname=hostname)
logger.critical("Can't exec %r", ' '.join([path] + argv),
exc_info=True)
return EX_FAILURE
@@ -143,6 +149,8 @@ class detached_celeryd(object):
parser.leftovers.append('--logfile={0}'.format(options.logfile))
if options.pidfile:
parser.leftovers.append('--pidfile={0}'.format(options.pidfile))
+ if options.hostname:
+ parser.leftovers.append('--hostname={0}'.format(options.hostname))
return options, values, parser.leftovers
def execute_from_commandline(self, argv=None):
diff --git a/celery/concurrency/asynpool.py b/celery/concurrency/asynpool.py
index 3283597..bc29d9c 100644
--- a/celery/concurrency/asynpool.py
+++ b/celery/concurrency/asynpool.py
@@ -19,6 +19,7 @@
from __future__ import absolute_import
import errno
+import gc
import os
import select
import socket
@@ -409,11 +410,32 @@ class AsynPool(_pool.Pool):
self.on_soft_timeout = self._timeout_handler.on_soft_timeout
self.on_hard_timeout = self._timeout_handler.on_hard_timeout
- def _event_process_exit(self, hub, fd):
+ def _create_worker_process(self, i):
+ gc.collect() # Issue #2927
+ return super(AsynPool, self)._create_worker_process(i)
+
+ def _event_process_exit(self, hub, proc):
# This method is called whenever the process sentinel is readable.
- hub.remove(fd)
+ self._untrack_child_process(proc, hub)
self.maintain_pool()
+ def _track_child_process(self, proc, hub):
+ try:
+ fd = proc._sentinel_poll
+ except AttributeError:
+ # we need to duplicate the fd here to carefully
+ # control when the fd is removed from the process table,
+ # as once the original fd is closed we cannot unregister
+ # the fd from epoll(7) anymore, causing a 100% CPU poll loop.
+ fd = proc._sentinel_poll = os.dup(proc._popen.sentinel)
+ hub.add_reader(fd, self._event_process_exit, hub, proc)
+
+ def _untrack_child_process(self, proc, hub):
+ if proc._sentinel_poll is not None:
+ fd, proc._sentinel_poll = proc._sentinel_poll, None
+ hub.remove(fd)
+ os.close(fd)
+
def register_with_event_loop(self, hub):
"""Registers the async pool with the current event loop."""
self._result_handler.register_with_event_loop(hub)
@@ -423,8 +445,7 @@ class AsynPool(_pool.Pool):
self._create_write_handlers(hub)
# Add handler for when a process exits (calls maintain_pool)
- [hub.add_reader(fd, self._event_process_exit, hub, fd)
- for fd in self.process_sentinels]
+ [self._track_child_process(w, hub) for w in self._pool]
# Handle_result_event is called whenever one of the
# result queues are readable.
[hub.add_reader(fd, self.handle_result_event, fd)
@@ -511,13 +532,14 @@ class AsynPool(_pool.Pool):
fileno_to_outq = self._fileno_to_outq
fileno_to_synq = self._fileno_to_synq
busy_workers = self._busy_workers
- event_process_exit = self._event_process_exit
handle_result_event = self.handle_result_event
process_flush_queues = self.process_flush_queues
waiting_to_start = self._waiting_to_start
def verify_process_alive(proc):
- if proc._is_alive() and proc in waiting_to_start:
+ proc = proc() # is a weakref
+ if (proc is not None and proc._is_alive() and
+ proc in waiting_to_start):
assert proc.outqR_fd in fileno_to_outq
assert fileno_to_outq[proc.outqR_fd] is proc
assert proc.outqR_fd in hub.readers
@@ -537,10 +559,9 @@ class AsynPool(_pool.Pool):
if job._scheduled_for and job._scheduled_for.inqW_fd == infd:
job._scheduled_for = proc
fileno_to_outq[proc.outqR_fd] = proc
+
# maintain_pool is called whenever a process exits.
- add_reader(
- proc.sentinel, event_process_exit, hub, proc.sentinel,
- )
+ self._track_child_process(proc, hub)
assert not isblocking(proc.outq._reader)
@@ -550,7 +571,7 @@ class AsynPool(_pool.Pool):
waiting_to_start.add(proc)
hub.call_later(
- self._proc_alive_timeout, verify_process_alive, proc,
+ self._proc_alive_timeout, verify_process_alive, ref(proc),
)
self.on_process_up = on_process_up
@@ -594,16 +615,16 @@ class AsynPool(_pool.Pool):
)
if inq:
busy_workers.discard(inq)
- remove_reader(proc.sentinel)
+ self._untrack_child_process(proc, hub)
waiting_to_start.discard(proc)
self._active_writes.discard(proc.inqW_fd)
- remove_writer(proc.inqW_fd)
- remove_reader(proc.outqR_fd)
+ remove_writer(proc.inq._writer)
+ remove_reader(proc.outq._reader)
if proc.synqR_fd:
- remove_reader(proc.synqR_fd)
+ remove_reader(proc.synq._reader)
if proc.synqW_fd:
self._active_writes.discard(proc.synqW_fd)
- remove_reader(proc.synqW_fd)
+ remove_reader(proc.synq._writer)
self.on_process_down = on_process_down
def _create_write_handlers(self, hub,
@@ -778,8 +799,9 @@ class AsynPool(_pool.Pool):
append_message(job)
self._quick_put = send_job
- def on_not_recovering(proc, fd, job):
- error('Process inqueue damaged: %r %r' % (proc, proc.exitcode))
+ def on_not_recovering(proc, fd, job, exc):
+ error('Process inqueue damaged: %r %r: %r',
+ proc, proc.exitcode, exc, exc_info=1)
if proc._is_alive():
proc.terminate()
hub.remove(fd)
@@ -789,7 +811,7 @@ class AsynPool(_pool.Pool):
# writes job to the worker process.
# Operation must complete if more than one byte of data
# was written. If the broker connection is lost
- # and no data was written the operation shall be cancelled.
+ # and no data was written the operation shall be canceled.
header, body, body_size = job._payload
errors = 0
try:
@@ -808,7 +830,7 @@ class AsynPool(_pool.Pool):
# suspend until more data
errors += 1
if errors > 100:
- on_not_recovering(proc, fd, job)
+ on_not_recovering(proc, fd, job, exc)
raise StopIteration()
yield
else:
@@ -824,7 +846,7 @@ class AsynPool(_pool.Pool):
# suspend until more data
errors += 1
... 973 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/celery.git
More information about the Python-modules-commits
mailing list