[Python-modules-commits] [napalm-ios] 01/03: New upstream release.
Vincent Bernat
bernat at moszumanska.debian.org
Tue Nov 14 09:48:56 UTC 2017
This is an automated email from the git hooks/post-receive script.
bernat pushed a commit to annotated tag debian/0.8.1-1
in repository napalm-ios.
commit cd19b30f6d809f38df31bb43d9945b72112983de
Author: Vincent Bernat <bernat at debian.org>
Date: Tue Nov 14 10:46:09 2017 +0100
New upstream release.
---
PKG-INFO | 2 +-
napalm_ios.egg-info/PKG-INFO | 2 +-
napalm_ios.egg-info/SOURCES.txt | 1 +
napalm_ios.egg-info/not-zip-safe | 1 +
napalm_ios.egg-info/requires.txt | 4 +-
napalm_ios/ios.py | 1297 ++++++++++++++++++++++++++------------
requirements.txt | 4 +-
setup.cfg | 6 +-
setup.py | 3 +-
9 files changed, 909 insertions(+), 411 deletions(-)
diff --git a/PKG-INFO b/PKG-INFO
index 431ce6a..8998419 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: napalm-ios
-Version: 0.5.1
+Version: 0.8.1
Summary: Network Automation and Programmability Abstraction Layer with Multivendor support
Home-page: https://github.com/napalm-automation/napalm-ios
Author: Kirk Byers
diff --git a/napalm_ios.egg-info/PKG-INFO b/napalm_ios.egg-info/PKG-INFO
index 431ce6a..8998419 100644
--- a/napalm_ios.egg-info/PKG-INFO
+++ b/napalm_ios.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: napalm-ios
-Version: 0.5.1
+Version: 0.8.1
Summary: Network Automation and Programmability Abstraction Layer with Multivendor support
Home-page: https://github.com/napalm-automation/napalm-ios
Author: Kirk Byers
diff --git a/napalm_ios.egg-info/SOURCES.txt b/napalm_ios.egg-info/SOURCES.txt
index 8e38a0c..aa9be8f 100644
--- a/napalm_ios.egg-info/SOURCES.txt
+++ b/napalm_ios.egg-info/SOURCES.txt
@@ -7,6 +7,7 @@ napalm_ios/ios.py
napalm_ios.egg-info/PKG-INFO
napalm_ios.egg-info/SOURCES.txt
napalm_ios.egg-info/dependency_links.txt
+napalm_ios.egg-info/not-zip-safe
napalm_ios.egg-info/requires.txt
napalm_ios.egg-info/top_level.txt
napalm_ios/templates/delete_ntp_peers.j2
diff --git a/napalm_ios.egg-info/not-zip-safe b/napalm_ios.egg-info/not-zip-safe
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/napalm_ios.egg-info/not-zip-safe
@@ -0,0 +1 @@
+
diff --git a/napalm_ios.egg-info/requires.txt b/napalm_ios.egg-info/requires.txt
index 2b7a494..c361a7f 100644
--- a/napalm_ios.egg-info/requires.txt
+++ b/napalm_ios.egg-info/requires.txt
@@ -1,2 +1,2 @@
-napalm-base>=0.20.4
-netmiko>=1.0.0
+napalm_base>=0.25.0
+netmiko>=1.4.3
diff --git a/napalm_ios/ios.py b/napalm_ios/ios.py
old mode 100755
new mode 100644
index e3048d6..caa45e7
--- a/napalm_ios/ios.py
+++ b/napalm_ios/ios.py
@@ -17,13 +17,22 @@ from __future__ import print_function
from __future__ import unicode_literals
import re
-
-from netmiko import ConnectHandler, FileTransfer
-from netmiko import __version__ as netmiko_version
+import os
+import uuid
+import socket
+import tempfile
+import telnetlib
+import copy
+
+from netmiko import ConnectHandler, FileTransfer, InLineTransfer
from napalm_base.base import NetworkDriver
-from napalm_base.exceptions import ReplaceConfigException, MergeConfigException
+from napalm_base.exceptions import ReplaceConfigException, MergeConfigException, \
+ ConnectionClosedException, CommandErrorException
+
from napalm_base.utils import py23_compat
import napalm_base.constants as C
+import napalm_base.helpers
+
# Easier to store these as constants
HOUR_SECONDS = 3600
@@ -33,12 +42,23 @@ YEAR_SECONDS = 365 * DAY_SECONDS
# STD REGEX PATTERNS
IP_ADDR_REGEX = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}"
+IPV4_ADDR_REGEX = IP_ADDR_REGEX
+IPV6_ADDR_REGEX_1 = r"::"
+IPV6_ADDR_REGEX_2 = r"[0-9a-fA-F:]{1,39}::[0-9a-fA-F:]{1,39}"
+IPV6_ADDR_REGEX_3 = r"[0-9a-fA-F]{1,3}:[0-9a-fA-F]{1,3}:[0-9a-fA-F]{1,3}:[0-9a-fA-F]{1,3}:" \
+ "[0-9a-fA-F]{1,3}:[0-9a-fA-F]{1,3}:[0-9a-fA-F]{1,3}:[0-9a-fA-F]{1,3}"
+# Should validate IPv6 address using an IP address library after matching with this regex
+IPV6_ADDR_REGEX = "(?:{}|{}|{})".format(IPV6_ADDR_REGEX_1, IPV6_ADDR_REGEX_2, IPV6_ADDR_REGEX_3)
+
MAC_REGEX = r"[a-fA-F0-9]{4}\.[a-fA-F0-9]{4}\.[a-fA-F0-9]{4}"
VLAN_REGEX = r"\d{1,4}"
RE_IPADDR = re.compile(r"{}".format(IP_ADDR_REGEX))
RE_IPADDR_STRIP = re.compile(r"({})\n".format(IP_ADDR_REGEX))
RE_MAC = re.compile(r"{}".format(MAC_REGEX))
+# Period needed for 32-bit AS Numbers
+ASN_REGEX = r"[\d\.]+"
+
IOS_COMMANDS = {
'show_mac_address': ['show mac-address-table', 'show mac address-table'],
}
@@ -56,13 +76,19 @@ class IOSDriver(NetworkDriver):
self.password = password
self.timeout = timeout
+ self.transport = optional_args.get('transport', 'ssh')
+
# Retrieve file names
self.candidate_cfg = optional_args.get('candidate_cfg', 'candidate_config.txt')
self.merge_cfg = optional_args.get('merge_cfg', 'merge_config.txt')
self.rollback_cfg = optional_args.get('rollback_cfg', 'rollback_config.txt')
+ self.inline_transfer = optional_args.get('inline_transfer', False)
+ if self.transport == 'telnet':
+ # Telnet only supports inline_transfer
+ self.inline_transfer = True
# None will cause autodetection of dest_file_system
- self.dest_file_system = optional_args.get('dest_file_system', None)
+ self._dest_file_system = optional_args.get('dest_file_system', None)
self.auto_rollback_on_error = optional_args.get('auto_rollback_on_error', True)
# Netmiko possible arguments
@@ -70,6 +96,7 @@ class IOSDriver(NetworkDriver):
'port': None,
'secret': '',
'verbose': False,
+ 'keepalive': 30,
'global_delay_factor': 1,
'use_keys': False,
'key_file': None,
@@ -78,16 +105,9 @@ class IOSDriver(NetworkDriver):
'alt_host_keys': False,
'alt_key_file': '',
'ssh_config_file': None,
+ 'allow_agent': False,
}
- fields = netmiko_version.split('.')
- fields = [int(x) for x in fields]
- maj_ver, min_ver, bug_fix = fields
- if maj_ver >= 2:
- netmiko_argument_map['allow_agent'] = False
- elif maj_ver == 1 and min_ver >= 1:
- netmiko_argument_map['allow_agent'] = False
-
# Build dict of any optional Netmiko args
self.netmiko_optional_args = {}
for k, v in netmiko_argument_map.items():
@@ -95,28 +115,39 @@ class IOSDriver(NetworkDriver):
self.netmiko_optional_args[k] = optional_args[k]
except KeyError:
pass
- self.global_delay_factor = optional_args.get('global_delay_factor', 1)
- self.port = optional_args.get('port', 22)
+
+ default_port = {
+ 'ssh': 22,
+ 'telnet': 23
+ }
+ self.port = optional_args.get('port', default_port[self.transport])
self.device = None
self.config_replace = False
self.interface_map = {}
+ self.profile = ["ios"]
+
def open(self):
"""Open a connection to the device."""
- self.device = ConnectHandler(device_type='cisco_ios',
+ device_type = 'cisco_ios'
+ if self.transport == 'telnet':
+ device_type = 'cisco_ios_telnet'
+ self.device = ConnectHandler(device_type=device_type,
host=self.hostname,
username=self.username,
password=self.password,
**self.netmiko_optional_args)
# ensure in enable mode
self.device.enable()
- if not self.dest_file_system:
- try:
- self.dest_file_system = self.device._autodetect_fs()
- except AttributeError:
- raise AttributeError("Netmiko _autodetect_fs not found please upgrade Netmiko or "
- "specify dest_file_system in optional_args.")
+
+ def _discover_file_system(self):
+ try:
+ return self.device._autodetect_fs()
+ except Exception:
+ msg = "Netmiko _autodetect_fs failed (to workaround specify " \
+ "dest_file_system in optional_args.)"
+ raise CommandErrorException(msg)
def close(self):
"""Close the connection to the device."""
@@ -127,20 +158,91 @@ class IOSDriver(NetworkDriver):
If command is a list will iterate through commands until valid command.
"""
- if isinstance(command, list):
- for cmd in command:
- output = self.device.send_command(cmd)
- if "% Invalid" not in output:
- break
- else:
- output = self.device.send_command(command)
- return self._send_command_postprocess(output)
+ try:
+ if isinstance(command, list):
+ for cmd in command:
+ output = self.device.send_command(cmd)
+ if "% Invalid" not in output:
+ break
+ else:
+ output = self.device.send_command(command)
+ return self._send_command_postprocess(output)
+ except (socket.error, EOFError) as e:
+ raise ConnectionClosedException(str(e))
def is_alive(self):
- """Returns a flag with the state of the SSH connection."""
- return {
- 'is_alive': self.device.remote_conn.transport.is_active()
- }
+ """Returns a flag with the state of the connection."""
+ null = chr(0)
+ if self.device is None:
+ return {'is_alive': False}
+ if self.transport == 'telnet':
+ try:
+ # Try sending IAC + NOP (IAC is the telnet way of sending a command).
+ # IAC = Interpret as Command (it comes before the NOP)
+ self.device.write_channel(telnetlib.IAC + telnetlib.NOP)
+ return {'is_alive': True}
+ except UnicodeDecodeError:
+ # Netmiko logging bug (remove after Netmiko >= 1.4.3)
+ return {'is_alive': True}
+ except AttributeError:
+ return {'is_alive': False}
+ else:
+ # SSH
+ try:
+ # Try sending ASCII null byte to maintain the connection alive
+ self.device.write_channel(null)
+ return {'is_alive': self.device.remote_conn.transport.is_active()}
+ except (socket.error, EOFError):
+ # If unable to send, we can tell for sure that the connection is unusable
+ return {'is_alive': False}
+ return {'is_alive': False}
+
+ @staticmethod
+ def _create_tmp_file(config):
+ """Write temp file for use with inline config and SCP."""
+ tmp_dir = tempfile.gettempdir()
+ rand_fname = py23_compat.text_type(uuid.uuid4())
+ filename = os.path.join(tmp_dir, rand_fname)
+ with open(filename, 'wt') as fobj:
+ fobj.write(config)
+ return filename
+
+ def _load_candidate_wrapper(self, source_file=None, source_config=None, dest_file=None,
+ file_system=None):
+ """
+ Transfer file to remote device for either merge or replace operations
+
+ Returns (return_status, msg)
+ """
+ return_status = False
+ msg = ''
+ if source_file and source_config:
+ raise ValueError("Cannot simultaneously set source_file and source_config")
+
+ if source_config:
+ if self.inline_transfer:
+ (return_status, msg) = self._inline_tcl_xfer(source_config=source_config,
+ dest_file=dest_file,
+ file_system=file_system)
+ else:
+ # Use SCP
+ tmp_file = self._create_tmp_file(source_config)
+ (return_status, msg) = self._scp_file(source_file=tmp_file, dest_file=dest_file,
+ file_system=file_system)
+ if tmp_file and os.path.isfile(tmp_file):
+ os.remove(tmp_file)
+ if source_file:
+ if self.inline_transfer:
+ (return_status, msg) = self._inline_tcl_xfer(source_file=source_file,
+ dest_file=dest_file,
+ file_system=file_system)
+ else:
+ (return_status, msg) = self._scp_file(source_file=source_file, dest_file=dest_file,
+ file_system=file_system)
+ if not return_status:
+ if msg == '':
+ msg = "Transfer to remote device failed"
+ return (return_status, msg)
def load_replace_candidate(self, filename=None, config=None):
"""
@@ -149,16 +251,12 @@ class IOSDriver(NetworkDriver):
Return None or raise exception
"""
self.config_replace = True
- if config:
- raise NotImplementedError
- if filename:
- (return_status, msg) = self._scp_file(source_file=filename,
- dest_file=self.candidate_cfg,
- file_system=self.dest_file_system)
- if not return_status:
- if msg == '':
- msg = "SCP transfer to remote device failed"
- raise ReplaceConfigException(msg)
+ return_status, msg = self._load_candidate_wrapper(source_file=filename,
+ source_config=config,
+ dest_file=self.candidate_cfg,
+ file_system=self.dest_file_system)
+ if not return_status:
+ raise ReplaceConfigException(msg)
def load_merge_candidate(self, filename=None, config=None):
"""
@@ -167,16 +265,12 @@ class IOSDriver(NetworkDriver):
Merge configuration in: copy <file> running-config
"""
self.config_replace = False
- if config:
- raise NotImplementedError
- if filename:
- (return_status, msg) = self._scp_file(source_file=filename,
- dest_file=self.merge_cfg,
- file_system=self.dest_file_system)
- if not return_status:
- if msg == '':
- msg = "SCP transfer to remote device failed"
- raise MergeConfigException(msg)
+ return_status, msg = self._load_candidate_wrapper(source_file=filename,
+ source_config=config,
+ dest_file=self.merge_cfg,
+ file_system=self.dest_file_system)
+ if not return_status:
+ raise MergeConfigException(msg)
@staticmethod
def _normalize_compare_config(diff):
@@ -194,6 +288,42 @@ class IOSDriver(NetworkDriver):
return "\n".join(new_list)
@staticmethod
+ def _normalize_merge_diff_incr(diff):
+ """Make the compare config output look better.
+
+ Cisco IOS incremental-diff output
+
+ No changes:
+ !List of Commands:
+ end
+ !No changes were found
+ """
+ new_diff = []
+
+ changes_found = False
+ for line in diff.splitlines():
+ if re.search(r'order-dependent line.*re-ordered', line):
+ changes_found = True
+ elif 'No changes were found' in line:
+ # IOS in the re-order case still claims "No changes were found"
+ if not changes_found:
+ return ''
+ else:
+ continue
+
+ if line.strip() == 'end':
+ continue
+ elif 'List of Commands' in line:
+ continue
+ # Filter blank lines and prepend +sign
+ elif line.strip():
+ if re.search(r"^no\s+", line.strip()):
+ new_diff.append('-' + line)
+ else:
+ new_diff.append('+' + line)
+ return "\n".join(new_diff)
+
+ @staticmethod
def _normalize_merge_diff(diff):
"""Make compare_config() for merge look similar to replace config diff."""
new_diff = []
@@ -202,8 +332,7 @@ class IOSDriver(NetworkDriver):
if line.strip():
new_diff.append('+' + line)
if new_diff:
- new_diff.insert(0, '! Cisco IOS does not support true compare_config() for merge: '
- 'echo merge file.')
+ new_diff.insert(0, '! incremental-diff failed; falling back to echo of merge file')
else:
new_diff.append('! No changes specified in merge file.')
return "\n".join(new_diff)
@@ -231,24 +360,30 @@ class IOSDriver(NetworkDriver):
diff = self.device.send_command_expect(cmd)
diff = self._normalize_compare_config(diff)
else:
- cmd = 'more {}'.format(new_file_full)
+ # merge
+ cmd = 'show archive config incremental-diffs {} ignorecase'.format(new_file_full)
diff = self.device.send_command_expect(cmd)
- diff = self._normalize_merge_diff(diff)
+ if 'error code 5' in diff or 'returned error 5' in diff:
+ diff = "You have encountered the obscure 'error 5' message. This generally " \
+ "means you need to add an 'end' statement to the end of your merge changes."
+ elif '% Invalid' not in diff:
+ diff = self._normalize_merge_diff_incr(diff)
+ else:
+ cmd = 'more {}'.format(new_file_full)
+ diff = self.device.send_command_expect(cmd)
+ diff = self._normalize_merge_diff(diff)
+
return diff.strip()
def _commit_hostname_handler(self, cmd):
"""Special handler for hostname change on commit operation."""
- try:
- current_prompt = self.device.find_prompt()
- # Wait 12 seconds for output to come back (.2 * 60)
- output = self.device.send_command_expect(cmd, delay_factor=.2, max_loops=60)
- except IOError:
- # Check if hostname change
- if current_prompt == self.device.find_prompt():
- raise
- else:
- self.device.set_base_prompt()
- output = ''
+ current_prompt = self.device.find_prompt().strip()
+ terminating_char = current_prompt[-1]
+ pattern = r"[>#{}]\s*$".format(terminating_char)
+ # Look exclusively for trailing pattern that includes '#' and '>'
+ output = self.device.send_command_expect(cmd, expect_string=pattern)
+ # Reset base prompt in case hostname changed
+ self.device.set_base_prompt()
return output
def commit_config(self):
@@ -271,9 +406,14 @@ class IOSDriver(NetworkDriver):
else:
cmd = 'configure replace {} force'.format(cfg_file)
output = self._commit_hostname_handler(cmd)
- if ('Failed to apply command' in output) or \
- ('original configuration has been successfully restored' in output):
- raise ReplaceConfigException("Candidate config could not be applied")
+ if ('original configuration has been successfully restored' in output) or \
+ ('error' in output.lower()) or \
+ ('failed' in output.lower()):
+ msg = "Candidate config could not be applied\n{}".format(output)
+ raise ReplaceConfigException(msg)
+ elif '%Please turn config archive on' in output:
+ msg = "napalm-ios replace() requires Cisco 'archive' feature to be enabled."
+ raise ReplaceConfigException(msg)
else:
# Merge operation
filename = self.merge_cfg
@@ -286,7 +426,8 @@ class IOSDriver(NetworkDriver):
self._enable_confirm()
if 'Invalid input detected' in output:
self.rollback()
- merge_error = "Configuration merge failed; automatic rollback attempted"
+ err_header = "Configuration merge failed; automatic rollback attempted"
+ merge_error = "{0}:\n{1}".format(err_header, output)
raise MergeConfigException(merge_error)
# Save config to startup (both replace and merge)
@@ -310,6 +451,26 @@ class IOSDriver(NetworkDriver):
cmd = 'configure replace {} force'.format(cfg_file)
self.device.send_command_expect(cmd)
+ # Save config to startup
+ self.device.send_command_expect("write mem")
+
+ def _inline_tcl_xfer(self, source_file=None, source_config=None, dest_file=None,
+ file_system=None):
+ """
+ Use Netmiko InLineTransfer (TCL) to transfer file or config to remote device.
+
+ Return (status, msg)
+ status = boolean
+ msg = details on what happened
+ """
+ if source_file:
+ return self._xfer_file(source_file=source_file, dest_file=dest_file,
+ file_system=file_system, TransferClass=InLineTransfer)
+ if source_config:
+ return self._xfer_file(source_config=source_config, dest_file=dest_file,
+ file_system=file_system, TransferClass=InLineTransfer)
+ raise ValueError("File source not specified for transfer.")
+
def _scp_file(self, source_file, dest_file, file_system):
"""
SCP file to remote device.
@@ -318,30 +479,53 @@ class IOSDriver(NetworkDriver):
status = boolean
msg = details on what happened
"""
- # Will automaticall enable SCP on remote device
- enable_scp = True
+ return self._xfer_file(source_file=source_file, dest_file=dest_file,
+ file_system=file_system, TransferClass=FileTransfer)
+
+ def _xfer_file(self, source_file=None, source_config=None, dest_file=None, file_system=None,
+ TransferClass=FileTransfer):
+ """Transfer file to remote device.
- with FileTransfer(self.device,
- source_file=source_file,
- dest_file=dest_file,
- file_system=file_system) as scp_transfer:
+ By default, this will use Secure Copy. If self.inline_transfer is set, it will use
+ the Netmiko InLineTransfer method to transfer inline using either SSH or telnet (plus TCL
+ onbox).
+
+ Return (status, msg)
+ status = boolean
+ msg = details on what happened
+ """
+ if not source_file and not source_config:
+ raise ValueError("File source not specified for transfer.")
+ if not dest_file or not file_system:
+ raise ValueError("Destination file or file system not specified.")
+
+ if source_file:
+ kwargs = dict(ssh_conn=self.device, source_file=source_file, dest_file=dest_file,
+ direction='put', file_system=file_system)
+ elif source_config:
+ kwargs = dict(ssh_conn=self.device, source_config=source_config, dest_file=dest_file,
+ direction='put', file_system=file_system)
+ enable_scp = True
+ if self.inline_transfer:
+ enable_scp = False
+ with TransferClass(**kwargs) as transfer:
# Check if file already exists and has correct MD5
- if scp_transfer.check_file_exists() and scp_transfer.compare_md5():
+ if transfer.check_file_exists() and transfer.compare_md5():
msg = "File already exists and has correct MD5: no SCP needed"
return (True, msg)
- if not scp_transfer.verify_space_available():
+ if not transfer.verify_space_available():
msg = "Insufficient space available on remote device"
return (False, msg)
if enable_scp:
- scp_transfer.enable_scp()
+ transfer.enable_scp()
# Transfer file
- scp_transfer.transfer_file()
+ transfer.transfer_file()
# Compares MD5 between local-remote files
- if scp_transfer.verify_file():
+ if transfer.verify_file():
msg = "File successfully transferred to remote device"
return (True, msg)
else:
@@ -429,6 +613,80 @@ class IOSDriver(NetworkDriver):
output = re.sub(r"^Time source is .*$", "", output, flags=re.M)
return output.strip()
+ def get_optics(self):
+ command = 'show interfaces transceiver'
+ output = self._send_command(command)
+
+ # Check if router supports the command
+ if '% Invalid input' in output:
+ return {}
+
+ # Formatting data into return data structure
+ optics_detail = {}
+
+ try:
+ split_output = re.split(r'^---------.*$', output, flags=re.M)[1]
+ except IndexError:
+ return {}
+
+ split_output = split_output.strip()
+
+ for optics_entry in split_output.splitlines():
+ # Example, Te1/0/1 34.6 3.29 -2.0 -3.5
+ try:
+ split_list = optics_entry.split()
+ except ValueError:
+ return {}
+
+ int_brief = split_list[0]
+ output_power = split_list[3]
+ input_power = split_list[4]
+
+ port = self._expand_interface_name(int_brief)
+
+ port_detail = {}
+
+ port_detail['physical_channels'] = {}
+ port_detail['physical_channels']['channel'] = []
+
+ # If interface is shutdown it returns "N/A" as output power.
+ # Converting that to -100.0 float
+ try:
+ float(output_power)
+ except ValueError:
+ output_power = -100.0
+
+ # Defaulting avg, min, max values to -100.0 since device does not
+ # return these values
+ optic_states = {
+ 'index': 0,
+ 'state': {
+ 'input_power': {
+ 'instant': (float(input_power) if 'input_power' else -100.0),
+ 'avg': -100.0,
+ 'min': -100.0,
+ 'max': -100.0
+ },
+ 'output_power': {
+ 'instant': (float(output_power) if 'output_power' else -100.0),
+ 'avg': -100.0,
+ 'min': -100.0,
+ 'max': -100.0
+ },
+ 'laser_bias_current': {
+ 'instant': 0.0,
+ 'avg': 0.0,
+ 'min': 0.0,
+ 'max': 0.0
+ }
+ }
+ }
+
+ port_detail['physical_channels']['channel'].append(optic_states)
+ optics_detail[port] = port_detail
+
+ return optics_detail
+
def get_lldp_neighbors(self):
"""IOS implementation of get_lldp_neighbors."""
lldp = {}
@@ -450,7 +708,27 @@ class IOSDriver(NetworkDriver):
for lldp_entry in split_output.splitlines():
# Example, twb-sf-hpsw1 Fa4 120 B 17
- device_id, local_int_brief, hold_time, capability, remote_port = lldp_entry.split()
+ try:
+ device_id, local_int_brief, hold_time, capability, remote_port = lldp_entry.split()
+ except ValueError:
+ if len(lldp_entry.split()) == 4:
+ # Four fields might be long_name or missing capability
+ capability_missing = True if lldp_entry[46] == ' ' else False
+ if capability_missing:
+ device_id, local_int_brief, hold_time, remote_port = lldp_entry.split()
+ else:
+ # Might be long_name issue
+ tmp_field, hold_time, capability, remote_port = lldp_entry.split()
+ device_id = tmp_field[:20]
+ local_int_brief = tmp_field[20:]
+ # device_id might be abbreviated, try to get full name
+ lldp_tmp = self._lldp_detail_parser(local_int_brief)
+ device_id_new = lldp_tmp[3][0]
+ # Verify abbreviated and full name are consistent
+ if device_id_new[:20] == device_id:
+ device_id = device_id_new
+ else:
+ raise ValueError("Unable to obtain remote device name")
local_port = self._expand_interface_name(local_int_brief)
entry = {'port': remote_port, 'hostname': device_id}
@@ -459,6 +737,39 @@ class IOSDriver(NetworkDriver):
return lldp
+ def _lldp_detail_parser(self, interface):
+ command = "show lldp neighbors {} detail".format(interface)
+ output = self._send_command(command)
+
+ # Check if router supports the command
+ if '% Invalid input' in output:
+ raise ValueError("Command not supported by network device")
+
+ # Cisco generally use : for string divider, but sometimes has ' - '
+ port_id = re.findall(r"Port id\s*?[:-]\s+(.+)", output)
+ port_description = re.findall(r"Port Description\s*?[:-]\s+(.+)", output)
+ chassis_id = re.findall(r"Chassis id\s*?[:-]\s+(.+)", output)
+ system_name = re.findall(r"System Name\s*?[:-]\s+(.+)", output)
+ system_description = re.findall(r"System Description\s*?[:-]\s*(not advertised|\s*\n.+)",
+ output)
+ system_description = [x.strip() for x in system_description]
+ system_capabilities = re.findall(r"System Capabilities\s*?[:-]\s+(.+)", output)
+ enabled_capabilities = re.findall(r"Enabled Capabilities\s*?[:-]\s+(.+)", output)
+ remote_address = re.findall(r"Management Addresses\s*[:-]\s*(not advertised|\n.+)", output)
+ # remote address had two possible patterns which required some secondary processing
+ new_remote_address = []
+ for val in remote_address:
+ val = val.strip()
+ pattern = r'(?:IP|Other)(?::\s+?)(.+)'
+ match = re.search(pattern, val)
+ if match:
+ new_remote_address.append(match.group(1))
+ else:
+ new_remote_address.append(val)
+ remote_address = new_remote_address
+ return [port_id, port_description, chassis_id, system_name, system_description,
+ system_capabilities, enabled_capabilities, remote_address]
+
def get_lldp_neighbors_detail(self, interface=''):
"""
IOS implementation of get_lldp_neighbors_detail.
@@ -477,27 +788,14 @@ class IOSDriver(NetworkDriver):
lldp_neighbors = {}
for interface in lldp_neighbors:
- command = "show lldp neighbors {} detail".format(interface)
- output = self._send_command(command)
-
- # Check if router supports the command
- if '% Invalid input' in output:
- return {}
-
local_port = interface
- port_id = re.findall(r"Port id: (.+)", output)
- port_description = re.findall(r"Port Description: (.+)", output)
- chassis_id = re.findall(r"Chassis id: (.+)", output)
- system_name = re.findall(r"System Name: (.+)", output)
- system_description = re.findall(r"System Description: \n(.+)", output)
- system_capabilities = re.findall(r"System Capabilities: (.+)", output)
- enabled_capabilities = re.findall(r"Enabled Capabilities: (.+)", output)
- remote_address = re.findall(r"Management Addresses:\n IP: (.+)", output)
- if not remote_address:
- remote_address = re.findall(r"Management Addresses:\n Other: (.+)", output)
- number_entries = len(port_id)
- lldp_fields = [port_id, port_description, chassis_id, system_name, system_description,
- system_capabilities, enabled_capabilities, remote_address]
+ lldp_fields = self._lldp_detail_parser(interface)
+ # Convert any 'not advertised' to 'N/A'
+ for field in lldp_fields:
+ for i, value in enumerate(field):
+ if 'not advertised' in value:
+ field[i] = 'N/A'
+ number_entries = len(lldp_fields[0])
# re.findall will return a list. Make sure same number of entries always returned.
for test_list in lldp_fields:
@@ -562,7 +860,7 @@ class IOSDriver(NetworkDriver):
# default values.
vendor = u'Cisco'
uptime = -1
- serial_number, fqdn, os_version, hostname = (u'Unknown', u'Unknown', u'Unknown', u'Unknown')
+ serial_number, fqdn, os_version, hostname, domain_name = ('Unknown',) * 5
# obtain output from device
show_ver = self._send_command('show version')
@@ -654,95 +952,75 @@ class IOSDriver(NetworkDriver):
'mac_address': u'a493.4cc1.67a7',
'speed': 100}}
"""
- interface_list = {}
-
# default values.
last_flapped = -1.0
- # creating the parsing regex.
- mac_regex = r".*,\saddress\sis\s(?P<mac_address>\S+).*"
- speed_regex = r".*BW\s(?P<speed>\d+)\s(?P<speed_format>\S+).*"
-
- command = 'show ip interface brief'
+ command = 'show interfaces'
output = self._send_command(command)
- for line in output.splitlines():
- if 'Interface' in line and 'Status' in line:
- continue
- fields = line.split()
- """
- router#sh ip interface brief
- Interface IP-Address OK? Method Status Protocol
- FastEthernet8 10.65.43.169 YES DHCP up up
- GigabitEthernet0 unassigned YES NVRAM administratively down down
- Loopback234 unassigned YES unset up up
- Loopback555 unassigned YES unset up up
- NVI0 unassigned YES unset administratively down down
- Tunnel0 10.63.100.9 YES NVRAM up up
- Tunnel1 10.63.101.9 YES NVRAM up up
- Vlan1 unassigned YES unset up up
- Vlan100 10.40.0.1 YES NVRAM up up
- Vlan200 10.63.176.57 YES NVRAM up up
- Wlan-GigabitEthernet0 unassigned YES unset up up
- wlan-ap0 10.40.0.1 YES unset up up
- """
-
- # Check for administratively down
- if len(fields) == 6:
- interface, ip_address, ok, method, status, protocol = fields
- elif len(fields) == 7:
- # Administratively down is two fields in the output for status
- interface, ip_address, ok, method, status, status2, protocol = fields
- else:
- raise ValueError(u"Unexpected Response from the device")
- status = status.lower()
- protocol = protocol.lower()
- if 'admin' in status:
- is_enabled = False
- else:
- is_enabled = True
- is_up = bool('up' in protocol)
- interface_list[interface] = {
- 'is_up': is_up,
- 'is_enabled': is_enabled,
- 'last_flapped': last_flapped,
- }
+ interface = description = mac_address = speed = speedformat = ''
+ is_enabled = is_up = None
- for interface in interface_list:
- show_command = "show interface {0}".format(interface)
- interface_output = self._send_command(show_command)
- try:
- # description filter
- description = re.search(r" Description: (.+)", interface_output)
- interface_list[interface]['description'] = description.group(1).strip('\r')
- except AttributeError:
- interface_list[interface]['description'] = u'N/A'
+ interface_dict = {}
+ for line in output.splitlines():
- try:
- # mac_address filter.
- match_mac = re.match(mac_regex, interface_output, flags=re.DOTALL)
- group_mac = match_mac.groupdict()
- mac_address = group_mac["mac_address"]
- interface_list[interface]['mac_address'] = py23_compat.text_type(mac_address)
- except AttributeError:
- interface_list[interface]['mac_address'] = u'N/A'
- try:
- # BW filter.
- match_speed = re.match(speed_regex, interface_output, flags=re.DOTALL)
- group_speed = match_speed.groupdict()
- speed = group_speed["speed"]
- speed_format = group_speed["speed_format"]
- if speed_format == 'Mbit':
- interface_list[interface]['speed'] = int(speed)
- else:
- speed = int(speed) / 1000
- interface_list[interface]['speed'] = int(speed)
- except AttributeError:
- interface_list[interface]['speed'] = -1
- except ValueError:
- interface_list[interface]['speed'] = -1
+ interface_regex_1 = r"^(\S+?)\s+is\s+(.+?),\s+line\s+protocol\s+is\s+(\S+)"
+ interface_regex_2 = r"^(\S+)\s+is\s+(up|down)"
+ for pattern in (interface_regex_1, interface_regex_2):
+ interface_match = re.search(pattern, line)
+ if interface_match:
+ interface = interface_match.group(1)
+ status = interface_match.group(2)
+ try:
+ protocol = interface_match.group(3)
+ except IndexError:
+ protocol = ''
+ if 'admin' in status.lower():
+ is_enabled = False
+ else:
+ is_enabled = True
+ if protocol:
+ is_up = bool('up' in protocol)
+ else:
+ is_up = bool('up' in status)
+ break
- return interface_list
+ mac_addr_regex = r"^\s+Hardware.+address\s+is\s+({})".format(MAC_REGEX)
+ if re.search(mac_addr_regex, line):
+ mac_addr_match = re.search(mac_addr_regex, line)
+ mac_address = napalm_base.helpers.mac(mac_addr_match.groups()[0])
+
+ descr_regex = "^\s+Description:\s+(.+?)$"
+ if re.search(descr_regex, line):
+ descr_match = re.search(descr_regex, line)
+ description = descr_match.groups()[0]
+
+ speed_regex = r"^\s+MTU\s+\d+.+BW\s+(\d+)\s+([KMG]?b)"
+ if re.search(speed_regex, line):
+ speed_match = re.search(speed_regex, line)
+ speed = speed_match.groups()[0]
+ speedformat = speed_match.groups()[1]
+ speed = float(speed)
+ if speedformat.startswith('Kb'):
+ speed = speed / 1000.0
+ elif speedformat.startswith('Gb'):
+ speed = speed * 1000
+ speed = int(round(speed))
+
+ if interface == '':
+ raise ValueError("Interface attributes were \
+ found without any known interface")
+ if not isinstance(is_up, bool) or not isinstance(is_enabled, bool):
+ raise ValueError("Did not correctly find the interface status")
+
+ interface_dict[interface] = {'is_enabled': is_enabled, 'is_up': is_up,
+ 'description': description, 'mac_address': mac_address,
+ 'last_flapped': last_flapped, 'speed': speed}
+
+ interface = description = mac_address = speed = speedformat = ''
+ is_enabled = is_up = None
+
+ return interface_dict
def get_interfaces_ip(self):
"""
@@ -757,7 +1035,7 @@ class IOSDriver(NetworkDriver):
'ipv6': { u'1::1': { 'prefix_length': 64},
u'2001:DB8:1::1': { 'prefix_length': 64},
u'2::': { 'prefix_length': 64},
- u'FE80::3': { 'prefix_length': u'N/A'}}},
+ u'FE80::3': { 'prefix_length': 10}}},
u'Tunnel0': { 'ipv4': { u'10.63.100.9': { 'prefix_length': 24}}},
u'Tunnel1': { 'ipv4': { u'10.63.101.9': { 'prefix_length': 24}}},
u'Vlan100': { 'ipv4': { u'10.40.0.1': { 'prefix_length': 24},
@@ -767,84 +1045,50 @@ class IOSDriver(NetworkDriver):
"""
interfaces = {}
- command = 'show ip interface brief'
- output = self._send_command(command)
- for line in output.splitlines():
- if 'Interface' in line and 'Status' in line:
- continue
- fields = line.split()
- if len(fields) >= 3:
- interface = fields[0]
- else:
- raise ValueError("Unexpected response from the router")
- interfaces.update({interface: {}})
-
- # Parse IP Address and Subnet Mask from Interfaces
- for interface in interfaces:
- show_command = "show run interface {0}".format(interface)
- interface_output = self._send_command(show_command)
- for line in interface_output.splitlines():
- if 'ip address ' in line and 'no ip address' not in line:
- fields = line.split()
- if len(fields) == 3:
- # Check for 'ip address dhcp', convert to ip address and mask
- if fields[2] == 'dhcp':
- cmd = "show interface {} | in Internet address is".format(interface)
- show_int = self._send_command(cmd)
- int_fields = show_int.split()
- ip_address, subnet = int_fields[3].split(r'/')
- interfaces[interface]['ipv4'] = {ip_address: {}}
- try:
- val = {'prefix_length': int(subnet)}
- except ValueError:
- val = {'prefix_length': u'N/A'}
- interfaces[interface]['ipv4'][ip_address] = val
- elif len(fields) in [4, 5]:
- # Check for 'ip address 10.10.10.1 255.255.255.0'
- # Check for 'ip address 10.10.11.1 255.255.255.0 secondary'
- if 'ipv4' not in interfaces[interface].keys():
- interfaces[interface].update({'ipv4': {}})
- ip_address = fields[2]
-
- try:
- subnet = sum([bin(int(x)).count('1') for x in fields[3].split('.')])
- except ValueError:
- subnet = u'N/A'
-
- ip_dict = {'prefix_length': subnet}
- interfaces[interface]['ipv4'].update({ip_address: {}})
- interfaces[interface]['ipv4'][ip_address].update(ip_dict)
... 814 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/napalm-ios.git
More information about the Python-modules-commits
mailing list