| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- #!/usr/bin/env python3
- import os
- import json
- import requests
- import time
- import sys
- from datetime import datetime, timezone
- def monitor_workflows(github_token, repo, workflow_names, start_time):
- """监控工作流运行"""
- headers = {
- "Authorization": f"token {github_token}",
- "Accept": "application/vnd.github.v3+json"
- }
-
- monitoring_results = []
-
- for workflow_name in workflow_names:
- print(f"\n=== Monitoring {workflow_name} ===")
-
- try:
- workflow_id = get_workflow_id(github_token, repo, workflow_name)
- if not workflow_id:
- monitoring_results.append({
- "name": workflow_name,
- "status": "error",
- "conclusion": "error",
- "error": "Workflow not found"
- })
- continue
-
- # 查找开始时间后的运行
- runs = get_recent_runs(github_token, repo, workflow_id, start_time)
-
- if not runs:
- print(f"No runs found for {workflow_name} after {start_time}")
- # 尝试查找任何正在运行的工作流
- all_runs = get_all_runs(github_token, repo, workflow_id, 10)
- if all_runs:
- latest_run = all_runs[0]
- print(f"Using latest run instead: {latest_run['id']} created at {latest_run['created_at']}")
- result = monitor_single_run(github_token, repo, latest_run["id"], workflow_name)
- monitoring_results.append(result)
- else:
- monitoring_results.append({
- "name": workflow_name,
- "status": "not_found",
- "conclusion": "not_found",
- "error": f"No runs found after {start_time}"
- })
- else:
- # 监控找到的运行
- run_to_monitor = runs[0] # 取最新的一个
- print(f"Monitoring run: {run_to_monitor['id']}")
- result = monitor_single_run(github_token, repo, run_to_monitor["id"], workflow_name)
- monitoring_results.append(result)
-
- except Exception as e:
- print(f"Error monitoring {workflow_name}: {str(e)}")
- monitoring_results.append({
- "name": workflow_name,
- "status": "error",
- "conclusion": "error",
- "error": str(e)
- })
-
- return monitoring_results
- def get_all_runs(github_token, repo, workflow_id, per_page=10):
- """获取所有运行"""
- headers = {
- "Authorization": f"token {github_token}",
- "Accept": "application/vnd.github.v3+json"
- }
-
- url = f"https://api.github.com/repos/{repo}/actions/workflows/{workflow_id}/runs"
- params = {"per_page": per_page}
-
- response = requests.get(url, headers=headers, params=params)
- if response.status_code == 200:
- return response.json()["workflow_runs"]
- return []
- def get_recent_runs(github_token, repo, workflow_id, start_time):
- """获取开始时间后的运行"""
- all_runs = get_all_runs(github_token, repo, workflow_id, 10)
- start_time_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
-
- recent_runs = []
- for run in all_runs:
- run_time = datetime.fromisoformat(run["created_at"].replace('Z', '+00:00'))
- if run_time >= start_time_dt:
- recent_runs.append(run)
-
- return recent_runs
- def monitor_single_run(github_token, repo, run_id, workflow_name):
- """监控单个运行"""
- headers = {
- "Authorization": f"token {github_token}",
- "Accept": "application/vnd.github.v3+json"
- }
-
- max_wait_time = 1800 # 30分钟
- check_interval = 30
- start_time = time.time()
-
- print(f"Monitoring {workflow_name} (run {run_id})")
-
- while time.time() - start_time < max_wait_time:
- url = f"https://api.github.com/repos/{repo}/actions/runs/{run_id}"
- response = requests.get(url, headers=headers)
-
- if response.status_code != 200:
- print(f"Error getting run status: {response.status_code}")
- time.sleep(check_interval)
- continue
-
- run_data = response.json()
- status = run_data["status"]
- conclusion = run_data.get("conclusion")
-
- print(f" {workflow_name}: status={status}, conclusion={conclusion}")
-
- if status == "completed":
- result = {
- "name": workflow_name,
- "run_id": run_id,
- "status": status,
- "conclusion": conclusion,
- "html_url": run_data["html_url"],
- "created_at": run_data["created_at"],
- "updated_at": run_data["updated_at"]
- }
-
- if conclusion == "failure":
- result["failure_details"] = get_failure_logs(github_token, repo, run_id)
-
- return result
-
- time.sleep(check_interval)
-
- # 超时
- return {
- "name": workflow_name,
- "run_id": run_id,
- "status": "timed_out",
- "conclusion": "timed_out",
- "html_url": f"https://github.com/{repo}/actions/runs/{run_id}",
- "error": "Monitoring timed out after 30 minutes"
- }
- def get_failure_logs(github_token, repo, run_id):
- """获取失败日志"""
- headers = {
- "Authorization": f"token {github_token}",
- "Accept": "application/vnd.github.v3+json"
- }
-
- try:
- jobs_url = f"https://api.github.com/repos/{repo}/actions/runs/{run_id}/jobs"
- jobs_response = requests.get(jobs_url, headers=headers)
-
- failure_details = []
-
- if jobs_response.status_code == 200:
- jobs_data = jobs_response.json()["jobs"]
- for job in jobs_data:
- if job["conclusion"] == "failure":
- job_info = {
- "name": job["name"],
- "steps": []
- }
-
- for step in job["steps"]:
- if step["conclusion"] == "failure":
- job_info["steps"].append({
- "name": step["name"],
- "number": step["number"]
- })
-
- failure_details.append(job_info)
-
- return failure_details
- except Exception as e:
- print(f"Error getting failure logs: {e}")
- return []
- def get_workflow_id(github_token, repo, workflow_name):
- """获取工作流ID"""
- headers = {
- "Authorization": f"token {github_token}",
- "Accept": "application/vnd.github.v3+json"
- }
-
- url = f"https://api.github.com/repos/{repo}/actions/workflows"
- response = requests.get(url, headers=headers)
-
- if response.status_code == 200:
- workflows = response.json()["workflows"]
- for workflow in workflows:
- if workflow["name"] == workflow_name:
- return workflow["id"]
- return None
- def main():
- github_token = os.getenv("GITHUB_TOKEN")
- repo = os.getenv("GITHUB_REPOSITORY")
- workflows_json = os.getenv("TARGET_WORKFLOWS")
- start_time = sys.argv[1] if len(sys.argv) > 1 else datetime.now(timezone.utc).isoformat()
-
- if not all([github_token, repo, workflows_json]):
- raise ValueError("Missing required environment variables")
-
- workflows = json.loads(workflows_json)
- results = monitor_workflows(github_token, repo, workflows, start_time)
-
- with open("monitoring_results.json", "w") as f:
- json.dump(results, f, indent=2)
-
- print(f"\n=== Monitoring Summary ===")
- for result in results:
- status_icon = "✅" if result.get("conclusion") == "success" else "❌" if result.get("conclusion") == "failure" else "⚠️"
- print(f"{status_icon} {result['name']}: {result.get('conclusion', 'unknown')}")
- if __name__ == "__main__":
- main()
|