[Python-modules-commits] [pyiosxr] 01/03: Import pyiosxr_0.41.orig.tar.gz

Vincent Bernat bernat at moszumanska.debian.org
Sun Dec 25 10:59:35 UTC 2016


This is an automated email from the git hooks/post-receive script.

bernat pushed a commit to branch master
in repository pyiosxr.

commit 1c59f6d3363ab1c467ddbc280e86cf4cea3c77aa
Author: Vincent Bernat <bernat at debian.org>
Date:   Sun Dec 25 11:57:06 2016 +0100

    Import pyiosxr_0.41.orig.tar.gz
---
 PKG-INFO                  |   8 +-
 pyIOSXR.egg-info/PKG-INFO |   8 +-
 pyIOSXR/__init__.py       |   2 +-
 pyIOSXR/exceptions.py     |  18 ++-
 pyIOSXR/iosxr.py          | 271 ++++++++++++++++++++++++++++++++--------------
 setup.py                  |   6 +-
 test/test.py              | 105 +++++++++---------
 7 files changed, 265 insertions(+), 153 deletions(-)

diff --git a/PKG-INFO b/PKG-INFO
index 54457de..a4bb64a 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,12 +1,12 @@
 Metadata-Version: 1.1
 Name: pyIOSXR
-Version: 0.21
+Version: 0.41
 Summary: Python API to interact with network devices running IOS-XR
 Home-page: https://github.com/fooelisa/pyiosxr/
-Author: Elisa Jasinska
-Author-email: elisa at bigwaveit.org
+Author: Elisa Jasinska, Mircea Ulinic
+Author-email: elisa at bigwaveit.org, mircea at cloudflare.com
 License: UNKNOWN
-Download-URL: https://github.com/fooelisa/pyiosxr/tarball/0.21
+Download-URL: https://github.com/fooelisa/pyiosxr/tarball/0.41
 Description: UNKNOWN
 Keywords: IOS-XR,IOSXR,Cisco,networking
 Platform: UNKNOWN
diff --git a/pyIOSXR.egg-info/PKG-INFO b/pyIOSXR.egg-info/PKG-INFO
index 54457de..a4bb64a 100644
--- a/pyIOSXR.egg-info/PKG-INFO
+++ b/pyIOSXR.egg-info/PKG-INFO
@@ -1,12 +1,12 @@
 Metadata-Version: 1.1
 Name: pyIOSXR
-Version: 0.21
+Version: 0.41
 Summary: Python API to interact with network devices running IOS-XR
 Home-page: https://github.com/fooelisa/pyiosxr/
-Author: Elisa Jasinska
-Author-email: elisa at bigwaveit.org
+Author: Elisa Jasinska, Mircea Ulinic
+Author-email: elisa at bigwaveit.org, mircea at cloudflare.com
 License: UNKNOWN
-Download-URL: https://github.com/fooelisa/pyiosxr/tarball/0.21
+Download-URL: https://github.com/fooelisa/pyiosxr/tarball/0.41
 Description: UNKNOWN
 Keywords: IOS-XR,IOSXR,Cisco,networking
 Platform: UNKNOWN
diff --git a/pyIOSXR/__init__.py b/pyIOSXR/__init__.py
index a4f7af2..0daeefb 100644
--- a/pyIOSXR/__init__.py
+++ b/pyIOSXR/__init__.py
@@ -17,4 +17,4 @@
 # License for the specific language governing permissions and limitations under
 # the License.
 
-from iosxr import IOSXR
+from pyIOSXR.iosxr import IOSXR
diff --git a/pyIOSXR/exceptions.py b/pyIOSXR/exceptions.py
index 2b83b56..629ca25 100644
--- a/pyIOSXR/exceptions.py
+++ b/pyIOSXR/exceptions.py
@@ -23,17 +23,22 @@ class IOSXRException(Exception):
 
     def __init__(self, msg=None, dev=None):
 
-        super(Exception, self).__init__(msg)
+        super(IOSXRException, self).__init__(msg)
         if dev:
             self._xr = dev
             # release the XML agent
-            self._xr._xml_agent_acquired = False
+            if self._xr._xml_agent_locker.locked():
+                self._xr._xml_agent_locker.release()
 
 
 class ConnectError(IOSXRException):
     """Exception while openning the connection."""
 
