[Python-modules-commits] [python-s3transfer] 01/04: New upstream version 0.1.10

Takaki Taniguchi takaki at moszumanska.debian.org
Thu Feb 2 08:45:38 UTC 2017


This is an automated email from the git hooks/post-receive script.

takaki pushed a commit to branch master
in repository python-s3transfer.

commit 4338ce57855bd4889c7100cc4f7465bd05dc1eaa
Author: TANIGUCHI Takaki <takaki at asis.media-as.org>
Date:   Thu Feb 2 17:40:08 2017 +0900

    New upstream version 0.1.10
---
 .changes/0.1.10.json               |   7 +++
 .changes/0.1.9.json                |   7 +++
 CHANGELOG.rst                      |  12 +++++
 README.rst                         |  10 ++--
 s3transfer/__init__.py             |   2 +-
 s3transfer/futures.py              |  93 +++++++++++++++++++++++++++++++-
 s3transfer/manager.py              |  16 ++++--
 s3transfer/tasks.py                |   8 ++-
 scripts/ci/run-integ-tests         |   8 ++-
 scripts/ci/run-tests               |   8 ++-
 setup.py                           |   2 +-
 tests/__init__.py                  |  25 +++++++++
 tests/functional/test_download.py  |   3 ++
 tests/functional/test_manager.py   |  22 ++++++++
 tests/integration/test_download.py |  17 +++++-
 tests/integration/test_upload.py   |  18 ++++++-
 tests/unit/test_futures.py         | 106 +++++++++++++++++++++++++++++++++++++
 tests/unit/test_tasks.py           |  14 ++++-
 18 files changed, 355 insertions(+), 23 deletions(-)

diff --git a/.changes/0.1.10.json b/.changes/0.1.10.json
new file mode 100644
index 0000000..16279d0
--- /dev/null
+++ b/.changes/0.1.10.json
@@ -0,0 +1,7 @@
+[
+  {
+    "category": "``TransferManager``", 
+    "description": "Expose ability to use own executor class for ``TransferManager``", 
+    "type": "feature"
+  }
+]
\ No newline at end of file
diff --git a/.changes/0.1.9.json b/.changes/0.1.9.json
new file mode 100644
index 0000000..801cb5e
--- /dev/null
+++ b/.changes/0.1.9.json
@@ -0,0 +1,7 @@
+[
+  {
+    "category": "``TransferFuture``", 
+    "description": "Add support for setting exceptions on transfer future", 
+    "type": "feature"
+  }
+]
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index ad25ead..9721d1a 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -2,6 +2,18 @@
 CHANGELOG
 =========
 
+0.1.10
+======
+
+* feature:``TransferManager``: Expose ability to use own executor class for ``TransferManager``
+
+
+0.1.9
+=====
+
+* feature:``TransferFuture``: Add support for setting exceptions on transfer future
+
+
 0.1.8
 =====
 
diff --git a/README.rst b/README.rst
index ad476bd..0c5fc0e 100644
--- a/README.rst
+++ b/README.rst
@@ -2,9 +2,11 @@
 s3transfer - An Amazon S3 Transfer Manager for Python
 =====================================================
 
-.. warning::
+S3transfer is a Python library for managing Amazon S3 transfers.
 
-  This project is currently a work in progress. Please do not rely on
-  this functionality in production as the interfaces may change over time.
+.. note::
 
-S3transfer is a Python library for managing Amazon S3 transfers.
+  This project is not currently GA. If you are planning to use this code in
+  production, make sure to lock to a minor version as interfaces may break
+  from minor version to minor version. For a basic, stable interface of
+  s3transfer, try the interfaces exposed in `boto3 <https://boto3.readthedocs.io/en/latest/guide/s3.html#using-the-transfer-manager>`__
diff --git a/s3transfer/__init__.py b/s3transfer/__init__.py
index 28fbceb..4ab0720 100644
--- a/s3transfer/__init__.py
+++ b/s3transfer/__init__.py
@@ -143,7 +143,7 @@ from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError
 
 
 __author__ = 'Amazon Web Services'
