#!/usr/bin/env python3
# trigger_workflows_direct.py
import json
import os
import time
from datetime import datetime, timezone

import requests
  7. def trigger_workflow_directly(workflow_name, github_token, repo):
  8. """直接触发工作流"""
  9. headers = {
  10. "Authorization": f"token {github_token}",
  11. "Accept": "application/vnd.github.v3+json"
  12. }
  13. # 首先获取工作流ID
  14. workflow_id = get_workflow_id(github_token, repo, workflow_name)
  15. if not workflow_id:
  16. print(f"✗ Workflow '{workflow_name}' not found")
  17. return False
  18. # 使用 workflow_dispatch API 直接触发
  19. dispatch_url = f"https://api.github.com/repos/{repo}/actions/workflows/{workflow_id}/dispatches" # 🔧 修复:添加这行
  20. # 根据工作流实际定义的输入参数进行调整
  21. dispatch_data = {
  22. "ref": "master",
  23. "inputs": {
  24. "trigger_type": "scheduled" # 使用工作流实际定义的输入参数
  25. }
  26. }
  27. try:
  28. print(f"Triggering workflow: {workflow_name} (ID: {workflow_id})")
  29. response = requests.post(dispatch_url, headers=headers, json=dispatch_data) # 🔧 修复:现在 dispatch_url 已定义
  30. if response.status_code == 204:
  31. print(f"✓ Successfully triggered workflow: {workflow_name}")
  32. return True
  33. else:
  34. print(f"✗ Failed to trigger {workflow_name}: {response.status_code}")
  35. print(f"Response: {response.text}")
  36. return False
  37. except Exception as e:
  38. print(f"✗ Error triggering {workflow_name}: {str(e)}")
  39. return False
  40. def get_workflow_id(github_token, repo, workflow_name):
  41. """获取工作流ID"""
  42. headers = {
  43. "Authorization": f"token {github_token}",
  44. "Accept": "application/vnd.github.v3+json"
  45. }
  46. url = f"https://api.github.com/repos/{repo}/actions/workflows"
  47. response = requests.get(url, headers=headers)
  48. if response.status_code == 200:
  49. workflows = response.json()["workflows"]
  50. for workflow in workflows:
  51. if workflow["name"] == workflow_name:
  52. return workflow["id"]
  53. print(f"Available workflows: {[w['name'] for w in workflows]}")
  54. else:
  55. print(f"Failed to get workflows: {response.status_code}")
  56. return None
  57. def main():
  58. github_token = os.getenv("GITHUB_TOKEN")
  59. repo = os.getenv("GITHUB_REPOSITORY")
  60. workflows_json = os.getenv("TARGET_WORKFLOWS")
  61. if not all([github_token, repo, workflows_json]):
  62. raise ValueError("Missing required environment variables")
  63. try:
  64. workflows = json.loads(workflows_json)
  65. except json.JSONDecodeError:
  66. raise ValueError("Invalid TARGET_WORKFLOWS JSON format")
  67. print(f"Directly triggering {len(workflows)} workflows...")
  68. success_count = 0
  69. for i, workflow in enumerate(workflows):
  70. success = trigger_workflow_directly(workflow, github_token, repo)
  71. if success:
  72. success_count += 1
  73. # 在触发之间等待
  74. if i < len(workflows) - 1:
  75. print("Waiting 10 seconds before next trigger...")
  76. time.sleep(10)
  77. print(f"Triggering completed: {success_count}/{len(workflows)} successful")
  78. if __name__ == "__main__":
  79. main()