-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstructures.py
More file actions
234 lines (189 loc) · 7.8 KB
/
structures.py
File metadata and controls
234 lines (189 loc) · 7.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""Shared data structures for openclaw-scanner.
TypedDicts, the ClawdbotInstallInfo dataclass, and bot-variant constants used across openclaw_usage.py and platform_compat/.
"""
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, TypedDict
# A bot CLI invocation expressed as its argv parts,
# e.g. ["/usr/bin/openclaw"] or ["npx", "openclaw"].
CliCommand = List[str]

# =============================================================================
# Variant Detection
# =============================================================================

# Recognised product names: the OpenClaw lineage first (newest → oldest),
# followed by the nano variants.
CLAWDBOT_VARIANT_NAMES = [
    "openclaw",
    "moltbot",
    "clawdbot",
    "nanobot",
    "picoclaw",
]

# The scanning approach to apply; chosen from the detected variant.
ScanningStrategy = Literal["openclaw", "nano"]

# Detected bot variant -> scanning strategy used for that variant.
# Every name in CLAWDBOT_VARIANT_NAMES has an entry here.
SCANNING_STRATEGY_MAP: Dict[str, ScanningStrategy] = {
    # OpenClaw lineage
    "openclaw": "openclaw",
    "moltbot": "openclaw",
    "clawdbot": "openclaw",
    # nano variants
    "nanobot": "nano",
    "picoclaw": "nano",
}
@dataclass
class ClawdbotInstallInfo:
    """Information about one detected bot installation.

    Covers both the OpenClaw family and the nano/pico variants listed in
    CLAWDBOT_VARIANT_NAMES.
    """

    # Detected variant: one of CLAWDBOT_VARIANT_NAMES.
    bot_variant: str
    # Command used to run the bot CLI, e.g. ["/usr/bin/openclaw"].
    bot_cli_cmd: CliCommand
    # Bot config directory, e.g. ~/.openclaw/.
    bot_config_dir: Path
    # Strategy used when scanning this bot_variant. Typed as ScanningStrategy
    # (not plain str) so it matches the values of SCANNING_STRATEGY_MAP.
    scanning_strategy: ScanningStrategy
# =============================================================================
# Platform Compat Structures (used by platform_compat/)
# =============================================================================
class UserInfo(TypedDict):
    """User information returned by get_user_info().

    Every key is required. Values that are unavailable on a platform are
    typed ``Optional`` and carried as None rather than being omitted.
    """

    username: str                   # e.g. "john"
    home_directory: str             # e.g. "/Users/john"
    user_id: Optional[str]          # Unix: "501", Windows: None
    group_id: Optional[str]         # Unix: "20", Windows: None
    full_name: Optional[str]        # e.g. "John Doe" or None
    shell: str                      # e.g. "/bin/zsh" or "cmd.exe"
    groups: List[str]               # e.g. ["staff", "admin"], never None
    system_info: Optional[str]      # uname output or equivalent
    computer_name: Optional[str]    # hostname or friendly name
    local_hostname: Optional[str]   # macOS only, others: None
class OsInfo(TypedDict):
    """Operating system information. Every key is required."""

    system: str     # e.g. "Darwin", "Linux", "Windows"
    release: str    # e.g. "24.6.0"
    version: str    # full version string
    machine: str    # e.g. "arm64", "x86_64"
class SystemInfo(TypedDict):
    """System information for the host being scanned (used by openclaw_usage.py).

    Every key is required. OS details are nested under ``os`` as an OsInfo.
    """

    username: str       # e.g. "john"
    home_dir: str       # e.g. "/Users/john"
    current_dir: str    # current working directory
    hostname: str       # machine hostname
    os: OsInfo          # nested OS details
    shell: str          # e.g. "/bin/zsh"
    lang: str           # e.g. "en_US.UTF-8"
class ProcessInfo(TypedDict):
    """Process info returned by find_processes().

    Every key is required and every value is a string; fields that cannot be
    determined are carried as "" rather than None.
    """

    user: str           # process owner, "" if unavailable
    pid: str            # process ID as string
    cpu: str            # CPU %, "" if unavailable (Windows)
    mem: str            # memory %, "" if unavailable (Windows)
    command: str        # full command line
    process_name: str   # the filter that matched (from input list)
class ToolPaths(TypedDict):
    """Tool paths returned by get_tool_paths(). Both keys are required."""

    config: List[str]   # paths to check for config files
    logs: List[str]     # paths/globs for log files
# =============================================================================
# Tool Configuration
# =============================================================================
class ToolConfig(TypedDict, total=False):
    """Configuration for a tool to scan.

    Declared with ``total=False``: every key is optional, so consumers must
    not assume that any particular key is present.
    """

    config_paths: List[str]
    log_paths: List[str]
    workspace_path: str
    process_names: List[str]
    default_port: Optional[int]     # may be present with the value None
    binary_names: List[str]
    macos_log_subsystem: str
# =============================================================================
# Discovered Data Structures
# =============================================================================
class ApiKeyInfo(TypedDict):
    """Information about a discovered API key. Every key is required."""

    path: str
    value_masked: str   # NOTE(review): presumably a redacted form of the key — confirm with producer
    length: int         # NOTE(review): presumably the length of the unmasked key — confirm
class Integration(TypedDict, total=False):
    """An integration/channel/service connection.

    Declared with ``total=False``: every key is optional.
    """

    name: str
    type: str
    source: str
    enabled: bool
    config: Dict[str, Any]      # Free-form config, varies per integration
    description: str
    details: Dict[str, Any]     # Free-form details
class SkillInfo(TypedDict, total=False):
    """Information about a skill/extension/plugin.

    Declared with ``total=False``: every key is optional. Keys typed
    ``Optional`` may additionally be present with the value None.
    """

    name: str
    type: str
    path: str
    has_manifest: bool
    has_skill_md: bool
    files: List[str]
    description: Optional[str]
    homepage: Optional[str]
    emoji: Optional[str]
    requires: List[str]
    install_methods: List[str]
    connected_services: List[str]
    enabled: bool
    has_api_key: bool
    has_env: bool
    has_config: bool
    source: str
class ServiceStats(TypedDict, total=False):
    """Statistics for an accessed service.

    Declared with ``total=False``: every key is optional. Keys typed
    ``Optional`` may additionally be present with the value None.
    """

    resource: str
    service_name: Optional[str]
    category: str
    access_count: int
    success_count: int
    failure_count: int
    first_seen: Optional[str]
    last_seen: Optional[str]
    access_types: List[str]
class AccessedApp(TypedDict, total=False):
    """An accessed app/service event.

    Declared with ``total=False``: every key is optional. Keys typed
    ``Optional`` may additionally be present with the value None.
    """

    resource: str
    access_type: str
    status: str
    service_name: Optional[str]
    service_category: str
    timestamp: Optional[str]
    line_number: int
    log_file: str
    line_sample: str
class ConfigFile(TypedDict):
    """A configuration file and its parsed data. Both keys are required."""

    path: str
    data: Dict[str, Any]    # Config structure varies per tool
# =============================================================================
# Aggregate/Summary Structures
# =============================================================================
class AccessedAppsSummary(TypedDict):
    """Summary of accessed apps/services. Every key is required."""

    total_access_events: int
    # NOTE(review): keys are presumably category names — confirm with producer.
    by_category: Dict[str, List[ServiceStats]]
    # NOTE(review): keys are presumably status strings mapped to event counts — confirm.
    by_status: Dict[str, int]
    unique_services: List[ServiceStats]
    access_timeline: List[Dict[str, Any]]   # Timeline events, variable structure
class WorkspaceLibrary(TypedDict, total=False):
    """Connected apps and integrations from workspace.

    Declared with ``total=False``: every key is optional.
    """

    connected_apps: List[Integration]
    channels: List[Integration]
    skills: List[SkillInfo]
    skills_from_config: List[SkillInfo]
    extensions: List[SkillInfo]
    plugins: List[SkillInfo]
    mcp_servers: List[Integration]
    oauth_credentials: List[Dict[str, Any]]     # Varies per provider
    tools_md_integrations: List[Integration]
# =============================================================================
# Scan Results
# =============================================================================
class ScanResult(TypedDict, total=False):
    """Result of scanning a single tool.

    Declared with ``total=False``: every key is optional, so consumers must
    not assume that any particular key is present.
    """

    tool_name: str
    installed: bool
    installation_details: Dict[str, Any]    # Varies: binary_path, npm, workspace_exists
    active: bool
    processes: List[ProcessInfo]
    port_listening: bool
    default_port: Optional[int]             # may be present with the value None
    config_files: List[ConfigFile]
    log_files: List[str]
    log_sources: List[str]
    connections: List[Dict[str, Any]]       # Connection info from log parsing
    api_keys_found: List[ApiKeyInfo]
    accessed_apps: List[AccessedApp]
    accessed_apps_summary: AccessedAppsSummary
    integrations: List[Integration]
    skills: List[SkillInfo]
    workspace_library: WorkspaceLibrary