[Python-modules-commits] [python-s3transfer] 01/04: New upstream version 0.1.10
Takaki Taniguchi
takaki at moszumanska.debian.org
Thu Feb 2 08:45:38 UTC 2017
This is an automated email from the git hooks/post-receive script.
takaki pushed a commit to branch master
in repository python-s3transfer.
commit 4338ce57855bd4889c7100cc4f7465bd05dc1eaa
Author: TANIGUCHI Takaki <takaki at asis.media-as.org>
Date: Thu Feb 2 17:40:08 2017 +0900
New upstream version 0.1.10
---
.changes/0.1.10.json | 7 +++
.changes/0.1.9.json | 7 +++
CHANGELOG.rst | 12 +++++
README.rst | 10 ++--
s3transfer/__init__.py | 2 +-
s3transfer/futures.py | 93 +++++++++++++++++++++++++++++++-
s3transfer/manager.py | 16 ++++--
s3transfer/tasks.py | 8 ++-
scripts/ci/run-integ-tests | 8 ++-
scripts/ci/run-tests | 8 ++-
setup.py | 2 +-
tests/__init__.py | 25 +++++++++
tests/functional/test_download.py | 3 ++
tests/functional/test_manager.py | 22 ++++++++
tests/integration/test_download.py | 17 +++++-
tests/integration/test_upload.py | 18 ++++++-
tests/unit/test_futures.py | 106 +++++++++++++++++++++++++++++++++++++
tests/unit/test_tasks.py | 14 ++++-
18 files changed, 355 insertions(+), 23 deletions(-)
diff --git a/.changes/0.1.10.json b/.changes/0.1.10.json
new file mode 100644
index 0000000..16279d0
--- /dev/null
+++ b/.changes/0.1.10.json
@@ -0,0 +1,7 @@
+[
+ {
+ "category": "``TransferManager``",
+ "description": "Expose ability to use own executor class for ``TransferManager``",
+ "type": "feature"
+ }
+]
\ No newline at end of file
diff --git a/.changes/0.1.9.json b/.changes/0.1.9.json
new file mode 100644
index 0000000..801cb5e
--- /dev/null
+++ b/.changes/0.1.9.json
@@ -0,0 +1,7 @@
+[
+ {
+ "category": "``TransferFuture``",
+ "description": "Add support for setting exceptions on transfer future",
+ "type": "feature"
+ }
+]
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index ad25ead..9721d1a 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -2,6 +2,18 @@
CHANGELOG
=========
+0.1.10
+======
+
+* feature:``TransferManager``: Expose ability to use own executor class for ``TransferManager``
+
+
+0.1.9
+=====
+
+* feature:``TransferFuture``: Add support for setting exceptions on transfer future
+
+
0.1.8
=====
diff --git a/README.rst b/README.rst
index ad476bd..0c5fc0e 100644
--- a/README.rst
+++ b/README.rst
@@ -2,9 +2,11 @@
s3transfer - An Amazon S3 Transfer Manager for Python
=====================================================
-.. warning::
+S3transfer is a Python library for managing Amazon S3 transfers.
- This project is currently a work in progress. Please do not rely on
- this functionality in production as the interfaces may change over time.
+.. note::
-S3transfer is a Python library for managing Amazon S3 transfers.
+ This project is not currently GA. If you are planning to use this code in
+ production, make sure to lock to a minor version as interfaces may break
+ from minor version to minor version. For a basic, stable interface of
+ s3transfer, try the interfaces exposed in `boto3 <https://boto3.readthedocs.io/en/latest/guide/s3.html#using-the-transfer-manager>`__
diff --git a/s3transfer/__init__.py b/s3transfer/__init__.py
index 28fbceb..4ab0720 100644
--- a/s3transfer/__init__.py
+++ b/s3transfer/__init__.py
@@ -143,7 +143,7 @@ from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError
__author__ = 'Amazon Web Services'
-__version__ = '0.1.9'
+__version__ = '0.1.10'
class NullHandler(logging.Handler):
diff --git a/s3transfer/futures.py b/s3transfer/futures.py
index afecc40..8f87b65 100644
--- a/s3transfer/futures.py
+++ b/s3transfer/futures.py
@@ -14,9 +14,11 @@ from concurrent import futures
from collections import namedtuple
import copy
import logging
+import sys
import threading
from s3transfer.compat import MAXINT
+from s3transfer.compat import six
from s3transfer.exceptions import CancelledError, TransferNotDoneError
from s3transfer.utils import FunctionContainer
from s3transfer.utils import TaskSemaphore
@@ -367,7 +369,8 @@ class TransferCoordinator(object):
class BoundedExecutor(object):
EXECUTOR_CLS = futures.ThreadPoolExecutor
- def __init__(self, max_size, max_num_threads, tag_semaphores=None):
+ def __init__(self, max_size, max_num_threads, tag_semaphores=None,
+ executor_cls=None):
"""An executor implentation that has a maximum queued up tasks
The executor will block if the number of tasks that have been
@@ -385,9 +388,16 @@ class BoundedExecutor(object):
:params tag_semaphores: A dictionary where the key is the name of the
tag and the value is the semaphore to use when limiting the
number of tasks the executor is processing at a time.
+
+ :type executor_cls: BaseExecutor
+ :param executor_cls: The executor class that
+ gets bounded by this executor. If None is provided, the
+ concurrent.futures.ThreadPoolExecutor class is used.
"""
self._max_num_threads = max_num_threads
- self._executor = self.EXECUTOR_CLS(max_workers=self._max_num_threads)
+ if executor_cls is None:
+ executor_cls = self.EXECUTOR_CLS
+ self._executor = executor_cls(max_workers=self._max_num_threads)
self._semaphore = TaskSemaphore(max_size)
self._tag_semaphores = tag_semaphores
@@ -467,6 +477,85 @@ class ExecutorFuture(object):
return self._future.done()
+class BaseExecutor(object):
+ """Base Executor class implementation needed to work with s3transfer"""
+ def __init__(self, max_workers=None):
+ pass
+
+ def submit(self, fn, *args, **kwargs):
+ raise NotImplementedError('submit()')
+
+ def shutdown(self, wait=True):
+ raise NotImplementedError('shutdown()')
+
+
+class NonThreadedExecutor(BaseExecutor):
+ """A drop-in replacement non-threaded version of ThreadPoolExecutor"""
+ def submit(self, fn, *args, **kwargs):
+ future = NonThreadedExecutorFuture()
+ try:
+ result = fn(*args, **kwargs)
+ future.set_result(result)
+ except Exception:
+ e, tb = sys.exc_info()[1:]
+ logger.debug(
+ 'Setting exception for %s to %s with traceback %s',
+ future, e, tb
+ )
+ future.set_exception_info(e, tb)
+ return future
+
+ def shutdown(self, wait=True):
+ pass
+
+
+class NonThreadedExecutorFuture(object):
+ """The Future returned from NonThreadedExecutor
+
+ Note that this future is **not** thread-safe as it is being used
+ from the context of a non-threaded environment.
+ """
+ def __init__(self):
+ self._result = None
+ self._exception = None
+ self._traceback = None
+ self._done = False
+ self._done_callbacks = []
+
+ def set_result(self, result):
+ self._result = result
+ self._set_done()
+
+ def set_exception_info(self, exception, traceback):
+ self._exception = exception
+ self._traceback = traceback
+ self._set_done()
+
+ def result(self, timeout=None):
+ if self._exception:
+ six.reraise(
+ type(self._exception), self._exception, self._traceback)
+ return self._result
+
+ def _set_done(self):
+ self._done = True
+ for done_callback in self._done_callbacks:
+ self._invoke_done_callback(done_callback)
+ self._done_callbacks = []
+
+ def _invoke_done_callback(self, done_callback):
+ return done_callback(self)
+
+ def done(self):
+ return self._done
+
+ def add_done_callback(self, fn):
+ if self._done:
+ self._invoke_done_callback(fn)
+ else:
+ self._done_callbacks.append(fn)
+
+
TaskTag = namedtuple('TaskTag', ['name'])
IN_MEMORY_UPLOAD_TAG = TaskTag('in_memory_upload')
diff --git a/s3transfer/manager.py b/s3transfer/manager.py
index cfc62be..ba5eb61 100644
--- a/s3transfer/manager.py
+++ b/s3transfer/manager.py
@@ -193,13 +193,17 @@ class TransferManager(object):
'RequestPayer',
]
- def __init__(self, client, config=None, osutil=None):
+ def __init__(self, client, config=None, osutil=None, executor_cls=None):
"""A transfer manager interface for Amazon S3
:param client: Client to be used by the manager
:param config: TransferConfig to associate specific configurations
:param osutil: OSUtils object to use for os-related behavior when
using with transfer manager.
+
+ :type executor_cls: s3transfer.futures.BaseExecutor
+ :param executor_cls: The class of executor to use with the transfer
+ manager. By default, concurrent.futures.ThreadPoolExecutor is used.
"""
self._client = client
self._config = config
@@ -221,21 +225,25 @@ class TransferManager(object):
self._config.max_in_memory_upload_chunks),
IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
self._config.max_in_memory_download_chunks)
- }
+ },
+ executor_cls=executor_cls
)
# The executor responsible for submitting the necessary tasks to
# perform the desired transfer
self._submission_executor = BoundedExecutor(
max_size=self._config.max_submission_queue_size,
- max_num_threads=self._config.max_submission_concurrency
+ max_num_threads=self._config.max_submission_concurrency,
+ executor_cls=executor_cls
+
)
# There is one thread available for writing to disk. It will handle
# downloads for all files.
self._io_executor = BoundedExecutor(
max_size=self._config.max_io_queue_size,
- max_num_threads=1
+ max_num_threads=1,
+ executor_cls=executor_cls
)
self._register_handlers()
diff --git a/s3transfer/tasks.py b/s3transfer/tasks.py
index 9ce0f21..4427927 100644
--- a/s3transfer/tasks.py
+++ b/s3transfer/tasks.py
@@ -253,11 +253,17 @@ class SubmissionTask(Task):
# Call the submit method to start submitting tasks to execute the
# transfer.
self._submit(transfer_future=transfer_future, **kwargs)
- except Exception as e:
+ except BaseException as e:
# If there was an exception rasied during the submission of task
# there is a chance that the final task that signals if a transfer
# is done and too run the cleanup may never have been submitted in
# the first place so we need to account accordingly.
+ #
+ # Note that BaseException is caught, instead of Exception, because
+ # for some implementations of executors, specifically the serial
+ # implementation, the SubmissionTask is directly exposed to
+ # KeyboardInterrupts and so needs to clean up and signal done
+ # for those as well.
# Set the exception, that caused the process to fail.
self._log_and_set_exception(e)
diff --git a/scripts/ci/run-integ-tests b/scripts/ci/run-integ-tests
index 3118bde..5cd8328 100755
--- a/scripts/ci/run-integ-tests
+++ b/scripts/ci/run-integ-tests
@@ -12,9 +12,13 @@ REPO_ROOT = _dname(_dname(_dname(os.path.abspath(__file__))))
os.chdir(os.path.join(REPO_ROOT, 'tests'))
-def run(command):
- return check_call(command, shell=True)
+def run(command, env=None):
+ return check_call(command, shell=True, env=env)
run('nosetests --with-xunit --cover-erase --with-coverage '
'--cover-package s3transfer --cover-xml -v integration')
+
+# Run the serial implementation of s3transfer
+os.environ['USE_SERIAL_EXECUTOR'] = 'True'
+run('nosetests -v integration', env=os.environ)
diff --git a/scripts/ci/run-tests b/scripts/ci/run-tests
index fd7e189..8d23f2c 100755
--- a/scripts/ci/run-tests
+++ b/scripts/ci/run-tests
@@ -12,9 +12,13 @@ REPO_ROOT = _dname(_dname(_dname(os.path.abspath(__file__))))
os.chdir(os.path.join(REPO_ROOT, 'tests'))
-def run(command):
- return check_call(command, shell=True)
+def run(command, env=None):
+ return check_call(command, shell=True, env=env)
run('nosetests --with-coverage --cover-erase --cover-package s3transfer '
'--with-xunit --cover-xml -v unit/ functional/')
+
+# Run the serial implementation of s3transfer
+os.environ['USE_SERIAL_EXECUTOR'] = 'True'
+run('nosetests -v functional/', env=os.environ)
diff --git a/setup.py b/setup.py
index dc67232..942edf9 100644
--- a/setup.py
+++ b/setup.py
@@ -43,7 +43,7 @@ setup(
},
license="Apache License 2.0",
classifiers=(
- 'Development Status :: 1 - Planning',
+ 'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'Natural Language :: English',
'License :: OSI Approved :: Apache Software License',
diff --git a/tests/__init__.py b/tests/__init__.py
index eeaa758..3cef6e7 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -34,6 +34,7 @@ from s3transfer.futures import TransferCoordinator
from s3transfer.futures import TransferMeta
from s3transfer.futures import TransferFuture
from s3transfer.futures import BoundedExecutor
+from s3transfer.futures import NonThreadedExecutor
from s3transfer.subscribers import BaseSubscriber
from s3transfer.utils import OSUtils
from s3transfer.utils import CallArgs
@@ -41,6 +42,22 @@ from s3transfer.utils import TaskSemaphore
from s3transfer.utils import SlidingWindowSemaphore
+ORIGINAL_EXECUTOR_CLS = BoundedExecutor.EXECUTOR_CLS
+
+
+def setup_package():
+ if is_serial_implementation():
+ BoundedExecutor.EXECUTOR_CLS = NonThreadedExecutor
+
+
+def teardown_package():
+ BoundedExecutor.EXECUTOR_CLS = ORIGINAL_EXECUTOR_CLS
+
+
+def is_serial_implementation():
+ return os.environ.get('USE_SERIAL_EXECUTOR', False)
+
+
def assert_files_equal(first, second):
if os.path.getsize(first) != os.path.getsize(second):
raise AssertionError("Files are not equal: %s, %s" % (first, second))
@@ -82,6 +99,14 @@ def skip_if_windows(reason):
return decorator
+def skip_if_using_serial_implementation(reason):
+ """Decorator to skip tests when running as the serial implementation"""
+ def decorator(func):
+ return unittest.skipIf(
+ is_serial_implementation(), reason)(func)
+ return decorator
+
+
class StreamWithError(object):
"""A wrapper to simulate errors while reading from a stream
diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py
index 7959fb6..4d99913 100644
--- a/tests/functional/test_download.py
+++ b/tests/functional/test_download.py
@@ -25,6 +25,7 @@ from tests import RecordingOSUtils
from tests import NonSeekableWriter
from tests import BaseGeneralInterfaceTest
from tests import skip_if_windows
+from tests import skip_if_using_serial_implementation
from s3transfer.compat import six
from s3transfer.compat import SOCKET_ERROR
from s3transfer.exceptions import RetriesExceededError
@@ -317,6 +318,8 @@ class BaseDownloadTest(BaseGeneralInterfaceTest):
self.assertEqual(len(osutil.rename_records), 1)
@skip_if_windows('Windows does not support UNIX special files')
+ @skip_if_using_serial_implementation(
+ 'A seperate thread is needed to read from the fifo')
def test_download_for_fifo_file(self):
self.add_head_object_response()
self.add_successful_get_object_responses()
diff --git a/tests/functional/test_manager.py b/tests/functional/test_manager.py
index c6638e0..5b9116d 100644
--- a/tests/functional/test_manager.py
+++ b/tests/functional/test_manager.py
@@ -11,10 +11,13 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from botocore.awsrequest import create_request_object
+import mock
+from tests import skip_if_using_serial_implementation
from tests import StubbedClientTest
from s3transfer.exceptions import CancelledError
from s3transfer.exceptions import FatalError
+from s3transfer.futures import BaseExecutor
from s3transfer.manager import TransferManager
from s3transfer.manager import TransferConfig
@@ -37,6 +40,12 @@ class CallbackEnablingBody(object):
class TestTransferManager(StubbedClientTest):
+ @skip_if_using_serial_implementation(
+ 'Exception is thrown once all transfers are submitted. '
+ 'However for the serial implementation, transfers are performed '
+ 'in main thread meaning all transfers will complete before the '
+ 'exception being thrown.'
+ )
def test_error_in_context_manager_cancels_incomplete_transfers(self):
# The purpose of this test is to make sure if an error is raised
# in the body of the context manager, incomplete transfers will
@@ -69,6 +78,12 @@ class TestTransferManager(StubbedClientTest):
for future in futures:
future.result()
+ @skip_if_using_serial_implementation(
+ 'Exception is thrown once all transfers are submitted. '
+ 'However for the serial implementation, transfers are performed '
+ 'in main thread meaning all transfers will complete before the '
+ 'exception being thrown.'
+ )
def test_cntrl_c_in_context_manager_cancels_incomplete_transfers(self):
# The purpose of this test is to make sure if an error is raised
# in the body of the context manager, incomplete transfers will
@@ -125,3 +140,10 @@ class TestTransferManager(StubbedClientTest):
body.disable_callback_call_count, 1,
'The disable_callback() should have only ever been registered '
'once')
+
+ def test_use_custom_executor_implementation(self):
+ mocked_executor_cls = mock.Mock(BaseExecutor)
+ transfer_manager = TransferManager(
+ self.client, executor_cls=mocked_executor_cls)
+ transfer_manager.delete('bucket', 'key')
+ self.assertTrue(mocked_executor_cls.return_value.submit.called)
diff --git a/tests/integration/test_download.py b/tests/integration/test_download.py
index f549707..9215718 100644
--- a/tests/integration/test_download.py
+++ b/tests/integration/test_download.py
@@ -18,6 +18,7 @@ from concurrent.futures import CancelledError
from tests import assert_files_equal
from tests import skip_if_windows
+from tests import skip_if_using_serial_implementation
from tests import RecordingSubscriber
from tests import NonSeekableWriter
from tests.integration import BaseTransferManagerIntegTest
@@ -58,6 +59,12 @@ class TestDownload(BaseTransferManagerIntegTest):
future.result()
assert_files_equal(filename, download_path)
+ @skip_if_using_serial_implementation(
+ 'Exception is thrown once the transfer is submitted. '
+ 'However for the serial implementation, transfers are performed '
+ 'in main thread meaning the transfer will complete before the '
+ 'KeyboardInterrupt being thrown.'
+ )
def test_large_download_exits_quicky_on_exception(self):
transfer_manager = self.create_transfer_manager(self.config)
@@ -83,7 +90,7 @@ class TestDownload(BaseTransferManagerIntegTest):
# The maximum time allowed for the transfer manager to exit.
# This means that it should take less than a couple second after
# sleeping to exit.
- max_allowed_exit_time = sleep_time + 1
+ max_allowed_exit_time = sleep_time + 4
self.assertLess(
end_time - start_time, max_allowed_exit_time,
"Failed to exit under %s. Instead exited in %s." % (
@@ -99,6 +106,12 @@ class TestDownload(BaseTransferManagerIntegTest):
possible_matches = glob.glob('%s*' % download_path)
self.assertEqual(possible_matches, [])
+ @skip_if_using_serial_implementation(
+ 'Exception is thrown once the transfer is submitted. '
+ 'However for the serial implementation, transfers are performed '
+ 'in main thread meaning the transfer will complete before the '
+ 'KeyboardInterrupt being thrown.'
+ )
def test_many_files_exits_quicky_on_exception(self):
# Set the max request queue size and number of submission threads
# to something small to simulate having a large queue
@@ -131,7 +144,7 @@ class TestDownload(BaseTransferManagerIntegTest):
end_time = time.time()
# The maximum time allowed for the transfer manager to exit.
# This means that it should take less than a couple seconds to exit.
- max_allowed_exit_time = 2
+ max_allowed_exit_time = 5
self.assertLess(
end_time - start_time, max_allowed_exit_time,
"Failed to exit under %s. Instead exited in %s." % (
diff --git a/tests/integration/test_upload.py b/tests/integration/test_upload.py
index e8ab7c9..3887a85 100644
--- a/tests/integration/test_upload.py
+++ b/tests/integration/test_upload.py
@@ -15,6 +15,7 @@ import time
from concurrent.futures import CancelledError
from botocore.compat import six
+from tests import skip_if_using_serial_implementation
from tests import RecordingSubscriber, NonSeekableReader
from tests.integration import BaseTransferManagerIntegTest
from s3transfer.manager import TransferConfig
@@ -49,6 +50,12 @@ class TestUpload(BaseTransferManagerIntegTest):
future.result()
self.assertTrue(self.object_exists('20mb.txt'))
+ @skip_if_using_serial_implementation(
+ 'Exception is thrown once the transfer is submitted. '
+ 'However for the serial implementation, transfers are performed '
+ 'in main thread meaning the transfer will complete before the '
+ 'KeyboardInterrupt being thrown.'
+ )
def test_large_upload_exits_quicky_on_exception(self):
transfer_manager = self.create_transfer_manager(self.config)
@@ -72,7 +79,7 @@ class TestUpload(BaseTransferManagerIntegTest):
# The maximum time allowed for the transfer manager to exit.
# This means that it should take less than a couple second after
# sleeping to exit.
- max_allowed_exit_time = sleep_time + 2
+ max_allowed_exit_time = sleep_time + 5
self.assertLess(
end_time - start_time, max_allowed_exit_time,
"Failed to exit under %s. Instead exited in %s." % (
@@ -90,6 +97,13 @@ class TestUpload(BaseTransferManagerIntegTest):
# make sure the object does not exist.
self.assertFalse(self.object_exists('20mb.txt'))
+
+ @skip_if_using_serial_implementation(
+ 'Exception is thrown once the transfers are submitted. '
+ 'However for the serial implementation, transfers are performed '
+ 'in main thread meaning the transfers will complete before the '
+ 'KeyboardInterrupt being thrown.'
+ )
def test_many_files_exits_quicky_on_exception(self):
# Set the max request queue size and number of submission threads
# to something small to simulate having a large queue
@@ -121,7 +135,7 @@ class TestUpload(BaseTransferManagerIntegTest):
end_time = time.time()
# The maximum time allowed for the transfer manager to exit.
# This means that it should take less than a couple seconds to exit.
- max_allowed_exit_time = 2
+ max_allowed_exit_time = 5
self.assertLess(
end_time - start_time, max_allowed_exit_time,
"Failed to exit under %s. Instead exited in %s." % (
diff --git a/tests/unit/test_futures.py b/tests/unit/test_futures.py
index e14cd07..a145c20 100644
--- a/tests/unit/test_futures.py
+++ b/tests/unit/test_futures.py
@@ -10,7 +10,11 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
+import sys
import time
+import traceback
+
+import mock
from concurrent.futures import ThreadPoolExecutor
from tests import unittest
@@ -24,6 +28,9 @@ from s3transfer.futures import TransferMeta
from s3transfer.futures import TransferCoordinator
from s3transfer.futures import BoundedExecutor
from s3transfer.futures import ExecutorFuture
+from s3transfer.futures import BaseExecutor
+from s3transfer.futures import NonThreadedExecutor
+from s3transfer.futures import NonThreadedExecutorFuture
from s3transfer.tasks import Task
from s3transfer.utils import FunctionContainer
from s3transfer.utils import TaskSemaphore
@@ -34,6 +41,17 @@ def return_call_args(*args, **kwargs):
return args, kwargs
+def raise_exception(exception):
+ raise exception
+
+
+def get_exc_info(exception):
+ try:
+ raise_exception(exception)
+ except:
+ return sys.exc_info()
+
+
class RecordingTransferCoordinator(TransferCoordinator):
def __init__(self):
self.all_transfer_futures_ever_associated = set()
@@ -530,6 +548,12 @@ class TestBoundedExecutor(unittest.TestCase):
# not done, which it should not be because it it slow.
self.assertFalse(future.done())
+ def test_replace_underlying_executor(self):
+ mocked_executor_cls = mock.Mock(BaseExecutor)
+ executor = BoundedExecutor(10, 1, {}, mocked_executor_cls)
+ executor.submit(self.get_task(ReturnFooTask))
+ self.assertTrue(mocked_executor_cls.return_value.submit.called)
+
class TestExecutorFuture(unittest.TestCase):
def test_result(self):
@@ -552,3 +576,85 @@ class TestExecutorFuture(unittest.TestCase):
wrapped_future.add_done_callback(
FunctionContainer(done_callbacks.append, 'called'))
self.assertEqual(done_callbacks, ['called'])
+
+
+class TestNonThreadedExecutor(unittest.TestCase):
+ def test_submit(self):
+ executor = NonThreadedExecutor()
+ future = executor.submit(return_call_args, 1, 2, foo='bar')
+ self.assertIsInstance(future, NonThreadedExecutorFuture)
+ self.assertEqual(future.result(), ((1, 2), {'foo': 'bar'}))
+
+ def test_submit_with_exception(self):
+ executor = NonThreadedExecutor()
+ future = executor.submit(raise_exception, RuntimeError())
+ self.assertIsInstance(future, NonThreadedExecutorFuture)
+ with self.assertRaises(RuntimeError):
+ future.result()
+
+ def test_submit_with_exception_and_captures_info(self):
+ exception = ValueError('message')
+ tb = get_exc_info(exception)[2]
+ future = NonThreadedExecutor().submit(raise_exception, exception)
+ try:
+ future.result()
+ # An exception should have been raised
+ self.fail('Future should have raised a ValueError')
+ except ValueError:
+ actual_tb = sys.exc_info()[2]
+ last_frame = traceback.extract_tb(actual_tb)[-1]
+ last_expected_frame = traceback.extract_tb(tb)[-1]
+ self.assertEqual(last_frame, last_expected_frame)
+
+
+class TestNonThreadedExecutorFuture(unittest.TestCase):
+ def setUp(self):
+ self.future = NonThreadedExecutorFuture()
+
+ def test_done_starts_false(self):
+ self.assertFalse(self.future.done())
+
+ def test_done_after_setting_result(self):
+ self.future.set_result('result')
+ self.assertTrue(self.future.done())
+
+ def test_done_after_setting_exception(self):
+ self.future.set_exception_info(Exception(), None)
+ self.assertTrue(self.future.done())
+
+ def test_result(self):
+ self.future.set_result('result')
+ self.assertEqual(self.future.result(), 'result')
+
+ def test_exception_result(self):
+ exception = ValueError('message')
+ self.future.set_exception_info(exception, None)
+ with self.assertRaisesRegexp(ValueError, 'message'):
+ self.future.result()
+
+ def test_exception_result_doesnt_modify_last_frame(self):
+ exception = ValueError('message')
+ tb = get_exc_info(exception)[2]
+ self.future.set_exception_info(exception, tb)
+ try:
+ self.future.result()
+ # An exception should have been raised
+ self.fail()
+ except ValueError:
+ actual_tb = sys.exc_info()[2]
+ last_frame = traceback.extract_tb(actual_tb)[-1]
+ last_expected_frame = traceback.extract_tb(tb)[-1]
+ self.assertEqual(last_frame, last_expected_frame)
+
+ def test_done_callback(self):
+ done_futures = []
+ self.future.add_done_callback(done_futures.append)
+ self.assertEqual(done_futures, [])
+ self.future.set_result('result')
+ self.assertEqual(done_futures, [self.future])
+
+ def test_done_callback_after_done(self):
+ self.future.set_result('result')
+ done_futures = []
+ self.future.add_done_callback(done_futures.append)
+ self.assertEqual(done_futures, [self.future])
diff --git a/tests/unit/test_tasks.py b/tests/unit/test_tasks.py
index 82e5e38..b126d8c 100644
--- a/tests/unit/test_tasks.py
+++ b/tests/unit/test_tasks.py
@@ -69,14 +69,14 @@ class NOOPSubmissionTask(SubmissionTask):
class ExceptionSubmissionTask(SubmissionTask):
def _submit(self, transfer_future, executor=None, tasks_to_submit=None,
- additional_callbacks=None):
+ additional_callbacks=None, exception=TaskFailureException):
if executor and tasks_to_submit:
for task_to_submit in tasks_to_submit:
self._transfer_coordinator.submit(executor, task_to_submit)
if additional_callbacks:
for callback in additional_callbacks:
callback()
- raise TaskFailureException()
+ raise exception()
class StatusRecordingTransferCoordinator(TransferCoordinator):
@@ -164,6 +164,16 @@ class TestSubmissionTask(BaseSubmissionTaskTest):
with self.assertRaises(TaskFailureException):
self.transfer_future.result()
+ def test_catches_and_sets_keyboard_interrupt_exception_from_submit(self):
+ self.main_kwargs['exception'] = KeyboardInterrupt
+ submission_task = self.get_task(
+ ExceptionSubmissionTask, main_kwargs=self.main_kwargs)
+ submission_task()
+
+ self.assertEqual(self.transfer_coordinator.status, 'failed')
+ with self.assertRaises(KeyboardInterrupt):
+ self.transfer_future.result()
+
def test_calls_done_callbacks_on_exception(self):
submission_task = self.get_task(
ExceptionSubmissionTask, main_kwargs=self.main_kwargs)
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-s3transfer.git
More information about the Python-modules-commits
mailing list