-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
376 lines (304 loc) · 14.8 KB
/
main.py
File metadata and controls
376 lines (304 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
"""
代码分析系统主入口
"""
import asyncio
import argparse
import os
import sys
import time
from pathlib import Path
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.tree import Tree
from rich.markdown import Markdown
# 添加当前目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from core.workflow import CodeAnalysisWorkflow
from agents import (
CodeQueryAgent, DirectoryScannerAgent, ASTAnalyzerAgent,
CodeDiagnosticianAgent, CodeFixerAgent, AISchedulerAgent
)
from config import settings
from utils.file_utils import FileUtils
from utils.llm_factory import get_current_model_info, validate_llm_config
# Module-level Rich console used by demo_analysis() and main() for CLI output.
console = Console()
class CodeAnalyzer:
    """Front end of the code analysis system.

    Runs the analysis workflow for a user request against a target directory,
    renders the result on a Rich console and optionally saves it as JSON.
    """

    def __init__(self):
        self.workflow = CodeAnalysisWorkflow()
        self.console = Console()

    @staticmethod
    def _plain(value) -> str:
        """Return ``str(value)`` escaped so Rich prints it literally.

        Rich treats ``[word]`` as a markup tag, so unescaped dynamic text
        (paths, log lines, exception messages) can silently lose bracketed
        parts or raise ``MarkupError`` on an unmatched closing tag.
        """
        # Imported lazily so the early validation paths of analyze_code()
        # do not depend on it.
        from rich.markup import escape
        return escape(str(value))

    async def analyze_code(self, user_request: str, target_directory: str,
                           output_format: str = "rich", save_report: bool = False) -> dict:
        """Run the analysis workflow and present its result.

        Args:
            user_request: Free-text description of what to analyse.
            target_directory: Directory to analyse; must exist and be a directory.
            output_format: ``"rich"`` renders tables and panels, ``"json"``
                prints the raw result as JSON; any other value prints nothing.
            save_report: When true, also write the result through
                ``_save_report``.

        Returns:
            The workflow result with an added ``execution_time`` entry
            (seconds), or ``{'success': False, 'error': <message>}`` when
            validation fails or any step raises.
        """
        # Validate the target: it has to exist *and* be a directory. A plain
        # file used to slip through the existence check.
        if not os.path.exists(target_directory):
            return {'success': False, 'error': f'目录不存在: {target_directory}'}
        if not os.path.isdir(target_directory):
            return {'success': False, 'error': f'目标路径不是目录: {target_directory}'}

        # Validate the LLM configuration before doing any work.
        is_valid, error_msg = validate_llm_config()
        if not is_valid:
            return {'success': False, 'error': f'LLM配置错误: {error_msg}'}

        model_info = get_current_model_info()

        # Start banner; user-supplied values are escaped so brackets survive.
        banner = (
            f"🔍 开始分析代码库\n"
            f"📁 目标目录: {self._plain(target_directory)}\n"
            f"📋 分析需求: {self._plain(user_request)}\n"
            f"🤖 使用模型: {self._plain(model_info['provider'].upper())}"
            f" - {self._plain(model_info['model'])}"
        )
        self.console.print(Panel.fit(banner, title="代码分析系统", style="bold blue"))

        # perf_counter is monotonic, so the duration is immune to clock changes.
        start_time = time.perf_counter()

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=self.console
        ) as progress:
            task = progress.add_task("正在分析代码...", total=None)
            try:
                result = await self.workflow.run_analysis(user_request, target_directory)
                progress.update(task, description="分析完成", completed=True)

                result['execution_time'] = time.perf_counter() - start_time

                if output_format == "rich":
                    self._display_rich_results(result)
                elif output_format == "json":
                    import json
                    self.console.print_json(json.dumps(result, indent=2, ensure_ascii=False))

                if save_report:
                    self._save_report(result, target_directory)

                return result
            except Exception as e:
                # The description is rendered as markup by TextColumn, so the
                # exception text has to be escaped.
                progress.update(task, description=f"分析失败: {self._plain(e)}")
                return {'success': False, 'error': str(e)}

    def _display_rich_results(self, result: dict):
        """Render the overall result: summary table, completed tasks,
        the final report (when it reports success) and the execution log."""
        summary_table = Table(title="📊 执行摘要", show_header=True, header_style="bold magenta")
        summary_table.add_column("项目", style="cyan")
        summary_table.add_column("值", style="white")
        summary_table.add_row("✅ 分析状态", "成功" if result.get('success') else "失败")
        summary_table.add_row("⏱️ 执行时间", f"{result.get('execution_time', 0):.2f} 秒")
        summary_table.add_row("🔧 完成任务", str(len(result.get('completed_tasks', []))))
        summary_table.add_row("❌ 错误数量", str(result.get('error_count', 0)))
        self.console.print(summary_table)
        self.console.print()

        completed_tasks = result.get('completed_tasks')
        if completed_tasks:
            task_tree = Tree("📋 已完成任务")
            for task in completed_tasks:
                task_tree.add(f"✅ {self._plain(task)}")
            self.console.print(Panel(task_tree, title="任务执行情况", style="green"))
            self.console.print()

        final_report = result.get('final_report')
        if final_report and final_report.get('success'):
            self._display_final_report(final_report)

        messages = result.get('messages')
        if messages:
            messages_text = "\n".join(f"• {self._plain(msg)}" for msg in messages)
            self.console.print(Panel(messages_text, title="🗨️ 执行日志", style="dim"))

    def _display_final_report(self, report: dict):
        """Render the final report: per-agent performance, consolidated
        findings and recommendations, in that order."""
        self._display_agent_performance(report.get('execution_summary', {}))
        self._display_findings(report.get('consolidated_findings', {}))
        self._display_recommendations(report.get('recommendations', []))

    def _display_agent_performance(self, exec_summary: dict):
        """Print one table row per entry of ``exec_summary['agent_performance']``."""
        if not exec_summary:
            return
        summary_table = Table(title="🎯 Agent执行情况", show_header=True)
        summary_table.add_column("Agent", style="cyan")
        summary_table.add_column("状态", style="white")
        summary_table.add_column("执行时间", style="yellow")
        for agent_perf in exec_summary.get('agent_performance', []):
            status = "✅ 成功" if agent_perf.get('success') else "❌ 失败"
            exec_time = f"{agent_perf.get('execution_time', 0):.2f}s"
            summary_table.add_row(
                self._plain(agent_perf.get('agent') or 'Unknown'),
                status,
                exec_time
            )
        self.console.print(summary_table)
        self.console.print()

    def _display_findings(self, findings: dict):
        """Print the project-structure notice, issue counts by type and by
        severity, and the code metrics present in ``findings``."""
        if not findings:
            return

        if findings.get('project_structure'):
            self.console.print(Panel("项目结构分析完成", title="📁 项目结构", style="blue"))

        issues = findings.get('code_issues')
        if issues:
            # Counter keeps first-seen order, so rows appear in the same
            # order as with a hand-maintained dict.
            from collections import Counter
            issue_stats = Counter(issue.get('type', 'unknown') for issue in issues)
            severity_stats = Counter(issue.get('severity', 'info') for issue in issues)

            issues_table = Table(title="🚨 发现的代码问题", show_header=True)
            issues_table.add_column("类型", style="red")
            issues_table.add_column("严重程度", style="yellow")
            issues_table.add_column("数量", style="white")
            for issue_type, count in issue_stats.items():
                issues_table.add_row(self._plain(issue_type), "-", str(count))
            self.console.print(issues_table)

            if severity_stats:
                severity_colors = {"error": "red", "warning": "yellow", "info": "blue"}
                severity_table = Table(title="按严重程度分类", show_header=True)
                severity_table.add_column("严重程度", style="yellow")
                severity_table.add_column("数量", style="white")
                for severity, count in severity_stats.items():
                    color = severity_colors.get(severity, "white")
                    # Only the colour tag is markup; the label itself is escaped.
                    severity_table.add_row(f"[{color}]{self._plain(severity)}[/]", str(count))
                self.console.print(severity_table)

        metrics = findings.get('code_metrics')
        if metrics:
            metrics_text = "\n".join(
                f"• {self._plain(key)}: {self._plain(value)}" for key, value in metrics.items()
            )
            if metrics_text:
                self.console.print(Panel(metrics_text.strip(), title="📈 代码度量", style="cyan"))

    def _display_recommendations(self, recommendations: list):
        """Print the recommendations as a tree, each prefixed with a
        priority icon and a literal ``[priority]`` label."""
        if not recommendations:
            return
        priority_icons = {"high": "🔴", "medium": "🟡", "low": "🟢"}
        rec_tree = Tree("💡 改进建议")
        for rec in recommendations:
            if isinstance(rec, dict):
                priority = rec.get('priority', 'medium')
                desc = rec.get('description', '')
            else:
                # Tolerate a bare value instead of failing on ``.get``.
                priority, desc = 'medium', rec
            priority_icon = priority_icons.get(priority, "⚪")
            # "[high]" written raw is consumed by Rich as a style tag and
            # disappears from the output; escaping keeps the label visible.
            rec_tree.add(f"{priority_icon} {self._plain(f'[{priority}]')} {self._plain(desc)}")
        self.console.print(Panel(rec_tree, title="建议", style="magenta"))

    def _save_report(self, result: dict, target_directory: str):
        """Write ``result`` as JSON to ``<settings.output_dir>/reports``.

        The file is named ``analysis_report_<YYYYmmdd_HHMMSS>.json``.
        ``target_directory`` is accepted for interface compatibility and is
        not used. Failures are reported on the console and never raised.
        """
        import datetime
        import json

        try:
            output_dir = os.path.join(settings.output_dir, "reports")
            os.makedirs(output_dir, exist_ok=True)

            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            report_file = os.path.join(output_dir, f"analysis_report_{timestamp}.json")

            # Serialise before opening the file so a non-serialisable result
            # cannot leave a truncated report behind.
            payload = json.dumps(result, indent=2, ensure_ascii=False)
            with open(report_file, 'w', encoding='utf-8') as f:
                f.write(payload)

            self.console.print(f"📄 报告已保存到: {self._plain(report_file)}")
        except Exception as e:
            self.console.print(f"❌ 保存报告失败: {self._plain(e)}")
