[Python-modules-commits] [parallax] 01/08: Import parallax_1.0.1.orig.tar.gz
Valentin Vidic
vvidic-guest at moszumanska.debian.org
Sun Nov 13 21:22:25 UTC 2016
This is an automated email from the git hooks/post-receive script.
vvidic-guest pushed a commit to branch master
in repository parallax.
commit fdf17ec903babd9f496782c607826b5482fc8247
Author: Valentin Vidic <Valentin.Vidic at CARNet.hr>
Date: Sun Nov 13 21:55:51 2016 +0100
Import parallax_1.0.1.orig.tar.gz
---
AUTHORS | 4 +
COPYING | 32 ++++
PKG-INFO | 29 +++
README.md | 76 ++++++++
bin/parallax-askpass | 11 ++
parallax/__init__.py | 347 +++++++++++++++++++++++++++++++++++
parallax/askpass_client.py | 102 +++++++++++
parallax/askpass_server.py | 99 ++++++++++
parallax/callbacks.py | 60 ++++++
parallax/color.py | 39 ++++
parallax/manager.py | 443 +++++++++++++++++++++++++++++++++++++++++++++
parallax/psshutil.py | 139 ++++++++++++++
parallax/task.py | 277 ++++++++++++++++++++++++++++
parallax/version.py | 1 +
setup.py | 37 ++++
test/test_api.py | 86 +++++++++
16 files changed, 1782 insertions(+)
diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 0000000..d738197
--- /dev/null
+++ b/AUTHORS
@@ -0,0 +1,4 @@
+Andrew McNabb <amcnabb at mcnabbs.org>
+Brent Chun <bnc at theether.org>
+Kristoffer Gronlund <krig at koru.se>
+Vladislav Bogdanov <bubble at hoster-ok.com>
diff --git a/COPYING b/COPYING
new file mode 100644
index 0000000..dd89bbd
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,32 @@
+Copyright (c) 2009, Andrew McNabb
+Copyright (c) 2003-2008, Brent N. Chun
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+
+ * The names of its contributors may not be used to endorse or
+ promote products derived from this software without specific
+ prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..a9885d4
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,29 @@
+Metadata-Version: 1.1
+Name: parallax
+Version: 1.0.1
+Summary: Execute commands and copy files over SSH to multiple machines at once
+Home-page: https://github.com/krig/parallax/
+Author: Kristoffer Gronlund
+Author-email: krig at koru.se
+License: BSD
+Description: Parallax SSH provides an interface to executing commands on multiple
+ nodes at once using SSH. It also provides commands for sending and receiving files to
+ multiple nodes using SCP.
+Platform: linux
+Classifier: Development Status :: 3 - Alpha
+Classifier: Intended Audience :: System Administrators
+Classifier: License :: OSI Approved :: BSD License
+Classifier: Operating System :: POSIX
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 2.6
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 3.1
+Classifier: Programming Language :: Python :: 3.2
+Classifier: Programming Language :: Python :: 3.3
+Classifier: Programming Language :: Python :: 3.4
+Classifier: Topic :: Software Development :: Libraries :: Python Modules
+Classifier: Topic :: System :: Clustering
+Classifier: Topic :: System :: Networking
+Classifier: Topic :: System :: Systems Administration
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..beb5620
--- /dev/null
+++ b/README.md
@@ -0,0 +1,76 @@
+# Parallax SSH
+
+Parallax SSH is a fork of [Parallel SSH][pssh] which focuses less on
+command-line tools and more on providing a flexible and programmable
+API that can be used by Python application developers to perform SSH
+operations across multiple machines.
+
+## Installation
+
+Parallax intends to be compatible with Python 2.6 and above (including
+Python 3.1 and greater), but is primarily tested with Python 2.7.
+
+Installation requires setuptools or ez_setup.py. The latter can be
+downloaded [here][ez].
+
+Once those requirements are fulfilled, installation is as simple as:
+
+ # sudo python setup.py install
+
+Packaged versions of Parallax SSH for various distributions can be
+downloaded from the openSUSE [OBS][obs].
+
+To install via PyPI, use `pip`:
+
+ # pip install parallax
+
+Share and enjoy!
+
+## Usage
+
+* `parallax.call(hosts, cmdline, opts)`
+
+ Executes the given command on a set of hosts, collecting the output.
+
+ Returns a dict mapping the hostname of
+ each host either to a tuple containing a return code,
+ stdout and stderr, or a `parallax.Error` instance
+ describing the error.
+
+* `parallax.copy(hosts, src, dst, opts)`
+
+ Copies files from `src` on the local machine to `dst` on the
+ remote hosts.
+
+ Returns a dict mapping the hostname of
+ each host either to a path, or a `parallax.Error` instance
+ describing the error.
+
+* `parallax.slurp(hosts, src, dst, opts)`
+
+ Copies files from `src` on the remote hosts to a local folder for
+ each of the remote hosts.
+
+ Returns a dict mapping the hostname of
+ each host either to a path, or a `parallax.Error` instance
+ describing the error.
+
+## How it works
+
+By default, Parallax SSH uses at most 32 SSH processes in parallel to
+SSH to the nodes. By default, no timeout is applied while waiting for
+a node to return a result.
+
+## Environment variables
+
+* `PARALLAX_HOSTS`
+* `PARALLAX_USER`
+* `PARALLAX_PAR`
+* `PARALLAX_OUTDIR`
+* `PARALLAX_VERBOSE`
+* `PARALLAX_OPTIONS`
+
+
+ [pssh]: https://code.google.com/p/parallel-ssh/ "parallel-ssh"
+ [ez]: http://peak.telecommunity.com/dist/ez_setup.py "ez_setup.py"
+ [obs]: https://build.opensuse.org/package/show/devel:languages:python/python-parallax "OBS:python-parallax"
diff --git a/bin/parallax-askpass b/bin/parallax-askpass
new file mode 100755
index 0000000..cb50143
--- /dev/null
+++ b/bin/parallax-askpass
@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+
+import os
+import sys
+
+parent, bindir = os.path.split(os.path.dirname(os.path.abspath(sys.argv[0])))
+if os.path.exists(os.path.join(parent, 'parallax')):
+ sys.path.insert(0, parent)
+
+from parallax.askpass_client import askpass_main
+askpass_main()
diff --git a/parallax/__init__.py b/parallax/__init__.py
new file mode 100644
index 0000000..8b9add8
--- /dev/null
+++ b/parallax/__init__.py
@@ -0,0 +1,347 @@
+# Copyright (c) 2013, Kristoffer Gronlund
+#
+# Parallax SSH API
+#
+# Exposes an API for performing
+# parallel SSH operations
+#
+# Three commands are supplied:
+#
+# call(hosts, cmdline, opts)
+#
+# copy(hosts, src, dst, opts)
+#
+# slurp(hosts, src, dst, opts)
+#
+# call returns {host: (rc, stdout, stderr) | error}
+# copy returns {host: (rc, stdout, stderr) | error}
+# slurp returns {host: (rc, stdout, stderr, localpath) | error}
+#
+# error is an error object which has an error message (or more)
+#
+# opts is basically command line options
+#
+# call: Executes the given command on a set of hosts, collecting the output
+# copy: Copies files from the local machine to a set of remote hosts
+# slurp: Copies files from a set of remote hosts to local folders
+
+import os
+import sys
+
+DEFAULT_PARALLELISM = 32
+DEFAULT_TIMEOUT = 0 # "infinity" by default
+
+from parallax.manager import Manager, FatalError
+from parallax.task import Task
+
+
+try:
+ basestring
+except NameError:
+ basestring = str
+
+
+class Error(BaseException):
+ """
+ Returned instead of a result for a host
+ in case of an error during the processing for
+ that host.
+ """
+ def __init__(self, msg, task):
+ self.msg = msg
+ self.task = task
+
+ def __str__(self):
+ if self.task and self.task.errorbuffer:
+ return "%s, Error output: %s" % (self.msg,
+ self.task.errorbuffer)
+ return self.msg
+
+
+class Options(object):
+ """
+ Common options for call, copy and slurp.
+
+ Note:
+ Setting the inline or inline_stdout options prints the
+ output to stdout unless an alternative manager callback
+ has been set. inline is True by default. This is a change
+ from pssh to parallax.
+ """
+ limit = DEFAULT_PARALLELISM # Max number of parallel threads
+ timeout = DEFAULT_TIMEOUT # Timeout in seconds
+ askpass = False # Ask for a password
+ outdir = None # Write stdout to a file per host in this directory
+ errdir = None # Write stderr to a file per host in this directory
+ ssh_options = [] # Extra options to pass to SSH
+ ssh_extra = [] # Extra arguments to pass to SSH
+ verbose = False # Warning and diagnostic messages
+ quiet = False # Silence extra output
+ print_out = False # Print output to stdout when received
+ inline = True # Store stdout and stderr in memory buffers
+ inline_stdout = False # Store stdout in memory buffer
+ input_stream = None # Stream to read stdin from
+ default_user = None # User to connect as (unless overridden per host)
+ recursive = True # (copy, slurp only) Copy recursively
+ localdir = None # (slurp only) Local base directory to copy to
+
+
+def _expand_host_port_user(lst):
+ """
+ Input: list containing hostnames, (host, port)-tuples or (host, port, user)-tuples.
+ Output: list of (host, port, user)-tuples.
+ """
+ def expand(v):
+ if isinstance(v, basestring):
+ return (v, None, None)
+ elif len(v) == 1:
+ return (v[0], None, None)
+ elif len(v) == 2:
+ return (v[0], v[1], None)
+ else:
+ return v
+ return [expand(x) for x in lst]
+
+
+class _CallOutputBuilder(object):
+ def __init__(self):
+ self.finished_tasks = []
+
+ def finished(self, task, n):
+ """Called when Task is complete"""
+ self.finished_tasks.append(task)
+
+ def result(self, manager):
+ """Called when all Tasks are complete to generate result"""
+ ret = {}
+ for task in self.finished_tasks:
+ if task.failures:
+ ret[task.host] = Error(', '.join(task.failures), task)
+ else:
+ ret[task.host] = (task.exitstatus,
+ task.outputbuffer or manager.outdir,
+ task.errorbuffer or manager.errdir)
+ return ret
+
+
+def _build_call_cmd(host, port, user, cmdline, options, extra):
+ cmd = ['ssh', host,
+ '-o', 'NumberOfPasswordPrompts=1',
+ '-o', 'SendEnv=PARALLAX_NODENUM PARALLAX_HOST']
+ if options:
+ for opt in options:
+ cmd += ['-o', opt]
+ if user:
+ cmd += ['-l', user]
+ if port:
+ cmd += ['-p', port]
+ if extra:
+ cmd.extend(extra)
+ if cmdline:
+ cmd.append(cmdline)
+ return cmd
+
+
+def call(hosts, cmdline, opts=Options()):
+ """
+ Executes the given command on a set of hosts, collecting the output
+ Returns {host: (rc, stdout, stderr) | Error}
+ """
+ if opts.outdir and not os.path.exists(opts.outdir):
+ os.makedirs(opts.outdir)
+ if opts.errdir and not os.path.exists(opts.errdir):
+ os.makedirs(opts.errdir)
+ manager = Manager(limit=opts.limit,
+ timeout=opts.timeout,
+ askpass=opts.askpass,
+ outdir=opts.outdir,
+ errdir=opts.errdir,
+ callbacks=_CallOutputBuilder())
+ for host, port, user in _expand_host_port_user(hosts):
+ cmd = _build_call_cmd(host, port, user, cmdline,
+ options=opts.ssh_options,
+ extra=opts.ssh_extra)
+ t = Task(host, port, user, cmd,
+ stdin=opts.input_stream,
+ verbose=opts.verbose,
+ quiet=opts.quiet,
+ print_out=opts.print_out,
+ inline=opts.inline,
+ inline_stdout=opts.inline_stdout,
+ default_user=opts.default_user)
+ manager.add_task(t)
+ try:
+ return manager.run()
+ except FatalError:
+ sys.exit(1)
+
+
+class _CopyOutputBuilder(object):
+ def __init__(self):
+ self.finished_tasks = []
+
+ def finished(self, task, n):
+ self.finished_tasks.append(task)
+
+ def result(self, manager):
+ ret = {}
+ for task in self.finished_tasks:
+ if task.failures:
+ ret[task.host] = Error(', '.join(task.failures), task)
+ else:
+ ret[task.host] = (task.exitstatus,
+ task.outputbuffer or manager.outdir,
+ task.errorbuffer or manager.errdir)
+ return ret
+
+
+def _build_copy_cmd(host, port, user, src, dst, opts):
+ cmd = ['scp', '-qC']
+ if opts.ssh_options:
+ for opt in opts.ssh_options:
+ cmd += ['-o', opt]
+ if port:
+ cmd += ['-P', port]
+ if opts.recursive:
+ cmd.append('-r')
+ if opts.ssh_extra:
+ cmd.extend(opts.ssh_extra)
+ cmd.append(src)
+ if user:
+ cmd.append('%s@%s:%s' % (user, host, dst))
+ else:
+ cmd.append('%s:%s' % (host, dst))
+ return cmd
+
+
+def copy(hosts, src, dst, opts=Options()):
+ """
+ Copies from the local node to a set of remote hosts
+ hosts: [(host, port, user)...]
+ src: local path
+ dst: remote path
+ opts: Options (optional)
+ Returns {host: (rc, stdout, stderr) | Error}
+ """
+ if opts.outdir and not os.path.exists(opts.outdir):
+ os.makedirs(opts.outdir)
+ if opts.errdir and not os.path.exists(opts.errdir):
+ os.makedirs(opts.errdir)
+ manager = Manager(limit=opts.limit,
+ timeout=opts.timeout,
+ askpass=opts.askpass,
+ outdir=opts.outdir,
+ errdir=opts.errdir,
+ callbacks=_CopyOutputBuilder())
+ for host, port, user in _expand_host_port_user(hosts):
+ cmd = _build_copy_cmd(host, port, user, src, dst, opts)
+ t = Task(host, port, user, cmd,
+ stdin=opts.input_stream,
+ verbose=opts.verbose,
+ quiet=opts.quiet,
+ print_out=opts.print_out,
+ inline=opts.inline,
+ inline_stdout=opts.inline_stdout,
+ default_user=opts.default_user)
+ manager.add_task(t)
+ try:
+ return manager.run()
+ except FatalError:
+ sys.exit(1)
+
+
+class _SlurpOutputBuilder(object):
+ def __init__(self, localdirs):
+ self.finished_tasks = []
+ self.localdirs = localdirs
+
+ def finished(self, task, n):
+ self.finished_tasks.append(task)
+
+ def result(self, manager):
+ ret = {}
+ for task in self.finished_tasks:
+ if task.failures:
+ ret[task.host] = Error(', '.join(task.failures), task)
+ else:
+ # TODO: save name of output file in Task
+ ret[task.host] = (task.exitstatus,
+ task.outputbuffer or manager.outdir,
+ task.errorbuffer or manager.errdir,
+ self.localdirs.get(task.host, None)
+)
+ return ret
+
+
+def _slurp_make_local_dirs(hosts, dst, opts):
+ if opts.localdir and not os.path.exists(opts.localdir):
+ os.makedirs(opts.localdir)
+ localdirs = {}
+ for host, port, user in _expand_host_port_user(hosts):
+ if opts.localdir:
+ dirname = os.path.join(opts.localdir, host)
+ else:
+ dirname = host
+ if not os.path.exists(dirname):
+ os.makedirs(dirname)
+ localdirs[host] = os.path.join(dirname, dst)
+ return localdirs
+
+
+def _build_slurp_cmd(host, port, user, src, dst, opts):
+ cmd = ['scp', '-qC']
+ if opts.ssh_options:
+ for opt in opts.ssh_options:
+ cmd += ['-o', opt]
+ if port:
+ cmd += ['-P', port]
+ if opts.recursive:
+ cmd.append('-r')
+ if opts.ssh_extra:
+ cmd.extend(opts.ssh_extra)
+ if user:
+ cmd.append('%s@%s:%s' % (user, host, src))
+ else:
+ cmd.append('%s:%s' % (host, src))
+ cmd.append(dst)
+ return cmd
+
+
+def slurp(hosts, src, dst, opts=Options()):
+ """
+ Copies from the remote node to the local node
+ hosts: [(host, port, user)...]
+ src: remote path
+ dst: local path
+ opts: Options (optional)
+ Returns {host: (rc, stdout, stderr, localpath) | Error}
+ """
+ if os.path.isabs(dst):
+ raise ValueError("slurp: Destination must be a relative path")
+ localdirs = _slurp_make_local_dirs(hosts, dst, opts)
+ if opts.outdir and not os.path.exists(opts.outdir):
+ os.makedirs(opts.outdir)
+ if opts.errdir and not os.path.exists(opts.errdir):
+ os.makedirs(opts.errdir)
+ manager = Manager(limit=opts.limit,
+ timeout=opts.timeout,
+ askpass=opts.askpass,
+ outdir=opts.outdir,
+ errdir=opts.errdir,
+ callbacks=_SlurpOutputBuilder(localdirs))
+ for host, port, user in _expand_host_port_user(hosts):
+ localpath = localdirs[host]
+ cmd = _build_slurp_cmd(host, port, user, src, localpath, opts)
+ t = Task(host, port, user, cmd,
+ stdin=opts.input_stream,
+ verbose=opts.verbose,
+ quiet=opts.quiet,
+ print_out=opts.print_out,
+ inline=opts.inline,
+ inline_stdout=opts.inline_stdout,
+ default_user=opts.default_user)
+ manager.add_task(t)
+ try:
+ return manager.run()
+ except FatalError:
+ sys.exit(1)
diff --git a/parallax/askpass_client.py b/parallax/askpass_client.py
new file mode 100644
index 0000000..365e142
--- /dev/null
+++ b/parallax/askpass_client.py
@@ -0,0 +1,102 @@
+# -*- Mode: python -*-
+
+# Copyright (c) 2009-2012, Andrew McNabb
+
+"""Implementation of SSH_ASKPASS to get a password to ssh from parallax.
+
+The password is read from the socket specified by the environment variable
+PARALLAX_ASKPASS_SOCKET. The other end of this socket is parallax.
+
+The ssh man page discusses SSH_ASKPASS as follows:
+ If ssh needs a passphrase, it will read the passphrase from the current
+ terminal if it was run from a terminal. If ssh does not have a terminal
+ associated with it but DISPLAY and SSH_ASKPASS are set, it will execute
+ the program specified by SSH_ASKPASS and open an X11 window to read the
+ passphrase. This is particularly useful when calling ssh from a .xsession
+ or related script. (Note that on some machines it may be necessary to
+ redirect the input from /dev/null to make this work.)
+"""
+
+import os
+import socket
+import sys
+import textwrap
+
+bin_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
+askpass_bin_path = os.path.join(bin_dir, 'parallax-askpass')
+ASKPASS_PATHS = (askpass_bin_path,
+ '/usr/bin/parallax-askpass',
+ '/usr/libexec/parallax/parallax-askpass',
+ '/usr/local/libexec/parallax/parallax-askpass',
+ '/usr/lib/parallax/parallax-askpass',
+ '/usr/local/lib/parallax/parallax-askpass')
+
+_executable_path = None
+
+def executable_path():
+ """Determines the value to use for SSH_ASKPASS.
+
+ The value is cached since this may be called many times.
+ """
+ global _executable_path
+ if _executable_path is None:
+ for path in ASKPASS_PATHS:
+ if os.access(path, os.X_OK):
+ _executable_path = path
+ break
+ else:
+ _executable_path = ''
+ sys.stderr.write(textwrap.fill("Warning: could not find an"
+ " executable path for askpass because Parallax SSH was not"
+ " installed correctly. Password prompts will not work."))
+ sys.stderr.write('\n')
+ return _executable_path
+
+def askpass_main():
+ """Connects to parallax over the socket specified at PARALLAX_ASKPASS_SOCKET."""
+
+ verbose = os.getenv('PARALLAX_ASKPASS_VERBOSE')
+
+ # It's not documented anywhere, as far as I can tell, but ssh may prompt
+ # for a password or ask a yes/no question. The command-line argument
+ # specifies what is needed.
+ if len(sys.argv) > 1:
+ prompt = sys.argv[1]
+ if verbose:
+ sys.stderr.write('parallax-askpass received prompt: "%s"\n' % prompt)
+ if not prompt.strip().lower().endswith('password:'):
+ sys.stderr.write(prompt)
+ sys.stderr.write('\n')
+ sys.exit(1)
+ else:
+ sys.stderr.write('Error: parallax-askpass called without a prompt.\n')
+ sys.exit(1)
+
+ address = os.getenv('PARALLAX_ASKPASS_SOCKET')
+ if not address:
+ sys.stderr.write(textwrap.fill("parallax error: SSH requested a password."
+ " Please create SSH keys or use the -A option to provide a"
+ " password."))
+ sys.stderr.write('\n')
+ sys.exit(1)
+
+ sock = socket.socket(socket.AF_UNIX)
+ try:
+ sock.connect(address)
+ except socket.error:
+ _, e, _ = sys.exc_info()
+ message = e.args[1]
+ sys.stderr.write("Couldn't bind to %s: %s.\n" % (address, message))
+ sys.exit(2)
+
+ try:
+ password = sock.makefile().read()
+ except socket.error:
+ sys.stderr.write("Socket error.\n")
+ sys.exit(3)
+
+ print(password)
+
+
+if __name__ == '__main__':
+ askpass_main()
diff --git a/parallax/askpass_server.py b/parallax/askpass_server.py
new file mode 100644
index 0000000..a75a995
--- /dev/null
+++ b/parallax/askpass_server.py
@@ -0,0 +1,99 @@
+# -*- Mode: python -*-
+
+# Copyright (c) 2009-2012, Andrew McNabb
+
+"""Sends the password over a socket to askpass.
+"""
+
+import errno
+import getpass
+import os
+import socket
+import sys
+import tempfile
+import textwrap
+
+from parallax import psshutil
+
+
+class PasswordServer(object):
+ """Listens on a UNIX domain socket for password requests."""
+ def __init__(self):
+ self.sock = None
+ self.tempdir = None
+ self.address = None
+ self.socketmap = {}
+ self.buffermap = {}
+
+ def start(self, iomap, backlog):
+ """Prompts for the password, creates a socket, and starts listening.
+
+ The specified backlog should be the max number of clients connecting
+ at once.
+ """
+ message = ('Warning: do not enter your password if anyone else has'
+ ' superuser privileges or access to your account.')
+ print(textwrap.fill(message))
+
+ self.password = getpass.getpass()
+
+ # Note that according to the docs for mkdtemp, "The directory is
+ # readable, writable, and searchable only by the creating user."
+ self.tempdir = tempfile.mkdtemp(prefix='parallax.')
+ self.address = os.path.join(self.tempdir, 'parallax_askpass_socket')
+ self.sock = socket.socket(socket.AF_UNIX)
+ psshutil.set_cloexec(self.sock)
+ self.sock.bind(self.address)
+ self.sock.listen(backlog)
+ iomap.register_read(self.sock.fileno(), self.handle_listen)
+
+ def handle_listen(self, fd, iomap):
+ try:
+ conn = self.sock.accept()[0]
+ except socket.error:
+ _, e, _ = sys.exc_info()
+ number = e.args[0]
+ if number == errno.EINTR:
+ return
+ else:
+ # TODO: print an error message here?
+ self.sock.close()
+ self.sock = None
+ fd = conn.fileno()
+ iomap.register_write(fd, self.handle_write)
+ self.socketmap[fd] = conn
+ self.buffermap[fd] = self.password.encode()
+
+ def handle_write(self, fd, iomap):
+ buffer = self.buffermap[fd]
+ conn = self.socketmap[fd]
+ try:
+ bytes_written = conn.send(buffer)
+ except socket.error:
+ _, e, _ = sys.exc_info()
+ number = e.args[0]
+ if number == errno.EINTR:
+ return
+ else:
+ self.close_socket(fd, iomap)
+
+ buffer = buffer[bytes_written:]
+ if buffer:
+ self.buffermap[fd] = buffer
+ else:
+ self.close_socket(fd, iomap)
+
+ def close_socket(self, fd, iomap):
+ iomap.unregister(fd)
+ self.socketmap[fd].close()
+ del self.socketmap[fd]
+ del self.buffermap[fd]
+
+ def __del__(self):
+ if self.sock:
+ self.sock.close()
+ self.sock = None
+ if self.address:
+ os.remove(self.address)
+ if self.tempdir:
+ os.rmdir(self.tempdir)
diff --git a/parallax/callbacks.py b/parallax/callbacks.py
new file mode 100644
index 0000000..2de6213
--- /dev/null
+++ b/parallax/callbacks.py
@@ -0,0 +1,60 @@
+# Copyright (c) 2009-2012, Andrew McNabb
+# Copyright (c) 2013, Kristoffer Gronlund
+
+import sys
+import time
+
+from parallax import color
+
+
+class DefaultCallbacks(object):
+ """
+ Passed to the Manager and called when events occur.
+ """
+ def finished(self, task, n):
+ """Pretty prints a status report after the Task completes.
+ task: a Task object
+ n: Index in sequence of completed tasks.
+ """
+ error = ', '.join(task.failures)
+ tstamp = time.asctime().split()[3] # Current time
+ if color.has_colors(sys.stdout):
+ progress = color.c("[%s]" % color.B(n))
+ success = color.g("[%s]" % color.B("SUCCESS"))
+ failure = color.r("[%s]" % color.B("FAILURE"))
+ stderr = color.r("Stderr: ")
+ error = color.r(color.B(error))
+ else:
+ progress = "[%s]" % n
+ success = "[SUCCESS]"
+ failure = "[FAILURE]"
+ stderr = "Stderr: "
+ host = task.pretty_host
+ if not task.quiet:
+ if task.failures:
+ print(' '.join((progress, tstamp, failure, host, error)))
+ else:
+ print(' '.join((progress, tstamp, success, host)))
+ # NOTE: The extra flushes are to ensure that the data is output in
+ # the correct order with the C implementation of io.
+ if task.inline_stdout and task.outputbuffer:
+ sys.stdout.flush()
+ try:
+ sys.stdout.buffer.write(task.outputbuffer)
+ sys.stdout.flush()
+ except AttributeError:
+ sys.stdout.write(task.outputbuffer)
+ if task.inline and task.errorbuffer:
+ sys.stdout.write(stderr)
+ # Flush the TextIOWrapper before writing to the binary buffer.
+ sys.stdout.flush()
+ try:
+ sys.stdout.buffer.write(task.errorbuffer)
+ except AttributeError:
+ sys.stdout.write(task.errorbuffer)
+
+ def result(self, manager):
+ """
+ When all Tasks are completed, generate a result to return.
+ """
+ return [task.exitstatus for task in manager.save_tasks if task in manager.done]
diff --git a/parallax/color.py b/parallax/color.py
new file mode 100644
index 0000000..adcdbda
--- /dev/null
+++ b/parallax/color.py
@@ -0,0 +1,39 @@
+# Copyright (c) 2009-2012, Andrew McNabb
+# Copyright (c) 2003-2008, Brent N. Chun
+
+def with_color(string, fg, bg=49):
+ '''Given foreground/background ANSI color codes, return a string that,
+ when printed, will format the supplied string using the supplied colors.
+ '''
+ return "\x1b[%dm\x1b[%dm%s\x1b[39m\x1b[49m" % (fg, bg, string)
+
+def B(string):
+ '''Returns a string that, when printed, will display the supplied string
+ in ANSI bold.
+ '''
+ return "\x1b[1m%s\x1b[22m" % string
+
+def r(string): return with_color(string, 31) # Red
+def g(string): return with_color(string, 32) # Green
+def y(string): return with_color(string, 33) # Yellow
+def b(string): return with_color(string, 34) # Blue
+def m(string): return with_color(string, 35) # Magenta
+def c(string): return with_color(string, 36) # Cyan
+def w(string): return with_color(string, 37) # White
+
+#following from Python cookbook, #475186
+def has_colors(stream):
+ '''Returns boolean indicating whether or not the supplied stream supports
+ ANSI color.
+ '''
+ if not hasattr(stream, "isatty"):
+ return False
+ if not stream.isatty():
+ return False # auto color only on TTYs
+ try:
+ import curses
+ curses.setupterm()
+ return curses.tigetnum("colors") > 2
+ except:
+ # guess false in case of error
+ return False
diff --git a/parallax/manager.py b/parallax/manager.py
new file mode 100644
index 0000000..ed58d5d
--- /dev/null
+++ b/parallax/manager.py
@@ -0,0 +1,443 @@
+# Copyright (c) 2009-2012, Andrew McNabb
+# Copyright (c) 2013, Kristoffer Gronlund
+
+from errno import EINTR
+import os
+import select
+import signal
+import sys
+import threading
+import copy
+
+try:
+ import queue
+except ImportError:
+ import Queue as queue
+
+from parallax.askpass_server import PasswordServer
+from parallax import psshutil
+from parallax import DEFAULT_PARALLELISM, DEFAULT_TIMEOUT
+from parallax.callbacks import DefaultCallbacks
+
+READ_SIZE = 1 << 16
+
+
+class FatalError(RuntimeError):
+ """A fatal error in the Parallax SSH Manager."""
+ pass
+
+
+class Manager(object):
+ """Executes tasks concurrently.
+
+ Tasks are added with add_task() and executed in parallel with run().
+ Returns a list of the exit statuses of the processes.
+
+ Arguments:
+ limit: Maximum number of commands running at once.
+ timeout: Maximum allowed execution time in seconds.
+ """
+ def __init__(self,
+ limit=DEFAULT_PARALLELISM,
+ timeout=DEFAULT_TIMEOUT,
+ askpass=False,
+ outdir=None,
+ errdir=None,
+ callbacks=DefaultCallbacks()):
+ # Backwards compatibility with old __init__
+ # format: Only argument is an options dict
+ if not isinstance(limit, int):
+ if hasattr(limit, 'limit'):
+ self.limit = limit.limit
+ elif hasattr(limit, 'par'):
+ self.limit = limit.par
+ else:
+ self.limit = DEFAULT_PARALLELISM
+ if hasattr(limit, 'timeout'):
+ self.timeout = limit.timeout
+ else:
+ self.timeout = DEFAULT_TIMEOUT
+ if hasattr(limit, 'askpass'):
+ self.askpass = limit.askpass
+ else:
+ self.askpass = False
+ if hasattr(limit, 'outdir'):
+ self.outdir = limit.outdir
+ else:
+ self.outdir = None
+ if hasattr(limit, 'errdir'):
+ self.errdir = limit.errdir
+ else:
+ self.errdir = None
+ else:
+ self.limit = limit
+ self.timeout = timeout
+ self.askpass = askpass
+ self.outdir = outdir
+ self.errdir = errdir
+ self.iomap = make_iomap()
+ self.callbacks = callbacks
+
+ self.taskcount = 0
+ self.tasks = []
+ self.save_tasks = []
+ self.running = []
+ self.done = []
+
+ self.askpass_socket = None
+
+ def run(self):
+ """Processes tasks previously added with add_task."""
+ self.save_tasks = copy.copy(self.tasks)
+ try:
+ if self.outdir or self.errdir:
+ writer = Writer(self.outdir, self.errdir)
+ writer.start()
+ else:
+ writer = None
+
+ if self.askpass:
+ pass_server = PasswordServer()
+ pass_server.start(self.iomap, self.limit)
+ self.askpass_socket = pass_server.address
+
+ self.set_sigchld_handler()
+
+ try:
+ self.update_tasks(writer)
+ wait = None
+ while self.running or self.tasks:
+ # Opt for efficiency over subsecond timeout accuracy.
+ if wait is None or wait < 1:
... 902 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/parallax.git
More information about the Python-modules-commits
mailing list