[Python-modules-commits] [python-concurrent.futures] 03/08: Import python-concurrent.futures_3.2.0.orig.tar.gz

Ondrej Novy onovy at debian.org
Wed Dec 6 12:32:13 UTC 2017


This is an automated email from the git hooks/post-receive script.

onovy pushed a commit to branch master
in repository python-concurrent.futures.

commit ac4205a53a50cc69aef48f732a8dcb0aff14c86e
Author: Ondřej Nový <onovy at debian.org>
Date:   Wed Dec 6 13:18:51 2017 +0100

    Import python-concurrent.futures_3.2.0.orig.tar.gz
---
 .travis.yml                  | 46 ++++++++++++++++++----------
 CHANGES => CHANGES.rst       | 35 +++++++++++++++------
 README.rst                   | 35 +++++++++++++++++++++
 concurrent/futures/_base.py  | 72 +++++++++++++++++++++++++++++++++-----------
 concurrent/futures/thread.py | 17 +++++++++--
 setup.py                     | 16 +++++++---
 test_futures.py              | 31 +++++++++++++++++--
 tox.ini                      |  4 ---
 8 files changed, 198 insertions(+), 58 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index d2b4ca9..4f3bd34 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,22 +1,36 @@
+language: python
 sudo: false
 
-language: python
+stages:
+  - name: test
+  - name: deploy to pypi
+    if: type = push AND tag =~ ^\d+\.\d+\.\d+
 
-python:
-  - "2.6"
-  - "2.7"
+jobs:
+  fast_finish: true
+  include:
+    - env: TOXENV=pypy
+      python: pypy
 
-install: pip install tox-travis
+    - env: TOXENV=py26
+      python: "2.6"
 
-script: tox
+    - env: TOXENV=py27
+
+    - stage: deploy to pypi
+      install: skip
+      script: skip
+      deploy:
+        provider: pypi
+        user: agronholm
+        password:
+          secure: Dp2GLO5NK0cAV5uVSxq1s1Ux3IaJM/GYPskR8Nja9uTsktNmAhsPVDH+/w7a79G7ngyAoFv+GmDNXmAUOd6B3FrpnF9Lk4KIwSQ+agGQsdmQ5NrTVF33AXNMQiqLFd1nTq7g/or4qzUiDST9YgT3o6oIJwmGQRtTx4ZmIXSiGyMgQvxX27eMalLmA/FovgilxSkS3oEvhNTMgXI/E5CUgsFQC1rKcl+rzqfDv2kZoatrc0qrxMPeuDkNxZx9qb8gjstjcVJCjLZPZaIL2nSY6iQ2NdeKzLX2yDSXJn1yO+BX+1xlglxMYK+WUUQZvqExhWKm1sCsExSuvmF62FS0OLjLoUDNxKEQlSqS/fRZGAbDFkM4vxAWl89PF3MAFV12OES/HKA7cjbQcvPmAVC/88G3SSB5NxgsK/jOjhYh3scZtG/o6Mk5MB02ZqQdT7pNtMWK3B3XKUutK+zXWU975+1QG7 [...]
+        distributions: sdist bdist_wheel
+        on:
+          tags: true
 
-deploy:
-  provider: pypi
-  user: agronholm
-  password:
-    secure: Dp2GLO5NK0cAV5uVSxq1s1Ux3IaJM/GYPskR8Nja9uTsktNmAhsPVDH+/w7a79G7ngyAoFv+GmDNXmAUOd6B3FrpnF9Lk4KIwSQ+agGQsdmQ5NrTVF33AXNMQiqLFd1nTq7g/or4qzUiDST9YgT3o6oIJwmGQRtTx4ZmIXSiGyMgQvxX27eMalLmA/FovgilxSkS3oEvhNTMgXI/E5CUgsFQC1rKcl+rzqfDv2kZoatrc0qrxMPeuDkNxZx9qb8gjstjcVJCjLZPZaIL2nSY6iQ2NdeKzLX2yDSXJn1yO+BX+1xlglxMYK+WUUQZvqExhWKm1sCsExSuvmF62FS0OLjLoUDNxKEQlSqS/fRZGAbDFkM4vxAWl89PF3MAFV12OES/HKA7cjbQcvPmAVC/88G3SSB5NxgsK/jOjhYh3scZtG/o6Mk5MB02ZqQdT7pNtMWK3B3XKUutK+zXWU975+1QG7sy/9O2 [...]
-  distributions: sdist bdist_wheel
-  on:
-    tags: true
-    repo: agronholm/pythonfutures
-    python: "2.7"
+python: "2.7"
+
+install: pip install tox
+
+script: tox
diff --git a/CHANGES b/CHANGES.rst
similarity index 74%
rename from CHANGES
rename to CHANGES.rst
index 4ce2585..879cfa1 100644
--- a/CHANGES
+++ b/CHANGES.rst
@@ -1,10 +1,24 @@
+3.2.0
+=====
+
+- The ThreadPoolExecutor class constructor now accepts an optional ``thread_name_prefix``
+  argument to make it possible to customize the names of the threads created by the pool.
+  Upstream contribution by Gregory P. Smith in https://bugs.python.org/issue27664.
+- Backported fixes from upstream (thanks Lisandro Dalcin):
+
+ - python/cpython#1560
+ - python/cpython#3270
+ - python/cpython#3830
+
+
 3.1.1
 =====
 
