[Python-modules-commits] [awscli] 01/05: New upstream version 1.11.13

Takaki Taniguchi takaki at moszumanska.debian.org
Thu Nov 10 10:55:20 UTC 2016


This is an automated email from the git hooks/post-receive script.

takaki pushed a commit to branch master
in repository awscli.

commit 3459a23b5bb3950729ff2953064cc98480fcf0a8
Author: TANIGUCHI Takaki <takaki at asis.media-as.org>
Date:   Thu Nov 10 19:44:54 2016 +0900

    New upstream version 1.11.13
---
 PKG-INFO                                      |   2 +-
 awscli.egg-info/PKG-INFO                      |   2 +-
 awscli.egg-info/SOURCES.txt                   |   2 -
 awscli.egg-info/requires.txt                  |   4 +-
 awscli/__init__.py                            |   2 +-
 awscli/customizations/cloudtrail/subscribe.py |   6 +-
 awscli/customizations/cloudtrail/utils.py     |   8 +-
 awscli/customizations/s3/executor.py          | 366 ------------
 awscli/customizations/s3/fileinfo.py          | 220 +-------
 awscli/customizations/s3/results.py           |  12 +-
 awscli/customizations/s3/s3handler.py         | 378 +------------
 awscli/customizations/s3/subcommands.py       |  37 +-
 awscli/customizations/s3/tasks.py             | 779 --------------------------
 awscli/customizations/s3/utils.py             | 213 ++-----
 awscli/examples/s3/sync.rst                   |   4 +-
 setup.cfg                                     |   4 +-
 setup.py                                      |   4 +-
 17 files changed, 125 insertions(+), 1918 deletions(-)

diff --git a/PKG-INFO b/PKG-INFO
index aa414d7..2463d01 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: awscli
-Version: 1.11.7
+Version: 1.11.13
 Summary: Universal Command Line Environment for AWS.
 Home-page: http://aws.amazon.com/cli/
 Author: Amazon Web Services
diff --git a/awscli.egg-info/PKG-INFO b/awscli.egg-info/PKG-INFO
index aa414d7..2463d01 100644
--- a/awscli.egg-info/PKG-INFO
+++ b/awscli.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: awscli
-Version: 1.11.7
+Version: 1.11.13
 Summary: Universal Command Line Environment for AWS.
 Home-page: http://aws.amazon.com/cli/
 Author: Amazon Web Services
diff --git a/awscli.egg-info/SOURCES.txt b/awscli.egg-info/SOURCES.txt
index 497b202..0fa0ff7 100644
--- a/awscli.egg-info/SOURCES.txt
+++ b/awscli.egg-info/SOURCES.txt
@@ -137,7 +137,6 @@ awscli/customizations/gamelift/getlog.py
 awscli/customizations/gamelift/uploadbuild.py
 awscli/customizations/s3/__init__.py
 awscli/customizations/s3/comparator.py
-awscli/customizations/s3/executor.py
 awscli/customizations/s3/fileformat.py
 awscli/customizations/s3/filegenerator.py
 awscli/customizations/s3/fileinfo.py
@@ -147,7 +146,6 @@ awscli/customizations/s3/results.py
 awscli/customizations/s3/s3.py
 awscli/customizations/s3/s3handler.py
 awscli/customizations/s3/subcommands.py
-awscli/customizations/s3/tasks.py
 awscli/customizations/s3/transferconfig.py
 awscli/customizations/s3/utils.py
 awscli/customizations/s3/syncstrategy/__init__.py
diff --git a/awscli.egg-info/requires.txt b/awscli.egg-info/requires.txt
index 56ca809..9221e08 100644
--- a/awscli.egg-info/requires.txt
+++ b/awscli.egg-info/requires.txt
@@ -1,8 +1,8 @@
-botocore==1.4.64
+botocore==1.4.70
 colorama>=0.2.5,<=0.3.7
 docutils>=0.10
 rsa>=3.1.2,<=3.5.0
-s3transfer>=0.1.5,<0.2.0
+s3transfer>=0.1.9,<0.2.0
 
 [:python_version=="2.6"]
 argparse>=1.1
\ No newline at end of file
diff --git a/awscli/__init__.py b/awscli/__init__.py
index 7e21cbe..cd86c03 100644
--- a/awscli/__init__.py
+++ b/awscli/__init__.py
@@ -17,7 +17,7 @@ A Universal Command Line Environment for Amazon Web Services.
 """
 import os
 
-__version__ = '1.11.7'
+__version__ = '1.11.13'
 
 #
 # Get our data path to be added to botocore's search path
diff --git a/awscli/customizations/cloudtrail/subscribe.py b/awscli/customizations/cloudtrail/subscribe.py
index bfe8be6..7028491 100644
--- a/awscli/customizations/cloudtrail/subscribe.py
+++ b/awscli/customizations/cloudtrail/subscribe.py
@@ -80,7 +80,7 @@ class CloudTrailSubscribe(BasicCommand):
 
         # Initialize services
         LOG.debug('Initializing S3, SNS and CloudTrail...')
-        self.iam = self._session.create_client('iam', **client_args)
+        self.sts = self._session.create_client('sts', **client_args)
         self.s3 = self._session.create_client('s3', **client_args)
         self.sns = self._session.create_client('sns', **client_args)
         self.region_name = self.s3.meta.region_name
@@ -187,7 +187,7 @@ class CloudTrailSubscribe(BasicCommand):
         sys.stdout.write(
             'Setting up new S3 bucket {bucket}...\n'.format(bucket=bucket))
 
-        account_id = get_account_id(self.iam)
+        account_id = get_account_id(self.sts)
 
         # Clean up the prefix - it requires a trailing slash if set
         if prefix and not prefix.endswith('/'):
@@ -239,7 +239,7 @@ class CloudTrailSubscribe(BasicCommand):
         sys.stdout.write(
             'Setting up new SNS topic {topic}...\n'.format(topic=topic))
 
-        account_id = get_account_id(self.iam)
+        account_id = get_account_id(self.sts)
 
         # Make sure topic doesn't already exist
         # Warn but do not fail if ListTopics permissions
diff --git a/awscli/customizations/cloudtrail/utils.py b/awscli/customizations/cloudtrail/utils.py
index e886f01..2d9495f 100644
--- a/awscli/customizations/cloudtrail/utils.py
+++ b/awscli/customizations/cloudtrail/utils.py
@@ -17,10 +17,10 @@ def get_account_id_from_arn(trail_arn):
     return trail_arn.split(':')[4]
 
 
-def get_account_id(iam_client):
-    """Retrieve the AWS account ID for the authenticated user"""
-    response = iam_client.get_user()
-    return get_account_id_from_arn(response['User']['Arn'])
+def get_account_id(sts_client):
+    """Retrieve the AWS account ID for the authenticated user or role"""
+    response = sts_client.get_caller_identity()
+    return response['Account']
 
 
 def get_trail_by_arn(cloudtrail_client, trail_arn):
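
The two hunks above switch the CloudTrail customization's account-ID lookup from iam.get_user() to sts.get_caller_identity(), which needs no IAM permissions and works for assumed roles and federated identities, whereas the old iam.get_user() call typically fails with role credentials. A minimal standalone sketch of the new call, assuming boto3 and configured credentials; the wrapper name below is illustrative, not code from this commit:

    import boto3

    def current_account_id():
        """Return the AWS account ID for whatever credentials are in use.

        sts.get_caller_identity() succeeds for IAM users, assumed roles and
        federated identities alike, which is why the CLI switched to it.
        """
        sts = boto3.client('sts')
        return sts.get_caller_identity()['Account']

    if __name__ == '__main__':
        print(current_account_id())
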
diff --git a/awscli/customizations/s3/executor.py b/awscli/customizations/s3/executor.py
deleted file mode 100644
index 758297d..0000000
--- a/awscli/customizations/s3/executor.py
+++ /dev/null
@@ -1,366 +0,0 @@
-# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"). You
-# may not use this file except in compliance with the License. A copy of
-# the License is located at
-#
-#     http://aws.amazon.com/apache2.0/
-#
-# or in the "license" file accompanying this file. This file is
-# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
-# ANY KIND, either express or implied. See the License for the specific
-# language governing permissions and limitations under the License.
-import os
-import logging
-import sys
-import threading
-
-from awscli.customizations.s3.utils import uni_print, \
-    IORequest, IOCloseRequest, StablePriorityQueue, set_file_utime
-from awscli.customizations.s3.tasks import OrderableTask
-from awscli.compat import queue, bytes_print
-
-
-LOGGER = logging.getLogger(__name__)
-
-
-class ShutdownThreadRequest(OrderableTask):
-    PRIORITY = 11
-
-    def __init__(self, priority_override=None):
-        if priority_override is not None:
-            self.PRIORITY = priority_override
-
-
-class Executor(object):
-    """
-    This class is in charge of all of the threads.  It starts up the threads
-    and cleans up the threads when finished.  The two type of threads the
-    ``Executor``runs is a worker and a print thread.
-    """
-    STANDARD_PRIORITY = 11
-    IMMEDIATE_PRIORITY = 1
-
-    def __init__(self, num_threads, result_queue, quiet,
-                 only_show_errors, max_queue_size, write_queue):
-        self._max_queue_size = max_queue_size
-        LOGGER.debug("Using max queue size for s3 tasks of: %s",
-                     self._max_queue_size)
-        self.queue = StablePriorityQueue(maxsize=self._max_queue_size,
-                                         max_priority=20)
-        self.num_threads = num_threads
-        self.result_queue = result_queue
-        self.quiet = quiet
-        self.only_show_errors = only_show_errors
-        self.threads_list = []
-        self.write_queue = write_queue
-        self.print_thread = PrintThread(self.result_queue, self.quiet,
-                                        self.only_show_errors)
-        self.print_thread.daemon = True
-        self.io_thread = IOWriterThread(self.write_queue)
-
-    @property
-    def num_tasks_failed(self):
-        tasks_failed = 0
-        if self.print_thread is not None:
-            tasks_failed = self.print_thread.num_errors_seen
-        return tasks_failed
-
-    @property
-    def num_tasks_warned(self):
-        tasks_warned = 0
-        if self.print_thread is not None:
-            tasks_warned = self.print_thread.num_warnings_seen
-        return tasks_warned
-
-    def start(self):
-        self.io_thread.start()
-        # Note that we're *not* adding the IO thread to the threads_list.
-        # There's a specific shutdown order we need and we're going to be
-        # explicit about it rather than relying on the threads_list order.
-        # See .join() for more info.
-        self.print_thread.start()
-        LOGGER.debug("Using a threadpool size of: %s", self.num_threads)
-        for i in range(self.num_threads):
-            worker = Worker(queue=self.queue)
-            worker.setDaemon(True)
-            self.threads_list.append(worker)
-            worker.start()
-
-    def submit(self, task):
-        """
-        This is the function used to submit a task to the ``Executor``.
-        """
-        LOGGER.debug("Submitting task: %s", task)
-        self.queue.put(task)
-
-    def initiate_shutdown(self, priority=STANDARD_PRIORITY):
-        """Instruct all threads to shutdown.
-
-        This is a graceful shutdown.  It will wait until all
-        currently queued tasks have been completed before the threads
-        shutdown.  If the task queue is completely full, it may
-        take a while for the threads to shutdown.
-
-        This method does not block.  Once ``initiate_shutdown`` has
-        been called, you can all ``wait_until_shutdown`` to block
-        until the Executor has been shutdown.
-
-        """
-        # Implementation detail:  we only queue the worker threads
-        # to shutdown.  The print/io threads are shutdown in the
-        # ``wait_until_shutdown`` method.
-        for i in range(self.num_threads):
-            LOGGER.debug(
-                "Queueing end sentinel for worker thread (priority: %s)",
-                priority)
-            self.queue.put(ShutdownThreadRequest(priority))
-
-    def wait_until_shutdown(self):
-        """Block until the Executor is fully shutdown.
-
-        This will wait until all worker threads are shutdown, along
-        with any additional helper threads used by the executor.
-
-        """
-        for thread in self.threads_list:
-            LOGGER.debug("Waiting for thread to shutdown: %s", thread)
-            while True:
-                thread.join(timeout=1)
-                if not thread.is_alive():
-                    break
-            LOGGER.debug("Thread has been shutdown: %s", thread)
-
-        LOGGER.debug("Queueing end sentinel for result thread.")
-        self.result_queue.put(ShutdownThreadRequest())
-        LOGGER.debug("Queueing end sentinel for IO thread.")
-        self.write_queue.put(ShutdownThreadRequest())
-
-        LOGGER.debug("Waiting for result thread to shutdown.")
-        self.print_thread.join()
-        LOGGER.debug("Waiting for IO thread to shutdown.")
-        self.io_thread.join()
-        LOGGER.debug("All threads have been shutdown.")
-
-
-class IOWriterThread(threading.Thread):
-    def __init__(self, queue):
-        threading.Thread.__init__(self)
-        self.queue = queue
-        self.fd_descriptor_cache = {}
-
-    def run(self):
-        while True:
-            task = self.queue.get(True)
-            if isinstance(task, ShutdownThreadRequest):
-                LOGGER.debug("Shutdown request received in IO thread, "
-                             "shutting down.")
-                self._cleanup()
-                return
-            try:
-                self._handle_task(task)
-            except Exception as e:
-                LOGGER.debug(
-                    "Error processing IO request: %s", e, exc_info=True)
-
-    def _handle_task(self, task):
-        if isinstance(task, IORequest):
-            filename, offset, data, is_stream = task
-            if is_stream:
-                self._handle_stream_task(data)
-            else:
-                self._handle_file_write_task(filename, offset, data)
-        elif isinstance(task, IOCloseRequest):
-            self._handle_io_close_request(task)
-
-    def _handle_io_close_request(self, task):
-        LOGGER.debug("IOCloseRequest received for %s, closing file.",
-                     task.filename)
-        fileobj = self.fd_descriptor_cache.get(task.filename)
-        if fileobj is not None:
-            fileobj.close()
-            del self.fd_descriptor_cache[task.filename]
-        if task.desired_mtime is not None:
-            set_file_utime(task.filename, task.desired_mtime)
-
-    def _handle_stream_task(self, data):
-        fileobj = sys.stdout
-        bytes_print(data)
-        fileobj.flush()
-
-    def _handle_file_write_task(self, filename, offset, data):
-        fileobj = self.fd_descriptor_cache.get(filename)
-        if fileobj is None:
-            fileobj = open(filename, 'rb+')
-            self.fd_descriptor_cache[filename] = fileobj
-        LOGGER.debug("Writing data to: %s, offset: %s",
-                     filename, offset)
-        fileobj.seek(offset)
-        fileobj.write(data)
-
-    def _cleanup(self):
-        for fileobj in self.fd_descriptor_cache.values():
-            fileobj.close()
-
-
-class Worker(threading.Thread):
-    """
-    This thread is in charge of performing the tasks provided via
-    the main queue ``queue``.
-    """
-    def __init__(self, queue):
-        threading.Thread.__init__(self)
-        # This is the queue where work (tasks) are submitted.
-        self.queue = queue
-
-    def run(self):
-        while True:
-            try:
-                function = self.queue.get(True)
-                if isinstance(function, ShutdownThreadRequest):
-                    LOGGER.debug("Shutdown request received in worker thread, "
-                                 "shutting down worker thread.")
-                    break
-                try:
-                    LOGGER.debug("Worker thread invoking task: %s", function)
-                    function()
-                except Exception as e:
-                    LOGGER.debug('Error calling task: %s', e, exc_info=True)
-            except queue.Empty:
-                pass
-
-
-class PrintThread(threading.Thread):
-    """
-    This thread controls the printing of results.  When a task is
-    completely finished it is permanently write the result to standard
-    out. Otherwise, it is a part of a multipart upload/download and
-    only shows the most current part upload/download.
-
-    Result Queue
-    ------------
-
-    Result queue items are PrintTask objects that have the following
-    attributes:
-
-        * message: An arbitrary string associated with the entry.   This
-            can be used to communicate the result of the task.
-        * error: Boolean indicating whether or not the task completely
-            successfully.
-        * total_parts: The total number of parts for multipart transfers (
-            deprecated, will be removed in the future).
-        * warning: Boolean indicating whether or not a file generated a
-            warning.
-
-    """
-    def __init__(self, result_queue, quiet, only_show_errors):
-        threading.Thread.__init__(self)
-        self._progress_dict = {}
-        self._result_queue = result_queue
-        self._quiet = quiet
-        self._only_show_errors = only_show_errors
-        self._progress_length = 0
-        self._num_parts = 0
-        self._file_count = 0
-        self._lock = threading.Lock()
-        self._needs_newline = False
-
-        self._total_parts = '...'
-        self._total_files = '...'
-
-        # This is a public attribute that clients can inspect to determine
-        # whether or not we saw any results indicating that an error occurred.
-        self.num_errors_seen = 0
-        self.num_warnings_seen = 0
-
-    def set_total_parts(self, total_parts):
-        with self._lock:
-            self._total_parts = total_parts
-
-    def set_total_files(self, total_files):
-        with self._lock:
-            self._total_files = total_files
-
-    def run(self):
-        while True:
-            try:
-                print_task = self._result_queue.get(True)
-                if isinstance(print_task, ShutdownThreadRequest):
-                    if self._needs_newline:
-                        sys.stdout.write('\n')
-                    LOGGER.debug("Shutdown request received in print thread, "
-                                 "shutting down print thread.")
-                    break
-                LOGGER.debug("Received print task: %s", print_task)
-                try:
-                    self._process_print_task(print_task)
-                except Exception as e:
-                    LOGGER.debug("Error processing print task: %s", e,
-                                 exc_info=True)
-            except queue.Empty:
-                pass
-
-    def _process_print_task(self, print_task):
-        print_str = print_task.message
-        print_to_stderr = False
-        if print_task.error:
-            self.num_errors_seen += 1
-            print_to_stderr = True
-
-        final_str = ''
-        if print_task.warning:
-            self.num_warnings_seen += 1
-            print_to_stderr = True
-            final_str += print_str.ljust(self._progress_length, ' ')
-            final_str += '\n'
-        elif print_task.total_parts:
-            # Normalize keys so failures and sucess
-            # look the same.
-            op_list = print_str.split(':')
-            print_str = ':'.join(op_list[1:])
-            total_part = print_task.total_parts
-            self._num_parts += 1
-            if print_str in self._progress_dict:
-                self._progress_dict[print_str]['parts'] += 1
-            else:
-                self._progress_dict[print_str] = {}
-                self._progress_dict[print_str]['parts'] = 1
-                self._progress_dict[print_str]['total'] = total_part
-        else:
-            print_components = print_str.split(':')
-            final_str += print_str.ljust(self._progress_length, ' ')
-            final_str += '\n'
-            key = ':'.join(print_components[1:])
-            if key in self._progress_dict:
-                self._progress_dict.pop(print_str, None)
-            else:
-                self._num_parts += 1
-            self._file_count += 1
-
-        # If the message is an error or warning, print it to standard error.
-        if print_to_stderr and not self._quiet:
-            uni_print(final_str, sys.stderr)
-            final_str = ''
-
-        is_done = self._total_files == self._file_count
-        if not is_done:
-            final_str += self._make_progress_bar()
-        if not (self._quiet or self._only_show_errors):
-            uni_print(final_str)
-            self._needs_newline = not final_str.endswith('\n')
-
-    def _make_progress_bar(self):
-        """Creates the progress bar string to print out."""
-
-        prog_str = "Completed %s " % self._num_parts
-        num_files = self._total_files
-        if self._total_files != '...':
-            prog_str += "of %s " % self._total_parts
-            num_files = self._total_files - self._file_count
-        prog_str += "part(s) with %s file(s) remaining" % \
-            num_files
-        length_prog = len(prog_str)
-        prog_str += '\r'
-        prog_str = prog_str.ljust(self._progress_length, ' ')
-        self._progress_length = length_prog
-        return prog_str
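
executor.py (deleted above) and tasks.py (deleted later in this diff) implemented the CLI's own worker/IO/print thread pool for s3 transfers. In 1.11.13 that machinery is gone: requires.txt now pulls in s3transfer>=0.1.9, and s3handler.py (below) imports s3transfer's TransferManager instead. A rough sketch of the TransferManager pattern that takes over this role, with placeholder bucket/key/file names and assuming boto3 and s3transfer are installed; this is an illustration, not code from the commit:

    import boto3
    from s3transfer.manager import TransferManager

    client = boto3.client('s3')
    with TransferManager(client) as manager:
        # The manager owns its own worker threads, much as the deleted
        # Executor/Worker classes did, and hands back a future per transfer.
        future = manager.download('example-bucket', 'path/to/key',
                                  'local-file.dat')
        future.result()  # block until the download finishes; raises on error

Progress and result reporting, which PrintThread handled here, is instead done through subscribers attached to each transfer (see the results.py hunk below).
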
diff --git a/awscli/customizations/s3/fileinfo.py b/awscli/customizations/s3/fileinfo.py
index c0e0a9c..66b8c85 100644
--- a/awscli/customizations/s3/fileinfo.py
+++ b/awscli/customizations/s3/fileinfo.py
@@ -1,106 +1,15 @@
-import os
-import logging
-import sys
-import time
-from functools import partial
-import errno
-import hashlib
-
-from botocore.compat import MD5_AVAILABLE
-from awscli.customizations.s3.utils import (
-    find_bucket_key, guess_content_type, CreateDirectoryError, MD5Error,
-    bytes_print, set_file_utime, RequestParamsMapper)
-from awscli.compat import bytes_print
-
-
-LOGGER = logging.getLogger(__name__)
-
-
-def read_file(filename):
-    """
-    This reads the file into a form that can be sent to S3
-    """
-    with open(filename, 'rb') as in_file:
-        return in_file.read()
-
-
-def save_file(filename, response_data, last_update, is_stream=False):
-    """
-    This writes to the file upon downloading.  It reads the data in the
-    response.  Makes a new directory if needed and then writes the
-    data to the file.  It also modifies the last modified time to that
-    of the S3 object.
-    """
-    body = response_data['Body']
-    etag = response_data['ETag'][1:-1]
-    if not is_stream:
-        d = os.path.dirname(filename)
-        try:
-            if not os.path.exists(d):
-                os.makedirs(d)
-        except OSError as e:
-            if not e.errno == errno.EEXIST:
-                raise CreateDirectoryError(
-                    "Could not create directory %s: %s" % (d, e))
-
-    if MD5_AVAILABLE and _can_validate_md5_with_etag(etag, response_data):
-        md5 = hashlib.md5()
-    else:
-        md5 = None
-
-    file_chunks = iter(partial(body.read, 1024 * 1024), b'')
-    if is_stream:
-        # Need to save the data to be able to check the etag for a stream
-        # because once the data is written to the stream there is no
-        # undoing it.
-        payload = write_to_file(None, etag, file_chunks, md5, True)
-    else:
-        with open(filename, 'wb') as out_file:
-            write_to_file(out_file, etag, file_chunks, md5)
-
-    if md5 is not None and etag != md5.hexdigest():
-        if not is_stream:
-            os.remove(filename)
-        raise MD5Error(filename)
-
-    if not is_stream:
-        last_update_tuple = last_update.timetuple()
-        mod_timestamp = time.mktime(last_update_tuple)
-        set_file_utime(filename, int(mod_timestamp))
-    else:
-        # Now write the output to stdout since the md5 is correct.
-        bytes_print(payload)
-        sys.stdout.flush()
-
-
-def _can_validate_md5_with_etag(etag, response_data):
-    sse = response_data.get('ServerSideEncryption', None)
-    sse_customer_algorithm = response_data.get('SSECustomerAlgorithm', None)
-    if not _is_multipart_etag(etag) and sse != 'aws:kms' and \
-            sse_customer_algorithm is None:
-        return True
-    return False
-
-
-def write_to_file(out_file, etag, file_chunks, md5=None, is_stream=False):
-    """
-    Updates the etag for each file chunk.  It will write to the file if it a
-    file but if it is a stream it will return a byte string to be later
-    written to a stream.
-    """
-    body = b''
-    for chunk in file_chunks:
-        if md5 is not None and not _is_multipart_etag(etag):
-            md5.update(chunk)
-        if is_stream:
-            body += chunk
-        else:
-            out_file.write(chunk)
-    return body
-
-
-def _is_multipart_etag(etag):
-    return '-' in etag
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
 
 
 class FileInfo(object):
@@ -151,17 +60,6 @@ class FileInfo(object):
         self.is_stream = is_stream
         self.associated_response_data = associated_response_data
 
-    def set_size_from_s3(self):
-        """
-        This runs a ``HeadObject`` on the s3 object and sets the size.
-        """
-        bucket, key = find_bucket_key(self.src)
-        params = {'Bucket': bucket,
-                  'Key': key}
-        RequestParamsMapper.map_head_object_params(params, self.parameters)
-        response_data = self.client.head_object(**params)
-        self.size = int(response_data['ContentLength'])
-
     def is_glacier_compatible(self):
         """Determines if a file info object is glacier compatible
 
@@ -193,97 +91,3 @@ class FileInfo(object):
         # restored back to S3.
         # 'Restore' looks like: 'ongoing-request="false", expiry-date="..."'
         return 'ongoing-request="false"' in response_data.get('Restore', '')
-
-    def upload(self, payload=None):
-        """
-        Redirects the file to the multipart upload function if the file is
-        large.  If it is small enough, it puts the file as an object in s3.
-        """
-        if payload:
-            self._handle_upload(payload)
-        else:
-            with open(self.src, 'rb') as body:
-                self._handle_upload(body)
-
-    def _handle_upload(self, body):
-        bucket, key = find_bucket_key(self.dest)
-        params = {
-            'Bucket': bucket,
-            'Key': key,
-            'Body': body,
-        }
-        self._inject_content_type(params)
-        RequestParamsMapper.map_put_object_params(params, self.parameters)
-        response_data = self.client.put_object(**params)
-
-    def _inject_content_type(self, params):
-        if not self.parameters['guess_mime_type']:
-            return
-        filename = self.src
-        # Add a content type param if we can guess the type.
-        guessed_type = guess_content_type(filename)
-        if guessed_type is not None:
-            params['ContentType'] = guessed_type
-
-    def download(self):
-        """
-        Redirects the file to the multipart download function if the file is
-        large.  If it is small enough, it gets the file as an object from s3.
-        """
-        bucket, key = find_bucket_key(self.src)
-        params = {'Bucket': bucket, 'Key': key}
-        RequestParamsMapper.map_get_object_params(params, self.parameters)
-        response_data = self.client.get_object(**params)
-        save_file(self.dest, response_data, self.last_update,
-                  self.is_stream)
-
-    def copy(self):
-        """
-        Copies a object in s3 to another location in s3.
-        """
-        source_bucket, source_key = find_bucket_key(self.src)
-        copy_source = {'Bucket': source_bucket, 'Key': source_key}
-        bucket, key = find_bucket_key(self.dest)
-        params = {'Bucket': bucket,
-                  'CopySource': copy_source, 'Key': key}
-        self._inject_content_type(params)
-        RequestParamsMapper.map_copy_object_params(params, self.parameters)
-        response_data = self.client.copy_object(**params)
-
-    def delete(self):
-        """
-        Deletes the file from s3 or local.  The src file and type is used
-        from the file info object.
-        """
-        if self.src_type == 's3':
-            bucket, key = find_bucket_key(self.src)
-            params = {'Bucket': bucket, 'Key': key}
-            self.source_client.delete_object(**params)
-        else:
-            os.remove(self.src)
-
-    def move(self):
-        """
-        Implements a move command for s3.
-        """
-        src = self.src_type
-        dest = self.dest_type
-        if src == 'local' and dest == 's3':
-            self.upload()
-        elif src == 's3' and dest == 's3':
-            self.copy()
-        elif src == 's3' and dest == 'local':
-            self.download()
-        else:
-            raise Exception("Invalid path arguments for mv")
-        self.delete()
-
-    def create_multipart_upload(self):
-        bucket, key = find_bucket_key(self.dest)
-        params = {'Bucket': bucket, 'Key': key}
-        self._inject_content_type(params)
-        RequestParamsMapper.map_create_multipart_upload_params(
-            params, self.parameters)
-        response_data = self.client.create_multipart_upload(**params)
-        upload_id = response_data['UploadId']
-        return upload_id
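
The deleted FileInfo.move() composed mv as an upload/copy/download followed by delete(). Judging from the new imports that appear later in this diff (DeleteSourceFileSubscriber, DeleteSourceObjectSubscriber, DeleteCopySourceObjectSubscriber), the same composition is now expressed as a transfer plus a delete-on-success subscriber. A purely illustrative sketch of that pattern using only the public s3transfer hooks; the class name and wiring below are hypothetical, not the CLI's actual subscriber:

    import os

    from s3transfer.subscribers import BaseSubscriber

    class DeleteLocalSourceOnSuccess(BaseSubscriber):
        """Hypothetical subscriber: remove the local source file once its
        upload has succeeded, i.e. the 'mv local -> s3' half of a move."""

        def __init__(self, source_path):
            self._source_path = source_path

        def on_done(self, future, **kwargs):
            # result() re-raises if the upload failed, so the source file
            # is only removed after a successful transfer.
            future.result()
            os.remove(self._source_path)
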
diff --git a/awscli/customizations/s3/results.py b/awscli/customizations/s3/results.py
index f8f62a6..a6ef589 100644
--- a/awscli/customizations/s3/results.py
+++ b/awscli/customizations/s3/results.py
@@ -21,7 +21,6 @@ from s3transfer.exceptions import FatalError
 from s3transfer.subscribers import BaseSubscriber
 
 from awscli.compat import queue
-from awscli.customizations.s3.executor import ShutdownThreadRequest
 from awscli.customizations.s3.utils import relative_path
 from awscli.customizations.s3.utils import human_readable_size
 from awscli.customizations.s3.utils import uni_print
@@ -70,10 +69,14 @@ FinalTotalSubmissionsResult = namedtuple(
     'FinalTotalSubmissionsResult', ['total_submissions'])
 
 
+class ShutdownThreadRequest(object):
+    pass
+
+
 class BaseResultSubscriber(OnDoneFilteredSubscriber):
     TRANSFER_TYPE = None
 
-    def __init__(self, result_queue):
+    def __init__(self, result_queue, transfer_type=None):
         """Subscriber to send result notifications during transfer process
 
         :param result_queue: The queue to place results to be processed later
@@ -81,6 +84,9 @@ class BaseResultSubscriber(OnDoneFilteredSubscriber):
         """
         self._result_queue = result_queue
         self._result_kwargs_cache = {}
+        self._transfer_type = transfer_type
+        if transfer_type is None:
+            self._transfer_type = self.TRANSFER_TYPE
 
     def on_queued(self, future, **kwargs):
         self._add_to_result_kwargs_cache(future)
@@ -111,7 +117,7 @@ class BaseResultSubscriber(OnDoneFilteredSubscriber):
     def _add_to_result_kwargs_cache(self, future):
         src, dest = self._get_src_dest(future)
         result_kwargs = {
-            'transfer_type': self.TRANSFER_TYPE,
+            'transfer_type': self._transfer_type,
             'src': src,
             'dest': dest,
             'total_transfer_size': future.meta.size
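
The results.py hunk above gives ShutdownThreadRequest a new home (its old one, executor.py, is deleted above) and lets BaseResultSubscriber report a transfer_type other than its class-level TRANSFER_TYPE, presumably so one subscriber class can label, say, a move differently from the plain upload or copy it wraps. These subscribers build on s3transfer's hook interface; a minimal hedged example of that interface, unrelated to the CLI's own subscriber classes:

    from s3transfer.subscribers import BaseSubscriber

    class PrintingSubscriber(BaseSubscriber):
        """Illustrative only: print a line when a transfer is queued and
        again when it completes."""

        def on_queued(self, future, **kwargs):
            print('queued: %s' % future.meta.call_args.key)

        def on_done(self, future, **kwargs):
            print('done:   %s' % future.meta.call_args.key)

Subscribers like this are passed per transfer, e.g. manager.upload(filename, bucket, key, subscribers=[PrintingSubscriber()]).
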
diff --git a/awscli/customizations/s3/s3handler.py b/awscli/customizations/s3/s3handler.py
index e1513f6..8fc4786 100644
--- a/awscli/customizations/s3/s3handler.py
+++ b/awscli/customizations/s3/s3handler.py
@@ -11,18 +11,14 @@
 # ANY KIND, either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
 import logging
-import math
 import os
 
 from s3transfer.manager import TransferManager
 
 from awscli.customizations.s3.utils import (
-    find_chunksize, human_readable_size,
-    MAX_UPLOAD_SIZE, find_bucket_key, relative_path, PrintTask, create_warning,
-    NonSeekableStream)
-from awscli.customizations.s3.executor import Executor
-from awscli.customizations.s3 import tasks
-from awscli.customizations.s3.transferconfig import RuntimeConfig, \
+    human_readable_size, MAX_UPLOAD_SIZE, find_bucket_key, relative_path,
+    create_warning, NonSeekableStream)
+from awscli.customizations.s3.transferconfig import \
     create_transfer_config_from_runtime_config
 from awscli.customizations.s3.results import UploadResultSubscriber
 from awscli.customizations.s3.results import DownloadResultSubscriber
@@ -33,7 +29,6 @@ from awscli.customizations.s3.results import DeleteResultSubscriber
 from awscli.customizations.s3.results import QueuedResult
 from awscli.customizations.s3.results import SuccessResult
 from awscli.customizations.s3.results import FailureResult
-from awscli.customizations.s3.results import CommandResult
 from awscli.customizations.s3.results import DryRunResult
 from awscli.customizations.s3.results import ResultRecorder
 from awscli.customizations.s3.results import ResultPrinter
@@ -47,356 +42,15 @@ from awscli.customizations.s3.utils import ProvideUploadContentTypeSubscriber
 from awscli.customizations.s3.utils import ProvideCopyContentTypeSubscriber
 from awscli.customizations.s3.utils import ProvideLastModifiedTimeSubscriber
 from awscli.customizations.s3.utils import DirectoryCreatorSubscriber
-from awscli.compat import queue
+from awscli.customizations.s3.utils import DeleteSourceFileSubscriber
+from awscli.customizations.s3.utils import DeleteSourceObjectSubscriber
+from awscli.customizations.s3.utils import DeleteCopySourceObjectSubscriber
 from awscli.compat import binary_stdin
 
 
 LOGGER = logging.getLogger(__name__)
 
 
-class BaseS3Handler(object):
-    def __init__(self, session, params, result_queue=None,
-                 runtime_config=None):
-        self.session = session
-        if runtime_config is None:
-            runtime_config = RuntimeConfig.defaults()
-        self._runtime_config = runtime_config
-        self.result_queue = result_queue
-        if not self.result_queue:
-            self.result_queue = queue.Queue()
-
-        self.params = {
-            'dryrun': False, 'quiet': False, 'acl': None,
-            'guess_mime_type': True, 'sse_c_copy_source': None,
-            'sse_c_copy_source_key': None, 'sse': None,
-            'sse_c': None, 'sse_c_key': None, 'sse_kms_key_id': None,
-            'storage_class': None, 'website_redirect': None,
-            'content_type': None, 'cache_control': None,
-            'content_disposition': None, 'content_encoding': None,
-            'content_language': None, 'expires': None, 'grants': None,
-            'only_show_errors': False, 'is_stream': False,
-            'paths_type': None, 'expected_size': None, 'metadata': None,
-            'metadata_directive': None, 'ignore_glacier_warnings': False,
-            'force_glacier_transfer': False
-        }
-        self.params['region'] = params['region']
-        for key in self.params.keys():
-            if key in params:
-                self.params[key] = params[key]
-
-
-class S3Handler(BaseS3Handler):
-    """
-    This class sets up the process to perform the tasks sent to it.  It
-    sources the ``self.executor`` from which threads inside the
-    class pull tasks from to complete.
-    """
-    MAX_IO_QUEUE_SIZE = 20
-
-    def __init__(self, session, params, result_queue=None,
-                 runtime_config=None):
-        super(S3Handler, self).__init__(
-            session, params, result_queue, runtime_config)
-        # The write_queue has potential for optimizations, so the constant
-        # for maxsize is scoped to this class (as opposed to constants.py)
-        # so we have the ability to change this value later.
-        self.write_queue = queue.Queue(maxsize=self.MAX_IO_QUEUE_SIZE)
-        self.multi_threshold = self._runtime_config['multipart_threshold']
-        self.chunksize = self._runtime_config['multipart_chunksize']
-        LOGGER.debug("Using a multipart threshold of %s and a part size of %s",
-                     self.multi_threshold, self.chunksize)
-        self.executor = Executor(
-            num_threads=self._runtime_config['max_concurrent_requests'],
-            result_queue=self.result_queue,
-            quiet=self.params['quiet'],
-            only_show_errors=self.params['only_show_errors'],
-            max_queue_size=self._runtime_config['max_queue_size'],
-            write_queue=self.write_queue
-        )
-        self._multipart_uploads = []
-        self._multipart_downloads = []
-
-    def call(self, files):
-        """
-        This function pulls a ``FileInfo`` or ``TaskInfo`` object from
-        a list ``files``.  Each object is then deemed if it will be a
-        multipart operation and add the necessary attributes if so.  Each
-        object is then wrapped with a ``BasicTask`` object which is
-        essentially a thread of execution for a thread to follow.  These
-        tasks are then submitted to the main executor.
-        """
-        try:
-            self.executor.start()
-            total_files, total_parts = self._enqueue_tasks(files)
-            self.executor.print_thread.set_total_files(total_files)
-            self.executor.print_thread.set_total_parts(total_parts)
-            self.executor.initiate_shutdown()
-            self._finalize_shutdown()
-        except Exception as e:
-            LOGGER.debug('Exception caught during task execution: %s',
-                         str(e), exc_info=True)
-            self.result_queue.put(PrintTask(message=str(e), error=True))
-            self.executor.initiate_shutdown(
-                priority=self.executor.IMMEDIATE_PRIORITY)
-            self._finalize_shutdown()
-        except KeyboardInterrupt:
-            self.result_queue.put(PrintTask(message=("Cleaning up. "
-                                                     "Please wait..."),
-                                            error=True))
-            self.executor.initiate_shutdown(
-                priority=self.executor.IMMEDIATE_PRIORITY)
-            self._finalize_shutdown()
-        return CommandResult(self.executor.num_tasks_failed,
-                             self.executor.num_tasks_warned)
-
-    def _finalize_shutdown(self):
-        # Run all remaining tasks needed to completely shutdown the
-        # S3 handler.  This method will block until shutdown is complete.
-        # The order here is important.  We need to wait until all the
-        # tasks have been completed before we can cleanup.  Otherwise
-        # we can have race conditions where we're trying to cleanup
-        # uploads/downloads that are still in progress.
-        self.executor.wait_until_shutdown()
-        self._cleanup()
-
-    def _cleanup(self):
-        # And finally we need to make a pass through all the existing
-        # multipart uploads and abort any pending multipart uploads.
-        self._abort_pending_multipart_uploads()
-        self._remove_pending_downloads()
-
-    def _abort_pending_multipart_uploads(self):
-        # precondition: this method is assumed to be called when there are no ongoing
-        # uploads (the executor has been shutdown).
-        for upload, filename in self._multipart_uploads:
-            if upload.is_cancelled() or upload.in_progress():
-                # Cancel any upload that's not unstarted and not complete.
-                upload.cancel_upload(self._cancel_upload, args=(filename,))
-
-    def _remove_pending_downloads(self):
-        # The downloads case is easier than the uploads case because we don't
-        # need to make any service calls.  To properly cleanup we just need
-        # to go through the multipart downloads that were in progress but
-        # cancelled and remove the local file.
-        for context, local_filename in self._multipart_downloads:
-            if (context.is_cancelled() or context.is_started()) and \
-                    os.path.exists(local_filename):
-                # The file is in an inconsistent state (not all the parts
-                # were written to the file) so we should remove the
-                # local file rather than leave it in a bad state.  We don't
-                # want to remove the files if the download has *not* been
-                # started because we haven't touched the file yet, so it's
-                # better to leave the old version of the file rather than
-                # deleting the file entirely.
-                os.remove(local_filename)
-            context.cancel()
-
-    def _cancel_upload(self, upload_id, filename):
-        bucket, key = find_bucket_key(filename.dest)
-        params = {
-            'Bucket': bucket,
-            'Key': key,
-            'UploadId': upload_id,
-        }
-        LOGGER.debug("Aborting multipart upload for: %s", key)
-        filename.client.abort_multipart_upload(**params)
... 1461 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/awscli.git


