[Python-modules-commits] [celery] 02/11: Import celery_3.1.23.orig.tar.gz

Brian May bam at moszumanska.debian.org
Sun Apr 17 10:23:47 UTC 2016


This is an automated email from the git hooks/post-receive script.

bam pushed a commit to branch master
in repository celery.

commit 6753277798b7b79fec667e858b4cbf459c1c277c
Author: Brian May <bam at debian.org>
Date:   Sun Apr 17 19:16:20 2016 +1000

    Import celery_3.1.23.orig.tar.gz
---
 Changelog                                 | 137 +++++++++++++++++++++++++-----
 PKG-INFO                                  |   4 +-
 README.rst                                |   2 +-
 celery.egg-info/PKG-INFO                  |   4 +-
 celery.egg-info/entry_points.txt          |   2 +-
 celery.egg-info/requires.txt              |  70 +++++++--------
 celery/__init__.py                        |   2 +-
 celery/app/amqp.py                        |  10 +++
 celery/app/control.py                     |   2 +-
 celery/apps/worker.py                     |   5 +-
 celery/backends/amqp.py                   |   5 +-
 celery/backends/base.py                   |  23 ++++-
 celery/backends/cache.py                  |  10 +++
 celery/backends/cassandra.py              |   3 +
 celery/backends/couchbase.py              |  14 +--
 celery/backends/database/__init__.py      |   8 +-
 celery/backends/mongodb.py                |  39 ++++++---
 celery/backends/rpc.py                    |   3 +
 celery/bin/celery.py                      |  35 ++++++--
 celery/bin/celeryd_detach.py              |  12 ++-
 celery/concurrency/asynpool.py            |  62 +++++++++-----
 celery/concurrency/eventlet.py            |   4 +-
 celery/concurrency/gevent.py              |   2 +-
 celery/concurrency/prefork.py             |   2 +
 celery/contrib/batches.py                 |   2 +-
 celery/datastructures.py                  |   2 +-
 celery/tests/app/test_control.py          |  18 ++--
 celery/tests/backends/test_base.py        |  18 ++++
 celery/tests/backends/test_cache.py       |  33 ++++++-
 celery/tests/backends/test_mongodb.py     |  62 ++++++++------
 celery/tests/bin/test_celeryd_detach.py   |  13 +--
 celery/tests/bin/test_worker.py           |  23 +----
 celery/tests/case.py                      |  16 +++-
 celery/tests/utils/test_datastructures.py |   2 +
 celery/tests/worker/test_control.py       |   6 +-
 celery/tests/worker/test_heartbeat.py     |   4 +-
 celery/utils/__init__.py                  |  12 ++-
 celery/worker/consumer.py                 |   4 +-
 celery/worker/pidbox.py                   |   2 +-
 celery/worker/state.py                    |   8 ++
 docs/changelog.rst                        | 137 +++++++++++++++++++++++++-----
 docs/configuration.rst                    |   7 +-
 docs/includes/introduction.txt            |   2 +-
 docs/userguide/extending.rst              |  16 ++--
 docs/userguide/optimizing.rst             |  25 ++++--
 docs/userguide/workers.rst                |   4 +-
 docs/whatsnew-3.1.rst                     |  16 ++--
 requirements/default.txt                  |   4 +-
 setup.cfg                                 |   4 +-
 49 files changed, 640 insertions(+), 260 deletions(-)

diff --git a/Changelog b/Changelog
index a2130b1..44f2622 100644
--- a/Changelog
+++ b/Changelog
@@ -8,8 +8,99 @@ This document contains change notes for bugfix releases in the 3.1.x series
 (Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
 new in Celery 3.1.
 
+.. _version-3.1.23:
+
+3.1.23
+======
+:release-date: 2016-03-09 06:00 P.M PST
+:release-by: Ask Solem
+
+- **Programs**: Last release broke support for the ``--hostname`` argument
+  to :program:`celery multi` and :program:`celery worker --detach`
+  (Issue #3103).
+
+- **Results**: The MongoDB result backend could crash the worker at startup
+  if not configured using a URL.
+
+.. _version-3.1.22:
+
+3.1.22
+======
+:release-date: 2016-03-07 01:30 P.M PST
+:release-by: Ask Solem
+
+- **Programs**: The worker would crash immediately on startup on
+  ``backend.as_uri()`` when using some result backends (Issue #3094).
+
+- **Programs**: :program:`celery multi`/:program:`celery worker --detach`
+  would create an extraneous logfile including literal formats (e.g. ``%I``)
+  in the filename (Issue #3096).
+
+.. _version-3.1.21:
+
+3.1.21
+======
+:release-date: 2016-03-04 11:16 A.M PST
+:release-by: Ask Solem
+
+- **Requirements**
+
+    - Now depends on :ref:`Kombu 3.0.34 <kombu:version-3.0.34>`.
+
+    - Now depends on :mod:`billiard` 3.3.0.23.
+
+- **Prefork pool**: Fixes 100% CPU loop on Linux epoll (Issue #1845).
+
+    Also potential fix for: Issue #2142, Issue #2606
+
+- **Prefork pool**: Fixes memory leak related to processes exiting
+  (Issue #2927).
+
+- **Worker**: Fixes crash at startup when trying to censor passwords
+  in MongoDB and Cache result backend URLs (Issue #3079, Issue #3045,
+  Issue #3049, Issue #3068, Issue #3073).
+
+    Fix contributed by Maxime Verger.
+
+- **Task**: An exception is now raised if countdown/expires is less
+  than -2147483648 (Issue #3078).
+
+- **Programs**: :program:`celery shell --ipython` now compatible with newer
+  IPython versions.
+
+- **Programs**: The DuplicateNodeName warning emitted by inspect/control
+  now includes a list of the node names returned.
+
+    Contributed by Sebastian Kalinowski.
+
+- **Utils**: The ``.discard(item)`` method of
+  :class:`~celery.datastructures.LimitedSet` did not actually remove the item
+  (Issue #3087).
+
+    Fix contributed by Dave Smith.
+
+- **Worker**: Node name formatting now emits a less confusing error message
+  for unmatched format keys (Issue #3016).
+
+- **Results**: amqp/rpc backends: Fixed deserialization of JSON exceptions
+  (Issue #2518).
+
+    Fix contributed by Allard Hoeve.
+
+- **Prefork pool**: The `process inqueue damaged` error message now includes
+  the original exception raised.
+
+- **Documentation**: Includes improvements by:
+
+    - Jeff Widman.
+
 .. _version-3.1.20:
 
+3.1.20
+======
+:release-date: 2016-01-22 06:50 P.M UTC
+:release-by: Ask Solem
+
 - **Requirements**
 
     - Now depends on :ref:`Kombu 3.0.33 <kombu:version-3.0.33>`.
@@ -90,29 +181,29 @@ new in Celery 3.1.
 
 - **Documentation**: Includes improvements by:
 
-    Bryson
-    Caleb Mingle
-    Christopher Martin
-    Dieter Adriaenssens
-    Jason Veatch
-    Jeremy Cline
-    Juan Rossi
-    Kevin Harvey
-    Kevin McCarthy
-    Kirill Pavlov
-    Marco Buttu
-    Mayflower
-    Mher Movsisyan
-    Michael Floering
-    michael-k
-    Nathaniel Varona
-    Rudy Attias
-    Ryan Luckie
-    Steven Parker
-    squfrans
-    Tadej Janež
-    TakesxiSximada
-    Tom S
+    - Bryson
+    - Caleb Mingle
+    - Christopher Martin
+    - Dieter Adriaenssens
+    - Jason Veatch
+    - Jeremy Cline
+    - Juan Rossi
+    - Kevin Harvey
+    - Kevin McCarthy
+    - Kirill Pavlov
+    - Marco Buttu
+    - Mayflower
+    - Mher Movsisyan
+    - Michael Floering
+    - michael-k
+    - Nathaniel Varona
+    - Rudy Attias
+    - Ryan Luckie
+    - Steven Parker
+    - squfrans
+    - Tadej Janež
+    - TakesxiSximada
+    - Tom S
 
 .. _version-3.1.19:
 
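The LimitedSet.discard fix noted under 3.1.21 above is easy to exercise;
a short sketch assuming celery 3.1's datastructures API:

    from celery.datastructures import LimitedSet

    s = LimitedSet(maxlen=10)
    s.add('task-id')
    s.discard('task-id')           # previously a silent no-op (Issue #3087)
    assert 'task-id' not in s
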
diff --git a/PKG-INFO b/PKG-INFO
index aaa9bc1..56ea82b 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: celery
-Version: 3.1.20
+Version: 3.1.23
 Summary: Distributed Task Queue
 Home-page: http://celeryproject.org
 Author: Ask Solem
@@ -12,7 +12,7 @@ Description: =================================
         
         .. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
         
-        :Version: 3.1.20 (Cipater)
+        :Version: 3.1.23 (Cipater)
         :Web: http://celeryproject.org/
         :Download: http://pypi.python.org/pypi/celery/
         :Source: http://github.com/celery/celery/
diff --git a/README.rst b/README.rst
index 8eb854b..e629b9f 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@
 
 .. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
 
-:Version: 3.1.20 (Cipater)
+:Version: 3.1.23 (Cipater)
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/celery/celery/
diff --git a/celery.egg-info/PKG-INFO b/celery.egg-info/PKG-INFO
index aaa9bc1..56ea82b 100644
--- a/celery.egg-info/PKG-INFO
+++ b/celery.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: celery
-Version: 3.1.20
+Version: 3.1.23
 Summary: Distributed Task Queue
 Home-page: http://celeryproject.org
 Author: Ask Solem
@@ -12,7 +12,7 @@ Description: =================================
         
         .. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
         
-        :Version: 3.1.20 (Cipater)
+        :Version: 3.1.23 (Cipater)
         :Web: http://celeryproject.org/
         :Download: http://pypi.python.org/pypi/celery/
         :Source: http://github.com/celery/celery/
diff --git a/celery.egg-info/entry_points.txt b/celery.egg-info/entry_points.txt
index 18210e3..26ac737 100644
--- a/celery.egg-info/entry_points.txt
+++ b/celery.egg-info/entry_points.txt
@@ -1,6 +1,6 @@
 [console_scripts]
 celery = celery.__main__:main
+celerybeat = celery.__main__:_compat_beat
 celeryd = celery.__main__:_compat_worker
 celeryd-multi = celery.__main__:_compat_multi
-celerybeat = celery.__main__:_compat_beat
 
diff --git a/celery.egg-info/requires.txt b/celery.egg-info/requires.txt
index e52f257..8c16787 100644
--- a/celery.egg-info/requires.txt
+++ b/celery.egg-info/requires.txt
@@ -1,57 +1,48 @@
 pytz>dev
-billiard>=3.3.0.22,<3.4
-kombu>=3.0.33,<3.1
+billiard>=3.3.0.23,<3.4
+kombu>=3.0.34,<3.1
 
-[zookeeper]
-kazoo>=1.3.1
-
-[msgpack]
-msgpack-python>=0.3.0
-
-[librabbitmq]
-librabbitmq>=1.6.1
+[auth]
+pyOpenSSL
 
-[slmq]
-softlayer_messaging>=1.0.3
+[beanstalk]
+beanstalkc
 
-[eventlet]
-eventlet
+[cassandra]
+pycassa
 
 [couchbase]
 couchbase
 
-[redis]
-redis>=2.8.0
-
 [couchdb]
 couchdb
 
+[eventlet]
+eventlet
+
 [gevent]
 gevent
 
-[auth]
-pyOpenSSL
-
-[pyro]
-pyro4
-
-[yaml]
-PyYAML>=3.10
-
-[beanstalk]
-beanstalkc
+[librabbitmq]
+librabbitmq>=1.6.1
 
 [memcache]
 pylibmc
 
-[threads]
-threadpool
-
 [mongodb]
 pymongo>=2.6.2
 
-[zeromq]
-pyzmq>=13.1.0
+[msgpack]
+msgpack-python>=0.3.0
+
+[pyro]
+pyro4
+
+[redis]
+redis>=2.8.0
+
+[slmq]
+softlayer_messaging>=1.0.3
 
 [sqlalchemy]
 sqlalchemy
@@ -59,5 +50,14 @@ sqlalchemy
 [sqs]
 boto>=2.13.3
 
-[cassandra]
-pycassa
\ No newline at end of file
+[threads]
+threadpool
+
+[yaml]
+PyYAML>=3.10
+
+[zeromq]
+pyzmq>=13.1.0
+
+[zookeeper]
+kazoo>=1.3.1
diff --git a/celery/__init__.py b/celery/__init__.py
index 8918e1c..c2837a2 100644
--- a/celery/__init__.py
+++ b/celery/__init__.py
@@ -19,7 +19,7 @@ version_info_t = namedtuple(
 )
 
 SERIES = 'Cipater'
-VERSION = version_info_t(3, 1, 20, '', '')
+VERSION = version_info_t(3, 1, 23, '', '')
 __version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
 __author__ = 'Ask Solem'
 __contact__ = 'ask at celeryproject.org'
diff --git a/celery/app/amqp.py b/celery/app/amqp.py
index 1d65841..27838c2 100644
--- a/celery/app/amqp.py
+++ b/celery/app/amqp.py
@@ -30,6 +30,9 @@ from . import routes as _routes
 
 __all__ = ['AMQP', 'Queues', 'TaskProducer', 'TaskConsumer']
 
+#: earliest date supported by time.mktime.
+INT_MIN = -2147483648
+
 #: Human readable queue declaration.
 QUEUE_FORMAT = """
 .> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) \
@@ -253,11 +256,13 @@ class TaskProducer(Producer):
         if not isinstance(task_kwargs, dict):
             raise ValueError('task kwargs must be a dictionary')
         if countdown:  # Convert countdown to ETA.
+            self._verify_seconds(countdown, 'countdown')
             now = now or self.app.now()
             eta = now + timedelta(seconds=countdown)
             if self.utc:
                 eta = to_utc(eta).astimezone(self.app.timezone)
         if isinstance(expires, numbers.Real):
+            self._verify_seconds(expires, 'expires')
             now = now or self.app.now()
             expires = now + timedelta(seconds=expires)
             if self.utc:
@@ -338,6 +343,11 @@ class TaskProducer(Producer):
         return task_id
     delay_task = publish_task   # XXX Compat
 
+    def _verify_seconds(self, s, what):
+        if s < INT_MIN:
+            raise ValueError('%s is out of range: %r' % (what, s))
+        return s
+
     @cached_property
     def event_dispatcher(self):
         # We call Dispatcher.publish with a custom producer
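
The _verify_seconds guard added above rejects countdown/expires values below
what time.mktime can represent. A minimal standalone sketch of the same check,
with illustrative names (only INT_MIN mirrors the diff):

    from datetime import datetime, timedelta

    INT_MIN = -2147483648  # smallest value time.mktime can represent

    def eta_from_countdown(countdown, now=None):
        """Convert a countdown in seconds to an ETA, rejecting bad values."""
        if countdown < INT_MIN:
            raise ValueError('countdown is out of range: %r' % (countdown,))
        now = now or datetime.utcnow()
        return now + timedelta(seconds=countdown)

    eta_from_countdown(10)        # ETA ten seconds from now
    # eta_from_countdown(-2 ** 31 - 1)  -> raises ValueError (Issue #3078)
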
diff --git a/celery/app/control.py b/celery/app/control.py
index 7058025..7258dd6 100644
--- a/celery/app/control.py
+++ b/celery/app/control.py
@@ -20,7 +20,7 @@ from celery.utils.text import pluralize
 __all__ = ['Inspect', 'Control', 'flatten_reply']
 
 W_DUPNODE = """\
-Received multiple replies from node name: {0!r}.
+Received multiple replies from node {0}: {1}.
 Please make sure you give each node a unique nodename using the `-n` option.\
 """
 
diff --git a/celery/apps/worker.py b/celery/apps/worker.py
index 2859c6d..637a082 100644
--- a/celery/apps/worker.py
+++ b/celery/apps/worker.py
@@ -22,7 +22,6 @@ from functools import partial
 
 from billiard import current_process
 from kombu.utils.encoding import safe_str
-from kombu.utils.url import maybe_sanitize_url
 
 from celery import VERSION_BANNER, platforms, signals
 from celery.app import trace
@@ -228,9 +227,7 @@ class Worker(WorkController):
             hostname=safe_str(self.hostname),
             version=VERSION_BANNER,
             conninfo=self.app.connection().as_uri(),
-            results=maybe_sanitize_url(
-                self.app.conf.CELERY_RESULT_BACKEND or 'disabled',
-            ),
+            results=self.app.backend.as_uri(),
             concurrency=concurrency,
             platform=safe_str(_platform.platform()),
             events=events,
diff --git a/celery/backends/amqp.py b/celery/backends/amqp.py
index a4a6e84..6e7f778 100644
--- a/celery/backends/amqp.py
+++ b/celery/backends/amqp.py
@@ -200,7 +200,7 @@ class AMQPBackend(BaseBackend):
 
         def callback(meta, message):
             if meta['status'] in states.READY_STATES:
-                results[meta['task_id']] = meta
+                results[meta['task_id']] = self.meta_from_decoded(meta)
 
         consumer.callbacks[:] = [callback]
         time_start = now()
@@ -301,6 +301,9 @@ class AMQPBackend(BaseBackend):
         raise NotImplementedError(
             'delete_group is not supported by this backend.')
 
+    def as_uri(self, include_password=True):
+        return 'amqp://'
+
     def __reduce__(self, args=(), kwargs={}):
         kwargs.update(
             connection=self._connection,
diff --git a/celery/backends/base.py b/celery/backends/base.py
index b11ecb4..03b6909 100644
--- a/celery/backends/base.py
+++ b/celery/backends/base.py
@@ -24,6 +24,7 @@ from kombu.serialization import (
     registry as serializer_registry,
 )
 from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
+from kombu.utils.url import maybe_sanitize_url
 
 from celery import states
 from celery import current_app, maybe_signature
@@ -92,8 +93,9 @@ class BaseBackend(object):
         'interval_max': 1,
     }
 
-    def __init__(self, app, serializer=None,
-                 max_cached_results=None, accept=None, **kwargs):
+    def __init__(self, app,
+                 serializer=None, max_cached_results=None, accept=None,
+                 url=None, **kwargs):
         self.app = app
         conf = self.app.conf
         self.serializer = serializer or conf.CELERY_RESULT_SERIALIZER
@@ -105,6 +107,16 @@ class BaseBackend(object):
         self.accept = prepare_accept_content(
             conf.CELERY_ACCEPT_CONTENT if accept is None else accept,
         )
+        self.url = url
+
+    def as_uri(self, include_password=False):
+        """Return the backend as an URI, sanitizing the password or not"""
+        # when using maybe_sanitize_url(), "/" is added
+        # we're stripping it for consistency
+        if include_password:
+            return self.url
+        url = maybe_sanitize_url(self.url or '')
+        return url[:-1] if url.endswith(':///') else url
 
     def mark_as_started(self, task_id, **meta):
         """Mark a task as started"""
@@ -603,4 +615,9 @@ class DisabledBackend(BaseBackend):
         raise NotImplementedError(
             'No result backend configured.  '
             'Please see the documentation for more information.')
-    wait_for = get_status = get_result = get_traceback = _is_disabled
+
+    def as_uri(self, *args, **kwargs):
+        return 'disabled://'
+
+    get_state = get_status = get_result = get_traceback = _is_disabled
+    wait_for = get_many = _is_disabled
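
For URL-configured backends the new BaseBackend.as_uri() masks the password by
default via kombu's maybe_sanitize_url(). A sketch of the behaviour in
isolation (the trailing-slash strip matches the comment in the diff; the exact
mask is kombu's default):

    from kombu.utils.url import maybe_sanitize_url

    def as_uri(url, include_password=False):
        if include_password:
            return url
        sanitized = maybe_sanitize_url(url or '')
        # maybe_sanitize_url() re-assembles the URL with a trailing "/";
        # strip the bare ":///" form for consistency.
        return sanitized[:-1] if sanitized.endswith(':///') else sanitized

    # as_uri('redis://user:secret@localhost:6379/0')
    #   -> roughly 'redis://user:**@localhost:6379/0'
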
diff --git a/celery/backends/cache.py b/celery/backends/cache.py
index cfc9171..3c8230c 100644
--- a/celery/backends/cache.py
+++ b/celery/backends/cache.py
@@ -100,6 +100,7 @@ class CacheBackend(KeyValueStoreBackend):
     def __init__(self, app, expires=None, backend=None,
                  options={}, url=None, **kwargs):
         super(CacheBackend, self).__init__(app, **kwargs)
+        self.url = url
 
         self.options = dict(self.app.conf.CELERY_CACHE_BACKEND_OPTIONS,
                             **options)
@@ -149,3 +150,12 @@ class CacheBackend(KeyValueStoreBackend):
                  expires=self.expires,
                  options=self.options))
         return super(CacheBackend, self).__reduce__(args, kwargs)
+
+    def as_uri(self, *args, **kwargs):
+        """Return the backend as an URI.
+
+        This properly handles the case of multiple servers.
+
+        """
+        servers = ';'.join(self.servers)
+        return '{0}://{1}/'.format(self.backend, servers)
diff --git a/celery/backends/cassandra.py b/celery/backends/cassandra.py
index 663aade..79f17ee 100644
--- a/celery/backends/cassandra.py
+++ b/celery/backends/cassandra.py
@@ -158,6 +158,9 @@ class CassandraBackend(BaseBackend):
 
         return self._retry_on_error(_do_store)
 
+    def as_uri(self, include_password=True):
+        return 'cassandra://'
+
     def _get_task_meta_for(self, task_id):
         """Get task metadata for a task by id."""
 
diff --git a/celery/backends/couchbase.py b/celery/backends/couchbase.py
index 2d51b80..cd7555e 100644
--- a/celery/backends/couchbase.py
+++ b/celery/backends/couchbase.py
@@ -28,6 +28,12 @@ __all__ = ['CouchBaseBackend']
 
 
 class CouchBaseBackend(KeyValueStoreBackend):
+    """CouchBase backend.
+
+    :raises celery.exceptions.ImproperlyConfigured: if
+        module :mod:`couchbase` is not available.
+
+    """
     bucket = 'default'
     host = 'localhost'
     port = 8091
@@ -38,16 +44,10 @@ class CouchBaseBackend(KeyValueStoreBackend):
     unlock_gil = True
     timeout = 2.5
     transcoder = None
-    # supports_autoexpire = False
 
     def __init__(self, url=None, *args, **kwargs):
-        """Initialize CouchBase backend instance.
-
-        :raises celery.exceptions.ImproperlyConfigured: if
-            module :mod:`couchbase` is not available.
-
-        """
         super(CouchBaseBackend, self).__init__(*args, **kwargs)
+        self.url = url
 
         self.expires = kwargs.get('expires') or maybe_timedelta(
             self.app.conf.CELERY_TASK_RESULT_EXPIRES)
diff --git a/celery/backends/database/__init__.py b/celery/backends/database/__init__.py
index 2e08164..f47fdd5 100644
--- a/celery/backends/database/__init__.py
+++ b/celery/backends/database/__init__.py
@@ -86,7 +86,7 @@ class DatabaseBackend(BaseBackend):
         super(DatabaseBackend, self).__init__(**kwargs)
         conf = self.app.conf
         self.expires = maybe_timedelta(self.prepare_expires(expires))
-        self.dburi = url or dburi or conf.CELERY_RESULT_DBURI
+        self.url = url or dburi or conf.CELERY_RESULT_DBURI
         self.engine_options = dict(
             engine_options or {},
             **conf.CELERY_RESULT_ENGINE_OPTIONS or {})
@@ -99,14 +99,14 @@ class DatabaseBackend(BaseBackend):
         Task.__table__.name = tablenames.get('task', 'celery_taskmeta')
         TaskSet.__table__.name = tablenames.get('group', 'celery_tasksetmeta')
 
-        if not self.dburi:
+        if not self.url:
             raise ImproperlyConfigured(
                 'Missing connection string! Do you have '
                 'CELERY_RESULT_DBURI set to a real value?')
 
     def ResultSession(self, session_manager=SessionManager()):
         return session_manager.session_factory(
-            dburi=self.dburi,
+            dburi=self.url,
             short_lived_sessions=self.short_lived_sessions,
             **self.engine_options
         )
@@ -195,7 +195,7 @@ class DatabaseBackend(BaseBackend):
 
     def __reduce__(self, args=(), kwargs={}):
         kwargs.update(
-            dict(dburi=self.dburi,
+            dict(dburi=self.url,
                  expires=self.expires,
                  engine_options=self.engine_options))
         return super(DatabaseBackend, self).__reduce__(args, kwargs)
diff --git a/celery/backends/mongodb.py b/celery/backends/mongodb.py
index 2bc8e0e..281c38c 100644
--- a/celery/backends/mongodb.py
+++ b/celery/backends/mongodb.py
@@ -12,6 +12,7 @@ from datetime import datetime
 
 from kombu.syn import detect_environment
 from kombu.utils import cached_property
+from kombu.utils.url import maybe_sanitize_url
 
 from celery import states
 from celery.exceptions import ImproperlyConfigured
@@ -37,6 +38,13 @@ __all__ = ['MongoBackend']
 
 
 class MongoBackend(BaseBackend):
+    """MongoDB result backend.
+
+    :raises celery.exceptions.ImproperlyConfigured: if
+        module :mod:`pymongo` is not available.
+
+    """
+
     host = 'localhost'
     port = 27017
     user = None
@@ -51,12 +59,6 @@ class MongoBackend(BaseBackend):
     _connection = None
 
     def __init__(self, app=None, url=None, **kwargs):
-        """Initialize MongoDB backend instance.
-
-        :raises celery.exceptions.ImproperlyConfigured: if
-            module :mod:`pymongo` is not available.
-
-        """
         self.options = {}
         super(MongoBackend, self).__init__(app, **kwargs)
         self.expires = kwargs.get('expires') or maybe_timedelta(
@@ -196,11 +198,11 @@ class MongoBackend(BaseBackend):
         self.collection.remove({'_id': group_id})
 
     def _forget(self, task_id):
-        """
-        Remove result from MongoDB.
+        """Remove result from MongoDB.
+
+        :raises celery.exceptions.OperationsError:
+            if the task_id could not be removed.
 
-        :raises celery.exceptions.OperationsError: if the task_id could not be
-                                                   removed.
         """
         # By using safe=True, this will wait until it receives a response from
         # the server.  Likewise, it will raise an OperationsError if the
@@ -243,3 +245,20 @@ class MongoBackend(BaseBackend):
         # in the background. Once completed cleanup will be much faster
         collection.ensure_index('date_done', background='true')
         return collection
+
+    def as_uri(self, include_password=False):
+        """Return the backend as an URI.
+
+        :keyword include_password: Censor passwords.
+
+        """
+        if not self.url:
+            return 'mongodb://'
+        if include_password:
+            return self.url
+
+        if ',' not in self.url:
+            return maybe_sanitize_url(self.url)
+
+        uri1, remainder = self.url.split(',', 1)
+        return ','.join([maybe_sanitize_url(uri1), remainder])
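
In a mongodb:// URL the credentials precede only the first of the
comma-separated hosts, which is why only that fragment is passed through
maybe_sanitize_url(). The same logic in isolation (illustrative function name):

    from kombu.utils.url import maybe_sanitize_url

    def sanitize_mongodb_url(url):
        if ',' not in url:
            return maybe_sanitize_url(url)
        first, remainder = url.split(',', 1)
        # the password can only appear in the first fragment
        return ','.join([maybe_sanitize_url(first), remainder])

    # sanitize_mongodb_url('mongodb://user:pw@host1:27017,host2:27017/db')
    #   -> roughly 'mongodb://user:**@host1:27017,host2:27017/db'
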
diff --git a/celery/backends/rpc.py b/celery/backends/rpc.py
index 28d5426..92bcc61 100644
--- a/celery/backends/rpc.py
+++ b/celery/backends/rpc.py
@@ -54,6 +54,9 @@ class RPCBackend(amqp.AMQPBackend):
     def on_reply_declare(self, task_id):
         pass
 
+    def as_uri(self, include_password=True):
+        return 'rpc://'
+
     @property
     def binding(self):
         return self.Queue(self.oid, self.exchange, self.oid,
diff --git a/celery/bin/celery.py b/celery/bin/celery.py
index 3af8afa..4676b30 100644
--- a/celery/bin/celery.py
+++ b/celery/bin/celery.py
@@ -613,12 +613,35 @@ class shell(Command):  # pragma: no cover
         code.interact(local=self.locals)
 
     def invoke_ipython_shell(self):
-        try:
-            from IPython.terminal import embed
-            embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
-        except ImportError:  # ipython < 0.11
-            from IPython.Shell import IPShell
-            IPShell(argv=[], user_ns=self.locals).mainloop()
+        for ip in (self._ipython, self._ipython_pre_10,
+                   self._ipython_terminal, self._ipython_010,
+                   self._no_ipython):
+            try:
+                return ip()
+            except ImportError:
+                pass
+
+    def _ipython(self):
+        from IPython import start_ipython
+        start_ipython(argv=[], user_ns=self.locals)
+
+    def _ipython_pre_10(self):  # pragma: no cover
+        from IPython.frontend.terminal.ipapp import TerminalIPythonApp
+        app = TerminalIPythonApp.instance()
+        app.initialize(argv=[])
+        app.shell.user_ns.update(self.locals)
+        app.start()
+
+    def _ipython_terminal(self):  # pragma: no cover
+        from IPython.terminal import embed
+        embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
+
+    def _ipython_010(self):  # pragma: no cover
+        from IPython.Shell import IPShell
+        IPShell(argv=[], user_ns=self.locals).mainloop()
+
+    def _no_ipython(self):  # pragma: no cover
+        raise ImportError("no suitable ipython found")
 
     def invoke_bpython_shell(self):
         import bpython
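
The rewritten invoke_ipython_shell() probes IPython entry points from newest
to oldest, falling through on ImportError. The pattern reduced to its core
(function and argument names here are illustrative):

    def first_importable(*candidates):
        """Call candidates in order; return the first not raising ImportError."""
        for candidate in candidates:
            try:
                return candidate()
            except ImportError:
                continue
        raise ImportError('no suitable ipython found')
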
diff --git a/celery/bin/celeryd_detach.py b/celery/bin/celeryd_detach.py
index d9d6141..4d37d5f 100644
--- a/celery/bin/celeryd_detach.py
+++ b/celery/bin/celeryd_detach.py
@@ -19,6 +19,7 @@ import sys
 from optparse import OptionParser, BadOptionError
 
 from celery.platforms import EX_FAILURE, detached
+from celery.utils import default_nodename, node_format
 from celery.utils.log import get_logger
 
 from celery.bin.base import daemon_options, Option
@@ -31,6 +32,7 @@ C_FAKEFORK = os.environ.get('C_FAKEFORK')
 
 OPTION_LIST = daemon_options(default_pidfile='celeryd.pid') + (
     Option('--workdir', default=None, dest='working_directory'),
+    Option('-n', '--hostname'),
     Option('--fake',
            default=False, action='store_true', dest='fake',
            help="Don't fork (for debugging purposes)"),
@@ -39,7 +41,10 @@ OPTION_LIST = daemon_options(default_pidfile='celeryd.pid') + (
 
 def detach(path, argv, logfile=None, pidfile=None, uid=None,
            gid=None, umask=None, working_directory=None, fake=False, app=None,
-           executable=None):
+           executable=None, hostname=None):
+    hostname = default_nodename(hostname)
+    logfile = node_format(logfile, hostname)
+    pidfile = node_format(pidfile, hostname)
     fake = 1 if C_FAKEFORK else fake
     with detached(logfile, pidfile, uid, gid, umask, working_directory, fake,
                   after_forkers=False):
@@ -51,7 +56,8 @@ def detach(path, argv, logfile=None, pidfile=None, uid=None,
             if app is None:
                 from celery import current_app
                 app = current_app
-            app.log.setup_logging_subsystem('ERROR', logfile)
+            app.log.setup_logging_subsystem(
+                'ERROR', logfile, hostname=hostname)
             logger.critical("Can't exec %r", ' '.join([path] + argv),
                             exc_info=True)
         return EX_FAILURE
@@ -143,6 +149,8 @@ class detached_celeryd(object):
             parser.leftovers.append('--logfile={0}'.format(options.logfile))
         if options.pidfile:
             parser.leftovers.append('--pidfile={0}'.format(options.pidfile))
+        if options.hostname:
+            parser.leftovers.append('--hostname={0}'.format(options.hostname))
         return options, values, parser.leftovers
 
     def execute_from_commandline(self, argv=None):
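
The detach wrapper now accepts -n/--hostname and expands node-name format
specifiers in --logfile/--pidfile before daemonizing, so literal formats such
as %I no longer leak into filenames (Issue #3096). A sketch assuming celery
3.1's utilities, as imported in the diff:

    from celery.utils import default_nodename, node_format

    hostname = default_nodename('worker1')   # e.g. 'worker1@myhost'
    logfile = node_format('/var/log/celery/%n.log', hostname)
    # -> roughly '/var/log/celery/worker1.log'
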
diff --git a/celery/concurrency/asynpool.py b/celery/concurrency/asynpool.py
index 3283597..bc29d9c 100644
--- a/celery/concurrency/asynpool.py
+++ b/celery/concurrency/asynpool.py
@@ -19,6 +19,7 @@
 from __future__ import absolute_import
 
 import errno
+import gc
 import os
 import select
 import socket
@@ -409,11 +410,32 @@ class AsynPool(_pool.Pool):
             self.on_soft_timeout = self._timeout_handler.on_soft_timeout
             self.on_hard_timeout = self._timeout_handler.on_hard_timeout
 
-    def _event_process_exit(self, hub, fd):
+    def _create_worker_process(self, i):
+        gc.collect()  # Issue #2927
+        return super(AsynPool, self)._create_worker_process(i)
+
+    def _event_process_exit(self, hub, proc):
         # This method is called whenever the process sentinel is readable.
-        hub.remove(fd)
+        self._untrack_child_process(proc, hub)
         self.maintain_pool()
 
+    def _track_child_process(self, proc, hub):
+        try:
+            fd = proc._sentinel_poll
+        except AttributeError:
+            # we need to duplicate the fd here to carefully
+            # control when the fd is removed from the process table,
+            # as once the original fd is closed we cannot unregister
+            # the fd from epoll(7) anymore, causing a 100% CPU poll loop.
+            fd = proc._sentinel_poll = os.dup(proc._popen.sentinel)
+        hub.add_reader(fd, self._event_process_exit, hub, proc)
+
+    def _untrack_child_process(self, proc, hub):
+        if proc._sentinel_poll is not None:
+            fd, proc._sentinel_poll = proc._sentinel_poll, None
+            hub.remove(fd)
+            os.close(fd)
+
     def register_with_event_loop(self, hub):
         """Registers the async pool with the current event loop."""
         self._result_handler.register_with_event_loop(hub)
@@ -423,8 +445,7 @@ class AsynPool(_pool.Pool):
         self._create_write_handlers(hub)
 
         # Add handler for when a process exits (calls maintain_pool)
-        [hub.add_reader(fd, self._event_process_exit, hub, fd)
-         for fd in self.process_sentinels]
+        [self._track_child_process(w, hub) for w in self._pool]
         # Handle_result_event is called whenever one of the
         # result queues are readable.
         [hub.add_reader(fd, self.handle_result_event, fd)
@@ -511,13 +532,14 @@ class AsynPool(_pool.Pool):
         fileno_to_outq = self._fileno_to_outq
         fileno_to_synq = self._fileno_to_synq
         busy_workers = self._busy_workers
-        event_process_exit = self._event_process_exit
         handle_result_event = self.handle_result_event
         process_flush_queues = self.process_flush_queues
         waiting_to_start = self._waiting_to_start
 
         def verify_process_alive(proc):
-            if proc._is_alive() and proc in waiting_to_start:
+            proc = proc()  # is a weakref
+            if (proc is not None and proc._is_alive() and
+                    proc in waiting_to_start):
                 assert proc.outqR_fd in fileno_to_outq
                 assert fileno_to_outq[proc.outqR_fd] is proc
                 assert proc.outqR_fd in hub.readers
@@ -537,10 +559,9 @@ class AsynPool(_pool.Pool):
                 if job._scheduled_for and job._scheduled_for.inqW_fd == infd:
                     job._scheduled_for = proc
             fileno_to_outq[proc.outqR_fd] = proc
+
             # maintain_pool is called whenever a process exits.
-            add_reader(
-                proc.sentinel, event_process_exit, hub, proc.sentinel,
-            )
+            self._track_child_process(proc, hub)
 
             assert not isblocking(proc.outq._reader)
 
@@ -550,7 +571,7 @@ class AsynPool(_pool.Pool):
 
             waiting_to_start.add(proc)
             hub.call_later(
-                self._proc_alive_timeout, verify_process_alive, proc,
+                self._proc_alive_timeout, verify_process_alive, ref(proc),
             )
 
         self.on_process_up = on_process_up
@@ -594,16 +615,16 @@ class AsynPool(_pool.Pool):
             )
             if inq:
                 busy_workers.discard(inq)
-            remove_reader(proc.sentinel)
+            self._untrack_child_process(proc, hub)
             waiting_to_start.discard(proc)
             self._active_writes.discard(proc.inqW_fd)
-            remove_writer(proc.inqW_fd)
-            remove_reader(proc.outqR_fd)
+            remove_writer(proc.inq._writer)
+            remove_reader(proc.outq._reader)
             if proc.synqR_fd:
-                remove_reader(proc.synqR_fd)
+                remove_reader(proc.synq._reader)
             if proc.synqW_fd:
                 self._active_writes.discard(proc.synqW_fd)
-                remove_reader(proc.synqW_fd)
+                remove_reader(proc.synq._writer)
         self.on_process_down = on_process_down
 
     def _create_write_handlers(self, hub,
@@ -778,8 +799,9 @@ class AsynPool(_pool.Pool):
             append_message(job)
         self._quick_put = send_job
 
-        def on_not_recovering(proc, fd, job):
-            error('Process inqueue damaged: %r %r' % (proc, proc.exitcode))
+        def on_not_recovering(proc, fd, job, exc):
+            error('Process inqueue damaged: %r %r: %r',
+                  proc, proc.exitcode, exc, exc_info=1)
             if proc._is_alive():
                 proc.terminate()
             hub.remove(fd)
@@ -789,7 +811,7 @@ class AsynPool(_pool.Pool):
             # writes job to the worker process.
             # Operation must complete if more than one byte of data
             # was written.  If the broker connection is lost
-            # and no data was written the operation shall be cancelled.
+            # and no data was written the operation shall be canceled.
             header, body, body_size = job._payload
             errors = 0
             try:
@@ -808,7 +830,7 @@ class AsynPool(_pool.Pool):
                         # suspend until more data
                         errors += 1
                         if errors > 100:
-                            on_not_recovering(proc, fd, job)
+                            on_not_recovering(proc, fd, job, exc)
                             raise StopIteration()
                         yield
                     else:
@@ -824,7 +846,7 @@ class AsynPool(_pool.Pool):
                         # suspend until more data
                         errors += 1
... 973 lines suppressed ...
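The asynpool hunks above duplicate each child's sentinel fd with os.dup() so
the pool controls exactly when the fd is unregistered from the event loop and
closed: once the original fd is closed it can no longer be removed from
epoll(7), which is what produced the 100% CPU loop of Issue #1845. A minimal
sketch of the track/untrack pairing (hub, proc and on_exit stand in for
celery's event-loop hub, a billiard process object and its exit callback):

    import os

    def track(proc, hub, on_exit):
        # duplicate the sentinel so we own the lifetime of the watched fd
        if getattr(proc, '_sentinel_poll', None) is None:
            proc._sentinel_poll = os.dup(proc._popen.sentinel)
        hub.add_reader(proc._sentinel_poll, on_exit, hub, proc)

    def untrack(proc, hub):
        if proc._sentinel_poll is not None:
            fd, proc._sentinel_poll = proc._sentinel_poll, None
            hub.remove(fd)   # unregister from epoll *before* closing
            os.close(fd)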

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/celery.git


