[Python-modules-commits] [python-concurrent.futures] 03/08: Import python-concurrent.futures_3.2.0.orig.tar.gz
Ondrej Novy
onovy at debian.org
Wed Dec 6 12:32:13 UTC 2017
This is an automated email from the git hooks/post-receive script.
onovy pushed a commit to branch master
in repository python-concurrent.futures.
commit ac4205a53a50cc69aef48f732a8dcb0aff14c86e
Author: Ondřej Nový <onovy at debian.org>
Date: Wed Dec 6 13:18:51 2017 +0100
Import python-concurrent.futures_3.2.0.orig.tar.gz
---
.travis.yml | 46 ++++++++++++++++++----------
CHANGES => CHANGES.rst | 35 +++++++++++++++------
README.rst | 35 +++++++++++++++++++++
concurrent/futures/_base.py | 72 +++++++++++++++++++++++++++++++++-----------
concurrent/futures/thread.py | 17 +++++++++--
setup.py | 16 +++++++---
test_futures.py | 31 +++++++++++++++++--
tox.ini | 4 ---
8 files changed, 198 insertions(+), 58 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index d2b4ca9..4f3bd34 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,22 +1,36 @@
+language: python
sudo: false
-language: python
+stages:
+ - name: test
+ - name: deploy to pypi
+ if: type = push AND tag =~ ^\d+\.\d+\.\d+
-python:
- - "2.6"
- - "2.7"
+jobs:
+ fast_finish: true
+ include:
+ - env: TOXENV=pypy
+ python: pypy
-install: pip install tox-travis
+ - env: TOXENV=py26
+ python: "2.6"
-script: tox
+ - env: TOXENV=py27
+
+ - stage: deploy to pypi
+ install: skip
+ script: skip
+ deploy:
+ provider: pypi
+ user: agronholm
+ password:
+ secure: Dp2GLO5NK0cAV5uVSxq1s1Ux3IaJM/GYPskR8Nja9uTsktNmAhsPVDH+/w7a79G7ngyAoFv+GmDNXmAUOd6B3FrpnF9Lk4KIwSQ+agGQsdmQ5NrTVF33AXNMQiqLFd1nTq7g/or4qzUiDST9YgT3o6oIJwmGQRtTx4ZmIXSiGyMgQvxX27eMalLmA/FovgilxSkS3oEvhNTMgXI/E5CUgsFQC1rKcl+rzqfDv2kZoatrc0qrxMPeuDkNxZx9qb8gjstjcVJCjLZPZaIL2nSY6iQ2NdeKzLX2yDSXJn1yO+BX+1xlglxMYK+WUUQZvqExhWKm1sCsExSuvmF62FS0OLjLoUDNxKEQlSqS/fRZGAbDFkM4vxAWl89PF3MAFV12OES/HKA7cjbQcvPmAVC/88G3SSB5NxgsK/jOjhYh3scZtG/o6Mk5MB02ZqQdT7pNtMWK3B3XKUutK+zXWU975+1QG7 [...]
+ distributions: sdist bdist_wheel
+ on:
+ tags: true
-deploy:
- provider: pypi
- user: agronholm
- password:
- secure: Dp2GLO5NK0cAV5uVSxq1s1Ux3IaJM/GYPskR8Nja9uTsktNmAhsPVDH+/w7a79G7ngyAoFv+GmDNXmAUOd6B3FrpnF9Lk4KIwSQ+agGQsdmQ5NrTVF33AXNMQiqLFd1nTq7g/or4qzUiDST9YgT3o6oIJwmGQRtTx4ZmIXSiGyMgQvxX27eMalLmA/FovgilxSkS3oEvhNTMgXI/E5CUgsFQC1rKcl+rzqfDv2kZoatrc0qrxMPeuDkNxZx9qb8gjstjcVJCjLZPZaIL2nSY6iQ2NdeKzLX2yDSXJn1yO+BX+1xlglxMYK+WUUQZvqExhWKm1sCsExSuvmF62FS0OLjLoUDNxKEQlSqS/fRZGAbDFkM4vxAWl89PF3MAFV12OES/HKA7cjbQcvPmAVC/88G3SSB5NxgsK/jOjhYh3scZtG/o6Mk5MB02ZqQdT7pNtMWK3B3XKUutK+zXWU975+1QG7sy/9O2 [...]
- distributions: sdist bdist_wheel
- on:
- tags: true
- repo: agronholm/pythonfutures
- python: "2.7"
+python: "2.7"
+
+install: pip install tox
+
+script: tox
diff --git a/CHANGES b/CHANGES.rst
similarity index 74%
rename from CHANGES
rename to CHANGES.rst
index 4ce2585..879cfa1 100644
--- a/CHANGES
+++ b/CHANGES.rst
@@ -1,10 +1,24 @@
+3.2.0
+=====
+
+- The ThreadPoolExecutor class constructor now accepts an optional ``thread_name_prefix``
+ argument to make it possible to customize the names of the threads created by the pool.
+ Upstream contribution by Gregory P. Smith in https://bugs.python.org/issue27664.
+- Backported fixes from upstream (thanks Lisandro Dalcin):
+
+ - python/cpython#1560
+ - python/cpython#3270
+ - python/cpython#3830
+
+
3.1.1
=====
-- Backported sanity checks for the ``max_workers`` constructor argument for ThreadPoolExecutor and
- ProcessPoolExecutor
-- Set the default value of ``max_workers`` in ThreadPoolExecutor to ``None``, as in upstream code
- (computes the thread pool size based on the number of CPUs)
+- Backported sanity checks for the ``max_workers`` constructor argument for
+ ThreadPoolExecutor and ProcessPoolExecutor
+- Set the default value of ``max_workers`` in ThreadPoolExecutor to ``None``,
+ as in upstream code (computes the thread pool size based on the number of
+ CPUs)
- Added support for old-style exception objects
- Switched to the PSF license
@@ -12,13 +26,14 @@
3.1.0
=====
-(Failed release)
+- (Failed release)
3.0.5
=====
-- Fixed OverflowError with ProcessPoolExecutor on Windows (regression introduced in 3.0.4)
+- Fixed OverflowError with ProcessPoolExecutor on Windows (regression
+ introduced in 3.0.4)
3.0.4
@@ -51,11 +66,11 @@
- Dropped Python 2.5 and 3.1 support
- Removed the deprecated "futures" top level package
- Applied patch for issue 11777 (Executor.map does not submit futures until
- iter.next() is called)
+ iter.next() is called)
- Applied patch for issue 15015 (accessing an non-existing attribute)
- Applied patch for issue 16284 (memory leak)
- Applied patch for issue 20367 (behavior of concurrent.futures.as_completed()
- for duplicate arguments)
+ for duplicate arguments)
2.2.0
=====
@@ -81,7 +96,7 @@
2.1.4
=====
-- Ported the library again from Python 3.2.5 to get the latest bug fixes
+- Ported the library again from Python 3.2.5 to get the latest bug fixes
2.1.3
@@ -121,4 +136,4 @@
1.0
===
-Initial release.
+- Initial release
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..1f072e4
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,35 @@
+.. image:: https://travis-ci.org/agronholm/pythonfutures.svg?branch=master
+ :target: https://travis-ci.org/agronholm/pythonfutures
+ :alt: Build Status
+
+This is a backport of the `concurrent.futures`_ standard library module to Python 2.
+
+It should not be installed on Python 3, although there should be no harm in doing so, as the
+standard library takes precedence over third party libraries.
+
+To conditionally require this library only on Python 2, you can do this in your ``setup.py``:
+
+.. code-block:: python
+
+ setup(
+ ...
+ extras_require={
+ ':python_version == "2.7"': ['futures']
+ }
+ )
+
+Or, using the newer syntax:
+
+.. code-block:: python
+
+ setup(
+ ...
+ install_requires={
+ 'futures; python_version == "2.7"'
+ }
+ )
+
+.. warning:: The ``ProcessPoolExecutor`` class has known (unfixable) problems on Python 2 and
+ should not be relied on for mission critical work.
+
+.. _concurrent.futures: https://docs.python.org/library/concurrent.futures.html
diff --git a/concurrent/futures/_base.py b/concurrent/futures/_base.py
index ca2ebfb..510ffa5 100644
--- a/concurrent/futures/_base.py
+++ b/concurrent/futures/_base.py
@@ -172,6 +172,29 @@ def _create_and_install_waiters(fs, return_when):
return waiter
+
+def _yield_finished_futures(fs, waiter, ref_collect):
+ """
+ Iterate on the list *fs*, yielding finished futures one by one in
+ reverse order.
+ Before yielding a future, *waiter* is removed from its waiters
+ and the future is removed from each set in the collection of sets
+ *ref_collect*.
+
+ The aim of this function is to avoid keeping stale references after
+ the future is yielded and before the iterator resumes.
+ """
+ while fs:
+ f = fs[-1]
+ for futures_set in ref_collect:
+ futures_set.remove(f)
+ with f._condition:
+ f._waiters.remove(waiter)
+ del f
+ # Careful not to keep a reference to the popped value
+ yield fs.pop()
+
+
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
@@ -194,16 +217,19 @@ def as_completed(fs, timeout=None):
end_time = timeout + time.time()
fs = set(fs)
+ total_futures = len(fs)
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
-
+ finished = list(finished)
try:
- for future in finished:
- yield future
+ for f in _yield_finished_futures(finished, waiter,
+ ref_collect=(fs,)):
+ f = [f]
+ yield f.pop()
while pending:
if timeout is None:
@@ -213,7 +239,7 @@ def as_completed(fs, timeout=None):
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
- len(pending), len(fs)))
+ len(pending), total_futures))
waiter.event.wait(wait_timeout)
@@ -222,11 +248,15 @@ def as_completed(fs, timeout=None):
waiter.finished_futures = []
waiter.event.clear()
- for future in finished:
- yield future
- pending.remove(future)
+ # reverse to keep finishing order
+ finished.reverse()
+ for f in _yield_finished_futures(finished, waiter,
+ ref_collect=(fs, pending)):
+ f = [f]
+ yield f.pop()
finally:
+ # Remove waiter from unfinished futures
for f in fs:
with f._condition:
f._waiters.remove(waiter)
@@ -322,17 +352,20 @@ class Future(object):
with self._condition:
if self._state == FINISHED:
if self._exception:
- return '<Future at %s state=%s raised %s>' % (
- hex(id(self)),
+ return '<%s at %#x state=%s raised %s>' % (
+ self.__class__.__name__,
+ id(self),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._exception.__class__.__name__)
else:
- return '<Future at %s state=%s returned %s>' % (
- hex(id(self)),
+ return '<%s at %#x state=%s returned %s>' % (
+ self.__class__.__name__,
+ id(self),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._result.__class__.__name__)
- return '<Future at %s state=%s>' % (
- hex(id(self)),
+ return '<%s at %#x state=%s>' % (
+ self.__class__.__name__,
+ id(self),
_STATE_TO_DESCRIPTION_MAP[self._state])
def cancel(self):
@@ -355,7 +388,7 @@ class Future(object):
return True
def cancelled(self):
- """Return True if the future has cancelled."""
+ """Return True if the future was cancelled."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
@@ -573,7 +606,7 @@ class Executor(object):
raise NotImplementedError()
def map(self, fn, *iterables, **kwargs):
- """Returns a iterator equivalent to map(fn, iter).
+ """Returns an iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take as many arguments as there are
@@ -600,11 +633,14 @@ class Executor(object):
# before the first iterator value is required.
def result_iterator():
try:
- for future in fs:
+ # reverse to keep finishing order
+ fs.reverse()
+ while fs:
+ # Careful not to keep a reference to the popped future
if timeout is None:
- yield future.result()
+ yield fs.pop().result()
else:
- yield future.result(end_time - time.time())
+ yield fs.pop().result(end_time - time.time())
finally:
for future in fs:
future.cancel()
diff --git a/concurrent/futures/thread.py b/concurrent/futures/thread.py
index efae619..bb0ce9d 100644
--- a/concurrent/futures/thread.py
+++ b/concurrent/futures/thread.py
@@ -5,6 +5,7 @@
import atexit
from concurrent.futures import _base
+import itertools
import Queue as queue
import threading
import weakref
@@ -90,12 +91,17 @@ def _worker(executor_reference, work_queue):
class ThreadPoolExecutor(_base.Executor):
- def __init__(self, max_workers=None):
+
+ # Used to assign unique thread names when thread_name_prefix is not supplied.
+ _counter = itertools.count().next
+
+ def __init__(self, max_workers=None, thread_name_prefix=''):
"""Initializes a new ThreadPoolExecutor instance.
Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
+ thread_name_prefix: An optional name prefix to give our threads.
"""
if max_workers is None:
# Use this number because ThreadPoolExecutor is often
@@ -109,6 +115,8 @@ class ThreadPoolExecutor(_base.Executor):
self._threads = set()
self._shutdown = False
self._shutdown_lock = threading.Lock()
+ self._thread_name_prefix = (thread_name_prefix or
+ ("ThreadPoolExecutor-%d" % self._counter()))
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
@@ -130,8 +138,11 @@ class ThreadPoolExecutor(_base.Executor):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
- if len(self._threads) < self._max_workers:
- t = threading.Thread(target=_worker,
+ num_threads = len(self._threads)
+ if num_threads < self._max_workers:
+ thread_name = '%s_%d' % (self._thread_name_prefix or self,
+ num_threads)
+ t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
t.daemon = True
diff --git a/setup.py b/setup.py
index ea0c022..d45e66d 100755
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,7 @@
#!/usr/bin/env python
+# coding: utf-8
from warnings import warn
+import os.path
import sys
if sys.version_info[0] > 2:
@@ -14,15 +16,21 @@ try:
except ImportError:
from distutils.core import setup
+here = os.path.dirname(__file__)
+with open(os.path.join(here, 'README.rst')) as f:
+ readme = f.read()
+
setup(name='futures',
- version='3.1.1',
- description='Backport of the concurrent.futures package from Python 3.2',
+ version='3.2.0',
+ description='Backport of the concurrent.futures package from Python 3',
+ long_description=readme,
author='Brian Quinlan',
author_email='brian at sweetapp.com',
- maintainer='Alex Gronholm',
- maintainer_email='alex.gronholm+pypi at nextday.fi',
+ maintainer=u'Alex Grönholm',
+ maintainer_email='alex.gronholm at nextday.fi',
url='https://github.com/agronholm/pythonfutures',
packages=['concurrent', 'concurrent.futures'],
+ python_requires='>=2.6, <3',
license='PSF',
classifiers=['License :: OSI Approved :: Python Software Foundation License',
'Development Status :: 5 - Production/Stable',
diff --git a/test_futures.py b/test_futures.py
index e7cd8cf..d5b3499 100644
--- a/test_futures.py
+++ b/test_futures.py
@@ -29,7 +29,7 @@ def reap_threads(func):
If threading is unavailable this function does nothing.
"""
@functools.wraps(func)
- def decorator(*args):
+ def decorator(*args):
key = test_support.threading_setup()
try:
return func(*args)
@@ -50,7 +50,7 @@ def _assert_python(expected_success, *args, **env_vars):
# caller is responsible to pass the full environment.
if env_vars.pop('__cleanenv', None):
env = {}
- env.update(env_vars)
+ env.update(env_vars)
cmd_line.extend(args)
p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
@@ -78,7 +78,7 @@ def assert_python_ok(*args, **env_vars):
return _assert_python(True, *args, **env_vars)
-def strip_python_stderr(stderr):
+def strip_python_stderr(stderr):
"""Strip the stderr of a Python process from potential debug output
emitted by the interpreter.
@@ -230,6 +230,31 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
for t in threads:
t.join()
+ def test_thread_names_assigned(self):
+ executor = futures.ThreadPoolExecutor(
+ max_workers=5, thread_name_prefix='SpecialPool')
+ executor.map(abs, range(-5, 5))
+ threads = executor._threads
+ del executor
+ gc.collect()
+
+ for t in threads:
+ self.assertRegexpMatches(t.name, r'^SpecialPool_[0-4]$')
+ t.join()
+
+ def test_thread_names_default(self):
+ executor = futures.ThreadPoolExecutor(max_workers=5)
+ executor.map(abs, range(-5, 5))
+ threads = executor._threads
+ del executor
+ gc.collect()
+
+ for t in threads:
+ # Ensure that our default name is reasonably sane and unique when
+ # no thread_name_prefix was supplied.
+ self.assertRegexpMatches(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
+ t.join()
+
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
def _prime_executor(self):
diff --git a/tox.ini b/tox.ini
index a7fb9ed..fdff7cb 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,10 +1,6 @@
[tox]
envlist = py26,py27,pypy,jython
-[tox:travis]
-2.6 = py26
-2.7 = py27
-
[testenv]
commands = {envpython} test_futures.py {posargs}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-concurrent.futures.git
More information about the Python-modules-commits
mailing list