-- Backported sanity checks for the ``max_workers`` constructor argument for ThreadPoolExecutor and
-  ProcessPoolExecutor
-- Set the default value of ``max_workers`` in ThreadPoolExecutor to ``None``, as in upstream code
-  (computes the thread pool size based on the number of CPUs)
+- Backported sanity checks for the ``max_workers`` constructor argument for
+  ThreadPoolExecutor and ProcessPoolExecutor
+- Set the default value of ``max_workers`` in ThreadPoolExecutor to ``None``,
+  as in upstream code (computes the thread pool size based on the number of
+  CPUs)
 - Added support for old-style exception objects
 - Switched to the PSF license
 
@@ -12,13 +26,14 @@
 3.1.0
 =====
 
-(Failed release)
+- (Failed release)
 
 
 3.0.5
 =====
 
-- Fixed OverflowError with ProcessPoolExecutor on Windows (regression introduced in 3.0.4)
+- Fixed OverflowError with ProcessPoolExecutor on Windows (regression
+  introduced in 3.0.4)
 
 
 3.0.4
@@ -51,11 +66,11 @@
 - Dropped Python 2.5 and 3.1 support
 - Removed the deprecated "futures" top level package
 - Applied patch for issue 11777 (Executor.map does not submit futures until
-                                 iter.next() is called)
+  iter.next() is called)
 - Applied patch for issue 15015 (accessing an non-existing attribute)
 - Applied patch for issue 16284 (memory leak)
 - Applied patch for issue 20367 (behavior of concurrent.futures.as_completed()
-                                 for duplicate arguments)
+  for duplicate arguments)
 
 2.2.0
 =====
@@ -81,7 +96,7 @@
 2.1.4
 =====
 
-- Ported the library again from Python 3.2.5 to get the latest bug fixes 
+- Ported the library again from Python 3.2.5 to get the latest bug fixes
 
 
 2.1.3
@@ -121,4 +136,4 @@
 1.0
 ===
 
-Initial release.
+- Initial release
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..1f072e4
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,35 @@
+.. image:: https://travis-ci.org/agronholm/pythonfutures.svg?branch=master
+  :target: https://travis-ci.org/agronholm/pythonfutures
+  :alt: Build Status
+
+This is a backport of the `concurrent.futures`_ standard library module to Python 2.
+
+It should not be installed on Python 3, although there should be no harm in doing so, as the
+standard library takes precedence over third party libraries.
+
+To conditionally require this library only on Python 2, you can do this in your ``setup.py``:
+
+.. code-block:: python
+
+    setup(
+        ...
+        extras_require={
+            ':python_version == "2.7"': ['futures']
+        }
+    )
+
+Or, using the newer syntax:
+
+.. code-block:: python
+
+    setup(
+        ...
+        install_requires={
+            'futures; python_version == "2.7"'
+        }
+    )
+
+.. warning:: The ``ProcessPoolExecutor`` class has known (unfixable) problems on Python 2 and
+  should not be relied on for mission critical work.
+
+.. _concurrent.futures: https://docs.python.org/library/concurrent.futures.html
diff --git a/concurrent/futures/_base.py b/concurrent/futures/_base.py
index ca2ebfb..510ffa5 100644
--- a/concurrent/futures/_base.py
+++ b/concurrent/futures/_base.py
@@ -172,6 +172,29 @@ def _create_and_install_waiters(fs, return_when):
 
     return waiter
 
