diff --git a/.gitignore b/.gitignore index b06e05c..91440de 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -app/env \ No newline at end of file +app/env +__pycache__/ +*.pyc +*.pyo \ No newline at end of file diff --git a/app/.env.example b/app/.env.example new file mode 100644 index 0000000..28ac9b7 --- /dev/null +++ b/app/.env.example @@ -0,0 +1,5 @@ +# Demo mode (returns mock data, no Raspberry Pi hardware required) +# HACKMASTER_DEMO_MODE=true + +# Production mode (default, connects to real hardware) +HACKMASTER_DEMO_MODE=false diff --git a/app/api/BLE.py b/app/api/BLE.py index 78406cc..d909648 100644 --- a/app/api/BLE.py +++ b/app/api/BLE.py @@ -1,34 +1,32 @@ -from fastapi import APIRouter, Request, HTTPException +import os +from fastapi import APIRouter, Request, HTTPException, Depends from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates -import subprocess -import os -import signal -import psutil -from .mylib.beacon import beacon_emulator -import json -from pathlib import Path - -PROFILES_FILE = Path("data/beacon_profiles.json") -PROFILES_FILE.parent.mkdir(exist_ok=True) -# 確保設定檔檔案存在 -if not PROFILES_FILE.exists(): - PROFILES_FILE.write_text("[]") +from services.ble_service import BLEService +from services.real.ble_real import BLERealService router = APIRouter( prefix="/BLE", tags=["BLE"] ) -running_process = None +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates")) + + +# ---------- Dependency ---------- -templates = Jinja2Templates(directory="templates") +def get_ble_service() -> BLEService: + return BLERealService() + + +# ---------- HTML page routes ---------- @router.get("/beacon-scanner", response_class=HTMLResponse) -def read_airpods_emulator(request: Request): +def read_beacon_scanner(request: Request): return templates.TemplateResponse( - "BLE/beacon-scanner.html", + "BLE/beacon-scanner.html", {"request": request, "message": "Beacon Scanner"} ) @@ -39,171 +37,66 @@ def read_beacon_storage(request: Request): {"request": request, "message": "Beacon Storage"} ) -@router.get("/beacon-storage/profiles") -def get_profiles(): - profiles = json.loads(PROFILES_FILE.read_text()) - return profiles - -@router.post("/beacon-storage/profiles") -async def add_profile(profile: dict): - profiles = json.loads(PROFILES_FILE.read_text()) - profiles.append(profile) - PROFILES_FILE.write_text(json.dumps(profiles, indent=2)) - return {"status": "success"} - -@router.delete("/beacon-storage/profiles/{name}") -async def delete_profile(name: str): - profiles = json.loads(PROFILES_FILE.read_text()) - profiles = [p for p in profiles if p["name"] != name] - PROFILES_FILE.write_text(json.dumps(profiles, indent=2)) - return {"status": "success"} - @router.get("/beacon-emulator", response_class=HTMLResponse) def read_beacon_emulator(request: Request): return templates.TemplateResponse( - "BLE/beacon-emulator.html", + "BLE/beacon-emulator.html", {"request": request, "message": "Beacon Emulator"} ) -# 在文件頂部的 import 部分之後添加一個變量來跟踪 beacon 模擬器的狀態 -beacon_emulator_active = False +@router.get("/airpods-emulator", response_class=HTMLResponse) +def read_airpods_emulator_page(request: Request): + return templates.TemplateResponse( + "BLE/airpods-emulator.html", + {"request": request, "message": "Wordlist Generator"} + ) + + +# ---------- API routes ---------- + +@router.get("/beacon-storage/profiles") +def get_profiles(service: BLEService = Depends(get_ble_service)): + return service.get_profiles() + +@router.get("/beacon-scanner/scan") +async def scan_beacons(duration: int = 5, service: BLEService = Depends(get_ble_service)): + return await service.scan_beacons(duration) + +@router.post("/beacon-storage/profiles") +async def add_profile(profile: dict, service: BLEService = Depends(get_ble_service)): + return await service.add_profile(profile) + +@router.delete("/beacon-storage/profiles/{name}") +async def delete_profile(name: str, service: BLEService = Depends(get_ble_service)): + return await service.delete_profile(name) -# 修改 start_beacon_emulator 函數來設置狀態標誌 @router.post("/beacon-emulator/start") -async def start_beacon_emulator(data: dict): - global beacon_emulator_active - - profiles = json.loads(PROFILES_FILE.read_text()) - profile = next((p for p in profiles if p["name"] == data["profile_name"]), None) - - if not profile: - raise HTTPException(status_code=404, detail="Profile not found") - - beacon_emulator.start_ibeacon( - uuid=profile["uuid"], - major=profile["major"], - minor=profile["minor"], - power=profile["power"] - ) - - # 設置 beacon 模擬器狀態為活動 - beacon_emulator_active = True - - return {"status": "started", "profile": profile["name"]} +async def start_beacon_emulator(data: dict, service: BLEService = Depends(get_ble_service)): + result = await service.start_beacon_emulator(data.get("profile_name", "")) + if result.get("success") == False: + raise HTTPException(status_code=404, detail=result.get("message", "Not found")) + return result -# 修改 stop_beacon_emulator 函數來更新狀態標誌 @router.post("/beacon-emulator/stop") -async def stop_beacon_emulator(): - global beacon_emulator_active - - beacon_emulator.stop_ibeacon() - - # 設置 beacon 模擬器狀態為停止 - beacon_emulator_active = False - - return {"status": "stopped"} - -# 添加新的狀態檢查路由 -@router.get("/beacon-emulator/status") -async def get_beacon_emulator_status(): - global beacon_emulator_active - - if beacon_emulator_active: - return {"status": "running"} - else: - return {"status": "not_running"} +async def stop_beacon_emulator(service: BLEService = Depends(get_ble_service)): + return await service.stop_beacon_emulator() -@router.get("/airpods-emulator", response_class=HTMLResponse) -def read_airpods_emulator(request: Request): - return templates.TemplateResponse( - "BLE/airpods-emulator.html", - {"request": request, "message": "Wordlist Generator"} - ) +@router.get("/beacon-emulator/status") +async def get_beacon_emulator_status(service: BLEService = Depends(get_ble_service)): + return await service.get_beacon_emulator_status() @router.post("/airpods-emulator/start") -async def start_airpods_scan(): - global running_process - - if running_process and running_process.poll() is None: - return {"status": "already_running", "pid": running_process.pid} - - try: - # 獲取當前環境變數 - env = os.environ.copy() - - # 使用 sudo -E 保留環境變數 - cmd = ["sudo", "-E", "python3", "api/mylib/apple_bleee/adv_airpods.py"] - - # 啟動進程 - with open("airpods_output.log", "w") as out_file, open("airpods_error.log", "w") as err_file: - process = subprocess.Popen( - cmd, - stdout=out_file, - stderr=err_file, - text=True, - env=env, - preexec_fn=os.setsid - ) - - running_process = process - return {"status": "started", "pid": process.pid} - except Exception as e: - raise HTTPException(status_code=500, detail=f"Failed to start: {str(e)}") +async def start_airpods_scan(service: BLEService = Depends(get_ble_service)): + return await service.start_airpods_emulator() @router.post("/airpods-emulator/stop") -async def stop_airpods_scan(): - global running_process - - if not running_process: - return {"status": "not_running"} - - try: - # 獲取進程 PID - pid = running_process.pid - - # 終止主進程及其子進程 - parent = psutil.Process(pid) - children = parent.children(recursive=True) - - for child in children: - child.terminate() - - # 終止主進程 - parent.terminate() - - # 確保進程已終止 - gone, alive = psutil.wait_procs([parent], timeout=3) - for p in alive: - p.kill() - - running_process = None - return {"status": "stopped", "pid": pid} - except Exception as e: - raise HTTPException(status_code=500, detail=f"Failed to stop: {str(e)}") +async def stop_airpods_scan(service: BLEService = Depends(get_ble_service)): + return await service.stop_airpods_emulator() @router.get("/airpods-emulator/status") -async def get_status(): - global running_process - - if running_process and running_process.poll() is None: - return {"status": "running", "pid": running_process.pid} - else: - return {"status": "not_running"} - +async def get_status(service: BLEService = Depends(get_ble_service)): + return await service.get_airpods_emulator_status() + @router.get("/airpods-emulator/logs") -async def get_logs(): - try: - error_content = "" - output_content = "" - - if os.path.exists("airpods_error.log"): - with open("airpods_error.log", "r") as error_file: - error_content = error_file.read() - - if os.path.exists("airpods_output.log"): - with open("airpods_output.log", "r") as output_file: - output_content = output_file.read() - - return {"output": output_content, "errors": error_content} - except Exception as e: - raise HTTPException(status_code=500, detail=f"Failed to read logs: {str(e)}") \ No newline at end of file +async def get_logs(service: BLEService = Depends(get_ble_service)): + return await service.get_airpods_logs() diff --git a/app/api/IR.py b/app/api/IR.py index 59cb333..ab1810d 100644 --- a/app/api/IR.py +++ b/app/api/IR.py @@ -1,32 +1,37 @@ -# 建議的後端 API 實現 (IR.py) - -from fastapi import APIRouter, Request, BackgroundTasks -from fastapi.responses import HTMLResponse, JSONResponse +import os +from fastapi import APIRouter, Request, Depends +from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from pydantic import BaseModel -from typing import Dict, Optional, List -import asyncio -import json -import time -from datetime import datetime -import os +from typing import Optional + +from services.ir_service import IRService +from services.real.ir_real import IRRealService router = APIRouter( prefix="/IR", tags=["IR"] ) -templates = Jinja2Templates(directory="templates") +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates")) + -# 紅外線錄製狀態變數 -ir_recording = False -ir_signal = None -ir_record_start_time = None +# ---------- Pydantic models ---------- class TransmitRequest(BaseModel): signalData: str format: Optional[str] = "RAW" + +# ---------- Dependency ---------- + +def get_ir_service() -> IRService: + return IRRealService() + + +# ---------- HTML page routes ---------- + @router.get("/signal-learner", response_class=HTMLResponse) def read_signal_learner(request: Request): return templates.TemplateResponse( @@ -41,147 +46,30 @@ def read_signal_enumerator(request: Request): {"request": request, "message": "IR Signal Enumerator"} ) + +# ---------- API routes ---------- + @router.post("/record") -async def record_ir_signal(): - """ - 開始錄製紅外線訊號 - """ - global ir_recording, ir_signal, ir_record_start_time - - try: - # 檢查是否已在錄製 - if ir_recording: - return {"success": False, "message": "Already recording"} - - # 這裡實現啟動 IR 接收器的代碼 - # 以下僅為模擬,實際使用需要連接硬體 - - # 更新狀態 - ir_recording = True - ir_signal = None - ir_record_start_time = time.time() - - # 啟動背景任務等待訊號 - # 在實際應用中,這應該與硬體互動 - asyncio.create_task(simulate_ir_reception()) - - return {"success": True, "message": "IR recording started"} - except Exception as e: - ir_recording = False - return {"success": False, "message": str(e)} +async def record_ir_signal(service: IRService = Depends(get_ir_service)): + return await service.start_recording() @router.post("/record/cancel") -async def cancel_ir_recording(): - """ - 取消錄製紅外線訊號 - """ - global ir_recording, ir_signal - - try: - # 更新狀態 - ir_recording = False - ir_signal = None - - return {"success": True, "message": "Recording cancelled"} - except Exception as e: - return {"success": False, "message": str(e)} +async def cancel_ir_recording(service: IRService = Depends(get_ir_service)): + return await service.cancel_recording() @router.get("/status") -async def get_ir_status(): - """ - 獲取紅外線錄製狀態 - """ - global ir_recording, ir_signal, ir_record_start_time - - # 檢查是否超時 - if ir_recording and ir_record_start_time: - if time.time() - ir_record_start_time > 15: # 15秒超時 - ir_recording = False - return {"status": "timeout", "message": "Recording timed out"} - - if ir_recording: - return {"status": "recording"} - elif ir_signal: - return { - "status": "completed", - "signal": ir_signal - } - else: - return {"status": "idle"} +async def get_ir_status(service: IRService = Depends(get_ir_service)): + return await service.get_status() @router.post("/transmit") -async def transmit_ir_signal(request: TransmitRequest): - """ - 發送紅外線訊號 - """ - try: - # 這裡實現發送 IR 訊號的代碼 - # 以下僅為模擬,實際使用需要連接硬體 - - # 模擬發送延遲 - await asyncio.sleep(1) - - return {"success": True, "message": "Signal transmitted successfully"} - except Exception as e: - return {"success": False, "message": str(e)} - -# 模擬紅外線接收 -async def simulate_ir_reception(): - global ir_recording, ir_signal - - # 模擬 3 秒後接收到訊號 - await asyncio.sleep(3) - - # 如果仍在錄製狀態,則模擬接收到訊號 - if ir_recording: - ir_signal = { - "data": "010101010110101010100101010101011010101010010101010101101010", - "format": "NEC", - "length": 32 - } - ir_recording = False +async def transmit_ir_signal(request: TransmitRequest, service: IRService = Depends(get_ir_service)): + return await service.transmit(request.signalData, request.format) @router.post("/enumerate") -async def enumerate_ir_code(data: dict): - """ - 枚舉並發送紅外線代碼,嘗試控制設備 - """ - try: - device_type = data.get("device_type") - brand = data.get("brand", "") - protocol = data.get("protocol", "all") - function = data.get("function") - code_index = data.get("code_index", 0) - - # 在實際應用中,這裡應該根據設備類型、品牌和功能生成特定的IR代碼 - # 然後通過紅外線發射器發送 - - # 模擬發送代碼 - await asyncio.sleep(0.1) - - # 模擬代碼,實際應用需改為真實生成 - code_hex = generate_ir_code(device_type, brand, protocol, function, code_index) - - # 在實際應用中,這裡應該檢測設備是否響應了信號 - # 可通過額外的傳感器或用戶確認 - - # 模擬隨機響應(實際應用中需要真實響應檢測) - response_detected = random.random() < 0.05 # 5% 的概率模擬成功 - - return { - "success": True, - "code_index": code_index, - "code_hex": code_hex, - "protocol": protocol if protocol != "all" else random.choice(["NEC", "SONY", "RC5", "RC6", "SAMSUNG"]), - "response": response_detected, - "bits": 32 # 大多數紅外線代碼的長度 - } - except Exception as e: - return {"success": False, "message": str(e)} - -# 輔助函數:生成紅外線代碼(示例) -def generate_ir_code(device_type, brand, protocol, function, index): - # 實際應用需要使用真實代碼數據庫或算法生成有效代碼 - # 這只是一個簡單的示例實現 - base_value = index * 37 + ord(function[0]) # 簡單算法生成不同代碼 - return format(base_value % 65536, '04X') + format((base_value * 17) % 65536, '04X') \ No newline at end of file +async def enumerate_ir_code(data: dict, service: IRService = Depends(get_ir_service)): + device_type = data.get("device_type", "") + brand = data.get("brand", "") + protocol = data.get("protocol", "all") + function = data.get("function", "") + code_index = data.get("code_index", 0) + return await service.enumerate_code(device_type, brand, protocol, function, code_index) diff --git a/app/api/RFID.py b/app/api/RFID.py index c815b7d..21dbefb 100644 --- a/app/api/RFID.py +++ b/app/api/RFID.py @@ -1,131 +1,76 @@ -from fastapi import APIRouter, Request -from fastapi.responses import HTMLResponse, FileResponse, JSONResponse +import os +from fastapi import APIRouter, Request, Depends +from fastapi.responses import HTMLResponse, JSONResponse from fastapi.templating import Jinja2Templates from pydantic import BaseModel from typing import Dict, List -from .mylib.WeakPasswordGenerater.main import PasswordGenerator -from .mylib.RFIDlib import main as RFIDlib -from .mylib.defense.defense_manager import DefenseManager -import os -import time -import json -import asyncio -from datetime import datetime -import random -import binascii + +from services.rfid_service import RFIDService +from services.real.rfid_real import RFIDRealService router = APIRouter( prefix="/RFID", tags=["RFID"] ) -templates = Jinja2Templates(directory="templates") +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates")) -RFID_DATA_DIR = "data/rfid" -# 定義寫入請求模型 +# ---------- Pydantic models ---------- + class WriteCardRequest(BaseModel): card_data: Dict save_to_db: bool = False -# 定義請求模型 class SaveCardsRequest(BaseModel): cards: List[Dict] filename: str -# 模擬狀態變數,實際應用中可能需要更複雜的狀態管理 -emulation_active = False -emulation_data = None -emulation_start_time = None -read_attempts = 0 -# 定義模擬請求模型 -class EmulationRequest(BaseModel): - uid: str - type: str - data: str +# ---------- Dependency ---------- + +def get_rfid_service() -> RFIDService: + return RFIDRealService() + + +# ---------- HTML page routes ---------- @router.get("/identify-rfid", response_class=HTMLResponse) -def identify_rfid(request: Request): +def identify_rfid_page(request: Request): return templates.TemplateResponse( "RFID/identify-rfid.html", {"request": request, "message": "Identify RFID Card"} ) @router.get("/write-uid", response_class=HTMLResponse) -def write_uid(request: Request): +def write_uid_page(request: Request): return templates.TemplateResponse( "RFID/write-uid.html", {"request": request, "message": "Write UID to HF RFID CUID Card"} ) + +# ---------- API routes ---------- + @router.post("/setup-pn532") -async def setup_pn532(request: Request): - try: - result = RFIDlib.setup() - return JSONResponse(content=result) - except Exception as e: - return JSONResponse(content={"success": False, "error": str(e)}) +async def setup_pn532(service: RFIDService = Depends(get_rfid_service)): + result = await service.setup_pn532() + return JSONResponse(content=result) @router.post("/identify-rfid") -async def identify_rfid_post(request: Request): - try: - found = RFIDlib.iso14443a_identify() - while not found["success"]: - await asyncio.sleep(0.1) - found = RFIDlib.iso14443a_identify() - return JSONResponse(content=found) - except Exception as e: - return JSONResponse(content={"success": False, "error": str(e)}) - +async def identify_rfid_post(service: RFIDService = Depends(get_rfid_service)): + result = await service.identify_rfid() + return JSONResponse(content=result) + @router.post("/write-uid") -async def write_uid_post(request: Request, write_request: WriteCardRequest): - try: - new_uid_hex = write_request.card_data.get("uid") - if not new_uid_hex: - return JSONResponse(content={"success": False, "error": "UID is required"}) - - if len(new_uid_hex) != 8: - print("❌ UID UID must be 8 hexadecimal characters") - return JSONResponse(content={"success": False, "error": "UID must be 8 hexadecimal characters"}) - - try: - binascii.unhexlify(new_uid_hex) - except: - print("❌ UID contains invalid characters") - return JSONResponse(content={"success": False, "error": "UID contains invalid characters"}) - - result = RFIDlib.write_uid(new_uid_hex) - return JSONResponse(content=result) - - except Exception as e: - return JSONResponse(content={"success": False, "error": str(e)}) +async def write_uid_post(write_request: WriteCardRequest, service: RFIDService = Depends(get_rfid_service)): + result = await service.write_uid(write_request.card_data, write_request.save_to_db) + return JSONResponse(content=result) @router.post("/analyze") -async def analyze_rfid(request: Request): - """ - 分析 RFID 卡片的安全性 - """ - try: - data = await request.json() - card_info = data.get("card_info", {}) - - # 創建 DefenseManager 實例 - defense_manager = DefenseManager() - - # 執行 RFID 防禦分析 - result = defense_manager.run_rfid_defense(card_info) - - return JSONResponse(content={ - "success": True, - "module": result["module"], - "issues": result["issues"], - "threat": result["threat"] - }) - except Exception as e: - return JSONResponse(content={ - "success": False, - "error": str(e), - "issues": [], - "threat": {"score": 0, "status": "UNKNOWN"} - }) \ No newline at end of file +async def analyze_rfid(request: Request, service: RFIDService = Depends(get_rfid_service)): + data = await request.json() + card_info = data.get("card_info", {}) + result = await service.analyze_rfid(card_info) + return JSONResponse(content=result) diff --git a/app/api/WiFi.py b/app/api/WiFi.py index df38862..b0123d6 100644 --- a/app/api/WiFi.py +++ b/app/api/WiFi.py @@ -1,43 +1,24 @@ -from fastapi import APIRouter, Request, HTTPException, BackgroundTasks -from fastapi.responses import HTMLResponse, FileResponse, JSONResponse +import os +from fastapi import APIRouter, Request, BackgroundTasks, Depends +from fastapi.responses import HTMLResponse, JSONResponse from fastapi.templating import Jinja2Templates from pydantic import BaseModel from typing import Optional, List, Dict -import os -import subprocess -import time -import uuid -import json -from .mylib.WeakPasswordGenerater.main import PasswordGenerator -from .mylib.ap_scan import scan_wifi_networks -from .mylib.defense.defense_manager import DefenseManager -import random -import asyncio -import csv -import glob -import re -from datetime import datetime + +from services.wifi_service import WiFiService +from services.real.wifi_real import WiFiRealService router = APIRouter( prefix="/WiFi", tags=["WiFi"] ) -templates = Jinja2Templates(directory="templates") +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates")) -# AP 狀態變數 -ap_running = False -ap_start_time = None -ap_config = None -capture_active = False -capture_process = None -capture_file = None -connected_clients = [] -# 全域網卡名稱列表 -network_adapters = [] +# ---------- Pydantic models ---------- -# 定義 AP 配置模型 class APConfig(BaseModel): ssid: str security: str @@ -48,7 +29,6 @@ class APConfig(BaseModel): internet_sharing: bool = False mac_address: Optional[str] = None -# 定義捕獲文件模型 class CaptureFile(BaseModel): id: str filename: str @@ -56,38 +36,29 @@ class CaptureFile(BaseModel): size: int timestamp: str -# 儲存的捕獲文件 -capture_files = [] - -# 定義請求模型 class ScanRequest(BaseModel): bands: Dict[str, bool] show_hidden: bool = False -# 定義網路介面請求模型 class NetworkInterfaceRequest(BaseModel): interface: str -# 定義掃描請求模型 class ScanWifiRequest(BaseModel): interface: str timeout: int = 10 -# 定義監聽握手包請求模型 class CaptureRequest(BaseModel): interface: str bssid: str channel: int output_file: Optional[str] = None -# 定義斷線訊號請求模型 class DeauthRequest(BaseModel): interface: str bssid: str - packets: int = 10 # 預設 10 個封包 + packets: int = 10 broadcast: bool = True -# 定義破解請求模型 class CrackRequest(BaseModel): capture_file: str wordlist: str @@ -95,25 +66,30 @@ class CrackRequest(BaseModel): attack_mode: Optional[str] = "0" ssid: Optional[str] = None -# 定義請求模型 class WordlistRequest(BaseModel): output_filename: str info_data: Dict[str, List[str]] -# 定義頻道設定請求模型 class ChannelRequest(BaseModel): interface: str channel: str -# 定義檢查握手包請求模型 class HandshakeCheckRequest(BaseModel): - capture_file: Optional[str] = "deauth_handshake-01.cap" # airodump-ng 會自動加上 -01 + capture_file: Optional[str] = "deauth_handshake-01.cap" -# 定義密碼破解請求模型 class CrackPasswordRequest(BaseModel): capture_file: Optional[str] = "deauth_handshake-01.cap" wordlist_file: str + +# ---------- Dependency ---------- + +def get_wifi_service() -> WiFiService: + return WiFiRealService() + + +# ---------- HTML page routes ---------- + @router.get("/ap-emulator", response_class=HTMLResponse) def read_ap_emulator(request: Request): return templates.TemplateResponse( @@ -142,898 +118,89 @@ def read_wordlist_generator(request: Request): {"request": request, "message": "Wordlist Generator"} ) -@router.get("/interface/details") -async def list_adapters(request: Request): - """ - 執行 ifconfig -a 命令並返回所有網路介面的詳細資訊 - """ - global network_adapters - - try: - # 執行 ifconfig -a 命令 - result = subprocess.run( - ["ifconfig", "-a"], - capture_output=True, text=True, timeout=10 - ) - - if result.returncode == 0: - # 截取網卡名稱 - network_adapters = extract_adapter_names(result.stdout) - - return { - "success": True, - "output": result.stdout, - "message": "Network adapters listed successfully", - "adapters": network_adapters - } - else: - return { - "success": False, - "message": f"Failed to execute ifconfig: {result.stderr}", - "output": result.stderr, - "adapters": [] - } - - except subprocess.TimeoutExpired: - return { - "success": False, - "message": "Command timed out", - "output": "", - "adapters": [] - } - except FileNotFoundError: - return { - "success": False, - "message": "ifconfig command not found. Please ensure net-tools is installed.", - "output": "", - "adapters": [] - } - except Exception as e: - return { - "success": False, - "message": f"Error executing ifconfig: {str(e)}", - "output": "", - "adapters": [] - } -def extract_adapter_names(ifconfig_output): - """ - 從 ifconfig 輸出中截取網卡名稱 - """ - adapter_names = [] - lines = ifconfig_output.split('\n') - - for line in lines: - # 網卡名稱通常在行的開頭,後面跟著冒號或空格 - # 例如: "eth0: flags=4163 mtu 1500" - # 或: "wlan0 Link encap:Ethernet HWaddr" - if line and not line.startswith(' ') and not line.startswith('\t'): - # 提取網卡名稱 (在冒號或空格之前) - if ':' in line: - adapter_name = line.split(':')[0].strip() - else: - # 處理某些系統中沒有冒號的情況 - parts = line.split() - if parts: - adapter_name = parts[0].strip() - else: - continue - - # 過濾掉空字串和無效名稱 - if adapter_name and adapter_name.isalnum() or any(c in adapter_name for c in ['-', '_']): - adapter_names.append(adapter_name) - - return adapter_names +# ---------- API routes ---------- + +@router.get("/interface/details") +async def list_adapters(service: WiFiService = Depends(get_wifi_service)): + return await service.get_interface_details() @router.get("/interface/list") -async def get_adapter_names(): - """ - 返回已截取的網卡名稱列表 - """ - global network_adapters - - return { - "success": True, - "adapters": network_adapters, - "count": len(network_adapters) - } +async def get_adapter_names(service: WiFiService = Depends(get_wifi_service)): + return await service.get_interface_list() @router.post("/interface/monitorMode") -async def activate_monitor_mode(request: NetworkInterfaceRequest): - """ - 啟用指定網路介面的監聽模式 - """ - try: - # 執行 sudo ifconfig {interface_name} up - up_result = subprocess.run( - ["sudo", "ifconfig", request.interface, "up"], - capture_output=True, text=True, timeout=10 - ) - - if up_result.returncode != 0: - return { - "success": False, - "message": f"Failed to bring up interface {request.interface}", - "error": up_result.stderr - } - - # 執行 sudo iwconfig {interface_name} mode monitor - monitor_result = subprocess.run( - ["sudo", "iwconfig", request.interface, "mode", "monitor"], - capture_output=True, text=True, timeout=10 - ) - - if monitor_result.returncode != 0: - return { - "success": False, - "message": f"Failed to set monitor mode for {request.interface}", - "error": monitor_result.stderr - } - - return { - "success": True, - "message": f"Monitor mode activated successfully for {request.interface}", - "interface": request.interface - } - - except subprocess.TimeoutExpired: - return { - "success": False, - "message": "Command timed out", - "interface": request.interface - } - except Exception as e: - return { - "success": False, - "message": f"Error activating monitor mode: {str(e)}", - "interface": request.interface - } +async def activate_monitor_mode(request: NetworkInterfaceRequest, service: WiFiService = Depends(get_wifi_service)): + return await service.set_monitor_mode(request.interface) @router.get("/interface/status") -async def get_interface_status(interface: str): - """ - 取得指定網路介面的狀態,特別是其模式 - """ - try: - # 執行 iwconfig 命令 - result = subprocess.run( - ["iwconfig", interface], - capture_output=True, text=True, timeout=10 - ) - - if result.returncode != 0: - return { - "success": False, - "message": f"Failed to get status for interface {interface}", - "error": result.stderr, - "interface": interface - } - - output = result.stdout - mode = "Unknown" - - # 從 iwconfig 輸出中擷取模式 - if "Mode:Monitor" in output: - mode = "Monitor" - elif "Mode:Managed" in output: - mode = "Managed" - elif "Mode:Master" in output: - mode = "Master" - elif "Mode:Ad-Hoc" in output: - mode = "Ad-Hoc" - elif "no wireless extensions" in output.lower(): - return { - "success": False, - "message": f"Interface {interface} is not a wireless interface", - "interface": interface - } - - return { - "success": True, - "interface": interface, - "mode": mode, - "status": f"{mode} mode", - "output": output - } - - except subprocess.TimeoutExpired: - return { - "success": False, - "message": "Command timed out", - "interface": interface - } - except Exception as e: - return { - "success": False, - "message": f"Error getting interface status: {str(e)}", - "interface": interface - } +async def get_interface_status(interface: str, service: WiFiService = Depends(get_wifi_service)): + return await service.get_interface_status(interface) @router.post("/ap/scan") -async def scan_wifi(request: ScanWifiRequest): - try: - # 使用 asyncio 在執行緒池中運行掃描,避免阻塞事件循環 - loop = asyncio.get_event_loop() - nearby_ap = await loop.run_in_executor( - None, - scan_wifi_networks, - request.interface, - request.timeout - ) - - return { - "success": True, - "ap_list": nearby_ap, - "interface": request.interface, - "count": len(nearby_ap) - } - except Exception as e: - return { - "success": False, - "message": f"Failed to scan networks: {str(e)}", - "ap_list": [], - "interface": request.interface, - "count": 0 - } - -@router.post('/interface/channel') -async def set_interface_channel(request: ChannelRequest): - """ - 設定指定網路介面的頻道 - """ - try: - # 執行 sudo iwconfig {interface_name} channel {channel} - result = subprocess.run( - ["sudo", "iwconfig", request.interface, "channel", request.channel], - capture_output=True, text=True, timeout=10 - ) - - if result.returncode != 0: - return { - "success": False, - "message": f"Failed to set channel {request.channel} for {request.interface}", - "error": result.stderr - } - - return { - "success": True, - "message": f"Channel {request.channel} set successfully for {request.interface}", - "interface": request.interface, - "channel": request.channel - } - - except subprocess.TimeoutExpired: - return { - "success": False, - "message": "Command timed out", - "interface": request.interface, - "channel": request.channel - } - except Exception as e: - return { - "success": False, - "message": f"Error setting channel: {str(e)}", - "interface": request.interface, - "channel": request.channel - } +async def scan_wifi(request: ScanWifiRequest, service: WiFiService = Depends(get_wifi_service)): + return await service.scan_networks(request.interface, request.timeout) -@router.post("/capture/start") -async def start_capture(request: CaptureRequest, background_tasks: BackgroundTasks): - """ - 開始捕獲 Wi-Fi 流量 - """ - global capture_active, capture_process - - if capture_active: - return { - "success": False, - "message": "Capture is already running" - } - - try: - # 使用固定的輸出文件名 - output_file = "deauth_handshake" - - # 確保捕獲目錄存在 - os.makedirs("data/captures", exist_ok=True) - output_path = os.path.join("data/captures", output_file) - - # 刪除舊的捕獲文件(airodump-ng 會自動加上 -01、-02 等後綴) - import glob - old_file_patterns = [ - f"{output_path}*.cap", - f"{output_path}*.csv", - f"{output_path}*.kismet*", - f"{output_path}*.log.csv" - ] - - for pattern in old_file_patterns: - for old_file in glob.glob(pattern): - try: - if os.path.exists(old_file): - os.remove(old_file) - print(f"Removed old capture file: {old_file}") - except Exception as e: - print(f"Failed to remove old file {old_file}: {e}") - - # 使用 airodump-ng 開始捕獲流量 - # 指令範例:sudo airodump-ng -c 7 --bssid BO:BE:76:CD:97:24 -w capture wlan1 - capture_command = [ - "sudo", "airodump-ng", - "-c", str(request.channel), - "--bssid", request.bssid, - "-w", output_path, - request.interface - ] - - # 在背景啟動捕獲進程 - background_tasks.add_task(run_capture_process, capture_command, output_path) - - return { - "success": True, - "message": "Traffic capture started", - "capture_file": "deauth_handshake-01.cap", # airodump-ng 會自動產生 -01 後綴 - "command": " ".join(capture_command) - } - except Exception as e: - return { - "success": False, - "message": f"Failed to start capture: {str(e)}" - } +@router.post("/ap/start") +async def start_ap(config: APConfig, service: WiFiService = Depends(get_wifi_service)): + return await service.start_ap(config) -@router.post("/deauth/send") -async def send_deauth(request: DeauthRequest): - """ - 發送解除認證封包 - """ - try: - # 構建 aireplay-ng 指令 - # sudo aireplay-ng --deauth {packets} -a {bssid} {interface} - deauth_command = [ - "sudo", "aireplay-ng", - "--deauth", str(request.packets), - "-a", request.bssid, - request.interface - ] - - # 如果不是廣播模式,可以加入特定客戶端MAC - # 這裡暫時只支援廣播模式(對所有連接的客戶端發送) - - # 執行指令 - result = subprocess.run( - deauth_command, - capture_output=True, - text=True, - timeout=30 # 30秒超時 - ) - - if result.returncode == 0: - return { - "success": True, - "message": f"Successfully sent {request.packets} deauth packets to {request.bssid}", - "packets_sent": request.packets, - "target_bssid": request.bssid, - "interface": request.interface, - "command": " ".join(deauth_command), - "output": result.stdout - } - else: - return { - "success": False, - "message": f"Failed to send deauth packets: {result.stderr}", - "command": " ".join(deauth_command), - "error": result.stderr - } - - except subprocess.TimeoutExpired: - return { - "success": False, - "message": "Deauth command timed out (30 seconds)", - "packets_sent": 0 - } - except Exception as e: - return { - "success": False, - "message": f"Error sending deauth packets: {str(e)}", - "packets_sent": 0 - } +@router.post("/ap/stop") +async def stop_ap(service: WiFiService = Depends(get_wifi_service)): + return await service.stop_ap() +@router.get("/ap/status") +async def get_ap_status(service: WiFiService = Depends(get_wifi_service)): + return await service.get_ap_status() -@router.post("/handshake/check") -async def check_handshake(request: HandshakeCheckRequest): - """ - 檢查捕獲文件中的握手包數量 - """ - try: - # 如果指定的檔案不存在,嘗試自動偵測最新的 capture 檔案 - capture_path = os.path.join("data/captures", request.capture_file) - - if not os.path.exists(capture_path): - # 嘗試尋找最新的 capture 檔案 - import glob - capture_dir = "data/captures" - if os.path.exists(capture_dir): - capture_files = glob.glob(os.path.join(capture_dir, "deauth_handshake*.cap")) - if capture_files: - # 按修改時間排序,取最新的 - capture_path = max(capture_files, key=os.path.getmtime) - request.capture_file = os.path.basename(capture_path) - else: - return { - "success": False, - "message": f"No deauth_handshake files found in {capture_dir}", - "handshakes": 0, - "networks": [] - } - else: - return { - "success": False, - "message": f"Capture directory not found: {capture_dir}", - "handshakes": 0, - "networks": [] - } - - # 再次確認文件是否存在 - if not os.path.exists(capture_path): - return { - "success": False, - "message": f"Capture file not found: {request.capture_file}", - "handshakes": 0, - "networks": [] - } - - # 構建 aircrack-ng 指令來檢查握手包 - # sudo aircrack-ng capture-01.cap - aircrack_command = [ - "sudo", "aircrack-ng", - capture_path - ] - - # 執行指令 - result = subprocess.run( - aircrack_command, - capture_output=True, - text=True, - timeout=30 # 30秒超時 - ) - - # 解析 aircrack-ng 的輸出 - networks = parse_aircrack_output(result.stdout) - - # 計算總握手包數量 - total_handshakes = sum(network.get('handshakes', 0) for network in networks) - - return { - "success": True, - "message": f"Found {total_handshakes} handshake(s) in {len(networks)} network(s)", - "capture_file": request.capture_file, - "total_handshakes": total_handshakes, - "total_networks": len(networks), - "networks": networks, - "command": " ".join(aircrack_command), - "raw_output": result.stdout - } - - except subprocess.TimeoutExpired: - return { - "success": False, - "message": "Aircrack-ng command timed out (30 seconds)", - "handshakes": 0, - "networks": [] - } - except Exception as e: - return { - "success": False, - "message": f"Error checking handshakes: {str(e)}", - "handshakes": 0, - "networks": [] - } +@router.post("/ap/capture/start") +async def start_ap_capture(service: WiFiService = Depends(get_wifi_service)): + return await service.start_ap_capture() +@router.post("/ap/capture/stop") +async def stop_ap_capture(service: WiFiService = Depends(get_wifi_service)): + return await service.stop_ap_capture() -def parse_aircrack_output(output: str) -> List[Dict]: - """ - 解析 aircrack-ng 的輸出來提取網路資訊和握手包數量 - """ - networks = [] - - try: - lines = output.split('\n') - in_network_list = False - - for line in lines: - line = line.strip() - - # 找到網路列表的開始 - if "# BSSID" in line and "ESSID" in line and "Encryption" in line: - in_network_list = True - continue - - # 如果遇到空行或其他內容,停止解析網路列表 - if in_network_list and not line: - break - - # 解析網路資訊 - if in_network_list and line and line[0].isdigit(): - # 範例行: " 1 B0:BE:76:CD:97:24 victim WPA (0 handshake)" - parts = line.split() - if len(parts) >= 4: - try: - network_num = parts[0] - bssid = parts[1] - - # ESSID 可能包含空格,需要特殊處理 - # 找到 WPA/WEP 等加密類型的位置 - encryption_start = -1 - for i, part in enumerate(parts[2:], 2): - if any(enc in part.upper() for enc in ['WPA', 'WEP', 'OPN']): - encryption_start = i - break - - if encryption_start > 2: - essid = ' '.join(parts[2:encryption_start]) - encryption_info = ' '.join(parts[encryption_start:]) - else: - essid = parts[2] if len(parts) > 2 else "Unknown" - encryption_info = ' '.join(parts[3:]) if len(parts) > 3 else "Unknown" - - # 提取握手包數量 - handshakes = 0 - if "handshake" in encryption_info.lower(): - # 尋找數字 - import re - handshake_match = re.search(r'\((\d+) handshake', encryption_info) - if handshake_match: - handshakes = int(handshake_match.group(1)) - - network = { - "number": int(network_num), - "bssid": bssid, - "essid": essid, - "encryption": encryption_info, - "handshakes": handshakes - } - networks.append(network) - - except (ValueError, IndexError) as e: - print(f"Error parsing network line '{line}': {e}") - continue - - except Exception as e: - print(f"Error parsing aircrack output: {e}") - - return networks +@router.get("/ap/capture/list") +async def list_ap_captures(service: WiFiService = Depends(get_wifi_service)): + return await service.list_ap_captures() +@router.post("/interface/channel") +async def set_interface_channel(request: ChannelRequest, service: WiFiService = Depends(get_wifi_service)): + return await service.set_channel(request.interface, request.channel) + +@router.post("/capture/start") +async def start_capture(request: CaptureRequest, background_tasks: BackgroundTasks, service: WiFiService = Depends(get_wifi_service)): + return await service.start_capture(request, background_tasks) @router.post("/capture/stop") -async def stop_capture(): - """ - 停止捕獲 Wi-Fi 流量 - """ - global capture_active, capture_process - - try: - if not capture_active or not capture_process: - return { - "success": False, - "message": "No capture is currently running" - } - - # 終止捕獲進程 (asyncio 子進程) - capture_process.terminate() - - # 等待進程結束 - try: - await asyncio.wait_for(capture_process.wait(), timeout=5.0) - except asyncio.TimeoutError: - capture_process.kill() - await capture_process.wait() - - capture_active = False - capture_process = None - - return { - "success": True, - "message": "Traffic capture stopped" - } - except Exception as e: - return { - "success": False, - "message": f"Failed to stop capture: {str(e)}" - } +async def stop_capture(service: WiFiService = Depends(get_wifi_service)): + return await service.stop_capture() + +@router.post("/deauth/send") +async def send_deauth(request: DeauthRequest, service: WiFiService = Depends(get_wifi_service)): + return await service.send_deauth(request.interface, request.bssid, request.packets) + +@router.post("/handshake/check") +async def check_handshake(request: HandshakeCheckRequest, service: WiFiService = Depends(get_wifi_service)): + return await service.check_handshake(request.capture_file) -# 背景捕獲進程 -async def run_capture_process(command, output_path): - global capture_process, capture_active - - try: - capture_active = True - - # 使用 asyncio.create_subprocess_exec 創建異步子進程 - capture_process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - - # 異步等待進程完成或被終止 - await capture_process.wait() - - except Exception as e: - print(f"Capture process error: {e}") - finally: - capture_active = False - capture_process = None +@router.post("/capture/crack") +async def crack_password(request: CrackPasswordRequest, service: WiFiService = Depends(get_wifi_service)): + return await service.crack_password(request.capture_file, request.wordlist_file) @router.post("/wordlist-generator") -async def generate_wordlist(request: WordlistRequest): - try: - # 確保檔案名稱合法 - filename = request.output_filename - if not filename.endswith('.txt'): - filename += '.txt' - - # 建立生成器實例 - generator = PasswordGenerator(output_file=f"static/wordlists/{filename}") - - # 從請求中取得資料 - info_data = request.info_data - - # 生成密碼字典 - generator.generate( - DATE=info_data.get('date', []), - TEL=info_data.get('tel', []), - NAME=info_data.get('name', []), - ID=info_data.get('ID', []), - SSID=info_data.get('SSID', [''])[0] if info_data.get('SSID') else '' - ) - - # 讀取生成的檔案以取得樣本和行數 - file_path = f"static/wordlists/{filename}" - with open(file_path, 'r') as f: - lines = f.readlines() - total_count = len(lines) - sample = ''.join(lines[:10]) # 只回傳前10行作為樣本 - - return JSONResponse({ - "success": True, - "filename": filename, - "count": total_count, - "sample": sample, - "download_link": f"/static/wordlists/{filename}" - }) - - except Exception as e: - return JSONResponse({ - "success": False, - "error": str(e) - }) +async def generate_wordlist(request: WordlistRequest, service: WiFiService = Depends(get_wifi_service)): + return await service.generate_wordlist(request.output_filename, request.info_data) @router.get("/wordlists/list") -async def list_wordlists(): - """ - 列出可用的密碼字典檔案 - """ - try: - wordlists = [] - - # 檢查 static/wordlists 目錄 - wordlists_dir = "static/wordlists" - if os.path.exists(wordlists_dir): - for file in os.listdir(wordlists_dir): - if file.endswith('.txt'): - file_path = os.path.join(wordlists_dir, file) - if os.path.isfile(file_path): # 確保是檔案而不是目錄 - file_stat = os.stat(file_path) - wordlists.append({ - "filename": file, - "path": f"wordlists/{file}", - "size": file_stat.st_size, - "category": "custom", - "modified": datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), - "download_link": f"/static/wordlists/{file}" - }) - - # 檢查 static/wordlists/standard 目錄 - standard_dir = "static/wordlists/standard" - if os.path.exists(standard_dir): - for file in os.listdir(standard_dir): - if file.endswith('.txt'): - file_path = os.path.join(standard_dir, file) - if os.path.isfile(file_path): # 確保是檔案而不是目錄 - file_stat = os.stat(file_path) - wordlists.append({ - "filename": file, - "path": f"wordlists/standard/{file}", - "size": file_stat.st_size, - "category": "standard", - "modified": datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), - "download_link": f"/static/wordlists/standard/{file}" - }) - - # 按類別和檔案名稱排序 - wordlists.sort(key=lambda x: (x['category'], x['filename'])) - - return { - "success": True, - "wordlists": wordlists, - "count": len(wordlists) - } - - except Exception as e: - return { - "success": False, - "message": f"Error listing wordlists: {str(e)}", - "wordlists": [], - "count": 0 - } +async def list_wordlists(service: WiFiService = Depends(get_wifi_service)): + return await service.list_wordlists() @router.delete("/wordlists/custom/{filename}") -async def delete_custom_wordlist(filename: str): - """ - 刪除指定的自定義密碼字典檔案(僅限 custom 目錄) - """ - try: - # 安全檢查:確保檔案名稱只包含安全字符 - import re - if not re.match(r'^[a-zA-Z0-9_.-]+\.txt$', filename): - return { - "success": False, - "message": "Invalid filename format" - } - - # 確保只能刪除 custom 目錄中的檔案 - file_path = os.path.join("static/wordlists", filename) - - # 檢查檔案路徑是否在 standard 目錄中 - if "standard" in filename or os.path.exists(os.path.join("static/wordlists/standard", filename)): - return { - "success": False, - "message": "Cannot delete standard wordlist files" - } - - # 檢查檔案是否存在 - if not os.path.exists(file_path): - return { - "success": False, - "message": f"File not found: {filename}" - } - - # 確保不是目錄 - if not os.path.isfile(file_path): - return { - "success": False, - "message": f"Not a file: {filename}" - } - - # 刪除檔案 - os.remove(file_path) - - return { - "success": True, - "message": f"Successfully deleted {filename}" - } - - except Exception as e: - return { - "success": False, - "message": f"Error deleting wordlist: {str(e)}" - } - -@router.post("/capture/crack") -async def crack_password(request: CrackPasswordRequest): - """ - 使用指定的字典檔案進行密碼破解 - """ - try: - # 構建 capture 文件的完整路徑 - capture_path = os.path.join("data/captures", request.capture_file) - - # 檢查 capture 文件是否存在 - if not os.path.exists(capture_path): - # 嘗試尋找最新的 capture 檔案 - import glob - capture_dir = "data/captures" - if os.path.exists(capture_dir): - capture_files = glob.glob(os.path.join(capture_dir, "deauth_handshake*.cap")) - if capture_files: - capture_path = max(capture_files, key=os.path.getmtime) - request.capture_file = os.path.basename(capture_path) - else: - return { - "success": False, - "message": f"No capture files found in {capture_dir}" - } - else: - return { - "success": False, - "message": f"Capture directory not found: {capture_dir}" - } - - # 構建 wordlist 文件的完整路徑 - wordlist_path = os.path.join("static", request.wordlist_file) - - # 檢查 wordlist 文件是否存在 - if not os.path.exists(wordlist_path): - return { - "success": False, - "message": f"Wordlist file not found: {request.wordlist_file}" - } - - # 構建 aircrack-ng 破解指令 - # sudo aircrack-ng capture-01.cap -w wordlist.txt - crack_command = [ - "sudo", "aircrack-ng", - capture_path, - "-w", wordlist_path - ] - - # 執行指令 (這可能需要很長時間) - result = subprocess.run( - crack_command, - capture_output=True, - text=True, - timeout=300 # 5分鐘超時 - ) - - # 解析輸出尋找密碼 - password_found = None - if "KEY FOUND!" in result.stdout: - # 從輸出中提取密碼 - lines = result.stdout.split('\n') - for line in lines: - if "KEY FOUND!" in line: - # 範例: "KEY FOUND! [ password123 ]" - import re - password_match = re.search(r'KEY FOUND!\s*\[\s*(.+?)\s*\]', line) - if password_match: - password_found = password_match.group(1) - break - - return { - "success": True, - "message": "Password cracking completed", - "capture_file": request.capture_file, - "wordlist_file": request.wordlist_file, - "password_found": password_found, - "command": " ".join(crack_command), - "raw_output": result.stdout, - "return_code": result.returncode - } - - except subprocess.TimeoutExpired: - return { - "success": False, - "message": "Password cracking timed out (5 minutes). The wordlist might be too large or the password is not in the list." - } - except Exception as e: - return { - "success": False, - "message": f"Error during password cracking: {str(e)}" - } +async def delete_custom_wordlist(filename: str, service: WiFiService = Depends(get_wifi_service)): + return await service.delete_wordlist(filename) @router.post("/defense/scan") -async def defense_scan(request: ScanWifiRequest): - """ - 使用 DefenseManager 掃描並分析 Wi-Fi 威脅 - """ - try: - # 創建 DefenseManager 實例 - defense_manager = DefenseManager(iface=request.interface) - - # 執行 Wi-Fi 防禦掃描和分析 - result = defense_manager.run_wifi_defense() - - return { - "success": True, - "module": result["module"], - "issues": result["issues"], - "threat": result["threat"], - "interface": request.interface - } - except Exception as e: - return { - "success": False, - "message": f"Failed to scan and analyze Wi-Fi threats: {str(e)}", - "issues": [], - "threat": {"score": 0, "status": "UNKNOWN"} - } \ No newline at end of file +async def defense_scan(request: ScanWifiRequest, service: WiFiService = Depends(get_wifi_service)): + return await service.defense_scan(request.interface, request.timeout) diff --git a/app/api/__pycache__/BLE.cpython-313.pyc b/app/api/__pycache__/BLE.cpython-313.pyc deleted file mode 100644 index 274fc06..0000000 Binary files a/app/api/__pycache__/BLE.cpython-313.pyc and /dev/null differ diff --git a/app/api/__pycache__/IR.cpython-313.pyc b/app/api/__pycache__/IR.cpython-313.pyc deleted file mode 100644 index bc2905e..0000000 Binary files a/app/api/__pycache__/IR.cpython-313.pyc and /dev/null differ diff --git a/app/api/__pycache__/RFID.cpython-313.pyc b/app/api/__pycache__/RFID.cpython-313.pyc deleted file mode 100644 index e87f93c..0000000 Binary files a/app/api/__pycache__/RFID.cpython-313.pyc and /dev/null differ diff --git a/app/api/__pycache__/WiFi.cpython-313.pyc b/app/api/__pycache__/WiFi.cpython-313.pyc deleted file mode 100644 index a33de8b..0000000 Binary files a/app/api/__pycache__/WiFi.cpython-313.pyc and /dev/null differ diff --git a/app/api/mylib/WeakPasswordGenerater/__pycache__/main.cpython-313.pyc b/app/api/mylib/WeakPasswordGenerater/__pycache__/main.cpython-313.pyc deleted file mode 100644 index 3ecc87e..0000000 Binary files a/app/api/mylib/WeakPasswordGenerater/__pycache__/main.cpython-313.pyc and /dev/null differ diff --git a/app/api/mylib/__pycache__/ap_scan.cpython-313.pyc b/app/api/mylib/__pycache__/ap_scan.cpython-313.pyc deleted file mode 100644 index a62e154..0000000 Binary files a/app/api/mylib/__pycache__/ap_scan.cpython-313.pyc and /dev/null differ diff --git a/app/api/mylib/beacon/__pycache__/beacon_emulator.cpython-313.pyc b/app/api/mylib/beacon/__pycache__/beacon_emulator.cpython-313.pyc deleted file mode 100644 index 736df7e..0000000 Binary files a/app/api/mylib/beacon/__pycache__/beacon_emulator.cpython-313.pyc and /dev/null differ diff --git a/app/main.py b/app/main.py index 4baa784..2612157 100644 --- a/app/main.py +++ b/app/main.py @@ -1,37 +1,62 @@ +import os + from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, FileResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates + from api import WiFi, BLE, IR, RFID -import os + +DEMO_MODE = os.getenv("HACKMASTER_DEMO_MODE", "false").lower() == "true" app = FastAPI( title="HackMaster Pi", - description="An open source IoT Hacker Tool by using Raspberry Pi Zero W 2", + description="An open source IoT Hacker Tool by using Raspberry Pi Zero 2 W", version="1.0.0", + docs_url="/docs", + redoc_url="/redoc", + openapi_url="/openapi.json", ) -app.mount("/static", StaticFiles(directory="static"), name="static") +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) + +app.mount("/static", StaticFiles(directory=os.path.join(BASE_DIR, "static")), name="static") +templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates")) + +app.include_router(BLE.router) +app.include_router(WiFi.router) +app.include_router(IR.router) +app.include_router(RFID.router) + +# Demo mode: override all dependencies with Mock services +if DEMO_MODE: + from services.mock.wifi_mock import WiFiMockService + from services.mock.ble_mock import BLEMockService + from services.mock.ir_mock import IRMockService + from services.mock.rfid_mock import RFIDMockService + + _wifi_mock = WiFiMockService() + _ble_mock = BLEMockService() + _ir_mock = IRMockService() + _rfid_mock = RFIDMockService() + + app.dependency_overrides[WiFi.get_wifi_service] = lambda: _wifi_mock + app.dependency_overrides[BLE.get_ble_service] = lambda: _ble_mock + app.dependency_overrides[IR.get_ir_service] = lambda: _ir_mock + app.dependency_overrides[RFID.get_rfid_service] = lambda: _rfid_mock -templates = Jinja2Templates(directory="templates") @app.get("/", response_class=HTMLResponse) def read_root(request: Request): - return templates.TemplateResponse( - "index.html", - {"request": request} - ) + return templates.TemplateResponse("index.html", {"request": request}) + @app.get("/favicon.ico", include_in_schema=False) async def favicon(): favicon_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static", "favicon.ico") return FileResponse(favicon_path) -app.include_router(BLE.router) -app.include_router(WiFi.router) -app.include_router(IR.router) -app.include_router(RFID.router) if __name__ == "__main__": import uvicorn - uvicorn.run(app, host='0.0.0.0', port=4000) \ No newline at end of file + uvicorn.run(app, host="0.0.0.0", port=4000) diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/ble_service.py b/app/services/ble_service.py new file mode 100644 index 0000000..3d445f2 --- /dev/null +++ b/app/services/ble_service.py @@ -0,0 +1,38 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, List + + +class BLEService(ABC): + + @abstractmethod + def get_profiles(self) -> List[Dict]: ... + + @abstractmethod + async def add_profile(self, profile: Dict) -> Dict[str, Any]: ... + + @abstractmethod + async def delete_profile(self, name: str) -> Dict[str, Any]: ... + + @abstractmethod + async def start_beacon_emulator(self, profile_name: str) -> Dict[str, Any]: ... + + @abstractmethod + async def stop_beacon_emulator(self) -> Dict[str, Any]: ... + + @abstractmethod + async def get_beacon_emulator_status(self) -> Dict[str, Any]: ... + + @abstractmethod + async def start_airpods_emulator(self) -> Dict[str, Any]: ... + + @abstractmethod + async def stop_airpods_emulator(self) -> Dict[str, Any]: ... + + @abstractmethod + async def get_airpods_emulator_status(self) -> Dict[str, Any]: ... + + @abstractmethod + async def get_airpods_logs(self) -> Dict[str, Any]: ... + + @abstractmethod + async def scan_beacons(self, duration: int = 5) -> List[Dict[str, Any]]: ... diff --git a/app/services/ir_service.py b/app/services/ir_service.py new file mode 100644 index 0000000..92102e1 --- /dev/null +++ b/app/services/ir_service.py @@ -0,0 +1,20 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional + + +class IRService(ABC): + + @abstractmethod + async def start_recording(self) -> Dict[str, Any]: ... + + @abstractmethod + async def cancel_recording(self) -> Dict[str, Any]: ... + + @abstractmethod + async def get_status(self) -> Dict[str, Any]: ... + + @abstractmethod + async def transmit(self, signal_data: str, format: Optional[str]) -> Dict[str, Any]: ... + + @abstractmethod + async def enumerate_code(self, device_type: str, brand: str, protocol: str, function: str, code_index: int) -> Dict[str, Any]: ... diff --git a/app/services/mock/__init__.py b/app/services/mock/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/mock/ble_mock.py b/app/services/mock/ble_mock.py new file mode 100644 index 0000000..db2491f --- /dev/null +++ b/app/services/mock/ble_mock.py @@ -0,0 +1,78 @@ +from typing import Any, Dict, List + +from services.ble_service import BLEService + + +class BLEMockService(BLEService): + + def get_profiles(self) -> List[Dict]: + return [ + { + "name": "Demo iBeacon 1", + "uuid": "12345678-1234-1234-1234-123456789abc", + "major": 1, + "minor": 1, + "power": -59 + }, + { + "name": "Demo iBeacon 2", + "uuid": "87654321-4321-4321-4321-cba987654321", + "major": 2, + "minor": 2, + "power": -65 + } + ] + + async def add_profile(self, profile: Dict) -> Dict[str, Any]: + return {"status": "success"} + + async def delete_profile(self, name: str) -> Dict[str, Any]: + return {"status": "success"} + + async def start_beacon_emulator(self, profile_name: str) -> Dict[str, Any]: + return {"status": "started", "profile": profile_name} + + async def stop_beacon_emulator(self) -> Dict[str, Any]: + return {"status": "stopped"} + + async def get_beacon_emulator_status(self) -> Dict[str, Any]: + return {"status": "running"} + + async def start_airpods_emulator(self) -> Dict[str, Any]: + return {"status": "started", "pid": 99999} + + async def stop_airpods_emulator(self) -> Dict[str, Any]: + return {"status": "stopped", "pid": 99999} + + async def get_airpods_emulator_status(self) -> Dict[str, Any]: + return {"status": "running", "pid": 99999} + + async def get_airpods_logs(self) -> Dict[str, Any]: + return { + "output": "[demo] AirPods emulator running...\n[demo] Broadcasting BLE advertisement\n", + "errors": "" + } + + async def scan_beacons(self, duration: int = 5) -> List[Dict[str, Any]]: + return [ + { + "mac": "AA:BB:CC:DD:EE:FF", + "name": "Demo iBeacon 1", + "rssi": -59, + "uuid": "12345678-1234-1234-1234-123456789abc", + "major": 1, + "minor": 1, + "tx_power": -59, + "type": "iBeacon" + }, + { + "mac": "11:22:33:44:55:66", + "name": "Demo iBeacon 2", + "rssi": -65, + "uuid": "87654321-4321-4321-4321-cba987654321", + "major": 2, + "minor": 2, + "tx_power": -65, + "type": "iBeacon" + } + ] diff --git a/app/services/mock/ir_mock.py b/app/services/mock/ir_mock.py new file mode 100644 index 0000000..aeda4b9 --- /dev/null +++ b/app/services/mock/ir_mock.py @@ -0,0 +1,54 @@ +from typing import Any, Dict, Optional + +from services.ir_service import IRService + +# Module-level state for singleton mock +_mock_recording = False +_mock_signal_ready = False + + +class IRMockService(IRService): + + async def start_recording(self) -> Dict[str, Any]: + global _mock_recording, _mock_signal_ready + _mock_recording = True + _mock_signal_ready = False + return {"success": True, "message": "IR recording started (demo)"} + + async def cancel_recording(self) -> Dict[str, Any]: + global _mock_recording, _mock_signal_ready + _mock_recording = False + _mock_signal_ready = False + return {"success": True, "message": "Recording cancelled (demo)"} + + async def get_status(self) -> Dict[str, Any]: + global _mock_recording, _mock_signal_ready + if _mock_recording: + # Simulate signal received after first status poll + _mock_recording = False + _mock_signal_ready = True + if _mock_signal_ready: + return { + "status": "completed", + "signal": { + "data": "010101010110101010100101010101011010101010010101010101101010", + "format": "NEC", + "length": 32 + } + } + return {"status": "idle"} + + async def transmit(self, signal_data: str, format: Optional[str]) -> Dict[str, Any]: + return {"success": True, "message": "Signal transmitted successfully (demo)"} + + async def enumerate_code(self, device_type: str, brand: str, protocol: str, function: str, code_index: int) -> Dict[str, Any]: + base_value = code_index * 37 + ord(function[0]) if function else code_index * 37 + code_hex = format(base_value % 65536, '04X') + format((base_value * 17) % 65536, '04X') + return { + "success": True, + "code_index": code_index, + "code_hex": code_hex, + "protocol": protocol if protocol != "all" else "NEC", + "response": False, + "bits": 32 + } diff --git a/app/services/mock/rfid_mock.py b/app/services/mock/rfid_mock.py new file mode 100644 index 0000000..c9fb893 --- /dev/null +++ b/app/services/mock/rfid_mock.py @@ -0,0 +1,36 @@ +from typing import Any, Dict + +from services.rfid_service import RFIDService + + +class RFIDMockService(RFIDService): + + async def setup_pn532(self) -> Dict[str, Any]: + return {"success": True, "message": "PN532 initialized (demo)"} + + async def identify_rfid(self) -> Dict[str, Any]: + return { + "success": True, + "uid": "aabbccdd", + "uid_length": 4, + "type": ["Mifare Classic"], + "atqa": "0004", + "sak": "08" + } + + async def write_uid(self, card_data: Dict, save_to_db: bool) -> Dict[str, Any]: + return {"success": True, "message": "UID written successfully (demo)"} + + async def analyze_rfid(self, card_info: Dict) -> Dict[str, Any]: + return { + "success": True, + "module": "RFID Defense", + "issues": [ + { + "type": "STATIC_UID", + "severity": "MEDIUM", + "description": "Card uses a static UID which may be vulnerable to cloning" + } + ], + "threat": {"score": 50, "status": "MEDIUM"} + } diff --git a/app/services/mock/wifi_mock.py b/app/services/mock/wifi_mock.py new file mode 100644 index 0000000..6939072 --- /dev/null +++ b/app/services/mock/wifi_mock.py @@ -0,0 +1,227 @@ +from typing import Any, Dict, List + +from fastapi import BackgroundTasks + +from services.wifi_service import WiFiService + + +class WiFiMockService(WiFiService): + + async def get_interface_details(self) -> Dict[str, Any]: + return { + "success": True, + "output": "wlan0: flags=4163 mtu 1500\n" + " inet 192.168.1.100 netmask 255.255.255.0 broadcast 192.168.1.255\n" + "wlan1: flags=4163 mtu 1500\n", + "message": "Network adapters listed successfully", + "adapters": ["wlan0", "wlan1"] + } + + async def get_interface_list(self) -> Dict[str, Any]: + return { + "success": True, + "adapters": ["wlan0", "wlan1"], + "count": 2 + } + + async def set_monitor_mode(self, interface: str) -> Dict[str, Any]: + return { + "success": True, + "message": f"Monitor mode activated successfully for {interface} (demo)", + "interface": interface + } + + async def get_interface_status(self, interface: str) -> Dict[str, Any]: + return { + "success": True, + "interface": interface, + "mode": "Monitor", + "status": "Monitor mode", + "output": f"{interface} IEEE 802.11 Mode:Monitor Tx-Power=20 dBm\n" + } + + async def scan_networks(self, interface: str, timeout: int) -> Dict[str, Any]: + return { + "success": True, + "ap_list": [ + { + "bssid": "AA:BB:CC:DD:EE:01", + "ssid": "DemoNetwork_1", + "channel": 6, + "signal": -55, + "encryption": "WPA2" + }, + { + "bssid": "AA:BB:CC:DD:EE:02", + "ssid": "DemoNetwork_2", + "channel": 11, + "signal": -70, + "encryption": "WPA2" + }, + { + "bssid": "AA:BB:CC:DD:EE:03", + "ssid": "DemoNetwork_3", + "channel": 1, + "signal": -80, + "encryption": "WEP" + } + ], + "interface": interface, + "count": 3 + } + + async def set_channel(self, interface: str, channel: str) -> Dict[str, Any]: + return { + "success": True, + "message": f"Channel {channel} set successfully for {interface} (demo)", + "interface": interface, + "channel": channel + } + + async def start_capture(self, request: Any, background_tasks: BackgroundTasks) -> Dict[str, Any]: + return { + "success": True, + "message": "Traffic capture started (demo)", + "capture_file": "demo_handshake-01.cap", + "command": "sudo airodump-ng -c 6 --bssid AA:BB:CC:DD:EE:01 -w data/captures/demo_handshake wlan1" + } + + async def stop_capture(self) -> Dict[str, Any]: + return { + "success": True, + "message": "Traffic capture stopped (demo)" + } + + async def send_deauth(self, interface: str, bssid: str, packets: int) -> Dict[str, Any]: + return { + "success": True, + "message": f"Successfully sent {packets} deauth packets to {bssid} (demo)", + "packets_sent": packets, + "target_bssid": bssid, + "interface": interface, + "command": f"sudo aireplay-ng --deauth {packets} -a {bssid} {interface}", + "output": "" + } + + async def check_handshake(self, capture_file: str) -> Dict[str, Any]: + return { + "success": True, + "message": "Found 1 handshake(s) in 1 network(s)", + "capture_file": capture_file, + "total_handshakes": 1, + "total_networks": 1, + "networks": [ + { + "number": 1, + "bssid": "AA:BB:CC:DD:EE:01", + "essid": "DemoNetwork_1", + "encryption": "WPA (1 handshake)", + "handshakes": 1 + } + ], + "command": f"sudo aircrack-ng data/captures/{capture_file}", + "raw_output": "" + } + + async def crack_password(self, capture_file: str, wordlist_file: str) -> Dict[str, Any]: + return { + "success": True, + "message": "Password cracking completed (demo)", + "capture_file": capture_file, + "wordlist_file": wordlist_file, + "password_found": "demo_password123", + "command": f"sudo aircrack-ng data/captures/{capture_file} -w static/{wordlist_file}", + "raw_output": "KEY FOUND! [ demo_password123 ]", + "return_code": 0 + } + + async def generate_wordlist(self, output_filename: str, info_data: Dict[str, List[str]]) -> Dict[str, Any]: + filename = output_filename if output_filename.endswith('.txt') else output_filename + '.txt' + sample_lines = [ + "demo1234\n", "password1\n", "test123\n", "hello2024\n", "hackmaster\n", + "demo5678\n", "password2\n", "test456\n", "hello2025\n", "hackmaster2\n" + ] + sample = ''.join(sample_lines) + return { + "success": True, + "filename": filename, + "count": 10, + "sample": sample, + "download_link": f"/static/wordlists/{filename}" + } + + async def list_wordlists(self) -> Dict[str, Any]: + return { + "success": True, + "wordlists": [ + { + "filename": "demo_wordlist.txt", + "path": "wordlists/demo_wordlist.txt", + "size": 1024, + "category": "custom", + "modified": "2025-01-01 00:00:00", + "download_link": "/static/wordlists/demo_wordlist.txt" + }, + { + "filename": "rockyou_mini.txt", + "path": "wordlists/standard/rockyou_mini.txt", + "size": 204800, + "category": "standard", + "modified": "2025-01-01 00:00:00", + "download_link": "/static/wordlists/standard/rockyou_mini.txt" + } + ], + "count": 2 + } + + async def delete_wordlist(self, filename: str) -> Dict[str, Any]: + return { + "success": True, + "message": f"Successfully deleted {filename} (demo)" + } + + async def defense_scan(self, interface: str, timeout: int) -> Dict[str, Any]: + return { + "success": True, + "module": "Wi-Fi Defense", + "issues": [ + { + "type": "EVIL_TWIN", + "severity": "HIGH", + "description": "Potential Evil Twin AP detected: DemoNetwork_1", + "bssid": "AA:BB:CC:FF:EE:01", + "ssid": "DemoNetwork_1" + } + ], + "threat": {"score": 75, "status": "HIGH"}, + "interface": interface + } + + async def start_ap(self, config: Any) -> Dict[str, Any]: + return {"success": True, "message": "AP started successfully"} + + async def stop_ap(self) -> Dict[str, Any]: + return {"success": True, "message": "AP stopped successfully"} + + async def get_ap_status(self) -> Dict[str, Any]: + return { + "running": True, + "ssid": "Mock_AP", + "clients": 2, + "uptime": "00:15:30" + } + + async def start_ap_capture(self) -> Dict[str, Any]: + return {"success": True, "message": "Capture started"} + + async def stop_ap_capture(self) -> Dict[str, Any]: + return {"success": True, "message": "Capture stopped"} + + async def list_ap_captures(self) -> Dict[str, Any]: + return { + "success": True, + "files": [ + {"name": "capture_01.pcap", "size": "1.2 MB", "date": "2023-10-25 10:30:00"}, + {"name": "capture_02.pcap", "size": "3.4 MB", "date": "2023-10-25 11:45:00"} + ] + } diff --git a/app/services/real/__init__.py b/app/services/real/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/real/ble_real.py b/app/services/real/ble_real.py new file mode 100644 index 0000000..1a06682 --- /dev/null +++ b/app/services/real/ble_real.py @@ -0,0 +1,135 @@ +import os +import subprocess +import signal +import psutil +import json +from pathlib import Path +from typing import Any, Dict, List + +from fastapi import HTTPException + +from services.ble_service import BLEService +from api.mylib.beacon import beacon_emulator +from api.mylib.beacon.beacon_scanner import BeaconScanner + +PROFILES_FILE = Path("data/beacon_profiles.json") +PROFILES_FILE.parent.mkdir(exist_ok=True) +if not PROFILES_FILE.exists(): + PROFILES_FILE.write_text("[]") + +# Module-level global state +_running_process = None +_beacon_emulator_active = False + + +class BLERealService(BLEService): + + def get_profiles(self) -> List[Dict]: + return json.loads(PROFILES_FILE.read_text()) + + async def add_profile(self, profile: Dict) -> Dict[str, Any]: + profiles = json.loads(PROFILES_FILE.read_text()) + profiles.append(profile) + PROFILES_FILE.write_text(json.dumps(profiles, indent=2)) + return {"status": "success"} + + async def delete_profile(self, name: str) -> Dict[str, Any]: + profiles = json.loads(PROFILES_FILE.read_text()) + profiles = [p for p in profiles if p["name"] != name] + PROFILES_FILE.write_text(json.dumps(profiles, indent=2)) + return {"status": "success"} + + async def start_beacon_emulator(self, profile_name: str) -> Dict[str, Any]: + global _beacon_emulator_active + profiles = json.loads(PROFILES_FILE.read_text()) + profile = next((p for p in profiles if p["name"] == profile_name), None) + if not profile: + return {"success": False, "message": "Profile not found"} + beacon_emulator.start_ibeacon( + uuid=profile["uuid"], + major=profile["major"], + minor=profile["minor"], + power=profile["power"] + ) + _beacon_emulator_active = True + return {"status": "started", "profile": profile["name"]} + + async def stop_beacon_emulator(self) -> Dict[str, Any]: + global _beacon_emulator_active + beacon_emulator.stop_ibeacon() + _beacon_emulator_active = False + return {"status": "stopped"} + + async def get_beacon_emulator_status(self) -> Dict[str, Any]: + global _beacon_emulator_active + if _beacon_emulator_active: + return {"status": "running"} + return {"status": "not_running"} + + async def start_airpods_emulator(self) -> Dict[str, Any]: + global _running_process + if _running_process and _running_process.poll() is None: + return {"status": "already_running", "pid": _running_process.pid} + try: + env = os.environ.copy() + cmd = ["sudo", "-E", "python3", "api/mylib/apple_bleee/adv_airpods.py"] + with open("airpods_output.log", "w") as out_file, open("airpods_error.log", "w") as err_file: + process = subprocess.Popen( + cmd, + stdout=out_file, + stderr=err_file, + text=True, + env=env, + preexec_fn=os.setsid + ) + _running_process = process + return {"status": "started", "pid": process.pid} + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to start: {str(e)}") + + async def stop_airpods_emulator(self) -> Dict[str, Any]: + global _running_process + if not _running_process: + return {"status": "not_running"} + try: + pid = _running_process.pid + parent = psutil.Process(pid) + children = parent.children(recursive=True) + for child in children: + child.terminate() + parent.terminate() + gone, alive = psutil.wait_procs([parent], timeout=3) + for p in alive: + p.kill() + _running_process = None + return {"status": "stopped", "pid": pid} + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to stop: {str(e)}") + + async def get_airpods_emulator_status(self) -> Dict[str, Any]: + global _running_process + if _running_process and _running_process.poll() is None: + return {"status": "running", "pid": _running_process.pid} + return {"status": "not_running"} + + async def get_airpods_logs(self) -> Dict[str, Any]: + try: + error_content = "" + output_content = "" + if os.path.exists("airpods_error.log"): + with open("airpods_error.log", "r") as error_file: + error_content = error_file.read() + if os.path.exists("airpods_output.log"): + with open("airpods_output.log", "r") as output_file: + output_content = output_file.read() + return {"output": output_content, "errors": error_content} + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to read logs: {str(e)}") + + async def scan_beacons(self, duration: int = 5) -> List[Dict[str, Any]]: + try: + scanner = BeaconScanner(scan_duration=duration) + beacons = await scanner.scan() + return beacons + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to scan beacons: {str(e)}") diff --git a/app/services/real/ir_real.py b/app/services/real/ir_real.py new file mode 100644 index 0000000..322312e --- /dev/null +++ b/app/services/real/ir_real.py @@ -0,0 +1,90 @@ +import asyncio +import time +import random +from typing import Any, Dict, Optional + +from services.ir_service import IRService + +# Module-level global state +_ir_recording = False +_ir_signal = None +_ir_record_start_time = None + + +def generate_ir_code(device_type, brand, protocol, function, index): + base_value = index * 37 + (ord(function[0]) if function else 0) + return format(base_value % 65536, '04X') + format((base_value * 17) % 65536, '04X') + + +async def _simulate_ir_reception(): + global _ir_recording, _ir_signal + await asyncio.sleep(3) + if _ir_recording: + _ir_signal = { + "data": "010101010110101010100101010101011010101010010101010101101010", + "format": "NEC", + "length": 32 + } + _ir_recording = False + + +class IRRealService(IRService): + + async def start_recording(self) -> Dict[str, Any]: + global _ir_recording, _ir_signal, _ir_record_start_time + try: + if _ir_recording: + return {"success": False, "message": "Already recording"} + _ir_recording = True + _ir_signal = None + _ir_record_start_time = time.time() + asyncio.create_task(_simulate_ir_reception()) + return {"success": True, "message": "IR recording started"} + except Exception as e: + _ir_recording = False + return {"success": False, "message": str(e)} + + async def cancel_recording(self) -> Dict[str, Any]: + global _ir_recording, _ir_signal + try: + _ir_recording = False + _ir_signal = None + return {"success": True, "message": "Recording cancelled"} + except Exception as e: + return {"success": False, "message": str(e)} + + async def get_status(self) -> Dict[str, Any]: + global _ir_recording, _ir_signal, _ir_record_start_time + if _ir_recording and _ir_record_start_time: + if time.time() - _ir_record_start_time > 15: + _ir_recording = False + return {"status": "timeout", "message": "Recording timed out"} + if _ir_recording: + return {"status": "recording"} + elif _ir_signal: + return {"status": "completed", "signal": _ir_signal} + else: + return {"status": "idle"} + + async def transmit(self, signal_data: str, format: Optional[str]) -> Dict[str, Any]: + try: + await asyncio.sleep(1) + return {"success": True, "message": "Signal transmitted successfully"} + except Exception as e: + return {"success": False, "message": str(e)} + + async def enumerate_code(self, device_type: str, brand: str, protocol: str, function: str, code_index: int) -> Dict[str, Any]: + try: + await asyncio.sleep(0.1) + code_hex = generate_ir_code(device_type, brand, protocol, function, code_index) + response_detected = random.random() < 0.05 + return { + "success": True, + "code_index": code_index, + "code_hex": code_hex, + "protocol": protocol if protocol != "all" else random.choice(["NEC", "SONY", "RC5", "RC6", "SAMSUNG"]), + "response": response_detected, + "bits": 32 + } + except Exception as e: + return {"success": False, "message": str(e)} diff --git a/app/services/real/rfid_real.py b/app/services/real/rfid_real.py new file mode 100644 index 0000000..c811f6a --- /dev/null +++ b/app/services/real/rfid_real.py @@ -0,0 +1,61 @@ +import asyncio +import binascii +from typing import Any, Dict + +from services.rfid_service import RFIDService +from api.mylib.RFIDlib import main as RFIDlib +from api.mylib.defense.defense_manager import DefenseManager + + +class RFIDRealService(RFIDService): + + async def setup_pn532(self) -> Dict[str, Any]: + try: + result = RFIDlib.setup() + return result + except Exception as e: + return {"success": False, "error": str(e)} + + async def identify_rfid(self) -> Dict[str, Any]: + try: + found = RFIDlib.iso14443a_identify() + while not found["success"]: + await asyncio.sleep(0.1) + found = RFIDlib.iso14443a_identify() + return found + except Exception as e: + return {"success": False, "error": str(e)} + + async def write_uid(self, card_data: Dict, save_to_db: bool) -> Dict[str, Any]: + try: + new_uid_hex = card_data.get("uid") + if not new_uid_hex: + return {"success": False, "error": "UID is required"} + if len(new_uid_hex) != 8: + return {"success": False, "error": "UID must be 8 hexadecimal characters"} + try: + binascii.unhexlify(new_uid_hex) + except Exception: + return {"success": False, "error": "UID contains invalid characters"} + result = RFIDlib.write_uid(new_uid_hex) + return result + except Exception as e: + return {"success": False, "error": str(e)} + + async def analyze_rfid(self, card_info: Dict) -> Dict[str, Any]: + try: + defense_manager = DefenseManager() + result = defense_manager.run_rfid_defense(card_info) + return { + "success": True, + "module": result["module"], + "issues": result["issues"], + "threat": result["threat"] + } + except Exception as e: + return { + "success": False, + "error": str(e), + "issues": [], + "threat": {"score": 0, "status": "UNKNOWN"} + } diff --git a/app/services/real/wifi_real.py b/app/services/real/wifi_real.py new file mode 100644 index 0000000..4b53f7c --- /dev/null +++ b/app/services/real/wifi_real.py @@ -0,0 +1,574 @@ +import os +import subprocess +import asyncio +import glob +import re +from datetime import datetime +from typing import Any, Dict, List, Optional + +from fastapi import BackgroundTasks + +from services.wifi_service import WiFiService +from api.mylib.WeakPasswordGenerater.main import PasswordGenerator +from api.mylib.ap_scan import scan_wifi_networks +from api.mylib.defense.defense_manager import DefenseManager + +# Module-level global state (preserved across requests) +_capture_active = False +_capture_process = None +_network_adapters: List[str] = [] + + +def extract_adapter_names(ifconfig_output: str) -> List[str]: + adapter_names = [] + lines = ifconfig_output.split('\n') + for line in lines: + if line and not line.startswith(' ') and not line.startswith('\t'): + if ':' in line: + adapter_name = line.split(':')[0].strip() + else: + parts = line.split() + if parts: + adapter_name = parts[0].strip() + else: + continue + if (adapter_name and adapter_name.isalnum()) or any(c in adapter_name for c in ['-', '_']): + adapter_names.append(adapter_name) + return adapter_names + + +def parse_aircrack_output(output: str) -> List[Dict]: + networks = [] + try: + lines = output.split('\n') + in_network_list = False + for line in lines: + line = line.strip() + if "# BSSID" in line and "ESSID" in line and "Encryption" in line: + in_network_list = True + continue + if in_network_list and not line: + break + if in_network_list and line and line[0].isdigit(): + parts = line.split() + if len(parts) >= 4: + try: + network_num = parts[0] + bssid = parts[1] + encryption_start = -1 + for i, part in enumerate(parts[2:], 2): + if any(enc in part.upper() for enc in ['WPA', 'WEP', 'OPN']): + encryption_start = i + break + if encryption_start > 2: + essid = ' '.join(parts[2:encryption_start]) + encryption_info = ' '.join(parts[encryption_start:]) + else: + essid = parts[2] if len(parts) > 2 else "Unknown" + encryption_info = ' '.join(parts[3:]) if len(parts) > 3 else "Unknown" + handshakes = 0 + if "handshake" in encryption_info.lower(): + handshake_match = re.search(r'\((\d+) handshake', encryption_info) + if handshake_match: + handshakes = int(handshake_match.group(1)) + networks.append({ + "number": int(network_num), + "bssid": bssid, + "essid": essid, + "encryption": encryption_info, + "handshakes": handshakes + }) + except (ValueError, IndexError) as e: + print(f"Error parsing network line '{line}': {e}") + continue + except Exception as e: + print(f"Error parsing aircrack output: {e}") + return networks + + +async def run_capture_process(command, output_path): + global _capture_process, _capture_active + try: + _capture_active = True + _capture_process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + await _capture_process.wait() + except Exception as e: + print(f"Capture process error: {e}") + finally: + _capture_active = False + _capture_process = None + + +class WiFiRealService(WiFiService): + + async def get_interface_details(self) -> Dict[str, Any]: + global _network_adapters + try: + result = subprocess.run( + ["ifconfig", "-a"], + capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0: + _network_adapters = extract_adapter_names(result.stdout) + return { + "success": True, + "output": result.stdout, + "message": "Network adapters listed successfully", + "adapters": _network_adapters + } + else: + return { + "success": False, + "message": f"Failed to execute ifconfig: {result.stderr}", + "output": result.stderr, + "adapters": [] + } + except subprocess.TimeoutExpired: + return {"success": False, "message": "Command timed out", "output": "", "adapters": []} + except FileNotFoundError: + return { + "success": False, + "message": "ifconfig command not found. Please ensure net-tools is installed.", + "output": "", + "adapters": [] + } + except Exception as e: + return {"success": False, "message": f"Error executing ifconfig: {str(e)}", "output": "", "adapters": []} + + async def get_interface_list(self) -> Dict[str, Any]: + global _network_adapters + return { + "success": True, + "adapters": _network_adapters, + "count": len(_network_adapters) + } + + async def set_monitor_mode(self, interface: str) -> Dict[str, Any]: + try: + up_result = subprocess.run( + ["sudo", "ifconfig", interface, "up"], + capture_output=True, text=True, timeout=10 + ) + if up_result.returncode != 0: + return { + "success": False, + "message": f"Failed to bring up interface {interface}", + "error": up_result.stderr + } + monitor_result = subprocess.run( + ["sudo", "iwconfig", interface, "mode", "monitor"], + capture_output=True, text=True, timeout=10 + ) + if monitor_result.returncode != 0: + return { + "success": False, + "message": f"Failed to set monitor mode for {interface}", + "error": monitor_result.stderr + } + return { + "success": True, + "message": f"Monitor mode activated successfully for {interface}", + "interface": interface + } + except subprocess.TimeoutExpired: + return {"success": False, "message": "Command timed out", "interface": interface} + except Exception as e: + return {"success": False, "message": f"Error activating monitor mode: {str(e)}", "interface": interface} + + async def get_interface_status(self, interface: str) -> Dict[str, Any]: + try: + result = subprocess.run( + ["iwconfig", interface], + capture_output=True, text=True, timeout=10 + ) + if result.returncode != 0: + return { + "success": False, + "message": f"Failed to get status for interface {interface}", + "error": result.stderr, + "interface": interface + } + output = result.stdout + mode = "Unknown" + if "Mode:Monitor" in output: + mode = "Monitor" + elif "Mode:Managed" in output: + mode = "Managed" + elif "Mode:Master" in output: + mode = "Master" + elif "Mode:Ad-Hoc" in output: + mode = "Ad-Hoc" + elif "no wireless extensions" in output.lower(): + return { + "success": False, + "message": f"Interface {interface} is not a wireless interface", + "interface": interface + } + return { + "success": True, + "interface": interface, + "mode": mode, + "status": f"{mode} mode", + "output": output + } + except subprocess.TimeoutExpired: + return {"success": False, "message": "Command timed out", "interface": interface} + except Exception as e: + return {"success": False, "message": f"Error getting interface status: {str(e)}", "interface": interface} + + async def scan_networks(self, interface: str, timeout: int) -> Dict[str, Any]: + try: + loop = asyncio.get_event_loop() + nearby_ap = await loop.run_in_executor(None, scan_wifi_networks, interface, timeout) + return { + "success": True, + "ap_list": nearby_ap, + "interface": interface, + "count": len(nearby_ap) + } + except Exception as e: + return { + "success": False, + "message": f"Failed to scan networks: {str(e)}", + "ap_list": [], + "interface": interface, + "count": 0 + } + + async def set_channel(self, interface: str, channel: str) -> Dict[str, Any]: + try: + result = subprocess.run( + ["sudo", "iwconfig", interface, "channel", channel], + capture_output=True, text=True, timeout=10 + ) + if result.returncode != 0: + return { + "success": False, + "message": f"Failed to set channel {channel} for {interface}", + "error": result.stderr + } + return { + "success": True, + "message": f"Channel {channel} set successfully for {interface}", + "interface": interface, + "channel": channel + } + except subprocess.TimeoutExpired: + return {"success": False, "message": "Command timed out", "interface": interface, "channel": channel} + except Exception as e: + return {"success": False, "message": f"Error setting channel: {str(e)}", "interface": interface, "channel": channel} + + async def start_capture(self, request: Any, background_tasks: BackgroundTasks) -> Dict[str, Any]: + global _capture_active, _capture_process + if _capture_active: + return {"success": False, "message": "Capture is already running"} + try: + output_file = "deauth_handshake" + os.makedirs("data/captures", exist_ok=True) + output_path = os.path.join("data/captures", output_file) + old_file_patterns = [ + f"{output_path}*.cap", + f"{output_path}*.csv", + f"{output_path}*.kismet*", + f"{output_path}*.log.csv" + ] + for pattern in old_file_patterns: + for old_file in glob.glob(pattern): + try: + if os.path.exists(old_file): + os.remove(old_file) + except Exception as e: + print(f"Failed to remove old file {old_file}: {e}") + capture_command = [ + "sudo", "airodump-ng", + "-c", str(request.channel), + "--bssid", request.bssid, + "-w", output_path, + request.interface + ] + background_tasks.add_task(run_capture_process, capture_command, output_path) + return { + "success": True, + "message": "Traffic capture started", + "capture_file": "deauth_handshake-01.cap", + "command": " ".join(capture_command) + } + except Exception as e: + return {"success": False, "message": f"Failed to start capture: {str(e)}"} + + async def stop_capture(self) -> Dict[str, Any]: + global _capture_active, _capture_process + try: + if not _capture_active or not _capture_process: + return {"success": False, "message": "No capture is currently running"} + _capture_process.terminate() + try: + await asyncio.wait_for(_capture_process.wait(), timeout=5.0) + except asyncio.TimeoutError: + _capture_process.kill() + await _capture_process.wait() + _capture_active = False + _capture_process = None + return {"success": True, "message": "Traffic capture stopped"} + except Exception as e: + return {"success": False, "message": f"Failed to stop capture: {str(e)}"} + + async def send_deauth(self, interface: str, bssid: str, packets: int) -> Dict[str, Any]: + try: + deauth_command = [ + "sudo", "aireplay-ng", + "--deauth", str(packets), + "-a", bssid, + interface + ] + result = subprocess.run(deauth_command, capture_output=True, text=True, timeout=30) + if result.returncode == 0: + return { + "success": True, + "message": f"Successfully sent {packets} deauth packets to {bssid}", + "packets_sent": packets, + "target_bssid": bssid, + "interface": interface, + "command": " ".join(deauth_command), + "output": result.stdout + } + else: + return { + "success": False, + "message": f"Failed to send deauth packets: {result.stderr}", + "command": " ".join(deauth_command), + "error": result.stderr + } + except subprocess.TimeoutExpired: + return {"success": False, "message": "Deauth command timed out (30 seconds)", "packets_sent": 0} + except Exception as e: + return {"success": False, "message": f"Error sending deauth packets: {str(e)}", "packets_sent": 0} + + async def check_handshake(self, capture_file: str) -> Dict[str, Any]: + try: + capture_path = os.path.join("data/captures", capture_file) + if not os.path.exists(capture_path): + capture_dir = "data/captures" + if os.path.exists(capture_dir): + cap_files = glob.glob(os.path.join(capture_dir, "deauth_handshake*.cap")) + if cap_files: + capture_path = max(cap_files, key=os.path.getmtime) + capture_file = os.path.basename(capture_path) + else: + return { + "success": False, + "message": f"No deauth_handshake files found in {capture_dir}", + "handshakes": 0, + "networks": [] + } + else: + return { + "success": False, + "message": f"Capture directory not found: {capture_dir}", + "handshakes": 0, + "networks": [] + } + if not os.path.exists(capture_path): + return { + "success": False, + "message": f"Capture file not found: {capture_file}", + "handshakes": 0, + "networks": [] + } + aircrack_command = ["sudo", "aircrack-ng", capture_path] + result = subprocess.run(aircrack_command, capture_output=True, text=True, timeout=30) + networks = parse_aircrack_output(result.stdout) + total_handshakes = sum(n.get('handshakes', 0) for n in networks) + return { + "success": True, + "message": f"Found {total_handshakes} handshake(s) in {len(networks)} network(s)", + "capture_file": capture_file, + "total_handshakes": total_handshakes, + "total_networks": len(networks), + "networks": networks, + "command": " ".join(aircrack_command), + "raw_output": result.stdout + } + except subprocess.TimeoutExpired: + return {"success": False, "message": "Aircrack-ng command timed out (30 seconds)", "handshakes": 0, "networks": []} + except Exception as e: + return {"success": False, "message": f"Error checking handshakes: {str(e)}", "handshakes": 0, "networks": []} + + async def crack_password(self, capture_file: str, wordlist_file: str) -> Dict[str, Any]: + try: + capture_path = os.path.join("data/captures", capture_file) + if not os.path.exists(capture_path): + capture_dir = "data/captures" + if os.path.exists(capture_dir): + cap_files = glob.glob(os.path.join(capture_dir, "deauth_handshake*.cap")) + if cap_files: + capture_path = max(cap_files, key=os.path.getmtime) + capture_file = os.path.basename(capture_path) + else: + return {"success": False, "message": f"No capture files found in {capture_dir}"} + else: + return {"success": False, "message": f"Capture directory not found: {capture_dir}"} + wordlist_path = os.path.join("static", wordlist_file) + if not os.path.exists(wordlist_path): + return {"success": False, "message": f"Wordlist file not found: {wordlist_file}"} + crack_command = ["sudo", "aircrack-ng", capture_path, "-w", wordlist_path] + result = subprocess.run(crack_command, capture_output=True, text=True, timeout=300) + password_found = None + if "KEY FOUND!" in result.stdout: + for line in result.stdout.split('\n'): + if "KEY FOUND!" in line: + password_match = re.search(r'KEY FOUND!\s*\[\s*(.+?)\s*\]', line) + if password_match: + password_found = password_match.group(1) + break + return { + "success": True, + "message": "Password cracking completed", + "capture_file": capture_file, + "wordlist_file": wordlist_file, + "password_found": password_found, + "command": " ".join(crack_command), + "raw_output": result.stdout, + "return_code": result.returncode + } + except subprocess.TimeoutExpired: + return {"success": False, "message": "Password cracking timed out (5 minutes)."} + except Exception as e: + return {"success": False, "message": f"Error during password cracking: {str(e)}"} + + async def generate_wordlist(self, output_filename: str, info_data: Dict[str, List[str]]) -> Dict[str, Any]: + try: + filename = output_filename + if not filename.endswith('.txt'): + filename += '.txt' + generator = PasswordGenerator(output_file=f"static/wordlists/{filename}") + generator.generate( + DATE=info_data.get('date', []), + TEL=info_data.get('tel', []), + NAME=info_data.get('name', []), + ID=info_data.get('ID', []), + SSID=info_data.get('SSID', [''])[0] if info_data.get('SSID') else '' + ) + file_path = f"static/wordlists/{filename}" + with open(file_path, 'r') as f: + lines = f.readlines() + total_count = len(lines) + sample = ''.join(lines[:10]) + return { + "success": True, + "filename": filename, + "count": total_count, + "sample": sample, + "download_link": f"/static/wordlists/{filename}" + } + except Exception as e: + return {"success": False, "error": str(e)} + + async def list_wordlists(self) -> Dict[str, Any]: + try: + wordlists = [] + wordlists_dir = "static/wordlists" + if os.path.exists(wordlists_dir): + for file in os.listdir(wordlists_dir): + if file.endswith('.txt'): + file_path = os.path.join(wordlists_dir, file) + if os.path.isfile(file_path): + file_stat = os.stat(file_path) + wordlists.append({ + "filename": file, + "path": f"wordlists/{file}", + "size": file_stat.st_size, + "category": "custom", + "modified": datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), + "download_link": f"/static/wordlists/{file}" + }) + standard_dir = "static/wordlists/standard" + if os.path.exists(standard_dir): + for file in os.listdir(standard_dir): + if file.endswith('.txt'): + file_path = os.path.join(standard_dir, file) + if os.path.isfile(file_path): + file_stat = os.stat(file_path) + wordlists.append({ + "filename": file, + "path": f"wordlists/standard/{file}", + "size": file_stat.st_size, + "category": "standard", + "modified": datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), + "download_link": f"/static/wordlists/standard/{file}" + }) + wordlists.sort(key=lambda x: (x['category'], x['filename'])) + return {"success": True, "wordlists": wordlists, "count": len(wordlists)} + except Exception as e: + return {"success": False, "message": f"Error listing wordlists: {str(e)}", "wordlists": [], "count": 0} + + async def delete_wordlist(self, filename: str) -> Dict[str, Any]: + try: + if not re.match(r'^[a-zA-Z0-9_.-]+\.txt$', filename): + return {"success": False, "message": "Invalid filename format"} + file_path = os.path.join("static/wordlists", filename) + if "standard" in filename or os.path.exists(os.path.join("static/wordlists/standard", filename)): + return {"success": False, "message": "Cannot delete standard wordlist files"} + if not os.path.exists(file_path): + return {"success": False, "message": f"File not found: {filename}"} + if not os.path.isfile(file_path): + return {"success": False, "message": f"Not a file: {filename}"} + os.remove(file_path) + return {"success": True, "message": f"Successfully deleted {filename}"} + except Exception as e: + return {"success": False, "message": f"Error deleting wordlist: {str(e)}"} + + async def defense_scan(self, interface: str, timeout: int) -> Dict[str, Any]: + try: + defense_manager = DefenseManager(iface=interface) + result = defense_manager.run_wifi_defense() + return { + "success": True, + "module": result["module"], + "issues": result["issues"], + "threat": result["threat"], + "interface": interface + } + except Exception as e: + return { + "success": False, + "message": f"Failed to scan and analyze Wi-Fi threats: {str(e)}", + "issues": [], + "threat": {"score": 0, "status": "UNKNOWN"} + } + + async def start_ap(self, config: Any) -> Dict[str, Any]: + # TODO: Implement real AP start logic + return {"success": True, "message": "AP started successfully"} + + async def stop_ap(self) -> Dict[str, Any]: + # TODO: Implement real AP stop logic + return {"success": True, "message": "AP stopped successfully"} + + async def get_ap_status(self) -> Dict[str, Any]: + # TODO: Implement real AP status logic + return { + "running": True, + "ssid": "Real_AP", + "clients": 0, + "uptime": "00:00:00" + } + + async def start_ap_capture(self) -> Dict[str, Any]: + # TODO: Implement real AP capture start logic + return {"success": True, "message": "Capture started"} + + async def stop_ap_capture(self) -> Dict[str, Any]: + # TODO: Implement real AP capture stop logic + return {"success": True, "message": "Capture stopped"} + + async def list_ap_captures(self) -> Dict[str, Any]: + # TODO: Implement real AP capture list logic + return { + "success": True, + "files": [] + } diff --git a/app/services/rfid_service.py b/app/services/rfid_service.py new file mode 100644 index 0000000..9e34d27 --- /dev/null +++ b/app/services/rfid_service.py @@ -0,0 +1,17 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, List + + +class RFIDService(ABC): + + @abstractmethod + async def setup_pn532(self) -> Dict[str, Any]: ... + + @abstractmethod + async def identify_rfid(self) -> Dict[str, Any]: ... + + @abstractmethod + async def write_uid(self, card_data: Dict, save_to_db: bool) -> Dict[str, Any]: ... + + @abstractmethod + async def analyze_rfid(self, card_info: Dict) -> Dict[str, Any]: ... diff --git a/app/services/wifi_service.py b/app/services/wifi_service.py new file mode 100644 index 0000000..544cbb0 --- /dev/null +++ b/app/services/wifi_service.py @@ -0,0 +1,70 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, List, Optional + +from fastapi import BackgroundTasks + + +class WiFiService(ABC): + + @abstractmethod + async def get_interface_details(self) -> Dict[str, Any]: ... + + @abstractmethod + async def get_interface_list(self) -> Dict[str, Any]: ... + + @abstractmethod + async def set_monitor_mode(self, interface: str) -> Dict[str, Any]: ... + + @abstractmethod + async def get_interface_status(self, interface: str) -> Dict[str, Any]: ... + + @abstractmethod + async def scan_networks(self, interface: str, timeout: int) -> Dict[str, Any]: ... + + @abstractmethod + async def set_channel(self, interface: str, channel: str) -> Dict[str, Any]: ... + + @abstractmethod + async def start_capture(self, request: Any, background_tasks: BackgroundTasks) -> Dict[str, Any]: ... + + @abstractmethod + async def stop_capture(self) -> Dict[str, Any]: ... + + @abstractmethod + async def send_deauth(self, interface: str, bssid: str, packets: int) -> Dict[str, Any]: ... + + @abstractmethod + async def check_handshake(self, capture_file: str) -> Dict[str, Any]: ... + + @abstractmethod + async def crack_password(self, capture_file: str, wordlist_file: str) -> Dict[str, Any]: ... + + @abstractmethod + async def generate_wordlist(self, output_filename: str, info_data: Dict[str, List[str]]) -> Dict[str, Any]: ... + + @abstractmethod + async def list_wordlists(self) -> Dict[str, Any]: ... + + @abstractmethod + async def delete_wordlist(self, filename: str) -> Dict[str, Any]: ... + + @abstractmethod + async def defense_scan(self, interface: str, timeout: int) -> Dict[str, Any]: ... + + @abstractmethod + async def start_ap(self, config: Any) -> Dict[str, Any]: ... + + @abstractmethod + async def stop_ap(self) -> Dict[str, Any]: ... + + @abstractmethod + async def get_ap_status(self) -> Dict[str, Any]: ... + + @abstractmethod + async def start_ap_capture(self) -> Dict[str, Any]: ... + + @abstractmethod + async def stop_ap_capture(self) -> Dict[str, Any]: ... + + @abstractmethod + async def list_ap_captures(self) -> Dict[str, Any]: ...