-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample.py
More file actions
216 lines (168 loc) · 7.2 KB
/
example.py
File metadata and controls
216 lines (168 loc) · 7.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
"""
代码分析系统快速使用示例
"""
import asyncio
import os
import sys
# 添加项目路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from main import CodeAnalyzer
from agents import *
from core.workflow import CodeAnalysisWorkflow
async def quick_example():
"""快速使用示例"""
print("🚀 代码分析系统快速示例")
print("=" * 50)
# 创建分析器
analyzer = CodeAnalyzer()
# 分析当前项目目录
current_dir = os.path.dirname(os.path.abspath(__file__))
print(f"📁 分析目录: {current_dir}")
print("🔍 开始分析...")
# 执行分析
result = await analyzer.analyze_code(
user_request="分析这个Python项目的代码质量,找出潜在问题并生成改进建议",
target_directory=current_dir,
output_format="rich",
save_report=True
)
# 显示简单摘要
print("\n" + "=" * 50)
print("📊 分析完成!")
print(f"✅ 成功: {result.get('success')}")
print(f"⏱️ 耗时: {result.get('execution_time', 0):.2f} 秒")
print(f"📋 完成任务: {len(result.get('completed_tasks', []))}")
print(f"❌ 错误数: {result.get('error_count', 0)}")
async def individual_agent_examples():
"""单个Agent使用示例"""
print("\n🤖 单个Agent使用示例")
print("=" * 50)
current_dir = os.path.dirname(os.path.abspath(__file__))
# 1. 目录扫描Agent
print("\n1️⃣ 目录扫描Agent")
scanner = DirectoryScannerAgent()
result = scanner.analyze_directory(current_dir, max_depth=3)
if result['success']:
stats = result['statistics']
print(f" 📁 总文件数: {stats['total_files']}")
print(f" 🐍 Python文件: {stats['language_stats'].get('Python', 0)}")
print(f" 📏 总大小: {stats['total_size']} bytes")
# 2. 代码查询Agent
print("\n2️⃣ 代码查询Agent")
query_agent = CodeQueryAgent()
result = query_agent.search_code(current_dir, "class.*Agent", use_regex=True)
if result['success']:
print(f" 🔍 找到匹配: {result['total_matches']} 个")
print(f" 📂 涉及文件: {result['total_files_scanned']} 个")
# 3. AST分析Agent
print("\n3️⃣ AST分析Agent")
ast_agent = ASTAnalyzerAgent()
result = ast_agent.analyze_ast(current_dir, generate_graphs=False)
if result['success']:
print(f" 🔧 分析文件: {result['total_files']}")
print(f" 📊 调用图节点: {result['call_graph_stats']['nodes']}")
print(f" 🔗 调用图边: {result['call_graph_stats']['edges']}")
# 4. 代码诊断Agent
print("\n4️⃣ 代码诊断Agent")
diagnostician = CodeDiagnosticianAgent()
# 找一个Python文件进行诊断
python_files = []
for root, dirs, files in os.walk(current_dir):
for file in files:
if file.endswith('.py') and 'main.py' in file:
python_files.append(os.path.join(root, file))
break
if python_files:
break
if python_files:
result = diagnostician.analyze_quality(python_files[0])
if result['success']:
summary = result['summary']
print(f" 🏆 质量评分: {summary['quality_score']}/100")
print(f" ⚠️ 发现问题: {summary['total_issues']} 个")
print(f" 📝 代码行数: {summary['code_lines']}")
def sync_example():
"""同步使用示例(不使用async)"""
print("\n🔄 同步使用示例")
print("=" * 50)
current_dir = os.path.dirname(os.path.abspath(__file__))
# 使用单个Agent进行同步操作
scanner = DirectoryScannerAgent()
# 生成文本格式的目录树
tree_text = scanner.generate_tree_text(current_dir, max_depth=3)
print("\n📁 项目结构:")
print(tree_text[:500] + "..." if len(tree_text) > 500 else tree_text)
# 分析目录
result = scanner.analyze_directory(current_dir)
if result['success']:
print("\n📊 项目信息:")
project_info = result['project_info']
print(f" 🏷️ 检测到的技术: {', '.join(project_info['detected_types'])}")
print(f" 🚀 检测到的框架: {', '.join(project_info['frameworks'])}")
async def workflow_example():
"""工作流使用示例"""
print("\n⚙️ 工作流编排示例")
print("=" * 50)
# 创建工作流
workflow = CodeAnalysisWorkflow()
current_dir = os.path.dirname(os.path.abspath(__file__))
# 运行完整分析流程
result = await workflow.run_analysis(
user_request="进行全面的代码分析,包括结构分析、质量检查和改进建议",
target_directory=current_dir
)
print(f" ✅ 工作流完成: {result.get('success')}")
print(f" 📋 执行步骤: {', '.join(result.get('completed_tasks', []))}")
# 显示最终报告摘要
final_report = result.get('final_report')
if final_report and final_report.get('success'):
exec_summary = final_report.get('execution_summary', {})
print(f" 🤖 Agent数量: {exec_summary.get('total_agents', 0)}")
print(f" ✅ 成功率: {exec_summary.get('success_rate', 0)*100:.1f}%")
def print_agent_info():
"""打印Agent信息"""
print("\n🤖 可用的Agent列表")
print("=" * 50)
agents_info = [
("CodeQueryAgent", "🔍", "搜索和查找代码片段"),
("DirectoryScannerAgent", "📁", "分析项目文件结构"),
("ASTAnalyzerAgent", "🌲", "AST分析和图表生成"),
("CodeDiagnosticianAgent", "🔬", "代码质量诊断"),
("CodeFixerAgent", "🔧", "自动修复代码问题"),
("AISchedulerAgent", "🧠", "智能调度其他Agent")
]
for agent_name, icon, description in agents_info:
print(f"{icon} {agent_name:25} - {description}")
async def main():
"""主函数"""
print("🎯 欢迎使用代码分析Agent系统!")
print("这个系统使用LangChain和LangGraph构建多Agent协作的代码分析工具")
# 检查环境
if not os.getenv('OPENAI_API_KEY'):
print("\n⚠️ 警告: 未设置OPENAI_API_KEY环境变量")
print("某些功能可能无法正常工作")
# 显示Agent信息
print_agent_info()
# 运行示例
try:
print("\n" + "🚀 开始运行示例...")
# 1. 同步示例
sync_example()
# 2. 单个Agent示例
await individual_agent_examples()
# 3. 工作流示例
await workflow_example()
# 4. 完整分析示例
await quick_example()
print("\n" + "🎉 所有示例执行完成!")
print("您可以修改示例代码来探索更多功能")
except Exception as e:
print(f"\n❌ 执行过程中出现错误: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
# 设置事件循环策略(Windows兼容)
if sys.platform.startswith('win'):
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
# 运行主函数
asyncio.run(main())