| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- # MCP_Server.py
- from fastmcp import FastMCP
- from ctypes import *
- import os
- import sys
- import subprocess
- import time
- import json
- from pathlib import Path
- # 记录已 read 过的文件,用于写入前校验
- _READ_CACHE = set()
- # 记录本会话由 agent 创建或成功写入过的文件,允许后续直接覆盖
- _WRITE_TRACK = set()
- mcp = FastMCP("服务器")
- @mcp.tool()
- def read_file(file_path: str) -> str:
- r"""
- 读取文件内容工具。
- args:
- file_path: 被读取的文件的地址 (使用linux系统,因此必须包含"./")
- returns:
- 成功: 返回"读取成功!"和文件内容
- 失败: 返回"ERROR:"和错误信息
- """
- try:
- with open(file_path, 'r', encoding='utf-8') as f:
- content = f.read()
- _READ_CACHE.add(os.path.abspath(file_path))
- return f"读取成功!文件内容:{content}"
- except Exception as e:
- return f"ERROR: {str(e)}"
- @mcp.tool()
- def write_file(file_path: str, content: str, mode: str = 'w') -> str:
- r"""
- 编辑文件内容工具。
- args:
- file_path: 被编辑的文件的地址 (使用linux系统,因此必须包含"./")
- content: 编辑的内容内容
- mode: 模式('默认覆盖模式'),'w'覆盖写入;'a'追加写入
- returns:
- 成功: 返回"编辑成功!"和文件路径
- 失败: 返回"ERROR:"和错误信息
- """
- try:
- abs_path = os.path.abspath(file_path)
- parent = Path(abs_path).parent
- # 动态工作目录限制
- session_root = os.environ.get('SESSION_WORK_DIR')
- if session_root:
- session_root_abs = os.path.abspath(session_root)
- # 允许写入: session_root 下的任意子路径
- if not abs_path.startswith(session_root_abs + os.sep):
- return (
- "ERROR: 写入被拒绝。文件不在本次会话工作目录内: "
- f"{file_path}. 允许根目录: {session_root_abs}/"
- )
- # 自动创建上级目录
- parent.mkdir(parents=True, exist_ok=True)
- # 覆盖策略:
- # 1. 若文件不存在 -> 直接写入并加入 _WRITE_TRACK
- # 2. 若文件存在且不是 append: 允许以下任一条件直接覆盖
- # a) 曾被 read 过 (兼容旧逻辑)
- # b) 曾在本会话通过 write_file 成功写入 (_WRITE_TRACK 命中)
- # 否则阻止并提示先 read 一次以确认外部文件语义
- exists_before = os.path.exists(abs_path)
- if exists_before and mode != 'a':
- if abs_path not in _READ_CACHE and abs_path not in _WRITE_TRACK:
- return (
- "ERROR: 试图覆盖已存在文件但之前未 read,且非本会话创建: "
- f"{file_path}. 如为外部文件请先调用 read_file;若期望直接覆盖,先执行一次 read 或首次写入。"
- )
- with open(abs_path, mode, encoding='utf-8') as f:
- f.write(content)
- # 标记写入跟踪(无论追加或覆盖)
- _WRITE_TRACK.add(abs_path)
- action = "追加" if (mode == 'a' and exists_before) else ("覆盖" if exists_before else "创建")
- return f"成功{action}文件: {file_path}"
- except Exception as e:
- return f"ERROR: {str(e)}"
- @mcp.tool()
- def run_bash(command: str) -> str:
- r"""
- 执行bash命令并返回输出的工具。
- args:
- command: 在bash中执行的命令
- returns:
- 成功: 返回"命令运行成功"和输入的命令以及对应的结果
- 失败: 返回"ERROR:"和错误信息
- """
- try:
- result = subprocess.run(
- command,
- shell=True,
- capture_output=True,
- text=True,
- timeout=30
- )
- if result.returncode == 0:
- return f"命令运行成功,[命令:{command}; {result.stdout}]"
- else:
- return f"ERROR: 命令:{command}; {result.stderr}"
- except subprocess.TimeoutExpired:
- return "ERROR: 命令执行超时"
- except Exception as e:
- return f"ERROR: {str(e)}"
- @mcp.tool()
- def run_shell(command: str) -> str:
- r"""
- 执行 shell 命令并返回结构化 JSON 字符串。
- 返回字段(JSON):
- command: 原始命令
- returncode: 进程退出码 (int 或 'TIMEOUT')
- stdout: 标准输出前若干字符 (全部保留, 若过长可由上层截断)
- stderr: 标准错误输出 (可能为空)
- duration_ms: 执行耗时 (毫秒, 仅非超时)
- compile_log: 若检测到 run_pika 输出中的 compile_log 行, 给出真实路径
- run_log: 若检测到 run_pika 输出中的 run_log 行, 给出真实路径
- note: 辅助说明
- 错误时也返回 JSON (不抛异常), 由上层 agent 自行判断。
- """
- start = time.time()
- try:
- proc = subprocess.run(
- command,
- shell=True,
- capture_output=True,
- text=True,
- timeout=120
- )
- dur_ms = int((time.time() - start) * 1000)
- stdout = proc.stdout or ""
- stderr = proc.stderr or ""
- compile_log_path = None
- run_log_path = None
- # 解析 run_pika.py 标准输出中的日志路径行
- # 典型行: [run_pika] compile_log: /abs/path/to/compile.log
- for line in stdout.splitlines():
- line_strip = line.strip()
- if 'compile_log:' in line_strip:
- # 拆分最后的路径部分
- try:
- compile_log_path = line_strip.split('compile_log:')[-1].strip()
- except Exception:
- pass
- if 'run_log:' in line_strip:
- try:
- run_log_path = line_strip.split('run_log:')[-1].strip()
- except Exception:
- pass
- payload = {
- "command": command,
- "returncode": proc.returncode,
- "stdout": stdout,
- "stderr": stderr,
- "duration_ms": dur_ms,
- "compile_log": compile_log_path,
- "run_log": run_log_path,
- "note": "ok" if proc.returncode == 0 else "non-zero returncode"
- }
- return json.dumps(payload, ensure_ascii=False)
- except subprocess.TimeoutExpired:
- payload = {
- "command": command,
- "returncode": "TIMEOUT",
- "stdout": "",
- "stderr": "",
- "duration_ms": 120000,
- "compile_log": None,
- "run_log": None,
- "note": "timeout"
- }
- return json.dumps(payload, ensure_ascii=False)
- except Exception as e:
- payload = {
- "command": command,
- "returncode": "EXCEPTION",
- "stdout": "",
- "stderr": str(e),
- "duration_ms": int((time.time() - start) * 1000),
- "compile_log": None,
- "run_log": None,
- "note": "raised"
- }
- return json.dumps(payload, ensure_ascii=False)
- if __name__ == "__main__":
- mcp.run()
|