-    pass
+    def __init__(self, msg=None, dev=None):
+        super(ConnectError, self).__init__(msg=msg, dev=dev)
+        if dev:
+            self._xr = dev
+            self._xr._xml_agent_alive = False
 
 
 class CommitError(IOSXRException):
@@ -84,10 +89,15 @@ class InvalidXMLResponse(IOSXRException):
 
     pass
 
+
 class TimeoutError(IOSXRException):
     """TimeoutError Exception."""
 
-    pass
+    def __init__(self, msg=None, dev=None):
+        super(TimeoutError, self).__init__(msg=msg, dev=dev)
+        if dev:
+            self._xr = dev
+            self._xr._xml_agent_alive = False
 
 
 class EOFError(IOSXRException):
diff --git a/pyIOSXR/iosxr.py b/pyIOSXR/iosxr.py
index b55dfaa..2f43324 100644
--- a/pyIOSXR/iosxr.py
+++ b/pyIOSXR/iosxr.py
@@ -20,6 +20,8 @@
 import re
 import time
 import difflib
+from threading import Lock
+from xml.sax.saxutils import escape as escape_xml
 
 # third party lib
 from lxml import etree as ET
@@ -28,37 +30,28 @@ from netmiko.ssh_exception import NetMikoTimeoutException
 from netmiko.ssh_exception import NetMikoAuthenticationException
 
 # local modules
-from exceptions import LockError
-from exceptions import UnlockError
-from exceptions import XMLCLIError
-from exceptions import CommitError
-from exceptions import ConnectError
-from exceptions import TimeoutError
-from exceptions import IteratorIDError
-from exceptions import InvalidInputError
-from exceptions import CompareConfigError
-from exceptions import InvalidXMLResponse
+from pyIOSXR.exceptions import LockError
+from pyIOSXR.exceptions import UnlockError
+from pyIOSXR.exceptions import XMLCLIError
+from pyIOSXR.exceptions import CommitError
+from pyIOSXR.exceptions import ConnectError
+from pyIOSXR.exceptions import TimeoutError
+from pyIOSXR.exceptions import IteratorIDError
+from pyIOSXR.exceptions import InvalidInputError
+from pyIOSXR.exceptions import CompareConfigError
+from pyIOSXR.exceptions import InvalidXMLResponse
 
 
-# ~~~ all three functions below should be deprecated and completely removed in the next releases ~~~
-####################################################################################################
-# anyway they are supposed to be private module functions
-
-
-def __execute_rpc__(device, rpc_command, timeout):
-    return device._execute_rpc(rpc_command)
-
-
-def __execute_show__(device, show_command, timeout):
-    return device._execute_show(show_command)
-
-
-def __execute_config_show__(device, show_command, timeout):
-    return device._execute_config_show(show_command)
-####################################################################################################
+class IOSXR(object):
 
+    """
+    Establishes a connection with the IOS-XR device via SSH and facilitates the communication through the XML agent.
+    """
 
-class IOSXR(object):
+    _XML_SHELL = 'xml'
+    _XML_MODE_PROMPT = r'XML>'
+    _READ_DELAY = 0.1  # at least 0.1, corresponding to 600 max loops (60s timeout)
+    _XML_MODE_DELAY = 1  # should be able to read within one second
 
     _ITERATOR_ID_ERROR_MSG = (
         'Non supported IteratorID in Response object.'
@@ -68,10 +61,14 @@ class IOSXR(object):
         'Please turn iteration off for the XML agent.'
     )
 
-    """
-    Establishes a connection with the IOS-XR device via SSH and facilitates the communication through the XML agent.
-    """
-    def __init__(self, hostname, username, password, port=22, timeout=60, logfile=None, lock=True):
+    def __init__(self,
+                 hostname,
+                 username,
+                 password,
+                 port=22,
+                 timeout=60,
+                 logfile=None,
+                 lock=True):
         """
         IOS-XR device constructor.
 
@@ -93,7 +90,8 @@ class IOSXR(object):
         self.lock_on_connect = lock
         self.locked = False
         self._cli_prompt = None
-        self._xml_agent_acquired = False
+        self._xml_agent_locker = Lock()
+        self._xml_agent_alive = False
 
     def __getattr__(self, item):
         """
