monitor_workflows.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. #!/usr/bin/env python3
import json
import os
import sys
import time
from datetime import datetime, timezone

import requests
  8. def monitor_workflows(github_token, repo, workflow_names, start_time):
  9. """监控工作流运行"""
  10. headers = {
  11. "Authorization": f"token {github_token}",
  12. "Accept": "application/vnd.github.v3+json"
  13. }
  14. monitoring_results = []
  15. for workflow_name in workflow_names:
  16. print(f"\n=== Monitoring {workflow_name} ===")
  17. try:
  18. workflow_id = get_workflow_id(github_token, repo, workflow_name)
  19. if not workflow_id:
  20. monitoring_results.append({
  21. "name": workflow_name,
  22. "status": "error",
  23. "conclusion": "error",
  24. "error": "Workflow not found"
  25. })
  26. continue
  27. # 查找开始时间后的运行
  28. runs = get_recent_runs(github_token, repo, workflow_id, start_time)
  29. if not runs:
  30. print(f"No runs found for {workflow_name} after {start_time}")
  31. # 尝试查找任何正在运行的工作流
  32. all_runs = get_all_runs(github_token, repo, workflow_id, 10)
  33. if all_runs:
  34. latest_run = all_runs[0]
  35. print(f"Using latest run instead: {latest_run['id']} created at {latest_run['created_at']}")
  36. result = monitor_single_run(github_token, repo, latest_run["id"], workflow_name)
  37. monitoring_results.append(result)
  38. else:
  39. monitoring_results.append({
  40. "name": workflow_name,
  41. "status": "not_found",
  42. "conclusion": "not_found",
  43. "error": f"No runs found after {start_time}"
  44. })
  45. else:
  46. # 监控找到的运行
  47. run_to_monitor = runs[0] # 取最新的一个
  48. print(f"Monitoring run: {run_to_monitor['id']}")
  49. result = monitor_single_run(github_token, repo, run_to_monitor["id"], workflow_name)
  50. monitoring_results.append(result)
  51. except Exception as e:
  52. print(f"Error monitoring {workflow_name}: {str(e)}")
  53. monitoring_results.append({
  54. "name": workflow_name,
  55. "status": "error",
  56. "conclusion": "error",
  57. "error": str(e)
  58. })
  59. return monitoring_results
  60. def get_all_runs(github_token, repo, workflow_id, per_page=10):
  61. """获取所有运行"""
  62. headers = {
  63. "Authorization": f"token {github_token}",
  64. "Accept": "application/vnd.github.v3+json"
  65. }
  66. url = f"https://api.github.com/repos/{repo}/actions/workflows/{workflow_id}/runs"
  67. params = {"per_page": per_page}
  68. response = requests.get(url, headers=headers, params=params)
  69. if response.status_code == 200:
  70. return response.json()["workflow_runs"]
  71. return []
  72. def get_recent_runs(github_token, repo, workflow_id, start_time):
  73. """获取开始时间后的运行"""
  74. all_runs = get_all_runs(github_token, repo, workflow_id, 10)
  75. start_time_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
  76. recent_runs = []
  77. for run in all_runs:
  78. run_time = datetime.fromisoformat(run["created_at"].replace('Z', '+00:00'))
  79. if run_time >= start_time_dt:
  80. recent_runs.append(run)
  81. return recent_runs
  82. def monitor_single_run(github_token, repo, run_id, workflow_name):
  83. """监控单个运行"""
  84. headers = {
  85. "Authorization": f"token {github_token}",
  86. "Accept": "application/vnd.github.v3+json"
  87. }
  88. max_wait_time = 1800 # 30分钟
  89. check_interval = 30
  90. start_time = time.time()
  91. print(f"Monitoring {workflow_name} (run {run_id})")
  92. while time.time() - start_time < max_wait_time:
  93. url = f"https://api.github.com/repos/{repo}/actions/runs/{run_id}"
  94. response = requests.get(url, headers=headers)
  95. if response.status_code != 200:
  96. print(f"Error getting run status: {response.status_code}")
  97. time.sleep(check_interval)
  98. continue
  99. run_data = response.json()
  100. status = run_data["status"]
  101. conclusion = run_data.get("conclusion")
  102. print(f" {workflow_name}: status={status}, conclusion={conclusion}")
  103. if status == "completed":
  104. result = {
  105. "name": workflow_name,
  106. "run_id": run_id,
  107. "status": status,
  108. "conclusion": conclusion,
  109. "html_url": run_data["html_url"],
  110. "created_at": run_data["created_at"],
  111. "updated_at": run_data["updated_at"]
  112. }
  113. if conclusion == "failure":
  114. result["failure_details"] = get_failure_logs(github_token, repo, run_id)
  115. return result
  116. time.sleep(check_interval)
  117. # 超时
  118. return {
  119. "name": workflow_name,
  120. "run_id": run_id,
  121. "status": "timed_out",
  122. "conclusion": "timed_out",
  123. "html_url": f"https://github.com/{repo}/actions/runs/{run_id}",
  124. "error": "Monitoring timed out after 30 minutes"
  125. }
  126. def get_failure_logs(github_token, repo, run_id):
  127. """获取失败日志"""
  128. headers = {
  129. "Authorization": f"token {github_token}",
  130. "Accept": "application/vnd.github.v3+json"
  131. }
  132. try:
  133. jobs_url = f"https://api.github.com/repos/{repo}/actions/runs/{run_id}/jobs"
  134. jobs_response = requests.get(jobs_url, headers=headers)
  135. failure_details = []
  136. if jobs_response.status_code == 200:
  137. jobs_data = jobs_response.json()["jobs"]
  138. for job in jobs_data:
  139. if job["conclusion"] == "failure":
  140. job_info = {
  141. "name": job["name"],
  142. "steps": []
  143. }
  144. for step in job["steps"]:
  145. if step["conclusion"] == "failure":
  146. job_info["steps"].append({
  147. "name": step["name"],
  148. "number": step["number"]
  149. })
  150. failure_details.append(job_info)
  151. return failure_details
  152. except Exception as e:
  153. print(f"Error getting failure logs: {e}")
  154. return []
  155. def get_workflow_id(github_token, repo, workflow_name):
  156. """获取工作流ID"""
  157. headers = {
  158. "Authorization": f"token {github_token}",
  159. "Accept": "application/vnd.github.v3+json"
  160. }
  161. url = f"https://api.github.com/repos/{repo}/actions/workflows"
  162. response = requests.get(url, headers=headers)
  163. if response.status_code == 200:
  164. workflows = response.json()["workflows"]
  165. for workflow in workflows:
  166. if workflow["name"] == workflow_name:
  167. return workflow["id"]
  168. return None
  169. def main():
  170. github_token = os.getenv("GITHUB_TOKEN")
  171. repo = os.getenv("GITHUB_REPOSITORY")
  172. workflows_json = os.getenv("TARGET_WORKFLOWS")
  173. start_time = sys.argv[1] if len(sys.argv) > 1 else datetime.now(timezone.utc).isoformat()
  174. if not all([github_token, repo, workflows_json]):
  175. raise ValueError("Missing required environment variables")
  176. workflows = json.loads(workflows_json)
  177. results = monitor_workflows(github_token, repo, workflows, start_time)
  178. with open("monitoring_results.json", "w") as f:
  179. json.dump(results, f, indent=2)
  180. print(f"\n=== Monitoring Summary ===")
  181. for result in results:
  182. status_icon = "✅" if result.get("conclusion") == "success" else "❌" if result.get("conclusion") == "failure" else "⚠️"
  183. print(f"{status_icon} {result['name']}: {result.get('conclusion', 'unknown')}")
  184. if __name__ == "__main__":
  185. main()