# -*- coding: utf-8 -*-
"""
nat_mch_telnet.py
~~~~~~~~~~~~~~~~~
This is a lazy implementation of the GenDev interface. The ultimate purpose
of this module is to be encapsulated within another module that really
implements that interface.
Unfortunately, NAT MCHs don't allow a full implementation of the GenDev
interface on top of a single communication interface.
The main operation that is only supported by the command line interface is the
firmware update. Therefore this module, which uses Telnet as the access method
to the command line interface of the MCH, shall only be used to perform a
firmware update and nothing else. For other operations, better use the module
based on the communication via the MCH web interface.
"""
from __future__ import annotations

import errno
import ipaddress
import re
import socket
import time
from collections import OrderedDict
from telnetlib import Telnet

from ..gendev_err import (
    ConnTimeout,
    ConnectionRefused,
    FeatureNotSupported,
    NoRouteToDevice,
)
from ..gendev_logger import GenDevLogger
# Module metadata (authorship, licensing and release status).
__author__ = ["Felipe Torres González", "Ross Elliot"]
__copyright__ = "Copyright 2021, ESS MTCA Tools"
__credits__ = ["Jeong Han Lee"]
__license__ = "GPL-3.0"
__version__ = "1.1"
__maintainer__ = "Ross ELliot"
__email__ = "ross.elliot@ess.eu"
__status__ = "Development"
class NATMCHTelnet:
    """Access a NAT MCH through its Telnet command line interface.

    This class implements some operations using the command line interface
    via Telnet. Thus, the MCH has to be accessible in the network.
    The firmware update relies on **the mchconfig-server** to serve the
    firmware image.

    Supported operations:

    - Retrieve the general information of the MCH.
    - Firmware update of the MCH.
    - Enable DHCP mode of the MCH.
    - Upload a backplane configuration file.
    - Power cycle AMC slots.
    """

    # Prompt printed by the MCH command line interface.
    PROMPT = b"nat> "
    # Seconds to wait for the MCH to complete a reboot.
    BOOT_TIME = 60

    # Number of backspaces sent to wipe the hostname field before rewriting it.
    _HOSTNAME_FIELD_WIDTH = 75
    # Maximum length of a dotted IPv4 address ("255.255.255.255").
    _IP_FIELD_WIDTH = 15
    # Seconds to wait while the MCH erases its memory during a firmware update.
    _FW_ERASE_TIME = 30
    # Extra seconds to wait after the reboot that follows a firmware update.
    _FW_POST_REBOOT_WAIT = 50
    # Seconds to wait between consecutive slot operations.
    _SLOT_SETTLE_TIME = 1
    # Seconds to wait for an AMC whose slot has an upstream power-up delay.
    _UPSTREAM_POWER_UP_WAIT = 65

    # Regular expressions extracting fields from the "version" command output.
    _match_fw_ver = re.compile(r"Firmware (V\d{1,2}\.\d{1,2}\.\d{1,2})")
    # Matches the first occurrence of the token FPGA.
    _match_fpga_ver = re.compile(r"FPGA (V\d{1,2}\.\d{1,2})")
    _match_mcu_ver = re.compile(r"AVR (\d{1,2}\.\d{1,2})")
    _match_board_sn = re.compile(r"sn: (\d{6}-\d{4})")
    # Regular expressions extracting fields from the "ni" command output.
    _match_ip_addr = re.compile(r"ip address +: +((\d{1,3}\.?){4})")
    _match_mac_addr = re.compile(r"ieee address +: +(([\d\D]{2}:?){6})")
    _match_subnet_mask = re.compile(r"network mask +: +((\d{1,3}\.?){4})")
    _match_gateway_addr = re.compile(r"default gateway +: +((\d{1,3}\.?){4})")
    _match_hostname = re.compile(r"hostname +: (.+?(?=\r))")
    _match_dhcp_state = re.compile(r"dhcp state +: (.+?(?=\r))")

    def __init__(
        self,
        ip_address: str,
        backplane,
        port: int = 23,
        logger: "GenDevLogger" = None,
        hostname: str = None,
        open: bool = True,
        **kwargs,
    ):
        """Class constructor.

        Args:
            ip_address(str): the IP address of the MCH.
            backplane(BackplaneType): type of backplane in which the MCH is
                installed.
            port(int): port of the Telnet service (usually, 23).
            logger(Logger): reference to a logger that is being used. A
                GenDevLogger is used as-is, anything else is wrapped in a
                new GenDevLogger.
            hostname(str): hostname of the MCH.
            open(bool): whether to establish the connection at instantiation
                time or later. Set this argument to False when a deferred
                link establishment is aimed.
            **telnet_ip_address(str): the IP address of the Telnet connection,
                if not equal to the IP address of the MCH, i.e. when
                connecting via a MOXA (optional).
            **tftp_server_address(str): IP address of the machine hosting the
                TFTP server with the fw images.
                Default value = **172.30.4.69**.
            **tftp_server_fw_path(str): path to the fw binaries in the TFTP
                server. Default value = "fw/".
            **log_to_file(bool): flag to indicate if the log should be written
                to a file. Default value = False.

        Raises:
            ConnTimeout: if the connection to the device times out.
            NoRouteToDevice: if the device is not reachable.
            ConnectionRefused: if the Telnet connection is refused.
        """
        self.ip_address = ip_address
        self.backplane = backplane
        self.hostname = hostname
        self.port = port

        # Optional arguments. When no Telnet address is given, the MCH
        # address itself is used for the connection.
        self._telnet_ip_address = kwargs.get("telnet_ip_address", self.ip_address)
        self._server_ip = kwargs.get("tftp_server_address", "172.30.4.69")
        self._fw_path = kwargs.get("tftp_server_fw_path", "fw/")
        self._log_to_file = kwargs.get("log_to_file", False)

        # Initialise the logger instance
        if isinstance(logger, GenDevLogger):
            self.logger = logger
        else:
            self.logger = GenDevLogger(logger=logger, log_to_file=self._log_to_file)
        self.logger.logger.info(
            "NATMCHTelnet - NAT MCH Telnet instance created."
            "\tMCH IP Address: {}".format(self.ip_address)
        )

        if open:
            self.open()
            self._session_open = True
        else:
            # Unconnected placeholder; open() replaces it later.
            self._session = Telnet()
            self._session_open = False

    def open(self):
        """Establish the connection to the target MCH.

        Raises:
            ConnTimeout: if the connection attempt times out.
            NoRouteToDevice: if the host or network is unreachable.
            ConnectionRefused: if the Telnet connection is refused.
        """
        self.logger.logger.debug(
            "Opening Telnet connection to {}:{}.".format(
                self._telnet_ip_address, self.port
            )
        )
        try:
            self._session = Telnet(self._telnet_ip_address, self.port, timeout=10)
        except socket.timeout as err:
            # Must be handled before OSError: socket.timeout is a subclass.
            msg = "Timeout while opening the link to the MCH using Telnet."
            self.logger.logger.error(msg)
            raise ConnTimeout(msg) from err
        except OSError as err:
            if err.errno in (errno.EHOSTUNREACH, errno.ENETUNREACH):
                msg = "Check the connectivity to the MCH using the IP: {}.".format(
                    self._telnet_ip_address
                )
                self.logger.logger.error(msg)
                raise NoRouteToDevice(msg) from err
            if err.errno == errno.ECONNREFUSED:
                msg = (
                    "Cannot connect to Telnet port {}:{} "
                    "Please close any existing "
                    "connections.".format(self._telnet_ip_address, self.port)
                )
                self.logger.logger.error(msg)
                raise ConnectionRefused(msg) from err
            self.logger.logger.error("Unhandled exception: {}".format(err))
            raise
        except Exception as err:
            # Not every exception has a strerror attribute, log the exception
            # itself and let the caller deal with it.
            self.logger.logger.error("Unhandled exception: {}".format(err))
            raise
        self._session_open = True

    def close(self):
        """Close the Telnet session.

        This is needed when multiple methods of this module are called in a
        row.
        """
        self.logger.logger.debug(
            "Closing Telnet connection to {}:{}.".format(
                self._telnet_ip_address, self.port
            )
        )
        self._session.close()
        self._session_open = False

    def _ensure_open(self):
        """Open the Telnet session unless it is already flagged as open."""
        if self._session_open is False:
            self.open()

    @staticmethod
    def _first_group(pattern, text):
        """Return group 1 of the first match of *pattern* in *text*.

        Returns:
            The captured string, or None when the pattern does not match.
        """
        found = pattern.search(text)
        return found.group(1) if found else None

    def _send_backspace(self, sleep: float = 0.25):
        """Internal method to write a backspace character to the MCH console.

        Some fields in the MCH interface are already populated, and must be
        cleared before writing the new value. The only way to do this is to
        issue a backspace character.

        Args:
            sleep: amount of seconds to wait after sending the character.
        """
        # Backspace is ASCII character 0x08, i.e. the escape sequence '\b'
        self._session.write(b"\b")
        time.sleep(sleep)

    def _send_command(self, command: str, sleep: int = 1, clear_buffer: bool = True):
        """Internal method for sending a low level command to the MCH.

        This command allows forgetting about the particular details of using
        a Telnet session behind the scenes. A regular command from the MCH
        command line interface can be sent through this interface without
        worrying about the underlying communication.

        Args:
            command: command to be sent to the MCH.
            sleep: amount of seconds to wait after sending a command.
            clear_buffer: send a carriage return before the command. This
                helps clearing previous garbage from the buffer, but it
                should be used with caution because there are commands
                that don't expect a carriage return after.
        """
        if clear_buffer:
            # NOTE: read_until() is called without a timeout, so this blocks
            # until the prompt shows up.
            self._session.write(b"\r")
            self._session.read_until(self.PROMPT)
            time.sleep(sleep)
        self._session.write(command.encode("ascii") + b"\r")
        time.sleep(sleep)

    def _reboot(self, sleep: int = 70):
        """Internal command to send a reboot to the MCH.

        The Telnet session is closed right after issuing the command.

        Args:
            sleep: indicates how many seconds to wait before returning from
                the method. Write a 0 to avoid it.
        """
        self.logger.logger.info("Rebooting MCH device.")
        self._ensure_open()
        self._send_command("reboot")
        self.close()
        time.sleep(sleep)

    def _read_command(self) -> str:
        """Internal command to read the Telnet Rx buffer.

        This method attempts to read the content from the buffer without I/O
        blocking.

        Returns:
            A string containing the content of the Rx buffer. Bytes that are
            not valid ASCII are replaced instead of raising.
        """
        response = self._session.read_very_eager()
        return response.decode("ascii", errors="replace")

    def _set_hostname(self):
        """Sets the hostname of the MCH device.

        Returns:
            True on success.
            False on failure (no hostname was provided).
        """
        if not self.hostname:
            # Checked before touching the device: encoding None would fail
            # halfway through the configuration menu.
            self.logger.logger.error(
                "No hostname provided, the MCH hostname is left unchanged."
            )
            return False
        self.logger.logger.info("Setting hostname to {}".format(self.hostname))
        self._ensure_open()
        try:
            self._send_command("mchcfg")
            # Menu entry used to modify the configuration
            self._send_command("11", clear_buffer=False)
            # Clear the previous entry
            for _ in range(self._HOSTNAME_FIELD_WIDTH):
                self._send_backspace()
            self._send_command(self.hostname, clear_buffer=False)
            self._send_command("q", clear_buffer=False)
        finally:
            self.close()
        # TODO: re-enable the verification through _check_hostname()
        return True

    def _check_hostname(self) -> bool:
        """Checks that the hostname set on the MCH matches the value provided.

        Returns:
            True if the hostname setting matches the expected string.
            False if it does not match, or if the hostname could not be found
            in the output of the MCH.
        """
        self._ensure_open()
        try:
            self._send_command("ni")
            network_info = self._read_command()
        finally:
            self.close()
        mch_hostname = self._first_group(self._match_hostname, network_info)
        return mch_hostname is not None and mch_hostname == self.hostname

    def _enable_dhcp(self):
        """Enables DHCP mode on the MCH.

        Returns:
            True if DHCP was enabled successfully.
            False if DHCP setting failed.
        """
        self._ensure_open()
        # Menu walk-through of "mchcfg": the entries and the number of
        # carriage returns follow the prompts of the MCH configuration menu.
        self._send_command("mchcfg")
        self._send_command("3", clear_buffer=False)
        for _ in range(5):
            self._send_command("", clear_buffer=False)
        self._send_command("2", clear_buffer=False)
        for _ in range(7):
            self._send_command("", clear_buffer=False)
        self._send_command("q", clear_buffer=False)
        # The MCH needs a reboot for the new setting to be applied.
        self._reboot()
        # Wait for the MCH to complete the reboot process
        time.sleep(self.BOOT_TIME)
        # Finally, verify the change
        return self._check_dhcp()

    def _check_dhcp(self):
        """Check whether DHCP is enabled.

        Returns:
            True if DHCP is enabled.
            False if DHCP is disabled, or if the state could not be found in
            the output of the MCH.
        """
        self._ensure_open()
        try:
            self._send_command("ni")
            network_info = self._read_command()
        finally:
            self.close()
        dhcp_state = self._first_group(self._match_dhcp_state, network_info)
        # The captured value may carry padding around the word.
        return dhcp_state is not None and dhcp_state.strip() == "enabled"

    def _clear_ip_address(self):
        """Helper method to send enough backspace characters to clear an IP
        address.
        """
        for _ in range(self._IP_FIELD_WIDTH):
            self._send_backspace()

    def _set_ip_addr(self):
        """Set the IP address of the MCH device.

        The netmask and the gateway currently reported by the MCH are kept;
        the broadcast address is derived from the new IP address and the
        netmask.

        Returns:
            True on success.
            False if the current network settings could not be read, or if
            the MCH did not reach the confirmation prompt.
        """
        self._ensure_open()
        try:
            # Get network information
            self._send_command("ni")
            network_info = self._read_command()
            gateway_addr = self._first_group(self._match_gateway_addr, network_info)
            netmask = self._first_group(self._match_subnet_mask, network_info)
            if gateway_addr is None or netmask is None:
                self.logger.logger.error(
                    "Unable to read the gateway address and the netmask from the MCH."
                )
                return False
            # Calculate the broadcast address
            try:
                ipv4net = ipaddress.IPv4Network(
                    "{}/{}".format(self.ip_address, netmask), strict=False
                )
            except ValueError as err:
                self.logger.logger.error(
                    "Invalid network settings: {}".format(err)
                )
                return False
            broadcast_addr = ipv4net.broadcast_address.compressed
            # Trigger the IP setting menu
            self._send_command("ip")
            # Each field is pre-populated: clear it, then write the new value
            for value in (self.ip_address, netmask, broadcast_addr, gateway_addr):
                self._clear_ip_address()
                self._send_command(value, clear_buffer=False)
            # Check we are at the confirmation prompt
            response = self._read_command()
            if not response.endswith("Are you really sure ?"):
                return False
            # Issue confirmation 'y'
            self._send_command("y", clear_buffer=False)
            return True
        finally:
            self.close()

    def device_info(self) -> tuple[bool, OrderedDict]:
        """Retrieve the main information about the device.

        The information is returned in a dictionary with 2 categories:
        *Board* and *Network*.

        An example of the returned dictionary::

            {
                'Board': {
                    'fw_ver': 'V2.21.8',
                    'fpga_ver': 'V1.14',
                    'mcu_ver': '1.2',
                    'serial_num': '113522-1426'},
                'Network': {'ip_address': '172.30.5.238',
                    'mac_address': '00:40:42:22:05:92',
                    'subnet_address': '255.255.252.0',
                    'gateway_address': '172.30.7.254'}
            }

        Returns:
            A tuple containing a success flag in the first position. In the
            second position:

            - On success, a dictionary with the device information.
            - On failure (empty or incomplete answer from the MCH), an empty
              dictionary.

        Raises:
            ConnTimeout: if the connection to the device times out.
            NoRouteToDevice: if the device is not reachable.
        """
        self._ensure_open()
        try:
            self._send_command("version")
            raw_info_version = self._read_command()
            self._send_command("ni")
            raw_info_network = self._read_command()
        finally:
            self.close()

        if raw_info_version == "" or raw_info_network == "":
            return False, OrderedDict()

        board_patterns = (
            ("fw_ver", self._match_fw_ver),
            ("fpga_ver", self._match_fpga_ver),
            ("mcu_ver", self._match_mcu_ver),
            ("serial_num", self._match_board_sn),
        )
        network_patterns = (
            ("ip_address", self._match_ip_addr),
            ("mac_address", self._match_mac_addr),
            ("subnet_address", self._match_subnet_mask),
            ("gateway_address", self._match_gateway_addr),
        )
        board = {
            key: self._first_group(pattern, raw_info_version)
            for key, pattern in board_patterns
        }
        network = {
            key: self._first_group(pattern, raw_info_network)
            for key, pattern in network_patterns
        }
        missing = [
            key
            for section in (board, network)
            for key, value in section.items()
            if value is None
        ]
        if missing:
            self.logger.logger.error(
                "Fields missing in the MCH answer: {}".format(", ".join(missing))
            )
            return False, OrderedDict()

        resp_dict = OrderedDict()  # type: OrderedDict
        resp_dict["Board"] = board
        resp_dict["Network"] = network
        return True, resp_dict

    def set_dhcp_mode(self) -> tuple[bool, str]:
        """Enables DHCP mode in the network configuration of the device.

        Performs the following steps (all of them are attempted, even when a
        previous one fails):

        - Enables DHCP mode on the MCH.
        - Sets the internal IP address value to match the address
          provided by the DHCP server (required to prevent DHCP lease
          issues).
        - Sets the hostname value.

        Returns:
            If failure, a tuple containing False, and a message about the
            failure.
            If success, a tuple containing True, and an empty string.
        """
        self.logger.logger.info("Enabling DHCP mode.")
        self._ensure_open()
        steps = (
            (self._enable_dhcp, "Enabling of DHCP mode failed.\r\n"),
            (self._set_ip_addr, "Setting IP address failed.\r\n"),
            (self._set_hostname, "Setting hostname failed.\r\n"),
        )
        failures = []
        for step, failure_msg in steps:
            if not step():
                failures.append(failure_msg)
        self.close()
        # If any of the subtasks failed, fail overall
        return not failures, "".join(failures)

    def update_fw(self, fw_version: str, part: str = "MCH") -> tuple[bool, str]:
        """Update the firmware of the device.

        This method expects the firmware binary pointed by the value of the
        argument *fw_version* to be available in the TFTP server.
        Mainly, this method injects the command *update_firmware* to an NAT
        MCH.

        Args:
            fw_version: version release number for the new fw.
            part: not used.

        Returns:
            If failure, a tuple containing False, and a message about the
            failure.
            If success, a tuple containing True, and an empty string.
        """
        self.logger.logger.info("Updating MCH firmware")
        self._ensure_open()
        try:
            self._send_command("update_firmware")
            # Avoid clearing the buffer between these commands because it
            # would skip the update mode in the MCH.
            self._send_command(
                "{}:{}{}/mch_fw_{}.bin".format(
                    self._server_ip, self._fw_path, fw_version, fw_version
                ),
                clear_buffer=False,
            )
            # Erasing the internal memory. If it is attempted to read now
            # from the buffer, it will get the prompt.
            time.sleep(self._FW_ERASE_TIME)
            # There's a useless prompt which is received first, get rid of
            # it, and wait for the good one that should come when the
            # flashing is finished.
            response_b = self._session.read_until(self.PROMPT)  # type: bytes
            # Sometimes, at this point, the buffer has content, sometimes
            # not. It seems reasonable using a length 100 to detect this
            # situation.
            if len(response_b) < 100:
                response_b = self._session.read_until(self.PROMPT)
            response = response_b.decode("ascii", errors="replace")  # type: str
            # The failure message below points to the debug log.
            self.logger.logger.debug(
                "Output of the firmware update:\n{}".format(response)
            )
            # The MCH prints the word "successful" at the end of the
            # process, just before the prompt.
            if "successful" in response:
                self._reboot()
                # Finally, wait for the MCH to complete the reboot process
                time.sleep(self._FW_POST_REBOOT_WAIT)
                return True, ""
            if "TFTP: could not get file" in response:
                # This error is mainly caused when the target fw_version
                # is not available in the TFTP server.
                return (
                    False,
                    "The fw version {} couldn't be found in the"
                    " TFTP server".format(fw_version),
                )
            return False, "Unknown error. Check the debug log."
        finally:
            self.close()

    def set_configuration(
        self, category: str, data: OrderedDict, apply: bool = True
    ) -> tuple[bool, str]:
        """Change the configuration of the device.

        This method focuses on the configuration parameters that are not
        defined within a configuration script. This backend supports only
        the following configuration category:

        - Backplane configuration [backplane]

        The configuration files are hosted by the server, and they are named
        this way: `latest_mch_conf_<form factor>_<modifier>_cfg.txt`.
        The form factor should match the type of chassis, while the modifier
        allows choosing a non-standard configuration.

        Args:
            category(str): the target set of parameters to be affected by the
                change. Only "backplane" is accepted.
            data(dict): dictionary containing modifiers that allow for
                choosing a particular backplane configuration file, i.e.
                `{"Backplane configuration": {"option": <modifier>}}`.
                The modifier defaults to "generic".
            apply(bool): whether to reboot the MCH to make active the changes
                or not. Disable rebooting when multiple changes are going to
                be performed in the device. Reboot the device after the last
                change.

        Returns:
            A tuple containing a `bool` indicating if the operation was
            successful, and a message.

        Raises:
            FeatureNotSupported: if *category* is not "backplane".
        """
        if category != "backplane":
            raise FeatureNotSupported(
                "The Telnet backend only supports changing "
                "the backplane configuration."
            )
        if not isinstance(data, dict):
            return (False, "Wrong data type passed.")

        option = "generic"
        if "Backplane configuration" in data:
            if "option" in data["Backplane configuration"]:
                option = data["Backplane configuration"]["option"]
        # Built before talking to the MCH, so a bad backplane value cannot
        # leave the upload command half-way.
        cfg_file = "{}:latest_mch_conf_{}_{}_cfg.txt".format(
            self._server_ip,
            "{}u".format(int(self.backplane)),
            option,
        )

        self._ensure_open()
        try:
            self._send_command("upload_cfg", clear_buffer=True)
            # Provide enough timeout in case the network is slow
            self._send_command(cfg_file, sleep=3, clear_buffer=False)
            # Read the buffer to check whether the file was found in the
            # TFTP server
            validfile = self._read_command()
            if "TFTP: getting file done" not in validfile:
                # The returned message points to the log.
                self.logger.logger.error(
                    "Upload of the backplane configuration failed. "
                    "MCH output:\n{}".format(validfile)
                )
                return (
                    False,
                    "The backplane file wasn't found in the server. See the log.",
                )
            self._send_command("y", clear_buffer=False)
            self._send_command("", clear_buffer=True)
            if apply:
                self._reboot()
        finally:
            self.close()
        return (
            True,
            "Expect some delay until the MCH is accessible in the network again.",
        )

    def _get_slot_state(self, amc_num, close_conn: bool = True):
        """Internal method to get the current state of an AMC slot.

        Args:
            amc_num(int): AMC slot number.
            close_conn(bool): Whether to close the telnet session when
                returning, or not. Default: True

        Returns:
            A tuple with a success flag, and either the state code
            {M1, M2, ... M7} or, on failure, an error message.
        """
        self._ensure_open()
        # Get fru_info for all AMCs
        self._send_command("show_fru")
        fru_info = self._read_command()
        # Check if the Telnet session should be closed
        if close_conn is True:
            self.close()
        # Look for the line of the requested AMC; the state is in group 2
        match = re.search(r"AMC{}(\s+)(M\d)".format(amc_num), fru_info)
        if match is None:
            return (
                False,
                "Unable to determine state of AMC {}. "
                "Check AMC number.".format(amc_num),
            )
        return (True, match.group(2))

    def _start_slot(self, amc_num, sleep: int = 2, upstream=False):
        """Internal method to start the card in an AMC slot.

        Args:
            amc_num(int): AMC slot number to power on.
            sleep(int): Time to allow for the AMC to power up.
                Default: 2 seconds
            upstream(bool): set it when the slot is known to have an
                upstream power-up delay, so a longer wait is used.

        Returns:
            True if successful
            False if unsuccessful
        """
        # MCH fru_start command expects fru number, instead of AMC
        # number, where: fru_num = amc_num + 4
        fru_num = amc_num + 4
        # Check current state of the AMC, and leave the Telnet session open
        res, state = self._get_slot_state(amc_num, False)
        # If unable to get state of slot, return
        if res is False:
            self.logger.logger.warning(
                "Unable to determine state of slot {}:\n {}".format(amc_num, state)
            )
            self.close()
            return False
        if state == "M4":
            self.logger.logger.info("AMC{} is already powered up.".format(amc_num))
            self.close()
            return True

        time.sleep(self._SLOT_SETTLE_TIME)
        self._ensure_open()
        self.logger.logger.info("Powering up AMC {}.".format(amc_num))
        # Send fru_start command
        self._send_command("fru_start {}".format(fru_num))
        fru_start = self._read_command()
        # Check if the slot has a start up delay
        if upstream or ("Upstream Power Up Delay active!" in fru_start):
            sleep = self._UPSTREAM_POWER_UP_WAIT
            self.logger.logger.info(
                "Slot has upstream delay set. "
                "Waiting {}s for AMC to power on...".format(sleep)
            )
        # Sleep to allow device in AMC slot to power on
        time.sleep(sleep)
        # Get updated state
        _, state = self._get_slot_state(amc_num)
        success = state == "M4"
        if success:
            self.logger.logger.info("AMC {} powered up successfully.".format(amc_num))
        else:
            self.logger.logger.warning(
                "Failed to power on slot. Current state: {}.".format(state)
            )
        self.close()
        return success

    def _stop_slot(self, amc_num, sleep: int = 2):
        """Internal method to stop the card in an AMC slot.

        Args:
            amc_num(int): AMC slot number to power off.
            sleep(int): Time to allow for the AMC to power down.
                Default: 2 seconds

        Returns:
            True if successful
            False if unsuccessful
        """
        # MCH shutdown command expects fru number, instead of AMC
        # number, where: fru_num = amc_num + 4
        fru_num = amc_num + 4
        # Check current state of the AMC, and leave the Telnet session open
        res, state = self._get_slot_state(amc_num, False)
        # If unable to get state of slot, return
        if res is False:
            self.logger.logger.warning(
                "Unable to determine state of slot {}:\n {}".format(amc_num, state)
            )
            self.close()
            return False
        if state == "M1":
            self.logger.logger.info("AMC{} is already powered down.".format(amc_num))
            self.close()
            return True

        time.sleep(self._SLOT_SETTLE_TIME)
        self._ensure_open()
        self.logger.logger.info("Powering down AMC {}.".format(amc_num))
        # Send shutdown command
        self._send_command("shutdown {}".format(fru_num))
        # Sleep to allow device in AMC slot to power down
        self.logger.logger.info("Sleeping for {} seconds.".format(sleep))
        time.sleep(sleep)
        # Get updated state
        _, state = self._get_slot_state(amc_num)
        success = state == "M1"
        if success:
            self.logger.logger.info(
                "AMC {} powered down successfully.".format(amc_num)
            )
        else:
            self.logger.logger.warning(
                "Failed to power down slot. Current state: {}.".format(state)
            )
        self.close()
        return success

    def _slot_status_all(self):
        """Internal method to get the current state of all AMC slots.

        Returns:
            A list of tuples containing the AMC number and state code
            {M1, M2, ... M7} for all slots, from slot 1 up to the highest
            slot reported by the MCH. If a slot is empty, its state is None.
            An empty list is returned when the MCH reports no AMC at all.
        """
        self.logger.logger.debug("Getting state of AMCs in chassis.")
        self._ensure_open()
        # Get fru_info for all AMCs
        self._send_command("show_fru")
        fru_info = self._read_command()
        self.close()
        # Search for populated AMC slots
        found = re.findall(r"AMC(\d+)(\s+)(M\d+)", fru_info)
        if not found:
            self.logger.logger.warning("No AMC found in the output of show_fru.")
            return []
        # First occurrence wins if a slot shows up more than once
        states = {}
        for number, _, state in found:
            states.setdefault(int(number), state)
        amcs = []
        for slot in range(1, max(states) + 1):
            state = states.get(slot)
            if state is not None:
                self.logger.logger.debug(
                    "AMC{} slot is populated, and in the {} state.".format(slot, state)
                )
            else:
                self.logger.logger.debug("AMC{} slot is unpopulated.".format(slot))
            amcs.append((slot, state))
        return amcs

    def _stop_all_slots(self):
        """Internal method to power down all AMC slots.

        The result of the command is not verified.
        """
        self._ensure_open()
        self.logger.logger.info("Powering down all AMCs.")
        # Shutdown all slots
        self._send_command("shutdown all")
        self.logger.logger.info("All AMCs are now powered down.")
        self.close()

    def _determine_upstream(self):
        """Internal method to determine which AMCs are configured as
        'Upstream', i.e. the PCIe root-complex.

        Returns:
            A list with the AMC numbers acting as host of a virtual switch.
            It is empty when the virtual switch table is not found.
        """
        self._ensure_open()
        self.logger.logger.info("Determining which AMC(s) are configured as Upstream")
        # 'mch' command takes a while to return all info
        # so add an extended sleep period
        self._send_command("mch", sleep=2)
        mch_info = self._read_command()
        self.close()

        # Find the relevant lines in the output. Example:
        #   VS # | Host    | NT-Host | Members
        #    0     AMC01_4   none      AMC01_4 AMC02_4 AMC03_4 ...
        #    1
        #    2
        first = mch_info.find("VS #")
        if first == -1:
            self.logger.logger.warning(
                "Virtual switch table not found in the output of the MCH."
            )
            return []
        last = mch_info.find("\r\nUpstream", first)
        table = mch_info[first:] if last == -1 else mch_info[first:last]
        # First line is the header; row N describes virtual switch N
        rows = table.splitlines()[1:]
        upstreams = []
        for vs_index, row in enumerate(rows):
            # Anchored at the start of the row, so that only the host column
            # is considered and not an entry of the members list.
            host = re.match(r"\s*{}\s+AMC(\d+)_\d".format(vs_index), row)
            if host:
                amc_num = int(host.group(1))
                self.logger.logger.info(
                    "AMC{} is the Upstream for Virtual Switch {}".format(
                        amc_num, vs_index
                    )
                )
                upstreams.append(amc_num)
        return upstreams

    def reboot_slot(self, amc_num):
        """Method to power cycle an individual AMC slot.

        Args:
            amc_num: AMC slot number to power cycle.

        Returns:
            A tuple containing a success flag in the first position. In the
            second position:

            - On success, an empty string.
            - On failure, a string containing the failure message.
        """
        self.logger.logger.info("Rebooting AMC in slot {}".format(amc_num))
        response = ""
        # Sleep time
        shutdown_sleep = 5
        start_sleep = 5
        # Shutdown slot
        success = self._stop_slot(amc_num, shutdown_sleep)
        if success is False:
            response = "Failed to power down card in AMC slot {}.".format(amc_num)
            self.logger.logger.warning(response)
            return (success, response)
        # Allow time for session to update
        time.sleep(self._SLOT_SETTLE_TIME)
        # Start slot
        success = self._start_slot(amc_num, start_sleep)
        if success is False:
            response = "Failed to power up card in AMC slot {}.".format(amc_num)
            self.logger.logger.warning(response)
        return (success, response)

    def reboot_all_slots(self):
        """Method to power cycle all AMC slots.

        Only the slots that were not in the M1 state are powered on again.
        The slots configured as upstream are powered on last.

        Returns:
            A tuple containing a success flag in the first position. In the
            second position:

            - On success, an empty string.
            - On failure, a string containing the failure message(s).
        """
        self._ensure_open()
        # Find which slots are populated
        slot_status = self._slot_status_all()
        if not slot_status:
            # Without the previous states nothing could be powered on again,
            # so do not shut anything down.
            response = "Unable to determine the state of the AMC slots."
            self.logger.logger.warning(response)
            self.close()
            return (False, response)
        # Shutdown all slots
        self._stop_all_slots()
        # Determine what AMC(s) are the Upstream (root-complex)
        upstreams = self._determine_upstream()

        failures = []
        upstream_slots = []
        # Power on relevant slots
        for amc_num, state in slot_status:
            if state is None or state == "M1":
                continue
            if amc_num in upstreams:
                upstream_slots.append(amc_num)
                self.logger.logger.info(
                    "AMC{} is configured as an upstream. Will reboot last.".format(
                        amc_num
                    )
                )
                continue
            if self._start_slot(amc_num) is False:
                response = "Failed to power up card in AMC slot {}.".format(amc_num)
                self.logger.logger.warning(response)
                failures.append(response)
        # Power on Upstream slots
        for amc_num in upstream_slots:
            if self._start_slot(amc_num, upstream=True) is False:
                response = "Failed to power up card in AMC slot {}.".format(amc_num)
                self.logger.logger.warning(response)
                failures.append(response)
        self.close()
        return (not failures, "".join("{}\n".format(msg) for msg in failures))