@@ -142,6 +140,11 @@ class IOSXR(object):
         :param rpc_command: (str) rpc command such as:
                                   <Get><Operational><LLDP><NodeTable></NodeTable></LLDP></Operational></Get>
         """
+        # ~~~ hack: ~~~
+        if not self.is_alive():
+            self.close()  # force close for safety
+            self.open()  # reopen
+        # ~~~ end hack ~~~
         result = self._execute_rpc(rpc_command)
         return ET.tostring(result)
 
@@ -158,65 +161,158 @@ class IOSXR(object):
                                          username=self.username,
                                          password=self.password)
             self.device.timeout = self.timeout
+            self._xml_agent_alive = True  # successfully open thus alive
         except NetMikoTimeoutException as t_err:
-            raise ConnectError(t_err.message)
+            raise ConnectError(t_err.args[0])
         except NetMikoAuthenticationException as au_err:
-            raise ConnectError(au_err.message)
-
-        self._cli_prompt = self.device.find_prompt()
+            raise ConnectError(au_err.args[0])
 
+        self._cli_prompt = self.device.find_prompt()  # get the prompt
         self._enter_xml_mode()
 
+    def is_alive(self):
+        """
+        Returns the XML agent connection state (and SSH connection state).
+        """
+        if hasattr(self.device, 'remote_conn'):
+            return self.device.remote_conn.transport.is_active() and self._xml_agent_alive
+        return False  # remote_conn not there => connection not init => not alive
+
+    def _timeout_exceeded(self, start=None, msg='Timeout exceeded!'):
+        if not start:
+            return False  # reference not specified, nothing to compare => no error
+        if time.time() - start > self.timeout:
+            # if the timeout is exceeded, throw TimeoutError
+            raise TimeoutError(msg, self)
+        return False
+
+    def _lock_xml_agent(self, start=None):
+        while (not self._xml_agent_locker.acquire(False)
+            and not self._timeout_exceeded(start, 'Waiting to acquire the XML agent!')):
+                # will wait here till the XML agent is ready to receive new requests
+                # if stays too much, _timeout_exceeded will raise TimeoutError
+                pass  # do nothing, just wait
+        return True  # ready to go now
+
+    def _unlock_xml_agent(self):
+        if self._xml_agent_locker.locked():
+            self._xml_agent_locker.release()
+
+    def _send_command_timing(self, command):
+
+        return self.device.send_command_timing(command,
+                                               delay_factor=self._READ_DELAY,
+                                               max_loops=self._XML_MODE_DELAY/self._READ_DELAY,
+                                               strip_prompt=False,
+                                               strip_command=False)
+
+    def _in_cli_mode(self):
+
+        out = self._send_command_timing('\n')
+        if not out:
+            return False
+        if self._cli_prompt in out:
+            return True
+        return False
+
     def _enter_xml_mode(self):
 
-        try:
-            out = self._send_command('xml')  # enter in XML mode
-        except TimeoutError as terr:
-            raise ConnectError('Cannot connect to the XML agent. Enabled?', self)
+        self._unlock_xml_agent()
+        # release - other commands should not have anyway access to the XML agent
+        # when not in XML mode
+        self._lock_xml_agent()  # make sure it won't collide with other parallel requests
+
+        out = self._send_command_timing(self._XML_SHELL)  # send xml shell command
+
+        if '0x24319600' in out:
+            # XML agent is not enabled
+            raise ConnectError('XML agent is not enabled. Please configure `xml agent tty iteration off`!', self)
+
+        self._unlock_xml_agent()
 
         if self.lock_on_connect:
             self.lock()
 
-    def _timeout_exceeded(self, start, msg='Timeout exceeded!'):
-
-        if time.time() - start > self.timeout:
-            # it timeout exceeded, throw TimeoutError
-            raise TimeoutError(msg, self)
+    def _send_command(self,
+                      command,
+                      delay_factor=None,
+                      start=None,
+                      expect_string=None,
+                      read_output=None,
+                      receive=False):
 
-        return False
+        if not expect_string:
+            expect_string = self._XML_MODE_PROMPT
 
-    def _send_command(self, command, delay_factor=.1, receive=False, start=None, expect_string=r'XML>'):
+        if read_output is None:
+            read_output = ''
 
-        output = ''
+        if not delay_factor:
+            delay_factor = self._READ_DELAY
 
-        if not receive:
+        if not start:
             start = time.time()
+
+        output = read_output
+
+        last_read = ''
+
+        if not read_output and not receive:
             # because the XML agent is able to process only one single request over the same SSH session at a time
             # first come first served
-            while self._xml_agent_acquired and not self._timeout_exceeded(start, 'Waiting to acquire the XML agent!'):
-                # will wait here till the XML agent is ready to receive new requests
-                # if stays too much, _timeout_exceeded will raise TimeoutError
-                time.sleep(delay_factor)  # rest a bit
-            self._xml_agent_acquired = True
+            self._lock_xml_agent(start)
             try:
-                output = self.device.send_command_expect(command,
-                                                         expect_string=expect_string,
-                                                         strip_prompt=False,
-                                                         strip_command=False,
-                                                         delay_factor=delay_factor,
-                                                         max_loops=1500)
+                max_loops = self.timeout / delay_factor
+                last_read = self.device.send_command_expect(command,
+                                                            expect_string=expect_string,
+                                                            strip_prompt=False,
+                                                            strip_command=False,
+                                                            delay_factor=delay_factor,
+                                                            max_loops=max_loops)
+                output += last_read
             except IOError as ioe:
-                if self._timeout_exceeded(start):
-                    time.sleep(delay_factor)  # go sleep a bit, you still got time
-                    return self._send_command(command, receive=True, start=start)  # let's try receiving more
+                if ((not last_read and self._in_cli_mode()) or
+                    (self._cli_prompt in output and "% Invalid input detected at '^' marker." in output)):
+                    # something happened
+                    # e.g. connection with the XML agent died while reading
+                    # netmiko throws error and the last output read is empty (ofc)
+                    # and in CLI mode
+                    #
+                    # OR
+                    #
+                    # Sometimes the XML agent simply exits and all issued commands provide the following output
+                    # (as in CLI mode)
+                    # <?
+                    #       ^
+                    # % Invalid input detected at '^' marker.
+                    # RP/0/RSP1/CPU0:edge01.dus01#<xml version="1.0" encoding="UTF-8"?
+                    #                             ^
+                    # % Invalid input detected at '^' marker.
+                    # RP/0/RSP1/CPU0:edge01.dus01#<xml version
+                    #
+                    # Which of course does not contain the XML and netmiko throws the not found error
+                    # therefore we need to re-enter in XML mode
+                    self._enter_xml_mode()
+                    # and let's issue the command again if still got time
+                    if not self._timeout_exceeded(start=start):
+                        # if still got time
+                        # reiterate the command from the beginning
+                        return self._send_command(command,
+                                                  expect_string=expect_string,
+                                                  delay_factor=delay_factor)
         else:
-            output = self._netmiko_recv()  # try to read some more
+            output += self._netmiko_recv()  # try to read some more
 
-        if '0xa3679e00' in output:
-                # when multiple parallel request are made, the device throws the error:
+        if '0xa3679e00' in output or '0xa367da00' in output:
+                # when multiple parallel requests are made, the device throws one of the errors:
+                # ---
                 # ERROR: 0xa3679e00 'XML Service Library' detected the 'fatal' condition
                 # 'Multiple concurrent requests are not allowed over the same session.
                 # A request is already in progress on this session.'
+                #
+                # ERROR: 0xa367da00 'XML Service Library' detected the 'fatal' condition
+                # 'Sending multiple documents is not supported.'
+                # ---
                 # we could use a mechanism similar to NETCONF and push the requests in queue and serve them sequentially
                 # BUT we are not able to assign unique IDs and identify the request-reply map
                 # so will throw an error that does not help too much :(
@@ -233,20 +329,21 @@ class IOSXR(object):
                 # In both cases, we need to re-enter in XML mode...
                 # so, whenever the CLI promt is detected, will re-enter in XML mode
                 # unless the expected string is the prompt
-                self._xml_agent_acquired = False  # release the channel
+                self._unlock_xml_agent()
                 self._enter_xml_mode()
                 # however, the command could not be executed properly, so we need to raise the XMLCLIError exception
                 raise XMLCLIError('Could not properly execute the command. Re-entering XML mode...', self)
             if not output.strip():  # empty output, means that the device did not start delivering the output
-                if not self._timeout_exceeded(start):
-                    time.sleep(delay_factor)  # go sleep a bit, you still got time
+                # but for sure is still in XML mode as netmiko did not throw error
+                if not self._timeout_exceeded(start=start):
                     return self._send_command(command, receive=True, start=start)  # let's try receiving more
+
             raise XMLCLIError(output.strip(), self)
 
-        self._xml_agent_acquired = False  # release the XML agent
+        self._unlock_xml_agent()
         return str(output.replace('XML>', '').strip())
 
-    def _netmiko_recv(self, max_loops=1500):
+    def _netmiko_recv(self):
 
         output = ''
 
@@ -264,7 +361,7 @@ class IOSXR(object):
         response = self._send_command(xml_rpc_command, delay_factor=delay_factor)
 
         try:
-            root = ET.fromstring(response)
+            root = ET.fromstring(str.encode(response))
         except ET.XMLSyntaxError as xml_err:
             if 'IteratorID="' in response:
                 raise IteratorIDError(self._ITERATOR_ID_ERROR_MSG, self)
@@ -294,6 +391,7 @@ class IOSXR(object):
                     # or since the last commit was made from this session.'
                     # dumb.
                     # in this case we need to re-open the connection with the XML agent
+                    _candidate_config = self.get_candidate_config(merge=True)
                     self.discard_config()  # discard candidate config
                     try:
                         # exiting from the XML mode
@@ -301,13 +399,11 @@ class IOSXR(object):
                     except XMLCLIError:
                         pass  # because does not end with `XML>`
                     self._enter_xml_mode()  # re-entering XML mode
-                    raise CommitError(
-                        error_msg + '\nPlease reload the changes and try committing again!',
-                        self
-                    )
+                    self.load_candidate_config(config=_candidate_config)
+                    return self.commit_config()
                 elif error_code == '0x41864e00' or error_code == '0x43682c00':
                     # raises this error when the commit buffer is empty
-                    raise CommitError('The target configuration buffer is empty.')
+                    raise CommitError('The target configuration buffer is empty.', self)
 
             else:
                 error_msg = root.get('ErrorMsg') or ''
@@ -333,7 +429,9 @@ class IOSXR(object):
         """
         Executes an operational show-type command.
         """
-        rpc_command = '<CLI><Exec>'+show_command+'</Exec></CLI>'
+        rpc_command = '<CLI><Exec>{show_command}</Exec></CLI>'.format(
+            show_command=escape_xml(show_command)
+        )
         response = self._execute_rpc(rpc_command)
         raw_response = response.xpath('.//CLI/Exec')[0].text
         return raw_response.strip() if raw_response else ''
@@ -343,7 +441,9 @@ class IOSXR(object):
         """
         Executes a configuration show-type command.
         """
-        rpc_command = '<CLI><Configuration>'+show_command+'</Configuration></CLI>'
+        rpc_command = '<CLI><Configuration>{show_command}</Configuration></CLI>'.format(
+            show_command=escape_xml(show_command)
+        )
         response = self._execute_rpc(rpc_command, delay_factor=delay_factor)
         raw_response = response.xpath('.//CLI/Configuration')[0].text
         return raw_response.strip() if raw_response else ''
@@ -355,9 +455,10 @@ class IOSXR(object):
         Clean up after you are done and explicitly close the router connection.
         """
         if self.lock_on_connect or self.locked:
