Skip to content

Commit 0de69a5

Browse files
committed
New integration with the VirusTotal API for scanning hosts collected via the code search API
1 parent 1186acf commit 0de69a5

File tree

1 file changed

+85
-0
lines changed

1 file changed

+85
-0
lines changed

src/gitxray/include/vt_api.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import os, time, requests
2+
3+
class VTRESTAPI:
4+
def __init__(self, gx_output):
5+
self.gx_output = gx_output
6+
self.VT_API_URL = "https://www.virustotal.com/api/v3"
7+
self.VT_API_KEY = os.environ.get("VT_API_KEY", None)
8+
9+
self._PRIVATE_RANGES = [
10+
(0x0A000000, 0x0AFFFFFF), # 10.0.0.0/8
11+
(0xAC100000, 0xAC1FFFFF), # 172.16.0.0/12
12+
(0xC0A80000, 0xC0A8FFFF), # 192.168.0.0/16
13+
(0x7F000000, 0x7FFFFFFF), # 127.0.0.0/8 (loopback)
14+
(0xA9FE0000, 0xA9FEFFFF), # 169.254.0.0/16 (link-local)
15+
(0xE0000000, 0xEFFFFFFF), # 224.0.0.0/4 (multicast)
16+
(0xF0000000, 0xFFFFFFFE), # 240.0.0.0/4 (reserved)
17+
]
18+
19+
20+
def vt_request_json(self, url, max_retries=3):
21+
headers = {"x-apikey": self.VT_API_KEY, "accept": "application/json"}
22+
for attempt in range(1, max_retries + 1):
23+
resp = requests.get(url, headers=headers)
24+
if resp.status_code == 200:
25+
return resp.json()
26+
if resp.status_code == 429:
27+
retry_after = int(resp.headers.get("Retry-After", 60))
28+
self.gx_output.notify(f"\r[VirusTotal] rate limited [you may have met your daily quota]: sleeping {retry_after}s (try {attempt}/{max_retries})")
29+
time.sleep(retry_after)
30+
continue
31+
if resp.status_code == 401:
32+
self.gx_output.warn(f"\r[VirusTotal] VT_API_KEY may be incorrect, getting unauthorized errors.")
33+
break
34+
resp.raise_for_status()
35+
return None
36+
#raise RuntimeError(f"VT API retries exceeded for {url}")
37+
38+
def _ipv4_to_int(self, ip_str):
39+
parts = ip_str.split('.')
40+
if len(parts) != 4:
41+
raise ValueError(f"Invalid IPv4 address: {ip_str!r}")
42+
n = 0
43+
for p in parts:
44+
if not p.isdigit():
45+
raise ValueError(f"Invalid IPv4 octet: {p!r}")
46+
x = int(p)
47+
if x < 0 or x > 255:
48+
raise ValueError(f"IPv4 octet out of range: {p!r}")
49+
n = (n << 8) | x
50+
return n
51+
52+
def is_private_ipv4(self, ip_str):
53+
"""
54+
Returns True if ip_str falls into any of the non-routable IPv4 blocks,
55+
or is the unspecified address 0.0.0.0.
56+
"""
57+
n = self._ipv4_to_int(ip_str)
58+
if n == 0x00000000:
59+
return True
60+
for start, end in self._PRIVATE_RANGES:
61+
if start <= n <= end:
62+
return True
63+
return False
64+
65+
def is_ip_address(self, host: str) -> bool:
66+
# True if every character is a digit or a dot
67+
return bool(host) and all(c.isdigit() or c == '.' for c in host)
68+
69+
def is_testable(self, host):
70+
known_hosts = ["github.com","raw.github.com","api.github.com","gitlab.com","www.google.com","docs.google.com","sheets.google.com","google.com","python.org"]
71+
return "." in host and host not in known_hosts
72+
73+
def host_report(self, domain, debug_enabled=False):
74+
if self.VT_API_KEY:
75+
try:
76+
if not self.is_testable(domain): return None
77+
if not self.is_ip_address(domain):
78+
return self.vt_request_json(f"{self.VT_API_URL}/domains/{domain}")
79+
elif not self.is_private_ipv4(domain):
80+
return self.vt_request_json(f"{self.VT_API_URL}/ip_addresses/{domain}")
81+
except Exception as ex:
82+
if debug_enabled: print(ex)
83+
else: pass
84+
85+
return None

0 commit comments

Comments
 (0)