[Python-modules-commits] [python-s3transfer] 02/07: New upstream version 0.1.12

Takaki Taniguchi takaki at moszumanska.debian.org
Sat Feb 3 12:55:01 UTC 2018


This is an automated email from the git hooks/post-receive script.

takaki pushed a commit to branch master
in repository python-s3transfer.

commit f4bcb1a9ab5faabb0be1b3102d4a37f73f10ffe5
Author: TANIGUCHI Takaki <takaki at asis.media-as.org>
Date:   Sat Feb 3 19:40:10 2018 +0900

    New upstream version 0.1.12
---
 .changes/0.1.11.json               |   7 +
 .changes/0.1.12.json               |   7 +
 .travis.yml                        |   1 +
 CHANGELOG.rst                      |  12 +
 s3transfer/__init__.py             |   2 +-
 s3transfer/bandwidth.py            | 416 +++++++++++++++++++++++++++++++++
 s3transfer/download.py             |  26 ++-
 s3transfer/manager.py              |  50 +++-
 s3transfer/tasks.py                |   2 +-
 s3transfer/upload.py               |  26 ++-
 s3transfer/utils.py                |  22 +-
 scripts/new-change                 |  10 +-
 setup.cfg                          |   2 +-
 setup.py                           |   1 +
 tests/functional/test_download.py  |  29 +++
 tests/functional/test_manager.py   |  38 ++-
 tests/functional/test_upload.py    |  37 ++-
 tests/integration/test_download.py |   8 +-
 tests/unit/test_bandwidth.py       | 465 +++++++++++++++++++++++++++++++++++++
 tests/unit/test_download.py        |  17 ++
 tests/unit/test_utils.py           |  51 ++++
 tox.ini                            |   2 +-
 22 files changed, 1175 insertions(+), 56 deletions(-)

diff --git a/.changes/0.1.11.json b/.changes/0.1.11.json
new file mode 100644
index 0000000..2f09d01
--- /dev/null
+++ b/.changes/0.1.11.json
@@ -0,0 +1,7 @@
+[
+  {
+    "category": "TransferManager", 
+    "description": "Properly handle unicode exceptions in the context manager. Fixes `#85 <https://github.com/boto/boto3/issues/85>`__", 
+    "type": "bugfix"
+  }
+]
\ No newline at end of file
diff --git a/.changes/0.1.12.json b/.changes/0.1.12.json
new file mode 100644
index 0000000..ffd0d74
--- /dev/null
+++ b/.changes/0.1.12.json
@@ -0,0 +1,7 @@
+[
+  {
+    "category": "``max_bandwidth``", 
+    "description": "Add ability to set maximum bandwidth consumption for streaming of S3 uploads and downloads", 
+    "type": "enhancement"
+  }
+]
\ No newline at end of file
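
The ``max_bandwidth`` option described above is exposed through ``TransferConfig`` (see the s3transfer/manager.py changes below). A minimal usage sketch, assuming a botocore S3 client is already available as ``client`` and using placeholder bucket/key names:

    from s3transfer.manager import TransferManager, TransferConfig

    # max_bandwidth is expressed in bytes per second; cap streaming at 1 MB/s.
    config = TransferConfig(max_bandwidth=1024 * 1024)
    with TransferManager(client, config) as manager:
        future = manager.download('my-bucket', 'my-key', '/tmp/my-key')
        future.result()
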
diff --git a/.travis.yml b/.travis.yml
index afd045d..8469de1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,6 +5,7 @@ python:
   - "3.3"
   - "3.4"
   - "3.5"
+  - "3.6"
 sudo: false
 install:
   - python scripts/ci/install
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 9721d1a..19e07be 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -2,6 +2,18 @@
 CHANGELOG
 =========
 
+0.1.12
+======
+
+* enhancement:``max_bandwidth``: Add ability to set maximum bandwidth consumption for streaming of S3 uploads and downloads
+
+
+0.1.11
+======
+
+* bugfix:TransferManager: Properly handle unicode exceptions in the context manager. Fixes `#85 <https://github.com/boto/boto3/issues/85>`__
+
+
 0.1.10
 ======
 
diff --git a/s3transfer/__init__.py b/s3transfer/__init__.py
index 4ab0720..57051c7 100644
--- a/s3transfer/__init__.py
+++ b/s3transfer/__init__.py
@@ -143,7 +143,7 @@ from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError
 
 
 __author__ = 'Amazon Web Services'
-__version__ = '0.1.10'
+__version__ = '0.1.12'
 
 
 class NullHandler(logging.Handler):
diff --git a/s3transfer/bandwidth.py b/s3transfer/bandwidth.py
new file mode 100644
index 0000000..8b3f6f5
--- /dev/null
+++ b/s3transfer/bandwidth.py
@@ -0,0 +1,416 @@
+# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import time
+import threading
+
+
+class RequestExceededException(Exception):
+    def __init__(self, requested_amt, retry_time):
+        """Error when requested amount exceeds what is allowed
+
+        The request that raised this error should be retried after waiting
+        the time specified by ``retry_time``.
+
+        :type requested_amt: int
+        :param requested_amt: The originally requested byte amount
+
+        :type retry_time: float
+        :param retry_time: The amount of time to wait before retrying for the
+            requested amount
+        """
+        self.requested_amt = requested_amt
+        self.retry_time = retry_time
+        msg = (
+            'Request amount %s exceeded the amount available. Retry in %s' % (
+                requested_amt, retry_time)
+        )
+        super(RequestExceededException, self).__init__(msg)
+
+
+class RequestToken(object):
+    """A token to pass as an identifier when consuming from the LeakyBucket"""
+    pass
+
+
+class TimeUtils(object):
+    def time(self):
+        """Get the current time back
+
+        :rtype: float
+        :returns: The current time in seconds
+        """
+        return time.time()
+
+    def sleep(self, value):
+        """Sleep for a designated time
+
+        :type value: float
+        :param value: The time to sleep for in seconds
+        """
+        return time.sleep(value)
+
+
+class BandwidthLimiter(object):
+    def __init__(self, leaky_bucket, time_utils=None):
+        """Limits bandwidth for shared S3 transfers
+
+        :type leaky_bucket: LeakyBucket
+        :param leaky_bucket: The leaky bucket to use to limit bandwidth
+
+        :type time_utils: TimeUtils
+        :param time_utils: Time utility to use for interacting with time.
+        """
+        self._leaky_bucket = leaky_bucket
+        self._time_utils = time_utils
+        if time_utils is None:
+            self._time_utils = TimeUtils()
+
+    def get_bandwith_limited_stream(self, fileobj, transfer_coordinator,
+                                    enabled=True):
+        """Wraps a fileobj in a bandwidth limited stream wrapper
+
+        :type fileobj: file-like obj
+        :param fileobj: The file-like obj to wrap
+
+        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+        :param transfer_coordinator: The coordinator for the general transfer
+            that the wrapped stream is a part of
+
+        :type enabled: boolean
+        :param enabled: Whether bandwidth limiting should be enabled to start
+        """
+        stream = BandwidthLimitedStream(
+            fileobj, self._leaky_bucket, transfer_coordinator,
+            self._time_utils)
+        if not enabled:
+            stream.disable_bandwidth_limiting()
+        return stream
+
+
+class BandwidthLimitedStream(object):
+    def __init__(self, fileobj, leaky_bucket, transfer_coordinator,
+                 time_utils=None, bytes_threshold=256 * 1024):
+        """Limits bandwidth for reads on a wrapped stream
+
+        :type fileobj: file-like object
+        :param fileobj: The file like object to wrap
+
+        :type leaky_bucket: LeakyBucket
+        :param leaky_bucket: The leaky bucket to use to throttle reads on
+            the stream
+
+        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+        :param transfer_coordinator: The coordinator for the general transfer
+            that the wrapped stream is a part of
+
+        :type time_utils: TimeUtils
+        :param time_utils: The time utility to use for interacting with time
+        """
+        self._fileobj = fileobj
+        self._leaky_bucket = leaky_bucket
+        self._transfer_coordinator = transfer_coordinator
+        self._time_utils = time_utils
+        if time_utils is None:
+            self._time_utils = TimeUtils()
+        self._bandwidth_limiting_enabled = True
+        self._request_token = RequestToken()
+        self._bytes_seen = 0
+        self._bytes_threshold = bytes_threshold
+
+    def enable_bandwidth_limiting(self):
+        """Enable bandwidth limiting on reads to the stream"""
+        self._bandwidth_limiting_enabled = True
+
+    def disable_bandwidth_limiting(self):
+        """Disable bandwidth limiting on reads to the stream"""
+        self._bandwidth_limiting_enabled = False
+
+    def read(self, amount):
+        """Read a specified amount
+
+        Reads will only be throttled if bandwidth limiting is enabled.
+        """
+        if not self._bandwidth_limiting_enabled:
+            return self._fileobj.read(amount)
+
+        # We do not want to be calling consume on every read as the read
+        # amounts can be small causing the lock of the leaky bucket to
+        # introduce noticeable overhead. So instead we keep track of
+        # how many bytes we have seen and only call consume once we pass a
+        # certain threshold.
+        self._bytes_seen += amount
+        if self._bytes_seen < self._bytes_threshold:
+            return self._fileobj.read(amount)
+
+        self._consume_through_leaky_bucket()
+        return self._fileobj.read(amount)
+
+    def _consume_through_leaky_bucket(self):
+        # NOTE: If the read amounts on the stream are high, it will result
+        # in large bursty behavior as there is no interface for partial
+        # reads. However, given that reads on this abstraction are at most
+        # 256KB (via downloads), it reduces the burstiness to small KB bursts
+        # at worst.
+        while not self._transfer_coordinator.exception:
+            try:
+                self._leaky_bucket.consume(
+                    self._bytes_seen, self._request_token)
+                self._bytes_seen = 0
+                return
+            except RequestExceededException as e:
+                self._time_utils.sleep(e.retry_time)
+        else:
+            raise self._transfer_coordinator.exception
+
+    def signal_transferring(self):
+        """Signal that data being read is being transferred to S3"""
+        self.enable_bandwidth_limiting()
+
+    def signal_not_transferring(self):
+        """Signal that data being read is not being transferred to S3"""
+        self.disable_bandwidth_limiting()
+
+    def seek(self, where):
+        self._fileobj.seek(where)
+
+    def tell(self):
+        return self._fileobj.tell()
+
+    def close(self):
+        if self._bandwidth_limiting_enabled and self._bytes_seen:
+            # This handles the case where the file is small enough to never
+            # trigger the threshold and thus is never subjected to the
+            # leaky bucket on read(). This specifically happens for small
+            # uploads. So to account for those bytes, have them go through
+            # the leaky bucket when the file gets closed.
+            self._consume_through_leaky_bucket()
+        self._fileobj.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.close()
+
+
+class LeakyBucket(object):
+    def __init__(self, max_rate, time_utils=None, rate_tracker=None,
+                 consumption_scheduler=None):
+        """A leaky bucket abstraction to limit bandwidth consumption
+
+        :type max_rate: int
+        :param max_rate: The maximum rate to allow. This rate is in terms of
+            bytes per second.
+
+        :type time_utils: TimeUtils
+        :param time_utils: The time utility to use for interacting with time
+
+        :type rate_tracker: BandwidthRateTracker
+        :param rate_tracker: Tracks bandwidth consumption
+
+        :type consumption_scheduler: ConsumptionScheduler
+        :param consumption_scheduler: Schedules consumption retries when
+            necessary
+        """
+        self._max_rate = float(max_rate)
+        self._time_utils = time_utils
+        if time_utils is None:
+            self._time_utils = TimeUtils()
+        self._lock = threading.Lock()
+        self._rate_tracker = rate_tracker
+        if rate_tracker is None:
+            self._rate_tracker = BandwidthRateTracker()
+        self._consumption_scheduler = consumption_scheduler
+        if consumption_scheduler is None:
+            self._consumption_scheduler = ConsumptionScheduler()
+
+    def consume(self, amt, request_token):
+        """Consume an a requested amount
+
+        :type amt: int
+        :param amt: The amount of bytes to request to consume
+
+        :type request_token: RequestToken
+        :param request_token: The token associated with the consumption
+            request that is used to identify the request. If a
+            RequestExceededException is raised, the same token should be used
+            in the subsequent retry consume() request.
+
+        :raises RequestExceededException: If the consumption amount would
+            exceed the maximum allocated bandwidth
+
+        :rtype: int
+        :returns: The amount consumed
+        """
+        with self._lock:
+            time_now = self._time_utils.time()
+            if self._consumption_scheduler.is_scheduled(request_token):
+                return self._release_requested_amt_for_scheduled_request(
+                    amt, request_token, time_now)
+            elif self._projected_to_exceed_max_rate(amt, time_now):
+                self._raise_request_exceeded_exception(
+                    amt, request_token, time_now)
+            else:
+                return self._release_requested_amt(amt, time_now)
+
+    def _projected_to_exceed_max_rate(self, amt, time_now):
+        projected_rate = self._rate_tracker.get_projected_rate(amt, time_now)
+        return projected_rate > self._max_rate
+
+    def _release_requested_amt_for_scheduled_request(self, amt, request_token,
+                                                     time_now):
+        self._consumption_scheduler.process_scheduled_consumption(
+            request_token)
+        return self._release_requested_amt(amt, time_now)
+
+    def _raise_request_exceeded_exception(self, amt, request_token, time_now):
+        allocated_time = amt/float(self._max_rate)
+        retry_time = self._consumption_scheduler.schedule_consumption(
+            amt, request_token, allocated_time)
+        raise RequestExceededException(
+            requested_amt=amt, retry_time=retry_time)
+
+    def _release_requested_amt(self, amt, time_now):
+        self._rate_tracker.record_consumption_rate(amt, time_now)
+        return amt
+
+
+class ConsumptionScheduler(object):
+    def __init__(self):
+        """Schedules when to consume a desired amount"""
+        self._tokens_to_scheduled_consumption = {}
+        self._total_wait = 0
+
+    def is_scheduled(self, token):
+        """Indicates if a consumption request has been scheduled
+
+        :type token: RequestToken
+        :param token: The token associated with the consumption
+            request that is used to identify the request.
+        """
+        return token in self._tokens_to_scheduled_consumption
+
+    def schedule_consumption(self, amt, token, time_to_consume):
+        """Schedules a wait time to be able to consume an amount
+
+        :type amt: int
+        :param amt: The amount of bytes scheduled to be consumed
+
+        :type token: RequestToken
+        :param token: The token associated with the consumption
+            request that is used to identify the request.
+
+        :type time_to_consume: float
+        :param time_to_consume: The desired time it should take for that
+            specific request amount to be consumed, regardless of previously
+            scheduled consumption requests
+
+        :rtype: float
+        :returns: The amount of time to wait for the specific request before
+            actually consuming the specified amount.
+        """
+        self._total_wait += time_to_consume
+        self._tokens_to_scheduled_consumption[token] = {
+            'wait_duration': self._total_wait,
+            'time_to_consume': time_to_consume,
+        }
+        return self._total_wait
+
+    def process_scheduled_consumption(self, token):
+        """Processes a scheduled consumption request that has completed
+
+        :type token: RequestToken
+        :param token: The token associated with the consumption
+            request that is used to identify the request.
+        """
+        scheduled_retry = self._tokens_to_scheduled_consumption.pop(token)
+        self._total_wait = max(
+            self._total_wait - scheduled_retry['time_to_consume'], 0)
+
+
+class BandwidthRateTracker(object):
+    def __init__(self, alpha=0.8):
+        """Tracks the rate of bandwidth consumption
+
+        :type alpha: float
+        :param alpha: The constant to use in calculating the exponential moving
+            average of the bandwidth rate. Specifically it is used in the
+            following calculation:
+
+            current_rate = alpha * new_rate + (1 - alpha) * current_rate
+
+            The value of this constant should be between 0 and 1.
+        """
+        self._alpha = alpha
+        self._last_time = None
+        self._current_rate = None
+
+    @property
+    def current_rate(self):
+        """The current transfer rate
+
+        :rtype: float
+        :returns: The current tracked transfer rate
+        """
+        if self._last_time is None:
+            return 0.0
+        return self._current_rate
+
+    def get_projected_rate(self, amt, time_at_consumption):
+        """Get the projected rate using a provided amount and time
+
+        :type amt: int
+        :param amt: The proposed amount to consume
+
+        :type time_at_consumption: float
+        :param time_at_consumption: The proposed time to consume at
+
+        :rtype: float
+        :returns: The consumption rate if that amt and time were consumed
+        """
+        if self._last_time is None:
+            return 0.0
+        return self._calculate_exponential_moving_average_rate(
+            amt, time_at_consumption)
+
+    def record_consumption_rate(self, amt, time_at_consumption):
+        """Record the consumption rate based off amount and time point
+
+        :type amt: int
+        :param amt: The amount that got consumed
+
+        :type time_at_consumption: float
+        :param time_at_consumption: The time at which the amount was consumed
+        """
+        if self._last_time is None:
+            self._last_time = time_at_consumption
+            self._current_rate = 0.0
+            return
+        self._current_rate = self._calculate_exponential_moving_average_rate(
+            amt, time_at_consumption)
+        self._last_time = time_at_consumption
+
+    def _calculate_rate(self, amt, time_at_consumption):
+        time_delta = time_at_consumption - self._last_time
+        if time_delta <= 0:
+        # While it is really unlikely to see this in an actual transfer,
+            # we do not want to be returning back a negative rate or try to
+            # divide the amount by zero. So instead return back an infinite
+            # rate as the time delta is infinitesimally small.
+            return float('inf')
+        return amt / (time_delta)
+
+    def _calculate_exponential_moving_average_rate(self, amt,
+                                                   time_at_consumption):
+        new_rate = self._calculate_rate(amt, time_at_consumption)
+        return self._alpha * new_rate + (1 - self._alpha) * self._current_rate
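
A minimal sketch of the consume/retry protocol that BandwidthLimitedStream follows against the new LeakyBucket (the rate and amount below are illustrative, not defaults):

    from s3transfer.bandwidth import (
        LeakyBucket, RequestToken, RequestExceededException, TimeUtils)

    leaky_bucket = LeakyBucket(max_rate=1024 * 1024)  # bytes per second
    token = RequestToken()
    time_utils = TimeUtils()
    amt = 256 * 1024
    while True:
        try:
            # The same token must be reused on retries so the
            # ConsumptionScheduler recognizes the request once its
            # scheduled wait time has elapsed.
            leaky_bucket.consume(amt, token)
            break
        except RequestExceededException as e:
            time_utils.sleep(e.retry_time)

The BandwidthRateTracker smooths the observed throughput with the exponential moving average noted in its docstring, current_rate = alpha * new_rate + (1 - alpha) * current_rate, with alpha defaulting to 0.8.
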
diff --git a/s3transfer/download.py b/s3transfer/download.py
index b60d398..9b1a49e 100644
--- a/s3transfer/download.py
+++ b/s3transfer/download.py
@@ -317,7 +317,7 @@ class DownloadSubmissionTask(SubmissionTask):
                 fileobj, type(fileobj)))
 
     def _submit(self, client, config, osutil, request_executor, io_executor,
-                transfer_future):
+                transfer_future, bandwidth_limiter=None):
         """
         :param client: The client associated with the transfer manager
 
@@ -339,6 +339,10 @@ class DownloadSubmissionTask(SubmissionTask):
         :type transfer_future: s3transfer.futures.TransferFuture
         :param transfer_future: The transfer future associated with the
             transfer request that tasks are being submitted for
+
+        :type bandwidth_limiter: s3transfer.bandwidth.BandwidthLimiter
+        :param bandwidth_limiter: The bandwidth limiter to use when
+            downloading streams
         """
         if transfer_future.meta.size is None:
             # If a size was not provided figure out the size for the
@@ -360,15 +364,16 @@ class DownloadSubmissionTask(SubmissionTask):
         if transfer_future.meta.size < config.multipart_threshold:
             self._submit_download_request(
                 client, config, osutil, request_executor, io_executor,
-                download_output_manager, transfer_future)
+                download_output_manager, transfer_future, bandwidth_limiter)
         else:
             self._submit_ranged_download_request(
                 client, config, osutil, request_executor, io_executor,
-                download_output_manager, transfer_future)
+                download_output_manager, transfer_future, bandwidth_limiter)
 
     def _submit_download_request(self, client, config, osutil,
                                  request_executor, io_executor,
-                                 download_output_manager, transfer_future):
+                                 download_output_manager, transfer_future,
+                                 bandwidth_limiter):
         call_args = transfer_future.meta.call_args
 
         # Get a handle to the file that will be used for writing downloaded
@@ -400,6 +405,7 @@ class DownloadSubmissionTask(SubmissionTask):
                     'max_attempts': config.num_download_attempts,
                     'download_output_manager': download_output_manager,
                     'io_chunksize': config.io_chunksize,
+                    'bandwidth_limiter': bandwidth_limiter
                 },
                 done_callbacks=[final_task]
             ),
@@ -409,7 +415,8 @@ class DownloadSubmissionTask(SubmissionTask):
     def _submit_ranged_download_request(self, client, config, osutil,
                                         request_executor, io_executor,
                                         download_output_manager,
-                                        transfer_future):
+                                        transfer_future,
+                                        bandwidth_limiter):
         call_args = transfer_future.meta.call_args
 
         # Get the needed progress callbacks for the task
@@ -461,6 +468,7 @@ class DownloadSubmissionTask(SubmissionTask):
                         'start_index': i * part_size,
                         'download_output_manager': download_output_manager,
                         'io_chunksize': config.io_chunksize,
+                        'bandwidth_limiter': bandwidth_limiter
                     },
                     done_callbacks=[finalize_download_invoker.decrement]
                 ),
@@ -488,7 +496,7 @@ class DownloadSubmissionTask(SubmissionTask):
 class GetObjectTask(Task):
     def _main(self, client, bucket, key, fileobj, extra_args, callbacks,
               max_attempts, download_output_manager, io_chunksize,
-              start_index=0):
+              start_index=0, bandwidth_limiter=None):
         """Downloads an object and places content into io queue
 
         :param client: The client to use when calling GetObject
@@ -504,6 +512,8 @@ class GetObjectTask(Task):
             download stream and queue in the io queue.
         :param start_index: The location in the file to start writing the
             content of the key to.
+        :param bandwidth_limiter: The bandwidth limiter to use when throttling
+            the downloading of data in streams.
         """
         last_exception = None
         for i in range(max_attempts):
@@ -512,6 +522,10 @@ class GetObjectTask(Task):
                     Bucket=bucket, Key=key, **extra_args)
                 streaming_body = StreamReaderProgress(
                     response['Body'], callbacks)
+                if bandwidth_limiter:
+                    streaming_body = \
+                        bandwidth_limiter.get_bandwith_limited_stream(
+                            streaming_body, self._transfer_coordinator)
 
                 current_index = start_index
                 chunks = DownloadChunkIterator(streaming_body, io_chunksize)
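
In effect, when a bandwidth limiter is configured, GetObjectTask wraps the botocore streaming body before chunking it, so each io_chunksize read (256 KB by default) is throttled through the shared leaky bucket. A rough sketch of the read path, with ``streaming_body``, ``bandwidth_limiter`` and ``transfer_coordinator`` standing in for the objects available inside the task:

    limited = bandwidth_limiter.get_bandwith_limited_stream(
        streaming_body, transfer_coordinator)
    for chunk in iter(lambda: limited.read(256 * 1024), b''):
        pass  # each read may sleep until bandwidth is available
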
diff --git a/s3transfer/manager.py b/s3transfer/manager.py
index ba5eb61..fb7a9cb 100644
--- a/s3transfer/manager.py
+++ b/s3transfer/manager.py
@@ -14,9 +14,11 @@ import copy
 import logging
 import threading
 
+from botocore.compat import six
+
 from s3transfer.utils import get_callbacks
-from s3transfer.utils import disable_upload_callbacks
-from s3transfer.utils import enable_upload_callbacks
+from s3transfer.utils import signal_transferring
+from s3transfer.utils import signal_not_transferring
 from s3transfer.utils import CallArgs
 from s3transfer.utils import OSUtils
 from s3transfer.utils import TaskSemaphore
@@ -33,6 +35,8 @@ from s3transfer.download import DownloadSubmissionTask
 from s3transfer.upload import UploadSubmissionTask
 from s3transfer.copies import CopySubmissionTask
 from s3transfer.delete import DeleteSubmissionTask
+from s3transfer.bandwidth import LeakyBucket
+from s3transfer.bandwidth import BandwidthLimiter
 
 KB = 1024
 MB = KB * KB
@@ -51,7 +55,8 @@ class TransferConfig(object):
                  io_chunksize=256 * KB,
                  num_download_attempts=5,
                  max_in_memory_upload_chunks=10,
-                 max_in_memory_download_chunks=10):
+                 max_in_memory_download_chunks=10,
+                 max_bandwidth=None):
         """Configurations for the transfer mangager
 
         :param multipart_threshold: The threshold for which multipart
@@ -122,6 +127,9 @@ class TransferConfig(object):
 
                 max_in_memory_download_chunks * multipart_chunksize
 
+        :param max_bandwidth: The maximum bandwidth that will be consumed
+            in uploading and downloading file content. The value is in terms of
+            bytes per second.
         """
         self.multipart_threshold = multipart_threshold
         self.multipart_chunksize = multipart_chunksize
@@ -134,11 +142,12 @@ class TransferConfig(object):
         self.num_download_attempts = num_download_attempts
         self.max_in_memory_upload_chunks = max_in_memory_upload_chunks
         self.max_in_memory_download_chunks = max_in_memory_download_chunks
+        self.max_bandwidth = max_bandwidth
         self._validate_attrs_are_nonzero()
 
     def _validate_attrs_are_nonzero(self):
         for attr, attr_val, in self.__dict__.items():
-            if attr_val <= 0:
+            if attr_val is not None and attr_val <= 0:
                 raise ValueError(
                     'Provided parameter %s of value %s must be greater than '
                     '0.' % (attr, attr_val))
@@ -245,6 +254,16 @@ class TransferManager(object):
             max_num_threads=1,
             executor_cls=executor_cls
         )
+
+        # The component responsible for limiting bandwidth usage if it
+        # is configured.
+        self._bandwidth_limiter = None
+        if self._config.max_bandwidth is not None:
+            logger.debug(
+                'Setting max_bandwidth to %s', self._config.max_bandwidth)
+            leaky_bucket = LeakyBucket(self._config.max_bandwidth)
+            self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)
+
         self._register_handlers()
 
     def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
@@ -282,7 +301,11 @@ class TransferManager(object):
             fileobj=fileobj, bucket=bucket, key=key, extra_args=extra_args,
             subscribers=subscribers
         )
-        return self._submit_transfer(call_args, UploadSubmissionTask)
+        extra_main_kwargs = {}
+        if self._bandwidth_limiter:
+            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
+        return self._submit_transfer(
+            call_args, UploadSubmissionTask, extra_main_kwargs)
 
     def download(self, bucket, key, fileobj, extra_args=None,
                  subscribers=None):
@@ -318,8 +341,11 @@ class TransferManager(object):
             bucket=bucket, key=key, fileobj=fileobj, extra_args=extra_args,
             subscribers=subscribers
         )
-        return self._submit_transfer(call_args, DownloadSubmissionTask,
-                                     {'io_executor': self._io_executor})
+        extra_main_kwargs = {'io_executor': self._io_executor}
+        if self._bandwidth_limiter:
+            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
+        return self._submit_transfer(
+            call_args, DownloadSubmissionTask, extra_main_kwargs)
 
     def copy(self, copy_source, bucket, key, extra_args=None,
              subscribers=None, source_client=None):
@@ -479,11 +505,11 @@ class TransferManager(object):
         # Register handlers to enable/disable callbacks on uploads.
         event_name = 'request-created.s3'
         self._client.meta.events.register_first(
-            event_name, disable_upload_callbacks,
-            unique_id='s3upload-callback-disable')
+            event_name, signal_not_transferring,
+            unique_id='s3upload-not-transferring')
         self._client.meta.events.register_last(
-            event_name, enable_upload_callbacks,
-            unique_id='s3upload-callback-enable')
+            event_name, signal_transferring,
+            unique_id='s3upload-transferring')
 
     def __enter__(self):
         return self
@@ -496,7 +522,7 @@ class TransferManager(object):
         # all of the inprogress futures in the shutdown.
         if exc_type:
             cancel = True
-            cancel_msg = str(exc_value)
+            cancel_msg = six.text_type(exc_value)
             if not cancel_msg:
                 cancel_msg = repr(exc_value)
             # If it was a KeyboardInterrupt, the cancellation was initiated
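
The switch from str() to six.text_type() is the unicode bugfix released in 0.1.11: on Python 2, calling str() on an exception whose message contains non-ASCII text raises UnicodeEncodeError, whereas six.text_type (unicode on Python 2, str on Python 3) returns the message intact. A small illustration of the failure mode, assuming Python 2 semantics:

    from botocore.compat import six

    exc_value = Exception(u'arquivo n\u00e3o encontrado')
    # str(exc_value)                        # UnicodeEncodeError on Python 2
    cancel_msg = six.text_type(exc_value)   # u'arquivo n\u00e3o encontrado'
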
diff --git a/s3transfer/tasks.py b/s3transfer/tasks.py
index 4427927..ae4abdd 100644
--- a/s3transfer/tasks.py
+++ b/s3transfer/tasks.py
@@ -254,7 +254,7 @@ class SubmissionTask(Task):
             # transfer.
             self._submit(transfer_future=transfer_future, **kwargs)
         except BaseException as e:
-            # If there was an exception rasied during the submission of task
+            # If there was an exception raised during the submission of task
             # there is a chance that the final task that signals if a transfer
             # is done and to run the cleanup may never have been submitted in
             # the first place so we need to account accordingly.
diff --git a/s3transfer/upload.py b/s3transfer/upload.py
index d9a9ec5..0c2feda 100644
--- a/s3transfer/upload.py
+++ b/s3transfer/upload.py
@@ -115,9 +115,10 @@ class UploadInputManager(object):
     that may be accepted. All implementations must subclass and override
     public methods from this class.
     """
-    def __init__(self, osutil, transfer_coordinator):
+    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
         self._osutil = osutil
         self._transfer_coordinator = transfer_coordinator
+        self._bandwidth_limiter = bandwidth_limiter
 
     @classmethod
     def is_compatible(cls, upload_source):
@@ -200,8 +201,12 @@ class UploadInputManager(object):
         """
         raise NotImplementedError('must implement yield_upload_part_bodies()')
 
-    def _wrap_with_interrupt_reader(self, fileobj):
-        return InterruptReader(fileobj, self._transfer_coordinator)
+    def _wrap_fileobj(self, fileobj):
+        fileobj = InterruptReader(fileobj, self._transfer_coordinator)
+        if self._bandwidth_limiter:
+            fileobj = self._bandwidth_limiter.get_bandwith_limited_stream(
+                fileobj, self._transfer_coordinator, enabled=False)
+        return fileobj
 
     def _get_progress_callbacks(self, transfer_future):
         callbacks = get_callbacks(transfer_future, 'progress')
@@ -241,7 +246,7 @@ class UploadFilenameInputManager(UploadInputManager):
         # Wrap fileobj with interrupt reader that will quickly cancel
         # uploads if needed instead of having to wait for the socket
         # to completely read all of the data.
-        fileobj = self._wrap_with_interrupt_reader(fileobj)
+        fileobj = self._wrap_fileobj(fileobj)
 
         callbacks = self._get_progress_callbacks(transfer_future)
         close_callbacks = self._get_close_callbacks(callbacks)
@@ -268,7 +273,7 @@ class UploadFilenameInputManager(UploadInputManager):
             # Wrap fileobj with interrupt reader that will quickly cancel
             # uploads if needed instead of having to wait for the socket
             # to completely read all of the data.
-            fileobj = self._wrap_with_interrupt_reader(fileobj)
+            fileobj = self._wrap_fileobj(fileobj)
 
             # Wrap the file-like object into a ReadFileChunk to get progress.
             read_file_chunk = self._osutil.open_file_chunk_reader_from_fileobj(
@@ -346,9 +351,9 @@ class UploadSeekableInputManager(UploadFilenameInputManager):
 
 class UploadNonSeekableInputManager(UploadInputManager):
     """Upload utility for a file-like object that cannot seek."""
-    def __init__(self, osutil, transfer_coordinator):
+    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
         super(UploadNonSeekableInputManager, self).__init__(
-            osutil, transfer_coordinator)
+            osutil, transfer_coordinator, bandwidth_limiter)
         self._initial_data = b''
 
     @classmethod
@@ -470,7 +475,7 @@ class UploadNonSeekableInputManager(UploadInputManager):
 
         :return: Fully wrapped data.
         """
-        fileobj = self._wrap_with_interrupt_reader(six.BytesIO(data))
+        fileobj = self._wrap_fileobj(six.BytesIO(data))
         return self._osutil.open_file_chunk_reader_from_fileobj(
             fileobj=fileobj, chunk_size=len(data), full_file_size=len(data),
             callbacks=callbacks, close_callbacks=close_callbacks)
@@ -511,7 +516,7 @@ class UploadSubmissionTask(SubmissionTask):
                 fileobj, type(fileobj)))
 
     def _submit(self, client, config, osutil, request_executor,
-                transfer_future):
+                transfer_future, bandwidth_limiter=None):
         """
         :param client: The client associated with the transfer manager
 
@@ -531,7 +536,8 @@ class UploadSubmissionTask(SubmissionTask):
             transfer request that tasks are being submitted for
         """
         upload_input_manager = self._get_upload_input_manager_cls(
-            transfer_future)(osutil, self._transfer_coordinator)
+            transfer_future)(
+                osutil, self._transfer_coordinator, bandwidth_limiter)
 
         # Determine the size if it was not provided
         if transfer_future.meta.size is None:
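
Note that for uploads the bandwidth-limited wrapper is created with enabled=False, so throttling only kicks in once the request body actually starts transferring; the signal_transferring/signal_not_transferring plumbing added to s3transfer/utils.py below flips it on and off. A sketch of the layering that _wrap_fileobj produces, where ``interrupt_reader``, ``bandwidth_limiter`` and ``transfer_coordinator`` stand in for the objects available in the upload path:

    limited = bandwidth_limiter.get_bandwith_limited_stream(
        interrupt_reader, transfer_coordinator, enabled=False)
    # ReadFileChunk.signal_transferring() later cascades down to this
    # stream and enables throttling only while bytes are on the wire.
    limited.signal_transferring()
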
diff --git a/s3transfer/utils.py b/s3transfer/utils.py
index 65b34b8..fba5648 100644
--- a/s3transfer/utils.py
+++ b/s3transfer/utils.py
@@ -39,16 +39,16 @@ def random_file_extension(num_digits=8):
     return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))
 
 
-def disable_upload_callbacks(request, operation_name, **kwargs):
+def signal_not_transferring(request, operation_name, **kwargs):
     if operation_name in ['PutObject', 'UploadPart'] and \
-            hasattr(request.body, 'disable_callback'):
-        request.body.disable_callback()
+            hasattr(request.body, 'signal_not_transferring'):
+        request.body.signal_not_transferring()
 
 
-def enable_upload_callbacks(request, operation_name, **kwargs):
+def signal_transferring(request, operation_name, **kwargs):
     if operation_name in ['PutObject', 'UploadPart'] and \
-            hasattr(request.body, 'enable_callback'):
-        request.body.enable_callback()
+            hasattr(request.body, 'signal_transferring'):
+        request.body.signal_transferring()
 
 
 def calculate_range_parameter(part_size, part_index, num_parts,
@@ -433,6 +433,16 @@ class ReadFileChunk(object):
             invoke_progress_callbacks(self._callbacks, len(data))
         return data
 
+    def signal_transferring(self):
+        self.enable_callback()
+        if hasattr(self._fileobj, 'signal_transferring'):
+            self._fileobj.signal_transferring()
+
+    def signal_not_transferring(self):
+        self.disable_callback()
+        if hasattr(self._fileobj, 'signal_not_transferring'):
+            self._fileobj.signal_not_transferring()
+
     def enable_callback(self):
         self._callbacks_enabled = True
 
diff --git a/scripts/new-change b/scripts/new-change
index 6d56220..59109fc 100755
--- a/scripts/new-change
+++ b/scripts/new-change
@@ -53,7 +53,12 @@ CHANGES_DIR = os.path.join(
     '.changes'
 )
 TEMPLATE = """\
-# Type should be one of: feature, bugfix
+# Type should be one of: feature, bugfix, enhancement, api-change
+# feature: A larger feature or change in behavior, usually resulting in a
+#          minor version bump.
+# bugfix: Fixing a bug in an existing code path.
+# enhancement: Small change to an underlying implementation detail.
+# api-change: Changes to a modeled API.
 type: {change_type}
 
 # Category is the high level feature area.
@@ -194,7 +199,8 @@ def parse_filled_in_contents(contents):
 def main():
     parser = argparse.ArgumentParser()
     parser.add_argument('-t', '--type', dest='change_type',
-                        default='', choices=('bugfix', 'feature'))
+                        default='', choices=('bugfix', 'feature',
+                                             'enhancement', 'api-change'))
     parser.add_argument('-c', '--category', dest='category',
                         default='')
     parser.add_argument('-d', '--description', dest='description',
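
With the expanded type choices, the change entry for this release could have been recorded with an invocation along these lines (a hypothetical example of running the script shown above):

    scripts/new-change -t enhancement -c max_bandwidth \
        -d 'Add ability to set maximum bandwidth consumption for streaming of S3 uploads and downloads'
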
diff --git a/setup.cfg b/setup.cfg
index aa2f8b0..ea1c571 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,4 +1,4 @@
-[wheel]
+[bdist_wheel]
 universal = 1
 
 [metadata]
diff --git a/setup.py b/setup.py
index 942edf9..47dd267 100644
--- a/setup.py
+++ b/setup.py
@@ -54,5 +54,6 @@ setup(
         'Programming Language :: Python :: 3.3',
         'Programming Language :: Python :: 3.4',
         'Programming Language :: Python :: 3.5',
+        'Programming Language :: Python :: 3.6',
     ),
 )
diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py
index 4d99913..d15d901 100644
--- a/tests/functional/test_download.py
+++ b/tests/functional/test_download.py
@@ -13,6 +13,7 @@
 import copy
 import os
 import tempfile
+import time
 import shutil
 import glob
 
@@ -381,6 +382,34 @@ class TestNonRangedDownload(BaseDownloadTest):
         with open(self.filename, 'rb') as f:
             self.assertEqual(b'', f.read())
 
+    def test_uses_bandwidth_limiter(self):
+        self.content = b'a' * 1024 * 1024
+        self.stream = six.BytesIO(self.content)
+        self.config = TransferConfig(
+            max_request_concurrency=1, max_bandwidth=len(self.content)/2)
+        self._manager = TransferManager(self.client, self.config)
+
+        self.add_head_object_response()
+        self.add_successful_get_object_responses()
+
+        start = time.time()
+        future = self.manager.download(
+            self.bucket, self.key, self.filename, self.extra_args)
+        future.result()
+        # This is just a smoke test to make sure that the limiter is
+        # being used, not a check of its exactness. So we set the maximum
+        # bandwidth to len(content)/2 per sec and make sure that the transfer
+        # is noticeably slower. Ideally it will take more than two seconds,
+        # but given that rate tracking at the very start of a transfer is not
+        # entirely accurate, we allow some flexibility by setting the
+        # expected time to half of the theoretical time.
+        self.assertGreaterEqual(time.time() - start, 1)
+
+        # Ensure that the contents are correct
+        with open(self.filename, 'rb') as f:
+            self.assertEqual(self.content, f.read())
+
 
 class TestRangedDownload(BaseDownloadTest):
     # TODO: If you want to add tests outside of this test class and still
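
For reference on the timing assertion in the new test: the content is 1 MiB and max_bandwidth is len(content)/2, i.e. 512 KiB per second, so the theoretical minimum transfer time is

    (1024 * 1024 bytes) / (512 * 1024 bytes per second) = 2 seconds

and the assertion only requires half of that (at least 1 second) to absorb the rate tracker's inaccuracy at the very start of a transfer.
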
diff --git a/tests/functional/test_manager.py b/tests/functional/test_manager.py
... 751 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-s3transfer.git