-            self.unlock()
-        self._xml_agent_acquired = False
-        self.device.remote_conn.close()
+            self.unlock()  # this refers to the config DB
+        self._unlock_xml_agent()  # this refers to the XML agent
+        if hasattr(self.device, 'remote_conn'):
+            self.device.remote_conn.close()  # close the underlying SSH session
 
     def lock(self):
         """
@@ -408,13 +509,15 @@ class IOSXR(object):
             with open(filename) as f:
                 configuration = f.read()
 
-        rpc_command = '<CLI><Configuration>'+configuration+'</Configuration></CLI>'
+        rpc_command = '<CLI><Configuration>{configuration}</Configuration></CLI>'.format(
+            configuration=escape_xml(configuration)  # need to escape, otherwise will try to load invalid XML
+        )
 
         try:
             self._execute_rpc(rpc_command)
         except InvalidInputError as e:
             self.discard_config()
-            raise InvalidInputError(e.message, self)
+            raise InvalidInputError(e.args[0], self)
 
     def get_candidate_config(self, merge=False, formal=False):
         """
diff --git a/setup.py b/setup.py
index 6ad489d..5ae34ad 100644
--- a/setup.py
+++ b/setup.py
@@ -28,7 +28,7 @@ install_reqs = parse_requirements('requirements.txt', session=uuid.uuid1())
 # e.g. ['django==1.5.1', 'mezzanine==1.4.6']
 reqs = [str(ir.req) for ir in install_reqs]
 
