-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtools.py
More file actions
133 lines (111 loc) · 4.72 KB
/
tools.py
File metadata and controls
133 lines (111 loc) · 4.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import logging
import subprocess
from typing import List, Optional
from DoNetAgent import NetAgent # Import NetAgent class
logger = logging.getLogger(__name__)
# Constants for subprocess commands
PING_COMMAND = ['ping', '-c', '1', '-W', '2']
NMAP_COMMAND = ['nmap', '-sT', '-p', '1-1000']
TIMEOUT_SECONDS = 30
def _run_subprocess(command: List[str], host: str) -> subprocess.CompletedProcess:
"""Runs a subprocess command with the specified host.
Args:
command (List[str]): The command and its arguments to execute.
host (str): The target host for the command.
Returns:
subprocess.CompletedProcess: The result of the subprocess execution.
Raises:
subprocess.SubprocessError: If the subprocess execution fails.
"""
try:
return subprocess.run(
command + [host],
capture_output=True,
text=True,
timeout=TIMEOUT_SECONDS
)
except subprocess.SubprocessError as e:
logger.error(f"Subprocess error for command {command} on host {host}: {str(e)}")
raise
def ping_host(host: str) -> str:
"""Checks the availability of a host using the ping command.
Args:
host (str): The IP address or hostname to ping.
Returns:
str: A message indicating whether the host is reachable or not, or an error message.
"""
try:
result = _run_subprocess(PING_COMMAND, host)
if result.returncode == 0:
logger.info(f"Host {host} is reachable.")
return f"Host {host} is reachable."
logger.warning(f"Host {host} is unreachable.")
return f"Host {host} is unreachable: {result.stderr}"
except subprocess.SubprocessError as e:
logger.error(f"Ping error for host {host}: {str(e)}")
return f"Ping error: {str(e)}"
def port_scan(host: str) -> str:
"""Scans ports on a host using nmap.
Args:
host (str): The IP address or hostname to scan.
Returns:
str: The nmap scan output or an error message if the host is unreachable or scanning fails.
"""
ping_result = ping_host(host)
if "unreachable" in ping_result.lower():
logger.warning(f"Port scan canceled for {host}: host is unreachable.")
return "Host unreachable, scan canceled."
try:
result = _run_subprocess(NMAP_COMMAND, host)
logger.info(f"Port scan result for {host}: {result.stdout}")
return result.stdout
except subprocess.SubprocessError as e:
logger.error(f"Port scan error for {host}: {str(e)}")
return f"Scan error: {str(e)}"
def simulate_port_scan(host: str) -> str:
"""Simulates a port scan by returning a predefined JSON response.
Args:
host (str): The IP address or hostname to simulate scanning.
Returns:
str: A JSON string with simulated scan results.
"""
logger.info(f"Simulating port scan for {host}")
return f'{{"host": "{host}", "open_ports": [22, 80, 443], "scan_time": "2025-09-11T12:00:00Z"}}'
def netmiko_show(host: str, command: str, username: str, password: str, device_type: str = 'cisco_ios') -> str:
"""Execute a show command on the network device using Netmiko.
Args:
host (str): The IP address or hostname.
command (str): The show command to execute.
username (str): Username for authentication.
password (str): Password for authentication.
device_type (str): Device type (default 'cisco_ios').
Returns:
str: The output from the command or error message.
"""
try:
agent = NetAgent(host=host, username=username, password=password, device_type=device_type)
result = agent.execute_show(command)
agent.disconnect()
return result
except Exception as e:
logger.error(f"Netmiko show error for {host}: {str(e)}")
return f"Error executing show command: {str(e)}"
def netmiko_set(host: str, commands: List[str], username: str, password: str, device_type: str = 'cisco_ios') -> str:
"""Execute set commands on the network device using Netmiko.
Args:
host (str): The IP address or hostname.
commands (List[str]): List of configuration commands.
username (str): Username for authentication.
password (str): Password for authentication.
device_type (str): Device type (default 'cisco_ios').
Returns:
str: The output from the commands or error message.
"""
try:
agent = NetAgent(host=host, username=username, password=password, device_type=device_type)
result = agent.execute_set(commands)
agent.disconnect()
return result
except Exception as e:
logger.error(f"Netmiko set error for {host}: {str(e)}")
return f"Error executing set commands: {str(e)}"