diff --git a/README.md b/README.md index df067c5..23d89e5 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,28 @@ ### Keenetic NDMS v2 client library ### +#### Usage + +- Telnet + +```python +from ndms2_client import Client, TelnetConnection + +client = Client(connection=TelnetConnection("192.168.1.1", 23, "admin", "admin")) +client.get_router_info() +``` + +- Http + +```python +from ndms2_client import Client, HttpConnection + +client = Client(connection=HttpConnection("192.168.1.1", 80, "admin", "admin")) +client.get_router_info() +``` + +#### Tests + +```shell +pytest +``` diff --git a/ndms2_client/__init__.py b/ndms2_client/__init__.py index def164e..d6d82e3 100644 --- a/ndms2_client/__init__.py +++ b/ndms2_client/__init__.py @@ -1,2 +1,14 @@ -from .connection import Connection, ConnectionException, TelnetConnection -from .client import Client, Device, RouterInfo, InterfaceInfo +from .client import Client +from .connection import Connection, ConnectionException, HttpConnection, TelnetConnection +from .models import Device, InterfaceInfo, RouterInfo + +__all__ = [ + "Client", + "Connection", + "ConnectionException", + "HttpConnection", + "TelnetConnection", + "Device", + "InterfaceInfo", + "RouterInfo", +] diff --git a/ndms2_client/client.py b/ndms2_client/client.py index 1b747e9..0a8d6d2 100644 --- a/ndms2_client/client.py +++ b/ndms2_client/client.py @@ -1,179 +1,115 @@ import logging -import re -from typing import Dict, List, Tuple, Union, NamedTuple, Optional +from typing import List, Optional +from .command import Command from .connection import Connection +from .models import Device, InterfaceInfo, RouterInfo _LOGGER = logging.getLogger(__name__) -_VERSION_CMD = 'show version' -_ARP_CMD = 'show ip arp' -_ASSOCIATIONS_CMD = 'show associations' -_HOTSPOT_CMD = 'show ip hotspot' -_INTERFACE_CMD = 'show interface %s' -_INTERFACES_CMD = 'show interface' -_ARP_REGEX = re.compile( - r'(?P.*?)\s+' + - r'(?P([0-9]{1,3}[.]){3}[0-9]{1,3})?\s+' + - r'(?P(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))\s+' + - r'(?P([^ ]+))\s+' -) - - -class Device(NamedTuple): - mac: str - name: str - ip: str - interface: str - - -class RouterInfo(NamedTuple): - name: str - fw_version: str - fw_channel: str - model: str - hw_version: str - manufacturer: str - vendor: str - region: str - - @classmethod - def from_dict(cls, info: dict) -> "RouterInfo": - return RouterInfo( - name=str(info.get('description', info.get('model', 'NDMS2 Router'))), - fw_version=str(info.get('title', info.get('release'))), - fw_channel=str(info.get('sandbox', 'unknown')), - model=str(info.get('model', info.get('hw_id'))), - hw_version=str(info.get('hw_version', 'N/A')), - manufacturer=str(info.get('manufacturer')), - vendor=str(info.get('vendor')), - region=str(info.get('region', 'N/A')), - ) - - -class InterfaceInfo(NamedTuple): - name: str - type: Optional[str] - description: Optional[str] - link: Optional[str] - connected: Optional[str] - state: Optional[str] - mtu: Optional[int] - address: Optional[str] - mask: Optional[str] - uptime: Optional[int] - security_level: Optional[str] - mac: Optional[str] - - @classmethod - def from_dict(cls, info: dict) -> "InterfaceInfo": - return InterfaceInfo( - name=_str(info.get('interface-name')) or str(info['id']), - type=_str(info.get('type')), - description=_str(info.get('description')), - link=_str(info.get('link')), - connected=_str(info.get('connected')), - state=_str(info.get('state')), - mtu=_int(info.get('mtu')), - address=_str(info.get('address')), - mask=_str(info.get('mask')), - uptime=_int(info.get('uptime')), - security_level=_str(info.get('security-level')), - mac=_str(info.get('mac')), - ) - - class Client(object): def __init__(self, connection: Connection): self._connection = connection def get_router_info(self) -> RouterInfo: - info = _parse_dict_lines(self._connection.run_command(_VERSION_CMD)) + info = self._connection.run_command(Command.VERSION) + + _LOGGER.debug("Raw router info: %s", str(info)) + assert isinstance(info, dict), "Router info response is not a dictionary" - _LOGGER.debug('Raw router info: %s', str(info)) - assert isinstance(info, dict), 'Router info response is not a dictionary' - return RouterInfo.from_dict(info) def get_interfaces(self) -> List[InterfaceInfo]: - collection = _parse_collection_lines(self._connection.run_command(_INTERFACES_CMD)) + collection = self._connection.run_command(Command.INTERFACES) - _LOGGER.debug('Raw interfaces info: %s', str(collection)) - assert isinstance(collection, list), 'Interfaces info response is not a collection' + _LOGGER.debug("Raw interfaces info: %s", str(collection)) + assert isinstance(collection, list), "Interfaces info response is not a collection" return [InterfaceInfo.from_dict(info) for info in collection] def get_interface_info(self, interface_name) -> Optional[InterfaceInfo]: - info = _parse_dict_lines(self._connection.run_command(_INTERFACE_CMD % interface_name)) + info = self._connection.run_command(Command.INTERFACE, name=interface_name) - _LOGGER.debug('Raw interface info: %s', str(info)) - assert isinstance(info, dict), 'Interface info response is not a dictionary' + _LOGGER.debug("Raw interface info: %s", str(info)) + assert isinstance(info, dict), "Interface info response is not a dictionary" - if 'id' in info: + if "id" in info: return InterfaceInfo.from_dict(info) return None - def get_devices(self, *, try_hotspot=True, include_arp=True, include_associated=True) -> List[Device]: + def get_devices( + self, + *, + try_hotspot=True, + include_arp=True, + include_associated=True, + ) -> List[Device]: """ - Fetches a list of connected devices online - :param try_hotspot: first try `ip hotspot` command. - This is the most precise information on devices known to be online - :param include_arp: if try_hotspot is False or no hotspot devices detected - :param include_associated: - :return: + Fetches a list of connected devices online + :param try_hotspot: first try `ip hotspot` command. + This is the most precise information on devices known to be online + :param include_arp: if try_hotspot is False or no hotspot devices detected + :param include_associated: + :return: """ devices = [] if try_hotspot: - devices = _merge_devices(devices, self.get_hotspot_devices()) + devices = Device.merge_devices(devices, self.get_hotspot_devices()) if len(devices) > 0: return devices if include_arp: - devices = _merge_devices(devices, self.get_arp_devices()) + devices = Device.merge_devices(devices, self.get_arp_devices()) if include_associated: - devices = _merge_devices(devices, self.get_associated_devices()) + devices = Device.merge_devices(devices, self.get_associated_devices()) return devices def get_hotspot_devices(self) -> List[Device]: hotspot_info = self.__get_hotspot_info() - return [Device( - mac=info.get('mac').upper(), - name=info.get('name'), - ip=info.get('ip'), - interface=info['interface'].get('name', '') - ) for info in hotspot_info.values() if 'interface' in info and info.get('link') == 'up'] + return [ + Device( + mac=info.get("mac").upper(), + name=info.get("name"), + ip=info.get("ip"), + interface=info["interface"].get("name", ""), + ) + for info in hotspot_info.values() + if "interface" in info and info.get("link") == "up" + ] def get_arp_devices(self) -> List[Device]: - lines = self._connection.run_command(_ARP_CMD) - - result = _parse_table_lines(lines, _ARP_REGEX) - - return [Device( - mac=info.get('mac').upper(), - name=info.get('name') or None, - ip=info.get('ip'), - interface=info.get('interface') - ) for info in result if info.get('mac') is not None] + result = self._connection.run_command(Command.ARP) + + return [ + Device( + mac=info.get("mac").upper(), + name=info.get("name") or None, + ip=info.get("ip"), + interface=info.get("interface"), + ) + for info in result + if info.get("mac") is not None + ] def get_associated_devices(self): - associations = _parse_dict_lines(self._connection.run_command(_ASSOCIATIONS_CMD)) + associations = self._connection.run_command(Command.ASSOCIATIONS) - items = associations.get('station', []) + items = associations.get("station", []) if not isinstance(items, list): items = [items] - aps = set([info.get('ap') for info in items]) + aps = set([info.get("ap") for info in items]) ap_to_bridge = {} for ap in aps: - ap_info = _parse_dict_lines(self._connection.run_command(_INTERFACE_CMD % ap)) - ap_to_bridge[ap] = ap_info.get('group') or ap_info.get('interface-name') + ap_info = self._connection.run_command(Command.INTERFACE, name=ap) + ap_to_bridge[ap] = ap_info.get("group") or ap_info.get("interface-name") # try enriching the results with hotspot additional info hotspot_info = self.__get_hotspot_info() @@ -181,199 +117,28 @@ def get_associated_devices(self): devices = [] for info in items: - mac = info.get('mac') - if mac is not None and info.get('authenticated') in ['1', 'yes']: + mac = info.get("mac") + if mac is not None and info.get("authenticated") in ["1", "yes", True]: host_info = hotspot_info.get(mac) - devices.append(Device( - mac=mac.upper(), - name=host_info.get('name') if host_info else None, - ip=host_info.get('ip') if host_info else None, - interface=ap_to_bridge.get(info.get('ap'), info.get('ap')) - )) + devices.append( + Device( + mac=mac.upper(), + name=host_info.get("name") if host_info else None, + ip=host_info.get("ip") if host_info else None, + interface=ap_to_bridge.get(info.get("ap"), info.get("ap")), + ), + ) return devices # hotspot info is only available in newest firmware (2.09 and up) and in router mode # however missing command error will lead to empty dict returned def __get_hotspot_info(self): - info = _parse_dict_lines(self._connection.run_command(_HOTSPOT_CMD)) + info = self._connection.run_command(Command.HOTSPOT) - items = info.get('host', []) + items = info.get("host", []) if not isinstance(items, list): items = [items] - return {item.get('mac'): item for item in items} - - -def _str(value: Optional[any]) -> Optional[str]: - if value is None: - return None - - return str(value) - - -def _int(value: Optional[any]) -> Optional[int]: - if value is None: - return None - - return int(value) - - -def _merge_devices(*lists: List[Device]) -> List[Device]: - res = {} - for l in lists: - for dev in l: - key = (dev.interface, dev.mac) - if key in res: - old_dev = res.get(key) - res[key] = Device( - mac=old_dev.mac, - name=old_dev.name or dev.name, - ip=old_dev.ip or dev.ip, - interface=old_dev.interface - ) - else: - res[key] = dev - - return list(res.values()) - - -def _parse_table_lines(lines: List[str], regex: re) -> List[Dict[str, any]]: - """Parse the lines using the given regular expression. - If a line can't be parsed it is logged and skipped in the output. - """ - results = [] - for line in lines: - match = regex.search(line) - if not match: - _LOGGER.debug('Could not parse line: %s', line) - continue - results.append(match.groupdict()) - return results - - -def _fix_continuation_lines(lines: List[str]) -> List[str]: - indent = 0 - continuation_possible = False - fixed_lines = [] # type: List[str] - for line in lines: - if len(line.strip()) == 0: - continue - - if continuation_possible and len(line[:indent].strip()) == 0: - prev_line = fixed_lines.pop() - line = prev_line.rstrip() + line[(indent + 1):].lstrip() - else: - assert ':' in line, 'Found a line with no colon when continuation is not possible: ' + line - - colon_pos = line.index(':') - comma_pos = line.index(',') if ',' in line[:colon_pos] else None - indent = comma_pos if comma_pos is not None else colon_pos - - continuation_possible = len(line[(indent + 1):].strip()) > 0 - - fixed_lines.append(line) - - return fixed_lines - - -def _parse_dict_lines(lines: List[str]) -> Dict[str, any]: - response = {} - indent = 0 - stack = [(None, indent, response)] # type: List[Tuple[str, int, Union[str, dict]]] - stack_level = 0 - - for line in _fix_continuation_lines(lines): - if len(line.strip()) == 0: - continue - - _LOGGER.debug(line) - - # exploding the line - colon_pos = line.index(':') - comma_pos = line.index(',') if ',' in line[:colon_pos] else None - key = line[:colon_pos].strip() - value = line[(colon_pos + 1):].strip() - new_indent = comma_pos if comma_pos is not None else colon_pos - - # assuming line is like 'mac-access, id = Bridge0: ...' - if comma_pos is not None: - key = line[:comma_pos].strip() - - value = {key: value} if value != '' else {} - - args = line[comma_pos + 1:colon_pos].split(',') - for arg in args: - sub_key, sub_value = [p.strip() for p in arg.split('=', 1)] - value[sub_key] = sub_value - - # up and down the stack - if new_indent > indent: # new line is a sub-value of parent - stack_level += 1 - indent = new_indent - stack.append(None) - else: - while new_indent < indent and len(stack) > 0: # getting one level up - stack_level -= 1 - stack.pop() - _, indent, _ = stack[stack_level] - - if stack_level < 1: - break - - assert indent == new_indent, 'Irregular indentation detected' - - stack[stack_level] = key, indent, value - - # current containing object - obj_key, obj_indent, obj = stack[stack_level - 1] - - # we are the first child of the containing object - if not isinstance(obj, dict): - # need to convert it from empty string to empty object - assert obj == '', 'Unexpected nested object format' - _, _, parent_obj = stack[stack_level - 2] - obj = {} - - # containing object might be in a list also - if isinstance(parent_obj[obj_key], list): - parent_obj[obj_key].pop() - parent_obj[obj_key].append(obj) - else: - parent_obj[obj_key] = obj - stack[stack_level - 1] = obj_key, obj_indent, obj - - # current key is already in object means there should be an array of values - if key in obj: - if not isinstance(obj[key], list): - obj[key] = [obj[key]] - - obj[key].append(value) - else: - obj[key] = value - - return response - - -def _parse_collection_lines(lines: List[str]) -> List[Dict[str, any]]: - _HEADER_REGEXP = re.compile(r'^(\w+),\s*name\s*=\s*\"([^"]+)\"') - - result = [] - item_lines = [] # type: List[str] - for line in lines: - if len(line.strip()) == 0: - continue - - match = _HEADER_REGEXP.match(line) - if match: - if len(item_lines) > 0: - result.append(_parse_dict_lines(item_lines)) - item_lines = [] - else: - item_lines.append(line) - - if len(item_lines) > 0: - result.append(_parse_dict_lines(item_lines)) - - return result + return {item.get("mac"): item for item in items} diff --git a/ndms2_client/command.py b/ndms2_client/command.py new file mode 100644 index 0000000..496dd71 --- /dev/null +++ b/ndms2_client/command.py @@ -0,0 +1,10 @@ +from enum import Enum + + +class Command(Enum): + VERSION = "show version" + ARP = "show ip arp" + ASSOCIATIONS = "show associations" + HOTSPOT = "show ip hotspot" + INTERFACE = "show interface %s" + INTERFACES = "show interface" diff --git a/ndms2_client/connection.py b/ndms2_client/connection.py deleted file mode 100644 index c62c041..0000000 --- a/ndms2_client/connection.py +++ /dev/null @@ -1,141 +0,0 @@ -import logging -import re -from telnetlib import Telnet -from typing import List, Union, Pattern, Match - -_LOGGER = logging.getLogger(__name__) - - -class ConnectionException(Exception): - pass - - -class Connection(object): - @property - def connected(self) -> bool: - raise NotImplementedError("Should have implemented this") - - def connect(self): - raise NotImplementedError("Should have implemented this") - - def disconnect(self): - raise NotImplementedError("Should have implemented this") - - def run_command(self, command: str) -> List[str]: - raise NotImplementedError("Should have implemented this") - - -class TelnetConnection(Connection): - """Maintains a Telnet connection to a router.""" - - def __init__(self, host: str, port: int, username: str, password: str, *, - timeout: int = 30): - """Initialize the Telnet connection properties.""" - self._telnet = None # type: Telnet - self._host = host - self._port = port - self._username = username - self._password = password - self._timeout = timeout - self._current_prompt_string = None # type: bytes - - @property - def connected(self): - return self._telnet is not None - - def run_command(self, command, *, group_change_expected=False) -> List[str]: - """Run a command through a Telnet connection. - Connect to the Telnet server if not currently connected, otherwise - use the existing connection. - """ - if not self._telnet: - self.connect() - - try: - self._telnet.read_very_eager() # this is here to flush the read buffer - self._telnet.write('{}\n'.format(command).encode('UTF-8')) - response = self._read_response(group_change_expected) - except Exception as e: - message = "Error executing command: %s" % str(e) - _LOGGER.error(message) - self.disconnect() - raise ConnectionException(message) from None - else: - _LOGGER.debug('Command %s: %s', command, '\n'.join(response)) - return response - - def connect(self): - """Connect to the Telnet server.""" - try: - self._telnet = Telnet() - self._telnet.set_option_negotiation_callback(TelnetConnection.__negotiate_naws) - self._telnet.open(self._host, self._port, self._timeout) - - self._read_until(b'Login: ') - self._telnet.write((self._username + '\n').encode('UTF-8')) - self._read_until(b'Password: ') - self._telnet.write((self._password + '\n').encode('UTF-8')) - - self._read_response(True) - self._set_max_window_size() - except Exception as e: - message = "Error connecting to telnet server: %s" % str(e) - _LOGGER.error(message) - self._telnet = None - raise ConnectionException(message) from None - - def disconnect(self): - """Disconnect the current Telnet connection.""" - try: - if self._telnet: - self._telnet.write(b'exit\n') - except Exception as e: - _LOGGER.error("Telnet error on exit: %s" % str(e)) - pass - self._telnet = None - - def _read_response(self, detect_new_prompt_string=False) -> List[str]: - needle = re.compile(br'\n\(\w+[-\w]+\)>') if detect_new_prompt_string else self._current_prompt_string - (match, text) = self._read_until(needle) - if detect_new_prompt_string: - self._current_prompt_string = match[0] - return text.decode('UTF-8').split('\n')[1:-1] - - def _read_until(self, needle: Union[bytes, Pattern]) -> (Match, bytes): - matcher = needle if isinstance(needle, Pattern) else re.escape(needle) - (i, match, text) = self._telnet.expect([matcher], self._timeout) - assert i == 0, "No expected response from server" - return match, text - - # noinspection PyProtectedMember - def _set_max_window_size(self): - """ - --> inform the Telnet server of the window width and height. see __negotiate_naws - """ - from telnetlib import IAC, NAWS, SB, SE - import struct - - width = struct.pack('H', 65000) - height = struct.pack('H', 5000) - self._telnet.get_socket().sendall(IAC + SB + NAWS + width + height + IAC + SE) - - # noinspection PyProtectedMember - @staticmethod - def __negotiate_naws(tsocket, command, option): - """ - --> inform the Telnet server we'll be using Window Size Option. - Refer to https://www.ietf.org/rfc/rfc1073.txt - :param tsocket: telnet socket object - :param command: telnet Command - :param option: telnet option - :return: None - """ - from telnetlib import DO, DONT, IAC, WILL, WONT, NAWS - - if option == NAWS: - tsocket.sendall(IAC + WILL + NAWS) - # -- below code taken from telnetlib - elif command in (DO, DONT): - tsocket.sendall(IAC + WONT + option) - elif command in (WILL, WONT): - tsocket.sendall(IAC + DONT + option) diff --git a/ndms2_client/connection/__init__.py b/ndms2_client/connection/__init__.py new file mode 100644 index 0000000..12e015c --- /dev/null +++ b/ndms2_client/connection/__init__.py @@ -0,0 +1,12 @@ +from .base import Connection, ConnectionException +from .http import HttpConnection, HttpResponseConverter +from .telnet import TelnetConnection, TelnetResponseConverter + +__all__ = [ + "Connection", + "ConnectionException", + "HttpConnection", + "HttpResponseConverter", + "TelnetConnection", + "TelnetResponseConverter", +] diff --git a/ndms2_client/connection/base.py b/ndms2_client/connection/base.py new file mode 100644 index 0000000..cf52805 --- /dev/null +++ b/ndms2_client/connection/base.py @@ -0,0 +1,32 @@ +from abc import ABC, abstractmethod +from typing import List +from ndms2_client.command import Command + + +class ResponseConverter(ABC): + @abstractmethod + def convert(self, command, data): + ... + + +class ConnectionException(Exception): + pass + + +class Connection(ABC): + @property + @abstractmethod + def connected(self) -> bool: + ... + + @abstractmethod + def connect(self): + ... + + @abstractmethod + def disconnect(self): + ... + + @abstractmethod + def run_command(self, command: Command, *, name: str = None) -> List[str]: + ... diff --git a/ndms2_client/connection/http.py b/ndms2_client/connection/http.py new file mode 100644 index 0000000..3afa5ea --- /dev/null +++ b/ndms2_client/connection/http.py @@ -0,0 +1,102 @@ +import hashlib +import json +import logging +from contextlib import suppress +from http.cookiejar import CookieJar +from typing import List, Union +from urllib import request +from urllib.error import HTTPError +from urllib.request import Request + +from ..command import Command +from .base import Connection, ConnectionException, ResponseConverter + +_LOGGER = logging.getLogger(__name__) + + +class HttpResponseConverter(ResponseConverter): + def convert(self, command, data): + if command == Command.INTERFACES: + return list(data.values()) + return data + + +class HttpConnection(Connection): + def __init__( + self, + host: str, + port: int, + username: str, + password: str, + *, + scheme: str = "http", + timeout: int = 30, + response_converter: ResponseConverter = None, + ): + self._scheme = scheme + self._host = host + self._port = port + self._username = username + self._password = password + self._timeout = timeout + self._base_url = f"{scheme}://{self._host}:{self._port}" + self._auth_url = f"{self._base_url}/auth" + self._converter = response_converter or HttpResponseConverter() + self._cookie_jar = CookieJar() + opener = request.build_opener(request.HTTPCookieProcessor(self._cookie_jar)) + request.install_opener(opener) + + @property + def connected(self) -> bool: + with suppress(HTTPError): + response = self._open(self._auth_url) + return response.status == 200 + return False + + def connect(self): + message = None + try: + try: + self._open(self._auth_url) + except HTTPError as error: + realm = error.headers.get("X-NDM-Realm") + challenge = error.headers.get("X-NDM-Challenge") + + md5 = hashlib.md5(f"admin:{realm}:{self._password}".encode()).hexdigest() + sha = hashlib.sha256(f"{challenge}{md5}".encode()).hexdigest() + + req = request.Request( + self._auth_url, + method="POST", + data=json.dumps({"login": "admin", "password": sha}).encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + response = self._open(req) + if response.status == 200: + return + message = "Error connecting to api: %s %s" % (response.status, response.read()) + + except Exception as e: + message = "Error connecting to api server: %s" % str(e) + _LOGGER.error(message) + raise ConnectionException(message) from None + + def disconnect(self): + self._cookie_jar.clear() + + def run_command(self, command: Command, name: str = None) -> List[str]: + if not self.connected: + self.connect() + cmd = command.value.replace(" ", "/") + if name: + if "/" in name: + name, *_ = name.split("/") + cmd = cmd % name + response = self._open(f"{self._base_url}/rci/{cmd}/") + response = json.loads(response.read()) + if self._converter: + return self._converter.convert(command, response) + return response + + def _open(self, url: Union[str, Request]): + return request.urlopen(url, timeout=self._timeout) diff --git a/ndms2_client/connection/telnet.py b/ndms2_client/connection/telnet.py new file mode 100644 index 0000000..0c3f4f9 --- /dev/null +++ b/ndms2_client/connection/telnet.py @@ -0,0 +1,309 @@ +import logging +import re +from telnetlib import Telnet +from typing import Dict, List, Optional, Union + +from ..command import Command +from .base import Connection, ConnectionException, ResponseConverter + +_LOGGER = logging.getLogger(__name__) + +_ARP_REGEX = re.compile( + r"(?P.*?)\s+" + + r"(?P([0-9]{1,3}[.]){3}[0-9]{1,3})?\s+" + + r"(?P(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))\s+" + + r"(?P([^ ]+))\s+", +) + + +def _parse_table_lines(lines: List[str]) -> List[Dict[str, any]]: + """Parse the lines using the given regular expression. + If a line can't be parsed it is logged and skipped in the output. + """ + results = [] + for line in lines: + match = _ARP_REGEX.search(line) + if not match: + _LOGGER.debug("Could not parse line: %s", line) + continue + results.append(match.groupdict()) + return results + + +def _fix_continuation_lines(lines: List[str]) -> List[str]: + indent = 0 + continuation_possible = False + fixed_lines = [] # type: List[str] + for line in lines: + if len(line.strip()) == 0: + continue + + if continuation_possible and len(line[:indent].strip()) == 0: + prev_line = fixed_lines.pop() + line = prev_line.rstrip() + line[(indent + 1) :].lstrip() + else: + assert ":" in line, ( + "Found a line with no colon when continuation is not possible: " + line + ) + + colon_pos = line.index(":") + comma_pos = line.index(",") if "," in line[:colon_pos] else None + indent = comma_pos if comma_pos is not None else colon_pos + + continuation_possible = len(line[(indent + 1) :].strip()) > 0 + + fixed_lines.append(line) + + return fixed_lines + + +def _parse_dict_lines(lines: List[str]) -> Dict[str, any]: + response = {} + indent = 0 + stack = [(None, indent, response)] # type: List[Tuple[str, int, Union[str, dict]]] + stack_level = 0 + + for line in _fix_continuation_lines(lines): + if len(line.strip()) == 0: + continue + + _LOGGER.debug(line) + + # exploding the line + colon_pos = line.index(":") + comma_pos = line.index(",") if "," in line[:colon_pos] else None + key = line[:colon_pos].strip() + value = line[(colon_pos + 1) :].strip() + new_indent = comma_pos if comma_pos is not None else colon_pos + + # assuming line is like 'mac-access, id = Bridge0: ...' + if comma_pos is not None: + key = line[:comma_pos].strip() + + value = {key: value} if value != "" else {} + + args = line[comma_pos + 1 : colon_pos].split(",") + for arg in args: + sub_key, sub_value = [p.strip() for p in arg.split("=", 1)] + value[sub_key] = sub_value + + # up and down the stack + if new_indent > indent: # new line is a sub-value of parent + stack_level += 1 + indent = new_indent + stack.append(None) + else: + while new_indent < indent and len(stack) > 0: # getting one level up + stack_level -= 1 + stack.pop() + _, indent, _ = stack[stack_level] + + if stack_level < 1: + break + + assert indent == new_indent, "Irregular indentation detected" + + stack[stack_level] = key, indent, value + + # current containing object + obj_key, obj_indent, obj = stack[stack_level - 1] + + # we are the first child of the containing object + if not isinstance(obj, dict): + # need to convert it from empty string to empty object + assert obj == "", "Unexpected nested object format" + _, _, parent_obj = stack[stack_level - 2] + obj = {} + + # containing object might be in a list also + if isinstance(parent_obj[obj_key], list): + parent_obj[obj_key].pop() + parent_obj[obj_key].append(obj) + else: + parent_obj[obj_key] = obj + stack[stack_level - 1] = obj_key, obj_indent, obj + + # current key is already in object means there should be an array of values + if key in obj: + if not isinstance(obj[key], list): + obj[key] = [obj[key]] + + obj[key].append(value) + else: + obj[key] = value + + return response + + +def _parse_collection_lines(lines: List[str]) -> List[Dict[str, any]]: + _HEADER_REGEXP = re.compile(r'^(\w+),\s*name\s*=\s*\"([^"]+)\"') + + result = [] + item_lines = [] # type: List[str] + for line in lines: + if len(line.strip()) == 0: + continue + + match = _HEADER_REGEXP.match(line) + if match: + if len(item_lines) > 0: + result.append(_parse_dict_lines(item_lines)) + item_lines = [] + else: + item_lines.append(line) + + if len(item_lines) > 0: + result.append(_parse_dict_lines(item_lines)) + + return result + + +class TelnetResponseConverter(ResponseConverter): + def convert(self, command, data): + if command in ( + Command.VERSION, + Command.INTERFACE, + Command.ASSOCIATIONS, + Command.HOTSPOT, + ): + return _parse_dict_lines(data) + elif command == Command.INTERFACES: + return _parse_collection_lines(data) + elif command == Command.ARP: + return _parse_table_lines(data) + + return data + + +class TelnetConnection(Connection): + """Maintains a Telnet connection to a router.""" + + def __init__( + self, + host: str, + port: int, + username: str, + password: str, + *, + timeout: int = 30, + response_converter: Optional[ResponseConverter] = None, + ): + """Initialize the Telnet connection properties.""" + self._telnet = None # type: Telnet + self._host = host + self._port = port + self._username = username + self._password = password + self._timeout = timeout + self._current_prompt_string = None # type: bytes + self._converter = response_converter or TelnetResponseConverter() + + @property + def connected(self): + return self._telnet is not None + + def run_command(self, command: Command, *, name=None, group_change_expected=False) -> List[str]: + """Run a command through a Telnet connection. + Connect to the Telnet server if not currently connected, otherwise + use the existing connection. + """ + cmd = command.value + if name: + cmd = command.value % name + if not self._telnet: + self.connect() + + try: + self._telnet.read_very_eager() # this is here to flush the read buffer + self._telnet.write("{}\n".format(cmd).encode("UTF-8")) + response = self._read_response(group_change_expected) + except Exception as e: + message = "Error executing command: %s" % str(e) + _LOGGER.error(message) + self.disconnect() + raise ConnectionException(message) from None + else: + _LOGGER.debug("Command %s: %s", cmd, "\n".join(response)) + if self._converter: + return self._converter.convert(command, response) + return response + + def connect(self): + """Connect to the Telnet server.""" + try: + self._telnet = Telnet() + self._telnet.set_option_negotiation_callback(TelnetConnection.__negotiate_naws) + self._telnet.open(self._host, self._port, self._timeout) + + self._read_until(b"Login: ") + self._telnet.write((self._username + "\n").encode("UTF-8")) + self._read_until(b"Password: ") + self._telnet.write((self._password + "\n").encode("UTF-8")) + + self._read_response(True) + self._set_max_window_size() + except Exception as e: + message = "Error connecting to telnet server: %s" % str(e) + _LOGGER.error(message) + self._telnet = None + raise ConnectionException(message) from None + + def disconnect(self): + """Disconnect the current Telnet connection.""" + try: + if self._telnet: + self._telnet.write(b"exit\n") + except Exception as e: + _LOGGER.error("Telnet error on exit: %s" % str(e)) + pass + self._telnet = None + + def _read_response(self, detect_new_prompt_string=False) -> List[str]: + needle = ( + re.compile(rb"\n\(\w+[-\w]+\)>") + if detect_new_prompt_string + else self._current_prompt_string + ) + (match, text) = self._read_until(needle) + if detect_new_prompt_string: + self._current_prompt_string = match[0] + return text.decode("UTF-8").split("\n")[1:-1] + + def _read_until(self, needle: Union[bytes, re.Pattern]) -> (re.Match, bytes): + matcher = needle if isinstance(needle, re.Pattern) else re.escape(needle) + (i, match, text) = self._telnet.expect([matcher], self._timeout) + assert i == 0, "No expected response from server" + return match, text + + # noinspection PyProtectedMember + def _set_max_window_size(self): + """ + --> inform the Telnet server of the window width and height. see __negotiate_naws + """ + import struct + from telnetlib import IAC, NAWS, SB, SE + + width = struct.pack("H", 65000) + height = struct.pack("H", 5000) + self._telnet.get_socket().sendall(IAC + SB + NAWS + width + height + IAC + SE) + + # noinspection PyProtectedMember + @staticmethod + def __negotiate_naws(tsocket, command, option): + """ + --> inform the Telnet server we'll be using Window Size Option. + Refer to https://www.ietf.org/rfc/rfc1073.txt + :param tsocket: telnet socket object + :param command: telnet Command + :param option: telnet option + :return: None + """ + from telnetlib import DO, DONT, IAC, NAWS, WILL, WONT + + if option == NAWS: + tsocket.sendall(IAC + WILL + NAWS) + # -- below code taken from telnetlib + elif command in (DO, DONT): + tsocket.sendall(IAC + WONT + option) + elif command in (WILL, WONT): + tsocket.sendall(IAC + DONT + option) diff --git a/ndms2_client/models.py b/ndms2_client/models.py new file mode 100644 index 0000000..9a56acb --- /dev/null +++ b/ndms2_client/models.py @@ -0,0 +1,97 @@ +from typing import List, NamedTuple, Optional + + +class Device(NamedTuple): + mac: str + name: str + ip: str + interface: str + + @staticmethod + def merge_devices(*lists: List["Device"]) -> List["Device"]: + res = {} + for list_ in lists: + for dev in list_: + key = (dev.interface, dev.mac) + if key in res: + old_dev = res.get(key) + res[key] = Device( + mac=old_dev.mac, + name=old_dev.name or dev.name, + ip=old_dev.ip or dev.ip, + interface=old_dev.interface, + ) + else: + res[key] = dev + + return list(res.values()) + + +class RouterInfo(NamedTuple): + name: str + fw_version: str + fw_channel: str + model: str + hw_version: str + manufacturer: str + vendor: str + region: str + + @classmethod + def from_dict(cls, info: dict) -> "RouterInfo": + return RouterInfo( + name=str(info.get("description", info.get("model", "NDMS2 Router"))), + fw_version=str(info.get("title", info.get("release"))), + fw_channel=str(info.get("sandbox", "unknown")), + model=str(info.get("model", info.get("hw_id"))), + hw_version=str(info.get("hw_version", "N/A")), + manufacturer=str(info.get("manufacturer")), + vendor=str(info.get("vendor")), + region=str(info.get("region", "N/A")), + ) + + +class InterfaceInfo(NamedTuple): + name: str + type: Optional[str] + description: Optional[str] + link: Optional[str] + connected: Optional[str] + state: Optional[str] + mtu: Optional[int] + address: Optional[str] + mask: Optional[str] + uptime: Optional[int] + security_level: Optional[str] + mac: Optional[str] + + @classmethod + def from_dict(cls, info: dict) -> "InterfaceInfo": + return InterfaceInfo( + name=_str(info.get("interface-name")) or str(info["id"]), + type=_str(info.get("type")), + description=_str(info.get("description")), + link=_str(info.get("link")), + connected=_str(info.get("connected")), + state=_str(info.get("state")), + mtu=_int(info.get("mtu")), + address=_str(info.get("address")), + mask=_str(info.get("mask")), + uptime=_int(info.get("uptime")), + security_level=_str(info.get("security-level")), + mac=_str(info.get("mac")), + ) + + +def _str(value: Optional[any]) -> Optional[str]: + if value is None: + return None + + return str(value) + + +def _int(value: Optional[any]) -> Optional[int]: + if value is None: + return None + + return int(value) diff --git a/setup.py b/setup.py index 79f32bb..b05b780 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ long_description=long_description, long_description_content_type="text/markdown", url="https://github.com/foxel/python_ndms2_client", - packages=setuptools.find_packages(exclude=['tests']), + packages=setuptools.find_packages(exclude=["tests"]), classifiers=( "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.6", diff --git a/tests/test_compile.py b/tests/test_compile.py index 393fd4c..1033916 100755 --- a/tests/test_compile.py +++ b/tests/test_compile.py @@ -1,12 +1,14 @@ #!/usr/bin/python3 import os import sys -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) # this is a dummy test showing that code compiles def test_compile(): - from ndms2_client import TelnetConnection, Client + from ndms2_client import Client, HttpConnection, TelnetConnection + assert HttpConnection is not None assert TelnetConnection is not None assert Client is not None diff --git a/tests/test_http_connection.py b/tests/test_http_connection.py new file mode 100644 index 0000000..00e4278 --- /dev/null +++ b/tests/test_http_connection.py @@ -0,0 +1,39 @@ +import os +import sys +from unittest.mock import MagicMock, Mock, PropertyMock, patch + +import pytest + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + + +from ndms2_client.command import Command +from ndms2_client.connection import HttpConnection + + +@pytest.fixture +def connection(): + con = HttpConnection("192.168.111.111", 80, "admin", "admin") + mock = MagicMock() + mock.read.return_value = b"{}" + con._open = MagicMock(wraps=Mock(return_value=mock)) + return con + + +@pytest.mark.parametrize( + ("command", "url"), + [ + (Command.VERSION, "/rci/show/version/"), + (Command.ARP, "/rci/show/ip/arp/"), + (Command.ASSOCIATIONS, "/rci/show/associations/"), + (Command.HOTSPOT, "/rci/show/ip/hotspot/"), + (Command.INTERFACES, "/rci/show/interface/"), + ], +) +@patch.object(HttpConnection, "connected", new_callable=PropertyMock) +def test_something(mocked_connected, connection, command, url): + full_url = f"{connection._scheme}://{connection._host}:{connection._port}{url}" + mocked_connected.return_value = True + connection.run_command(command) + connection._open.assert_called_once() + connection._open.assert_called_with(full_url) diff --git a/tests/test_parse_dict_lines.py b/tests/test_parse_dict_lines.py index 6f09cb5..9f1686a 100644 --- a/tests/test_parse_dict_lines.py +++ b/tests/test_parse_dict_lines.py @@ -4,35 +4,34 @@ import pytest -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) # noinspection PyProtectedMember def test_parse_dict_lines(dict_text): - from ndms2_client.client import _parse_dict_lines + from ndms2_client.connection.telnet import _parse_dict_lines - _parse_dict_lines(dict_text.split('\n')) + _parse_dict_lines(dict_text.split("\n")) def test_hotspot_data(hostpot_sample: Tuple[str, int]): - from ndms2_client.client import _parse_dict_lines + from ndms2_client.connection.telnet import _parse_dict_lines sample, expected_hosts = hostpot_sample - parsed = _parse_dict_lines(sample.split('\n')) - - print(parsed['host']) + parsed = _parse_dict_lines(sample.split("\n")) if expected_hosts > 1: - assert isinstance(parsed['host'], list) - assert len(parsed['host']) == expected_hosts + assert isinstance(parsed["host"], list) + assert len(parsed["host"]) == expected_hosts else: - assert isinstance(parsed['host'], dict) + assert isinstance(parsed["host"], dict) @pytest.fixture(params=range(4)) def dict_text(request): - data = [''' + data = [ + """ station: mac: 60:ff:ff:ff:ff:ff @@ -146,7 +145,8 @@ def dict_text(request): mcs: 9 txss: 1 -''', ''' +""", + """ buttons: button, name = RESET: @@ -182,7 +182,8 @@ def dict_text(request): hold_delay: 3000 -''', ''' +""", + """ id: WifiMaster0/AccessPoint0 index: 0 @@ -203,7 +204,8 @@ def dict_text(request): ssid: home encryption: wpa2,wpa3 -''', ''' +""", + """ release: v2.08(AAUR.4)C2 arch: mips @@ -235,14 +237,16 @@ def dict_text(request): device: Keenetic 4G III class: Internet Center -'''] +""", + ] return data[request.param] @pytest.fixture(params=range(2)) def hostpot_sample(request) -> Tuple[str, int]: samples = [ - (''' + ( + """ host: mac: dc:09:xx:xx:xx:xx @@ -307,8 +311,11 @@ def hostpot_sample(request) -> Tuple[str, int]: mode: mac schedule: - ''', 2), - (''' + """, + 2, + ), + ( + """ host: mac: 74:ff:ff:ff:ff:ff via: 74:ff:ff:ff:ff:ff @@ -458,7 +465,9 @@ def hostpot_sample(request) -> Tuple[str, int]: mac-access, id = Bridge0: permit - ''', 4) + """, + 4, + ), ] return samples[request.param]