-version = '0.21'
+version = '0.41'
 
 setup(
     name='pyIOSXR',
@@ -38,8 +38,8 @@ setup(
     install_requires=reqs,
     include_package_data=True,
     description='Python API to interact with network devices running IOS-XR',
-    author='Elisa Jasinska',
-    author_email='elisa at bigwaveit.org',
+    author='Elisa Jasinska, Mircea Ulinic',
+    author_email='elisa at bigwaveit.org, mircea at cloudflare.com',
     url='https://github.com/fooelisa/pyiosxr/',
     download_url='https://github.com/fooelisa/pyiosxr/tarball/%s' % version,
     keywords=['IOS-XR', 'IOSXR', 'Cisco', 'networking'],
diff --git a/test/test.py b/test/test.py
index 7ef6b99..3227f1e 100755
--- a/test/test.py
+++ b/test/test.py
@@ -4,15 +4,14 @@
 
 import os
 import sys
+import time
 import unittest
 from lxml import etree as ET
+from six import binary_type
 
 # ~~~ import pyIOSXR modules ~~~
 from pyIOSXR import IOSXR
-# private functions
-from pyIOSXR.iosxr import __execute_rpc__
-from pyIOSXR.iosxr import __execute_show__
-from pyIOSXR.iosxr import __execute_config_show__
+
 # exceptions
 from pyIOSXR.exceptions import LockError
 from pyIOSXR.exceptions import UnlockError
@@ -53,6 +52,7 @@ class _MockedNetMikoDevice(object):
                    .replace('"', '_')\
                    .replace('=', '_')\
                    .replace('$', '')\
+                   .replace(':', '')\
                    .replace('!', '')[:150]
         curr_dir = os.path.dirname(os.path.abspath(__file__))
         filename = '{filename}.{fmt}'.format(
@@ -60,7 +60,8 @@ class _MockedNetMikoDevice(object):
             fmt=format
         )
         fullpath = os.path.join(curr_dir, 'mock', filename)
-        return open(fullpath).read()
+        with open(fullpath) as file_data:
+            return file_data.read()
 
     def find_prompt(self):
         return self.get_mock_file('\n', format='txt')
@@ -73,6 +74,9 @@ class _MockedNetMikoDevice(object):
                      strip_command=True):
         return self.get_mock_file(command_string)
 
+    def send_command_timing(self, command_string, **kvargs):
+        return self.get_mock_file(command_string)
+
     def receive_data_generator(self):
         return ['', '']  # to have an iteration inside private method _netmiko_recv
 
@@ -100,6 +104,9 @@ class _MockedIOSXRDevice(IOSXR):
         self._cli_prompt = self.device.find_prompt()
         self._enter_xml_mode()
 
+    def is_alive(self):
+        return True
+
 
 class TestIOSXRDevice(unittest.TestCase):
 
@@ -111,7 +118,7 @@ class TestIOSXRDevice(unittest.TestCase):
     USERNAME = 'vagrant'
     PASSWORD = 'vagrant'
     PORT = 12205
-    TIMEOUT = 1  # for tests, smaller values are prefferred
+    TIMEOUT = .1  # for tests, smaller values are preferred
     LOCK = False
     LOG = sys.stdout
     MOCK = True
@@ -167,6 +174,7 @@ class TestIOSXRDevice(unittest.TestCase):
                 LockError,
                 self.device.lock
             )