-__version__ = '0.1.9'
+__version__ = '0.1.10'
 
 
 class NullHandler(logging.Handler):
diff --git a/s3transfer/futures.py b/s3transfer/futures.py
index afecc40..8f87b65 100644
--- a/s3transfer/futures.py
+++ b/s3transfer/futures.py
@@ -14,9 +14,11 @@ from concurrent import futures
 from collections import namedtuple
 import copy
 import logging
+import sys
 import threading
 
 from s3transfer.compat import MAXINT
+from s3transfer.compat import six
 from s3transfer.exceptions import CancelledError, TransferNotDoneError
 from s3transfer.utils import FunctionContainer
 from s3transfer.utils import TaskSemaphore
@@ -367,7 +369,8 @@ class TransferCoordinator(object):
 class BoundedExecutor(object):
     EXECUTOR_CLS = futures.ThreadPoolExecutor
 
-    def __init__(self, max_size, max_num_threads, tag_semaphores=None):
+    def __init__(self, max_size, max_num_threads, tag_semaphores=None,
+                 executor_cls=None):
         """An executor implentation that has a maximum queued up tasks
 
         The executor will block if the number of tasks that have been
@@ -385,9 +388,16 @@ class BoundedExecutor(object):
         :params tag_semaphores: A dictionary where the key is the name of the
             tag and the value is the semaphore to use when limiting the
             number of tasks the executor is processing at a time.
+
+        :type executor_cls: BaseExecutor
+        :param executor_cls: The executor class that
+            gets bounded by this executor. If None is provided, the
+            concurrent.futures.ThreadPoolExecutor class is used.
         """
         self._max_num_threads = max_num_threads
-        self._executor = self.EXECUTOR_CLS(max_workers=self._max_num_threads)
+        if executor_cls is None:
+            executor_cls = self.EXECUTOR_CLS
+        self._executor = executor_cls(max_workers=self._max_num_threads)
         self._semaphore = TaskSemaphore(max_size)
         self._tag_semaphores = tag_semaphores
 
@@ -467,6 +477,85 @@ class ExecutorFuture(object):
         return self._future.done()
 
 
+class BaseExecutor(object):
+    """Base Executor class implementation needed to work with s3transfer"""
+    def __init__(self, max_workers=None):
+        pass
+
+    def submit(self, fn, *args, **kwargs):
+        raise NotImplementedError('submit()')
+
+    def shutdown(self, wait=True):
+        raise NotImplementedError('shutdown()')
+
+
+class NonThreadedExecutor(BaseExecutor):
+    """A drop-in replacement non-threaded version of ThreadPoolExecutor"""
+    def submit(self, fn, *args, **kwargs):
+        future = NonThreadedExecutorFuture()
+        try:
+            result = fn(*args, **kwargs)
+            future.set_result(result)
+        except Exception:
+            e, tb = sys.exc_info()[1:]
+            logger.debug(
+                'Setting exception for %s to %s with traceback %s',
+                future, e, tb
+            )
+            future.set_exception_info(e, tb)
+        return future
+
+    def shutdown(self, wait=True):
+        pass
+
+
+class NonThreadedExecutorFuture(object):
+    """The Future returned from NonThreadedExecutor
+
+    Note that this future is **not** thread-safe as it is being used
+    from the context of a non-threaded environment.
+    """
+    def __init__(self):
+        self._result = None
+        self._exception = None
+        self._traceback = None
+        self._done = False
+        self._done_callbacks = []
+
+    def set_result(self, result):
+        self._result = result
+        self._set_done()
+
+    def set_exception_info(self, exception, traceback):
+        self._exception = exception
+        self._traceback = traceback
+        self._set_done()
+
+    def result(self, timeout=None):
+        if self._exception:
+            six.reraise(
+                type(self._exception), self._exception, self._traceback)
+        return self._result
+
+    def _set_done(self):
+        self._done = True
+        for done_callback in self._done_callbacks:
+            self._invoke_done_callback(done_callback)
+        self._done_callbacks = []
+
+    def _invoke_done_callback(self, done_callback):
+        return done_callback(self)
+
+    def done(self):
+        return self._done
+
+    def add_done_callback(self, fn):
+        if self._done:
+            self._invoke_done_callback(fn)
+        else:
+            self._done_callbacks.append(fn)
+
+
 TaskTag = namedtuple('TaskTag', ['name'])
 
 IN_MEMORY_UPLOAD_TAG = TaskTag('in_memory_upload')
