diff --git a/src/nethsec/inventory/__init__.py b/src/nethsec/inventory/__init__.py index 936ec3b4..31f0b006 100644 --- a/src/nethsec/inventory/__init__.py +++ b/src/nethsec/inventory/__init__.py @@ -6,6 +6,7 @@ # +from math import floor from euci import EUci from nethsec import utils, mwan, users, firewall, objects import os @@ -13,6 +14,7 @@ import subprocess import configparser import json +import hashlib # run a bash command and return the error code def _run_status(cmd): @@ -530,3 +532,162 @@ def fact_default_password(uci: EUci): result = subprocess.run(['/bin/ubus', 'call', 'session', 'login', json.dumps(data)], capture_output=True) return { 'default_password': result.returncode == 0 } + +def info_fqdn(uci: EUci): + system = uci.get_all('system') + for section in system: + for option in system[section]: + if option == 'hostname': + return anonymize(system[section][option], uci) + return '' + +def info_kernel_version(uci: EUci): + try: + with open('/proc/version', 'r') as f: + version = f.read().strip() + return version.split()[2] + except: + return '' + +def info_uptime_seconds(uci: EUci): + try: + with open('/proc/uptime', 'r') as f: + uptime = f.read().strip().split()[0] + return floor(float(uptime)) + except: + return 0 + +def info_default_ipv4(uci: EUci): + # first method: dig -4 TXT +short o-o.myaddr.l.google.com @ns1.google.com + try: + res = subprocess.run(['dig', '-4', 'TXT', '+short', 'o-o.myaddr.l.google.com', '@ns1.google.com'], + capture_output=True, text=True, timeout=3) + if res.returncode == 0 and res.stdout.strip(): + ip = res.stdout.strip().strip('"') + if ip: + return anonymize(ip, uci) + except: + pass + + # second method: curl -4 ifconfig.co + try: + res = subprocess.run(['curl', '-4', '-s', 'ifconfig.co'], + capture_output=True, text=True, timeout=3) + if res.returncode == 0 and res.stdout.strip(): + return anonymize(res.stdout.strip(), uci) + except: + pass + + # third method: get the first WAN device and its IPv4 + try: + wan_devices = utils.get_all_wan_devices(uci) + if wan_devices: + first_wan = wan_devices[0] + res = subprocess.run(['ip', '-4', '-j', 'addr', 'show', first_wan], + capture_output=True, text=True, timeout=3) + if res.returncode == 0: + addr_info = json.loads(res.stdout) + if addr_info and 'addr_info' in addr_info[0]: + for addr in addr_info[0]['addr_info']: + if addr.get('family') == 'inet': + return anonymize(addr.get('local'), uci) + except: + pass + + return '' + +def info_default_ipv6(uci: EUci): + # Get the first WAN device and its IPv6 + ipv6 = '' + try: + wan_devices = utils.get_all_wan_devices(uci) + if wan_devices: + first_wan = wan_devices[0] + res = subprocess.run(['ip', '-6', '-j', 'addr', 'show', first_wan], + capture_output=True, text=True, timeout=3) + if res.returncode == 0: + addr_info = json.loads(res.stdout) + if addr_info and 'addr_info' in addr_info[0]: + for addr in addr_info[0]['addr_info']: + if addr.get('family') == 'inet6' and addr.get('scope') == 'global': + ipv6 = addr.get('local') + except: + pass + + # If WAN has no IPv6, assume that IPv6 is not configured: speedup data collection + if ipv6 == '': + return '' + + # first method: dig -6 TXT +short o-o.myaddr.l.google.com @ns1.google.com + try: + res = subprocess.run(['dig', '-6', 'TXT', '+short', 'o-o.myaddr.l.google.com', '@ns1.google.com'], + capture_output=True, text=True, timeout=3) + if res.returncode == 0 and res.stdout.strip(): + ip = res.stdout.strip().strip('"') + if ip: + return anonymize(ip, uci) + except: + pass + + # second method: curl -6 ifconfig.co + try: + res = subprocess.run(['curl', '-6', '-s', 'ifconfig.co'], + capture_output=True, text=True, timeout=3) + if res.returncode == 0 and res.stdout.strip(): + return anonymize(res.stdout.strip(), uci) + except: + pass + + + return anonymize(ipv6, uci) + +def anonymize(value, uci: EUci): + if fact_subscription_status(uci).get('status', 'no') != "no": + return value + h = hashlib.sha256(value.encode()).hexdigest() + return f"anon-{h[:16]}" + +def info_package_updates_available(uci: EUci): + """Check if package updates are available""" + try: + res = subprocess.run(['/usr/libexec/rpcd/ns.update', 'call', 'check-package-updates'], + capture_output=True, text=True, timeout=10) + if res.returncode == 0: + data = json.loads(res.stdout) + if isinstance(data, dict) and 'updates' in data: + updates = data['updates'] + if isinstance(updates, list) and len(updates) > 0: + return True + except: + pass + return False + +def parse_version(version_str): + # Remove "NethSecurity " prefix if present + if version_str.startswith('NethSecurity '): + version_str = version_str[13:] # len('NethSecurity ') = 13 + # Take only the part before "-" + version_str = version_str.split('-')[0] + # Convert to tuple of integers for comparison + try: + return tuple(int(x) for x in version_str.split('.')) + except (ValueError, AttributeError): + return () +def info_image_updates_available(uci: EUci): + """Check if system image updates are available""" + try: + res = subprocess.run(['/usr/libexec/rpcd/ns.update', 'call', 'check-system-update'], + capture_output=True, text=True, timeout=10) + if res.returncode == 0: + data = json.loads(res.stdout) + current_version = data.get('currentVersion', '') + last_version = data.get('lastVersion', '') + + if current_version and last_version: + current = parse_version(current_version) + last = parse_version(last_version) + if current and last and last > current: + return True + except: + pass + return False diff --git a/tests/test_inventory.py b/tests/test_inventory.py index 88908e01..f0ede5e1 100644 --- a/tests/test_inventory.py +++ b/tests/test_inventory.py @@ -1,4 +1,6 @@ from euci import EUci +from unittest.mock import patch, mock_open, MagicMock +import subprocess from nethsec import inventory @@ -967,3 +969,336 @@ def test_fact_adblock(tmp_path): u = _setup_db(tmp_path) result = inventory.fact_adblock(u) assert result == {"enabled": True, "community": 5, "enterprise": 2} + +# Tests for info_* helper functions + +def test_info_kernel_version(): + """Test info_kernel_version reads from /proc/version""" + u = EUci() + + # Test successful read + mock_version = "Linux version 5.10.176 (builder@nethsec) (gcc version 11.2.0) #0 SMP Mon Jan 1 00:00:00 2024\n" + with patch('builtins.open', mock_open(read_data=mock_version)): + result = inventory.info_kernel_version(u) + assert result == "5.10.176" + + # Test exception handling (file not found) + with patch('builtins.open', side_effect=FileNotFoundError): + result = inventory.info_kernel_version(u) + assert result == '' + +def test_info_uptime_seconds(): + """Test info_uptime_seconds reads from /proc/uptime""" + u = EUci() + + # Test successful read + mock_uptime = "12345.67 98765.43\n" + with patch('builtins.open', mock_open(read_data=mock_uptime)): + result = inventory.info_uptime_seconds(u) + assert result == 12345 + + # Test exception handling + with patch('builtins.open', side_effect=FileNotFoundError): + result = inventory.info_uptime_seconds(u) + assert result == 0 + +def test_info_fqdn(tmp_path): + """Test info_fqdn retrieves hostname from UCI""" + # Setup system config + system_db = """ +config system + option hostname 'myhost' + option domain 'example.com' +""" + with tmp_path.joinpath('system').open('w') as fp: + fp.write(system_db) + with tmp_path.joinpath('ns-plug').open('w') as fp: + fp.write("config config\n\toption type 'no'\n") + + u = EUci(confdir=tmp_path.as_posix()) + result = inventory.info_fqdn(u) + # Should be anonymized since no subscription + assert result.startswith('anon-') + +def test_info_default_ipv4_dig_method(tmp_path): + """Test info_default_ipv4 with dig method""" + with tmp_path.joinpath('ns-plug').open('w') as fp: + fp.write("config config\n\toption type 'no'\n") + u = EUci(confdir=tmp_path.as_posix()) + + # Mock successful dig command + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = '"192.0.2.1"\n' + + with patch('subprocess.run', return_value=mock_result): + result = inventory.info_default_ipv4(u) + assert result.startswith('anon-') # Should be anonymized + +def test_info_default_ipv4_curl_fallback(tmp_path): + """Test info_default_ipv4 falls back to curl when dig fails""" + with tmp_path.joinpath('ns-plug').open('w') as fp: + fp.write("config config\n\toption type 'no'\n") + u = EUci(confdir=tmp_path.as_posix()) + + # Mock dig failure and curl success + def mock_subprocess_run(cmd, **kwargs): + result = MagicMock() + if 'dig' in cmd: + result.returncode = 1 + result.stdout = '' + elif 'curl' in cmd: + result.returncode = 0 + result.stdout = '203.0.113.1\n' + else: + result.returncode = 1 + result.stdout = '' + return result + + with patch('subprocess.run', side_effect=mock_subprocess_run): + result = inventory.info_default_ipv4(u) + assert result.startswith('anon-') + +def test_info_default_ipv4_ip_command_fallback(tmp_path): + """Test info_default_ipv4 falls back to ip command when dig and curl fail""" + with tmp_path.joinpath('network').open('w') as fp: + fp.write(network_db) + with tmp_path.joinpath('firewall').open('w') as fp: + fp.write(firewall_db) + with tmp_path.joinpath('ns-plug').open('w') as fp: + fp.write("config config\n\toption type 'no'\n") + u = EUci(confdir=tmp_path.as_posix()) + + # Mock dig and curl failure, ip command success + def mock_subprocess_run(cmd, **kwargs): + result = MagicMock() + if isinstance(cmd, list) and 'ip' in cmd: + result.returncode = 0 + result.stdout = '[{"addr_info": [{"family": "inet", "local": "198.51.100.1"}]}]' + else: + result.returncode = 1 + result.stdout = '' + return result + + with patch('subprocess.run', side_effect=mock_subprocess_run): + with patch('nethsec.utils.get_all_wan_devices', return_value=['eth0']): + result = inventory.info_default_ipv4(u) + assert result.startswith('anon-') + +def test_info_default_ipv4_all_methods_fail(tmp_path): + """Test info_default_ipv4 returns empty string when all methods fail""" + with tmp_path.joinpath('ns-plug').open('w') as fp: + fp.write("config config\n\toption type 'no'\n") + u = EUci(confdir=tmp_path.as_posix()) + + # Mock all methods failing + mock_result = MagicMock() + mock_result.returncode = 1 + mock_result.stdout = '' + + with patch('subprocess.run', return_value=mock_result): + with patch('nethsec.utils.get_all_wan_devices', return_value=[]): + result = inventory.info_default_ipv4(u) + assert result == '' + +def test_info_default_ipv6_no_wan_ipv6(tmp_path): + """Test info_default_ipv6 returns empty when no IPv6 on WAN""" + with tmp_path.joinpath('ns-plug').open('w') as fp: + fp.write("config config\n\toption type 'no'\n") + u = EUci(confdir=tmp_path.as_posix()) + + # Mock no WAN devices or no IPv6 + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = '[]' + + with patch('subprocess.run', return_value=mock_result): + with patch('nethsec.utils.get_all_wan_devices', return_value=['eth0']): + result = inventory.info_default_ipv6(u) + assert result == '' + +def test_info_default_ipv6_with_wan_ipv6(tmp_path): + """Test info_default_ipv6 with IPv6 on WAN""" + with tmp_path.joinpath('ns-plug').open('w') as fp: + fp.write("config config\n\toption type 'no'\n") + u = EUci(confdir=tmp_path.as_posix()) + + # Mock WAN with IPv6 + def mock_subprocess_run(cmd, **kwargs): + result = MagicMock() + if isinstance(cmd, list) and 'ip' in cmd and '-6' in cmd: + result.returncode = 0 + result.stdout = '[{"addr_info": [{"family": "inet6", "local": "2001:db8::1", "scope": "global"}]}]' + elif isinstance(cmd, list) and 'dig' in cmd: + result.returncode = 0 + result.stdout = '"2001:db8::2"\n' + else: + result.returncode = 1 + result.stdout = '' + return result + + with patch('subprocess.run', side_effect=mock_subprocess_run): + with patch('nethsec.utils.get_all_wan_devices', return_value=['eth0']): + result = inventory.info_default_ipv6(u) + assert result.startswith('anon-') + +def test_info_default_ipv6_fallback_to_curl(tmp_path): + """Test info_default_ipv6 falls back to curl when dig fails""" + with tmp_path.joinpath('ns-plug').open('w') as fp: + fp.write("config config\n\toption type 'no'\n") + u = EUci(confdir=tmp_path.as_posix()) + + # Mock WAN with IPv6, dig fails, curl succeeds + def mock_subprocess_run(cmd, **kwargs): + result = MagicMock() + if isinstance(cmd, list) and 'ip' in cmd and '-6' in cmd: + result.returncode = 0 + result.stdout = '[{"addr_info": [{"family": "inet6", "local": "2001:db8::1", "scope": "global"}]}]' + elif isinstance(cmd, list) and 'curl' in cmd: + result.returncode = 0 + result.stdout = '2001:db8::3\n' + else: + result.returncode = 1 + result.stdout = '' + return result + + with patch('subprocess.run', side_effect=mock_subprocess_run): + with patch('nethsec.utils.get_all_wan_devices', return_value=['eth0']): + result = inventory.info_default_ipv6(u) + assert result.startswith('anon-') + +def test_info_package_updates_available_with_updates(): + """Test info_package_updates_available when updates are available""" + u = EUci() + + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = '{"updates": ["package1", "package2"]}' + + with patch('subprocess.run', return_value=mock_result): + result = inventory.info_package_updates_available(u) + assert result is True + +def test_info_package_updates_available_no_updates(): + """Test info_package_updates_available when no updates available""" + u = EUci() + + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = '{"updates": []}' + + with patch('subprocess.run', return_value=mock_result): + result = inventory.info_package_updates_available(u) + assert result is False + +def test_info_package_updates_available_command_fails(): + """Test info_package_updates_available when command fails""" + u = EUci() + + mock_result = MagicMock() + mock_result.returncode = 1 + mock_result.stdout = '' + + with patch('subprocess.run', return_value=mock_result): + result = inventory.info_package_updates_available(u) + assert result is False + +def test_parse_version_with_prefix(): + """Test parse_version removes NethSecurity prefix""" + assert inventory.parse_version('NethSecurity 8.1.0-dev') == (8, 1, 0) + assert inventory.parse_version('NethSecurity 8.2.1') == (8, 2, 1) + +def test_parse_version_without_prefix(): + """Test parse_version handles versions without prefix""" + assert inventory.parse_version('8.1.0-dev') == (8, 1, 0) + assert inventory.parse_version('8.2.1') == (8, 2, 1) + +def test_parse_version_comparison(): + """Test parse_version version comparison logic""" + v1 = inventory.parse_version('8.1.0') + v2 = inventory.parse_version('8.2.0') + v3 = inventory.parse_version('8.1.1') + + assert v2 > v1 + assert v3 > v1 + assert v2 > v3 + +def test_parse_version_invalid(): + """Test parse_version handles invalid input""" + result = inventory.parse_version('invalid') + assert result == () + + result = inventory.parse_version('') + assert result == () + +def test_info_image_updates_available_update_exists(): + """Test info_image_updates_available when newer version exists""" + u = EUci() + + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = '{"currentVersion": "NethSecurity 8.1.0", "lastVersion": "NethSecurity 8.2.0"}' + + with patch('subprocess.run', return_value=mock_result): + result = inventory.info_image_updates_available(u) + assert result is True + +def test_info_image_updates_available_no_update(): + """Test info_image_updates_available when current version is latest""" + u = EUci() + + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = '{"currentVersion": "NethSecurity 8.2.0", "lastVersion": "NethSecurity 8.2.0"}' + + with patch('subprocess.run', return_value=mock_result): + result = inventory.info_image_updates_available(u) + assert result is False + +def test_info_image_updates_available_patch_update(): + """Test info_image_updates_available when a patch update is available""" + u = EUci() + + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = '{"currentVersion": "NethSecurity 8.2.0", "lastVersion": "NethSecurity 8.2.1"}' + + with patch('subprocess.run', return_value=mock_result): + result = inventory.info_image_updates_available(u) + assert result is True + +def test_info_image_updates_available_minor_update(): + """Test info_image_updates_available when a minor update is available""" + u = EUci() + + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = '{"currentVersion": "NethSecurity 8.2.0", "lastVersion": "NethSecurity 8.3.0"}' + + with patch('subprocess.run', return_value=mock_result): + result = inventory.info_image_updates_available(u) + assert result is True + +def test_info_image_updates_available_current_newer(): + """Test info_image_updates_available when current version is newer than last""" + u = EUci() + + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = '{"currentVersion": "NethSecurity 8.3.0", "lastVersion": "NethSecurity 8.2.0"}' + + with patch('subprocess.run', return_value=mock_result): + result = inventory.info_image_updates_available(u) + assert result is False + +def test_info_image_updates_available_command_fails(): + """Test info_image_updates_available when command fails""" + u = EUci() + + mock_result = MagicMock() + mock_result.returncode = 1 + mock_result.stdout = '' + + with patch('subprocess.run', return_value=mock_result): + result = inventory.info_image_updates_available(u) + assert result is False