| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 |
- from quart import Quart, render_template, request, jsonify, Response
- from Client import PythonCTranspiler, AIMessage
- import asyncio
- import aiofiles
- from contextlib import asynccontextmanager
- from typing import Optional, AsyncIterator
- import re
- import json
- from pathlib import Path
- # 全局agent实例
- agent_instance: Optional[PythonCTranspiler] = None
- # 配置扫描路径
- LOG_DIR = None # 日志文件目录
- FILE_DIR = None # 文件扫描目录
- # 存储已读取的日志信息和历史内容
- file_history = []
- processed_files = set()
- app = Quart(__name__)
- # 在应用启动前执行
- @app.before_serving
- async def startup():
- print("=== Application Starting ===")
- global agent_instance
- global LOG_DIR
- global FILE_DIR
- agent_instance = await PythonCTranspiler().initialize()
- # 获取日志存储路径
- if LOG_DIR == None:
- LOG_DIR = agent_instance.session_log_dir
- print(f"Get log_dir:{LOG_DIR}")
- # 获取文件存储路径
- if FILE_DIR == None:
- FILE_DIR = agent_instance.session_work_dir
- print(f"Get file_dir:{FILE_DIR}")
-
- print("Agent initialized!")
- # 在应用关闭后执行
- @app.after_serving
- async def shutdown():
- print("=== Application Shutting Down ===")
- if agent_instance:
- await agent_instance.close()
- print("Agent closed!")
- # 构建网页界面
- @app.route('/')
- async def index():
- print("In index")
- return await render_template('index.html')
- @app.route('/chat', methods=['POST'])
- async def chat():
- print("In chat")
- try:
- # 检查agent是否已经初始化
- if not agent_instance or not getattr(agent_instance, 'initialized', False):
- return jsonify({'error': 'Agent未初始化,请稍后重试'}), 503
- data = await request.get_json()
- user_input = data.get('message', '').strip()
-
- if not user_input:
- return jsonify({'error': '输入不能为空'}), 400
-
- if not agent_instance:
- return jsonify({'error': 'Agent未初始化'}), 500
-
- # 使用全局agent实例处理输入
- state = await agent_instance.process_input(user_input)
-
- # 提取最终的AI响应
- if state["messages"] and isinstance(state["messages"][-1], AIMessage):
- agent_response = state['messages'][-1].content
-
- return jsonify({
- 'user_input': user_input,
- 'agent_response': agent_response,
- 'success': True
- })
-
- except Exception as e:
- return jsonify({'error': f'处理请求时出错: {str(e)}'}), 500
- # 实时获取日志内容
- @app.route('/scan_logs', methods=['GET'])
- async def scan_logs():
- print("scan_logs start")
- try:
- path = LOG_DIR
- print(f"log_dir:{path}")
- # 检查路径是否存在
- if not path.exists() or path is None:
- return jsonify({
- 'success': False,
- 'error': '日志目录未创建'
- }), 400
- """SSE流式传输新文件内容"""
- async def generate():
- n_value = 1
- max_n = 0 # 跟踪最大n值
- encoding = 'utf-8'
- while True:
- # 使用正则表达式匹配日志文件名格式:*****_LLM[n_value].log
- log_pattern = re.compile(fr"_LLM{n_value}\.log$")
- new_file = None
- # 获取匹配的日志文件
- for file in path.iterdir():
- if file.is_file() and log_pattern.search(file.name):
- new_file = file
- break
- # 检查是否存在完成文件
- completion_file_path = path / "complate_message.log"
-
- if completion_file_path.exists() and new_file is None:
- try:
- completion_file_content = completion_file_path.read_text(encoding=encoding)
- data = {
- 'filename': "complate_message.log",
- 'content': completion_file_content,
- 'n': max_n + 1,
- 'is_completion': True
- }
- data_json = json.dumps(data, ensure_ascii=False)
- print(f"发送完成文件: {data_json}")
- yield f"data: {data_json}\n\n"
- print("检测到完成文件: complate_message.log")
- break # 完成文件后退出循环
- except Exception as e:
- data = {
- 'filename': "complate_message.log",
- 'content': f'读取完成文件时出错: {str(e)}',
- 'n': max_n + 1,
- 'is_completion': True
- }
- data_json = json.dumps(data, ensure_ascii=False)
- yield f"data: {data_json}\n\n"
- break
- elif new_file:
- try:
- # 打开并读取日志文件
- file_content = new_file.read_text(encoding=encoding)
-
- # 将JSON字符串转换为Python对象
- file_info = json.loads(file_content)
- # 过滤日志内容,只保留需要的字段
- filtered_info = {
- 'model': file_info.get('model'),
- 'parameters': file_info.get('parameters'),
- 'current_message': file_info.get('current_message'),
- 'response': file_info.get('response')
- }
-
- # 移除空的字段
- filtered_info = {k: v for k, v in filtered_info.items() if v is not None}
- # 更新max_n
- if n_value > max_n:
- max_n = n_value
- # 发送SSE事件 - 修正:发送完整的data结构
- data = {
- 'filename': new_file.name,
- 'content': filtered_info,
- 'n': n_value,
- 'is_completion': False
- }
- data_json = json.dumps(data, ensure_ascii=False)
- yield f"data: {data_json}\n\n"
-
- # n_value加一
- n_value += 1
- except Exception as e:
- print(f"处理文件出错: {str(e)}")
- data = {
- 'error': f'处理文件错误: {str(e)}',
- 'filename': new_file.name
- }
- data_json = json.dumps(data, ensure_ascii=False) # 修正:定义data_json
- yield f"data: {data_json}\n\n"
- n_value += 1
-
- else:
- # 如果没有找到新文件,发送心跳保持连接
- yield "data: {\"heartbeat\": true}\n\n"
-
- # 每2秒检查一次
- await asyncio.sleep(2)
-
- return Response(
- generate(),
- mimetype='text/event-stream',
- headers={
- 'Cache-Control': 'no-cache',
- 'Connection': 'keep-alive',
- 'Access-Control-Allow-Origin': '*',
- }
- )
- except Exception as e:
- print(f"scan_logs顶层错误: {str(e)}")
- return jsonify({
- 'success': False,
- 'error': f'服务器错误: {str(e)}'
- }), 500
- # 获取file_create内文件内容
- @app.route('/refresh_files', methods=['POST'])
- async def refresh_files():
- try:
- files_data = []
-
- # 确保文件目录存在
- if not FILE_DIR.exists():
- return jsonify({'success': False, 'error': f'文件目录不存在: {FILE_DIR}'})
-
- # 递归扫描目录
- await scan_directory(FILE_DIR, files_data)
-
- return jsonify({'success': True, 'files': files_data})
-
- except Exception as e:
- return jsonify({'success': False, 'error': str(e)})
- async def scan_directory(directory: Path, files_data: list, depth=0):
- """递归扫描目录"""
- try:
- # 首先添加当前目录的信息
- if depth > 0: # 不显示根目录
- files_data.append({
- 'type': 'directory',
- 'name': directory.name,
- 'path': str(directory.relative_to(FILE_DIR)),
- 'depth': depth,
- 'content': f"目录: {directory.name}"
- })
-
- # 遍历目录中的所有项目
- for item in directory.iterdir():
- if item.is_dir():
- # 递归扫描子目录
- await scan_directory(item, files_data, depth + 1)
- elif item.is_file():
- # 处理文件
- await process_file(item, files_data, depth + 1)
-
- except PermissionError:
- files_data.append({
- 'type': 'error',
- 'name': directory.name,
- 'path': str(directory.relative_to(FILE_DIR)),
- 'depth': depth,
- 'content': '[权限不足,无法访问此目录]'
- })
- async def process_file(file_path: Path, files_data: list, depth: int):
- """处理单个文件"""
- try:
- # 检查文件是否为二进制文件
- if await is_binary_file(file_path):
- content = '[二进制文件,无法显示内容]'
- else:
- # 尝试读取文件内容
- try:
- async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
- content = await f.read()
- except UnicodeDecodeError:
- # 如果 UTF-8 解码失败,尝试其他编码
- try:
- async with aiofiles.open(file_path, 'r', encoding='latin-1') as f:
- content = await f.read()
- except Exception as e:
- content = f'[读取文件时出错: {str(e)}]'
- except Exception as e:
- content = f'[读取文件时出错: {str(e)}]'
-
- files_data.append({
- 'type': 'file',
- 'name': file_path.name,
- 'path': str(file_path.relative_to(FILE_DIR)),
- 'depth': depth,
- 'content': content
- })
-
- except Exception as e:
- files_data.append({
- 'type': 'error',
- 'name': file_path.name,
- 'path': str(file_path.relative_to(FILE_DIR)),
- 'depth': depth,
- 'content': f'[处理文件时出错: {str(e)}]'
- })
- async def is_binary_file(file_path: Path) -> bool:
- """检查文件是否为二进制文件"""
- try:
- # 检查文件扩展名(常见的文本文件扩展名)
- text_extensions = {'.txt', '.py', '.js', '.html', '.css', '.json', '.xml',
- '.csv', '.md', '.rst', '.yml', '.yaml', '.ini', '.cfg',
- '.conf', '.log', '.sh', '.bat', '.ps1', '.java', '.c',
- '.cpp', '.h', '.hpp', '.cs', '.php', '.rb', '.go', '.rs'}
-
- if file_path.suffix.lower() in text_extensions:
- return False
-
- # 进一步检查文件内容
- async with aiofiles.open(file_path, 'rb') as f:
- chunk = await f.read(1024)
-
- if not chunk:
- return False
-
- # 检查是否包含空字节(二进制文件的标志)
- if b'\0' in chunk:
- return True
-
- # 检查文本字符比例
- text_characters = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7f})
- non_text = chunk.translate(None, text_characters)
- return float(len(non_text)) / len(chunk) > 0.3
-
- except Exception:
- return True
- def is_text_file(file_path):
- """简单判断是否为文本文件"""
- text_extensions = {'.txt', '.log', '.py', '.js', '.html', '.css', '.json', '.xml', '.csv', '.md'}
- return file_path.suffix.lower() in text_extensions
- if __name__ == '__main__':
- print("=== SCRIPT START ===")
- app.run(debug=True, host="0.0.0.0", port=5001)
- print("=== SCRIPT END ===")
-
|