diff --git a/s3transfer/manager.py b/s3transfer/manager.py
index cfc62be..ba5eb61 100644
--- a/s3transfer/manager.py
+++ b/s3transfer/manager.py
@@ -193,13 +193,17 @@ class TransferManager(object):
         'RequestPayer',
     ]
 
-    def __init__(self, client, config=None, osutil=None):
+    def __init__(self, client, config=None, osutil=None, executor_cls=None):
         """A transfer manager interface for Amazon S3
 
         :param client: Client to be used by the manager
         :param config: TransferConfig to associate specific configurations
         :param osutil: OSUtils object to use for os-related behavior when
             using with transfer manager.
+
+        :type executor_cls: s3transfer.futures.BaseExecutor
+        :param executor_cls: The class of executor to use with the transfer
+            manager. By default, concurrent.futures.ThreadPoolExecutor is used.
         """
         self._client = client
         self._config = config
@@ -221,21 +225,25 @@ class TransferManager(object):
                     self._config.max_in_memory_upload_chunks),
                 IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
                     self._config.max_in_memory_download_chunks)
-            }
+            },
+            executor_cls=executor_cls
         )
 
         # The executor responsible for submitting the necessary tasks to
         # perform the desired transfer
         self._submission_executor = BoundedExecutor(
             max_size=self._config.max_submission_queue_size,
-            max_num_threads=self._config.max_submission_concurrency
+            max_num_threads=self._config.max_submission_concurrency,
+            executor_cls=executor_cls
+
         )
 
         # There is one thread available for writing to disk. It will handle
         # downloads for all files.
         self._io_executor = BoundedExecutor(
             max_size=self._config.max_io_queue_size,
-            max_num_threads=1
+            max_num_threads=1,
+            executor_cls=executor_cls
         )
         self._register_handlers()
 
diff --git a/s3transfer/tasks.py b/s3transfer/tasks.py
index 9ce0f21..4427927 100644
--- a/s3transfer/tasks.py
+++ b/s3transfer/tasks.py
@@ -253,11 +253,17 @@ class SubmissionTask(Task):
             # Call the submit method to start submitting tasks to execute the
             # transfer.
             self._submit(transfer_future=transfer_future, **kwargs)
-        except Exception as e:
+        except BaseException as e:
             # If there was an exception rasied during the submission of task
             # there is a chance that the final task that signals if a transfer
             # is done and too run the cleanup may never have been submitted in
             # the first place so we need to account accordingly.
+            #
+            # Note that BaseException is caught, instead of Exception, because
+            # for some implementations of executors, specifically the serial
+            # implementation, the SubmissionTask is directly exposed to
+            # KeyboardInterrupts and so needs to clean up and signal done
+            # for those as well.
 
             # Set the exception, that caused the process to fail.
             self._log_and_set_exception(e)
diff --git a/scripts/ci/run-integ-tests b/scripts/ci/run-integ-tests
index 3118bde..5cd8328 100755
--- a/scripts/ci/run-integ-tests
+++ b/scripts/ci/run-integ-tests
@@ -12,9 +12,13 @@ REPO_ROOT = _dname(_dname(_dname(os.path.abspath(__file__))))
 os.chdir(os.path.join(REPO_ROOT, 'tests'))
 
 
