#!/usr/bin/env python3
"""
workflow_engine/runner.py - lightweight state-machine scheduler.

Orchestrates the fully automated step1~step5 development loop.
"""
  6. import json
  7. import os
  8. import sys
  9. from datetime import datetime
  10. from pathlib import Path
  11. BASE_DIR = Path(__file__).parent.parent
  12. STATE_FILE = BASE_DIR / "workflow_engine/state/current_step.json"
  13. ARTIFACTS_DIR = BASE_DIR / "workflow_engine/artifacts"
  14. PROMPTS_DIR = BASE_DIR
  15. STEP_MAP = {
  16. "step1": "step1_需求输入.jsonl",
  17. "step2": "step2_执行计划.jsonl",
  18. "step3": "step3_实施变更.jsonl",
  19. "step4": "step4_验证发布.jsonl",
  20. "step5": "step5_总控与循环.jsonl",
  21. }
  22. STEP_FLOW = ["step1", "step2", "step3", "step4", "step5"]
  23. def load_state() -> dict:
  24. if STATE_FILE.exists():
  25. return json.loads(STATE_FILE.read_text(encoding="utf-8"))
  26. return {"run_id": None, "step": None, "status": "idle"}
  27. def save_state(state: dict):
  28. STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
  29. STATE_FILE.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
  30. def get_run_id() -> str:
  31. return datetime.now().strftime("%Y%m%dT%H%M%S")
  32. def get_artifact_path(run_id: str, step: str, ext: str = "json") -> Path:
  33. path = ARTIFACTS_DIR / run_id
  34. path.mkdir(parents=True, exist_ok=True)
  35. return path / f"{step}.{ext}"
  36. def next_step(current: str) -> str | None:
  37. """返回下一步,step5 后返回 None"""
  38. try:
  39. idx = STEP_FLOW.index(current)
  40. return STEP_FLOW[idx + 1] if idx + 1 < len(STEP_FLOW) else None
  41. except ValueError:
  42. return None
  43. def run_step(step: str, state: dict, input_data: dict = None):
  44. """执行单个步骤(实际调用模型的占位)"""
  45. prompt_file = PROMPTS_DIR / STEP_MAP.get(step, "")
  46. if not prompt_file.exists():
  47. print(f"[ERROR] Prompt file not found: {prompt_file}")
  48. return None
  49. run_id = state.get("run_id") or get_run_id()
  50. # 更新状态为 running
  51. state.update({"run_id": run_id, "step": step, "status": "running"})
  52. save_state(state)
  53. print(f"[RUN] {step} | run_id={run_id}")
  54. print(f" prompt: {prompt_file.name}")
  55. # === 这里是模型调用占位 ===
  56. # 实际实现时替换为:
  57. # result = call_llm(prompt_file.read_text(), input_data)
  58. result = {
  59. "step": step,
  60. "status": "success", # 模拟成功
  61. "output": f"[MOCK] {step} completed",
  62. "timestamp": datetime.now().isoformat()
  63. }
  64. # ========================
  65. # 保存产物
  66. artifact_path = get_artifact_path(run_id, step)
  67. artifact_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
  68. print(f" artifact: {artifact_path}")
  69. return result
  70. def dispatch():
  71. """根据当前状态分发到下一步"""
  72. state = load_state()
  73. target = state.get("target_step")
  74. if target == "done":
  75. print("[DONE] 所有任务完成")
  76. return
  77. if target:
  78. # 有明确的目标步骤(来自 step5 的指令)
  79. run_step(target, state)
  80. else:
  81. print("[IDLE] 无待执行任务,使用 'run --step 1' 启动")
  82. def start_workflow(input_file: str = None):
  83. """从 step1 启动新的工作流"""
  84. run_id = get_run_id()
  85. state = {"run_id": run_id, "step": None, "status": "pending"}
  86. input_data = None
  87. if input_file and Path(input_file).exists():
  88. input_data = json.loads(Path(input_file).read_text(encoding="utf-8"))
  89. print(f"[START] 新工作流 run_id={run_id}")
  90. # 依次执行 step1 -> step5
  91. for step in STEP_FLOW:
  92. result = run_step(step, state, input_data)
  93. if not result:
  94. state["status"] = "error"
  95. save_state(state)
  96. return
  97. # step4 后检查验证结果
  98. if step == "step4":
  99. verify_status = result.get("verify", {}).get("status", "success")
  100. state["verify"] = {"status": verify_status}
  101. # step5 决定下一步
  102. if step == "step5":
  103. # 模拟 step5 的决策逻辑
  104. if state.get("verify", {}).get("status") == "failed":
  105. state["target_step"] = "step2" # 失败回跳
  106. state["status"] = "retry"
  107. print(f"[RETRY] 验证失败,返回 step2 重规划")
  108. else:
  109. state["target_step"] = "done"
  110. state["status"] = "completed"
  111. print(f"[COMPLETE] 工作流完成")
  112. save_state(state)
  113. # 如果需要回跳,递归处理(带熔断)
  114. if state.get("target_step") == "step2":
  115. retry_count = state.get("retry_count", 0) + 1
  116. if retry_count > 3:
  117. print(f"[FATAL] 超过最大重试次数")
  118. state["status"] = "fatal_error"
  119. save_state(state)
  120. return
  121. state["retry_count"] = retry_count
  122. # 从 step2 重新开始
  123. for retry_step in STEP_FLOW[1:]: # step2 onwards
  124. run_step(retry_step, state)
  125. break
  126. def main():
  127. if len(sys.argv) < 2:
  128. print("Usage:")
  129. print(" python runner.py start [input.json] - 启动新工作流")
  130. print(" python runner.py dispatch - 根据状态分发")
  131. print(" python runner.py status - 查看当前状态")
  132. return
  133. cmd = sys.argv[1]
  134. if cmd == "start":
  135. input_file = sys.argv[2] if len(sys.argv) > 2 else None
  136. start_workflow(input_file)
  137. elif cmd == "dispatch":
  138. dispatch()
  139. elif cmd == "status":
  140. state = load_state()
  141. print(json.dumps(state, ensure_ascii=False, indent=2))
  142. else:
  143. print(f"Unknown command: {cmd}")
  144. if __name__ == "__main__":
  145. main()