+            self.device.lock_on_connect = False
             # enough to see that will try to lock during connect
 
     def test_mock_close(self):
@@ -208,6 +216,35 @@ class TestIOSXRDevice(unittest.TestCase):
             str
         )
 
+    def test_acquire_xml_agent(self):
+
+        """Testing if able to acquire the XML agent."""
+
+        self.device._lock_xml_agent(time.time())
+        self.assertTrue(self.device._xml_agent_locker.locked())
+        self.device._unlock_xml_agent()
+
+    def test_acquire_locked_agent_raises_timeout_error(self):
+        """Testing if trying to acquire the XML agent while locked raises TimeoutError."""
+        self.device._lock_xml_agent(time.time())  # acquiring
+        self.assertRaises(
+            TimeoutError,
+            self.device._lock_xml_agent,  # trying to acquire again
+            time.time()
+        )
+        self.device._unlock_xml_agent()  # releasing back
+
+    def test_release_xml_agent(self):
+        """Testing releasing of XML agent."""
+        self.device._lock_xml_agent(time.time())
+        self.assertTrue(self.device._xml_agent_locker.locked())
+        self.device._unlock_xml_agent()
+        self.assertFalse(self.device._xml_agent_locker.locked())
+
+    def test_in_cli_mode(self):
+        """Testing the private method _in_cli_mode."""
+        self.assertTrue(self.device._in_cli_mode())
+
     def test__getattr_show_config(self):
 
         """Testing special attribute __getattr___ against valid show config command"""
