-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprovenance.py
More file actions
202 lines (166 loc) · 7.29 KB
/
provenance.py
File metadata and controls
202 lines (166 loc) · 7.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""Command provenance tracking for memory forensics operations"""
import json
from pathlib import Path
from typing import Dict, List, Any, Optional
from database import ForensicsDatabase
class ProvenanceTracker:
"""Tracks Volatility command executions for audit trail and reproducibility"""
def __init__(self, db: ForensicsDatabase):
self.db = db
async def log_command(self, dump_id: str, plugin_name: str,
dump_path: Path, parameters: Dict[str, Any] = None,
execution_time_ms: int = 0, row_count: int = 0,
success: bool = True, error: str = None):
"""
Log a Volatility command execution
Args:
dump_id: Unique identifier for the memory dump
plugin_name: Full plugin name (e.g., 'volatility3.plugins.windows.pslist.PsList')
dump_path: Path to the memory dump file
parameters: Plugin-specific parameters (e.g., {'pid': 1234})
execution_time_ms: Execution time in milliseconds
row_count: Number of rows returned
success: Whether the command succeeded
error: Error message if failed
"""
# Build command line equivalent
command_line = self._build_command_line(dump_path, plugin_name, parameters)
# Serialize parameters to JSON
params_json = json.dumps(parameters) if parameters else None
# Store in database
await self.db.add_command_log(
dump_id=dump_id,
plugin_name=plugin_name,
command_line=command_line,
parameters=params_json,
execution_time_ms=execution_time_ms,
row_count=row_count,
success=success,
error_message=error
)
def _build_command_line(self, dump_path: Path, plugin_name: str,
parameters: Dict[str, Any] = None) -> str:
"""
Build vol.py command-line equivalent for reproduction
Args:
dump_path: Path to memory dump file
plugin_name: Full plugin class name
parameters: Plugin-specific parameters
Returns:
Command line string like: "vol.py -f dump.raw windows.pslist --pid 1234"
"""
# Convert full plugin class name to short name
# Example: volatility3.plugins.windows.pslist.PsList -> windows.pslist
short_name = self._get_short_plugin_name(plugin_name)
# Build base command
cmd = f"vol.py -f {dump_path} {short_name}"
# Add plugin-specific parameters
if parameters:
for key, val in parameters.items():
# Skip None values
if val is None:
continue
# Format parameter
if isinstance(val, bool):
if val: # Only add flag if True
cmd += f" --{key}"
else:
cmd += f" --{key} {val}"
return cmd
def _get_short_plugin_name(self, full_plugin_name: str) -> str:
"""
Convert full plugin class name to short plugin name
Args:
full_plugin_name: e.g., 'volatility3.plugins.windows.pslist.PsList'
Returns:
Short name like 'windows.pslist'
"""
# Split by dots
parts = full_plugin_name.split('.')
# Find 'plugins' in the path
if 'plugins' in parts:
plugins_idx = parts.index('plugins')
# Take everything after 'plugins' except the class name
short_parts = parts[plugins_idx + 1:-1]
return '.'.join(short_parts)
# Fallback: just use the full name
return full_plugin_name
async def get_command_history(self, dump_id: str, limit: int = 50) -> List[Dict[str, Any]]:
"""
Get command execution history for a dump
Args:
dump_id: Dump identifier
limit: Maximum number of commands to return
Returns:
List of command log entries
"""
return await self.db.get_command_history(dump_id, limit)
async def get_provenance_summary(self, dump_id: str) -> str:
"""
Generate a formatted provenance summary
Args:
dump_id: Dump identifier
Returns:
Formatted markdown string with command history
"""
commands = await self.get_command_history(dump_id)
stats = await self.db.get_command_stats(dump_id)
if not commands:
return "No commands executed yet for this dump."
result = "**Command Provenance**\n\n"
result += f"Total commands executed: {stats.get('total_commands', 0)}\n"
if stats.get('failed_commands', 0) > 0:
result += f"Failed commands: {stats.get('failed_commands', 0)}\n"
avg_time = stats.get('avg_execution_time')
if avg_time:
result += f"Average execution time: {int(avg_time)} ms\n"
result += "\n**Volatility Commands Executed:**\n"
for cmd in commands:
result += f" {cmd['command_line']}\n"
if not cmd['success']:
result += f" [FAILED] {cmd.get('error_message', 'Unknown error')}\n"
return result
async def export_provenance_report(self, dump_id: str, output_path: Path,
format: str = 'json'):
"""
Export detailed provenance report
Args:
dump_id: Dump identifier
output_path: Where to write the report
format: 'json', 'csv', or 'txt'
"""
commands = await self.get_command_history(dump_id, limit=1000)
stats = await self.db.get_command_stats(dump_id)
if format == 'json':
data = {
'dump_id': dump_id,
'statistics': stats,
'commands': commands
}
with open(output_path, 'w') as f:
json.dump(data, f, indent=2, default=str)
elif format == 'csv':
import csv
with open(output_path, 'w', newline='') as f:
if commands:
writer = csv.DictWriter(f, fieldnames=commands[0].keys())
writer.writeheader()
writer.writerows(commands)
elif format == 'txt':
with open(output_path, 'w') as f:
f.write(f"Provenance Report for {dump_id}\n")
f.write("=" * 60 + "\n\n")
f.write(f"Total Commands: {stats.get('total_commands', 0)}\n")
f.write(f"Failed Commands: {stats.get('failed_commands', 0)}\n")
f.write(f"Avg Execution Time: {int(stats.get('avg_execution_time', 0))} ms\n\n")
f.write("Commands:\n")
f.write("-" * 60 + "\n")
for cmd in commands:
f.write(f"\n[{cmd['executed_at']}]\n")
f.write(f"Plugin: {cmd['plugin_name']}\n")
f.write(f"Command: {cmd['command_line']}\n")
f.write(f"Time: {cmd.get('execution_time_ms', 0)} ms\n")
f.write(f"Results: {cmd.get('row_count', 0)} rows\n")
if not cmd['success']:
f.write(f"Status: FAILED - {cmd.get('error_message')}\n")
f.write("-" * 60 + "\n")