scheduler.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. # coding=utf-8
  2. """
  3. 时间线调度器
  4. 统一的时间线调度系统,替代分散的 push_window / analysis_window 逻辑。
  5. 基于 periods + day_plans + week_map 模型实现灵活的时间段调度。
  6. """
  7. import copy
  8. import re
  9. from dataclasses import dataclass
  10. from typing import Any, Callable, Dict, List, Optional
  11. from datetime import datetime
  12. @dataclass
  13. class ResolvedSchedule:
  14. """当前时间解析后的调度结果"""
  15. period_key: Optional[str] # 命中的 period key,None=默认配置
  16. period_name: Optional[str] # 命中的展示名称
  17. day_plan: str # 当前日计划
  18. collect: bool
  19. analyze: bool
  20. push: bool
  21. report_mode: str
  22. ai_mode: str
  23. once_analyze: bool
  24. once_push: bool
  25. class Scheduler:
  26. """
  27. 时间线调度器
  28. 根据 timeline 配置(periods + day_plans + week_map)解析当前时间应执行的行为。
  29. 支持:
  30. - 预设模板 + 自定义模式
  31. - 跨日时间段(如 22:00-07:00)
  32. - 每天 / 每周差异化配置
  33. - once 执行去重(analyze / push 独立维度)
  34. - 冲突策略(error_on_overlap / last_wins)
  35. """
  36. def __init__(
  37. self,
  38. schedule_config: Dict[str, Any],
  39. timeline_data: Dict[str, Any],
  40. storage_backend: Any,
  41. get_time_func: Callable[[], datetime],
  42. ):
  43. """
  44. 初始化调度器
  45. Args:
  46. schedule_config: config.yaml 中的 schedule 段(含 preset 等)
  47. timeline_data: timeline.yaml 的完整数据
  48. storage_backend: 存储后端(用于 once 去重记录)
  49. get_time_func: 获取当前时间的函数(应使用配置的时区)
  50. """
  51. self.schedule_config = schedule_config
  52. self.storage = storage_backend
  53. self.get_time = get_time_func
  54. self.enabled = schedule_config.get("enabled", True)
  55. # 加载并构建最终 timeline
  56. self.timeline = self._build_timeline(schedule_config, timeline_data)
  57. if self.enabled:
  58. self._validate_timeline(self.timeline)
  59. def _build_timeline(
  60. self,
  61. schedule_config: Dict[str, Any],
  62. timeline_data: Dict[str, Any],
  63. ) -> Dict[str, Any]:
  64. """从 preset 或 custom 构建 timeline"""
  65. preset = schedule_config.get("preset", "always_on")
  66. if preset == "custom":
  67. timeline = copy.deepcopy(timeline_data.get("custom", {}))
  68. else:
  69. presets = timeline_data.get("presets", {})
  70. if preset not in presets:
  71. raise ValueError(
  72. f"未知的预设模板: '{preset}',可选值: "
  73. f"{', '.join(presets.keys())}, custom"
  74. )
  75. timeline = copy.deepcopy(presets[preset])
  76. # 确保 periods 是 dict(可能为空 {})
  77. if timeline.get("periods") is None:
  78. timeline["periods"] = {}
  79. return timeline
  80. def resolve(self) -> ResolvedSchedule:
  81. """
  82. 解析当前时间对应的调度配置
  83. Returns:
  84. ResolvedSchedule 包含当前应执行的行为
  85. """
  86. if not self.enabled:
  87. # 调度未启用时返回默认的全功能配置
  88. return ResolvedSchedule(
  89. period_key=None,
  90. period_name=None,
  91. day_plan="disabled",
  92. collect=True,
  93. analyze=True,
  94. push=True,
  95. report_mode="current",
  96. ai_mode="follow_report",
  97. once_analyze=False,
  98. once_push=False,
  99. )
  100. now = self.get_time()
  101. weekday = now.isoweekday() # 1=周一 ... 7=周日
  102. now_hhmm = now.strftime("%H:%M")
  103. # 查找当天的日计划
  104. day_plan_key = self.timeline["week_map"].get(weekday)
  105. if day_plan_key is None:
  106. raise ValueError(f"week_map 缺少星期映射: {weekday}")
  107. day_plan = self.timeline["day_plans"].get(day_plan_key)
  108. if day_plan is None:
  109. raise ValueError(f"week_map[{weekday}] 引用了不存在的 day_plan: {day_plan_key}")
  110. # 查找当前活跃的时间段
  111. period_key = self._find_active_period(now_hhmm, day_plan)
  112. # 合并默认配置和时间段配置
  113. merged = self._merge_with_default(period_key)
  114. # 打印调度日志
  115. weekday_names = {1: "一", 2: "二", 3: "三", 4: "四", 5: "五", 6: "六", 7: "日"}
  116. period_display = "默认配置(未命中任何时间段)"
  117. if period_key:
  118. period_cfg = self.timeline["periods"][period_key]
  119. period_name = period_cfg.get("name", period_key)
  120. start = period_cfg.get("start", "?")
  121. end = period_cfg.get("end", "?")
  122. period_display = f"{period_name} ({start}-{end})"
  123. print(f"[调度] 星期{weekday_names.get(weekday, '?')},日计划: {day_plan_key}")
  124. print(f"[调度] 当前时间段: {period_display}")
  125. resolved = ResolvedSchedule(
  126. period_key=period_key,
  127. period_name=(
  128. self.timeline["periods"][period_key].get("name")
  129. if period_key
  130. else None
  131. ),
  132. day_plan=day_plan_key,
  133. collect=merged.get("collect", True),
  134. analyze=merged.get("analyze", False),
  135. push=merged.get("push", False),
  136. report_mode=merged.get("report_mode", "current"),
  137. ai_mode=self._resolve_ai_mode(merged),
  138. once_analyze=merged.get("once", {}).get("analyze", False),
  139. once_push=merged.get("once", {}).get("push", False),
  140. )
  141. # 打印行为摘要
  142. actions = []
  143. if resolved.collect:
  144. actions.append("采集")
  145. if resolved.analyze:
  146. actions.append(f"分析(AI:{resolved.ai_mode})")
  147. if resolved.push:
  148. actions.append(f"推送(模式:{resolved.report_mode})")
  149. print(f"[调度] 行为: {', '.join(actions) if actions else '无'}")
  150. return resolved
  151. def _find_active_period(
  152. self, now_hhmm: str, day_plan: Dict[str, Any]
  153. ) -> Optional[str]:
  154. """
  155. 查找当前时间命中的活跃时间段
  156. Args:
  157. now_hhmm: 当前时间 HH:MM
  158. day_plan: 日计划配置
  159. Returns:
  160. 命中的 period key,或 None
  161. """
  162. candidates = []
  163. for idx, key in enumerate(day_plan.get("periods", [])):
  164. period = self.timeline["periods"].get(key)
  165. if period is None:
  166. continue
  167. if self._in_range(now_hhmm, period["start"], period["end"]):
  168. candidates.append((idx, key))
  169. if not candidates:
  170. return None
  171. # 检查冲突
  172. if len(candidates) > 1:
  173. policy = self.timeline.get("overlap", {}).get("policy", "error_on_overlap")
  174. conflicting = [c[1] for c in candidates]
  175. if policy == "error_on_overlap":
  176. raise ValueError(
  177. f"检测到时间段重叠冲突: {', '.join(conflicting)} 在 {now_hhmm} 重叠。"
  178. f"请调整时间段配置,或将 overlap.policy 设为 'last_wins'"
  179. )
  180. # last_wins:输出重叠警告,列表中后面的优先
  181. print(
  182. f"[调度] 检测到时间段重叠: {', '.join(conflicting)} 在 {now_hhmm} 重叠"
  183. )
  184. winner = candidates[-1]
  185. print(f"[调度] 冲突策略: last_wins,生效时间段: {winner[1]}")
  186. return winner[1]
  187. return candidates[0][1]
  188. @staticmethod
  189. def _in_range(now_hhmm: str, start: str, end: str) -> bool:
  190. """
  191. 检查时间是否在范围内(支持跨日)
  192. Args:
  193. now_hhmm: 当前时间 HH:MM
  194. start: 开始时间 HH:MM
  195. end: 结束时间 HH:MM
  196. Returns:
  197. 是否在范围内
  198. """
  199. if start <= end:
  200. # 正常范围,如 08:00-09:00
  201. return start <= now_hhmm <= end
  202. else:
  203. # 跨日范围,如 22:00-07:00
  204. return now_hhmm >= start or now_hhmm <= end
  205. def _merge_with_default(self, period_key: Optional[str]) -> Dict[str, Any]:
  206. """合并默认配置和时间段配置"""
  207. base = copy.deepcopy(self.timeline.get("default", {}))
  208. if not period_key:
  209. return base
  210. period = copy.deepcopy(self.timeline["periods"][period_key])
  211. # 先合并 once 子对象
  212. merged_once = dict(base.get("once", {}))
  213. merged_once.update(period.get("once", {}))
  214. # 标量字段覆盖
  215. base.update(period)
  216. # 恢复合并后的 once
  217. if merged_once:
  218. base["once"] = merged_once
  219. return base
  220. @staticmethod
  221. def _resolve_ai_mode(cfg: Dict[str, Any]) -> str:
  222. """解析最终的 AI 模式"""
  223. ai_mode = cfg.get("ai_mode", "follow_report")
  224. if ai_mode == "follow_report":
  225. return cfg.get("report_mode", "current")
  226. return ai_mode
  227. def already_executed(self, period_key: str, action: str, date_str: str) -> bool:
  228. """
  229. 检查指定时间段的某个 action 今天是否已执行
  230. Args:
  231. period_key: 时间段 key
  232. action: 动作类型 (analyze / push)
  233. date_str: 日期 YYYY-MM-DD
  234. Returns:
  235. 是否已执行
  236. """
  237. return self.storage.has_period_executed(date_str, period_key, action)
  238. def record_execution(self, period_key: str, action: str, date_str: str) -> None:
  239. """
  240. 记录时间段的 action 执行
  241. Args:
  242. period_key: 时间段 key
  243. action: 动作类型 (analyze / push)
  244. date_str: 日期 YYYY-MM-DD
  245. """
  246. self.storage.record_period_execution(date_str, period_key, action)
  247. # ========================================
  248. # 校验
  249. # ========================================
  250. def _validate_timeline(self, timeline: Dict[str, Any]) -> None:
  251. """
  252. 启动时校验 timeline 配置
  253. Raises:
  254. ValueError: 配置不合法时抛出
  255. """
  256. required_top_keys = ["default", "periods", "day_plans", "week_map"]
  257. for key in required_top_keys:
  258. if key not in timeline:
  259. raise ValueError(f"timeline 缺少必须字段: {key}")
  260. # week_map 必须覆盖 1..7
  261. for day in range(1, 8):
  262. if day not in timeline["week_map"]:
  263. raise ValueError(f"week_map 缺少星期映射: {day}")
  264. # day_plan 引用完整性
  265. for day, plan_key in timeline["week_map"].items():
  266. if plan_key not in timeline["day_plans"]:
  267. raise ValueError(
  268. f"week_map[{day}] 引用了不存在的 day_plan: {plan_key}"
  269. )
  270. # period 引用完整性
  271. for plan_key, plan in timeline["day_plans"].items():
  272. for period_key in plan.get("periods", []):
  273. if period_key not in timeline["periods"]:
  274. raise ValueError(
  275. f"day_plan[{plan_key}] 引用了不存在的 period: {period_key}"
  276. )
  277. # 时间格式校验
  278. for period_key, period in timeline["periods"].items():
  279. if "start" not in period or "end" not in period:
  280. raise ValueError(
  281. f"period '{period_key}' 缺少 start 或 end 字段"
  282. )
  283. self._validate_hhmm(period["start"], f"{period_key}.start")
  284. self._validate_hhmm(period["end"], f"{period_key}.end")
  285. if period["start"] == period["end"]:
  286. raise ValueError(
  287. f"period '{period_key}' 的 start 与 end 不能相同: {period['start']}"
  288. )
  289. # 检查冲突策略下的重叠
  290. policy = timeline.get("overlap", {}).get("policy", "error_on_overlap")
  291. if policy == "error_on_overlap":
  292. self._check_period_overlaps(timeline)
  293. def _check_period_overlaps(self, timeline: Dict[str, Any]) -> None:
  294. """
  295. 检查每个日计划中的时间段是否存在重叠
  296. 仅在 overlap.policy == "error_on_overlap" 时调用
  297. """
  298. periods = timeline.get("periods", {})
  299. for plan_key, plan in timeline["day_plans"].items():
  300. period_keys = plan.get("periods", [])
  301. if len(period_keys) <= 1:
  302. continue
  303. # 收集每个时间段的范围
  304. ranges = []
  305. for pk in period_keys:
  306. p = periods.get(pk, {})
  307. if "start" in p and "end" in p:
  308. ranges.append((pk, p["start"], p["end"]))
  309. # 两两检查重叠
  310. for i in range(len(ranges)):
  311. for j in range(i + 1, len(ranges)):
  312. if self._ranges_overlap(
  313. ranges[i][1], ranges[i][2],
  314. ranges[j][1], ranges[j][2],
  315. ):
  316. raise ValueError(
  317. f"day_plan '{plan_key}' 中时间段 '{ranges[i][0]}' "
  318. f"({ranges[i][1]}-{ranges[i][2]}) 与 '{ranges[j][0]}' "
  319. f"({ranges[j][1]}-{ranges[j][2]}) 存在重叠。"
  320. f"请调整时间段,或将 overlap.policy 设为 'last_wins'"
  321. )
  322. @staticmethod
  323. def _ranges_overlap(s1: str, e1: str, s2: str, e2: str) -> bool:
  324. """检查两个时间范围是否重叠(支持跨日)"""
  325. def to_minutes(t: str) -> int:
  326. h, m = t.split(":")
  327. return int(h) * 60 + int(m)
  328. def expand_range(start: str, end: str) -> List[tuple]:
  329. """将时间范围展开为分钟段列表,跨日时拆分为两段"""
  330. s = to_minutes(start)
  331. e = to_minutes(end)
  332. if s <= e:
  333. return [(s, e)]
  334. else:
  335. # 跨日:拆分为 [start, 23:59] 和 [00:00, end]
  336. return [(s, 24 * 60 - 1), (0, e)]
  337. segs1 = expand_range(s1, e1)
  338. segs2 = expand_range(s2, e2)
  339. for a_start, a_end in segs1:
  340. for b_start, b_end in segs2:
  341. # 两个区间有重叠的条件
  342. if a_start <= b_end and b_start <= a_end:
  343. return True
  344. return False
  345. @staticmethod
  346. def _validate_hhmm(value: str, field_name: str) -> None:
  347. """校验 HH:MM 格式"""
  348. if not re.match(r"^\d{2}:\d{2}$", value):
  349. raise ValueError(f"{field_name} 格式错误: '{value}',期望 HH:MM")
  350. h, m = value.split(":")
  351. if not (0 <= int(h) <= 23 and 0 <= int(m) <= 59):
  352. raise ValueError(f"{field_name} 时间值超出范围: '{value}'")