-def run(command):
-    return check_call(command, shell=True)
+def run(command, env=None):
+    return check_call(command, shell=True, env=env)
 
 
 run('nosetests --with-xunit --cover-erase --with-coverage '
     '--cover-package s3transfer --cover-xml -v integration')
+
+# Run the serial implementation of s3transfer
+os.environ['USE_SERIAL_EXECUTOR'] = 'True'
+run('nosetests -v integration', env=os.environ)
diff --git a/scripts/ci/run-tests b/scripts/ci/run-tests
index fd7e189..8d23f2c 100755
--- a/scripts/ci/run-tests
+++ b/scripts/ci/run-tests
@@ -12,9 +12,13 @@ REPO_ROOT = _dname(_dname(_dname(os.path.abspath(__file__))))
 os.chdir(os.path.join(REPO_ROOT, 'tests'))
 
 
-def run(command):
-    return check_call(command, shell=True)
+def run(command, env=None):
+    return check_call(command, shell=True, env=env)
 
 
 run('nosetests --with-coverage --cover-erase --cover-package s3transfer '
     '--with-xunit --cover-xml -v unit/ functional/')
+
+# Run the serial implementation of s3transfer
+os.environ['USE_SERIAL_EXECUTOR'] = 'True'
+run('nosetests -v functional/', env=os.environ)
diff --git a/setup.py b/setup.py
index dc67232..942edf9 100644
--- a/setup.py
+++ b/setup.py
@@ -43,7 +43,7 @@ setup(
     },
     license="Apache License 2.0",
     classifiers=(
-        'Development Status :: 1 - Planning',
+        'Development Status :: 3 - Alpha',
         'Intended Audience :: Developers',
         'Natural Language :: English',
         'License :: OSI Approved :: Apache Software License',
diff --git a/tests/__init__.py b/tests/__init__.py
index eeaa758..3cef6e7 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -34,6 +34,7 @@ from s3transfer.futures import TransferCoordinator
 from s3transfer.futures import TransferMeta
 from s3transfer.futures import TransferFuture
 from s3transfer.futures import BoundedExecutor
+from s3transfer.futures import NonThreadedExecutor
 from s3transfer.subscribers import BaseSubscriber
 from s3transfer.utils import OSUtils
 from s3transfer.utils import CallArgs
@@ -41,6 +42,22 @@ from s3transfer.utils import TaskSemaphore
 from s3transfer.utils import SlidingWindowSemaphore
 
 
+ORIGINAL_EXECUTOR_CLS = BoundedExecutor.EXECUTOR_CLS
+
+
+def setup_package():
+    if is_serial_implementation():
+        BoundedExecutor.EXECUTOR_CLS = NonThreadedExecutor
+
+
+def teardown_package():
+    BoundedExecutor.EXECUTOR_CLS = ORIGINAL_EXECUTOR_CLS
+
+
+def is_serial_implementation():
+    return os.environ.get('USE_SERIAL_EXECUTOR', False)
+
+
 def assert_files_equal(first, second):
     if os.path.getsize(first) != os.path.getsize(second):
         raise AssertionError("Files are not equal: %s, %s" % (first, second))
@@ -82,6 +99,14 @@ def skip_if_windows(reason):
     return decorator
 
 
+def skip_if_using_serial_implementation(reason):
+    """Decorator to skip tests when running as the serial implementation"""
+    def decorator(func):
+        return unittest.skipIf(
+            is_serial_implementation(), reason)(func)
+    return decorator
+
+
 class StreamWithError(object):
     """A wrapper to simulate errors while reading from a stream
 
diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py
index 7959fb6..4d99913 100644
--- a/tests/functional/test_download.py
+++ b/tests/functional/test_download.py
@@ -25,6 +25,7 @@ from tests import RecordingOSUtils
 from tests import NonSeekableWriter
 from tests import BaseGeneralInterfaceTest
 from tests import skip_if_windows
+from tests import skip_if_using_serial_implementation
 from s3transfer.compat import six
 from s3transfer.compat import SOCKET_ERROR
 from s3transfer.exceptions import RetriesExceededError
@@ -317,6 +318,8 @@ class BaseDownloadTest(BaseGeneralInterfaceTest):
         self.assertEqual(len(osutil.rename_records), 1)
 
     @skip_if_windows('Windows does not support UNIX special files')
+    @skip_if_using_serial_implementation(
+        'A seperate thread is needed to read from the fifo')
     def test_download_for_fifo_file(self):
         self.add_head_object_response()
         self.add_successful_get_object_responses()
diff --git a/tests/functional/test_manager.py b/tests/functional/test_manager.py
index c6638e0..5b9116d 100644
--- a/tests/functional/test_manager.py
+++ b/tests/functional/test_manager.py
@@ -11,10 +11,13 @@
 # ANY KIND, either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
 from botocore.awsrequest import create_request_object
+import mock
 
+from tests import skip_if_using_serial_implementation
 from tests import StubbedClientTest
 from s3transfer.exceptions import CancelledError
 from s3transfer.exceptions import FatalError
+from s3transfer.futures import BaseExecutor
 from s3transfer.manager import TransferManager
 from s3transfer.manager import TransferConfig
 
@@ -37,6 +40,12 @@ class CallbackEnablingBody(object):
 
 
 class TestTransferManager(StubbedClientTest):
+    @skip_if_using_serial_implementation(
+        'Exception is thrown once all transfers are submitted. '
+        'However for the serial implementation, transfers are performed '
+        'in main thread meaning all transfers will complete before the '
+        'exception being thrown.'
+    )
     def test_error_in_context_manager_cancels_incomplete_transfers(self):
         # The purpose of this test is to make sure if an error is raised
         # in the body of the context manager, incomplete transfers will
@@ -69,6 +78,12 @@ class TestTransferManager(StubbedClientTest):
                 for future in futures:
                     future.result()
 
+    @skip_if_using_serial_implementation(
+        'Exception is thrown once all transfers are submitted. '
+        'However for the serial implementation, transfers are performed '
+        'in main thread meaning all transfers will complete before the '
+        'exception being thrown.'
+    )
     def test_cntrl_c_in_context_manager_cancels_incomplete_transfers(self):
         # The purpose of this test is to make sure if an error is raised
         # in the body of the context manager, incomplete transfers will
@@ -125,3 +140,10 @@ class TestTransferManager(StubbedClientTest):
             body.disable_callback_call_count, 1,
             'The disable_callback() should have only ever been registered '
             'once')
+
+    def test_use_custom_executor_implementation(self):
+        mocked_executor_cls = mock.Mock(BaseExecutor)
+        transfer_manager = TransferManager(
+            self.client, executor_cls=mocked_executor_cls)
+        transfer_manager.delete('bucket', 'key')
+        self.assertTrue(mocked_executor_cls.return_value.submit.called)
diff --git a/tests/integration/test_download.py b/tests/integration/test_download.py
index f549707..9215718 100644
--- a/tests/integration/test_download.py
+++ b/tests/integration/test_download.py
@@ -18,6 +18,7 @@ from concurrent.futures import CancelledError
 
 from tests import assert_files_equal
 from tests import skip_if_windows
+from tests import skip_if_using_serial_implementation
 from tests import RecordingSubscriber
 from tests import NonSeekableWriter
 from tests.integration import BaseTransferManagerIntegTest
@@ -58,6 +59,12 @@ class TestDownload(BaseTransferManagerIntegTest):
         future.result()
         assert_files_equal(filename, download_path)
 
+    @skip_if_using_serial_implementation(
+        'Exception is thrown once the transfer is submitted. '
+        'However for the serial implementation, transfers are performed '
+        'in main thread meaning the transfer will complete before the '
+        'KeyboardInterrupt being thrown.'
+    )
     def test_large_download_exits_quicky_on_exception(self):
         transfer_manager = self.create_transfer_manager(self.config)
 
@@ -83,7 +90,7 @@ class TestDownload(BaseTransferManagerIntegTest):
         # The maximum time allowed for the transfer manager to exit.
         # This means that it should take less than a couple second after
         # sleeping to exit.
-        max_allowed_exit_time = sleep_time + 1
+        max_allowed_exit_time = sleep_time + 4
         self.assertLess(
             end_time - start_time, max_allowed_exit_time,
             "Failed to exit under %s. Instead exited in %s." % (
@@ -99,6 +106,12 @@ class TestDownload(BaseTransferManagerIntegTest):
         possible_matches = glob.glob('%s*' % download_path)
         self.assertEqual(possible_matches, [])
 
+    @skip_if_using_serial_implementation(
+        'Exception is thrown once the transfer is submitted. '
+        'However for the serial implementation, transfers are performed '
+        'in main thread meaning the transfer will complete before the '
+        'KeyboardInterrupt being thrown.'
+    )
     def test_many_files_exits_quicky_on_exception(self):
         # Set the max request queue size and number of submission threads
         # to something small to simulate having a large queue
@@ -131,7 +144,7 @@ class TestDownload(BaseTransferManagerIntegTest):
         end_time = time.time()
         # The maximum time allowed for the transfer manager to exit.
         # This means that it should take less than a couple seconds to exit.
-        max_allowed_exit_time = 2
+        max_allowed_exit_time = 5
         self.assertLess(
             end_time - start_time, max_allowed_exit_time,
             "Failed to exit under %s. Instead exited in %s." % (
diff --git a/tests/integration/test_upload.py b/tests/integration/test_upload.py
index e8ab7c9..3887a85 100644
--- a/tests/integration/test_upload.py
+++ b/tests/integration/test_upload.py
@@ -15,6 +15,7 @@ import time
 from concurrent.futures import CancelledError
 
 from botocore.compat import six
+from tests import skip_if_using_serial_implementation
 from tests import RecordingSubscriber, NonSeekableReader
 from tests.integration import BaseTransferManagerIntegTest
 from s3transfer.manager import TransferConfig
@@ -49,6 +50,12 @@ class TestUpload(BaseTransferManagerIntegTest):
         future.result()
         self.assertTrue(self.object_exists('20mb.txt'))
 
+    @skip_if_using_serial_implementation(
+        'Exception is thrown once the transfer is submitted. '
+        'However for the serial implementation, transfers are performed '
+        'in main thread meaning the transfer will complete before the '
+        'KeyboardInterrupt being thrown.'
+    )
     def test_large_upload_exits_quicky_on_exception(self):
         transfer_manager = self.create_transfer_manager(self.config)
 
@@ -72,7 +79,7 @@ class TestUpload(BaseTransferManagerIntegTest):
         # The maximum time allowed for the transfer manager to exit.
         # This means that it should take less than a couple second after
         # sleeping to exit.
-        max_allowed_exit_time = sleep_time + 2
+        max_allowed_exit_time = sleep_time + 5
         self.assertLess(
             end_time - start_time, max_allowed_exit_time,
             "Failed to exit under %s. Instead exited in %s." % (
@@ -90,6 +97,13 @@ class TestUpload(BaseTransferManagerIntegTest):
             # make sure the object does not exist.
             self.assertFalse(self.object_exists('20mb.txt'))
 
+
+    @skip_if_using_serial_implementation(
+        'Exception is thrown once the transfers are submitted. '
+        'However for the serial implementation, transfers are performed '
+        'in main thread meaning the transfers will complete before the '
+        'KeyboardInterrupt being thrown.'
+    )
     def test_many_files_exits_quicky_on_exception(self):
         # Set the max request queue size and number of submission threads
         # to something small to simulate having a large queue
@@ -121,7 +135,7 @@ class TestUpload(BaseTransferManagerIntegTest):
         end_time = time.time()
         # The maximum time allowed for the transfer manager to exit.
         # This means that it should take less than a couple seconds to exit.
-        max_allowed_exit_time = 2
+        max_allowed_exit_time = 5
         self.assertLess(
             end_time - start_time, max_allowed_exit_time,
             "Failed to exit under %s. Instead exited in %s." % (
diff --git a/tests/unit/test_futures.py b/tests/unit/test_futures.py
index e14cd07..a145c20 100644
--- a/tests/unit/test_futures.py
+++ b/tests/unit/test_futures.py
@@ -10,7 +10,11 @@
 # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
 # ANY KIND, either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
+import sys
 import time
+import traceback
+
+import mock
 from concurrent.futures import ThreadPoolExecutor
 
 from tests import unittest
@@ -24,6 +28,9 @@ from s3transfer.futures import TransferMeta
 from s3transfer.futures import TransferCoordinator
 from s3transfer.futures import BoundedExecutor
 from s3transfer.futures import ExecutorFuture
+from s3transfer.futures import BaseExecutor
+from s3transfer.futures import NonThreadedExecutor
+from s3transfer.futures import NonThreadedExecutorFuture
 from s3transfer.tasks import Task
 from s3transfer.utils import FunctionContainer
 from s3transfer.utils import TaskSemaphore
@@ -34,6 +41,17 @@ def return_call_args(*args, **kwargs):
     return args, kwargs
 
 
+def raise_exception(exception):
+    raise exception
+
+
+def get_exc_info(exception):
+    try:
+        raise_exception(exception)
+    except:
+        return sys.exc_info()
+
+
 class RecordingTransferCoordinator(TransferCoordinator):
     def __init__(self):
         self.all_transfer_futures_ever_associated = set()
@@ -530,6 +548,12 @@ class TestBoundedExecutor(unittest.TestCase):
         # not done, which it should not be because it it slow.
         self.assertFalse(future.done())
 
+    def test_replace_underlying_executor(self):
+        mocked_executor_cls = mock.Mock(BaseExecutor)
+        executor = BoundedExecutor(10, 1, {}, mocked_executor_cls)
+        executor.submit(self.get_task(ReturnFooTask))
+        self.assertTrue(mocked_executor_cls.return_value.submit.called)
+
 
 class TestExecutorFuture(unittest.TestCase):
     def test_result(self):
@@ -552,3 +576,85 @@ class TestExecutorFuture(unittest.TestCase):
             wrapped_future.add_done_callback(
                 FunctionContainer(done_callbacks.append, 'called'))
         self.assertEqual(done_callbacks, ['called'])
+
+
+class TestNonThreadedExecutor(unittest.TestCase):
+    def test_submit(self):
+        executor = NonThreadedExecutor()
+        future = executor.submit(return_call_args, 1, 2, foo='bar')
+        self.assertIsInstance(future, NonThreadedExecutorFuture)
+        self.assertEqual(future.result(), ((1, 2), {'foo': 'bar'}))
+
+    def test_submit_with_exception(self):
+        executor = NonThreadedExecutor()
+        future = executor.submit(raise_exception, RuntimeError())
+        self.assertIsInstance(future, NonThreadedExecutorFuture)
+        with self.assertRaises(RuntimeError):
+            future.result()
+
+    def test_submit_with_exception_and_captures_info(self):
+        exception = ValueError('message')
+        tb = get_exc_info(exception)[2]
+        future = NonThreadedExecutor().submit(raise_exception, exception)
+        try:
+            future.result()
+            # An exception should have been raised
+            self.fail('Future should have raised a ValueError')
+        except ValueError:
+            actual_tb = sys.exc_info()[2]
+            last_frame = traceback.extract_tb(actual_tb)[-1]
+            last_expected_frame = traceback.extract_tb(tb)[-1]
+            self.assertEqual(last_frame, last_expected_frame)
+
+
+class TestNonThreadedExecutorFuture(unittest.TestCase):
+    def setUp(self):
+        self.future = NonThreadedExecutorFuture()
+
+    def test_done_starts_false(self):
+        self.assertFalse(self.future.done())
+
+    def test_done_after_setting_result(self):
+        self.future.set_result('result')
+        self.assertTrue(self.future.done())
+
+    def test_done_after_setting_exception(self):
+        self.future.set_exception_info(Exception(), None)
+        self.assertTrue(self.future.done())
+
+    def test_result(self):
+        self.future.set_result('result')
+        self.assertEqual(self.future.result(), 'result')
+
+    def test_exception_result(self):
+        exception = ValueError('message')
+        self.future.set_exception_info(exception, None)
+        with self.assertRaisesRegexp(ValueError, 'message'):
+            self.future.result()
+
+    def test_exception_result_doesnt_modify_last_frame(self):
+        exception = ValueError('message')
+        tb = get_exc_info(exception)[2]
+        self.future.set_exception_info(exception, tb)
+        try:
+            self.future.result()
+            # An exception should have been raised
+            self.fail()
+        except ValueError:
+            actual_tb = sys.exc_info()[2]
+            last_frame = traceback.extract_tb(actual_tb)[-1]
+            last_expected_frame = traceback.extract_tb(tb)[-1]
+            self.assertEqual(last_frame, last_expected_frame)
+
+    def test_done_callback(self):
+        done_futures = []
+        self.future.add_done_callback(done_futures.append)
+        self.assertEqual(done_futures, [])
+        self.future.set_result('result')
+        self.assertEqual(done_futures, [self.future])
+
+    def test_done_callback_after_done(self):
+        self.future.set_result('result')
+        done_futures = []
+        self.future.add_done_callback(done_futures.append)
+        self.assertEqual(done_futures, [self.future])
diff --git a/tests/unit/test_tasks.py b/tests/unit/test_tasks.py
index 82e5e38..b126d8c 100644
--- a/tests/unit/test_tasks.py
+++ b/tests/unit/test_tasks.py
@@ -69,14 +69,14 @@ class NOOPSubmissionTask(SubmissionTask):
 
 class ExceptionSubmissionTask(SubmissionTask):
     def _submit(self, transfer_future, executor=None, tasks_to_submit=None,
-                additional_callbacks=None):
+                additional_callbacks=None, exception=TaskFailureException):
         if executor and tasks_to_submit:
             for task_to_submit in tasks_to_submit:
                 self._transfer_coordinator.submit(executor, task_to_submit)
         if additional_callbacks:
             for callback in additional_callbacks:
                 callback()
-        raise TaskFailureException()
+        raise exception()
 
 
 class StatusRecordingTransferCoordinator(TransferCoordinator):
@@ -164,6 +164,16 @@ class TestSubmissionTask(BaseSubmissionTaskTest):
         with self.assertRaises(TaskFailureException):
             self.transfer_future.result()
 
+    def test_catches_and_sets_keyboard_interrupt_exception_from_submit(self):
+        self.main_kwargs['exception'] = KeyboardInterrupt
+        submission_task = self.get_task(
+            ExceptionSubmissionTask, main_kwargs=self.main_kwargs)
+        submission_task()
+
+        self.assertEqual(self.transfer_coordinator.status, 'failed')
+        with self.assertRaises(KeyboardInterrupt):
+            self.transfer_future.result()
+
     def test_calls_done_callbacks_on_exception(self):
         submission_task = self.get_task(
             ExceptionSubmissionTask, main_kwargs=self.main_kwargs)

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-s3transfer.git



More information about the Python-modules-commits mailing list