Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions src/nethsec/inventory/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
#


from math import floor
from euci import EUci
from nethsec import utils, mwan, users, firewall, objects
import os
import re
import subprocess
import configparser
import json
import hashlib

# run a bash command and return the error code
def _run_status(cmd):
Expand Down Expand Up @@ -530,3 +532,162 @@ def fact_default_password(uci: EUci):
result = subprocess.run(['/bin/ubus', 'call', 'session', 'login', json.dumps(data)], capture_output=True)

return { 'default_password': result.returncode == 0 }

def info_fqdn(uci: EUci):
system = uci.get_all('system')
for section in system:
for option in system[section]:
if option == 'hostname':
return anonymize(system[section][option], uci)
return ''

Comment thread
gsanchietti marked this conversation as resolved.
def info_kernel_version(uci: EUci):
try:
with open('/proc/version', 'r') as f:
version = f.read().strip()
return version.split()[2]
except:
Comment thread
gsanchietti marked this conversation as resolved.
return ''
Comment thread
gsanchietti marked this conversation as resolved.

def info_uptime_seconds(uci: EUci):
try:
with open('/proc/uptime', 'r') as f:
uptime = f.read().strip().split()[0]
return floor(float(uptime))
except:
Comment thread
gsanchietti marked this conversation as resolved.
return 0

def info_default_ipv4(uci: EUci):
# first method: dig -4 TXT +short o-o.myaddr.l.google.com @ns1.google.com
try:
res = subprocess.run(['dig', '-4', 'TXT', '+short', 'o-o.myaddr.l.google.com', '@ns1.google.com'],
capture_output=True, text=True, timeout=3)
Comment thread
gsanchietti marked this conversation as resolved.
if res.returncode == 0 and res.stdout.strip():
ip = res.stdout.strip().strip('"')
if ip:
return anonymize(ip, uci)
except:
Comment thread
gsanchietti marked this conversation as resolved.
Comment thread
gsanchietti marked this conversation as resolved.
pass

# second method: curl -4 ifconfig.co
try:
res = subprocess.run(['curl', '-4', '-s', 'ifconfig.co'],
capture_output=True, text=True, timeout=3)
Comment thread
gsanchietti marked this conversation as resolved.
if res.returncode == 0 and res.stdout.strip():
return anonymize(res.stdout.strip(), uci)
except:
Comment thread
gsanchietti marked this conversation as resolved.
Comment thread
gsanchietti marked this conversation as resolved.
pass

# third method: get the first WAN device and its IPv4
try:
wan_devices = utils.get_all_wan_devices(uci)
if wan_devices:
first_wan = wan_devices[0]
res = subprocess.run(['ip', '-4', '-j', 'addr', 'show', first_wan],
capture_output=True, text=True, timeout=3)
if res.returncode == 0:
addr_info = json.loads(res.stdout)
if addr_info and 'addr_info' in addr_info[0]:
for addr in addr_info[0]['addr_info']:
if addr.get('family') == 'inet':
return anonymize(addr.get('local'), uci)
Comment thread
gsanchietti marked this conversation as resolved.
except:
Comment thread
gsanchietti marked this conversation as resolved.
Comment thread
gsanchietti marked this conversation as resolved.
pass

return ''

def info_default_ipv6(uci: EUci):
# Get the first WAN device and its IPv6
ipv6 = ''
try:
wan_devices = utils.get_all_wan_devices(uci)
if wan_devices:
first_wan = wan_devices[0]
res = subprocess.run(['ip', '-6', '-j', 'addr', 'show', first_wan],
capture_output=True, text=True, timeout=3)
if res.returncode == 0:
addr_info = json.loads(res.stdout)
if addr_info and 'addr_info' in addr_info[0]:
for addr in addr_info[0]['addr_info']:
if addr.get('family') == 'inet6' and addr.get('scope') == 'global':
ipv6 = addr.get('local')
Comment thread
gsanchietti marked this conversation as resolved.
except:
Comment thread
gsanchietti marked this conversation as resolved.
Comment thread
gsanchietti marked this conversation as resolved.
pass

# If WAN has no IPv6, assume that IPv6 is not configured: speedup data collection
if ipv6 == '':
return ''

# first method: dig -6 TXT +short o-o.myaddr.l.google.com @ns1.google.com
try:
res = subprocess.run(['dig', '-6', 'TXT', '+short', 'o-o.myaddr.l.google.com', '@ns1.google.com'],
capture_output=True, text=True, timeout=3)
if res.returncode == 0 and res.stdout.strip():
ip = res.stdout.strip().strip('"')
if ip:
return anonymize(ip, uci)
except:
Comment thread
gsanchietti marked this conversation as resolved.
Comment thread
gsanchietti marked this conversation as resolved.
pass

# second method: curl -6 ifconfig.co
try:
res = subprocess.run(['curl', '-6', '-s', 'ifconfig.co'],
capture_output=True, text=True, timeout=3)
if res.returncode == 0 and res.stdout.strip():
return anonymize(res.stdout.strip(), uci)
except:
Comment thread
gsanchietti marked this conversation as resolved.
Comment thread
gsanchietti marked this conversation as resolved.
pass


return anonymize(ipv6, uci)

def anonymize(value, uci: EUci):
if fact_subscription_status(uci).get('status', 'no') != "no":
return value
h = hashlib.sha256(value.encode()).hexdigest()
return f"anon-{h[:16]}"
Comment thread
gsanchietti marked this conversation as resolved.

def info_package_updates_available(uci: EUci):
"""Check if package updates are available"""
try:
res = subprocess.run(['/usr/libexec/rpcd/ns.update', 'call', 'check-package-updates'],
capture_output=True, text=True, timeout=10)
Comment thread
gsanchietti marked this conversation as resolved.
if res.returncode == 0:
data = json.loads(res.stdout)
if isinstance(data, dict) and 'updates' in data:
updates = data['updates']
if isinstance(updates, list) and len(updates) > 0:
return True
except:
Comment thread
gsanchietti marked this conversation as resolved.
Comment thread
gsanchietti marked this conversation as resolved.
pass
return False

def parse_version(version_str):
Comment thread
gsanchietti marked this conversation as resolved.
# Remove "NethSecurity " prefix if present
if version_str.startswith('NethSecurity '):
version_str = version_str[13:] # len('NethSecurity ') = 13
# Take only the part before "-"
version_str = version_str.split('-')[0]
# Convert to tuple of integers for comparison
try:
return tuple(int(x) for x in version_str.split('.'))
Comment thread
gsanchietti marked this conversation as resolved.
except (ValueError, AttributeError):
return ()
Comment thread
gsanchietti marked this conversation as resolved.
def info_image_updates_available(uci: EUci):
"""Check if system image updates are available"""
try:
res = subprocess.run(['/usr/libexec/rpcd/ns.update', 'call', 'check-system-update'],
capture_output=True, text=True, timeout=10)
if res.returncode == 0:
data = json.loads(res.stdout)
current_version = data.get('currentVersion', '')
last_version = data.get('lastVersion', '')

if current_version and last_version:
current = parse_version(current_version)
last = parse_version(last_version)
if current and last and last > current:
return True
except:
Comment thread
gsanchietti marked this conversation as resolved.
Comment thread
gsanchietti marked this conversation as resolved.
pass
return False
Loading