[Python-modules-commits] [pyiosxr] 01/03: Import pyiosxr_0.41.orig.tar.gz
Vincent Bernat
bernat at moszumanska.debian.org
Sun Dec 25 10:59:35 UTC 2016
This is an automated email from the git hooks/post-receive script.
bernat pushed a commit to branch master
in repository pyiosxr.
commit 1c59f6d3363ab1c467ddbc280e86cf4cea3c77aa
Author: Vincent Bernat <bernat at debian.org>
Date: Sun Dec 25 11:57:06 2016 +0100
Import pyiosxr_0.41.orig.tar.gz
---
PKG-INFO | 8 +-
pyIOSXR.egg-info/PKG-INFO | 8 +-
pyIOSXR/__init__.py | 2 +-
pyIOSXR/exceptions.py | 18 ++-
pyIOSXR/iosxr.py | 271 ++++++++++++++++++++++++++++++++--------------
setup.py | 6 +-
test/test.py | 105 +++++++++---------
7 files changed, 265 insertions(+), 153 deletions(-)
diff --git a/PKG-INFO b/PKG-INFO
index 54457de..a4bb64a 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,12 +1,12 @@
Metadata-Version: 1.1
Name: pyIOSXR
-Version: 0.21
+Version: 0.41
Summary: Python API to interact with network devices running IOS-XR
Home-page: https://github.com/fooelisa/pyiosxr/
-Author: Elisa Jasinska
-Author-email: elisa at bigwaveit.org
+Author: Elisa Jasinska, Mircea Ulinic
+Author-email: elisa at bigwaveit.org, mircea at cloudflare.com
License: UNKNOWN
-Download-URL: https://github.com/fooelisa/pyiosxr/tarball/0.21
+Download-URL: https://github.com/fooelisa/pyiosxr/tarball/0.41
Description: UNKNOWN
Keywords: IOS-XR,IOSXR,Cisco,networking
Platform: UNKNOWN
diff --git a/pyIOSXR.egg-info/PKG-INFO b/pyIOSXR.egg-info/PKG-INFO
index 54457de..a4bb64a 100644
--- a/pyIOSXR.egg-info/PKG-INFO
+++ b/pyIOSXR.egg-info/PKG-INFO
@@ -1,12 +1,12 @@
Metadata-Version: 1.1
Name: pyIOSXR
-Version: 0.21
+Version: 0.41
Summary: Python API to interact with network devices running IOS-XR
Home-page: https://github.com/fooelisa/pyiosxr/
-Author: Elisa Jasinska
-Author-email: elisa at bigwaveit.org
+Author: Elisa Jasinska, Mircea Ulinic
+Author-email: elisa at bigwaveit.org, mircea at cloudflare.com
License: UNKNOWN
-Download-URL: https://github.com/fooelisa/pyiosxr/tarball/0.21
+Download-URL: https://github.com/fooelisa/pyiosxr/tarball/0.41
Description: UNKNOWN
Keywords: IOS-XR,IOSXR,Cisco,networking
Platform: UNKNOWN
diff --git a/pyIOSXR/__init__.py b/pyIOSXR/__init__.py
index a4f7af2..0daeefb 100644
--- a/pyIOSXR/__init__.py
+++ b/pyIOSXR/__init__.py
@@ -17,4 +17,4 @@
# License for the specific language governing permissions and limitations under
# the License.
-from iosxr import IOSXR
+from pyIOSXR.iosxr import IOSXR
diff --git a/pyIOSXR/exceptions.py b/pyIOSXR/exceptions.py
index 2b83b56..629ca25 100644
--- a/pyIOSXR/exceptions.py
+++ b/pyIOSXR/exceptions.py
@@ -23,17 +23,22 @@ class IOSXRException(Exception):
def __init__(self, msg=None, dev=None):
- super(Exception, self).__init__(msg)
+ super(IOSXRException, self).__init__(msg)
if dev:
self._xr = dev
# release the XML agent
- self._xr._xml_agent_acquired = False
+ if self._xr._xml_agent_locker.locked():
+ self._xr._xml_agent_locker.release()
class ConnectError(IOSXRException):
"""Exception while openning the connection."""
- pass
+ def __init__(self, msg=None, dev=None):
+ super(ConnectError, self).__init__(msg=msg, dev=dev)
+ if dev:
+ self._xr = dev
+ self._xr._xml_agent_alive = False
class CommitError(IOSXRException):
@@ -84,10 +89,15 @@ class InvalidXMLResponse(IOSXRException):
pass
+
class TimeoutError(IOSXRException):
"""TimeoutError Exception."""
- pass
+ def __init__(self, msg=None, dev=None):
+ super(TimeoutError, self).__init__(msg=msg, dev=dev)
+ if dev:
+ self._xr = dev
+ self._xr._xml_agent_alive = False
class EOFError(IOSXRException):
diff --git a/pyIOSXR/iosxr.py b/pyIOSXR/iosxr.py
index b55dfaa..2f43324 100644
--- a/pyIOSXR/iosxr.py
+++ b/pyIOSXR/iosxr.py
@@ -20,6 +20,8 @@
import re
import time
import difflib
+from threading import Lock
+from xml.sax.saxutils import escape as escape_xml
# third party lib
from lxml import etree as ET
@@ -28,37 +30,28 @@ from netmiko.ssh_exception import NetMikoTimeoutException
from netmiko.ssh_exception import NetMikoAuthenticationException
# local modules
-from exceptions import LockError
-from exceptions import UnlockError
-from exceptions import XMLCLIError
-from exceptions import CommitError
-from exceptions import ConnectError
-from exceptions import TimeoutError
-from exceptions import IteratorIDError
-from exceptions import InvalidInputError
-from exceptions import CompareConfigError
-from exceptions import InvalidXMLResponse
+from pyIOSXR.exceptions import LockError
+from pyIOSXR.exceptions import UnlockError
+from pyIOSXR.exceptions import XMLCLIError
+from pyIOSXR.exceptions import CommitError
+from pyIOSXR.exceptions import ConnectError
+from pyIOSXR.exceptions import TimeoutError
+from pyIOSXR.exceptions import IteratorIDError
+from pyIOSXR.exceptions import InvalidInputError
+from pyIOSXR.exceptions import CompareConfigError
+from pyIOSXR.exceptions import InvalidXMLResponse
-# ~~~ all three functions below should be deprecated and completely removed in the next releases ~~~
-####################################################################################################
-# anyway they are supposed to be private module functions
-
-
-def __execute_rpc__(device, rpc_command, timeout):
- return device._execute_rpc(rpc_command)
-
-
-def __execute_show__(device, show_command, timeout):
- return device._execute_show(show_command)
-
-
-def __execute_config_show__(device, show_command, timeout):
- return device._execute_config_show(show_command)
-####################################################################################################
+class IOSXR(object):
+ """
+ Establishes a connection with the IOS-XR device via SSH and facilitates the communication through the XML agent.
+ """
-class IOSXR(object):
+ _XML_SHELL = 'xml'
+ _XML_MODE_PROMPT = r'XML>'
+ _READ_DELAY = 0.1 # at least 0.1, corresponding to 600 max loops (60s timeout)
+ _XML_MODE_DELAY = 1 # should be able to read within one second
_ITERATOR_ID_ERROR_MSG = (
'Non supported IteratorID in Response object.'
@@ -68,10 +61,14 @@ class IOSXR(object):
'Please turn iteration off for the XML agent.'
)
- """
- Establishes a connection with the IOS-XR device via SSH and facilitates the communication through the XML agent.
- """
- def __init__(self, hostname, username, password, port=22, timeout=60, logfile=None, lock=True):
+ def __init__(self,
+ hostname,
+ username,
+ password,
+ port=22,
+ timeout=60,
+ logfile=None,
+ lock=True):
"""
IOS-XR device constructor.
@@ -93,7 +90,8 @@ class IOSXR(object):
self.lock_on_connect = lock
self.locked = False
self._cli_prompt = None
- self._xml_agent_acquired = False
+ self._xml_agent_locker = Lock()
+ self._xml_agent_alive = False
def __getattr__(self, item):
"""
@@ -142,6 +140,11 @@ class IOSXR(object):
:param rpc_command: (str) rpc command such as:
<Get><Operational><LLDP><NodeTable></NodeTable></LLDP></Operational></Get>
"""
+ # ~~~ hack: ~~~
+ if not self.is_alive():
+ self.close() # force close for safety
+ self.open() # reopen
+ # ~~~ end hack ~~~
result = self._execute_rpc(rpc_command)
return ET.tostring(result)
@@ -158,65 +161,158 @@ class IOSXR(object):
username=self.username,
password=self.password)
self.device.timeout = self.timeout
+ self._xml_agent_alive = True # successfully open thus alive
except NetMikoTimeoutException as t_err:
- raise ConnectError(t_err.message)
+ raise ConnectError(t_err.args[0])
except NetMikoAuthenticationException as au_err:
- raise ConnectError(au_err.message)
-
- self._cli_prompt = self.device.find_prompt()
+ raise ConnectError(au_err.args[0])
+ self._cli_prompt = self.device.find_prompt() # get the prompt
self._enter_xml_mode()
+ def is_alive(self):
+ """
+ Returns the XML agent connection state (and SSH connection state).
+ """
+ if hasattr(self.device, 'remote_conn'):
+ return self.device.remote_conn.transport.is_active() and self._xml_agent_alive
+ return False # remote_conn not there => connection not init => not alive
+
+ def _timeout_exceeded(self, start=None, msg='Timeout exceeded!'):
+ if not start:
+ return False # reference not specified, noth to compare => no error
+ if time.time() - start > self.timeout:
+ # it timeout exceeded, throw TimeoutError
+ raise TimeoutError(msg, self)
+ return False
+
+ def _lock_xml_agent(self, start=None):
+ while (not self._xml_agent_locker.acquire(False)
+ and not self._timeout_exceeded(start, 'Waiting to acquire the XML agent!')):
+ # will wait here till the XML agent is ready to receive new requests
+ # if stays too much, _timeout_exceeded will raise TimeoutError
+ pass # do nothing, just wait
+ return True # ready to go now
+
+ def _unlock_xml_agent(self):
+ if self._xml_agent_locker.locked():
+ self._xml_agent_locker.release()
+
+ def _send_command_timing(self, command):
+
+ return self.device.send_command_timing(command,
+ delay_factor=self._READ_DELAY,
+ max_loops=self._XML_MODE_DELAY/self._READ_DELAY,
+ strip_prompt=False,
+ strip_command=False)
+
+ def _in_cli_mode(self):
+
+ out = self._send_command_timing('\n')
+ if not out:
+ return False
+ if self._cli_prompt in out:
+ return True
+ return False
+
def _enter_xml_mode(self):
- try:
- out = self._send_command('xml') # enter in XML mode
- except TimeoutError as terr:
- raise ConnectError('Cannot connect to the XML agent. Enabled?', self)
+ self._unlock_xml_agent()
+ # release - other commands should not have anyway access to the XML agent
+ # when not in XML mode
+ self._lock_xml_agent() # make sure it won't collide with other parallel requests
+
+ out = self._send_command_timing(self._XML_SHELL) # send xml shell command
+
+ if '0x24319600' in out:
+ # XML agent is not enabled
+ raise ConnectError('XML agent is not enabled. Please configure `xml agent tty iteration off`!', self)
+
+ self._unlock_xml_agent()
if self.lock_on_connect:
self.lock()
- def _timeout_exceeded(self, start, msg='Timeout exceeded!'):
-
- if time.time() - start > self.timeout:
- # it timeout exceeded, throw TimeoutError
- raise TimeoutError(msg, self)
+ def _send_command(self,
+ command,
+ delay_factor=None,
+ start=None,
+ expect_string=None,
+ read_output=None,
+ receive=False):
- return False
+ if not expect_string:
+ expect_string = self._XML_MODE_PROMPT
- def _send_command(self, command, delay_factor=.1, receive=False, start=None, expect_string=r'XML>'):
+ if read_output is None:
+ read_output = ''
- output = ''
+ if not delay_factor:
+ delay_factor = self._READ_DELAY
- if not receive:
+ if not start:
start = time.time()
+
+ output = read_output
+
+ last_read = ''
+
+ if not read_output and not receive:
# because the XML agent is able to process only one single request over the same SSH session at a time
# first come first served
- while self._xml_agent_acquired and not self._timeout_exceeded(start, 'Waiting to acquire the XML agent!'):
- # will wait here till the XML agent is ready to receive new requests
- # if stays too much, _timeout_exceeded will raise TimeoutError
- time.sleep(delay_factor) # rest a bit
- self._xml_agent_acquired = True
+ self._lock_xml_agent(start)
try:
- output = self.device.send_command_expect(command,
- expect_string=expect_string,
- strip_prompt=False,
- strip_command=False,
- delay_factor=delay_factor,
- max_loops=1500)
+ max_loops = self.timeout / delay_factor
+ last_read = self.device.send_command_expect(command,
+ expect_string=expect_string,
+ strip_prompt=False,
+ strip_command=False,
+ delay_factor=delay_factor,
+ max_loops=max_loops)
+ output += last_read
except IOError as ioe:
- if self._timeout_exceeded(start):
- time.sleep(delay_factor) # go sleep a bit, you still got time
- return self._send_command(command, receive=True, start=start) # let's try receiving more
+ if ((not last_read and self._in_cli_mode()) or
+ (self._cli_prompt in output and "% Invalid input detected at '^' marker." in output)):
+ # something happened
+ # e.g. connection with the XML agent died while reading
+ # netmiko throws error and the last output read is empty (ofc)
+ # and in CLI mode
+ #
+ # OR
+ #
+ # Sometimes the XML agent simply exits and all issued commands provide the following output
+ # (as in CLI mode)
+ # <?
+ # ^
+ # % Invalid input detected at '^' marker.
+ # RP/0/RSP1/CPU0:edge01.dus01#<xml version="1.0" encoding="UTF-8"?
+ # ^
+ # % Invalid input detected at '^' marker.
+ # RP/0/RSP1/CPU0:edge01.dus01#<xml version
+ #
+ # Which of course does not contain the XML and netmiko throws the not found error
+ # therefore we need to re-enter in XML mode
+ self._enter_xml_mode()
+ # and let's issue the command again if still got time
+ if not self._timeout_exceeded(start=start):
+ # if still got time
+ # reiterate the command from the beginning
+ return self._send_command(command,
+ expect_string=expect_string,
+ delay_factor=delay_factor)
else:
- output = self._netmiko_recv() # try to read some more
+ output += self._netmiko_recv() # try to read some more
- if '0xa3679e00' in output:
- # when multiple parallel request are made, the device throws the error:
+ if '0xa3679e00' in output or '0xa367da00' in output:
+ # when multiple parallel request are made, the device throws one of the the errors:
+ # ---
# ERROR: 0xa3679e00 'XML Service Library' detected the 'fatal' condition
# 'Multiple concurrent requests are not allowed over the same session.
# A request is already in progress on this session.'
+ #
+ # ERROR: 0xa367da00 XML Service Library' detected the 'fatal' condition
+ # 'Sending multiple documents is not supported.'
+ # ---
# we could use a mechanism similar to NETCONF and push the requests in queue and serve them sequentially
# BUT we are not able to assign unique IDs and identify the request-reply map
# so will throw an error that does not help too much :(
@@ -233,20 +329,21 @@ class IOSXR(object):
# In both cases, we need to re-enter in XML mode...
# so, whenever the CLI promt is detected, will re-enter in XML mode
# unless the expected string is the prompt
- self._xml_agent_acquired = False # release the channel
+ self._unlock_xml_agent()
self._enter_xml_mode()
# however, the command could not be executed properly, so we need to raise the XMLCLIError exception
raise XMLCLIError('Could not properly execute the command. Re-entering XML mode...', self)
if not output.strip(): # empty output, means that the device did not start delivering the output
- if not self._timeout_exceeded(start):
- time.sleep(delay_factor) # go sleep a bit, you still got time
+ # but for sure is still in XML mode as netmiko did not throw error
+ if not self._timeout_exceeded(start=start):
return self._send_command(command, receive=True, start=start) # let's try receiving more
+
raise XMLCLIError(output.strip(), self)
- self._xml_agent_acquired = False # release the XML agent
+ self._unlock_xml_agent()
return str(output.replace('XML>', '').strip())
- def _netmiko_recv(self, max_loops=1500):
+ def _netmiko_recv(self):
output = ''
@@ -264,7 +361,7 @@ class IOSXR(object):
response = self._send_command(xml_rpc_command, delay_factor=delay_factor)
try:
- root = ET.fromstring(response)
+ root = ET.fromstring(str.encode(response))
except ET.XMLSyntaxError as xml_err:
if 'IteratorID="' in response:
raise IteratorIDError(self._ITERATOR_ID_ERROR_MSG, self)
@@ -294,6 +391,7 @@ class IOSXR(object):
# or since the last commit was made from this session.'
# dumb.
# in this case we need to re-open the connection with the XML agent
+ _candidate_config = self.get_candidate_config(merge=True)
self.discard_config() # discard candidate config
try:
# exiting from the XML mode
@@ -301,13 +399,11 @@ class IOSXR(object):
except XMLCLIError:
pass # because does not end with `XML>`
self._enter_xml_mode() # re-entering XML mode
- raise CommitError(
- error_msg + '\nPlease reload the changes and try committing again!',
- self
- )
+ self.load_candidate_config(config=_candidate_config)
+ return self.commit_config()
elif error_code == '0x41864e00' or error_code == '0x43682c00':
# raises this error when the commit buffer is empty
- raise CommitError('The target configuration buffer is empty.')
+ raise CommitError('The target configuration buffer is empty.', self)
else:
error_msg = root.get('ErrorMsg') or ''
@@ -333,7 +429,9 @@ class IOSXR(object):
"""
Executes an operational show-type command.
"""
- rpc_command = '<CLI><Exec>'+show_command+'</Exec></CLI>'
+ rpc_command = '<CLI><Exec>{show_command}</Exec></CLI>'.format(
+ show_command=escape_xml(show_command)
+ )
response = self._execute_rpc(rpc_command)
raw_response = response.xpath('.//CLI/Exec')[0].text
return raw_response.strip() if raw_response else ''
@@ -343,7 +441,9 @@ class IOSXR(object):
"""
Executes a configuration show-type command.
"""
- rpc_command = '<CLI><Configuration>'+show_command+'</Configuration></CLI>'
+ rpc_command = '<CLI><Configuration>{show_command}</Configuration></CLI>'.format(
+ show_command=escape_xml(show_command)
+ )
response = self._execute_rpc(rpc_command, delay_factor=delay_factor)
raw_response = response.xpath('.//CLI/Configuration')[0].text
return raw_response.strip() if raw_response else ''
@@ -355,9 +455,10 @@ class IOSXR(object):
Clean up after you are done and explicitly close the router connection.
"""
if self.lock_on_connect or self.locked:
- self.unlock()
- self._xml_agent_acquired = False
- self.device.remote_conn.close()
+ self.unlock() # this refers to the config DB
+ self._unlock_xml_agent() # this refers to the XML agent
+ if hasattr(self.device, 'remote_conn'):
+ self.device.remote_conn.close() # close the underlying SSH session
def lock(self):
"""
@@ -408,13 +509,15 @@ class IOSXR(object):
with open(filename) as f:
configuration = f.read()
- rpc_command = '<CLI><Configuration>'+configuration+'</Configuration></CLI>'
+ rpc_command = '<CLI><Configuration>{configuration}</Configuration></CLI>'.format(
+ configuration=escape_xml(configuration) # need to escape, otherwise will try to load invalid XML
+ )
try:
self._execute_rpc(rpc_command)
except InvalidInputError as e:
self.discard_config()
- raise InvalidInputError(e.message, self)
+ raise InvalidInputError(e.args[0], self)
def get_candidate_config(self, merge=False, formal=False):
"""
diff --git a/setup.py b/setup.py
index 6ad489d..5ae34ad 100644
--- a/setup.py
+++ b/setup.py
@@ -28,7 +28,7 @@ install_reqs = parse_requirements('requirements.txt', session=uuid.uuid1())
# e.g. ['django==1.5.1', 'mezzanine==1.4.6']
reqs = [str(ir.req) for ir in install_reqs]
-version = '0.21'
+version = '0.41'
setup(
name='pyIOSXR',
@@ -38,8 +38,8 @@ setup(
install_requires=reqs,
include_package_data=True,
description='Python API to interact with network devices running IOS-XR',
- author='Elisa Jasinska',
- author_email='elisa at bigwaveit.org',
+ author='Elisa Jasinska, Mircea Ulinic',
+ author_email='elisa at bigwaveit.org, mircea at cloudflare.com',
url='https://github.com/fooelisa/pyiosxr/',
download_url='https://github.com/fooelisa/pyiosxr/tarball/%s' % version,
keywords=['IOS-XR', 'IOSXR', 'Cisco', 'networking'],
diff --git a/test/test.py b/test/test.py
index 7ef6b99..3227f1e 100755
--- a/test/test.py
+++ b/test/test.py
@@ -4,15 +4,14 @@
import os
import sys
+import time
import unittest
from lxml import etree as ET
+from six import binary_type
# ~~~ import pyIOSXR modules ~~~
from pyIOSXR import IOSXR
-# private functions
-from pyIOSXR.iosxr import __execute_rpc__
-from pyIOSXR.iosxr import __execute_show__
-from pyIOSXR.iosxr import __execute_config_show__
+
# exceptions
from pyIOSXR.exceptions import LockError
from pyIOSXR.exceptions import UnlockError
@@ -53,6 +52,7 @@ class _MockedNetMikoDevice(object):
.replace('"', '_')\
.replace('=', '_')\
.replace('$', '')\
+ .replace(':', '')\
.replace('!', '')[:150]
curr_dir = os.path.dirname(os.path.abspath(__file__))
filename = '{filename}.{fmt}'.format(
@@ -60,7 +60,8 @@ class _MockedNetMikoDevice(object):
fmt=format
)
fullpath = os.path.join(curr_dir, 'mock', filename)
- return open(fullpath).read()
+ with open(fullpath) as file_data:
+ return file_data.read()
def find_prompt(self):
return self.get_mock_file('\n', format='txt')
@@ -73,6 +74,9 @@ class _MockedNetMikoDevice(object):
strip_command=True):
return self.get_mock_file(command_string)
+ def send_command_timing(self, command_string, **kvargs):
+ return self.get_mock_file(command_string)
+
def receive_data_generator(self):
return ['', ''] # to have an iteration inside private method _netmiko_recv
@@ -100,6 +104,9 @@ class _MockedIOSXRDevice(IOSXR):
self._cli_prompt = self.device.find_prompt()
self._enter_xml_mode()
+ def is_alive(self):
+ return True
+
class TestIOSXRDevice(unittest.TestCase):
@@ -111,7 +118,7 @@ class TestIOSXRDevice(unittest.TestCase):
USERNAME = 'vagrant'
PASSWORD = 'vagrant'
PORT = 12205
- TIMEOUT = 1 # for tests, smaller values are prefferred
+ TIMEOUT = .1 # for tests, smaller values are prefferred
LOCK = False
LOG = sys.stdout
MOCK = True
@@ -167,6 +174,7 @@ class TestIOSXRDevice(unittest.TestCase):
LockError,
self.device.lock
)
+ self.device.lock_on_connect = False
# enough to see that will try to lock during connect
def test_mock_close(self):
@@ -208,6 +216,35 @@ class TestIOSXRDevice(unittest.TestCase):
str
)
+ def test_acquire_xml_agent(self):
+
+ """Testing if able to acquire the XML agent."""
+
+ self.device._lock_xml_agent(time.time())
+ self.assertTrue(self.device._xml_agent_locker.locked())
+ self.device._unlock_xml_agent()
+
+ def test_acquire_locked_agent_raises_timeout_error(self):
+ """Testing if trying to acquire the XML agent while locked raises TimeoutError."""
+ self.device._lock_xml_agent(time.time()) # acquiring
+ self.assertRaises(
+ TimeoutError,
+ self.device._lock_xml_agent, # trying to acquire again
+ time.time()
+ )
+ self.device._unlock_xml_agent() # releasing back
+
+ def test_release_xml_agent(self):
+ """Testing releasing of XML agent."""
+ self.device._lock_xml_agent(time.time())
+ self.assertTrue(self.device._xml_agent_locker.locked())
+ self.device._unlock_xml_agent()
+ self.assertFalse(self.device._xml_agent_locker.locked())
+
+ def test_in_cli_mode(self):
+ """Testing the private method _in_cli_mode."""
+ self.assertTrue(self.device._in_cli_mode())
+
def test__getattr_show_config(self):
"""Testing special attribute __getattr___ against valid show config command"""
@@ -230,47 +267,20 @@ class TestIOSXRDevice(unittest.TestCase):
self.assertTrue(raised)
- def test__execute_rpc__(self):
-
- """Testing private module function __execute_rpc___"""
-
- self.assertIsInstance(
- __execute_rpc__(self.device, '<Get><Configuration><NTP></NTP></Configuration></Get>', self.device.timeout),
- ET._Element
- )
-
- def test__execute_show__(self):
-
- """Testing private module function __execute_show__"""
-
- self.assertIsInstance(
- __execute_show__(self.device, 'show ntp ass', self.device.timeout),
- str
- )
-
- def test__execute_config_show__(self):
-
- """Testing private module function __execute_config_show__"""
-
- self.assertIsInstance(
- __execute_config_show__(self.device, 'show run ntp', self.device.timeout),
- str
- )
-
def test_make_rpc_call_returns_XML(self):
"""Test if public method make_rpc_call returns str"""
self.assertIsInstance(
self.device.make_rpc_call('<Get><Configuration><NTP></NTP></Configuration></Get>'),
- str
+ binary_type
)
def test_acquired_xml_agent(self):
"""Testing if raises TimeoutError if the XML agent is alredy acquired and released when exception thrown"""
- self.device._xml_agent_acquired = True # acquiring the XML agent
+ self.device._lock_xml_agent(time.time()) # acquiring the XML agent
self.assertRaises(
TimeoutError,
@@ -278,7 +288,7 @@ class TestIOSXRDevice(unittest.TestCase):
'<Get><Operational><SystemTime/><PlatformInventory/></Operational></Get>'
)
- self.assertFalse(self.device._xml_agent_acquired) # Exception raised => xml agent released
+ self.assertFalse(self.device._xml_agent_locker.locked()) # Exception raised => xml agent released
def test_try_to_read_till_timeout(self):
@@ -328,14 +338,11 @@ class TestIOSXRDevice(unittest.TestCase):
def test_channel_acquired_enter_xml_mode(self):
- """Test if raises ConnectError when the channel is busy with other requests"""
+ """Test if not raises ConnectError when the channel is busy with other requests"""
- self.device._xml_agent_acquired = True
+ self.device._lock_xml_agent()
- self.assertRaises(
- ConnectError,
- self.device._enter_xml_mode
- )
+ self.assertIsNone(self.device._enter_xml_mode())
def test_truncated_response_raises_InvalidXMLResponse(self):
@@ -643,15 +650,11 @@ class TestIOSXRDevice(unittest.TestCase):
def test_commit_after_other_session_commit(self):
- """Testing if trying to commit after another process commited raises CommitError"""
+ """Testing if trying to commit after another process commited does not raise CommitError"""
if self.MOCK:
# mock data contains the error message we are looking for
- self.assertRaises(
- CommitError,
- self.device.commit_config,
- comment="parallel"
- )
+ self.assertIsNone(self.device.commit_config(comment="parallel"))
else:
# to test this will neet to apply changes to the same device
# through a different SSH session
@@ -673,11 +676,7 @@ class TestIOSXRDevice(unittest.TestCase):
# trying to load something from the test instance
self.device.load_candidate_config(config='interface MgmtEth0/RP0/CPU0/0 description this wont work')
# and will fail because of the commit above
- self.assertRaises(
- CommitError,
- self.device.commit_config,
- comment="parallel"
- )
+ self.assertIsNone(self.device.commit_config(comment="parallel"))
# let's rollback the committed changes
same_device.rollback()
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/pyiosxr.git
More information about the Python-modules-commits
mailing list