[Python-modules-commits] [txwinrm] 01/09: Import txwinrm_1.3.3.orig.tar.gz
Christopher Stuart Hoskin
mans0954 at moszumanska.debian.org
Fri Oct 27 20:01:22 UTC 2017
This is an automated email from the git hooks/post-receive script.
mans0954 pushed a commit to branch master
in repository txwinrm.
commit c09dc177d5c1be1d8f43c1869a7ecaba6a941eed
Author: Christopher Hoskin <mans0954 at debian.org>
Date: Fri Oct 27 20:53:35 2017 +0100
Import txwinrm_1.3.3.orig.tar.gz
---
.gitignore | 1 +
setup.py | 4 +-
txwinrm/SessionManager.py | 80 +++++++++++++------
txwinrm/WinRMClient.py | 173 +++++++++++++++++++++++++---------------
txwinrm/krb5.py | 60 ++++----------
txwinrm/request/enum_shells.xml | 18 +++++
txwinrm/request/pull_shells.xml | 34 ++++++++
txwinrm/shell.py | 159 +++++++++++++++++++++++++++++-------
txwinrm/twisted_utils.py | 53 ++++++++++++
txwinrm/util.py | 32 ++++----
txwinrm/winrs.py | 6 +-
11 files changed, 437 insertions(+), 183 deletions(-)
diff --git a/.gitignore b/.gitignore
index 3afa72f..9ce43a6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
*.py[cod]
+*.bak
# C extensions
*.so
diff --git a/setup.py b/setup.py
index 28b475e..d18113d 100755
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
##############################################################################
#
-# Copyright (C) Zenoss, Inc. 2013, all rights reserved.
+# Copyright (C) Zenoss, Inc. 2013-2017, all rights reserved.
#
# This content is made available according to terms specified in the LICENSE
# file at the top-level directory of this package.
@@ -11,7 +11,7 @@
setup_kwargs = dict(
name='txwinrm',
- version='1.2.20',
+ version='1.3.3',
description='Asynchronous Python WinRM client',
long_description=open('README.rst').read(),
license='See LICENSE file',
diff --git a/txwinrm/SessionManager.py b/txwinrm/SessionManager.py
index e60dc28..1ce700f 100755
--- a/txwinrm/SessionManager.py
+++ b/txwinrm/SessionManager.py
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (C) Zenoss, Inc. 2016, all rights reserved.
+# Copyright (C) Zenoss, Inc. 2016-2017, all rights reserved.
#
# This content is made available according to terms specified in
# License.zenoss under the directory where your Zenoss product is installed.
@@ -23,8 +23,11 @@ A Client should always have a key property. This will be unique to the types
of transactions/requests being made through a single Session
"""
-
+import copy
+from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.protocol import Factory
+Factory.noisy = False
class Session(object):
@@ -48,6 +51,9 @@ class Session(object):
# Error from last login if applicable.
self._login_error = None
+ # Deferred for logouts
+ self._logout_dc = None
+
@inlineCallbacks
def deferred_login(self, client):
"""Kick off a deferred login to a device from the first
@@ -85,20 +91,23 @@ class Session(object):
returnValue(self._token)
@inlineCallbacks
- def deferred_logout(self, client):
+ def deferred_logout(self):
"""Calls session._deferred_logout() only if all other clients
using the same session have also called deferred_logout.
"""
- if len(self._clients) <= 1:
- if self._token:
- try:
- yield self._deferred_logout(client)
- except Exception:
- pass
+ # we still have clients running, don't logout
+ if self._clients:
+ returnValue(None)
- self._token = None
+ if self._token:
+ try:
+ # go ahead and clear the token
+ self._token = None
+
+ yield self._deferred_logout()
+ except Exception:
+ pass
- self._clients.discard(client)
returnValue(None)
@inlineCallbacks
@@ -116,12 +125,12 @@ class Session(object):
returnValue(None)
@inlineCallbacks
- def _deferred_logout(self, client):
+ def _deferred_logout(self, client=None):
"""Performs the ZenPack specific logout from a device.
This will only be called by the last client to logout of the session.
- :param client: Client closing connection
+ :param client: Client closing connection (Optional)
:type client: ZenPack specific client
:rtype: Deferred
:return: Returns a Deferred which logs out of the device.
@@ -135,12 +144,15 @@ class SessionManager(object):
def __init__(self):
# Used to keep track of sessions.
+ # a session entry uses a key that is a tuple
+ # of (ipaddress, some_other_content)
+
self._sessions = {}
def get_connection(self, key):
"""Return the session for a given key."""
if key is None:
- raise Exception('Client key cannot be empty')
+ raise Exception('WinRM SessionManager: Client key cannot be empty')
return self._sessions.get(key, None)
def remove_connection(self, key):
@@ -165,22 +177,31 @@ class SessionManager(object):
:type client: ZenPack defined client
"""
if not hasattr(client, 'key'):
- raise Exception('Client must contain a key field')
+ raise Exception('WinRM SessionManager: Client must contain a key field')
session = self.get_connection(client.key)
- if session:
+ if session is not None:
+ try:
+ session._logout_dc.cancel()
+ except Exception:
+ pass
+ # add client to set
+ session._clients.add(client)
+ # update conn_info in case something changed
+ session.update_conn_info(client)
+ # already connected, return
if session._token:
- session._clients.add(client)
returnValue(session._token)
+ # no session yet, so create a new one
if session is None:
session = session_class()
self._sessions[client.key] = session
+ # log in
token = yield session.deferred_login(client)
returnValue(token)
- @inlineCallbacks
def close_connection(self, client):
"""Kick off a session's logout.
@@ -189,13 +210,24 @@ class SessionManager(object):
:param client: Client closing connection
:type client: ZenPack defined class
"""
- session = self.get_connection(client.key)
+ key = copy.deepcopy(client.key)
+ session = self.get_connection(key)
if not session:
- returnValue(None)
- yield session.deferred_logout(client)
- if not session._clients:
- # No more clients so we don't need to keep the session.
- self._sessions.pop(client.key)
+ # should never happen, but check
+ return
+ try:
+ session._logout_dc.cancel()
+ except Exception:
+ pass
+ session._clients.discard(client)
+ session._logout_dc = reactor.callLater(65, self.deferred_logout, key)
+
+ @inlineCallbacks
+ def deferred_logout(self, key):
+ # first, get the session from the key
+ session = self.get_connection(key)
+ # close current connection and do cleanup for session
+ yield session._deferred_logout()
returnValue(None)
diff --git a/txwinrm/WinRMClient.py b/txwinrm/WinRMClient.py
index 216cc42..13030ed 100755
--- a/txwinrm/WinRMClient.py
+++ b/txwinrm/WinRMClient.py
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (C) Zenoss, Inc. 2013, all rights reserved.
+# Copyright (C) Zenoss, Inc. 2016-2017, all rights reserved.
#
# This content is made available according to terms specified in the LICENSE
# file at the top-level directory of this package.
@@ -11,15 +11,11 @@ import copy
import logging
from collections import namedtuple
from httplib import BAD_REQUEST, UNAUTHORIZED, FORBIDDEN, OK
-import shlex
-from cStringIO import StringIO
-from twisted.internet import reactor
from twisted.internet.defer import (
inlineCallbacks,
returnValue,
- DeferredSemaphore,
- succeed
+ DeferredSemaphore
)
from twisted.internet.error import TimeoutError
@@ -50,7 +46,6 @@ from .util import (
ET,
)
from .shell import (
- _find_shell_id,
_build_command_line_elem,
_build_ps_command_line_elem,
_find_command_id,
@@ -59,17 +54,35 @@ from .shell import (
_find_exit_code,
CommandResponse,
_stripped_lines,
+ _find_shell_id
)
from .enumerate import (
DEFAULT_RESOURCE_URI,
SaxResponseHandler,
- _MAX_REQUESTS_PER_ENUMERATION
+ _MAX_REQUESTS_PER_ENUMERATION,
+ ItemsAccumulator
)
from .SessionManager import SESSION_MANAGER, Session
+from .twisted_utils import with_timeout
kerberos = None
LOG = logging.getLogger('winrm')
+def create_shell_from_elem(elem):
+ accumulator = ItemsAccumulator()
+ accumulator.new_item()
+ for item in ['ShellId', 'Owner', 'ClientIP', 'ShellRunTime', 'ShellInactivity', 'IdleTimeOut']:
+ xpath = './/{{{}}}{}'.format(c.XML_NS_MSRSP, item)
+ try:
+ accumulator.add_property(item, elem.findtext(xpath).strip())
+ except AttributeError as e:
+ if item == 'ShellId':
+ raise Exception('Invalid response from create shell request: {}'.format(e))
+ # as long as we have a valid ShellId we should be fine
+ accumulator.add_property(item, '')
+ return accumulator.items[0]
+
+
class WinRMSession(Session):
'''
Session class to keep track of single winrm connection
@@ -97,8 +110,18 @@ class WinRMSession(Session):
# connection.
self.sem = DeferredSemaphore(1)
+ # unused. reserved for possible future use
self._refresh_dc = None
+ def semrun(self, fn, *args, **kwargs):
+ """Run fn(*args, **kwargs) under a DeferredSemaphore with a timeout."""
+ return self.sem.run(
+ with_timeout,
+ fn=fn,
+ args=args,
+ kwargs=kwargs,
+ seconds=self._conn_info.timeout)
+
def is_kerberos(self):
return self._conn_info.auth_type == 'kerberos'
@@ -118,10 +141,13 @@ class WinRMSession(Session):
self._headers.addRawHeader('Connection', self._conn_info.connectiontype)
return self._headers
+ def update_conn_info(self, client):
+ self._conn_info = client._conn_info
+
@inlineCallbacks
def _deferred_login(self, client=None):
if client:
- self._conn_info = client._conn_info
+ self.update_conn_info(client)
self._url = "{c.scheme}://{c.ipaddress}:{c.port}/wsman".format(c=self._conn_info)
if self.is_kerberos():
self._gssclient = yield _authenticate_with_kerberos(self._conn_info, self._url, self._agent)
@@ -130,30 +156,42 @@ class WinRMSession(Session):
returnValue('basic_auth_token')
@inlineCallbacks
- def _deferred_logout(self):
+ def close_cached_connections(self):
# close connections so we do not end up with orphans
# return a Deferred()
- self.loggedout = True
if self._agent and hasattr(self._agent, 'closeCachedConnections'):
# twisted 11 has no return and is part of the Agent
- return succeed(self._agent.closeCachedConnections())
+ self._agent.closeCachedConnections()
elif self._agent:
- # twisted 12 returns a Deferred
- return self._agent._pool.closeCachedConnections()
- else:
- # no agent
- return succeed(None)
+ # twisted 12 has a pool
+ yield self._agent._pool.closeCachedConnections()
+ returnValue(None)
+
+ @inlineCallbacks
+ def _deferred_logout(self):
+ # close connections so they don't timeout
+ # gssclient will no longer be valid so get rid of it
+ # set token to None so the next client will reinitialize
+ # the connection
+ if self._gssclient is not None:
+ self._gssclient.cleanup()
+ self._gssclient = None
+ self._token = None
+ yield self.close_cached_connections()
+ returnValue(None)
@inlineCallbacks
def handle_response(self, request, response, client):
if response.code == UNAUTHORIZED or response.code == BAD_REQUEST:
# check to see if we need to re-authorize due to lost connection or bad request error
+ # only retry if using kerberos
+ self._token = None
+ yield self.close_cached_connections()
+ self._login_d = None
if self._gssclient is not None:
self._gssclient.cleanup()
self._gssclient = None
self._token = None
- self._agent = _get_agent()
- self._login_d = None
yield SESSION_MANAGER.init_connection(client, WinRMSession)
try:
yield self._set_headers()
@@ -190,8 +228,11 @@ class WinRMSession(Session):
reader = _ErrorReader()
response.deliverBody(reader)
message = yield reader.d
- raise RequestError("HTTP status: {}. {}".format(
- response.code, message))
+ if 'maximum number of concurrent operations for this user has been exceeded' in message:
+ message += ' To fix this, increase the MaxConcurrentOperationsPerUser WinRM'\
+ ' Configuration option to 4294967295 and restart the winrm service.'
+ raise RequestError("{}: HTTP status: {}. {}.".format(
+ client._conn_info.ipaddress, response.code, message))
returnValue(response)
@inlineCallbacks
@@ -217,6 +258,12 @@ class WinRMSession(Session):
@inlineCallbacks
def _send_request(self, request_template_name, client, envelope_size=None,
locale=None, code_page=None, **kwargs):
+ if self._logout_dc is not None:
+ try:
+ self._logout_dc.cancel()
+ self._logout_dc = None
+ except Exception:
+ pass
kwargs['envelope_size'] = envelope_size or client._conn_info.envelope_size
kwargs['locale'] = locale or client._conn_info.locale
kwargs['code_page'] = code_page or client._conn_info.code_page
@@ -245,16 +292,6 @@ class WinRMSession(Session):
response = yield self.handle_response(request, response, client)
returnValue(response)
- def close_connection(self, client):
- try:
- self._refresh_dc.cancel()
- except Exception:
- pass
-
- # Close the connection after 60 seconds. This will give other clients
- # enough time to keep the connection alive and continue using the same session.
- self._refresh_dc = reactor.callLater(60, SESSION_MANAGER.close_connection, client)
-
class WinRMClient(object):
"""Base winrm client class
@@ -266,18 +303,17 @@ class WinRMClient(object):
self.key = None
self._conn_info = conn_info
self.ps_script = None
+ self._shell_id = None
+ self._session = None
- def _session(self):
- return SESSION_MANAGER.get_connection(self.key)
+ def session(self):
+ if self._session is None:
+ self._session = SESSION_MANAGER.get_connection(self.key)
+ return self._session
@inlineCallbacks
def init_connection(self):
"""Initialize a connection through the session_manager"""
- if self._session() is not None:
- try:
- self._refresh_dc.cancel()
- except Exception:
- pass
yield SESSION_MANAGER.init_connection(self, WinRMSession)
returnValue(None)
@@ -285,22 +321,24 @@ class WinRMClient(object):
return self._conn_info.auth_type == 'kerberos'
def decrypt_body(self, body):
- return self._session().decrypt_body(body)
+ return self.session().decrypt_body(body)
@inlineCallbacks
def send_request(self, request, **kwargs):
- if not self._session():
+ if self.session() is None or self._session._token is None\
+ or (self._session.is_kerberos() and self._session._gssclient is None):
yield self.init_connection()
- if not self._session():
+ if not self.session():
raise Exception('Could not connect to device {}'.format(self.conn_info.hostname))
- response = yield self._session().send_request(request, self, **kwargs)
+ response = yield self.session().send_request(request, self, **kwargs)
returnValue(response)
@inlineCallbacks
def _create_shell(self):
elem = yield self.send_request('create')
- returnValue(_find_shell_id(elem))
+ self._shell_id = _find_shell_id(elem)
+ returnValue(self._shell_id)
@inlineCallbacks
def _delete_shell(self, shell_id):
@@ -344,10 +382,12 @@ class WinRMClient(object):
'receive', shell_id=shell_id, command_id=command_id)
returnValue(receive_elem)
- @inlineCallbacks
def close_connection(self):
- yield self._session().close_connection(self)
- returnValue(None)
+ SESSION_MANAGER.close_connection(self)
+
+ @inlineCallbacks
+ def close_cached_connections(self):
+ yield self.session().close_cached_connections()
class SingleCommandClient(WinRMClient):
@@ -370,10 +410,11 @@ class SingleCommandClient(WinRMClient):
self.ps_script = ps_script
yield self.init_connection()
try:
- cmd_response = yield self._session().sem.run(self.run_single_command,
- command_line)
+ cmd_response = yield self.session().semrun(
+ self.run_single_command,
+ command_line)
except Exception:
- yield self.close_connection()
+ self.close_connection()
raise
returnValue(cmd_response)
@@ -388,14 +429,13 @@ class SingleCommandClient(WinRMClient):
.stderr = [<non-empty, stripped line>, ...]
.exit_code = <int>
"""
- shell_id = yield self._create_shell()
+ yield self._create_shell()
cmd_response = None
try:
- cmd_response = yield self._run_command(shell_id, command_line)
+ cmd_response = yield self._run_command(self._shell_id, command_line)
except TimeoutError:
- yield self.close_connection()
- yield self._delete_shell(shell_id)
- yield self.close_connection()
+ yield self.close_cached_connections()
+ self.close_connection()
returnValue(cmd_response)
@inlineCallbacks
@@ -418,6 +458,7 @@ class SingleCommandClient(WinRMClient):
yield self._signal_terminate(shell_id, command_id)
stdout = _stripped_lines(stdout_parts)
stderr = _stripped_lines(stderr_parts)
+ yield self._delete_shell(self._shell_id)
returnValue(CommandResponse(stdout, stderr, exit_code))
@@ -450,14 +491,13 @@ class LongCommandClient(WinRMClient):
self.key = (self._conn_info.ipaddress, command_line + str(ps_script))
self.ps_script = ps_script
yield self.init_connection()
- if self._shell_id is None:
- self._shell_id = yield self._create_shell()
+ self._shell_id = yield self._create_shell()
try:
command_elem = yield self._send_command(self._shell_id,
command_line)
except TimeoutError:
yield self._sender.send_request('delete', shell_id=self._shell_id)
- yield self.close_connection()
+ self.close_connection()
raise
self._command_id = _find_command_id(command_elem)
returnValue(None)
@@ -467,7 +507,7 @@ class LongCommandClient(WinRMClient):
try:
receive_elem = yield self._send_receive(self._shell_id, self._command_id)
except TimeoutError:
- yield self.close_connection()
+ self.close_connection()
raise
stdout_parts = _find_stream(receive_elem, self._command_id, 'stdout')
stderr_parts = _find_stream(receive_elem, self._command_id, 'stderr')
@@ -486,7 +526,7 @@ class LongCommandClient(WinRMClient):
yield self._signal_terminate(self._shell_id, self._command_id)
yield self._delete_shell(self._shell_id)
if close:
- yield self.close_connection()
+ self.close_connection()
returnValue(CommandResponse(stdout, stderr, self._exit_code))
@@ -501,12 +541,12 @@ class EnumerateClient(WinRMClient):
super(EnumerateClient, self).__init__(conn_info)
self._handler = SaxResponseHandler(self)
self._hostname = conn_info.ipaddress
- self.key = (conn_info.ipaddress, 'short')
+ self.key = (conn_info.ipaddress, 'enumerate')
@inlineCallbacks
def enumerate(self, wql, resource_uri=DEFAULT_RESOURCE_URI):
"""Runs a remote WQL query."""
- if self._session() is None:
+ if self.session() is None:
yield self.init_connection()
request_template_name = 'enumerate'
enumeration_context = None
@@ -515,7 +555,7 @@ class EnumerateClient(WinRMClient):
for i in xrange(_MAX_REQUESTS_PER_ENUMERATION):
LOG.info('{0} "{1}" {2}'.format(
self._hostname, wql, request_template_name))
- response = yield self._session()._send_request(
+ response = yield self.session()._send_request(
request_template_name,
self,
resource_uri=resource_uri,
@@ -549,9 +589,10 @@ class EnumerateClient(WinRMClient):
yield self.init_connection()
for enum_info in enum_infos:
try:
- items[enum_info] = yield self._session().sem.run(self.enumerate,
- enum_info.wql,
- enum_info.resource_uri)
+ items[enum_info] = yield self.session().semrun(
+ self.enumerate,
+ enum_info.wql,
+ enum_info.resource_uri)
except (UnauthorizedError, ForbiddenError):
# Fail the collection for general errors.
raise
@@ -559,7 +600,7 @@ class EnumerateClient(WinRMClient):
# Store empty results for other query-specific errors.
continue
- yield self.close_connection()
+ self.close_connection()
returnValue(items)
diff --git a/txwinrm/krb5.py b/txwinrm/krb5.py
old mode 100644
new mode 100755
index 1a733fe..6a74dad
--- a/txwinrm/krb5.py
+++ b/txwinrm/krb5.py
@@ -73,25 +73,6 @@ DOMAIN_REALM_TEMPLATE = (
)
-class KlistProcessProtocol(ProcessProtocol):
- """Communicates with klist command.
-
- This is only to verify the includedir. If there's an error or specific text
- we'll discard the includedir
- """
-
- def __init__(self):
- self.d = defer.Deferred()
- self._error = ''
-
- def errReceived(self, data):
- if 'Included profile file could not be read while initializing krb5' in data:
- self._error = data
-
- def processEnded(self, reason):
- self.d.callback(self._error if self._error else None)
-
-
class Config(object):
"""Manages KRB5_CONFIG."""
@@ -105,33 +86,13 @@ class Config(object):
# For further usage by kerberos python module.
os.environ['KRB5_CONFIG'] = self.path
- @defer.inlineCallbacks
def add_includedir(self, includedir):
- if includedir in self.includedirs:
- return
-
self.includedirs.add(includedir)
self.save()
- # test for valid directory
- klist = None
- for path in ('/usr/bin/klist', '/usr/kerberos/bin/klist'):
- if os.path.isfile(path):
- klist = path
- break
- klist_args = [klist]
- klist_env = {
- 'KRB5_CONFIG': self.path,
- }
-
- protocol = KlistProcessProtocol()
-
- reactor.spawnProcess(protocol, klist, klist_args, klist_env)
-
- results = yield protocol.d
- if results:
- self.includedirs.discard(includedir)
- self.save()
- defer.returnValue(None)
+
+ def remove_includedir(self, includedir):
+ self.includedirs.discard(includedir)
+ self.save()
def add_kdc(self, realm, kdcs, disable_rdns=False):
"""Add realm and KDC to KRB5_CONFIG.
@@ -148,6 +109,8 @@ class Config(object):
if not kdcs or not kdcs.strip():
return
+ # reload currently saved copy
+ self.realms, self.admin_servers = self.load()
valid_kdcs = []
remove_kdcs = []
admin_server = None
@@ -375,7 +338,7 @@ def kinit(username, password, kdc, includedir=None, disable_rdns=False):
global config
if includedir:
- yield config.add_includedir(includedir)
+ config.add_includedir(includedir)
config.add_kdc(realm, kdc, disable_rdns)
ccname = config.get_ccname(username)
@@ -394,6 +357,15 @@ def kinit(username, password, kdc, includedir=None, disable_rdns=False):
reactor.spawnProcess(protocol, kinit, kinit_args, kinit_env)
results = yield protocol.d
+ try:
+ if 'Included profile file could not be read while initializing Kerberos 5 library' in results:
+ config.remove_includedir(includedir)
+ retry_protocol = KinitProcessProtocol(password)
+ reactor.spawnProcess(retry_protocol, kinit, kinit_args, kinit_env)
+ results = yield retry_protocol.d
+ except TypeError:
+ # Everything's ok
+ pass
defer.returnValue(results)
diff --git a/txwinrm/request/enum_shells.xml b/txwinrm/request/enum_shells.xml
new file mode 100644
index 0000000..e5c4b98
--- /dev/null
+++ b/txwinrm/request/enum_shells.xml
@@ -0,0 +1,18 @@
+<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing" xmlns:wsen="http://schemas.xmlsoap.org/ws/2004/09/enumeration" xmlns:wsman="http://schemas.dmtf.org/wbem/wsman/1/wsman.xsd">
+ <s:Header>
+ <wsa:To>http://SORINOTEST05:5985/wsman</wsa:To>
+ <wsman:ResourceURI s:mustUnderstand="true"> http://schemas.microsoft.com/wbem/wsman/1/windows/shell </wsman:ResourceURI>
+ <wsa:ReplyTo>
+ <wsa:Address s:mustUnderstand="true"> http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous </wsa:Address>
+ </wsa:ReplyTo>
+ <wsa:Action s:mustUnderstand="true"> http://schemas.xmlsoap.org/ws/2004/09/enumeration/Enumerate </wsa:Action>
+ <wsman:MaxEnvelopeSize s:mustUnderstand="true">153600</wsman:MaxEnvelopeSize>
+ <wsa:MessageID>uuid:08E59736-6A1B-4560-8442-64E4D7A26EB5</wsa:MessageID>
+ <wsman:Locale xml:lang="en-US" s:mustUnderstand="false" />
+ <wsman:OperationTimeout>PT60.000S</wsman:OperationTimeout>
+ </s:Header>
+ <s:Body>
+ <wsen:Enumerate> </wsen:Enumerate>
+ </s:Body>
+</s:Envelope>
+
diff --git a/txwinrm/request/pull_shells.xml b/txwinrm/request/pull_shells.xml
new file mode 100644
index 0000000..f72b69f
--- /dev/null
+++ b/txwinrm/request/pull_shells.xml
@@ -0,0 +1,34 @@
+<s:Envelope
+ xmlns:s="http://www.w3.org/2003/05/soap-envelope"
+ xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing"
+ xmlns:wsen="http://schemas.xmlsoap.org/ws/2004/09/enumeration"
+ xmlns:wsman="http://schemas.dmtf.org/wbem/wsman/1/wsman.xsd">
+ <s:Header>
+ <wsa:To>http://SORINOTEST05:80/wsman</wsa:To>
+ <wsman:ResourceURI s:mustUnderstand="true">
+ http://schemas.microsoft.com/wbem/wsman/1/windows/shell
+ </wsman:ResourceURI>
+ <wsa:ReplyTo>
+ <wsa:Address s:mustUnderstand="true">
+ http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous
+ </wsa:Address>
+ </wsa:ReplyTo>
+ <wsa:Action s:mustUnderstand="true">
+ http://schemas.xmlsoap.org/ws/2004/09/enumeration/Pull
+ </wsa:Action>
+ <wsman:MaxEnvelopeSize s:mustUnderstand="true">153600</wsman:MaxEnvelopeSize>
+ <wsa:MessageID>
+ uuid:BE9EDB22-23A3-489A-B025-ED7F3461E4AB</wsa:MessageID>
+ <wsman:Locale xml:lang="en-US" s:mustUnderstand="false" />
+ <wsman:OperationTimeout>PT60.000S</wsman:OperationTimeout>
+ </s:Header>
+ <s:Body>
+ <wsen:Pull>
+ <wsen:EnumerationContext
+ xmlns:wsen="http://schemas.xmlsoap.org/ws/2004/09/enumeration">
+ uuid:{uuid}
+ </wsen:EnumerationContext>
+ <wsen:MaxElements>100</wsen:MaxElements>
+ </wsen:Pull>
+ </s:Body>
+ </s:Envelope>
diff --git a/txwinrm/shell.py b/txwinrm/shell.py
old mode 100644
new mode 100755
index f497499..ad2c0de
--- a/txwinrm/shell.py
+++ b/txwinrm/shell.py
@@ -18,8 +18,10 @@ from cStringIO import StringIO
from twisted.internet import reactor, defer, task
from twisted.internet.error import TimeoutError
from xml.etree import cElementTree as ET
+from xml.etree import ElementTree
from . import constants as c
from .util import create_etree_request_sender, get_datetime, RequestError
+from .enumerate import create_parser_and_factory
log = logging.getLogger('winrm')
_MAX_REQUESTS_PER_COMMAND = 9999
@@ -214,36 +216,133 @@ def create_single_shot_command(conn_info):
return SingleShotCommand(sender)
+def _find_enum_context(elem):
+ e_context = None
+ xpath = './/{{{}}}EnumerationContext'.format(c.XML_NS_ENUMERATION)
+ ctxt_elem = elem.find(xpath)
+ if ctxt_elem is not None:
+ e_context = ctxt_elem.text.split(':')[1]
+ return e_context
+
+
+def _find_shell_ids(elem):
+ ids = []
+ xpath = './/{{{}}}ShellId'.format(c.XML_NS_MSRSP)
+ shells = elem.findall(xpath)
+ for shell in shells:
+ ids.append(shell.text)
+ return ids
+
+
+ at defer.inlineCallbacks
+def _get_active_shells(request_sender):
+ elem = yield request_sender.send_request('enum_shells')
+ enum_context = _find_enum_context(elem)
+ if enum_context is None:
+ defer.returnValue(None)
+ response = yield request_sender.send_request('pull_shells', uuid=enum_context)
+ body = ElementTree.tostring(response)
+ parser, factory = create_parser_and_factory()
+ parser.feed(body)
+ defer.returnValue(factory.items)
+
+
+ at defer.inlineCallbacks
+def _get_active_shell(request_sender, conn_info, min_runtime=600):
+ """Sift through existing shells to find what should be the oldest active shell
+ created by our user. Compare against minimum runtime so we grab the oldest
+ shell. something less than min_runtime could have been created by a different
+ client. sender can be RequestSender or WinRMClient. conn_info is a
+ txwinrm.util.ConnectionInfo instance
+ """
+ shells = yield _get_active_shells(request_sender)
+ active_shell = None
+
+ user_domain = conn_info.username.split('@')
+ try:
+ # get domain user as netbios
+ user = (user_domain[1].split('.')[0] + '\\' + user_domain[0]).lower()
+ except IndexError:
+ # local user, no netbios
+ # user_domain[0] will always exist
+ user = user_domain[0].lower()
+
+ def get_runtime(runtime):
+ # return total runtime in seconds
+ # ShellRunTime is specified as P<days>DT<hours>H<minutes>M<seconds>S
+ # e.g. P1DT1H1M1S is a runtime of of 1 day, 1 hour, 1 minute, and 1 second
+ # we'll calculate the total number of seconds a shell has been running using
+ # these numbers
+ try:
+ rt_match = re.match('P(?P<d>\d+)DT(?P<h>\d+)H(?P<m>\d+)M(?P<s>\d+)S', runtime)
+ return int(rt_match.group('s')) + (int(rt_match.group('m')) * 60) + (int(rt_match.group('h')) * 3600) + (int(rt_match.group('d')) * 86400)
+ except Exception:
+ return 0
+
+ for shell in shells:
+ if user == shell.Owner.lower():
+ runtime = get_runtime(shell.ShellRunTime)
+ if runtime > min_runtime:
+ # found possible candidate, test against previous
+ if active_shell is not None:
+ prev_runtime = get_runtime(active_shell.ShellRunTime)
+ if runtime > prev_runtime:
+ active_shell = shell
+ else:
+ active_shell = shell
+ defer.returnValue(active_shell)
+
+
class LongRunningCommand(object):
- def __init__(self, sender):
+ def __init__(self, sender, min_runtime=600):
self._sender = sender
self._shell_id = None
self._command_id = None
self._exit_code = None
+ # attach to shell with a minimum runtime of x seconds to know
+ # that our user created the shell
+ self._min_runtime = min_runtime
+
+ @defer.inlineCallbacks
+ def is_shell_active(self, shell_id):
+ if shell_id is None:
+ defer.returnValue(False)
+ elem = yield self._sender.send_request('enum_shells')
+ enum_context = _find_enum_context(elem)
+ if enum_context is None:
+ defer.returnValue(False)
+ elem = yield self._sender.send_request('pull_shells', uuid=enum_context)
+ shell_ids = _find_shell_ids(elem)
+ if shell_id in shell_ids:
+ defer.returnValue(True)
+ else:
+ defer.returnValue(False)
+
@defer.inlineCallbacks
def start(self, command_line, ps_script=None):
- log.debug("LongRunningCommand run_command: {0}".format(command_line))
- if self._shell_id is None:
- elem = yield self._sender.send_request('create')
- self._shell_id = _find_shell_id(elem)
+ elem = yield self._sender.send_request('create')
+ self._shell_id = _find_shell_id(elem)
if ps_script is not None:
+ log.debug("LongRunningCommand run_command: {0}".format(command_line + ps_script))
command_line_elem = _build_ps_command_line_elem(command_line, ps_script)
else:
+ log.debug("LongRunningCommand run_command: {0}".format(command_line))
command_line_elem = _build_command_line_elem(command_line)
log.debug('LongRunningCommand run_command: sending command request '
'(shell_id={0}, command_line_elem={1})'.format(
- self._shell_id, command_line_elem))
+ self._shell_id, command_line_elem))
try:
command_elem = yield self._sender.send_request(
'command', shell_id=self._shell_id,
command_line_elem=command_line_elem,
timeout=self._sender._sender._conn_info.timeout)
except TimeoutError:
- yield self.delete_and_close()
+ yield self._sender.close_connections()
raise
self._command_id = _find_command_id(command_elem)
+ defer.returnValue(self._command_id)
@defer.inlineCallbacks
def receive(self):
@@ -253,8 +352,17 @@ class LongRunningCommand(object):
shell_id=self._shell_id,
command_id=self._command_id)
except TimeoutError:
- yield self.delete_and_close()
- raise
+ # could be simple network problem, reconnect and try again
+ yield self._sender.close_connections()
+ try:
+ receive_elem = yield self._sender.send_request(
+ 'receive',
+ shell_id=self._shell_id,
+ command_id=self._command_id)
+ except TimeoutError:
+ yield self._sender.close_connections()
+ except Exception:
+ raise
stdout_parts = _find_stream(receive_elem, self._command_id, 'stdout')
stderr_parts = _find_stream(receive_elem, self._command_id, 'stderr')
self._exit_code = _find_exit_code(receive_elem, self._command_id)
@@ -263,14 +371,6 @@ class LongRunningCommand(object):
defer.returnValue((stdout, stderr))
@defer.inlineCallbacks
- def delete_and_close(self):
- """Delete the remote shell and close connection"""
- yield self._sender.send_request('delete', shell_id=self._shell_id)
- self._shell_id = None
- yield self._sender.close_connections()
- defer.returnValue(None)
-
- @defer.inlineCallbacks
def stop(self):
for _ in xrange(_MAX_RETRIES):
try:
@@ -279,23 +379,22 @@ class LongRunningCommand(object):
shell_id=self._shell_id,
command_id=self._command_id,
signal_code=c.SHELL_SIGNAL_CTRL_C)
- except RequestError as e:
- # if we get a 500 error back, let's try again
- if 'HTTP status: 500. An internal error occurred' in e.message:
- continue
- else:
- yield self.delete_and_close()
- raise e
- except Exception:
- yield self.delete_and_close()
- raise
- else:
break
+ except TimeoutError:
+ # we may need to reset the connection and try again
+ yield self._sender.close_connections()
+ except Exception:
+ pass
... 190 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/txwinrm.git
More information about the Python-modules-commits
mailing list