[Python-modules-commits] [napalm-ios] 01/03: Import napalm-ios_0.5.1.orig.tar.gz
Vincent Bernat
bernat at moszumanska.debian.org
Sun Dec 25 10:52:11 UTC 2016
This is an automated email from the git hooks/post-receive script.
bernat pushed a commit to branch master
in repository napalm-ios.
commit b2866c4df615b25645484f9d64166a7f29ddf908
Author: Vincent Bernat <bernat at debian.org>
Date: Sun Dec 25 11:49:45 2016 +0100
Import napalm-ios_0.5.1.orig.tar.gz
---
PKG-INFO | 6 +-
napalm_ios.egg-info/PKG-INFO | 6 +-
napalm_ios.egg-info/SOURCES.txt | 6 +-
napalm_ios.egg-info/requires.txt | 2 +-
napalm_ios/ios.py | 579 +++++++++++++++++++++--------
napalm_ios/templates/delete_ntp_peers.j2 | 4 +-
napalm_ios/templates/delete_ntp_servers.j2 | 3 +
napalm_ios/templates/delete_snmp_config.j2 | 14 +
napalm_ios/templates/set_ntp_peers.j2 | 4 +-
napalm_ios/templates/set_ntp_servers.j2 | 3 +
napalm_ios/templates/snmp_config.j2 | 22 ++
requirements.txt | 2 +-
setup.cfg | 6 +
setup.py | 8 +-
14 files changed, 500 insertions(+), 165 deletions(-)
diff --git a/PKG-INFO b/PKG-INFO
index 1100a50..431ce6a 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
Metadata-Version: 1.1
Name: napalm-ios
-Version: 0.3.1
+Version: 0.5.1
Summary: Network Automation and Programmability Abstraction Layer with Multivendor support
Home-page: https://github.com/napalm-automation/napalm-ios
-Author: David Barroso
-Author-email: dbarrosop at dravetech.com
+Author: Kirk Byers
+Author-email: ktbyers at twb-tech.com
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
diff --git a/napalm_ios.egg-info/PKG-INFO b/napalm_ios.egg-info/PKG-INFO
index 1100a50..431ce6a 100644
--- a/napalm_ios.egg-info/PKG-INFO
+++ b/napalm_ios.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
Metadata-Version: 1.1
Name: napalm-ios
-Version: 0.3.1
+Version: 0.5.1
Summary: Network Automation and Programmability Abstraction Layer with Multivendor support
Home-page: https://github.com/napalm-automation/napalm-ios
-Author: David Barroso
-Author-email: dbarrosop at dravetech.com
+Author: Kirk Byers
+Author-email: ktbyers at twb-tech.com
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
diff --git a/napalm_ios.egg-info/SOURCES.txt b/napalm_ios.egg-info/SOURCES.txt
index e1fe75a..8e38a0c 100644
--- a/napalm_ios.egg-info/SOURCES.txt
+++ b/napalm_ios.egg-info/SOURCES.txt
@@ -10,5 +10,9 @@ napalm_ios.egg-info/dependency_links.txt
napalm_ios.egg-info/requires.txt
napalm_ios.egg-info/top_level.txt
napalm_ios/templates/delete_ntp_peers.j2
+napalm_ios/templates/delete_ntp_servers.j2
+napalm_ios/templates/delete_snmp_config.j2
napalm_ios/templates/set_hostname.j2
-napalm_ios/templates/set_ntp_peers.j2
\ No newline at end of file
+napalm_ios/templates/set_ntp_peers.j2
+napalm_ios/templates/set_ntp_servers.j2
+napalm_ios/templates/snmp_config.j2
\ No newline at end of file
diff --git a/napalm_ios.egg-info/requires.txt b/napalm_ios.egg-info/requires.txt
index 93aafe3..2b7a494 100644
--- a/napalm_ios.egg-info/requires.txt
+++ b/napalm_ios.egg-info/requires.txt
@@ -1,2 +1,2 @@
-napalm-base>=0.18.0
+napalm-base>=0.20.4
netmiko>=1.0.0
diff --git a/napalm_ios/ios.py b/napalm_ios/ios.py
old mode 100644
new mode 100755
index 620814f..e3048d6
--- a/napalm_ios/ios.py
+++ b/napalm_ios/ios.py
@@ -14,6 +14,7 @@
# the License.
from __future__ import print_function
+from __future__ import unicode_literals
import re
@@ -21,6 +22,8 @@ from netmiko import ConnectHandler, FileTransfer
from netmiko import __version__ as netmiko_version
from napalm_base.base import NetworkDriver
from napalm_base.exceptions import ReplaceConfigException, MergeConfigException
+from napalm_base.utils import py23_compat
+import napalm_base.constants as C
# Easier to store these as constants
HOUR_SECONDS = 3600
@@ -28,6 +31,18 @@ DAY_SECONDS = 24 * HOUR_SECONDS
WEEK_SECONDS = 7 * DAY_SECONDS
YEAR_SECONDS = 365 * DAY_SECONDS
+# STD REGEX PATTERNS
+IP_ADDR_REGEX = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}"
+MAC_REGEX = r"[a-fA-F0-9]{4}\.[a-fA-F0-9]{4}\.[a-fA-F0-9]{4}"
+VLAN_REGEX = r"\d{1,4}"
+RE_IPADDR = re.compile(r"{}".format(IP_ADDR_REGEX))
+RE_IPADDR_STRIP = re.compile(r"({})\n".format(IP_ADDR_REGEX))
+RE_MAC = re.compile(r"{}".format(MAC_REGEX))
+
+IOS_COMMANDS = {
+ 'show_mac_address': ['show mac-address-table', 'show mac address-table'],
+}
+
class IOSDriver(NetworkDriver):
"""NAPALM Cisco IOS Handler."""
@@ -94,6 +109,8 @@ class IOSDriver(NetworkDriver):
username=self.username,
password=self.password,
**self.netmiko_optional_args)
+ # ensure in enable mode
+ self.device.enable()
if not self.dest_file_system:
try:
self.dest_file_system = self.device._autodetect_fs()
@@ -105,6 +122,26 @@ class IOSDriver(NetworkDriver):
"""Close the connection to the device."""
self.device.disconnect()
+ def _send_command(self, command):
+ """Wrapper for self.device.send.command().
+
+ If command is a list will iterate through commands until valid command.
+ """
+ if isinstance(command, list):
+ for cmd in command:
+ output = self.device.send_command(cmd)
+ if "% Invalid" not in output:
+ break
+ else:
+ output = self.device.send_command(command)
+ return self._send_command_postprocess(output)
+
+ def is_alive(self):
+ """Returns a flag with the state of the SSH connection."""
+ return {
+ 'is_alive': self.device.remote_conn.transport.is_active()
+ }
+
def load_replace_candidate(self, filename=None, config=None):
"""
SCP file to device filesystem, defaults to candidate_config.
@@ -115,9 +152,9 @@ class IOSDriver(NetworkDriver):
if config:
raise NotImplementedError
if filename:
- (return_status, msg) = self.scp_file(source_file=filename,
- dest_file=self.candidate_cfg,
- file_system=self.dest_file_system)
+ (return_status, msg) = self._scp_file(source_file=filename,
+ dest_file=self.candidate_cfg,
+ file_system=self.dest_file_system)
if not return_status:
if msg == '':
msg = "SCP transfer to remote device failed"
@@ -133,16 +170,16 @@ class IOSDriver(NetworkDriver):
if config:
raise NotImplementedError
if filename:
- (return_status, msg) = self.scp_file(source_file=filename,
- dest_file=self.merge_cfg,
- file_system=self.dest_file_system)
+ (return_status, msg) = self._scp_file(source_file=filename,
+ dest_file=self.merge_cfg,
+ file_system=self.dest_file_system)
if not return_status:
if msg == '':
msg = "SCP transfer to remote device failed"
raise MergeConfigException(msg)
@staticmethod
- def normalize_compare_config(diff):
+ def _normalize_compare_config(diff):
"""Filter out strings that should not show up in the diff."""
ignore_strings = ['Contextual Config Diffs', 'No changes were found',
'file prompt quiet', 'ntp clock-period']
@@ -171,31 +208,28 @@ class IOSDriver(NetworkDriver):
new_diff.append('! No changes specified in merge file.')
return "\n".join(new_diff)
- def compare_config(self,
- base_file='running-config',
- new_file=None,
- base_file_system='system:',
- new_file_system=None):
+ def compare_config(self):
"""
show archive config differences <base_file> <new_file>.
Default operation is to compare system:running-config to self.candidate_cfg
"""
- # Set defaults if not passed as arguments
- if new_file is None:
- if self.config_replace:
- new_file = self.candidate_cfg
- else:
- new_file = self.merge_cfg
- if new_file_system is None:
- new_file_system = self.dest_file_system
- base_file_full = self.gen_full_path(filename=base_file, file_system=base_file_system)
- new_file_full = self.gen_full_path(filename=new_file, file_system=new_file_system)
+ # Set defaults
+ base_file = 'running-config'
+ base_file_system = 'system:'
+ if self.config_replace:
+ new_file = self.candidate_cfg
+ else:
+ new_file = self.merge_cfg
+ new_file_system = self.dest_file_system
+
+ base_file_full = self._gen_full_path(filename=base_file, file_system=base_file_system)
+ new_file_full = self._gen_full_path(filename=new_file, file_system=new_file_system)
if self.config_replace:
cmd = 'show archive config differences {} {}'.format(base_file_full, new_file_full)
diff = self.device.send_command_expect(cmd)
- diff = self.normalize_compare_config(diff)
+ diff = self._normalize_compare_config(diff)
else:
cmd = 'more {}'.format(new_file_full)
diff = self.device.send_command_expect(cmd)
@@ -217,7 +251,7 @@ class IOSDriver(NetworkDriver):
output = ''
return output
- def commit_config(self, filename=None):
+ def commit_config(self):
"""
If replacement operation, perform 'configure replace' for the entire config.
@@ -228,9 +262,8 @@ class IOSDriver(NetworkDriver):
if self.config_replace:
# Replace operation
- if filename is None:
- filename = self.candidate_cfg
- cfg_file = self.gen_full_path(filename)
+ filename = self.candidate_cfg
+ cfg_file = self._gen_full_path(filename)
if not self._check_file_exists(cfg_file):
raise ReplaceConfigException("Candidate config file does not exist")
if self.auto_rollback_on_error:
@@ -243,9 +276,8 @@ class IOSDriver(NetworkDriver):
raise ReplaceConfigException("Candidate config could not be applied")
else:
# Merge operation
- if filename is None:
- filename = self.merge_cfg
- cfg_file = self.gen_full_path(filename)
+ filename = self.merge_cfg
+ cfg_file = self._gen_full_path(filename)
if not self._check_file_exists(cfg_file):
raise MergeConfigException("Merge source config file does not exist")
cmd = 'copy {} running-config'.format(cfg_file)
@@ -262,24 +294,23 @@ class IOSDriver(NetworkDriver):
def discard_config(self):
"""Set candidate_cfg to current running-config. Erase the merge_cfg file."""
- discard_candidate = 'copy running-config {}'.format(self.gen_full_path(self.candidate_cfg))
- discard_merge = 'copy null: {}'.format(self.gen_full_path(self.merge_cfg))
+ discard_candidate = 'copy running-config {}'.format(self._gen_full_path(self.candidate_cfg))
+ discard_merge = 'copy null: {}'.format(self._gen_full_path(self.merge_cfg))
self._disable_confirm()
self.device.send_command_expect(discard_candidate)
self.device.send_command_expect(discard_merge)
self._enable_confirm()
- def rollback(self, filename=None):
+ def rollback(self):
"""Rollback configuration to filename or to self.rollback_cfg file."""
- if filename is None:
- filename = self.rollback_cfg
- cfg_file = self.gen_full_path(filename)
+ filename = self.rollback_cfg
+ cfg_file = self._gen_full_path(filename)
if not self._check_file_exists(cfg_file):
raise ReplaceConfigException("Rollback config file does not exist")
cmd = 'configure replace {} force'.format(cfg_file)
self.device.send_command_expect(cmd)
- def scp_file(self, source_file, dest_file, file_system):
+ def _scp_file(self, source_file, dest_file, file_system):
"""
SCP file to remote device.
@@ -328,7 +359,7 @@ class IOSDriver(NetworkDriver):
cmd = 'file prompt quiet'
self.device.send_config_set([cmd])
- def gen_full_path(self, filename, file_system=None):
+ def _gen_full_path(self, filename, file_system=None):
"""Generate full file path on remote device."""
if file_system is None:
return '{}/{}'.format(self.dest_file_system, filename)
@@ -339,7 +370,7 @@ class IOSDriver(NetworkDriver):
def _gen_rollback_cfg(self):
"""Save a configuration that can be used for rollback."""
- cfg_file = self.gen_full_path(self.rollback_cfg)
+ cfg_file = self._gen_full_path(self.rollback_cfg)
cmd = 'copy running-config {}'.format(cfg_file)
self._disable_confirm()
self.device.send_command_expect(cmd)
@@ -377,8 +408,7 @@ class IOSDriver(NetworkDriver):
if self.interface_map.get(interface_brief):
return self.interface_map.get(interface_brief)
command = 'show int {}'.format(interface_brief)
- output = self.device.send_command(command)
- output = output.strip()
+ output = self._send_command(command)
first_line = output.splitlines()[0]
if 'line protocol' in first_line:
full_int_name = first_line.split()[0]
@@ -387,11 +417,23 @@ class IOSDriver(NetworkDriver):
else:
return interface_brief
+ @staticmethod
+ def _send_command_postprocess(output):
+ """
+ Cleanup actions on send_command() for NAPALM getters.
+
+ Remove the "Load for five secs ..." line if in output
+ Remove "Time source is"
+ """
+ output = re.sub(r"^Load for five secs.*$", "", output, flags=re.M)
+ output = re.sub(r"^Time source is .*$", "", output, flags=re.M)
+ return output.strip()
+
def get_lldp_neighbors(self):
"""IOS implementation of get_lldp_neighbors."""
lldp = {}
command = 'show lldp neighbors'
- output = self.device.send_command(command)
+ output = self._send_command(command)
# Check if router supports the command
if '% Invalid input' in output:
@@ -417,28 +459,26 @@ class IOSDriver(NetworkDriver):
return lldp
- def get_lldp_neighbors_detail(self):
+ def get_lldp_neighbors_detail(self, interface=''):
"""
IOS implementation of get_lldp_neighbors_detail.
Calls get_lldp_neighbors.
"""
- def pad_list_entries(my_list, list_length):
- """Normalize the length of all the LLDP fields."""
- if len(my_list) < list_length:
- for i in range(list_length):
- try:
- my_list[i]
- except IndexError:
- my_list[i] = u"N/A"
- return my_list
-
lldp = {}
lldp_neighbors = self.get_lldp_neighbors()
+ # Filter to specific interface
+ if interface:
+ lldp_data = lldp_neighbors.get(interface)
+ if lldp_data:
+ lldp_neighbors = {interface: lldp_data}
+ else:
+ lldp_neighbors = {}
+
for interface in lldp_neighbors:
command = "show lldp neighbors {} detail".format(interface)
- output = self.device.send_command(command)
+ output = self._send_command(command)
# Check if router supports the command
if '% Invalid input' in output:
@@ -459,14 +499,11 @@ class IOSDriver(NetworkDriver):
lldp_fields = [port_id, port_description, chassis_id, system_name, system_description,
system_capabilities, enabled_capabilities, remote_address]
- # Check length of each list
+ # re.findall will return a list. Make sure same number of entries always returned.
for test_list in lldp_fields:
- if len(test_list) > number_entries:
+ if len(test_list) != number_entries:
raise ValueError("Failure processing show lldp neighbors detail")
- # Pad any missing entries with "N/A"
- lldp_fields = [pad_list_entries(field, number_entries) for field in lldp_fields]
-
# Standardize the fields
port_id, port_description, chassis_id, system_name, system_description, \
system_capabilities, enabled_capabilities, remote_address = lldp_fields
@@ -528,9 +565,9 @@ class IOSDriver(NetworkDriver):
serial_number, fqdn, os_version, hostname = (u'Unknown', u'Unknown', u'Unknown', u'Unknown')
# obtain output from device
- show_ver = self.device.send_command('show version')
- show_hosts = self.device.send_command('show hosts')
- show_ip_int_br = self.device.send_command('show ip interface brief')
+ show_ver = self._send_command('show version')
+ show_hosts = self._send_command('show hosts')
+ show_ip_int_br = self._send_command('show ip interface brief')
# uptime/serial_number/IOS version
for line in show_ver.splitlines():
@@ -582,10 +619,10 @@ class IOSDriver(NetworkDriver):
return {
'uptime': uptime,
'vendor': vendor,
- 'os_version': unicode(os_version),
- 'serial_number': unicode(serial_number),
- 'model': unicode(model),
- 'hostname': unicode(hostname),
+ 'os_version': py23_compat.text_type(os_version),
+ 'serial_number': py23_compat.text_type(serial_number),
+ 'model': py23_compat.text_type(model),
+ 'hostname': py23_compat.text_type(hostname),
'fqdn': fqdn,
'interface_list': interface_list
}
@@ -627,7 +664,7 @@ class IOSDriver(NetworkDriver):
speed_regex = r".*BW\s(?P<speed>\d+)\s(?P<speed_format>\S+).*"
command = 'show ip interface brief'
- output = self.device.send_command(command)
+ output = self._send_command(command)
for line in output.splitlines():
if 'Interface' in line and 'Status' in line:
continue
@@ -673,7 +710,7 @@ class IOSDriver(NetworkDriver):
for interface in interface_list:
show_command = "show interface {0}".format(interface)
- interface_output = self.device.send_command(show_command)
+ interface_output = self._send_command(show_command)
try:
# description filter
description = re.search(r" Description: (.+)", interface_output)
@@ -686,7 +723,7 @@ class IOSDriver(NetworkDriver):
match_mac = re.match(mac_regex, interface_output, flags=re.DOTALL)
group_mac = match_mac.groupdict()
mac_address = group_mac["mac_address"]
- interface_list[interface]['mac_address'] = unicode(mac_address)
+ interface_list[interface]['mac_address'] = py23_compat.text_type(mac_address)
except AttributeError:
interface_list[interface]['mac_address'] = u'N/A'
try:
@@ -731,7 +768,7 @@ class IOSDriver(NetworkDriver):
interfaces = {}
command = 'show ip interface brief'
- output = self.device.send_command(command)
+ output = self._send_command(command)
for line in output.splitlines():
if 'Interface' in line and 'Status' in line:
continue
@@ -745,7 +782,7 @@ class IOSDriver(NetworkDriver):
# Parse IP Address and Subnet Mask from Interfaces
for interface in interfaces:
show_command = "show run interface {0}".format(interface)
- interface_output = self.device.send_command(show_command)
+ interface_output = self._send_command(show_command)
for line in interface_output.splitlines():
if 'ip address ' in line and 'no ip address' not in line:
fields = line.split()
@@ -753,7 +790,7 @@ class IOSDriver(NetworkDriver):
# Check for 'ip address dhcp', convert to ip address and mask
if fields[2] == 'dhcp':
cmd = "show interface {} | in Internet address is".format(interface)
- show_int = self.device.send_command(cmd)
+ show_int = self._send_command(cmd)
int_fields = show_int.split()
ip_address, subnet = int_fields[3].split(r'/')
interfaces[interface]['ipv4'] = {ip_address: {}}
@@ -803,11 +840,11 @@ class IOSDriver(NetworkDriver):
raise ValueError(u"Unexpected Response from the device")
# remove keys with no data
- for key in interfaces.keys():
- if not bool(interfaces[key]):
- del interfaces[key]
-
- return interfaces
+ new_interfaces = {}
+ for k, val in interfaces.items():
+ if val:
+ new_interfaces[k] = val
+ return new_interfaces
@staticmethod
def bgp_time_conversion(bgp_uptime):
@@ -888,7 +925,9 @@ class IOSDriver(NetworkDriver):
bgp_neighbor_data = {}
bgp_neighbor_data['global'] = {}
- output = self.device.send_command(cmd_bgp_summary).strip()
+ output = self._send_command(cmd_bgp_summary).strip()
+ # Cisco issue where new lines are inserted after neighbor IP
+ output = re.sub(RE_IPADDR_STRIP, r"\1", output)
if 'Neighbor' not in output:
return {}
for line in output.splitlines():
@@ -899,11 +938,13 @@ class IOSDriver(NetworkDriver):
router_id = match.group(1)
local_as = int(match.group(2))
break
- bgp_neighbor_data['global']['router_id'] = unicode(router_id)
+ bgp_neighbor_data['global']['router_id'] = py23_compat.text_type(router_id)
bgp_neighbor_data['global']['peers'] = {}
cmd_neighbor_table = 'show ip bgp summary | begin Neighbor'
- output = self.device.send_command(cmd_neighbor_table).strip()
+ output = self._send_command(cmd_neighbor_table).strip()
+ # Cisco issue where new lines are inserted after neighbor IP
+ output = re.sub(RE_IPADDR_STRIP, r"\1", output)
for line in output.splitlines():
line = line.strip()
if 'Neighbor' in line or line == '':
@@ -911,6 +952,7 @@ class IOSDriver(NetworkDriver):
fields = line.split()[:10]
peer_id, bgp_version, remote_as, msg_rcvd, msg_sent, table_version, in_queue, \
out_queue, up_time, state_prefix = fields
+ peer_id = peer_id.replace('*', '')
if '(Admin)' in state_prefix:
is_enabled = False
@@ -931,7 +973,7 @@ class IOSDriver(NetworkDriver):
cmd_remote_rid = 'show ip bgp neighbors {} | inc router ID'.format(peer_id)
# output: BGP version 4, remote router ID 1.1.1.1
- remote_rid_out = self.device.send_command(cmd_remote_rid).strip()
+ remote_rid_out = self._send_command(cmd_remote_rid)
remote_rid = remote_rid_out.split()[-1]
bgp_neighbor_data['global']['peers'].setdefault(peer_id, {})
@@ -942,11 +984,11 @@ class IOSDriver(NetworkDriver):
peer_dict['local_as'] = local_as
peer_dict['is_enabled'] = is_enabled
peer_dict['is_up'] = is_up
- peer_dict['remote_id'] = unicode(remote_rid)
+ peer_dict['remote_id'] = py23_compat.text_type(remote_rid)
cmd_current_prefixes = 'show ip bgp neighbors {} | inc Prefixes Current'.format(peer_id)
# output: Prefixes Current: 0 0
- current_prefixes_out = self.device.send_command(cmd_current_prefixes).strip()
+ current_prefixes_out = self._send_command(cmd_current_prefixes)
pattern = r'Prefixes Current:\s+(\d+)\s+(\d+).*' # Prefixes Current: 0 0
match = re.search(pattern, current_prefixes_out)
if match:
@@ -960,7 +1002,7 @@ class IOSDriver(NetworkDriver):
# Local Policy Denied Prefixes: -------- -------
# prefix-list 0 2
# Total: 0 2
- filtered_prefixes_out = self.device.send_command(cmd_filtered_prefix).strip()
+ filtered_prefixes_out = self._send_command(cmd_filtered_prefix)
sent_prefixes = int(sent_prefixes)
pattern = r'Total:\s+\d+\s+(\d+).*' # Total: 0 2
match = re.search(pattern, filtered_prefixes_out)
@@ -1004,8 +1046,7 @@ class IOSDriver(NetworkDriver):
"""
counters = {}
command = 'show interfaces'
- output = self.device.send_command(command)
- output = output.strip()
+ output = self._send_command(command)
# Break output into per-interface sections
interface_strings = re.split(r'.* line protocol is .*', output, flags=re.M)
@@ -1080,8 +1121,7 @@ class IOSDriver(NetworkDriver):
cpu_cmd = 'show proc cpu'
mem_cmd = 'show memory statistics'
- output = self.device.send_command(cpu_cmd)
- output = output.strip()
+ output = self._send_command(cpu_cmd)
environment.setdefault('cpu', {})
environment['cpu'][0] = {}
environment['cpu'][0]['%usage'] = 0.0
@@ -1093,8 +1133,7 @@ class IOSDriver(NetworkDriver):
environment['cpu'][0]['%usage'] = float(match.group(1))
break
- output = self.device.send_command(mem_cmd)
- output = output.strip()
+ output = self._send_command(mem_cmd)
for line in output.splitlines():
if 'Processor' in line:
_, _, _, proc_used_mem, proc_free_mem = line.split()[:5]
@@ -1145,36 +1184,47 @@ class IOSDriver(NetworkDriver):
arp_table = []
command = 'show arp | exclude Incomplete'
- output = self.device.send_command(command)
- output = output.split('\n')
+ output = self._send_command(command)
# Skip the first line which is a header
- output = output[1:-1]
+ output = output.split('\n')
+ output = output[1:]
for line in output:
if len(line) == 0:
return {}
- if len(line.split()) == 6:
+ if len(line.split()) == 5:
+ # Static ARP entries have no interface
+ # Internet 10.0.0.1 - 0010.2345.1cda ARPA
+ interface = ''
+ protocol, address, age, mac, eth_type = line.split()
+ elif len(line.split()) == 6:
protocol, address, age, mac, eth_type, interface = line.split()
- try:
- if age == '-':
- age = 0
- age = float(age)
- except ValueError:
- print("Unable to convert age value to float: {}".format(age))
- entry = {
- 'interface': interface,
- 'mac': mac,
- 'ip': address,
- 'age': age
- }
- arp_table.append(entry)
else:
raise ValueError("Unexpected output from: {}".format(line.split()))
+ try:
+ if age == '-':
+ age = 0
+ age = float(age)
+ except ValueError:
+ raise ValueError("Unable to convert age value to float: {}".format(age))
+
+ # Validate we matched correctly
+ if not re.search(RE_IPADDR, address):
+ raise ValueError("Invalid IP Address detected: {}".format(address))
+ if not re.search(RE_MAC, mac):
+ raise ValueError("Invalid MAC Address detected: {}".format(mac))
+ entry = {
+ 'interface': interface,
+ 'mac': mac,
+ 'ip': address,
+ 'age': age
+ }
+ arp_table.append(entry)
return arp_table
- def cli(self, commands=None):
+ def cli(self, commands):
"""
Execute a list of commands and return the output in a dictionary format using the command
as the key.
@@ -1188,12 +1238,11 @@ class IOSDriver(NetworkDriver):
"""
cli_output = dict()
-
if type(commands) is not list:
raise TypeError('Please enter a valid list of commands!')
for command in commands:
- output = self.device.send_command(command)
+ output = self._send_command(command)
if 'Invalid input detected' in output:
raise ValueError('Unable to execute command "{}"'.format(command))
cli_output.setdefault(command, {})
@@ -1201,12 +1250,39 @@ class IOSDriver(NetworkDriver):
return cli_output
+ def get_ntp_servers(self):
+ """Implementation of get_ntp_servers for IOS.
+
+ Returns the NTP servers configuration as dictionary.
+ The keys of the dictionary represent the IP Addresses of the servers.
+ Inner dictionaries do not have yet any available keys.
+ Example::
+ {
+ '192.168.0.1': {},
+ '17.72.148.53': {},
+ '37.187.56.220': {},
+ '162.158.20.18': {}
+ }
+ """
+ ntp_servers = {}
+ command = 'show run | include ntp server'
+ output = self._send_command(command)
+
+ for line in output.splitlines():
+ split_line = line.split()
+ if "vrf" == split_line[2]:
+ ntp_servers[split_line[4]] = {}
+ else:
+ ntp_servers[split_line[2]] = {}
+
+ return ntp_servers
+
def get_ntp_stats(self):
"""Implementation of get_ntp_stats for IOS."""
ntp_stats = []
command = 'show ntp associations'
- output = self.device.send_command(command)
+ output = self._send_command(command)
for line in output.splitlines():
# Skip first two lines and last line of command output
@@ -1221,12 +1297,12 @@ class IOSDriver(NetworkDriver):
address_regex = re.match('(\W*)([0-9.*]*)', address)
try:
ntp_stats.append({
- 'remote': unicode(address_regex.group(2)),
+ 'remote': py23_compat.text_type(address_regex.group(2)),
'synchronized': ('*' in address_regex.group(1)),
- 'referenceid': unicode(ref_clock),
+ 'referenceid': py23_compat.text_type(ref_clock),
'stratum': int(st),
'type': u'-',
- 'when': unicode(when),
+ 'when': py23_compat.text_type(when),
'hostpoll': int(poll),
'reachability': int(reach),
'delay': float(delay),
@@ -1249,39 +1325,108 @@ class IOSDriver(NetworkDriver):
* static (boolean)
* moves (int)
* last_move (float)
+
+ Format1:
+ Destination Address Address Type VLAN Destination Port
+ ------------------- ------------ ---- --------------------
+ 6400.f1cf.2cc6 Dynamic 1 Wlan-GigabitEthernet0
+
+ Cat 6500:
+ Legend: * - primary entry
+ age - seconds since last seen
+ n/a - not available
+
+ vlan mac address type learn age ports
+ ------+----------------+--------+-----+----------+--------------------------
+ * 999 1111.2222.3333 dynamic Yes 0 Port-channel1
+ 999 1111.2222.3333 dynamic Yes 0 Port-channel1
+
+ Cat 4948
+ Unicast Entries
+ vlan mac address type protocols port
+ -------+---------------+--------+---------------------+--------------------
+ 999 1111.2222.3333 dynamic ip Port-channel1
+
+ Cat 2960
+ Mac Address Table
+ -------------------------------------------
+
+ Vlan Mac Address Type Ports
+ ---- ----------- -------- -----
+ All 1111.2222.3333 STATIC CPU
"""
+
+ RE_MACTABLE_DEFAULT = r"^" + MAC_REGEX
+ RE_MACTABLE_6500_1 = r"^\*\s+{}\s+{}\s+".format(VLAN_REGEX, MAC_REGEX) # 7 fields
+ RE_MACTABLE_6500_2 = r"^{}\s+{}\s+".format(VLAN_REGEX, MAC_REGEX) # 6 fields
+ RE_MACTABLE_4500 = r"^{}\s+{}\s+".format(VLAN_REGEX, MAC_REGEX) # 5 fields
+ RE_MACTABLE_2960_1 = r"^All\s+{}".format(MAC_REGEX)
+ RE_MACTABLE_2960_2 = r"^{}\s+{}\s+".format(VLAN_REGEX, MAC_REGEX) # 4 fields
+
+ def process_mac_fields(vlan, mac, mac_type, interface):
+ """Return proper data for mac address fields."""
+ if mac_type.lower() in ['self', 'static']:
+ static = True
+ if vlan.lower() == 'all':
+ vlan = 0
+ if interface.lower() == 'cpu':
+ interface = ''
+ else:
+ static = False
+ if mac_type.lower() in ['dynamic']:
+ active = True
+ else:
+ active = False
+ return {
+ 'mac': mac,
+ 'interface': interface,
+ 'vlan': int(vlan),
+ 'static': static,
+ 'active': active,
+ 'moves': -1,
+ 'last_move': -1.0
+ }
+
mac_address_table = []
- command = 'show mac-address-table'
- output = self.device.send_command(command)
- output = output.strip().split('\n')
+ command = IOS_COMMANDS['show_mac_address']
+ output = self._send_command(command)
- # Skip the first two lines which are headers
- output = output[2:]
- for line in output:
- if len(line) == 0:
- return mac_address_table
- elif len(line.split()) == 4:
- mac, mac_type, vlan, interface = line.split()
- if mac_type.lower() in ['self', 'static']:
- static = True
- else:
- static = False
- if mac_type.lower() in ['dynamic']:
- active = True
+ # Skip the header lines
+ output = re.split(r'^----.*', output, flags=re.M)[1:]
+ output = "\n".join(output).strip()
+ for line in output.splitlines():
+ line = line.strip()
+ if line == '':
+ continue
+ # Format1
+ elif re.search(RE_MACTABLE_DEFAULT, line):
+ if len(line.split()) == 4:
+ mac, mac_type, vlan, interface = line.split()
+ mac_address_table.append(process_mac_fields(vlan, mac, mac_type, interface))
else:
- active = False
- entry = {
- 'mac': mac,
- 'interface': interface,
- 'vlan': int(vlan),
- 'static': static,
- 'active': active,
- 'moves': -1,
- 'last_move': -1.0
- }
- mac_address_table.append(entry)
+ raise ValueError("Unexpected output from: {}".format(line.split()))
+ # Cat6500 format
+ elif (re.search(RE_MACTABLE_6500_1, line) or re.search(RE_MACTABLE_6500_2, line)) and \
+ len(line.split()) >= 6:
+ if len(line.split()) == 7:
+ _, vlan, mac, mac_type, _, _, interface = line.split()
+ elif len(line.split()) == 6:
+ vlan, mac, mac_type, _, _, interface = line.split()
+ mac_address_table.append(process_mac_fields(vlan, mac, mac_type, interface))
+ # Cat4948 format
+ elif re.search(RE_MACTABLE_4500, line) and len(line.split()) == 5:
+ vlan, mac, mac_type, _, interface = line.split()
+ mac_address_table.append(process_mac_fields(vlan, mac, mac_type, interface))
+ # Cat2960 format - ignore extra header line
+ elif re.search(r"^Vlan\s+Mac Address\s+", line):
+ continue
+ # Cat2960 format
+ elif (re.search(RE_MACTABLE_2960_1, line) or re.search(RE_MACTABLE_2960_2, line)) and \
+ len(line.split()) == 4:
+ vlan, mac, mac_type, interface = line.split()
+ mac_address_table.append(process_mac_fields(vlan, mac, mac_type, interface))
else:
- raise ValueError("Unexpected output from: {}".format(line.split()))
+ raise ValueError("Unexpected output from: {}".format(repr(line)))
return mac_address_table
def get_snmp_information(self):
@@ -1308,7 +1453,7 @@ class IOSDriver(NetworkDriver):
'location': u'unknown'
}
command = 'show run | include snmp-server'
- output = self.device.send_command(command)
+ output = self._send_command(command)
for line in output.splitlines():
fields = line.split()
if 'snmp-server community' in line:
@@ -1333,7 +1478,7 @@ class IOSDriver(NetworkDriver):
# If SNMP Chassis wasn't found; obtain using direct command
if snmp_dict['chassis_id'] == 'unknown':
command = 'show snmp chassis'
- snmp_chassis = self.device.send_command(command)
+ snmp_chassis = self._send_command(command)
snmp_dict['chassis_id'] = snmp_chassis
return snmp_dict
@@ -1364,8 +1509,7 @@ class IOSDriver(NetworkDriver):
if source != '':
command += ' source {}'.format(source)
- output = self.device.send_command(command)
-
+ output = self._send_command(command)
if '%' in output:
ping_dict['error'] = output
elif 'Sending' in output:
@@ -1402,8 +1546,147 @@ class IOSDriver(NetworkDriver):
})
results_array = []
for i in range(probes_received):
- results_array.append({'ip_address': unicode(destination), 'rtt': 0.0})
-
+ results_array.append({'ip_address': py23_compat.text_type(destination),
+ 'rtt': 0.0})
ping_dict['success'].update({'results': results_array})
return ping_dict
+
+ def traceroute(self, destination, source=C.TRACEROUTE_SOURCE,
+ ttl=C.TRACEROUTE_TTL, timeout=C.TRACEROUTE_TIMEOUT):
+ """
+ Executes traceroute on the device and returns a dictionary with the result.
+
+ :param destination: Host or IP Address of the destination
+ :param source (optional): Use a specific IP Address to execute the traceroute
+ :param ttl (optional): Maximum number of hops -> int (0-255)
+ :param timeout (optional): Number of seconds to wait for response -> int (1-3600)
+
+ Output dictionary has one of the following keys:
+
+ * success
+ * error
+
+ In case of success, the keys of the dictionary represent the hop ID, while values are
+ dictionaries containing the probes results:
+ * rtt (float)
+ * ip_address (str)
+ * host_name (str)
+ """
+
+ command = "traceroute {}".format(destination)
+ if source:
+ command += " source {}".format(source)
+ if ttl:
+ if isinstance(ttl, int) and 0 <= timeout <= 255:
+ command += " ttl 0 {}".format(str(ttl))
+ if timeout:
+ # Timeout should be an integer between 1 and 3600
+ if isinstance(timeout, int) and 1 <= timeout <= 3600:
+ command += " timeout {}".format(str(timeout))
+
+ # Calculation to leave enough time for traceroute to complete assumes send_command
+ # delay of .2 seconds.
+ max_loops = (5 * ttl * timeout) + 150
+ if max_loops < 500: # Make sure max_loops isn't set artificially low
+ max_loops = 500
+ output = self.device.send_command(command, max_loops=max_loops)
+
+ # Prepare return dict
+ traceroute_dict = dict()
+ if re.search('Unrecognized host or address', output):
+ traceroute_dict['error'] = 'unknown host %s' % destination
+ return traceroute_dict
+ else:
+ traceroute_dict['success'] = dict()
+
+ results = dict()
+ # Find all hops
+ hops = re.findall('\\n\s+[0-9]{1,3}\s', output)
+ for hop in hops:
+ # Search for hop in the output
+ hop_match = re.search(hop, output)
+ # Find the start index for hop
+ start_index = hop_match.start()
+ # If this is last hop
+ if hops.index(hop) + 1 == len(hops):
+ # Set the stop index for hop to len of output
+ stop_index = len(output)
+ # else, find the start index for next hop
+ else:
+ next_hop_match = re.search(hops[hops.index(hop) + 1], output)
+ stop_index = next_hop_match.start()
+ # Now you have the start and stop index for each hop
+ # and you can parse the probes
+ # Set the hop_variable, and remove spaces between msec for easier matching
+ hop_string = output[start_index:stop_index].replace(' msec', 'msec')
+ hop_list = hop_string.split()
+ current_hop = int(hop_list.pop(0))
+ # Prepare dictionary for each hop (assuming there are 3 probes in each hop)
+ results[current_hop] = dict()
+ results[current_hop]['probes'] = dict()
+ results[current_hop]['probes'][1] = {'rtt': float(),
+ 'ip_address': '',
+ 'host_name': ''}
+ results[current_hop]['probes'][2] = {'rtt': float(),
... 202 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/napalm-ios.git
More information about the Python-modules-commits
mailing list