+
+def _yield_finished_futures(fs, waiter, ref_collect):
+    """
+    Iterate on the list *fs*, yielding finished futures one by one in
+    reverse order.
+    Before yielding a future, *waiter* is removed from its waiters
+    and the future is removed from each set in the collection of sets
+    *ref_collect*.
+
+    The aim of this function is to avoid keeping stale references after
+    the future is yielded and before the iterator resumes.
+    """
+    while fs:
+        f = fs[-1]
+        for futures_set in ref_collect:
+            futures_set.remove(f)
+        with f._condition:
+            f._waiters.remove(waiter)
+        del f
+        # Careful not to keep a reference to the popped value
+        yield fs.pop()
+
+
 def as_completed(fs, timeout=None):
     """An iterator over the given futures that yields each as it completes.
 
@@ -194,16 +217,19 @@ def as_completed(fs, timeout=None):
         end_time = timeout + time.time()
 
     fs = set(fs)
+    total_futures = len(fs)
     with _AcquireFutures(fs):
         finished = set(
                 f for f in fs
                 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
         pending = fs - finished
         waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
-
+    finished = list(finished)
     try:
-        for future in finished:
-            yield future
+        for f in _yield_finished_futures(finished, waiter,
+                                         ref_collect=(fs,)):
+            f = [f]
+            yield f.pop()
 
         while pending:
             if timeout is None:
@@ -213,7 +239,7 @@ def as_completed(fs, timeout=None):
                 if wait_timeout < 0:
                     raise TimeoutError(
                             '%d (of %d) futures unfinished' % (
-                            len(pending), len(fs)))
+                            len(pending), total_futures))
 
             waiter.event.wait(wait_timeout)
 
@@ -222,11 +248,15 @@ def as_completed(fs, timeout=None):
                 waiter.finished_futures = []
                 waiter.event.clear()
 
-            for future in finished:
-                yield future
-                pending.remove(future)
+            # reverse to keep finishing order
+            finished.reverse()
+            for f in _yield_finished_futures(finished, waiter,
+                                             ref_collect=(fs, pending)):
+                f = [f]
+                yield f.pop()
 
     finally:
+        # Remove waiter from unfinished futures
         for f in fs:
             with f._condition:
                 f._waiters.remove(waiter)
@@ -322,17 +352,20 @@ class Future(object):
         with self._condition:
             if self._state == FINISHED:
                 if self._exception:
-                    return '<Future at %s state=%s raised %s>' % (
-                        hex(id(self)),
+                    return '<%s at %#x state=%s raised %s>' % (
+                        self.__class__.__name__,
+                        id(self),
                         _STATE_TO_DESCRIPTION_MAP[self._state],
                         self._exception.__class__.__name__)
                 else:
-                    return '<Future at %s state=%s returned %s>' % (
-                        hex(id(self)),
+                    return '<%s at %#x state=%s returned %s>' % (
+                        self.__class__.__name__,
+                        id(self),
                         _STATE_TO_DESCRIPTION_MAP[self._state],
                         self._result.__class__.__name__)
-            return '<Future at %s state=%s>' % (
-                    hex(id(self)),
+            return '<%s at %#x state=%s>' % (
+                    self.__class__.__name__,
+                    id(self),
                    _STATE_TO_DESCRIPTION_MAP[self._state])
 
     def cancel(self):
@@ -355,7 +388,7 @@ class Future(object):
         return True
 
     def cancelled(self):
-        """Return True if the future has cancelled."""
+        """Return True if the future was cancelled."""
         with self._condition:
             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
 
@@ -573,7 +606,7 @@ class Executor(object):
         raise NotImplementedError()
 
     def map(self, fn, *iterables, **kwargs):
-        """Returns a iterator equivalent to map(fn, iter).
+        """Returns an iterator equivalent to map(fn, iter).
 
         Args:
             fn: A callable that will take as many arguments as there are
@@ -600,11 +633,14 @@ class Executor(object):
         # before the first iterator value is required.
         def result_iterator():
             try:
-                for future in fs:
+                # reverse to keep finishing order
+                fs.reverse()
+                while fs:
+                    # Careful not to keep a reference to the popped future
                     if timeout is None:
-                        yield future.result()
+                        yield fs.pop().result()
                     else:
-                        yield future.result(end_time - time.time())
+                        yield fs.pop().result(end_time - time.time())
             finally:
                 for future in fs:
                     future.cancel()
diff --git a/concurrent/futures/thread.py b/concurrent/futures/thread.py
index efae619..bb0ce9d 100644
--- a/concurrent/futures/thread.py
+++ b/concurrent/futures/thread.py
@@ -5,6 +5,7 @@
 
 import atexit
 from concurrent.futures import _base
+import itertools
 import Queue as queue
 import threading
 import weakref
@@ -90,12 +91,17 @@ def _worker(executor_reference, work_queue):
 
 
 class ThreadPoolExecutor(_base.Executor):
-    def __init__(self, max_workers=None):
+
+    # Used to assign unique thread names when thread_name_prefix is not supplied.
+    _counter = itertools.count().next
+
+    def __init__(self, max_workers=None, thread_name_prefix=''):
         """Initializes a new ThreadPoolExecutor instance.
 
         Args:
             max_workers: The maximum number of threads that can be used to
                 execute the given calls.