@@ -230,47 +267,20 @@ class TestIOSXRDevice(unittest.TestCase):
 
         self.assertTrue(raised)
 
-    def test__execute_rpc__(self):
-
-        """Testing private module function __execute_rpc___"""
-
-        self.assertIsInstance(
-            __execute_rpc__(self.device, '<Get><Configuration><NTP></NTP></Configuration></Get>', self.device.timeout),
-            ET._Element
-        )
-
-    def test__execute_show__(self):
-
-        """Testing private module function __execute_show__"""
-
-        self.assertIsInstance(
-            __execute_show__(self.device, 'show ntp ass', self.device.timeout),
-            str
-        )
-
-    def test__execute_config_show__(self):
-
-        """Testing private module function __execute_config_show__"""
-
-        self.assertIsInstance(
-            __execute_config_show__(self.device, 'show run ntp', self.device.timeout),
-            str
-        )
-
     def test_make_rpc_call_returns_XML(self):
 
         """Test if public method make_rpc_call returns str"""
 
         self.assertIsInstance(
             self.device.make_rpc_call('<Get><Configuration><NTP></NTP></Configuration></Get>'),
-            str
+            binary_type
         )
 
     def test_acquired_xml_agent(self):
 
         """Testing if raises TimeoutError if the XML agent is alredy acquired and released when exception thrown"""
 
-        self.device._xml_agent_acquired = True  # acquiring the XML agent
+        self.device._lock_xml_agent(time.time())  # acquiring the XML agent
 
         self.assertRaises(
             TimeoutError,
@@ -278,7 +288,7 @@ class TestIOSXRDevice(unittest.TestCase):
             '<Get><Operational><SystemTime/><PlatformInventory/></Operational></Get>'
         )
 
-        self.assertFalse(self.device._xml_agent_acquired)  # Exception raised => xml agent released
+        self.assertFalse(self.device._xml_agent_locker.locked())  # Exception raised => xml agent released
 
     def test_try_to_read_till_timeout(self):
 
@@ -328,14 +338,11 @@ class TestIOSXRDevice(unittest.TestCase):
 
     def test_channel_acquired_enter_xml_mode(self):
 
-        """Test if raises ConnectError when the channel is busy with other requests"""
+        """Test that ConnectError is not raised when the channel is busy with other requests"""
 
-        self.device._xml_agent_acquired = True
+        self.device._lock_xml_agent()
 
-        self.assertRaises(
-            ConnectError,
-            self.device._enter_xml_mode
-        )
+        self.assertIsNone(self.device._enter_xml_mode())
 
     def test_truncated_response_raises_InvalidXMLResponse(self):
 
@@ -643,15 +650,11 @@ class TestIOSXRDevice(unittest.TestCase):
 
     def test_commit_after_other_session_commit(self):
 
-        """Testing if trying to commit after another process commited raises CommitError"""
+        """Testing if trying to commit after another process committed does not raise CommitError"""
 
         if self.MOCK:
             # mock data contains the error message we are looking for
-            self.assertRaises(
-                CommitError,
-                self.device.commit_config,
-                comment="parallel"
-            )
+            self.assertIsNone(self.device.commit_config(comment="parallel"))
         else:
             # to test this will neet to apply changes to the same device
             # through a different SSH session
@@ -673,11 +676,7 @@ class TestIOSXRDevice(unittest.TestCase):
             # trying to load something from the test instance
             self.device.load_candidate_config(config='interface MgmtEth0/RP0/CPU0/0 description this wont work')
             # and will fail because of the commit above
-            self.assertRaises(
-                CommitError,
-                self.device.commit_config,
-                comment="parallel"
-            )
+            self.assertIsNone(self.device.commit_config(comment="parallel"))
 
             # let's rollback the committed changes
             same_device.rollback()

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/pyiosxr.git



More information about the Python-modules-commits mailing list