[Python-modules-commits] [txwinrm] 01/09: Import txwinrm_1.3.3.orig.tar.gz

Christopher Stuart Hoskin mans0954 at moszumanska.debian.org
Fri Oct 27 20:01:22 UTC 2017


This is an automated email from the git hooks/post-receive script.

mans0954 pushed a commit to branch master
in repository txwinrm.

commit c09dc177d5c1be1d8f43c1869a7ecaba6a941eed
Author: Christopher Hoskin <mans0954 at debian.org>
Date:   Fri Oct 27 20:53:35 2017 +0100

    Import txwinrm_1.3.3.orig.tar.gz
---
 .gitignore                      |   1 +
 setup.py                        |   4 +-
 txwinrm/SessionManager.py       |  80 +++++++++++++------
 txwinrm/WinRMClient.py          | 173 +++++++++++++++++++++++++---------------
 txwinrm/krb5.py                 |  60 ++++----------
 txwinrm/request/enum_shells.xml |  18 +++++
 txwinrm/request/pull_shells.xml |  34 ++++++++
 txwinrm/shell.py                | 159 +++++++++++++++++++++++++++++-------
 txwinrm/twisted_utils.py        |  53 ++++++++++++
 txwinrm/util.py                 |  32 ++++----
 txwinrm/winrs.py                |   6 +-
 11 files changed, 437 insertions(+), 183 deletions(-)

diff --git a/.gitignore b/.gitignore
index 3afa72f..9ce43a6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 *.py[cod]
+*.bak
 
 # C extensions
 *.so
diff --git a/setup.py b/setup.py
index 28b475e..d18113d 100755
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
 
 ##############################################################################
 #
-# Copyright (C) Zenoss, Inc. 2013, all rights reserved.
+# Copyright (C) Zenoss, Inc. 2013-2017, all rights reserved.
 #
 # This content is made available according to terms specified in the LICENSE
 # file at the top-level directory of this package.