async def demo_analysis():
    """Run a fixed series of sample analyses against this file's own directory.

    Each sample request is analysed with Rich output; a failure is reported
    and the demo moves on. Between samples the user is asked to press Enter.
    """
    intro = Panel.fit(
        "🎯 代码分析系统演示\n\n这将对当前项目进行代码分析演示",
        title="演示模式",
        style="bold green"
    )
    console.print(intro)

    analyzer = CodeAnalyzer()

    # The directory containing this script doubles as the demo target.
    project_root = os.path.dirname(os.path.abspath(__file__))

    sample_requests = [
        "分析项目的整体结构和代码质量",
        "搜索所有的Agent类和它们的功能",
        "生成AST调用图并检查代码问题"
    ]
    last_index = len(sample_requests)

    for index, sample in enumerate(sample_requests, start=1):
        console.print(f"\n🔍 演示 {index}: {sample}")
        console.print("-" * 60)

        outcome = await analyzer.analyze_code(
            user_request=sample,
            target_directory=project_root,
            output_format="rich"
        )
        if not outcome.get('success'):
            console.print(f"❌ 演示 {index} 失败: {outcome.get('error')}")

        # No prompt after the final sample.
        if index >= last_index:
            continue
        input("\n按Enter键继续下一个演示...")
def main():
    """Command-line entry point.

    Modes, checked in this order:
      * ``--model-info``: print the current model configuration and return.
      * ``--test-ignore``: print the exclude patterns, scan ``--directory``
        and warn about any scanned path containing "venv", then return.
      * ``--demo``: run ``demo_analysis()``.
      * default: analyse ``--directory`` for ``--request``.

    Exit status: 1 when the LLM configuration is invalid, the analysis fails
    or an unexpected error occurs; 130 when interrupted by the user;
    otherwise the function returns normally.
    """
    parser = argparse.ArgumentParser(description="代码分析Agent系统")
    parser.add_argument("--directory", "-d", default=".", help="要分析的目录路径")
    parser.add_argument("--request", "-r", default="分析项目代码质量", help="分析请求")
    parser.add_argument("--output", "-o", choices=["rich", "json"], default="rich", help="输出格式")
    parser.add_argument("--save", "-s", action="store_true", help="保存报告到文件")
    parser.add_argument("--demo", action="store_true", help="运行演示模式")
    parser.add_argument("--model-info", action="store_true", help="显示当前模型信息")
    parser.add_argument("--test-ignore", action="store_true", help="测试忽略目录配置")
    args = parser.parse_args()

    # Show the model configuration and stop.
    if args.model_info:
        model_info = get_current_model_info()
        console.print(Panel.fit(
            f"🤖 当前模型信息\n"
            f"📦 提供商: {model_info['provider'].upper()}\n"
            f"🎨 模型: {model_info['model']}\n"
            f"🔗 API地址: {model_info['base_url']}\n"
            f"🔑 API密钥: {'[green]已配置[/]' if model_info['has_api_key'] else '[red]未配置[/]'}",
            title="模型配置信息",
            style="cyan"
        ))
        return

    # Check the configuration. An invalid one is a failure, so exit non-zero,
    # consistent with the analysis-failure path below.
    is_valid, error_msg = validate_llm_config()
    if not is_valid:
        # markup=False: dynamic text must not be parsed as Rich markup.
        console.print(f"❌ {error_msg}", style="red", markup=False)
        console.print("📝 请检查您的环境变量配置，或查看 .env.example 文件")
        sys.exit(1)

    # Exercise the ignore-pattern configuration and stop.
    if args.test_ignore:
        console.print("🔧 测试忽略目录配置", style="bold cyan")

        console.print("\n📋 默认忽略的目录:")
        for pattern in settings.exclude_patterns:
            # Dropping every backslash already turns "\." into ".".
            readable = pattern.replace('\\', '')
            console.print(f" • {readable}", style="dim", markup=False)

        if settings.custom_exclude_patterns:
            console.print(f"\n🎯 自定义忽略模式: {settings.custom_exclude_patterns}", markup=False)
        else:
            console.print("\n💡 未设置自定义忽略模式 (EXCLUDE_PATTERNS环境变量)")

        test_dir = args.directory
        console.print(f"\n🔍 测试扫描目录: {test_dir}", markup=False)

        files = FileUtils.scan_directory(test_dir)
        console.print(f"✅ 扫描到 {len(files)} 个文件")

        # Any scanned path mentioning "venv" suggests the ignore rules missed it.
        venv_files = [f for f in files if 'venv' in f.lower()]
        if venv_files:
            console.print("⚠️ 警告: 发现venv相关文件:", style="yellow")
            for f in venv_files[:3]:
                console.print(f" • {f}", style="yellow", markup=False)
        else:
            console.print("✅ venv目录已正确忽略", style="green")
        return

    async def run_main():
        if args.demo:
            # demo_analysis() builds its own analyzer.
            await demo_analysis()
            return

        analyzer = CodeAnalyzer()
        result = await analyzer.analyze_code(
            user_request=args.request,
            target_directory=args.directory,
            output_format=args.output,
            save_report=args.save
        )
        if not result.get('success'):
            console.print(f"❌ 分析失败: {result.get('error')}", style="red", markup=False)
            sys.exit(1)

    # Run the async entry point. SystemExit raised inside run_main() is not an
    # Exception subclass and therefore passes through untouched.
    try:
        asyncio.run(run_main())
    except KeyboardInterrupt:
        console.print("\n🛑 用户中断操作", style="yellow")
        sys.exit(130)  # conventional status for termination by Ctrl-C
    except Exception as e:
        # markup=False so the exception text cannot raise MarkupError here.
        console.print(f"❌ 系统错误: {e}", style="red", markup=False)
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()