-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain_forensics.py
More file actions
280 lines (233 loc) · 10.5 KB
/
main_forensics.py
File metadata and controls
280 lines (233 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import json
import logging
import os
import time
from collections import deque
from datetime import datetime

import networkx as nx
import requests
from dotenv import load_dotenv
from pyvis.network import Network
# Populate the process environment (python-dotenv) before the os.getenv lookup below.
load_dotenv()
# --- CONFIGURATION & SINK LIST ---
# Contract address used both as the API query filter and to validate each
# returned transfer record.
USDT_CONTRACT = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"
# None when TRONSCAN_API_KEY is not set in the environment.
API_KEY = os.getenv("TRONSCAN_API_KEY")
# Major Exchange Hot Wallets (Sinks)
# Maps wallet address -> display label. Tracing does not expand these
# addresses, and the exported graph renders them as exchange nodes.
# NOTE(review): the address/label pairs are not verified anywhere in this
# file -- confirm them against a trusted source before relying on the labels.
SINK_ADDRESSES = {
"TwmTmsVy5hCdSwyth3vWJ1a8uR9g56S8qG": "Binance-Hot-1",
"TAUN6FwrnwwmaEqYcckffC7wYmbaS6c767": "Binance-Hot-2",
"TNPeeaaFB7K9cmo4uQpcU32zGK8G1NYqeL": "Huobi-Hot",
"TM1zzNDZD2DPASbKcgdVoTYhfmYgtfwx9R": "OKX-Hot",
"TLsV52sRDL79HXGGm9yzwKgccDfJrwpaPf": "Kraken-Hot",
"TKzxdSv2FZKQrEqkKVgp5DcwEXBEKMg2Ax": "SunSwap-Router"
}
class TronForensics:
    """Trace USDT transfers outward from a wallet and render the flow as a graph.

    Transfers are fetched from the TronScan HTTP API, cached on disk per
    address, aggregated into a ``networkx.DiGraph`` (one edge per
    sender/receiver pair carrying the total ``weight`` and transfer
    ``count``) and exported as an interactive pyvis HTML page.
    """

    # Minimum spacing between API calls, in seconds (keeps us under 5 calls/sec).
    _MIN_CALL_INTERVAL = 0.25
    # Pause after an HTTP 429 before the request is retried, in seconds.
    _RATE_LIMIT_COOLDOWN = 5
    # Number of retries granted to a rate-limited request before giving up.
    _MAX_RATE_LIMIT_RETRIES = 5
    # Per-request timeout in seconds so a stalled connection cannot hang a trace.
    _REQUEST_TIMEOUT = 30
    # Transfers below this many USDT are treated as dust/spam and dropped.
    _DUST_THRESHOLD = 5.0
    # Raw "quant" values are divided by this to obtain the USDT amount.
    _QUANT_DIVISOR = 1_000_000
    # Accepted values for the ``direction`` argument of get_usdt_transfers.
    _VALID_DIRECTIONS = ("out", "in", "both")

    # Legend/instructions panel injected into the exported HTML page.
    _LEGEND_HTML = """
<div style="position: fixed; top: 80px; left: 20px; background: rgba(0,0,0,0.85);
padding: 20px; border-radius: 10px; border: 2px solid #3498db;
font-family: Arial; color: white; z-index: 1000; max-width: 300px;">
<h3 style="margin-top: 0; color: #3498db;">📊 Legend</h3>
<div style="margin: 10px 0;">
<span style="color: #e74c3c; font-size: 20px;">⭐</span>
<strong style="color: #e74c3c;">RED STAR</strong> = Investigation Target (Suspect)
</div>
<div style="margin: 10px 0;">
<span style="color: #3498db; font-size: 20px;">●</span>
<strong style="color: #3498db;">BLUE CIRCLES</strong> = Intermediate Wallets
</div>
<div style="margin: 10px 0;">
<span style="color: #2ecc71; font-size: 20px;">■</span>
<strong style="color: #2ecc71;">GREEN BOXES</strong> = Exchange Wallets (Cash-out points)
</div>
<hr style="border-color: #34495e;">
<h4 style="color: #f39c12; margin-bottom: 5px;">💸 Transfer Amounts</h4>
<div style="font-size: 12px; line-height: 1.6;">
<span style="color: #e74c3c;">●</span> Red arrow = Over $10,000<br>
<span style="color: #f39c12;">●</span> Orange arrow = $1,000 - $10,000<br>
<span style="color: #95a5a6;">●</span> Gray arrow = Under $1,000
</div>
<hr style="border-color: #34495e;">
<h4 style="color: #3498db; margin-bottom: 5px;">🎮 Controls</h4>
<div style="font-size: 12px; line-height: 1.6;">
• <strong>Drag</strong> nodes to rearrange<br>
• <strong>Scroll</strong> to zoom in/out<br>
• <strong>Click</strong> nodes for details<br>
• <strong>Hover</strong> over arrows for amounts
</div>
</div>
"""

    def __init__(self, api_key, cache_file="blockchain_cache.json", sink_addresses=None):
        """Set up API credentials, the on-disk cache and an empty flow graph.

        Args:
            api_key: Value sent in the ``TRON-PRO-API-KEY`` request header.
            cache_file: Path of the JSON file used to persist fetched transfers.
            sink_addresses: Optional ``{address: label}`` mapping of wallets
                at which tracing stops. Defaults to the module-level
                ``SINK_ADDRESSES`` table.
        """
        self.api_key = api_key
        self.cache_file = cache_file
        self.headers = {"TRON-PRO-API-KEY": api_key}
        self.last_call_time = 0
        self.graph = nx.DiGraph()
        self.cache = self._load_cache()
        self.start_address = None  # Track the starting point
        self.sink_addresses = (
            SINK_ADDRESSES if sink_addresses is None else dict(sink_addresses)
        )

    def _load_cache(self):
        """Return the cached ``{address: transfers}`` dict, or ``{}`` if unusable.

        A missing, empty, undecodable, malformed or non-dict cache file all
        yield an empty cache instead of an exception.
        """
        try:
            with open(self.cache_file, "r", encoding="utf-8") as f:
                content = f.read().strip()
            cached = json.loads(content) if content else {}
        except FileNotFoundError:
            return {}
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            logging.warning(f"Cache file corrupted, starting fresh: {e}")
            return {}
        if not isinstance(cached, dict):
            # Valid JSON of the wrong shape would break later item assignment.
            logging.warning("Cache file is not a JSON object, starting fresh")
            return {}
        return cached

    def _save_cache(self):
        """Persist the cache atomically.

        The data is written to a sibling temp file and then moved over the
        real file, so an interrupted write cannot leave a truncated cache.
        """
        tmp_path = f"{self.cache_file}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(self.cache, f)
        os.replace(tmp_path, self.cache_file)

    def rate_limited_get(self, url, params):
        """GET ``url`` and return the decoded JSON body, throttled to <5 calls/sec.

        HTTP 429 responses are retried after a cool-down, a bounded number of
        times. Any other HTTP error status raises instead of being parsed, so
        callers never mistake an error payload for real data.

        Raises:
            requests.HTTPError: On a non-2xx response, including a 429 that
                persists after all retries.
            requests.RequestException: On connection errors or timeouts.
        """
        for attempt in range(self._MAX_RATE_LIMIT_RETRIES + 1):
            elapsed = time.time() - self.last_call_time
            if elapsed < self._MIN_CALL_INTERVAL:
                time.sleep(self._MIN_CALL_INTERVAL - elapsed)
            response = requests.get(
                url,
                params=params,
                headers=self.headers,
                timeout=self._REQUEST_TIMEOUT,
            )
            self.last_call_time = time.time()
            if response.status_code == 429 and attempt < self._MAX_RATE_LIMIT_RETRIES:
                logging.warning("Rate limit hit! Cooling down...")
                time.sleep(self._RATE_LIMIT_COOLDOWN)
                continue
            # Raises on the final 429 as well as on any other error status.
            response.raise_for_status()
            return response.json()

    def _fetch_usdt_transfers(self, address):
        """Query the API for transfers touching ``address`` and normalise them.

        Returns every qualifying transfer in either direction as a dict with
        ``from``, ``to``, ``amount`` and ``ts`` keys.

        NOTE(review): only the first page (``limit`` = 50) is requested, so
        busy wallets are truncated -- confirm whether pagination is needed.
        """
        url = "https://apilist.tronscan.org/api/token_trc20/transfers"
        params = {
            "limit": 50,  # High limit to save calls
            "start": 0,
            "contract_address": USDT_CONTRACT,
            "relatedAddress": address,
        }
        data = self.rate_limited_get(url, params)
        results = []
        for tx in data.get("token_transfers") or []:
            # Ensure it's the real USDT contract before trusting the amount.
            if tx.get("contract_address") != USDT_CONTRACT:
                continue
            amount = float(tx.get("quant", 0)) / self._QUANT_DIVISOR
            # Filter dust/spam (tiny transfers are usually noise).
            if amount < self._DUST_THRESHOLD:
                continue
            results.append({
                "from": tx["from_address"],
                "to": tx["to_address"],
                "amount": amount,
                "ts": tx["block_ts"],
            })
        return results

    def get_usdt_transfers(self, address, direction="out"):
        """Return cached-or-fetched USDT transfers for ``address``.

        Args:
            address: Wallet address to look up.
            direction: ``"out"`` for transfers sent by ``address``, ``"in"``
                for transfers it received, ``"both"`` for all of them.

        Returns:
            A new list of transfer dicts (``from``, ``to``, ``amount``, ``ts``).

        Raises:
            ValueError: If ``direction`` is not one of the accepted values.
        """
        if direction not in self._VALID_DIRECTIONS:
            raise ValueError(
                f"direction must be one of {self._VALID_DIRECTIONS}, got {direction!r}"
            )
        if address in self.cache:
            transfers = self.cache[address]
        else:
            transfers = self._fetch_usdt_transfers(address)
            # The cache always holds the unfiltered list so any direction can
            # be served from it later.
            self.cache[address] = transfers
            self._save_cache()
        if direction == "out":
            return [tx for tx in transfers if tx["from"] == address]
        if direction == "in":
            return [tx for tx in transfers if tx["to"] == address]
        return list(transfers)

    def trace(self, start_address, depth=3):
        """Breadth-first walk of outbound transfers starting at ``start_address``.

        Addresses at ``depth`` hops or more, sink addresses and
        already-visited addresses are not expanded. For every expanded
        address, its outbound transfers are aggregated per recipient and
        stored as one edge with the summed ``weight`` and the transfer
        ``count``.
        """
        self.start_address = start_address  # Remember the starting point
        queue = deque([(start_address, 0)])
        visited = set()
        while queue:
            addr, current_depth = queue.popleft()
            if (
                current_depth >= depth
                or addr in self.sink_addresses
                or addr in visited
            ):
                continue
            visited.add(addr)
            # A DiGraph keeps a single edge per (sender, receiver) pair, so
            # repeated transfers must be summed before the edge is written.
            totals = {}
            for tx in self.get_usdt_transfers(addr):
                amount_so_far, count_so_far = totals.get(tx["to"], (0.0, 0))
                totals[tx["to"]] = (amount_so_far + tx["amount"], count_so_far + 1)
            for recipient, (total, count) in totals.items():
                self.graph.add_edge(addr, recipient, weight=total, count=count)
                if recipient not in visited:
                    queue.append((recipient, current_depth + 1))

    def _node_style(self, node):
        """Return the pyvis label/title/colour/size/shape options for ``node``."""
        if node == self.start_address:
            # Starting address (Suspect)
            return {
                "label": f"🎯 SUSPECT\n{node[:6]}...{node[-6:]}",
                "title": f"<b>⚠️ INVESTIGATION TARGET</b><br>{node}<br>This is where the trace started",
                "color": {"background": "#e74c3c", "border": "#c0392b"},
                "size": 40,
                "shape": "star",
            }
        if node in self.sink_addresses:
            # Exchange/Sink address
            name = self.sink_addresses[node]
            return {
                "label": f"🏦 {name}\n{node[:6]}...",
                "title": f"<b>💰 EXCHANGE WALLET</b><br>{name}<br>{node}<br>Money likely cashed out here",
                "color": {"background": "#2ecc71", "border": "#27ae60"},
                "size": 35,
                "shape": "box",
            }
        # Default: Intermediate wallet
        return {
            "label": f"Wallet\n{node[:6]}...{node[-6:]}",
            "title": f"<b>Intermediate Wallet</b><br>{node}<br>Click to view on Tronscan",
            "color": {"background": "#3498db", "border": "#2980b9"},
            "size": 25,
            "shape": "dot",
        }

    @staticmethod
    def _edge_color(amount):
        """Return the edge colour for an amount: red >10000, orange >1000, else gray."""
        if amount > 10000:
            return "#e74c3c"
        if amount > 1000:
            return "#f39c12"
        return "#95a5a6"

    def export_interactive(self, filename="ponzi_map.html"):
        """Write the traced graph to ``filename`` as an interactive HTML page.

        Nodes are styled by role (suspect / exchange / intermediate), edges by
        total amount, and a legend panel is injected before ``</body>``.
        Prints a short summary when done.
        """
        net = Network(
            height="900px",
            width="100%",
            bgcolor="#0a0a0a",
            font_color="white",
            directed=True,
            heading="🕵️ USDT Money Flow Analysis"
        )
        # Better physics for clearer visualization
        net.barnes_hut(
            gravity=-80000,
            central_gravity=0.3,
            spring_length=200,
            spring_strength=0.001,
            damping=0.09
        )
        for node in self.graph.nodes():
            net.add_node(
                node,
                borderWidth=3,
                font={"size": 14, "color": "white", "bold": True},
                **self._node_style(node)
            )
        total_transfers = 0
        for u, v, data in self.graph.edges(data=True):
            amount = data['weight']
            # Edges added without a count are treated as a single transfer.
            count = data.get("count", 1)
            total_transfers += count
            # Make edge thickness proportional to amount, capped at 10px.
            width = min(1 + (amount / 1000), 10)
            if count == 1:
                summary = f"Transfer: ${amount:,.2f} USDT"
            else:
                summary = f"{count} transfers totalling ${amount:,.2f} USDT"
            net.add_edge(
                u,
                v,
                value=width,
                title=f"<b>{summary}</b><br>From: {u}<br>To: {v}",
                label=f"${amount:,.2f}",
                color=self._edge_color(amount),
                arrows={"to": {"enabled": True, "scaleFactor": 1.2}},
                font={"size": 12, "color": "white", "strokeWidth": 0}
            )
        # Write HTML without opening browser
        net.write_html(filename, open_browser=False, notebook=False)
        # Insert the legend before the closing body tag of the generated page.
        with open(filename, 'r', encoding='utf-8') as f:
            html_content = f.read()
        html_content = html_content.replace('</body>', f'{self._LEGEND_HTML}</body>')
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(html_content)
        print(f"\n✅ Investigation complete! Open '{filename}' in your browser to view the network.")
        print(f"📍 Starting point (Suspect): {self.start_address}")
        print(f"🔗 Total wallets traced: {len(self.graph.nodes())}")
        print(f"💸 Total transfers tracked: {total_transfers}")
def main():
    """Trace the hard-coded target address two hops deep and export the HTML map."""
    if not API_KEY:
        # Surface the misconfiguration instead of silently sending no key.
        logging.warning(
            "TRONSCAN_API_KEY is not set; API requests may be rejected or rate-limited."
        )
    investigator = TronForensics(api_key=API_KEY)
    # Replace with actual suspect address
    target = "TYCR4XDMvFZ6LXCkSaMWYnULZeeod8GCAM"
    investigator.trace(target, depth=2)
    investigator.export_interactive()


if __name__ == "__main__":
    main()