[Python-modules-commits] [parallax] 01/08: Import parallax_1.0.1.orig.tar.gz

Valentin Vidic vvidic-guest at moszumanska.debian.org
Sun Nov 13 21:22:25 UTC 2016


This is an automated email from the git hooks/post-receive script.

vvidic-guest pushed a commit to branch master
in repository parallax.

commit fdf17ec903babd9f496782c607826b5482fc8247
Author: Valentin Vidic <Valentin.Vidic at CARNet.hr>
Date:   Sun Nov 13 21:55:51 2016 +0100

    Import parallax_1.0.1.orig.tar.gz
---
 AUTHORS                    |   4 +
 COPYING                    |  32 ++++
 PKG-INFO                   |  29 +++
 README.md                  |  76 ++++++++
 bin/parallax-askpass       |  11 ++
 parallax/__init__.py       | 347 +++++++++++++++++++++++++++++++++++
 parallax/askpass_client.py | 102 +++++++++++
 parallax/askpass_server.py |  99 ++++++++++
 parallax/callbacks.py      |  60 ++++++
 parallax/color.py          |  39 ++++
 parallax/manager.py        | 443 +++++++++++++++++++++++++++++++++++++++++++++
 parallax/psshutil.py       | 139 ++++++++++++++
 parallax/task.py           | 277 ++++++++++++++++++++++++++++
 parallax/version.py        |   1 +
 setup.py                   |  37 ++++
 test/test_api.py           |  86 +++++++++
 16 files changed, 1782 insertions(+)

diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 0000000..d738197
--- /dev/null
+++ b/AUTHORS
@@ -0,0 +1,4 @@
+Andrew McNabb <amcnabb at mcnabbs.org>
+Brent Chun <bnc at theether.org>
+Kristoffer Gronlund <krig at koru.se>
+Vladislav Bogdanov <bubble at hoster-ok.com>
diff --git a/COPYING b/COPYING
new file mode 100644
index 0000000..dd89bbd
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,32 @@
+Copyright (c) 2009, Andrew McNabb
+Copyright (c) 2003-2008, Brent N. Chun
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above
+      copyright notice, this list of conditions and the following
+      disclaimer in the documentation and/or other materials provided
+      with the distribution.
+
+    * The names of its contributors may not be used to endorse or
+      promote products derived from this software without specific
+      prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..a9885d4
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,29 @@
+Metadata-Version: 1.1
+Name: parallax
+Version: 1.0.1
+Summary: Execute commands and copy files over SSH to multiple machines at once
+Home-page: https://github.com/krig/parallax/
+Author: Kristoffer Gronlund
+Author-email: krig at koru.se
+License: BSD
+Description: Parallax SSH provides an interface to executing commands on multiple
+        nodes at once using SSH. It also provides commands for sending and receiving files to
+        multiple nodes using SCP.
+Platform: linux
+Classifier: Development Status :: 3 - Alpha
+Classifier: Intended Audience :: System Administrators
+Classifier: License :: OSI Approved :: BSD License
+Classifier: Operating System :: POSIX
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 2.6
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 3.1
+Classifier: Programming Language :: Python :: 3.2
+Classifier: Programming Language :: Python :: 3.3
+Classifier: Programming Language :: Python :: 3.4
+Classifier: Topic :: Software Development :: Libraries :: Python Modules
+Classifier: Topic :: System :: Clustering
+Classifier: Topic :: System :: Networking
+Classifier: Topic :: System :: Systems Administration
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..beb5620
--- /dev/null
+++ b/README.md
@@ -0,0 +1,76 @@
+# Parallax SSH
+
+Parallax SSH is a fork of [Parallel SSH][pssh] which focuses less on
+command-line tools and more on providing a flexible and programmable
+API that can be used by Python application developers to perform SSH
+operations across multiple machines.
+
+## Installation
+
+Parallax intends to be compatible with Python 2.6 and above (including
+Python 3.1 and greater), but is primarily tested with Python 2.7.
+
+Installation requires setuptools or ez_setup.py. The latter can be
+downloaded [here][ez].
+
+Once those requirements are fulfilled, installation is as simple as:
+
+    # sudo python setup.py install
+
+Packaged versions of Parallax SSH for various distributions can be
+downloaded from the openSUSE [OBS][obs].
+
+To install via PyPI, use `pip`:
+
+    # pip install parallax
+
+Share and enjoy!
+
+## Usage
+
+* `parallax.call(hosts, cmdline, opts)`
+
+  Executes the given command on a set of hosts, collecting the output.
+
+  Returns a dict mapping the hostname of
+  each host either to a tuple containing a return code,
+  stdout and stderr, or an `parallax.Error` instance
+  describing the error.
+
+* `parallax.copy(hosts, src, dst, opts)`
+
+  Copies files from `src` on the local machine to `dst` on the
+  remote hosts.
+
+  Returns a dict mapping the hostname of
+  each host either to a path, or an `parallax.Error` instance
+  describing the error.
+
+* `parallax.slurp(hosts, src, dst, opts)`
+
+  Copies files from `src` on the remote hosts to a local folder for
+  each of the remote hosts.
+
+  Returns a dict mapping the hostname of
+  each host either to a path, or an `parallax.Error` instance
+  describing the error.
+
+## How it works
+
+By default, Parallax SSH uses at most 32 SSH process in parallel to
+SSH to the nodes. By default, it uses a timeout of one minute to SSH
+to a node and obtain a result.
+
+## Environment variables
+
+* `PARALLAX_HOSTS`
+* `PARALLAX_USER`
+* `PARALLAX_PAR`
+* `PARALLAX_OUTDIR`
+* `PARALLAX_VERBOSE`
+* `PARALLAX_OPTIONS`
+
+
+  [pssh]: https://code.google.com/p/parallel-ssh/ "parallel-ssh"
+  [ez]: http://peak.telecommunity.com/dist/ez_setup.py "ez_setup.py"
+  [obs]: https://build.opensuse.org/package/show/devel:languages:python/python-parallax "OBS:python-parallax"
diff --git a/bin/parallax-askpass b/bin/parallax-askpass
new file mode 100755
index 0000000..cb50143
--- /dev/null
+++ b/bin/parallax-askpass
@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+
+import os
+import sys
+
+parent, bindir = os.path.split(os.path.dirname(os.path.abspath(sys.argv[0])))
+if os.path.exists(os.path.join(parent, 'parallax')):
+    sys.path.insert(0, parent)
+
+from parallax.askpass_client import askpass_main
+askpass_main()
diff --git a/parallax/__init__.py b/parallax/__init__.py
new file mode 100644
index 0000000..8b9add8
--- /dev/null
+++ b/parallax/__init__.py
@@ -0,0 +1,347 @@
+# Copyright (c) 2013, Kristoffer Gronlund
+#
+# Parallax SSH API
+#
+# Exposes an API for performing
+# parallel SSH operations
+#
+# Three commands are supplied:
+#
+# call(hosts, cmdline, opts)
+#
+# copy(hosts, src, dst, opts)
+#
+# slurp(hosts, src, dst, opts)
+#
+# call returns {host: (rc, stdout, stdin) | error}
+# copy returns {host: path | error}
+# slurp returns {host: path | error}
+#
+# error is an error object which has an error message (or more)
+#
+# opts is bascially command line options
+#
+# call: Executes the given command on a set of hosts, collecting the output
+# copy: Copies files from the local machine to a set of remote hosts
+# slurp: Copies files from a set of remote hosts to local folders
+
+import os
+import sys
+
+DEFAULT_PARALLELISM = 32
+DEFAULT_TIMEOUT = 0  # "infinity" by default
+
+from parallax.manager import Manager, FatalError
+from parallax.task import Task
+
+
+try:
+    basestring
+except NameError:
+    basestring = str
+
+
+class Error(BaseException):
+    """
+    Returned instead of a result for a host
+    in case of an error during the processing for
+    that host.
+    """
+    def __init__(self, msg, task):
+        self.msg = msg
+        self.task = task
+
+    def __str__(self):
+        if self.task and self.task.errorbuffer:
+            return "%s, Error output: %s" % (self.msg,
+                                             self.task.errorbuffer)
+        return self.msg
+
+
+class Options(object):
+    """
+    Common options for call, copy and slurp.
+
+    Note:
+    Setting the inline or inline_stdout options prints the
+    output to stdout unless an alternative manager callback
+    has been set. inline is True by default. This is a change
+    from pssh to parallax.
+    """
+    limit = DEFAULT_PARALLELISM  # Max number of parallel threads
+    timeout = DEFAULT_TIMEOUT    # Timeout in seconds
+    askpass = False              # Ask for a password
+    outdir = None                # Write stdout to a file per host in this directory
+    errdir = None                # Write stderr to a file per host in this directory
+    ssh_options = []             # Extra options to pass to SSH
+    ssh_extra = []               # Extra arguments to pass to SSH
+    verbose = False              # Warning and diagnostic messages
+    quiet = False                # Silence extra output
+    print_out = False            # Print output to stdout when received
+    inline = True                # Store stdout and stderr in memory buffers
+    inline_stdout = False        # Store stdout in memory buffer
+    input_stream = None          # Stream to read stdin from
+    default_user = None          # User to connect as (unless overridden per host)
+    recursive = True             # (copy, slurp only) Copy recursively
+    localdir = None              # (slurp only) Local base directory to copy to
+
+
+def _expand_host_port_user(lst):
+    """
+    Input: list containing hostnames, (host, port)-tuples or (host, port, user)-tuples.
+    Output: list of (host, port, user)-tuples.
+    """
+    def expand(v):
+        if isinstance(v, basestring):
+            return (v, None, None)
+        elif len(v) == 1:
+            return (v[0], None, None)
+        elif len(v) == 2:
+            return (v[0], v[1], None)
+        else:
+            return v
+    return [expand(x) for x in lst]
+
+
+class _CallOutputBuilder(object):
+    def __init__(self):
+        self.finished_tasks = []
+
+    def finished(self, task, n):
+        """Called when Task is complete"""
+        self.finished_tasks.append(task)
+
+    def result(self, manager):
+        """Called when all Tasks are complete to generate result"""
+        ret = {}
+        for task in self.finished_tasks:
+            if task.failures:
+                ret[task.host] = Error(', '.join(task.failures), task)
+            else:
+                ret[task.host] = (task.exitstatus,
+                                  task.outputbuffer or manager.outdir,
+                                  task.errorbuffer or manager.errdir)
+        return ret
+
+
+def _build_call_cmd(host, port, user, cmdline, options, extra):
+    cmd = ['ssh', host,
+           '-o', 'NumberOfPasswordPrompts=1',
+           '-o', 'SendEnv=PARALLAX_NODENUM PARALLAX_HOST']
+    if options:
+        for opt in options:
+            cmd += ['-o', opt]
+    if user:
+        cmd += ['-l', user]
+    if port:
+        cmd += ['-p', port]
+    if extra:
+        cmd.extend(extra)
+    if cmdline:
+        cmd.append(cmdline)
+    return cmd
+
+
+def call(hosts, cmdline, opts=Options()):
+    """
+    Executes the given command on a set of hosts, collecting the output
+    Returns {host: (rc, stdout, stdin) | Error}
+    """
+    if opts.outdir and not os.path.exists(opts.outdir):
+        os.makedirs(opts.outdir)
+    if opts.errdir and not os.path.exists(opts.errdir):
+        os.makedirs(opts.errdir)
+    manager = Manager(limit=opts.limit,
+                      timeout=opts.timeout,
+                      askpass=opts.askpass,
+                      outdir=opts.outdir,
+                      errdir=opts.errdir,
+                      callbacks=_CallOutputBuilder())
+    for host, port, user in _expand_host_port_user(hosts):
+        cmd = _build_call_cmd(host, port, user, cmdline,
+                              options=opts.ssh_options,
+                              extra=opts.ssh_extra)
+        t = Task(host, port, user, cmd,
+                 stdin=opts.input_stream,
+                 verbose=opts.verbose,
+                 quiet=opts.quiet,
+                 print_out=opts.print_out,
+                 inline=opts.inline,
+                 inline_stdout=opts.inline_stdout,
+                 default_user=opts.default_user)
+        manager.add_task(t)
+    try:
+        return manager.run()
+    except FatalError:
+        sys.exit(1)
+
+
+class _CopyOutputBuilder(object):
+    def __init__(self):
+        self.finished_tasks = []
+
+    def finished(self, task, n):
+        self.finished_tasks.append(task)
+
+    def result(self, manager):
+        ret = {}
+        for task in self.finished_tasks:
+            if task.failures:
+                ret[task.host] = Error(', '.join(task.failures), task)
+            else:
+                ret[task.host] = (task.exitstatus,
+                                  task.outputbuffer or manager.outdir,
+                                  task.errorbuffer or manager.errdir)
+        return ret
+
+
+def _build_copy_cmd(host, port, user, src, dst, opts):
+    cmd = ['scp', '-qC']
+    if opts.ssh_options:
+        for opt in opts.ssh_options:
+            cmd += ['-o', opt]
+    if port:
+        cmd += ['-P', port]
+    if opts.recursive:
+        cmd.append('-r')
+    if opts.ssh_extra:
+        cmd.extend(opts.ssh_extra)
+    cmd.append(src)
+    if user:
+        cmd.append('%s@%s:%s' % (user, host, dst))
+    else:
+        cmd.append('%s:%s' % (host, dst))
+    return cmd
+
+
+def copy(hosts, src, dst, opts=Options()):
+    """
+    Copies from the local node to a set of remote hosts
+    hosts: [(host, port, user)...]
+    src: local path
+    dst: remote path
+    opts: CopyOptions (optional)
+    Returns {host: (rc, stdout, stdin) | Error}
+    """
+    if opts.outdir and not os.path.exists(opts.outdir):
+        os.makedirs(opts.outdir)
+    if opts.errdir and not os.path.exists(opts.errdir):
+        os.makedirs(opts.errdir)
+    manager = Manager(limit=opts.limit,
+                      timeout=opts.timeout,
+                      askpass=opts.askpass,
+                      outdir=opts.outdir,
+                      errdir=opts.errdir,
+                      callbacks=_CopyOutputBuilder())
+    for host, port, user in _expand_host_port_user(hosts):
+        cmd = _build_copy_cmd(host, port, user, src, dst, opts)
+        t = Task(host, port, user, cmd,
+                 stdin=opts.input_stream,
+                 verbose=opts.verbose,
+                 quiet=opts.quiet,
+                 print_out=opts.print_out,
+                 inline=opts.inline,
+                 inline_stdout=opts.inline_stdout,
+                 default_user=opts.default_user)
+        manager.add_task(t)
+    try:
+        return manager.run()
+    except FatalError:
+        sys.exit(1)
+
+
+class _SlurpOutputBuilder(object):
+    def __init__(self, localdirs):
+        self.finished_tasks = []
+        self.localdirs = localdirs
+
+    def finished(self, task, n):
+        self.finished_tasks.append(task)
+
+    def result(self, manager):
+        ret = {}
+        for task in self.finished_tasks:
+            if task.failures:
+                ret[task.host] = Error(', '.join(task.failures), task)
+            else:
+                # TODO: save name of output file in Task
+                ret[task.host] = (task.exitstatus,
+                                  task.outputbuffer or manager.outdir,
+                                  task.errorbuffer or manager.errdir,
+                                  self.localdirs.get(task.host, None)
+)
+        return ret
+
+
+def _slurp_make_local_dirs(hosts, dst, opts):
+    if opts.localdir and not os.path.exists(opts.localdir):
+        os.makedirs(opts.localdir)
+    localdirs = {}
+    for host, port, user in _expand_host_port_user(hosts):
+        if opts.localdir:
+            dirname = os.path.join(opts.localdir, host)
+        else:
+            dirname = host
+        if not os.path.exists(dirname):
+            os.makedirs(dirname)
+        localdirs[host] = os.path.join(dirname, dst)
+    return localdirs
+
+
+def _build_slurp_cmd(host, port, user, src, dst, opts):
+    cmd = ['scp', '-qC']
+    if opts.ssh_options:
+        for opt in opts.ssh_options:
+            cmd += ['-o', opt]
+    if port:
+        cmd += ['-P', port]
+    if opts.recursive:
+        cmd.append('-r')
+    if opts.ssh_extra:
+        cmd.extend(opts.ssh_extra)
+    if user:
+        cmd.append('%s@%s:%s' % (user, host, src))
+    else:
+        cmd.append('%s:%s' % (host, src))
+    cmd.append(dst)
+    return cmd
+
+
+def slurp(hosts, src, dst, opts=Options()):
+    """
+    Copies from the remote node to the local node
+    hosts: [(host, port, user)...]
+    src: remote path
+    dst: local path
+    opts: CopyOptions (optional)
+    Returns {host: (rc, stdout, stdin, localpath) | Error}
+    """
+    if os.path.isabs(dst):
+        raise ValueError("slurp: Destination must be a relative path")
+    localdirs = _slurp_make_local_dirs(hosts, dst, opts)
+    if opts.outdir and not os.path.exists(opts.outdir):
+        os.makedirs(opts.outdir)
+    if opts.errdir and not os.path.exists(opts.errdir):
+        os.makedirs(opts.errdir)
+    manager = Manager(limit=opts.limit,
+                      timeout=opts.timeout,
+                      askpass=opts.askpass,
+                      outdir=opts.outdir,
+                      errdir=opts.errdir,
+                      callbacks=_SlurpOutputBuilder(localdirs))
+    for host, port, user in _expand_host_port_user(hosts):
+        localpath = localdirs[host]
+        cmd = _build_slurp_cmd(host, port, user, src, localpath, opts)
+        t = Task(host, port, user, cmd,
+                 stdin=opts.input_stream,
+                 verbose=opts.verbose,
+                 quiet=opts.quiet,
+                 print_out=opts.print_out,
+                 inline=opts.inline,
+                 inline_stdout=opts.inline_stdout,
+                 default_user=opts.default_user)
+        manager.add_task(t)
+    try:
+        return manager.run()
+    except FatalError:
+        sys.exit(1)
diff --git a/parallax/askpass_client.py b/parallax/askpass_client.py
new file mode 100644
index 0000000..365e142
--- /dev/null
+++ b/parallax/askpass_client.py
@@ -0,0 +1,102 @@
+# -*- Mode: python -*-
+
+# Copyright (c) 2009-2012, Andrew McNabb
+
+"""Implementation of SSH_ASKPASS to get a password to ssh from parallax.
+
+The password is read from the socket specified by the environment variable
+PARALLAX_ASKPASS_SOCKET.  The other end of this socket is parallax.
+
+The ssh man page discusses SSH_ASKPASS as follows:
+    If ssh needs a passphrase, it will read the passphrase from the current
+    terminal if it was run from a terminal.  If ssh does not have a terminal
+    associated with it but DISPLAY and SSH_ASKPASS are set, it will execute
+    the program specified by SSH_ASKPASS and open an X11 window to read the
+    passphrase.  This is particularly useful when calling ssh from a .xsession
+    or related script.  (Note that on some machines it may be necessary to
+    redirect the input from /dev/null to make this work.)
+"""
+
+import os
+import socket
+import sys
+import textwrap
+
+bin_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
+askpass_bin_path = os.path.join(bin_dir, 'parallax-askpass')
+ASKPASS_PATHS = (askpass_bin_path,
+        '/usr/bin/parallax-askpass',
+        '/usr/libexec/parallax/parallax-askpass',
+        '/usr/local/libexec/parallax/parallax-askpass',
+        '/usr/lib/parallax/parallax-askpass',
+        '/usr/local/lib/parallax/parallax-askpass')
+
+_executable_path = None
+
+def executable_path():
+    """Determines the value to use for SSH_ASKPASS.
+
+    The value is cached since this may be called many times.
+    """
+    global _executable_path
+    if _executable_path is None:
+        for path in ASKPASS_PATHS:
+            if os.access(path, os.X_OK):
+                _executable_path = path
+                break
+        else:
+            _executable_path = ''
+            sys.stderr.write(textwrap.fill("Warning: could not find an"
+                    " executable path for askpass because Parallax SSH was not"
+                    " installed correctly.  Password prompts will not work."))
+            sys.stderr.write('\n')
+    return _executable_path
+
+def askpass_main():
+    """Connects to parallax over the socket specified at PARALLAX_ASKPASS_SOCKET."""
+
+    verbose = os.getenv('PARALLAX_ASKPASS_VERBOSE')
+
+    # It's not documented anywhere, as far as I can tell, but ssh may prompt
+    # for a password or ask a yes/no question.  The command-line argument
+    # specifies what is needed.
+    if len(sys.argv) > 1:
+        prompt = sys.argv[1]
+        if verbose:
+            sys.stderr.write('parallax-askpass received prompt: "%s"\n' % prompt)
+        if not prompt.strip().lower().endswith('password:'):
+            sys.stderr.write(prompt)
+            sys.stderr.write('\n')
+            sys.exit(1)
+    else:
+        sys.stderr.write('Error: parallax-askpass called without a prompt.\n')
+        sys.exit(1)
+
+    address = os.getenv('PARALLAX_ASKPASS_SOCKET')
+    if not address:
+        sys.stderr.write(textwrap.fill("parallax error: SSH requested a password."
+                " Please create SSH keys or use the -A option to provide a"
+                " password."))
+        sys.stderr.write('\n')
+        sys.exit(1)
+
+    sock = socket.socket(socket.AF_UNIX)
+    try:
+        sock.connect(address)
+    except socket.error:
+        _, e, _ = sys.exc_info()
+        message = e.args[1]
+        sys.stderr.write("Couldn't bind to %s: %s.\n" % (address, message))
+        sys.exit(2)
+
+    try:
+        password = sock.makefile().read()
+    except socket.error:
+        sys.stderr.write("Socket error.\n")
+        sys.exit(3)
+
+    print(password)
+
+
+if __name__ == '__main__':
+    askpass_main()
diff --git a/parallax/askpass_server.py b/parallax/askpass_server.py
new file mode 100644
index 0000000..a75a995
--- /dev/null
+++ b/parallax/askpass_server.py
@@ -0,0 +1,99 @@
+# -*- Mode: python -*-
+
+# Copyright (c) 2009-2012, Andrew McNabb
+
+"""Sends the password over a socket to askpass.
+"""
+
+import errno
+import getpass
+import os
+import socket
+import sys
+import tempfile
+import textwrap
+
+from parallax import psshutil
+
+
+class PasswordServer(object):
+    """Listens on a UNIX domain socket for password requests."""
+    def __init__(self):
+        self.sock = None
+        self.tempdir = None
+        self.address = None
+        self.socketmap = {}
+        self.buffermap = {}
+
+    def start(self, iomap, backlog):
+        """Prompts for the password, creates a socket, and starts listening.
+
+        The specified backlog should be the max number of clients connecting
+        at once.
+        """
+        message = ('Warning: do not enter your password if anyone else has'
+                   ' superuser privileges or access to your account.')
+        print(textwrap.fill(message))
+
+        self.password = getpass.getpass()
+
+        # Note that according to the docs for mkdtemp, "The directory is
+        # readable, writable, and searchable only by the creating user."
+        self.tempdir = tempfile.mkdtemp(prefix='parallax.')
+        self.address = os.path.join(self.tempdir, 'parallax_askpass_socket')
+        self.sock = socket.socket(socket.AF_UNIX)
+        psshutil.set_cloexec(self.sock)
+        self.sock.bind(self.address)
+        self.sock.listen(backlog)
+        iomap.register_read(self.sock.fileno(), self.handle_listen)
+
+    def handle_listen(self, fd, iomap):
+        try:
+            conn = self.sock.accept()[0]
+        except socket.error:
+            _, e, _ = sys.exc_info()
+            number = e.args[0]
+            if number == errno.EINTR:
+                return
+            else:
+                # TODO: print an error message here?
+                self.sock.close()
+                self.sock = None
+        fd = conn.fileno()
+        iomap.register_write(fd, self.handle_write)
+        self.socketmap[fd] = conn
+        self.buffermap[fd] = self.password.encode()
+
+    def handle_write(self, fd, iomap):
+        buffer = self.buffermap[fd]
+        conn = self.socketmap[fd]
+        try:
+            bytes_written = conn.send(buffer)
+        except socket.error:
+            _, e, _ = sys.exc_info()
+            number = e.args[0]
+            if number == errno.EINTR:
+                return
+            else:
+                self.close_socket(fd, iomap)
+
+        buffer = buffer[bytes_written:]
+        if buffer:
+            self.buffermap[fd] = buffer
+        else:
+            self.close_socket(fd, iomap)
+
+    def close_socket(self, fd, iomap):
+        iomap.unregister(fd)
+        self.socketmap[fd].close()
+        del self.socketmap[fd]
+        del self.buffermap[fd]
+
+    def __del__(self):
+        if self.sock:
+            self.sock.close()
+            self.sock = None
+        if self.address:
+            os.remove(self.address)
+        if self.tempdir:
+            os.rmdir(self.tempdir)
diff --git a/parallax/callbacks.py b/parallax/callbacks.py
new file mode 100644
index 0000000..2de6213
--- /dev/null
+++ b/parallax/callbacks.py
@@ -0,0 +1,60 @@
+# Copyright (c) 2009-2012, Andrew McNabb
+# Copyright (c) 2013, Kristoffer Gronlund
+
+import sys
+import time
+
+from parallax import color
+
+
+class DefaultCallbacks(object):
+    """
+    Passed to the Manager and called when events occur.
+    """
+    def finished(self, task, n):
+        """Pretty prints a status report after the Task completes.
+        task: a Task object
+        n: Index in sequence of completed tasks.
+        """
+        error = ', '.join(task.failures)
+        tstamp = time.asctime().split()[3]  # Current time
+        if color.has_colors(sys.stdout):
+            progress = color.c("[%s]" % color.B(n))
+            success = color.g("[%s]" % color.B("SUCCESS"))
+            failure = color.r("[%s]" % color.B("FAILURE"))
+            stderr = color.r("Stderr: ")
+            error = color.r(color.B(error))
+        else:
+            progress = "[%s]" % n
+            success = "[SUCCESS]"
+            failure = "[FAILURE]"
+            stderr = "Stderr: "
+        host = task.pretty_host
+        if not task.quiet:
+            if task.failures:
+                print(' '.join((progress, tstamp, failure, host, error)))
+            else:
+                print(' '.join((progress, tstamp, success, host)))
+        # NOTE: The extra flushes are to ensure that the data is output in
+        # the correct order with the C implementation of io.
+        if task.inline_stdout and task.outputbuffer:
+            sys.stdout.flush()
+            try:
+                sys.stdout.buffer.write(task.outputbuffer)
+                sys.stdout.flush()
+            except AttributeError:
+                sys.stdout.write(task.outputbuffer)
+        if task.inline and task.errorbuffer:
+            sys.stdout.write(stderr)
+            # Flush the TextIOWrapper before writing to the binary buffer.
+            sys.stdout.flush()
+            try:
+                sys.stdout.buffer.write(task.errorbuffer)
+            except AttributeError:
+                sys.stdout.write(task.errorbuffer)
+
+    def result(self, manager):
+        """
+        When all Tasks are completed, generate a result to return.
+        """
+        return [task.exitstatus for task in manager.save_tasks if task in manager.done]
diff --git a/parallax/color.py b/parallax/color.py
new file mode 100644
index 0000000..adcdbda
--- /dev/null
+++ b/parallax/color.py
@@ -0,0 +1,39 @@
+# Copyright (c) 2009-2012, Andrew McNabb
+# Copyright (c) 2003-2008, Brent N. Chun
+
+def with_color(string, fg, bg=49):
+    '''Given foreground/background ANSI color codes, return a string that,
+    when printed, will format the supplied string using the supplied colors.
+    '''
+    return "\x1b[%dm\x1b[%dm%s\x1b[39m\x1b[49m" % (fg, bg, string)
+
+def B(string):
+    '''Returns a string that, when printed, will display the supplied string
+    in ANSI bold.
+    '''
+    return "\x1b[1m%s\x1b[22m" % string
+
+def r(string): return with_color(string, 31) # Red
+def g(string): return with_color(string, 32) # Green
+def y(string): return with_color(string, 33) # Yellow
+def b(string): return with_color(string, 34) # Blue
+def m(string): return with_color(string, 35) # Magenta
+def c(string): return with_color(string, 36) # Cyan
+def w(string): return with_color(string, 37) # White
+
+#following from Python cookbook, #475186
+def has_colors(stream):
+    '''Returns boolean indicating whether or not the supplied stream supports
+    ANSI color.
+    '''
+    if not hasattr(stream, "isatty"):
+        return False
+    if not stream.isatty():
+        return False # auto color only on TTYs
+    try:
+        import curses
+        curses.setupterm()
+        return curses.tigetnum("colors") > 2
+    except:
+        # guess false in case of error
+        return False
diff --git a/parallax/manager.py b/parallax/manager.py
new file mode 100644
index 0000000..ed58d5d
--- /dev/null
+++ b/parallax/manager.py
@@ -0,0 +1,443 @@
+# Copyright (c) 2009-2012, Andrew McNabb
+# Copyright (c) 2013, Kristoffer Gronlund
+
+from errno import EINTR
+import os
+import select
+import signal
+import sys
+import threading
+import copy
+
+try:
+    import queue
+except ImportError:
+    import Queue as queue
+
+from parallax.askpass_server import PasswordServer
+from parallax import psshutil
+from parallax import DEFAULT_PARALLELISM, DEFAULT_TIMEOUT
+from parallax.callbacks import DefaultCallbacks
+
+READ_SIZE = 1 << 16
+
+
+class FatalError(RuntimeError):
+    """A fatal error in the Parallax SSH Manager."""
+    pass
+
+
+class Manager(object):
+    """Executes tasks concurrently.
+
+    Tasks are added with add_task() and executed in parallel with run().
+    Returns a list of the exit statuses of the processes.
+
+    Arguments:
+        limit: Maximum number of commands running at once.
+        timeout: Maximum allowed execution time in seconds.
+    """
+    def __init__(self,
+                 limit=DEFAULT_PARALLELISM,
+                 timeout=DEFAULT_TIMEOUT,
+                 askpass=False,
+                 outdir=None,
+                 errdir=None,
+                 callbacks=DefaultCallbacks()):
+        # Backwards compatibility with old __init__
+        # format: Only argument is an options dict
+        if not isinstance(limit, int):
+            if hasattr(limit, 'limit'):
+                self.limit = limit.limit
+            elif hasattr(limit, 'par'):
+                self.limit = limit.par
+            else:
+                self.limit = DEFAULT_PARALLELISM
+            if hasattr(limit, 'timeout'):
+                self.timeout = limit.timeout
+            else:
+                self.timeout = DEFAULT_TIMEOUT
+            if hasattr(limit, 'askpass'):
+                self.askpass = limit.askpass
+            else:
+                self.askpass = False
+            if hasattr(limit, 'outdir'):
+                self.outdir = limit.outdir
+            else:
+                self.outdir = None
+            if hasattr(limit, 'errdir'):
+                self.errdir = limit.errdir
+            else:
+                self.errdir = None
+        else:
+            self.limit = limit
+            self.timeout = timeout
+            self.askpass = askpass
+            self.outdir = outdir
+            self.errdir = errdir
+        self.iomap = make_iomap()
+        self.callbacks = callbacks
+
+        self.taskcount = 0
+        self.tasks = []
+        self.save_tasks = []
+        self.running = []
+        self.done = []
+
+        self.askpass_socket = None
+
+    def run(self):
+        """Processes tasks previously added with add_task."""
+        self.save_tasks = copy.copy(self.tasks)
+        try:
+            if self.outdir or self.errdir:
+                writer = Writer(self.outdir, self.errdir)
+                writer.start()
+            else:
+                writer = None
+
+            if self.askpass:
+                pass_server = PasswordServer()
+                pass_server.start(self.iomap, self.limit)
+                self.askpass_socket = pass_server.address
+
+            self.set_sigchld_handler()
+
+            try:
+                self.update_tasks(writer)
+                wait = None
+                while self.running or self.tasks:
+                    # Opt for efficiency over subsecond timeout accuracy.
+                    if wait is None or wait < 1:
... 902 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/parallax.git



More information about the Python-modules-commits mailing list