[Python-modules-commits] [python-s3transfer] 02/07: New upstream version 0.1.12
Takaki Taniguchi
takaki at moszumanska.debian.org
Sat Feb 3 12:55:01 UTC 2018
This is an automated email from the git hooks/post-receive script.
takaki pushed a commit to branch master
in repository python-s3transfer.
commit f4bcb1a9ab5faabb0be1b3102d4a37f73f10ffe5
Author: TANIGUCHI Takaki <takaki at asis.media-as.org>
Date: Sat Feb 3 19:40:10 2018 +0900
New upstream version 0.1.12
---
.changes/0.1.11.json | 7 +
.changes/0.1.12.json | 7 +
.travis.yml | 1 +
CHANGELOG.rst | 12 +
s3transfer/__init__.py | 2 +-
s3transfer/bandwidth.py | 416 +++++++++++++++++++++++++++++++++
s3transfer/download.py | 26 ++-
s3transfer/manager.py | 50 +++-
s3transfer/tasks.py | 2 +-
s3transfer/upload.py | 26 ++-
s3transfer/utils.py | 22 +-
scripts/new-change | 10 +-
setup.cfg | 2 +-
setup.py | 1 +
tests/functional/test_download.py | 29 +++
tests/functional/test_manager.py | 38 ++-
tests/functional/test_upload.py | 37 ++-
tests/integration/test_download.py | 8 +-
tests/unit/test_bandwidth.py | 465 +++++++++++++++++++++++++++++++++++++
tests/unit/test_download.py | 17 ++
tests/unit/test_utils.py | 51 ++++
tox.ini | 2 +-
22 files changed, 1175 insertions(+), 56 deletions(-)
diff --git a/.changes/0.1.11.json b/.changes/0.1.11.json
new file mode 100644
index 0000000..2f09d01
--- /dev/null
+++ b/.changes/0.1.11.json
@@ -0,0 +1,7 @@
+[
+ {
+ "category": "TransferManager",
+ "description": "Properly handle unicode exceptions in the context manager. Fixes `#85 <https://github.com/boto/boto3/issues/85>`__",
+ "type": "bugfix"
+ }
+]
\ No newline at end of file
diff --git a/.changes/0.1.12.json b/.changes/0.1.12.json
new file mode 100644
index 0000000..ffd0d74
--- /dev/null
+++ b/.changes/0.1.12.json
@@ -0,0 +1,7 @@
+[
+ {
+ "category": "``max_bandwidth``",
+ "description": "Add ability to set maximum bandwidth consumption for streaming of S3 uploads and downloads",
+ "type": "enhancement"
+ }
+]
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
index afd045d..8469de1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,6 +5,7 @@ python:
- "3.3"
- "3.4"
- "3.5"
+ - "3.6"
sudo: false
install:
- python scripts/ci/install
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 9721d1a..19e07be 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -2,6 +2,18 @@
CHANGELOG
=========
+0.1.12
+======
+
+* enhancement:``max_bandwidth``: Add ability to set maximum bandwidth consumption for streaming of S3 uploads and downloads
+
+
+0.1.11
+======
+
+* bugfix:TransferManager: Properly handle unicode exceptions in the context manager. Fixes `#85 <https://github.com/boto/boto3/issues/85>`__
+
+
0.1.10
======
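
A minimal usage sketch of the ``max_bandwidth`` setting described in the 0.1.12 entry above, assuming a boto3 S3 client; the bucket, key, and the 1 MB/s figure are illustrative only:

    import boto3
    from s3transfer.manager import TransferConfig, TransferManager

    client = boto3.client('s3')
    # Cap streaming of uploads and downloads at roughly 1 MB/s (bytes per second).
    config = TransferConfig(max_bandwidth=1024 * 1024)

    with TransferManager(client, config) as manager:
        future = manager.download('my-bucket', 'my-key', '/tmp/my-key')
        future.result()  # blocks until the throttled download completes
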
diff --git a/s3transfer/__init__.py b/s3transfer/__init__.py
index 4ab0720..57051c7 100644
--- a/s3transfer/__init__.py
+++ b/s3transfer/__init__.py
@@ -143,7 +143,7 @@ from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError
__author__ = 'Amazon Web Services'
-__version__ = '0.1.10'
+__version__ = '0.1.12'
class NullHandler(logging.Handler):
diff --git a/s3transfer/bandwidth.py b/s3transfer/bandwidth.py
new file mode 100644
index 0000000..8b3f6f5
--- /dev/null
+++ b/s3transfer/bandwidth.py
@@ -0,0 +1,416 @@
+# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import time
+import threading
+
+
+class RequestExceededException(Exception):
+ def __init__(self, requested_amt, retry_time):
+ """Error when requested amount exceeds what is allowed
+
+ The request that raised this error should be retried after waiting
+ the time specified by ``retry_time``.
+
+ :type requested_amt: int
+ :param requested_amt: The originally requested byte amount
+
+ :type retry_time: float
+ :param retry_time: The amount of time to wait before retrying the
+ requested amount
+ """
+ self.requested_amt = requested_amt
+ self.retry_time = retry_time
+ msg = (
+ 'Request amount %s exceeded the amount available. Retry in %s' % (
+ requested_amt, retry_time)
+ )
+ super(RequestExceededException, self).__init__(msg)
+
+
+class RequestToken(object):
+ """A token to pass as an identifier when consuming from the LeakyBucket"""
+ pass
+
+
+class TimeUtils(object):
+ def time(self):
+ """Get the current time back
+
+ :rtype: float
+ :returns: The current time in seconds
+ """
+ return time.time()
+
+ def sleep(self, value):
+ """Sleep for a designated time
+
+ :type value: float
+ :param value: The time to sleep for in seconds
+ """
+ return time.sleep(value)
+
+
+class BandwidthLimiter(object):
+ def __init__(self, leaky_bucket, time_utils=None):
+ """Limits bandwidth for shared S3 transfers
+
+ :type leaky_bucket: LeakyBucket
+ :param leaky_bucket: The leaky bucket to use to limit bandwidth
+
+ :type time_utils: TimeUtils
+ :param time_utils: Time utility to use for interacting with time.
+ """
+ self._leaky_bucket = leaky_bucket
+ self._time_utils = time_utils
+ if time_utils is None:
+ self._time_utils = TimeUtils()
+
+ def get_bandwith_limited_stream(self, fileobj, transfer_coordinator,
+ enabled=True):
+ """Wraps a fileobj in a bandwidth limited stream wrapper
+
+ :type fileobj: file-like obj
+ :param fileobj: The file-like obj to wrap
+
+ :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+ :param transfer_coordinator: The coordinator for the general transfer
+ that the wrapped stream is a part of
+
+ :type enabled: boolean
+ :param enabled: Whether bandwidth limiting should be enabled to start
+ """
+ stream = BandwidthLimitedStream(
+ fileobj, self._leaky_bucket, transfer_coordinator,
+ self._time_utils)
+ if not enabled:
+ stream.disable_bandwidth_limiting()
+ return stream
+
+
+class BandwidthLimitedStream(object):
+ def __init__(self, fileobj, leaky_bucket, transfer_coordinator,
+ time_utils=None, bytes_threshold=256 * 1024):
+ """Limits bandwidth for reads on a wrapped stream
+
+ :type fileobj: file-like object
+ :param fileobj: The file like object to wrap
+
+ :type leaky_bucket: LeakyBucket
+ :param leaky_bucket: The leaky bucket to use to throttle reads on
+ the stream
+
+ :type transfer_coordinator: s3transfer.futures.TransferCoordinator
+ :param transfer_coordinator: The coordinator for the general transfer
+ that the wrapped stream is a part of
+
+ :type time_utils: TimeUtils
+ :param time_utils: The time utility to use for interacting with time
+ """
+ self._fileobj = fileobj
+ self._leaky_bucket = leaky_bucket
+ self._transfer_coordinator = transfer_coordinator
+ self._time_utils = time_utils
+ if time_utils is None:
+ self._time_utils = TimeUtils()
+ self._bandwidth_limiting_enabled = True
+ self._request_token = RequestToken()
+ self._bytes_seen = 0
+ self._bytes_threshold = bytes_threshold
+
+ def enable_bandwidth_limiting(self):
+ """Enable bandwidth limiting on reads to the stream"""
+ self._bandwidth_limiting_enabled = True
+
+ def disable_bandwidth_limiting(self):
+ """Disable bandwidth limiting on reads to the stream"""
+ self._bandwidth_limiting_enabled = False
+
+ def read(self, amount):
+ """Read a specified amount
+
+ Reads will only be throttled if bandwidth limiting is enabled.
+ """
+ if not self._bandwidth_limiting_enabled:
+ return self._fileobj.read(amount)
+
+ # We do not want to be calling consume on every read as the read
+ # amounts can be small causing the lock of the leaky bucket to
+ # introduce noticeable overhead. So instead we keep track of
+ # how many bytes we have seen and only call consume once we pass a
+ # certain threshold.
+ self._bytes_seen += amount
+ if self._bytes_seen < self._bytes_threshold:
+ return self._fileobj.read(amount)
+
+ self._consume_through_leaky_bucket()
+ return self._fileobj.read(amount)
+
+ def _consume_through_leaky_bucket(self):
+ # NOTE: If the read amounts on the stream are high, it will result
+ # in large bursty behavior as there is no interface for partial
+ # reads. However, given that reads on this abstraction are at most
+ # 256KB (via downloads), it reduces the burstiness to small KB
+ # bursts at worst.
+ while not self._transfer_coordinator.exception:
+ try:
+ self._leaky_bucket.consume(
+ self._bytes_seen, self._request_token)
+ self._bytes_seen = 0
+ return
+ except RequestExceededException as e:
+ self._time_utils.sleep(e.retry_time)
+ else:
+ raise self._transfer_coordinator.exception
+
+ def signal_transferring(self):
+ """Signal that data being read is being transferred to S3"""
+ self.enable_bandwidth_limiting()
+
+ def signal_not_transferring(self):
+ """Signal that data being read is not being transferred to S3"""
+ self.disable_bandwidth_limiting()
+
+ def seek(self, where):
+ self._fileobj.seek(where)
+
+ def tell(self):
+ return self._fileobj.tell()
+
+ def close(self):
+ if self._bandwidth_limiting_enabled and self._bytes_seen:
+ # This handles the case where the file is small enough to never
+ # trigger the threshold and thus is never subjected to the
+ # leaky bucket on read(). This specifically happens for small
+ # uploads. So to account for those bytes, have them go through
+ # the leaky bucket when the file gets closed.
+ self._consume_through_leaky_bucket()
+ self._fileobj.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ self.close()
+
+
+class LeakyBucket(object):
+ def __init__(self, max_rate, time_utils=None, rate_tracker=None,
+ consumption_scheduler=None):
+ """A leaky bucket abstraction to limit bandwidth consumption
+
+ :type max_rate: int
+ :param max_rate: The maximum rate to allow. This rate is in terms of
+ bytes per second.
+
+ :type time_utils: TimeUtils
+ :param time_utils: The time utility to use for interacting with time
+
+ :type rate_tracker: BandwidthRateTracker
+ :param rate_tracker: Tracks bandwidth consumption
+
+ :type consumption_scheduler: ConsumptionScheduler
+ :param consumption_scheduler: Schedules consumption retries when
+ necessary
+ """
+ self._max_rate = float(max_rate)
+ self._time_utils = time_utils
+ if time_utils is None:
+ self._time_utils = TimeUtils()
+ self._lock = threading.Lock()
+ self._rate_tracker = rate_tracker
+ if rate_tracker is None:
+ self._rate_tracker = BandwidthRateTracker()
+ self._consumption_scheduler = consumption_scheduler
+ if consumption_scheduler is None:
+ self._consumption_scheduler = ConsumptionScheduler()
+
+ def consume(self, amt, request_token):
+ """Consume an a requested amount
+
+ :type amt: int
+ :param amt: The amount of bytes to request to consume
+
+ :type request_token: RequestToken
+ :param request_token: The token associated with the consumption
+ request that is used to identify the request. So if a
+ RequestExceededException is raised, the token should be used
+ in the subsequent retry consume() request.
+
+ :raises RequestExceededException: If the consumption amount would
+ exceed the maximum allocated bandwidth
+
+ :rtype: int
+ :returns: The amount consumed
+ """
+ with self._lock:
+ time_now = self._time_utils.time()
+ if self._consumption_scheduler.is_scheduled(request_token):
+ return self._release_requested_amt_for_scheduled_request(
+ amt, request_token, time_now)
+ elif self._projected_to_exceed_max_rate(amt, time_now):
+ self._raise_request_exceeded_exception(
+ amt, request_token, time_now)
+ else:
+ return self._release_requested_amt(amt, time_now)
+
+ def _projected_to_exceed_max_rate(self, amt, time_now):
+ projected_rate = self._rate_tracker.get_projected_rate(amt, time_now)
+ return projected_rate > self._max_rate
+
+ def _release_requested_amt_for_scheduled_request(self, amt, request_token,
+ time_now):
+ self._consumption_scheduler.process_scheduled_consumption(
+ request_token)
+ return self._release_requested_amt(amt, time_now)
+
+ def _raise_request_exceeded_exception(self, amt, request_token, time_now):
+ allocated_time = amt/float(self._max_rate)
+ retry_time = self._consumption_scheduler.schedule_consumption(
+ amt, request_token, allocated_time)
+ raise RequestExceededException(
+ requested_amt=amt, retry_time=retry_time)
+
+ def _release_requested_amt(self, amt, time_now):
+ self._rate_tracker.record_consumption_rate(amt, time_now)
+ return amt
+
+
+class ConsumptionScheduler(object):
+ def __init__(self):
+ """Schedules when to consume a desired amount"""
+ self._tokens_to_scheduled_consumption = {}
+ self._total_wait = 0
+
+ def is_scheduled(self, token):
+ """Indicates if a consumption request has been scheduled
+
+ :type token: RequestToken
+ :param token: The token associated with the consumption
+ request that is used to identify the request.
+ """
+ return token in self._tokens_to_scheduled_consumption
+
+ def schedule_consumption(self, amt, token, time_to_consume):
+ """Schedules a wait time to be able to consume an amount
+
+ :type amt: int
+ :param amt: The amount of bytes scheduled to be consumed
+
+ :type token: RequestToken
+ :param token: The token associated with the consumption
+ request that is used to identify the request.
+
+ :type time_to_consume: float
+ :param time_to_consume: The desired time it should take for that
+ specific request amount to be consumed, regardless of previously
+ scheduled consumption requests
+
+ :rtype: float
+ :returns: The amount of time to wait for the specific request before
+ actually consuming the specified amount.
+ """
+ self._total_wait += time_to_consume
+ self._tokens_to_scheduled_consumption[token] = {
+ 'wait_duration': self._total_wait,
+ 'time_to_consume': time_to_consume,
+ }
+ return self._total_wait
+
+ def process_scheduled_consumption(self, token):
+ """Processes a scheduled consumption request that has completed
+
+ :type token: RequestToken
+ :param token: The token associated with the consumption
+ request that is used to identify the request.
+ """
+ scheduled_retry = self._tokens_to_scheduled_consumption.pop(token)
+ self._total_wait = max(
+ self._total_wait - scheduled_retry['time_to_consume'], 0)
+
+
+class BandwidthRateTracker(object):
+ def __init__(self, alpha=0.8):
+ """Tracks the rate of bandwidth consumption
+
+ :type alpha: float
+ :param alpha: The constant to use in calculating the exponential moving
+ average of the bandwidth rate. Specifically it is used in the
+ following calculation:
+
+ current_rate = alpha * new_rate + (1 - alpha) * current_rate
+
+ The value of this constant should be between 0 and 1.
+ """
+ self._alpha = alpha
+ self._last_time = None
+ self._current_rate = None
+
+ @property
+ def current_rate(self):
+ """The current transfer rate
+
+ :rtype: float
+ :returns: The current tracked transfer rate
+ """
+ if self._last_time is None:
+ return 0.0
+ return self._current_rate
+
+ def get_projected_rate(self, amt, time_at_consumption):
+ """Get the projected rate using a provided amount and time
+
+ :type amt: int
+ :param amt: The proposed amount to consume
+
+ :type time_at_consumption: float
+ :param time_at_consumption: The proposed time to consume at
+
+ :rtype: float
+ :returns: The consumption rate if that amt and time were consumed
+ """
+ if self._last_time is None:
+ return 0.0
+ return self._calculate_exponential_moving_average_rate(
+ amt, time_at_consumption)
+
+ def record_consumption_rate(self, amt, time_at_consumption):
+ """Record the consumption rate based off amount and time point
+
+ :type amt: int
+ :param amt: The amount that got consumed
+
+ :type time_at_consumption: float
+ :param time_at_consumption: The time at which the amount was consumed
+ """
+ if self._last_time is None:
+ self._last_time = time_at_consumption
+ self._current_rate = 0.0
+ return
+ self._current_rate = self._calculate_exponential_moving_average_rate(
+ amt, time_at_consumption)
+ self._last_time = time_at_consumption
+
+ def _calculate_rate(self, amt, time_at_consumption):
+ time_delta = time_at_consumption - self._last_time
+ if time_delta <= 0:
+ # While it is really unlikely to see this in an actual transfer,
+ # we do not want to return a negative rate or try to divide the
+ # amount by zero. So instead return an infinite rate as the time
+ # delta is infinitesimally small.
+ return float('inf')
+ return amt / (time_delta)
+
+ def _calculate_exponential_moving_average_rate(self, amt,
+ time_at_consumption):
+ new_rate = self._calculate_rate(amt, time_at_consumption)
+ return self._alpha * new_rate + (1 - self._alpha) * self._current_rate
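
The classes above compose as follows: each stream keeps one RequestToken and asks the shared LeakyBucket for the bytes it just read; when the projected exponential-moving-average rate would exceed max_rate, the bucket schedules the request and raises RequestExceededException, and the caller sleeps for retry_time before retrying with the same token. A minimal sketch of that retry protocol (the 512 KB/s cap and 256 KB request are illustrative):

    import time
    from s3transfer.bandwidth import (
        LeakyBucket, RequestToken, RequestExceededException)

    bucket = LeakyBucket(max_rate=512 * 1024)  # bytes per second
    token = RequestToken()

    def throttled_consume(num_bytes):
        # Mirrors BandwidthLimitedStream._consume_through_leaky_bucket.
        while True:
            try:
                return bucket.consume(num_bytes, token)
            except RequestExceededException as e:
                # The request is now scheduled; wait the suggested time and
                # retry with the same token so the scheduled amount is released.
                time.sleep(e.retry_time)

    throttled_consume(256 * 1024)
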
diff --git a/s3transfer/download.py b/s3transfer/download.py
index b60d398..9b1a49e 100644
--- a/s3transfer/download.py
+++ b/s3transfer/download.py
@@ -317,7 +317,7 @@ class DownloadSubmissionTask(SubmissionTask):
fileobj, type(fileobj)))
def _submit(self, client, config, osutil, request_executor, io_executor,
- transfer_future):
+ transfer_future, bandwidth_limiter=None):
"""
:param client: The client associated with the transfer manager
@@ -339,6 +339,10 @@ class DownloadSubmissionTask(SubmissionTask):
:type transfer_future: s3transfer.futures.TransferFuture
:param transfer_future: The transfer future associated with the
transfer request that tasks are being submitted for
+
+ :type bandwidth_limiter: s3transfer.bandwidth.BandwidthLimiter
+ :param bandwidth_limiter: The bandwidth limiter to use when
+ downloading streams
"""
if transfer_future.meta.size is None:
# If a size was not provided figure out the size for the
@@ -360,15 +364,16 @@ class DownloadSubmissionTask(SubmissionTask):
if transfer_future.meta.size < config.multipart_threshold:
self._submit_download_request(
client, config, osutil, request_executor, io_executor,
- download_output_manager, transfer_future)
+ download_output_manager, transfer_future, bandwidth_limiter)
else:
self._submit_ranged_download_request(
client, config, osutil, request_executor, io_executor,
- download_output_manager, transfer_future)
+ download_output_manager, transfer_future, bandwidth_limiter)
def _submit_download_request(self, client, config, osutil,
request_executor, io_executor,
- download_output_manager, transfer_future):
+ download_output_manager, transfer_future,
+ bandwidth_limiter):
call_args = transfer_future.meta.call_args
# Get a handle to the file that will be used for writing downloaded
@@ -400,6 +405,7 @@ class DownloadSubmissionTask(SubmissionTask):
'max_attempts': config.num_download_attempts,
'download_output_manager': download_output_manager,
'io_chunksize': config.io_chunksize,
+ 'bandwidth_limiter': bandwidth_limiter
},
done_callbacks=[final_task]
),
@@ -409,7 +415,8 @@ class DownloadSubmissionTask(SubmissionTask):
def _submit_ranged_download_request(self, client, config, osutil,
request_executor, io_executor,
download_output_manager,
- transfer_future):
+ transfer_future,
+ bandwidth_limiter):
call_args = transfer_future.meta.call_args
# Get the needed progress callbacks for the task
@@ -461,6 +468,7 @@ class DownloadSubmissionTask(SubmissionTask):
'start_index': i * part_size,
'download_output_manager': download_output_manager,
'io_chunksize': config.io_chunksize,
+ 'bandwidth_limiter': bandwidth_limiter
},
done_callbacks=[finalize_download_invoker.decrement]
),
@@ -488,7 +496,7 @@ class DownloadSubmissionTask(SubmissionTask):
class GetObjectTask(Task):
def _main(self, client, bucket, key, fileobj, extra_args, callbacks,
max_attempts, download_output_manager, io_chunksize,
- start_index=0):
+ start_index=0, bandwidth_limiter=None):
"""Downloads an object and places content into io queue
:param client: The client to use when calling GetObject
@@ -504,6 +512,8 @@ class GetObjectTask(Task):
download stream and queue in the io queue.
:param start_index: The location in the file to start writing the
content of the key to.
+ :param bandwidth_limiter: The bandwidth limiter to use when throttling
+ the downloading of data in streams.
"""
last_exception = None
for i in range(max_attempts):
@@ -512,6 +522,10 @@ class GetObjectTask(Task):
Bucket=bucket, Key=key, **extra_args)
streaming_body = StreamReaderProgress(
response['Body'], callbacks)
+ if bandwidth_limiter:
+ streaming_body = \
+ bandwidth_limiter.get_bandwith_limited_stream(
+ streaming_body, self._transfer_coordinator)
current_index = start_index
chunks = DownloadChunkIterator(streaming_body, io_chunksize)
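
For reference, a minimal sketch of the wrapping that GetObjectTask now performs on the response body (the ``bandwith`` spelling of the method name matches the diff). The FakeCoordinator stub is an assumption standing in for the real s3transfer.futures.TransferCoordinator; only the ``exception`` attribute the stream consults is provided:

    import io
    from s3transfer.bandwidth import LeakyBucket, BandwidthLimiter

    class FakeCoordinator(object):
        exception = None  # stand-in for TransferCoordinator.exception

    limiter = BandwidthLimiter(LeakyBucket(max_rate=256 * 1024))
    body = io.BytesIO(b'x' * 1024 * 1024)  # stand-in for response['Body']

    # Returns a BandwidthLimitedStream whose read() calls are throttled
    # once the 256 KB byte threshold has been seen.
    stream = limiter.get_bandwith_limited_stream(body, FakeCoordinator())
    chunk = stream.read(8 * 1024)
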
diff --git a/s3transfer/manager.py b/s3transfer/manager.py
index ba5eb61..fb7a9cb 100644
--- a/s3transfer/manager.py
+++ b/s3transfer/manager.py
@@ -14,9 +14,11 @@ import copy
import logging
import threading
+from botocore.compat import six
+
from s3transfer.utils import get_callbacks
-from s3transfer.utils import disable_upload_callbacks
-from s3transfer.utils import enable_upload_callbacks
+from s3transfer.utils import signal_transferring
+from s3transfer.utils import signal_not_transferring
from s3transfer.utils import CallArgs
from s3transfer.utils import OSUtils
from s3transfer.utils import TaskSemaphore
@@ -33,6 +35,8 @@ from s3transfer.download import DownloadSubmissionTask
from s3transfer.upload import UploadSubmissionTask
from s3transfer.copies import CopySubmissionTask
from s3transfer.delete import DeleteSubmissionTask
+from s3transfer.bandwidth import LeakyBucket
+from s3transfer.bandwidth import BandwidthLimiter
KB = 1024
MB = KB * KB
@@ -51,7 +55,8 @@ class TransferConfig(object):
io_chunksize=256 * KB,
num_download_attempts=5,
max_in_memory_upload_chunks=10,
- max_in_memory_download_chunks=10):
+ max_in_memory_download_chunks=10,
+ max_bandwidth=None):
"""Configurations for the transfer mangager
:param multipart_threshold: The threshold for which multipart
@@ -122,6 +127,9 @@ class TransferConfig(object):
max_in_memory_download_chunks * multipart_chunksize
+ :param max_bandwidth: The maximum bandwidth that will be consumed
+ in uploading and downloading file content. The value is in terms of
+ bytes per second.
"""
self.multipart_threshold = multipart_threshold
self.multipart_chunksize = multipart_chunksize
@@ -134,11 +142,12 @@ class TransferConfig(object):
self.num_download_attempts = num_download_attempts
self.max_in_memory_upload_chunks = max_in_memory_upload_chunks
self.max_in_memory_download_chunks = max_in_memory_download_chunks
+ self.max_bandwidth = max_bandwidth
self._validate_attrs_are_nonzero()
def _validate_attrs_are_nonzero(self):
for attr, attr_val, in self.__dict__.items():
- if attr_val <= 0:
+ if attr_val is not None and attr_val <= 0:
raise ValueError(
'Provided parameter %s of value %s must be greater than '
'0.' % (attr, attr_val))
@@ -245,6 +254,16 @@ class TransferManager(object):
max_num_threads=1,
executor_cls=executor_cls
)
+
+ # The component responsible for limiting bandwidth usage if it
+ # is configured.
+ self._bandwidth_limiter = None
+ if self._config.max_bandwidth is not None:
+ logger.debug(
+ 'Setting max_bandwidth to %s', self._config.max_bandwidth)
+ leaky_bucket = LeakyBucket(self._config.max_bandwidth)
+ self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)
+
self._register_handlers()
def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
@@ -282,7 +301,11 @@ class TransferManager(object):
fileobj=fileobj, bucket=bucket, key=key, extra_args=extra_args,
subscribers=subscribers
)
- return self._submit_transfer(call_args, UploadSubmissionTask)
+ extra_main_kwargs = {}
+ if self._bandwidth_limiter:
+ extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
+ return self._submit_transfer(
+ call_args, UploadSubmissionTask, extra_main_kwargs)
def download(self, bucket, key, fileobj, extra_args=None,
subscribers=None):
@@ -318,8 +341,11 @@ class TransferManager(object):
bucket=bucket, key=key, fileobj=fileobj, extra_args=extra_args,
subscribers=subscribers
)
- return self._submit_transfer(call_args, DownloadSubmissionTask,
- {'io_executor': self._io_executor})
+ extra_main_kwargs = {'io_executor': self._io_executor}
+ if self._bandwidth_limiter:
+ extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
+ return self._submit_transfer(
+ call_args, DownloadSubmissionTask, extra_main_kwargs)
def copy(self, copy_source, bucket, key, extra_args=None,
subscribers=None, source_client=None):
@@ -479,11 +505,11 @@ class TransferManager(object):
# Register handlers to enable/disable callbacks on uploads.
event_name = 'request-created.s3'
self._client.meta.events.register_first(
- event_name, disable_upload_callbacks,
- unique_id='s3upload-callback-disable')
+ event_name, signal_not_transferring,
+ unique_id='s3upload-not-transferring')
self._client.meta.events.register_last(
- event_name, enable_upload_callbacks,
- unique_id='s3upload-callback-enable')
+ event_name, signal_transferring,
+ unique_id='s3upload-transferring')
def __enter__(self):
return self
@@ -496,7 +522,7 @@ class TransferManager(object):
# all of the inprogress futures in the shutdown.
if exc_type:
cancel = True
- cancel_msg = str(exc_value)
+ cancel_msg = six.text_type(exc_value)
if not cancel_msg:
cancel_msg = repr(exc_value)
# If it was a KeyboardInterrupt, the cancellation was initiated
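
A small sketch of the loosened validation above: ``max_bandwidth`` defaults to None (no throttling), which the validator now allows, while any configured value must still be strictly positive:

    from s3transfer.manager import TransferConfig

    TransferConfig()                                 # max_bandwidth=None: no throttling
    TransferConfig(max_bandwidth=10 * 1024 * 1024)   # cap at 10 MB/s

    try:
        TransferConfig(max_bandwidth=0)  # rejected: values must be greater than 0
    except ValueError as e:
        print(e)
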
diff --git a/s3transfer/tasks.py b/s3transfer/tasks.py
index 4427927..ae4abdd 100644
--- a/s3transfer/tasks.py
+++ b/s3transfer/tasks.py
@@ -254,7 +254,7 @@ class SubmissionTask(Task):
# transfer.
self._submit(transfer_future=transfer_future, **kwargs)
except BaseException as e:
- # If there was an exception rasied during the submission of task
+ # If there was an exception raised during the submission of task
# there is a chance that the final task that signals if a transfer
# is done and to run the cleanup may never have been submitted in
# the first place so we need to account accordingly.
diff --git a/s3transfer/upload.py b/s3transfer/upload.py
index d9a9ec5..0c2feda 100644
--- a/s3transfer/upload.py
+++ b/s3transfer/upload.py
@@ -115,9 +115,10 @@ class UploadInputManager(object):
that may be accepted. All implementations must subclass and override
public methods from this class.
"""
- def __init__(self, osutil, transfer_coordinator):
+ def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
self._osutil = osutil
self._transfer_coordinator = transfer_coordinator
+ self._bandwidth_limiter = bandwidth_limiter
@classmethod
def is_compatible(cls, upload_source):
@@ -200,8 +201,12 @@ class UploadInputManager(object):
"""
raise NotImplementedError('must implement yield_upload_part_bodies()')
- def _wrap_with_interrupt_reader(self, fileobj):
- return InterruptReader(fileobj, self._transfer_coordinator)
+ def _wrap_fileobj(self, fileobj):
+ fileobj = InterruptReader(fileobj, self._transfer_coordinator)
+ if self._bandwidth_limiter:
+ fileobj = self._bandwidth_limiter.get_bandwith_limited_stream(
+ fileobj, self._transfer_coordinator, enabled=False)
+ return fileobj
def _get_progress_callbacks(self, transfer_future):
callbacks = get_callbacks(transfer_future, 'progress')
@@ -241,7 +246,7 @@ class UploadFilenameInputManager(UploadInputManager):
# Wrap fileobj with interrupt reader that will quickly cancel
# uploads if needed instead of having to wait for the socket
# to completely read all of the data.
- fileobj = self._wrap_with_interrupt_reader(fileobj)
+ fileobj = self._wrap_fileobj(fileobj)
callbacks = self._get_progress_callbacks(transfer_future)
close_callbacks = self._get_close_callbacks(callbacks)
@@ -268,7 +273,7 @@ class UploadFilenameInputManager(UploadInputManager):
# Wrap fileobj with interrupt reader that will quickly cancel
# uploads if needed instead of having to wait for the socket
# to completely read all of the data.
- fileobj = self._wrap_with_interrupt_reader(fileobj)
+ fileobj = self._wrap_fileobj(fileobj)
# Wrap the file-like object into a ReadFileChunk to get progress.
read_file_chunk = self._osutil.open_file_chunk_reader_from_fileobj(
@@ -346,9 +351,9 @@ class UploadSeekableInputManager(UploadFilenameInputManager):
class UploadNonSeekableInputManager(UploadInputManager):
"""Upload utility for a file-like object that cannot seek."""
- def __init__(self, osutil, transfer_coordinator):
+ def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
super(UploadNonSeekableInputManager, self).__init__(
- osutil, transfer_coordinator)
+ osutil, transfer_coordinator, bandwidth_limiter)
self._initial_data = b''
@classmethod
@@ -470,7 +475,7 @@ class UploadNonSeekableInputManager(UploadInputManager):
:return: Fully wrapped data.
"""
- fileobj = self._wrap_with_interrupt_reader(six.BytesIO(data))
+ fileobj = self._wrap_fileobj(six.BytesIO(data))
return self._osutil.open_file_chunk_reader_from_fileobj(
fileobj=fileobj, chunk_size=len(data), full_file_size=len(data),
callbacks=callbacks, close_callbacks=close_callbacks)
@@ -511,7 +516,7 @@ class UploadSubmissionTask(SubmissionTask):
fileobj, type(fileobj)))
def _submit(self, client, config, osutil, request_executor,
- transfer_future):
+ transfer_future, bandwidth_limiter=None):
"""
:param client: The client associated with the transfer manager
@@ -531,7 +536,8 @@ class UploadSubmissionTask(SubmissionTask):
transfer request that tasks are being submitted for
"""
upload_input_manager = self._get_upload_input_manager_cls(
- transfer_future)(osutil, self._transfer_coordinator)
+ transfer_future)(
+ osutil, self._transfer_coordinator, bandwidth_limiter)
# Determine the size if it was not provided
if transfer_future.meta.size is None:
diff --git a/s3transfer/utils.py b/s3transfer/utils.py
index 65b34b8..fba5648 100644
--- a/s3transfer/utils.py
+++ b/s3transfer/utils.py
@@ -39,16 +39,16 @@ def random_file_extension(num_digits=8):
return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))
-def disable_upload_callbacks(request, operation_name, **kwargs):
+def signal_not_transferring(request, operation_name, **kwargs):
if operation_name in ['PutObject', 'UploadPart'] and \
- hasattr(request.body, 'disable_callback'):
- request.body.disable_callback()
+ hasattr(request.body, 'signal_not_transferring'):
+ request.body.signal_not_transferring()
-def enable_upload_callbacks(request, operation_name, **kwargs):
+def signal_transferring(request, operation_name, **kwargs):
if operation_name in ['PutObject', 'UploadPart'] and \
- hasattr(request.body, 'enable_callback'):
- request.body.enable_callback()
+ hasattr(request.body, 'signal_transferring'):
+ request.body.signal_transferring()
def calculate_range_parameter(part_size, part_index, num_parts,
@@ -433,6 +433,16 @@ class ReadFileChunk(object):
invoke_progress_callbacks(self._callbacks, len(data))
return data
+ def signal_transferring(self):
+ self.enable_callback()
+ if hasattr(self._fileobj, 'signal_transferring'):
+ self._fileobj.signal_transferring()
+
+ def signal_not_transferring(self):
+ self.disable_callback()
+ if hasattr(self._fileobj, 'signal_not_transferring'):
+ self._fileobj.signal_not_transferring()
+
def enable_callback(self):
self._callbacks_enabled = True
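
The renamed hooks rely on duck typing: the request-created handlers only call the signal methods when the request body provides them, and ReadFileChunk now forwards the signals to whatever stream it wraps. A minimal sketch, using hypothetical FakeBody/FakeRequest classes as stand-ins for the botocore request object:

    from s3transfer.utils import signal_not_transferring, signal_transferring

    class FakeBody(object):
        def __init__(self):
            self.transferring = True
        def signal_transferring(self):
            self.transferring = True
        def signal_not_transferring(self):
            self.transferring = False

    class FakeRequest(object):
        def __init__(self, body):
            self.body = body

    request = FakeRequest(FakeBody())
    signal_not_transferring(request, operation_name='PutObject')
    assert request.body.transferring is False
    signal_transferring(request, operation_name='PutObject')
    assert request.body.transferring is True
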
diff --git a/scripts/new-change b/scripts/new-change
index 6d56220..59109fc 100755
--- a/scripts/new-change
+++ b/scripts/new-change
@@ -53,7 +53,12 @@ CHANGES_DIR = os.path.join(
'.changes'
)
TEMPLATE = """\
-# Type should be one of: feature, bugfix
+# Type should be one of: feature, bugfix, enhancement, api-change
+# feature: A larger feature or change in behavior, usually resulting in a
+# minor version bump.
+# bugfix: Fixing a bug in an existing code path.
+# enhancement: Small change to an underlying implementation detail.
+# api-change: Changes to a modeled API.
type: {change_type}
# Category is the high level feature area.
@@ -194,7 +199,8 @@ def parse_filled_in_contents(contents):
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-t', '--type', dest='change_type',
- default='', choices=('bugfix', 'feature'))
+ default='', choices=('bugfix', 'feature',
+ 'enhancement', 'api-change'))
parser.add_argument('-c', '--category', dest='category',
default='')
parser.add_argument('-d', '--description', dest='description',
diff --git a/setup.cfg b/setup.cfg
index aa2f8b0..ea1c571 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,4 +1,4 @@
-[wheel]
+[bdist_wheel]
universal = 1
[metadata]
diff --git a/setup.py b/setup.py
index 942edf9..47dd267 100644
--- a/setup.py
+++ b/setup.py
@@ -54,5 +54,6 @@ setup(
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
+ 'Programming Language :: Python :: 3.6',
),
)
diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py
index 4d99913..d15d901 100644
--- a/tests/functional/test_download.py
+++ b/tests/functional/test_download.py
@@ -13,6 +13,7 @@
import copy
import os
import tempfile
+import time
import shutil
import glob
@@ -381,6 +382,34 @@ class TestNonRangedDownload(BaseDownloadTest):
with open(self.filename, 'rb') as f:
self.assertEqual(b'', f.read())
+ def test_uses_bandwidth_limiter(self):
+ self.content = b'a' * 1024 * 1024
+ self.stream = six.BytesIO(self.content)
+ self.config = TransferConfig(
+ max_request_concurrency=1, max_bandwidth=len(self.content)/2)
+ self._manager = TransferManager(self.client, self.config)
+
+ self.add_head_object_response()
+ self.add_successful_get_object_responses()
+
+ start = time.time()
+ future = self.manager.download(
+ self.bucket, self.key, self.filename, self.extra_args)
+ future.result()
+ # This is just a smoke test to make sure that the limiter is
+ # being used, not to verify its exactness. So we set the maximum
+ # bandwidth to len(content)/2 per sec and make sure that the
+ # download is noticeably slower. Ideally it will take more than two
+ # seconds, but given that rate tracking at the very start of a
+ # transfer is not entirely accurate, we give ourselves some
+ # flexibility by setting the expected time to half of the
+ # theoretical time.
+ self.assertGreaterEqual(time.time() - start, 1)
+
+ # Ensure that the contents are correct
+ with open(self.filename, 'rb') as f:
+ self.assertEqual(self.content, f.read())
+
class TestRangedDownload(BaseDownloadTest):
# TODO: If you want to add tests outside of this test class and still
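
For the timing assertion in test_uses_bandwidth_limiter above, the arithmetic is a back-of-the-envelope check:

    content_length = 1024 * 1024                          # 1 MiB of b'a'
    max_bandwidth = content_length / 2                    # bytes per second
    theoretical = content_length / float(max_bandwidth)   # 2.0 seconds
    expected_minimum = theoretical / 2                     # 1.0 second, as asserted
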
diff --git a/tests/functional/test_manager.py b/tests/functional/test_manager.py
... 751 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-s3transfer.git