+            thread_name_prefix: An optional name prefix to give our threads.
         """
         if max_workers is None:
             # Use this number because ThreadPoolExecutor is often
@@ -109,6 +115,8 @@ class ThreadPoolExecutor(_base.Executor):
         self._threads = set()
         self._shutdown = False
         self._shutdown_lock = threading.Lock()
+        self._thread_name_prefix = (thread_name_prefix or
+                                    ("ThreadPoolExecutor-%d" % self._counter()))
 
     def submit(self, fn, *args, **kwargs):
         with self._shutdown_lock:
@@ -130,8 +138,11 @@ class ThreadPoolExecutor(_base.Executor):
             q.put(None)
         # TODO(bquinlan): Should avoid creating new threads if there are more
         # idle threads than items in the work queue.
-        if len(self._threads) < self._max_workers:
-            t = threading.Thread(target=_worker,
+        num_threads = len(self._threads)
+        if num_threads < self._max_workers:
+            thread_name = '%s_%d' % (self._thread_name_prefix or self,
+                                     num_threads)
+            t = threading.Thread(name=thread_name, target=_worker,
                                  args=(weakref.ref(self, weakref_cb),
                                        self._work_queue))
             t.daemon = True
diff --git a/setup.py b/setup.py
index ea0c022..d45e66d 100755
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,7 @@
 #!/usr/bin/env python
+# coding: utf-8
 from warnings import warn
+import os.path
 import sys
 
 if sys.version_info[0] > 2:
@@ -14,15 +16,21 @@ try:
 except ImportError:
     from distutils.core import setup
 
+here = os.path.dirname(__file__)
+with open(os.path.join(here, 'README.rst')) as f:
+    readme = f.read()
+
 setup(name='futures',
-      version='3.1.1',
-      description='Backport of the concurrent.futures package from Python 3.2',
+      version='3.2.0',
+      description='Backport of the concurrent.futures package from Python 3',
+      long_description=readme,
       author='Brian Quinlan',
       author_email='brian at sweetapp.com',
-      maintainer='Alex Gronholm',
-      maintainer_email='alex.gronholm+pypi at nextday.fi',
+      maintainer=u'Alex Grönholm',
+      maintainer_email='alex.gronholm at nextday.fi',
       url='https://github.com/agronholm/pythonfutures',
       packages=['concurrent', 'concurrent.futures'],
+      python_requires='>=2.6, <3',
       license='PSF',
       classifiers=['License :: OSI Approved :: Python Software Foundation License',
                    'Development Status :: 5 - Production/Stable',
diff --git a/test_futures.py b/test_futures.py
index e7cd8cf..d5b3499 100644
--- a/test_futures.py
+++ b/test_futures.py
@@ -29,7 +29,7 @@ def reap_threads(func):
     If threading is unavailable this function does nothing.
     """
     @functools.wraps(func)
-    def decorator(*args): 
+    def decorator(*args):
         key = test_support.threading_setup()
         try:
             return func(*args)
@@ -50,7 +50,7 @@ def _assert_python(expected_success, *args, **env_vars):
     # caller is responsible to pass the full environment.
     if env_vars.pop('__cleanenv', None):
         env = {}
-    env.update(env_vars) 
+    env.update(env_vars)
     cmd_line.extend(args)
     p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
@@ -78,7 +78,7 @@ def assert_python_ok(*args, **env_vars):
     return _assert_python(True, *args, **env_vars)
 
 
-def strip_python_stderr(stderr): 
+def strip_python_stderr(stderr):
     """Strip the stderr of a Python process from potential debug output
     emitted by the interpreter.
 
@@ -230,6 +230,31 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
         for t in threads:
             t.join()
 
+    def test_thread_names_assigned(self):
+        executor = futures.ThreadPoolExecutor(
+            max_workers=5, thread_name_prefix='SpecialPool')
+        executor.map(abs, range(-5, 5))
+        threads = executor._threads
+        del executor
+        gc.collect()
+
+        for t in threads:
+            self.assertRegexpMatches(t.name, r'^SpecialPool_[0-4]$')
+            t.join()
+
+    def test_thread_names_default(self):
+        executor = futures.ThreadPoolExecutor(max_workers=5)
+        executor.map(abs, range(-5, 5))
+        threads = executor._threads
+        del executor
+        gc.collect()
+
+        for t in threads:
+            # Ensure that our default name is reasonably sane and unique when
+            # no thread_name_prefix was supplied.
+            self.assertRegexpMatches(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
+            t.join()
+
 
 class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
     def _prime_executor(self):
diff --git a/tox.ini b/tox.ini
index a7fb9ed..fdff7cb 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,10 +1,6 @@
 [tox]
 envlist = py26,py27,pypy,jython
 
-[tox:travis]
-2.6 = py26
-2.7 = py27
-
 [testenv]
 commands = {envpython} test_futures.py {posargs}
 

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-concurrent.futures.git



More information about the Python-modules-commits mailing list