@@ -11,7 +11,7 @@
 
 setup_kwargs = dict(
     name='txwinrm',
-    version='1.2.20',
+    version='1.3.3',
     description='Asynchronous Python WinRM client',
     long_description=open('README.rst').read(),
     license='See LICENSE file',
diff --git a/txwinrm/SessionManager.py b/txwinrm/SessionManager.py
index e60dc28..1ce700f 100755
--- a/txwinrm/SessionManager.py
+++ b/txwinrm/SessionManager.py
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (C) Zenoss, Inc. 2016, all rights reserved.
+# Copyright (C) Zenoss, Inc. 2016-2017, all rights reserved.
 #
 # This content is made available according to terms specified in
 # License.zenoss under the directory where your Zenoss product is installed.
@@ -23,8 +23,11 @@ A Client should always have a key property.  This will be unique to the types
 of transactions/requests being made through a single Session
 
 """
-
+import copy
+from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.protocol import Factory
+Factory.noisy = False
 
 
 class Session(object):
@@ -48,6 +51,9 @@ class Session(object):
         # Error from last login if applicable.
         self._login_error = None
 
+        # Deferred for logouts
+        self._logout_dc = None
+
     @inlineCallbacks
     def deferred_login(self, client):
         """Kick off a deferred login to a device from the first
@@ -85,20 +91,23 @@ class Session(object):
         returnValue(self._token)
 
     @inlineCallbacks
-    def deferred_logout(self, client):
+    def deferred_logout(self):
         """Calls session._deferred_logout() only if all other clients
         using the same session have also called deferred_logout.
         """
-        if len(self._clients) <= 1:
-            if self._token:
-                try:
-                    yield self._deferred_logout(client)
-                except Exception:
-                    pass
+        # we still have clients running, don't logout
+        if self._clients:
+            returnValue(None)
 
-            self._token = None
+        if self._token:
+            try:
+                # go ahead and clear the token
+                self._token = None
+
+                yield self._deferred_logout()
+            except Exception:
+                pass
 
-        self._clients.discard(client)
         returnValue(None)
 
     @inlineCallbacks
@@ -116,12 +125,12 @@ class Session(object):
         returnValue(None)
 
     @inlineCallbacks
-    def _deferred_logout(self, client):
+    def _deferred_logout(self, client=None):
         """Performs the ZenPack specific logout from a device.
 
         This will only be called by the last client to logout of the session.
 
-        :param client: Client closing connection
+        :param client: Client closing connection (Optional)
         :type client: ZenPack specific client
         :rtype: Deferred
         :return: Returns a Deferred which logs out of the device.
@@ -135,12 +144,15 @@ class SessionManager(object):
 
     def __init__(self):
         # Used to keep track of sessions.
+        # a session entry uses a key that is a tuple
+        # of (ipaddress, some_other_content)
+
         self._sessions = {}
 
     def get_connection(self, key):
         """Return the session for a given key."""
         if key is None:
-            raise Exception('Client key cannot be empty')
+            raise Exception('WinRM SessionManager: Client key cannot be empty')
         return self._sessions.get(key, None)
 
     def remove_connection(self, key):
@@ -165,22 +177,31 @@ class SessionManager(object):
         :type client: ZenPack defined client
         """
         if not hasattr(client, 'key'):
-            raise Exception('Client must contain a key field')
+            raise Exception('WinRM SessionManager: Client must contain a key field')
 
         session = self.get_connection(client.key)
-        if session:
+        if session is not None:
+            try:
+                session._logout_dc.cancel()
+            except Exception:
+                pass
+            # add client to set
+            session._clients.add(client)
+            # update conn_info in case something changed
+            session.update_conn_info(client)
+            # already connected, return
             if session._token:
-                session._clients.add(client)
                 returnValue(session._token)
 
+        # no session yet, so create a new one
         if session is None:
             session = session_class()
             self._sessions[client.key] = session
 
+        # log in
         token = yield session.deferred_login(client)
         returnValue(token)
 
-    @inlineCallbacks
     def close_connection(self, client):
         """Kick off a session's logout.
 
@@ -189,13 +210,24 @@ class SessionManager(object):
         :param client: Client closing connection
         :type client: ZenPack defined class
         """
-        session = self.get_connection(client.key)
+        key = copy.deepcopy(client.key)
+        session = self.get_connection(key)
         if not session:
-            returnValue(None)
-        yield session.deferred_logout(client)
-        if not session._clients:
-            # No more clients so we don't need to keep the session.
-            self._sessions.pop(client.key)
+            # should never happen, but check
+            return
+        try:
+            session._logout_dc.cancel()
+        except Exception:
+            pass
+        session._clients.discard(client)
+        session._logout_dc = reactor.callLater(65, self.deferred_logout, key)
+
+    @inlineCallbacks
+    def deferred_logout(self, key):
+        # first, get the session from the key
+        session = self.get_connection(key)
+        # close current connection and do cleanup for session
+        yield session._deferred_logout()
         returnValue(None)
 
 
diff --git a/txwinrm/WinRMClient.py b/txwinrm/WinRMClient.py
index 216cc42..13030ed 100755
--- a/txwinrm/WinRMClient.py
+++ b/txwinrm/WinRMClient.py
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (C) Zenoss, Inc. 2013, all rights reserved.
+# Copyright (C) Zenoss, Inc. 2016-2017, all rights reserved.
 #
 # This content is made available according to terms specified in the LICENSE
 # file at the top-level directory of this package.
@@ -11,15 +11,11 @@ import copy
 import logging
 from collections import namedtuple
 from httplib import BAD_REQUEST, UNAUTHORIZED, FORBIDDEN, OK
-import shlex
-from cStringIO import StringIO
 
-from twisted.internet import reactor
 from twisted.internet.defer import (
     inlineCallbacks,
     returnValue,
-    DeferredSemaphore,
-    succeed
+    DeferredSemaphore
 )
 from twisted.internet.error import TimeoutError
 
@@ -50,7 +46,6 @@ from .util import (
     ET,
 )
 from .shell import (
-    _find_shell_id,
     _build_command_line_elem,
     _build_ps_command_line_elem,
     _find_command_id,
@@ -59,17 +54,35 @@ from .shell import (
     _find_exit_code,
     CommandResponse,
     _stripped_lines,
+    _find_shell_id
 )
 from .enumerate import (
     DEFAULT_RESOURCE_URI,
     SaxResponseHandler,
-    _MAX_REQUESTS_PER_ENUMERATION
+    _MAX_REQUESTS_PER_ENUMERATION,
+    ItemsAccumulator
 )
 from .SessionManager import SESSION_MANAGER, Session
+from .twisted_utils import with_timeout
 kerberos = None
 LOG = logging.getLogger('winrm')
 
 
+def create_shell_from_elem(elem):
+    accumulator = ItemsAccumulator()
+    accumulator.new_item()
+    for item in ['ShellId', 'Owner', 'ClientIP', 'ShellRunTime', 'ShellInactivity', 'IdleTimeOut']:
+        xpath = './/{{{}}}{}'.format(c.XML_NS_MSRSP, item)
+        try:
+            accumulator.add_property(item, elem.findtext(xpath).strip())
+        except AttributeError as e:
+            if item == 'ShellId':
+                raise Exception('Invalid response from create shell request: {}'.format(e))
+            # as long as we have a valid ShellId we should be fine
+            accumulator.add_property(item, '')
+    return accumulator.items[0]
+
+
 class WinRMSession(Session):
     '''
     Session class to keep track of single winrm connection
@@ -97,8 +110,18 @@ class WinRMSession(Session):
         # connection.
         self.sem = DeferredSemaphore(1)
 
+        # unused.  reserved for possible future use
         self._refresh_dc = None
 
+    def semrun(self, fn, *args, **kwargs):
+        """Run fn(*args, **kwargs) under a DeferredSemaphore with a timeout."""
+        return self.sem.run(
+            with_timeout,
+            fn=fn,
+            args=args,
+            kwargs=kwargs,
+            seconds=self._conn_info.timeout)
+
     def is_kerberos(self):
         return self._conn_info.auth_type == 'kerberos'
 
@@ -118,10 +141,13 @@ class WinRMSession(Session):
             self._headers.addRawHeader('Connection', self._conn_info.connectiontype)
         return self._headers
 
+    def update_conn_info(self, client):
+        self._conn_info = client._conn_info
+
     @inlineCallbacks
     def _deferred_login(self, client=None):
         if client:
-            self._conn_info = client._conn_info
+            self.update_conn_info(client)
         self._url = "{c.scheme}://{c.ipaddress}:{c.port}/wsman".format(c=self._conn_info)
         if self.is_kerberos():
             self._gssclient = yield _authenticate_with_kerberos(self._conn_info, self._url, self._agent)
@@ -130,30 +156,42 @@ class WinRMSession(Session):
             returnValue('basic_auth_token')
 
     @inlineCallbacks
-    def _deferred_logout(self):
+    def close_cached_connections(self):
         # close connections so we do not end up with orphans
         # return a Deferred()
-        self.loggedout = True
         if self._agent and hasattr(self._agent, 'closeCachedConnections'):
             # twisted 11 has no return and is part of the Agent
-            return succeed(self._agent.closeCachedConnections())
+            self._agent.closeCachedConnections()
         elif self._agent:
-            # twisted 12 returns a Deferred
-            return self._agent._pool.closeCachedConnections()
-        else:
-            # no agent
-            return succeed(None)
+            # twisted 12 has a pool
+            yield self._agent._pool.closeCachedConnections()
+        returnValue(None)
+
+    @inlineCallbacks
+    def _deferred_logout(self):
+        # close connections so they don't timeout
+        # gssclient will no longer be valid so get rid of it
+        # set token to None so the next client will reinitialize
+        #   the connection
+        if self._gssclient is not None:
+            self._gssclient.cleanup()
+            self._gssclient = None
+        self._token = None
+        yield self.close_cached_connections()
+        returnValue(None)
 
     @inlineCallbacks
     def handle_response(self, request, response, client):
         if response.code == UNAUTHORIZED or response.code == BAD_REQUEST:
             # check to see if we need to re-authorize due to lost connection or bad request error
+            # only retry if using kerberos
+            self._token = None
+            yield self.close_cached_connections()
+            self._login_d = None
             if self._gssclient is not None:
                 self._gssclient.cleanup()
                 self._gssclient = None
                 self._token = None
-                self._agent = _get_agent()
-                self._login_d = None
                 yield SESSION_MANAGER.init_connection(client, WinRMSession)
                 try:
                     yield self._set_headers()
@@ -190,8 +228,11 @@ class WinRMSession(Session):
                 reader = _ErrorReader()
             response.deliverBody(reader)
             message = yield reader.d
-            raise RequestError("HTTP status: {}. {}".format(
-                response.code, message))
+            if 'maximum number of concurrent operations for this user has been exceeded' in message:
+                message += '  To fix this, increase the MaxConcurrentOperationsPerUser WinRM'\
+                           ' Configuration option to 4294967295 and restart the winrm service.'
+            raise RequestError("{}: HTTP status: {}. {}.".format(
+                client._conn_info.ipaddress, response.code, message))
         returnValue(response)
 
     @inlineCallbacks
@@ -217,6 +258,12 @@ class WinRMSession(Session):
     @inlineCallbacks
     def _send_request(self, request_template_name, client, envelope_size=None,
                       locale=None, code_page=None, **kwargs):
+        if self._logout_dc is not None:
+            try:
+                self._logout_dc.cancel()
+                self._logout_dc = None
+            except Exception:
+                pass
         kwargs['envelope_size'] = envelope_size or client._conn_info.envelope_size
         kwargs['locale'] = locale or client._conn_info.locale
         kwargs['code_page'] = code_page or client._conn_info.code_page
@@ -245,16 +292,6 @@ class WinRMSession(Session):
         response = yield self.handle_response(request, response, client)
         returnValue(response)
 
-    def close_connection(self, client):
-        try:
-            self._refresh_dc.cancel()
-        except Exception:
-            pass
-
-        # Close the connection after 60 seconds.  This will give other clients
-        # enough time to keep the connection alive and continue using the same session.
-        self._refresh_dc = reactor.callLater(60, SESSION_MANAGER.close_connection, client)
-
 
 class WinRMClient(object):
     """Base winrm client class
@@ -266,18 +303,17 @@ class WinRMClient(object):
         self.key = None
         self._conn_info = conn_info
         self.ps_script = None
+        self._shell_id = None
+        self._session = None
 
-    def _session(self):
-        return SESSION_MANAGER.get_connection(self.key)
+    def session(self):
+        if self._session is None:
+            self._session = SESSION_MANAGER.get_connection(self.key)
+        return self._session
 
     @inlineCallbacks
     def init_connection(self):
         """Initialize a connection through the session_manager"""
-        if self._session() is not None:
-            try:
-                self._refresh_dc.cancel()
-            except Exception:
-                pass
         yield SESSION_MANAGER.init_connection(self, WinRMSession)
         returnValue(None)
 
@@ -285,22 +321,24 @@ class WinRMClient(object):
         return self._conn_info.auth_type == 'kerberos'
 
     def decrypt_body(self, body):
-        return self._session().decrypt_body(body)
+        return self.session().decrypt_body(body)
 
     @inlineCallbacks
     def send_request(self, request, **kwargs):
-        if not self._session():
+        if self.session() is None or self._session._token is None\
+                or (self._session.is_kerberos() and self._session._gssclient is None):
             yield self.init_connection()
 
-        if not self._session():
+        if not self.session():
             raise Exception('Could not connect to device {}'.format(self.conn_info.hostname))
-        response = yield self._session().send_request(request, self, **kwargs)
+        response = yield self.session().send_request(request, self, **kwargs)
         returnValue(response)
 
     @inlineCallbacks
     def _create_shell(self):
         elem = yield self.send_request('create')
-        returnValue(_find_shell_id(elem))
+        self._shell_id = _find_shell_id(elem)
+        returnValue(self._shell_id)
 
     @inlineCallbacks
     def _delete_shell(self, shell_id):
@@ -344,10 +382,12 @@ class WinRMClient(object):
             'receive', shell_id=shell_id, command_id=command_id)
         returnValue(receive_elem)
 
-    @inlineCallbacks
     def close_connection(self):
-        yield self._session().close_connection(self)
-        returnValue(None)
+        SESSION_MANAGER.close_connection(self)
+
+    @inlineCallbacks
+    def close_cached_connections(self):
+        yield self.session().close_cached_connections()
 
 
 class SingleCommandClient(WinRMClient):
@@ -370,10 +410,11 @@ class SingleCommandClient(WinRMClient):
         self.ps_script = ps_script
         yield self.init_connection()
         try:
-            cmd_response = yield self._session().sem.run(self.run_single_command,
-                                                         command_line)
+            cmd_response = yield self.session().semrun(
+                self.run_single_command,
+                command_line)
         except Exception:
-            yield self.close_connection()
+            self.close_connection()
             raise
         returnValue(cmd_response)
 
@@ -388,14 +429,13 @@ class SingleCommandClient(WinRMClient):
                 .stderr = [<non-empty, stripped line>, ...]
                 .exit_code = <int>
         """
-        shell_id = yield self._create_shell()
+        yield self._create_shell()
         cmd_response = None
         try:
-            cmd_response = yield self._run_command(shell_id, command_line)
+            cmd_response = yield self._run_command(self._shell_id, command_line)
         except TimeoutError:
-            yield self.close_connection()
-        yield self._delete_shell(shell_id)
-        yield self.close_connection()
+            yield self.close_cached_connections()
+        self.close_connection()
         returnValue(cmd_response)
 
     @inlineCallbacks
@@ -418,6 +458,7 @@ class SingleCommandClient(WinRMClient):
         yield self._signal_terminate(shell_id, command_id)
         stdout = _stripped_lines(stdout_parts)
         stderr = _stripped_lines(stderr_parts)
+        yield self._delete_shell(self._shell_id)
         returnValue(CommandResponse(stdout, stderr, exit_code))
 
 
@@ -450,14 +491,13 @@ class LongCommandClient(WinRMClient):
         self.key = (self._conn_info.ipaddress, command_line + str(ps_script))
         self.ps_script = ps_script
         yield self.init_connection()
-        if self._shell_id is None:
-            self._shell_id = yield self._create_shell()
+        self._shell_id = yield self._create_shell()
         try:
             command_elem = yield self._send_command(self._shell_id,
                                                     command_line)
         except TimeoutError:
             yield self._sender.send_request('delete', shell_id=self._shell_id)
-            yield self.close_connection()
+            self.close_connection()
             raise
         self._command_id = _find_command_id(command_elem)
         returnValue(None)
@@ -467,7 +507,7 @@ class LongCommandClient(WinRMClient):
         try:
             receive_elem = yield self._send_receive(self._shell_id, self._command_id)
         except TimeoutError:
-            yield self.close_connection()
+            self.close_connection()
             raise
         stdout_parts = _find_stream(receive_elem, self._command_id, 'stdout')
         stderr_parts = _find_stream(receive_elem, self._command_id, 'stderr')
@@ -486,7 +526,7 @@ class LongCommandClient(WinRMClient):
         yield self._signal_terminate(self._shell_id, self._command_id)
         yield self._delete_shell(self._shell_id)
         if close:
-            yield self.close_connection()
+            self.close_connection()
         returnValue(CommandResponse(stdout, stderr, self._exit_code))
 
 
@@ -501,12 +541,12 @@ class EnumerateClient(WinRMClient):
         super(EnumerateClient, self).__init__(conn_info)
         self._handler = SaxResponseHandler(self)
         self._hostname = conn_info.ipaddress
-        self.key = (conn_info.ipaddress, 'short')
+        self.key = (conn_info.ipaddress, 'enumerate')
 
     @inlineCallbacks
     def enumerate(self, wql, resource_uri=DEFAULT_RESOURCE_URI):
         """Runs a remote WQL query."""
-        if self._session() is None:
+        if self.session() is None:
             yield self.init_connection()
         request_template_name = 'enumerate'
         enumeration_context = None
@@ -515,7 +555,7 @@ class EnumerateClient(WinRMClient):
             for i in xrange(_MAX_REQUESTS_PER_ENUMERATION):
                 LOG.info('{0} "{1}" {2}'.format(
                     self._hostname, wql, request_template_name))
-                response = yield self._session()._send_request(
+                response = yield self.session()._send_request(
                     request_template_name,
                     self,
                     resource_uri=resource_uri,
@@ -549,9 +589,10 @@ class EnumerateClient(WinRMClient):
         yield self.init_connection()
         for enum_info in enum_infos:
             try:
-                items[enum_info] = yield self._session().sem.run(self.enumerate,
-                                                                 enum_info.wql,
-                                                                 enum_info.resource_uri)
+                items[enum_info] = yield self.session().semrun(
+                    self.enumerate,
+                    enum_info.wql,
+                    enum_info.resource_uri)
             except (UnauthorizedError, ForbiddenError):
                 # Fail the collection for general errors.
                 raise
@@ -559,7 +600,7 @@ class EnumerateClient(WinRMClient):
                 # Store empty results for other query-specific errors.
                 continue
 
-        yield self.close_connection()
+        self.close_connection()
         returnValue(items)
 
 
diff --git a/txwinrm/krb5.py b/txwinrm/krb5.py
old mode 100644
new mode 100755
index 1a733fe..6a74dad
--- a/txwinrm/krb5.py
+++ b/txwinrm/krb5.py
@@ -73,25 +73,6 @@ DOMAIN_REALM_TEMPLATE = (
 )
 
 
-class KlistProcessProtocol(ProcessProtocol):
-    """Communicates with klist command.
-
-    This is only to verify the includedir.  If there's an error or specific text
-    we'll discard the includedir
-    """
-
-    def __init__(self):
-        self.d = defer.Deferred()
-        self._error = ''
-
-    def errReceived(self, data):
-        if 'Included profile file could not be read while initializing krb5' in data:
-            self._error = data
-
-    def processEnded(self, reason):
-        self.d.callback(self._error if self._error else None)
-
-
 class Config(object):
     """Manages KRB5_CONFIG."""
 
@@ -105,33 +86,13 @@ class Config(object):
         # For further usage by kerberos python module.
         os.environ['KRB5_CONFIG'] = self.path
 
-    @defer.inlineCallbacks
     def add_includedir(self, includedir):
-        if includedir in self.includedirs:
-            return
-
         self.includedirs.add(includedir)
         self.save()
-        # test for valid directory
-        klist = None
-        for path in ('/usr/bin/klist', '/usr/kerberos/bin/klist'):
-            if os.path.isfile(path):
-                klist = path
-                break
-        klist_args = [klist]
-        klist_env = {
-            'KRB5_CONFIG': self.path,
-        }
-
-        protocol = KlistProcessProtocol()
-
-        reactor.spawnProcess(protocol, klist, klist_args, klist_env)
-
-        results = yield protocol.d
-        if results:
-            self.includedirs.discard(includedir)
-            self.save()
-        defer.returnValue(None)
+
+    def remove_includedir(self, includedir):
+        self.includedirs.discard(includedir)
+        self.save()
 
     def add_kdc(self, realm, kdcs, disable_rdns=False):
         """Add realm and KDC to KRB5_CONFIG.
@@ -148,6 +109,8 @@ class Config(object):
         if not kdcs or not kdcs.strip():
             return
 
+        # reload currently saved copy
+        self.realms, self.admin_servers = self.load()
         valid_kdcs = []
         remove_kdcs = []
         admin_server = None
@@ -375,7 +338,7 @@ def kinit(username, password, kdc, includedir=None, disable_rdns=False):
     global config
 
     if includedir:
-        yield config.add_includedir(includedir)
+        config.add_includedir(includedir)
     config.add_kdc(realm, kdc, disable_rdns)
 
     ccname = config.get_ccname(username)
@@ -394,6 +357,15 @@ def kinit(username, password, kdc, includedir=None, disable_rdns=False):
     reactor.spawnProcess(protocol, kinit, kinit_args, kinit_env)
 
     results = yield protocol.d
+    try:
+        if 'Included profile file could not be read while initializing Kerberos 5 library' in results:
+            config.remove_includedir(includedir)
+            retry_protocol = KinitProcessProtocol(password)
+            reactor.spawnProcess(retry_protocol, kinit, kinit_args, kinit_env)
+            results = yield retry_protocol.d
+    except TypeError:
+        # Everything's ok
+        pass
     defer.returnValue(results)
 
 
diff --git a/txwinrm/request/enum_shells.xml b/txwinrm/request/enum_shells.xml
new file mode 100644
index 0000000..e5c4b98
--- /dev/null
+++ b/txwinrm/request/enum_shells.xml
@@ -0,0 +1,18 @@
+<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing" xmlns:wsen="http://schemas.xmlsoap.org/ws/2004/09/enumeration" xmlns:wsman="http://schemas.dmtf.org/wbem/wsman/1/wsman.xsd"> 
+  <s:Header> 
+    <wsa:To>http://SORINOTEST05:5985/wsman</wsa:To> 
+    <wsman:ResourceURI s:mustUnderstand="true"> http://schemas.microsoft.com/wbem/wsman/1/windows/shell </wsman:ResourceURI> 
+    <wsa:ReplyTo> 
+      <wsa:Address s:mustUnderstand="true"> http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous </wsa:Address> 
+    </wsa:ReplyTo> 
+    <wsa:Action s:mustUnderstand="true"> http://schemas.xmlsoap.org/ws/2004/09/enumeration/Enumerate </wsa:Action> 
+    <wsman:MaxEnvelopeSize s:mustUnderstand="true">153600</wsman:MaxEnvelopeSize> 
+    <wsa:MessageID>uuid:08E59736-6A1B-4560-8442-64E4D7A26EB5</wsa:MessageID> 
+    <wsman:Locale xml:lang="en-US" s:mustUnderstand="false" /> 
+    <wsman:OperationTimeout>PT60.000S</wsman:OperationTimeout> 
+  </s:Header> 
+  <s:Body> 
+    <wsen:Enumerate> </wsen:Enumerate> 
+  </s:Body> 
+</s:Envelope>
+
diff --git a/txwinrm/request/pull_shells.xml b/txwinrm/request/pull_shells.xml
new file mode 100644
index 0000000..f72b69f
--- /dev/null
+++ b/txwinrm/request/pull_shells.xml
@@ -0,0 +1,34 @@
+<s:Envelope
+   xmlns:s="http://www.w3.org/2003/05/soap-envelope"
+   xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing"
+   xmlns:wsen="http://schemas.xmlsoap.org/ws/2004/09/enumeration"
+   xmlns:wsman="http://schemas.dmtf.org/wbem/wsman/1/wsman.xsd">
+   <s:Header>
+     <wsa:To>http://SORINOTEST05:80/wsman</wsa:To>
+     <wsman:ResourceURI s:mustUnderstand="true">
+       http://schemas.microsoft.com/wbem/wsman/1/windows/shell
+     </wsman:ResourceURI>
+     <wsa:ReplyTo>
+       <wsa:Address s:mustUnderstand="true">
+         http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous
+       </wsa:Address>
+     </wsa:ReplyTo>
+     <wsa:Action s:mustUnderstand="true">
+       http://schemas.xmlsoap.org/ws/2004/09/enumeration/Pull
+     </wsa:Action>
+     <wsman:MaxEnvelopeSize s:mustUnderstand="true">153600</wsman:MaxEnvelopeSize>
+     <wsa:MessageID>
+       uuid:BE9EDB22-23A3-489A-B025-ED7F3461E4AB</wsa:MessageID>
+     <wsman:Locale xml:lang="en-US" s:mustUnderstand="false" />
+     <wsman:OperationTimeout>PT60.000S</wsman:OperationTimeout>
+   </s:Header>
+   <s:Body>
+     <wsen:Pull>
+       <wsen:EnumerationContext
+         xmlns:wsen="http://schemas.xmlsoap.org/ws/2004/09/enumeration">
+         uuid:{uuid}
+         </wsen:EnumerationContext>
+       <wsen:MaxElements>100</wsen:MaxElements>
+     </wsen:Pull>
+   </s:Body>
+ </s:Envelope>
diff --git a/txwinrm/shell.py b/txwinrm/shell.py
old mode 100644
new mode 100755
index f497499..ad2c0de
--- a/txwinrm/shell.py
+++ b/txwinrm/shell.py
@@ -18,8 +18,10 @@ from cStringIO import StringIO
 from twisted.internet import reactor, defer, task
 from twisted.internet.error import TimeoutError
 from xml.etree import cElementTree as ET
+from xml.etree import ElementTree
 from . import constants as c
 from .util import create_etree_request_sender, get_datetime, RequestError
+from .enumerate import create_parser_and_factory
 
 log = logging.getLogger('winrm')
 _MAX_REQUESTS_PER_COMMAND = 9999
@@ -214,36 +216,133 @@ def create_single_shot_command(conn_info):
     return SingleShotCommand(sender)
 
 
+def _find_enum_context(elem):
+    e_context = None
+    xpath = './/{{{}}}EnumerationContext'.format(c.XML_NS_ENUMERATION)
+    ctxt_elem = elem.find(xpath)
+    if ctxt_elem is not None:
+        e_context = ctxt_elem.text.split(':')[1]
+    return e_context
+
+
+def _find_shell_ids(elem):
+    ids = []
+    xpath = './/{{{}}}ShellId'.format(c.XML_NS_MSRSP)
+    shells = elem.findall(xpath)
+    for shell in shells:
+        ids.append(shell.text)
+    return ids
+
+
+@defer.inlineCallbacks
+def _get_active_shells(request_sender):
+    elem = yield request_sender.send_request('enum_shells')
+    enum_context = _find_enum_context(elem)
+    if enum_context is None:
+        defer.returnValue(None)
+    response = yield request_sender.send_request('pull_shells', uuid=enum_context)
+    body = ElementTree.tostring(response)
+    parser, factory = create_parser_and_factory()
+    parser.feed(body)
+    defer.returnValue(factory.items)
+
+
+@defer.inlineCallbacks
+def _get_active_shell(request_sender, conn_info, min_runtime=600):
+    """Sift through existing shells to find what should be the oldest active shell
+    created by our user.  Compare against minimum runtime so we grab the oldest
+    shell.  something less than min_runtime could have been created by a different
+    client.  sender can be RequestSender or WinRMClient.  conn_info is a
+    txwinrm.util.ConnectionInfo instance
+    """
+    shells = yield _get_active_shells(request_sender)
+    active_shell = None
+
+    user_domain = conn_info.username.split('@')
+    try:
+        # get domain user as netbios
+        user = (user_domain[1].split('.')[0] + '\\' + user_domain[0]).lower()
+    except IndexError:
+        # local user, no netbios
+        # user_domain[0] will always exist
+        user = user_domain[0].lower()
+
+    def get_runtime(runtime):
+        # return total runtime in seconds
+        # ShellRunTime is specified as P<days>DT<hours>H<minutes>M<seconds>S
+        # e.g. P1DT1H1M1S is a runtime of of 1 day, 1 hour, 1 minute, and 1 second
+        # we'll calculate the total number of seconds a shell has been running using
+        # these numbers
+        try:
+            rt_match = re.match('P(?P<d>\d+)DT(?P<h>\d+)H(?P<m>\d+)M(?P<s>\d+)S', runtime)
+            return int(rt_match.group('s')) + (int(rt_match.group('m')) * 60) + (int(rt_match.group('h')) * 3600) + (int(rt_match.group('d')) * 86400)
+        except Exception:
+            return 0
+
+    for shell in shells:
+        if user == shell.Owner.lower():
+            runtime = get_runtime(shell.ShellRunTime)
+            if runtime > min_runtime:
+                # found possible candidate, test against previous
+                if active_shell is not None:
+                    prev_runtime = get_runtime(active_shell.ShellRunTime)
+                    if runtime > prev_runtime:
+                        active_shell = shell
+                else:
+                    active_shell = shell
+    defer.returnValue(active_shell)
+
+
 class LongRunningCommand(object):
 
-    def __init__(self, sender):
+    def __init__(self, sender, min_runtime=600):
         self._sender = sender
         self._shell_id = None
         self._command_id = None
         self._exit_code = None
 
+        # attach to shell with a minimum runtime of x seconds to know
+        # that our user created the shell
+        self._min_runtime = min_runtime
+
+    @defer.inlineCallbacks
+    def is_shell_active(self, shell_id):
+        if shell_id is None:
+            defer.returnValue(False)
+        elem = yield self._sender.send_request('enum_shells')
+        enum_context = _find_enum_context(elem)
+        if enum_context is None:
+            defer.returnValue(False)
+        elem = yield self._sender.send_request('pull_shells', uuid=enum_context)
+        shell_ids = _find_shell_ids(elem)
+        if shell_id in shell_ids:
+            defer.returnValue(True)
+        else:
+            defer.returnValue(False)
+
     @defer.inlineCallbacks
     def start(self, command_line, ps_script=None):
-        log.debug("LongRunningCommand run_command: {0}".format(command_line))
-        if self._shell_id is None:
-            elem = yield self._sender.send_request('create')
-            self._shell_id = _find_shell_id(elem)
+        elem = yield self._sender.send_request('create')
+        self._shell_id = _find_shell_id(elem)
         if ps_script is not None:
+            log.debug("LongRunningCommand run_command: {0}".format(command_line + ps_script))
             command_line_elem = _build_ps_command_line_elem(command_line, ps_script)
         else:
+            log.debug("LongRunningCommand run_command: {0}".format(command_line))
             command_line_elem = _build_command_line_elem(command_line)
         log.debug('LongRunningCommand run_command: sending command request '
                   '(shell_id={0}, command_line_elem={1})'.format(
-                    self._shell_id, command_line_elem))
+                      self._shell_id, command_line_elem))
         try:
             command_elem = yield self._sender.send_request(
                 'command', shell_id=self._shell_id,
                 command_line_elem=command_line_elem,
                 timeout=self._sender._sender._conn_info.timeout)
         except TimeoutError:
-            yield self.delete_and_close()
+            yield self._sender.close_connections()
             raise
         self._command_id = _find_command_id(command_elem)
+        defer.returnValue(self._command_id)
 
     @defer.inlineCallbacks
     def receive(self):
@@ -253,8 +352,17 @@ class LongRunningCommand(object):
                 shell_id=self._shell_id,
                 command_id=self._command_id)
         except TimeoutError:
-            yield self.delete_and_close()
-            raise
+            # could be simple network problem, reconnect and try again
+            yield self._sender.close_connections()
+            try:
+                receive_elem = yield self._sender.send_request(
+                    'receive',
+                    shell_id=self._shell_id,
+                    command_id=self._command_id)
+            except TimeoutError:
+                yield self._sender.close_connections()
+            except Exception:
+                raise
         stdout_parts = _find_stream(receive_elem, self._command_id, 'stdout')
         stderr_parts = _find_stream(receive_elem, self._command_id, 'stderr')
         self._exit_code = _find_exit_code(receive_elem, self._command_id)
@@ -263,14 +371,6 @@ class LongRunningCommand(object):
         defer.returnValue((stdout, stderr))
 
     @defer.inlineCallbacks
-    def delete_and_close(self):
-        """Delete the remote shell and close connection"""
-        yield self._sender.send_request('delete', shell_id=self._shell_id)
-        self._shell_id = None
-        yield self._sender.close_connections()
-        defer.returnValue(None)
-
-    @defer.inlineCallbacks
     def stop(self):
         for _ in xrange(_MAX_RETRIES):
             try:
@@ -279,23 +379,22 @@ class LongRunningCommand(object):
                     shell_id=self._shell_id,
                     command_id=self._command_id,
                     signal_code=c.SHELL_SIGNAL_CTRL_C)
-            except RequestError as e:
-                # if we get a 500 error back, let's try again
-                if 'HTTP status: 500. An internal error occurred' in e.message:
-                    continue
-                else:
-                    yield self.delete_and_close()
-                    raise e
-            except Exception:
-                yield self.delete_and_close()
-                raise
-            else:
                 break
+            except TimeoutError:
+                # we may need to reset the connection and try again
+                yield self._sender.close_connections()
+            except Exception:
+                pass
... 190 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/txwinrm.git



More information about the Python-modules-commits mailing list