[Python-modules-commits] [awscli] 01/05: New upstream version 1.11.13
Takaki Taniguchi
takaki at moszumanska.debian.org
Thu Nov 10 10:55:20 UTC 2016
This is an automated email from the git hooks/post-receive script.
takaki pushed a commit to branch master
in repository awscli.
commit 3459a23b5bb3950729ff2953064cc98480fcf0a8
Author: TANIGUCHI Takaki <takaki at asis.media-as.org>
Date: Thu Nov 10 19:44:54 2016 +0900
New upstream version 1.11.13
---
PKG-INFO | 2 +-
awscli.egg-info/PKG-INFO | 2 +-
awscli.egg-info/SOURCES.txt | 2 -
awscli.egg-info/requires.txt | 4 +-
awscli/__init__.py | 2 +-
awscli/customizations/cloudtrail/subscribe.py | 6 +-
awscli/customizations/cloudtrail/utils.py | 8 +-
awscli/customizations/s3/executor.py | 366 ------------
awscli/customizations/s3/fileinfo.py | 220 +-------
awscli/customizations/s3/results.py | 12 +-
awscli/customizations/s3/s3handler.py | 378 +------------
awscli/customizations/s3/subcommands.py | 37 +-
awscli/customizations/s3/tasks.py | 779 --------------------------
awscli/customizations/s3/utils.py | 213 ++-----
awscli/examples/s3/sync.rst | 4 +-
setup.cfg | 4 +-
setup.py | 4 +-
17 files changed, 125 insertions(+), 1918 deletions(-)
diff --git a/PKG-INFO b/PKG-INFO
index aa414d7..2463d01 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: awscli
-Version: 1.11.7
+Version: 1.11.13
Summary: Universal Command Line Environment for AWS.
Home-page: http://aws.amazon.com/cli/
Author: Amazon Web Services
diff --git a/awscli.egg-info/PKG-INFO b/awscli.egg-info/PKG-INFO
index aa414d7..2463d01 100644
--- a/awscli.egg-info/PKG-INFO
+++ b/awscli.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: awscli
-Version: 1.11.7
+Version: 1.11.13
Summary: Universal Command Line Environment for AWS.
Home-page: http://aws.amazon.com/cli/
Author: Amazon Web Services
diff --git a/awscli.egg-info/SOURCES.txt b/awscli.egg-info/SOURCES.txt
index 497b202..0fa0ff7 100644
--- a/awscli.egg-info/SOURCES.txt
+++ b/awscli.egg-info/SOURCES.txt
@@ -137,7 +137,6 @@ awscli/customizations/gamelift/getlog.py
awscli/customizations/gamelift/uploadbuild.py
awscli/customizations/s3/__init__.py
awscli/customizations/s3/comparator.py
-awscli/customizations/s3/executor.py
awscli/customizations/s3/fileformat.py
awscli/customizations/s3/filegenerator.py
awscli/customizations/s3/fileinfo.py
@@ -147,7 +146,6 @@ awscli/customizations/s3/results.py
awscli/customizations/s3/s3.py
awscli/customizations/s3/s3handler.py
awscli/customizations/s3/subcommands.py
-awscli/customizations/s3/tasks.py
awscli/customizations/s3/transferconfig.py
awscli/customizations/s3/utils.py
awscli/customizations/s3/syncstrategy/__init__.py
diff --git a/awscli.egg-info/requires.txt b/awscli.egg-info/requires.txt
index 56ca809..9221e08 100644
--- a/awscli.egg-info/requires.txt
+++ b/awscli.egg-info/requires.txt
@@ -1,8 +1,8 @@
-botocore==1.4.64
+botocore==1.4.70
colorama>=0.2.5,<=0.3.7
docutils>=0.10
rsa>=3.1.2,<=3.5.0
-s3transfer>=0.1.5,<0.2.0
+s3transfer>=0.1.9,<0.2.0
[:python_version=="2.6"]
argparse>=1.1
\ No newline at end of file
diff --git a/awscli/__init__.py b/awscli/__init__.py
index 7e21cbe..cd86c03 100644
--- a/awscli/__init__.py
+++ b/awscli/__init__.py
@@ -17,7 +17,7 @@ A Universal Command Line Environment for Amazon Web Services.
"""
import os
-__version__ = '1.11.7'
+__version__ = '1.11.13'
#
# Get our data path to be added to botocore's search path
diff --git a/awscli/customizations/cloudtrail/subscribe.py b/awscli/customizations/cloudtrail/subscribe.py
index bfe8be6..7028491 100644
--- a/awscli/customizations/cloudtrail/subscribe.py
+++ b/awscli/customizations/cloudtrail/subscribe.py
@@ -80,7 +80,7 @@ class CloudTrailSubscribe(BasicCommand):
# Initialize services
LOG.debug('Initializing S3, SNS and CloudTrail...')
- self.iam = self._session.create_client('iam', **client_args)
+ self.sts = self._session.create_client('sts', **client_args)
self.s3 = self._session.create_client('s3', **client_args)
self.sns = self._session.create_client('sns', **client_args)
self.region_name = self.s3.meta.region_name
@@ -187,7 +187,7 @@ class CloudTrailSubscribe(BasicCommand):
sys.stdout.write(
'Setting up new S3 bucket {bucket}...\n'.format(bucket=bucket))
- account_id = get_account_id(self.iam)
+ account_id = get_account_id(self.sts)
# Clean up the prefix - it requires a trailing slash if set
if prefix and not prefix.endswith('/'):
@@ -239,7 +239,7 @@ class CloudTrailSubscribe(BasicCommand):
sys.stdout.write(
'Setting up new SNS topic {topic}...\n'.format(topic=topic))
- account_id = get_account_id(self.iam)
+ account_id = get_account_id(self.sts)
# Make sure topic doesn't already exist
# Warn but do not fail if ListTopics permissions
diff --git a/awscli/customizations/cloudtrail/utils.py b/awscli/customizations/cloudtrail/utils.py
index e886f01..2d9495f 100644
--- a/awscli/customizations/cloudtrail/utils.py
+++ b/awscli/customizations/cloudtrail/utils.py
@@ -17,10 +17,10 @@ def get_account_id_from_arn(trail_arn):
return trail_arn.split(':')[4]
-def get_account_id(iam_client):
- """Retrieve the AWS account ID for the authenticated user"""
- response = iam_client.get_user()
- return get_account_id_from_arn(response['User']['Arn'])
+def get_account_id(sts_client):
+ """Retrieve the AWS account ID for the authenticated user or role"""
+ response = sts_client.get_caller_identity()
+ return response['Account']
def get_trail_by_arn(cloudtrail_client, trail_arn):
diff --git a/awscli/customizations/s3/executor.py b/awscli/customizations/s3/executor.py
deleted file mode 100644
index 758297d..0000000
--- a/awscli/customizations/s3/executor.py
+++ /dev/null
@@ -1,366 +0,0 @@
-# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"). You
-# may not use this file except in compliance with the License. A copy of
-# the License is located at
-#
-# http://aws.amazon.com/apache2.0/
-#
-# or in the "license" file accompanying this file. This file is
-# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
-# ANY KIND, either express or implied. See the License for the specific
-# language governing permissions and limitations under the License.
-import os
-import logging
-import sys
-import threading
-
-from awscli.customizations.s3.utils import uni_print, \
- IORequest, IOCloseRequest, StablePriorityQueue, set_file_utime
-from awscli.customizations.s3.tasks import OrderableTask
-from awscli.compat import queue, bytes_print
-
-
-LOGGER = logging.getLogger(__name__)
-
-
-class ShutdownThreadRequest(OrderableTask):
- PRIORITY = 11
-
- def __init__(self, priority_override=None):
- if priority_override is not None:
- self.PRIORITY = priority_override
-
-
-class Executor(object):
- """
- This class is in charge of all of the threads. It starts up the threads
- and cleans up the threads when finished. The two type of threads the
- ``Executor``runs is a worker and a print thread.
- """
- STANDARD_PRIORITY = 11
- IMMEDIATE_PRIORITY = 1
-
- def __init__(self, num_threads, result_queue, quiet,
- only_show_errors, max_queue_size, write_queue):
- self._max_queue_size = max_queue_size
- LOGGER.debug("Using max queue size for s3 tasks of: %s",
- self._max_queue_size)
- self.queue = StablePriorityQueue(maxsize=self._max_queue_size,
- max_priority=20)
- self.num_threads = num_threads
- self.result_queue = result_queue
- self.quiet = quiet
- self.only_show_errors = only_show_errors
- self.threads_list = []
- self.write_queue = write_queue
- self.print_thread = PrintThread(self.result_queue, self.quiet,
- self.only_show_errors)
- self.print_thread.daemon = True
- self.io_thread = IOWriterThread(self.write_queue)
-
- @property
- def num_tasks_failed(self):
- tasks_failed = 0
- if self.print_thread is not None:
- tasks_failed = self.print_thread.num_errors_seen
- return tasks_failed
-
- @property
- def num_tasks_warned(self):
- tasks_warned = 0
- if self.print_thread is not None:
- tasks_warned = self.print_thread.num_warnings_seen
- return tasks_warned
-
- def start(self):
- self.io_thread.start()
- # Note that we're *not* adding the IO thread to the threads_list.
- # There's a specific shutdown order we need and we're going to be
- # explicit about it rather than relying on the threads_list order.
- # See .join() for more info.
- self.print_thread.start()
- LOGGER.debug("Using a threadpool size of: %s", self.num_threads)
- for i in range(self.num_threads):
- worker = Worker(queue=self.queue)
- worker.setDaemon(True)
- self.threads_list.append(worker)
- worker.start()
-
- def submit(self, task):
- """
- This is the function used to submit a task to the ``Executor``.
- """
- LOGGER.debug("Submitting task: %s", task)
- self.queue.put(task)
-
- def initiate_shutdown(self, priority=STANDARD_PRIORITY):
- """Instruct all threads to shutdown.
-
- This is a graceful shutdown. It will wait until all
- currently queued tasks have been completed before the threads
- shutdown. If the task queue is completely full, it may
- take a while for the threads to shutdown.
-
- This method does not block. Once ``initiate_shutdown`` has
- been called, you can all ``wait_until_shutdown`` to block
- until the Executor has been shutdown.
-
- """
- # Implementation detail: we only queue the worker threads
- # to shutdown. The print/io threads are shutdown in the
- # ``wait_until_shutdown`` method.
- for i in range(self.num_threads):
- LOGGER.debug(
- "Queueing end sentinel for worker thread (priority: %s)",
- priority)
- self.queue.put(ShutdownThreadRequest(priority))
-
- def wait_until_shutdown(self):
- """Block until the Executor is fully shutdown.
-
- This will wait until all worker threads are shutdown, along
- with any additional helper threads used by the executor.
-
- """
- for thread in self.threads_list:
- LOGGER.debug("Waiting for thread to shutdown: %s", thread)
- while True:
- thread.join(timeout=1)
- if not thread.is_alive():
- break
- LOGGER.debug("Thread has been shutdown: %s", thread)
-
- LOGGER.debug("Queueing end sentinel for result thread.")
- self.result_queue.put(ShutdownThreadRequest())
- LOGGER.debug("Queueing end sentinel for IO thread.")
- self.write_queue.put(ShutdownThreadRequest())
-
- LOGGER.debug("Waiting for result thread to shutdown.")
- self.print_thread.join()
- LOGGER.debug("Waiting for IO thread to shutdown.")
- self.io_thread.join()
- LOGGER.debug("All threads have been shutdown.")
-
-
-class IOWriterThread(threading.Thread):
- def __init__(self, queue):
- threading.Thread.__init__(self)
- self.queue = queue
- self.fd_descriptor_cache = {}
-
- def run(self):
- while True:
- task = self.queue.get(True)
- if isinstance(task, ShutdownThreadRequest):
- LOGGER.debug("Shutdown request received in IO thread, "
- "shutting down.")
- self._cleanup()
- return
- try:
- self._handle_task(task)
- except Exception as e:
- LOGGER.debug(
- "Error processing IO request: %s", e, exc_info=True)
-
- def _handle_task(self, task):
- if isinstance(task, IORequest):
- filename, offset, data, is_stream = task
- if is_stream:
- self._handle_stream_task(data)
- else:
- self._handle_file_write_task(filename, offset, data)
- elif isinstance(task, IOCloseRequest):
- self._handle_io_close_request(task)
-
- def _handle_io_close_request(self, task):
- LOGGER.debug("IOCloseRequest received for %s, closing file.",
- task.filename)
- fileobj = self.fd_descriptor_cache.get(task.filename)
- if fileobj is not None:
- fileobj.close()
- del self.fd_descriptor_cache[task.filename]
- if task.desired_mtime is not None:
- set_file_utime(task.filename, task.desired_mtime)
-
- def _handle_stream_task(self, data):
- fileobj = sys.stdout
- bytes_print(data)
- fileobj.flush()
-
- def _handle_file_write_task(self, filename, offset, data):
- fileobj = self.fd_descriptor_cache.get(filename)
- if fileobj is None:
- fileobj = open(filename, 'rb+')
- self.fd_descriptor_cache[filename] = fileobj
- LOGGER.debug("Writing data to: %s, offset: %s",
- filename, offset)
- fileobj.seek(offset)
- fileobj.write(data)
-
- def _cleanup(self):
- for fileobj in self.fd_descriptor_cache.values():
- fileobj.close()
-
-
-class Worker(threading.Thread):
- """
- This thread is in charge of performing the tasks provided via
- the main queue ``queue``.
- """
- def __init__(self, queue):
- threading.Thread.__init__(self)
- # This is the queue where work (tasks) are submitted.
- self.queue = queue
-
- def run(self):
- while True:
- try:
- function = self.queue.get(True)
- if isinstance(function, ShutdownThreadRequest):
- LOGGER.debug("Shutdown request received in worker thread, "
- "shutting down worker thread.")
- break
- try:
- LOGGER.debug("Worker thread invoking task: %s", function)
- function()
- except Exception as e:
- LOGGER.debug('Error calling task: %s', e, exc_info=True)
- except queue.Empty:
- pass
-
-
-class PrintThread(threading.Thread):
- """
- This thread controls the printing of results. When a task is
- completely finished it is permanently write the result to standard
- out. Otherwise, it is a part of a multipart upload/download and
- only shows the most current part upload/download.
-
- Result Queue
- ------------
-
- Result queue items are PrintTask objects that have the following
- attributes:
-
- * message: An arbitrary string associated with the entry. This
- can be used to communicate the result of the task.
- * error: Boolean indicating whether or not the task completely
- successfully.
- * total_parts: The total number of parts for multipart transfers (
- deprecated, will be removed in the future).
- * warning: Boolean indicating whether or not a file generated a
- warning.
-
- """
- def __init__(self, result_queue, quiet, only_show_errors):
- threading.Thread.__init__(self)
- self._progress_dict = {}
- self._result_queue = result_queue
- self._quiet = quiet
- self._only_show_errors = only_show_errors
- self._progress_length = 0
- self._num_parts = 0
- self._file_count = 0
- self._lock = threading.Lock()
- self._needs_newline = False
-
- self._total_parts = '...'
- self._total_files = '...'
-
- # This is a public attribute that clients can inspect to determine
- # whether or not we saw any results indicating that an error occurred.
- self.num_errors_seen = 0
- self.num_warnings_seen = 0
-
- def set_total_parts(self, total_parts):
- with self._lock:
- self._total_parts = total_parts
-
- def set_total_files(self, total_files):
- with self._lock:
- self._total_files = total_files
-
- def run(self):
- while True:
- try:
- print_task = self._result_queue.get(True)
- if isinstance(print_task, ShutdownThreadRequest):
- if self._needs_newline:
- sys.stdout.write('\n')
- LOGGER.debug("Shutdown request received in print thread, "
- "shutting down print thread.")
- break
- LOGGER.debug("Received print task: %s", print_task)
- try:
- self._process_print_task(print_task)
- except Exception as e:
- LOGGER.debug("Error processing print task: %s", e,
- exc_info=True)
- except queue.Empty:
- pass
-
- def _process_print_task(self, print_task):
- print_str = print_task.message
- print_to_stderr = False
- if print_task.error:
- self.num_errors_seen += 1
- print_to_stderr = True
-
- final_str = ''
- if print_task.warning:
- self.num_warnings_seen += 1
- print_to_stderr = True
- final_str += print_str.ljust(self._progress_length, ' ')
- final_str += '\n'
- elif print_task.total_parts:
- # Normalize keys so failures and sucess
- # look the same.
- op_list = print_str.split(':')
- print_str = ':'.join(op_list[1:])
- total_part = print_task.total_parts
- self._num_parts += 1
- if print_str in self._progress_dict:
- self._progress_dict[print_str]['parts'] += 1
- else:
- self._progress_dict[print_str] = {}
- self._progress_dict[print_str]['parts'] = 1
- self._progress_dict[print_str]['total'] = total_part
- else:
- print_components = print_str.split(':')
- final_str += print_str.ljust(self._progress_length, ' ')
- final_str += '\n'
- key = ':'.join(print_components[1:])
- if key in self._progress_dict:
- self._progress_dict.pop(print_str, None)
- else:
- self._num_parts += 1
- self._file_count += 1
-
- # If the message is an error or warning, print it to standard error.
- if print_to_stderr and not self._quiet:
- uni_print(final_str, sys.stderr)
- final_str = ''
-
- is_done = self._total_files == self._file_count
- if not is_done:
- final_str += self._make_progress_bar()
- if not (self._quiet or self._only_show_errors):
- uni_print(final_str)
- self._needs_newline = not final_str.endswith('\n')
-
- def _make_progress_bar(self):
- """Creates the progress bar string to print out."""
-
- prog_str = "Completed %s " % self._num_parts
- num_files = self._total_files
- if self._total_files != '...':
- prog_str += "of %s " % self._total_parts
- num_files = self._total_files - self._file_count
- prog_str += "part(s) with %s file(s) remaining" % \
- num_files
- length_prog = len(prog_str)
- prog_str += '\r'
- prog_str = prog_str.ljust(self._progress_length, ' ')
- self._progress_length = length_prog
- return prog_str
diff --git a/awscli/customizations/s3/fileinfo.py b/awscli/customizations/s3/fileinfo.py
index c0e0a9c..66b8c85 100644
--- a/awscli/customizations/s3/fileinfo.py
+++ b/awscli/customizations/s3/fileinfo.py
@@ -1,106 +1,15 @@
-import os
-import logging
-import sys
-import time
-from functools import partial
-import errno
-import hashlib
-
-from botocore.compat import MD5_AVAILABLE
-from awscli.customizations.s3.utils import (
- find_bucket_key, guess_content_type, CreateDirectoryError, MD5Error,
- bytes_print, set_file_utime, RequestParamsMapper)
-from awscli.compat import bytes_print
-
-
-LOGGER = logging.getLogger(__name__)
-
-
-def read_file(filename):
- """
- This reads the file into a form that can be sent to S3
- """
- with open(filename, 'rb') as in_file:
- return in_file.read()
-
-
-def save_file(filename, response_data, last_update, is_stream=False):
- """
- This writes to the file upon downloading. It reads the data in the
- response. Makes a new directory if needed and then writes the
- data to the file. It also modifies the last modified time to that
- of the S3 object.
- """
- body = response_data['Body']
- etag = response_data['ETag'][1:-1]
- if not is_stream:
- d = os.path.dirname(filename)
- try:
- if not os.path.exists(d):
- os.makedirs(d)
- except OSError as e:
- if not e.errno == errno.EEXIST:
- raise CreateDirectoryError(
- "Could not create directory %s: %s" % (d, e))
-
- if MD5_AVAILABLE and _can_validate_md5_with_etag(etag, response_data):
- md5 = hashlib.md5()
- else:
- md5 = None
-
- file_chunks = iter(partial(body.read, 1024 * 1024), b'')
- if is_stream:
- # Need to save the data to be able to check the etag for a stream
- # because once the data is written to the stream there is no
- # undoing it.
- payload = write_to_file(None, etag, file_chunks, md5, True)
- else:
- with open(filename, 'wb') as out_file:
- write_to_file(out_file, etag, file_chunks, md5)
-
- if md5 is not None and etag != md5.hexdigest():
- if not is_stream:
- os.remove(filename)
- raise MD5Error(filename)
-
- if not is_stream:
- last_update_tuple = last_update.timetuple()
- mod_timestamp = time.mktime(last_update_tuple)
- set_file_utime(filename, int(mod_timestamp))
- else:
- # Now write the output to stdout since the md5 is correct.
- bytes_print(payload)
- sys.stdout.flush()
-
-
-def _can_validate_md5_with_etag(etag, response_data):
- sse = response_data.get('ServerSideEncryption', None)
- sse_customer_algorithm = response_data.get('SSECustomerAlgorithm', None)
- if not _is_multipart_etag(etag) and sse != 'aws:kms' and \
- sse_customer_algorithm is None:
- return True
- return False
-
-
-def write_to_file(out_file, etag, file_chunks, md5=None, is_stream=False):
- """
- Updates the etag for each file chunk. It will write to the file if it a
- file but if it is a stream it will return a byte string to be later
- written to a stream.
- """
- body = b''
- for chunk in file_chunks:
- if md5 is not None and not _is_multipart_etag(etag):
- md5.update(chunk)
- if is_stream:
- body += chunk
- else:
- out_file.write(chunk)
- return body
-
-
-def _is_multipart_etag(etag):
- return '-' in etag
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
class FileInfo(object):
@@ -151,17 +60,6 @@ class FileInfo(object):
self.is_stream = is_stream
self.associated_response_data = associated_response_data
- def set_size_from_s3(self):
- """
- This runs a ``HeadObject`` on the s3 object and sets the size.
- """
- bucket, key = find_bucket_key(self.src)
- params = {'Bucket': bucket,
- 'Key': key}
- RequestParamsMapper.map_head_object_params(params, self.parameters)
- response_data = self.client.head_object(**params)
- self.size = int(response_data['ContentLength'])
-
def is_glacier_compatible(self):
"""Determines if a file info object is glacier compatible
@@ -193,97 +91,3 @@ class FileInfo(object):
# restored back to S3.
# 'Restore' looks like: 'ongoing-request="false", expiry-date="..."'
return 'ongoing-request="false"' in response_data.get('Restore', '')
-
- def upload(self, payload=None):
- """
- Redirects the file to the multipart upload function if the file is
- large. If it is small enough, it puts the file as an object in s3.
- """
- if payload:
- self._handle_upload(payload)
- else:
- with open(self.src, 'rb') as body:
- self._handle_upload(body)
-
- def _handle_upload(self, body):
- bucket, key = find_bucket_key(self.dest)
- params = {
- 'Bucket': bucket,
- 'Key': key,
- 'Body': body,
- }
- self._inject_content_type(params)
- RequestParamsMapper.map_put_object_params(params, self.parameters)
- response_data = self.client.put_object(**params)
-
- def _inject_content_type(self, params):
- if not self.parameters['guess_mime_type']:
- return
- filename = self.src
- # Add a content type param if we can guess the type.
- guessed_type = guess_content_type(filename)
- if guessed_type is not None:
- params['ContentType'] = guessed_type
-
- def download(self):
- """
- Redirects the file to the multipart download function if the file is
- large. If it is small enough, it gets the file as an object from s3.
- """
- bucket, key = find_bucket_key(self.src)
- params = {'Bucket': bucket, 'Key': key}
- RequestParamsMapper.map_get_object_params(params, self.parameters)
- response_data = self.client.get_object(**params)
- save_file(self.dest, response_data, self.last_update,
- self.is_stream)
-
- def copy(self):
- """
- Copies a object in s3 to another location in s3.
- """
- source_bucket, source_key = find_bucket_key(self.src)
- copy_source = {'Bucket': source_bucket, 'Key': source_key}
- bucket, key = find_bucket_key(self.dest)
- params = {'Bucket': bucket,
- 'CopySource': copy_source, 'Key': key}
- self._inject_content_type(params)
- RequestParamsMapper.map_copy_object_params(params, self.parameters)
- response_data = self.client.copy_object(**params)
-
- def delete(self):
- """
- Deletes the file from s3 or local. The src file and type is used
- from the file info object.
- """
- if self.src_type == 's3':
- bucket, key = find_bucket_key(self.src)
- params = {'Bucket': bucket, 'Key': key}
- self.source_client.delete_object(**params)
- else:
- os.remove(self.src)
-
- def move(self):
- """
- Implements a move command for s3.
- """
- src = self.src_type
- dest = self.dest_type
- if src == 'local' and dest == 's3':
- self.upload()
- elif src == 's3' and dest == 's3':
- self.copy()
- elif src == 's3' and dest == 'local':
- self.download()
- else:
- raise Exception("Invalid path arguments for mv")
- self.delete()
-
- def create_multipart_upload(self):
- bucket, key = find_bucket_key(self.dest)
- params = {'Bucket': bucket, 'Key': key}
- self._inject_content_type(params)
- RequestParamsMapper.map_create_multipart_upload_params(
- params, self.parameters)
- response_data = self.client.create_multipart_upload(**params)
- upload_id = response_data['UploadId']
- return upload_id
diff --git a/awscli/customizations/s3/results.py b/awscli/customizations/s3/results.py
index f8f62a6..a6ef589 100644
--- a/awscli/customizations/s3/results.py
+++ b/awscli/customizations/s3/results.py
@@ -21,7 +21,6 @@ from s3transfer.exceptions import FatalError
from s3transfer.subscribers import BaseSubscriber
from awscli.compat import queue
-from awscli.customizations.s3.executor import ShutdownThreadRequest
from awscli.customizations.s3.utils import relative_path
from awscli.customizations.s3.utils import human_readable_size
from awscli.customizations.s3.utils import uni_print
@@ -70,10 +69,14 @@ FinalTotalSubmissionsResult = namedtuple(
'FinalTotalSubmissionsResult', ['total_submissions'])
+class ShutdownThreadRequest(object):
+ pass
+
+
class BaseResultSubscriber(OnDoneFilteredSubscriber):
TRANSFER_TYPE = None
- def __init__(self, result_queue):
+ def __init__(self, result_queue, transfer_type=None):
"""Subscriber to send result notifications during transfer process
:param result_queue: The queue to place results to be processed later
@@ -81,6 +84,9 @@ class BaseResultSubscriber(OnDoneFilteredSubscriber):
"""
self._result_queue = result_queue
self._result_kwargs_cache = {}
+ self._transfer_type = transfer_type
+ if transfer_type is None:
+ self._transfer_type = self.TRANSFER_TYPE
def on_queued(self, future, **kwargs):
self._add_to_result_kwargs_cache(future)
@@ -111,7 +117,7 @@ class BaseResultSubscriber(OnDoneFilteredSubscriber):
def _add_to_result_kwargs_cache(self, future):
src, dest = self._get_src_dest(future)
result_kwargs = {
- 'transfer_type': self.TRANSFER_TYPE,
+ 'transfer_type': self._transfer_type,
'src': src,
'dest': dest,
'total_transfer_size': future.meta.size
diff --git a/awscli/customizations/s3/s3handler.py b/awscli/customizations/s3/s3handler.py
index e1513f6..8fc4786 100644
--- a/awscli/customizations/s3/s3handler.py
+++ b/awscli/customizations/s3/s3handler.py
@@ -11,18 +11,14 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
-import math
import os
from s3transfer.manager import TransferManager
from awscli.customizations.s3.utils import (
- find_chunksize, human_readable_size,
- MAX_UPLOAD_SIZE, find_bucket_key, relative_path, PrintTask, create_warning,
- NonSeekableStream)
-from awscli.customizations.s3.executor import Executor
-from awscli.customizations.s3 import tasks
-from awscli.customizations.s3.transferconfig import RuntimeConfig, \
+ human_readable_size, MAX_UPLOAD_SIZE, find_bucket_key, relative_path,
+ create_warning, NonSeekableStream)
+from awscli.customizations.s3.transferconfig import \
create_transfer_config_from_runtime_config
from awscli.customizations.s3.results import UploadResultSubscriber
from awscli.customizations.s3.results import DownloadResultSubscriber
@@ -33,7 +29,6 @@ from awscli.customizations.s3.results import DeleteResultSubscriber
from awscli.customizations.s3.results import QueuedResult
from awscli.customizations.s3.results import SuccessResult
from awscli.customizations.s3.results import FailureResult
-from awscli.customizations.s3.results import CommandResult
from awscli.customizations.s3.results import DryRunResult
from awscli.customizations.s3.results import ResultRecorder
from awscli.customizations.s3.results import ResultPrinter
@@ -47,356 +42,15 @@ from awscli.customizations.s3.utils import ProvideUploadContentTypeSubscriber
from awscli.customizations.s3.utils import ProvideCopyContentTypeSubscriber
from awscli.customizations.s3.utils import ProvideLastModifiedTimeSubscriber
from awscli.customizations.s3.utils import DirectoryCreatorSubscriber
-from awscli.compat import queue
+from awscli.customizations.s3.utils import DeleteSourceFileSubscriber
+from awscli.customizations.s3.utils import DeleteSourceObjectSubscriber
+from awscli.customizations.s3.utils import DeleteCopySourceObjectSubscriber
from awscli.compat import binary_stdin
LOGGER = logging.getLogger(__name__)
-class BaseS3Handler(object):
- def __init__(self, session, params, result_queue=None,
- runtime_config=None):
- self.session = session
- if runtime_config is None:
- runtime_config = RuntimeConfig.defaults()
- self._runtime_config = runtime_config
- self.result_queue = result_queue
- if not self.result_queue:
- self.result_queue = queue.Queue()
-
- self.params = {
- 'dryrun': False, 'quiet': False, 'acl': None,
- 'guess_mime_type': True, 'sse_c_copy_source': None,
- 'sse_c_copy_source_key': None, 'sse': None,
- 'sse_c': None, 'sse_c_key': None, 'sse_kms_key_id': None,
- 'storage_class': None, 'website_redirect': None,
- 'content_type': None, 'cache_control': None,
- 'content_disposition': None, 'content_encoding': None,
- 'content_language': None, 'expires': None, 'grants': None,
- 'only_show_errors': False, 'is_stream': False,
- 'paths_type': None, 'expected_size': None, 'metadata': None,
- 'metadata_directive': None, 'ignore_glacier_warnings': False,
- 'force_glacier_transfer': False
- }
- self.params['region'] = params['region']
- for key in self.params.keys():
- if key in params:
- self.params[key] = params[key]
-
-
-class S3Handler(BaseS3Handler):
- """
- This class sets up the process to perform the tasks sent to it. It
- sources the ``self.executor`` from which threads inside the
- class pull tasks from to complete.
- """
- MAX_IO_QUEUE_SIZE = 20
-
- def __init__(self, session, params, result_queue=None,
- runtime_config=None):
- super(S3Handler, self).__init__(
- session, params, result_queue, runtime_config)
- # The write_queue has potential for optimizations, so the constant
- # for maxsize is scoped to this class (as opposed to constants.py)
- # so we have the ability to change this value later.
- self.write_queue = queue.Queue(maxsize=self.MAX_IO_QUEUE_SIZE)
- self.multi_threshold = self._runtime_config['multipart_threshold']
- self.chunksize = self._runtime_config['multipart_chunksize']
- LOGGER.debug("Using a multipart threshold of %s and a part size of %s",
- self.multi_threshold, self.chunksize)
- self.executor = Executor(
- num_threads=self._runtime_config['max_concurrent_requests'],
- result_queue=self.result_queue,
- quiet=self.params['quiet'],
- only_show_errors=self.params['only_show_errors'],
- max_queue_size=self._runtime_config['max_queue_size'],
- write_queue=self.write_queue
- )
- self._multipart_uploads = []
- self._multipart_downloads = []
-
- def call(self, files):
- """
- This function pulls a ``FileInfo`` or ``TaskInfo`` object from
- a list ``files``. Each object is then deemed if it will be a
- multipart operation and add the necessary attributes if so. Each
- object is then wrapped with a ``BasicTask`` object which is
- essentially a thread of execution for a thread to follow. These
- tasks are then submitted to the main executor.
- """
- try:
- self.executor.start()
- total_files, total_parts = self._enqueue_tasks(files)
- self.executor.print_thread.set_total_files(total_files)
- self.executor.print_thread.set_total_parts(total_parts)
- self.executor.initiate_shutdown()
- self._finalize_shutdown()
- except Exception as e:
- LOGGER.debug('Exception caught during task execution: %s',
- str(e), exc_info=True)
- self.result_queue.put(PrintTask(message=str(e), error=True))
- self.executor.initiate_shutdown(
- priority=self.executor.IMMEDIATE_PRIORITY)
- self._finalize_shutdown()
- except KeyboardInterrupt:
- self.result_queue.put(PrintTask(message=("Cleaning up. "
- "Please wait..."),
- error=True))
- self.executor.initiate_shutdown(
- priority=self.executor.IMMEDIATE_PRIORITY)
- self._finalize_shutdown()
- return CommandResult(self.executor.num_tasks_failed,
- self.executor.num_tasks_warned)
-
- def _finalize_shutdown(self):
- # Run all remaining tasks needed to completely shutdown the
- # S3 handler. This method will block until shutdown is complete.
- # The order here is important. We need to wait until all the
- # tasks have been completed before we can cleanup. Otherwise
- # we can have race conditions where we're trying to cleanup
- # uploads/downloads that are still in progress.
- self.executor.wait_until_shutdown()
- self._cleanup()
-
- def _cleanup(self):
- # And finally we need to make a pass through all the existing
- # multipart uploads and abort any pending multipart uploads.
- self._abort_pending_multipart_uploads()
- self._remove_pending_downloads()
-
- def _abort_pending_multipart_uploads(self):
- # precondition: this method is assumed to be called when there are no ongoing
- # uploads (the executor has been shutdown).
- for upload, filename in self._multipart_uploads:
- if upload.is_cancelled() or upload.in_progress():
- # Cancel any upload that's not unstarted and not complete.
- upload.cancel_upload(self._cancel_upload, args=(filename,))
-
- def _remove_pending_downloads(self):
- # The downloads case is easier than the uploads case because we don't
- # need to make any service calls. To properly cleanup we just need
- # to go through the multipart downloads that were in progress but
- # cancelled and remove the local file.
- for context, local_filename in self._multipart_downloads:
- if (context.is_cancelled() or context.is_started()) and \
- os.path.exists(local_filename):
- # The file is in an inconsistent state (not all the parts
- # were written to the file) so we should remove the
- # local file rather than leave it in a bad state. We don't
- # want to remove the files if the download has *not* been
- # started because we haven't touched the file yet, so it's
- # better to leave the old version of the file rather than
- # deleting the file entirely.
- os.remove(local_filename)
- context.cancel()
-
- def _cancel_upload(self, upload_id, filename):
- bucket, key = find_bucket_key(filename.dest)
- params = {
- 'Bucket': bucket,
- 'Key': key,
- 'UploadId': upload_id,
- }
- LOGGER.debug("Aborting multipart upload for: %s", key)
- filename.client.abort_multipart_upload(**params)
... 1461 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/awscli.git
More information about the Python-modules-commits
mailing list