scheduler.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. # coding=utf-8
  2. """
  3. 时间线调度器
  4. 统一的时间线调度系统,替代分散的 push_window / analysis_window 逻辑。
  5. 基于 periods + day_plans + week_map 模型实现灵活的时间段调度。
  6. """
  7. import copy
  8. import re
  9. from dataclasses import dataclass
  10. from typing import Any, Callable, Dict, List, Optional
  11. from datetime import datetime
  12. @dataclass
  13. class ResolvedSchedule:
  14. """当前时间解析后的调度结果"""
  15. period_key: Optional[str] # 命中的 period key,None=默认配置
  16. period_name: Optional[str] # 命中的展示名称
  17. day_plan: str # 当前日计划
  18. collect: bool
  19. analyze: bool
  20. push: bool
  21. report_mode: str
  22. ai_mode: str
  23. once_analyze: bool
  24. once_push: bool
  25. frequency_file: Optional[str] = None # 频率词文件路径,None=使用默认
  26. filter_method: Optional[str] = None # 筛选策略: "keyword"|"ai",None=使用全局配置
  27. interests_file: Optional[str] = None # AI 筛选兴趣文件,None=使用默认
  28. class Scheduler:
  29. """
  30. 时间线调度器
  31. 根据 timeline 配置(periods + day_plans + week_map)解析当前时间应执行的行为。
  32. 支持:
  33. - 预设模板 + 自定义模式
  34. - 跨日时间段(如 22:00-07:00)
  35. - 每天 / 每周差异化配置
  36. - once 执行去重(analyze / push 独立维度)
  37. - 冲突策略(error_on_overlap / last_wins)
  38. """
  39. def __init__(
  40. self,
  41. schedule_config: Dict[str, Any],
  42. timeline_data: Dict[str, Any],
  43. storage_backend: Any,
  44. get_time_func: Callable[[], datetime],
  45. fallback_report_mode: str = "current",
  46. ):
  47. """
  48. 初始化调度器
  49. Args:
  50. schedule_config: config.yaml 中的 schedule 段(含 preset 等)
  51. timeline_data: timeline.yaml 的完整数据
  52. storage_backend: 存储后端(用于 once 去重记录)
  53. get_time_func: 获取当前时间的函数(应使用配置的时区)
  54. fallback_report_mode: 调度未启用时回退使用的 report_mode(来自 config.yaml 的 report.mode)
  55. """
  56. self.schedule_config = schedule_config
  57. self.storage = storage_backend
  58. self.get_time = get_time_func
  59. self.enabled = schedule_config.get("enabled", True)
  60. self.fallback_report_mode = fallback_report_mode
  61. # 加载并构建最终 timeline
  62. self.timeline = self._build_timeline(schedule_config, timeline_data)
  63. if self.enabled:
  64. self._validate_timeline(self.timeline)
  65. def _build_timeline(
  66. self,
  67. schedule_config: Dict[str, Any],
  68. timeline_data: Dict[str, Any],
  69. ) -> Dict[str, Any]:
  70. """从 preset 或 custom 构建 timeline"""
  71. preset = schedule_config.get("preset", "always_on")
  72. if preset == "custom":
  73. timeline = copy.deepcopy(timeline_data.get("custom", {}))
  74. else:
  75. presets = timeline_data.get("presets", {})
  76. if preset not in presets:
  77. raise ValueError(
  78. f"未知的预设模板: '{preset}',可选值: "
  79. f"{', '.join(presets.keys())}, custom"
  80. )
  81. timeline = copy.deepcopy(presets[preset])
  82. # 确保 periods 是 dict(可能为空 {})
  83. if timeline.get("periods") is None:
  84. timeline["periods"] = {}
  85. return timeline
  86. def resolve(self) -> ResolvedSchedule:
  87. """
  88. 解析当前时间对应的调度配置
  89. Returns:
  90. ResolvedSchedule 包含当前应执行的行为
  91. """
  92. if not self.enabled:
  93. # 调度未启用时返回默认的全功能配置,report_mode 回退使用 config.yaml 的 report.mode
  94. return ResolvedSchedule(
  95. period_key=None,
  96. period_name=None,
  97. day_plan="disabled",
  98. collect=True,
  99. analyze=True,
  100. push=True,
  101. report_mode=self.fallback_report_mode,
  102. ai_mode="follow_report",
  103. once_analyze=False,
  104. once_push=False,
  105. )
  106. now = self.get_time()
  107. weekday = now.isoweekday() # 1=周一 ... 7=周日
  108. now_hhmm = now.strftime("%H:%M")
  109. # 查找当天的日计划
  110. day_plan_key = self.timeline["week_map"].get(weekday)
  111. if day_plan_key is None:
  112. raise ValueError(f"week_map 缺少星期映射: {weekday}")
  113. day_plan = self.timeline["day_plans"].get(day_plan_key)
  114. if day_plan is None:
  115. raise ValueError(f"week_map[{weekday}] 引用了不存在的 day_plan: {day_plan_key}")
  116. # 查找当前活跃的时间段
  117. period_key = self._find_active_period(now_hhmm, day_plan)
  118. # 合并默认配置和时间段配置
  119. merged = self._merge_with_default(period_key)
  120. # 打印调度日志
  121. weekday_names = {1: "一", 2: "二", 3: "三", 4: "四", 5: "五", 6: "六", 7: "日"}
  122. period_display = "默认配置(未命中任何时间段)"
  123. if period_key:
  124. period_cfg = self.timeline["periods"][period_key]
  125. period_name = period_cfg.get("name", period_key)
  126. start = period_cfg.get("start", "?")
  127. end = period_cfg.get("end", "?")
  128. period_display = f"{period_name} ({start}-{end})"
  129. print(f"[调度] 星期{weekday_names.get(weekday, '?')},日计划: {day_plan_key}")
  130. print(f"[调度] 当前时间段: {period_display}")
  131. resolved = ResolvedSchedule(
  132. period_key=period_key,
  133. period_name=(
  134. self.timeline["periods"][period_key].get("name")
  135. if period_key
  136. else None
  137. ),
  138. day_plan=day_plan_key,
  139. collect=merged.get("collect", True),
  140. analyze=merged.get("analyze", False),
  141. push=merged.get("push", False),
  142. report_mode=merged.get("report_mode", "current"),
  143. ai_mode=self._resolve_ai_mode(merged),
  144. once_analyze=merged.get("once", {}).get("analyze", False),
  145. once_push=merged.get("once", {}).get("push", False),
  146. frequency_file=merged.get("frequency_file"),
  147. filter_method=merged.get("filter_method"),
  148. interests_file=merged.get("interests_file"),
  149. )
  150. # 打印行为摘要
  151. actions = []
  152. if resolved.collect:
  153. actions.append("采集")
  154. if resolved.analyze:
  155. actions.append(f"分析(AI:{resolved.ai_mode})")
  156. if resolved.push:
  157. actions.append(f"推送(模式:{resolved.report_mode})")
  158. print(f"[调度] 行为: {', '.join(actions) if actions else '无'}")
  159. if resolved.frequency_file:
  160. print(f"[调度] 频率词文件: {resolved.frequency_file}")
  161. return resolved
  162. def _find_active_period(
  163. self, now_hhmm: str, day_plan: Dict[str, Any]
  164. ) -> Optional[str]:
  165. """
  166. 查找当前时间命中的活跃时间段
  167. Args:
  168. now_hhmm: 当前时间 HH:MM
  169. day_plan: 日计划配置
  170. Returns:
  171. 命中的 period key,或 None
  172. """
  173. candidates = []
  174. for idx, key in enumerate(day_plan.get("periods", [])):
  175. period = self.timeline["periods"].get(key)
  176. if period is None:
  177. continue
  178. if self._in_range(now_hhmm, period["start"], period["end"]):
  179. candidates.append((idx, key))
  180. if not candidates:
  181. return None
  182. # 检查冲突
  183. if len(candidates) > 1:
  184. policy = self.timeline.get("overlap", {}).get("policy", "error_on_overlap")
  185. conflicting = [c[1] for c in candidates]
  186. if policy == "error_on_overlap":
  187. raise ValueError(
  188. f"检测到时间段重叠冲突: {', '.join(conflicting)} 在 {now_hhmm} 重叠。"
  189. f"请调整时间段配置,或将 overlap.policy 设为 'last_wins'"
  190. )
  191. # last_wins:输出重叠警告,列表中后面的优先
  192. print(
  193. f"[调度] 检测到时间段重叠: {', '.join(conflicting)} 在 {now_hhmm} 重叠"
  194. )
  195. winner = candidates[-1]
  196. print(f"[调度] 冲突策略: last_wins,生效时间段: {winner[1]}")
  197. return winner[1]
  198. return candidates[0][1]
  199. @staticmethod
  200. def _in_range(now_hhmm: str, start: str, end: str) -> bool:
  201. """
  202. 检查时间是否在范围内(支持跨日)
  203. Args:
  204. now_hhmm: 当前时间 HH:MM
  205. start: 开始时间 HH:MM
  206. end: 结束时间 HH:MM
  207. Returns:
  208. 是否在范围内
  209. """
  210. if start <= end:
  211. # 正常范围,如 08:00-09:00
  212. return start <= now_hhmm <= end
  213. else:
  214. # 跨日范围,如 22:00-07:00
  215. return now_hhmm >= start or now_hhmm <= end
  216. def _merge_with_default(self, period_key: Optional[str]) -> Dict[str, Any]:
  217. """合并默认配置和时间段配置"""
  218. base = copy.deepcopy(self.timeline.get("default", {}))
  219. if not period_key:
  220. return base
  221. period = copy.deepcopy(self.timeline["periods"][period_key])
  222. # 先合并 once 子对象
  223. merged_once = dict(base.get("once", {}))
  224. merged_once.update(period.get("once", {}))
  225. # 标量字段覆盖
  226. base.update(period)
  227. # 恢复合并后的 once
  228. if merged_once:
  229. base["once"] = merged_once
  230. return base
  231. @staticmethod
  232. def _resolve_ai_mode(cfg: Dict[str, Any]) -> str:
  233. """解析最终的 AI 模式"""
  234. ai_mode = cfg.get("ai_mode", "follow_report")
  235. if ai_mode == "follow_report":
  236. return cfg.get("report_mode", "current")
  237. return ai_mode
  238. def already_executed(self, period_key: str, action: str, date_str: str) -> bool:
  239. """
  240. 检查指定时间段的某个 action 今天是否已执行
  241. Args:
  242. period_key: 时间段 key
  243. action: 动作类型 (analyze / push)
  244. date_str: 日期 YYYY-MM-DD
  245. Returns:
  246. 是否已执行
  247. """
  248. return self.storage.has_period_executed(date_str, period_key, action)
  249. def record_execution(self, period_key: str, action: str, date_str: str) -> None:
  250. """
  251. 记录时间段的 action 执行
  252. Args:
  253. period_key: 时间段 key
  254. action: 动作类型 (analyze / push)
  255. date_str: 日期 YYYY-MM-DD
  256. """
  257. self.storage.record_period_execution(date_str, period_key, action)
  258. # ========================================
  259. # 校验
  260. # ========================================
  261. def _validate_timeline(self, timeline: Dict[str, Any]) -> None:
  262. """
  263. 启动时校验 timeline 配置
  264. Raises:
  265. ValueError: 配置不合法时抛出
  266. """
  267. required_top_keys = ["default", "periods", "day_plans", "week_map"]
  268. for key in required_top_keys:
  269. if key not in timeline:
  270. raise ValueError(f"timeline 缺少必须字段: {key}")
  271. # week_map 必须覆盖 1..7
  272. for day in range(1, 8):
  273. if day not in timeline["week_map"]:
  274. raise ValueError(f"week_map 缺少星期映射: {day}")
  275. # day_plan 引用完整性
  276. for day, plan_key in timeline["week_map"].items():
  277. if plan_key not in timeline["day_plans"]:
  278. raise ValueError(
  279. f"week_map[{day}] 引用了不存在的 day_plan: {plan_key}"
  280. )
  281. # period 引用完整性
  282. for plan_key, plan in timeline["day_plans"].items():
  283. for period_key in plan.get("periods", []):
  284. if period_key not in timeline["periods"]:
  285. raise ValueError(
  286. f"day_plan[{plan_key}] 引用了不存在的 period: {period_key}"
  287. )
  288. # 时间格式校验
  289. for period_key, period in timeline["periods"].items():
  290. if "start" not in period or "end" not in period:
  291. raise ValueError(
  292. f"period '{period_key}' 缺少 start 或 end 字段"
  293. )
  294. self._validate_hhmm(period["start"], f"{period_key}.start")
  295. self._validate_hhmm(period["end"], f"{period_key}.end")
  296. if period["start"] == period["end"]:
  297. raise ValueError(
  298. f"period '{period_key}' 的 start 与 end 不能相同: {period['start']}"
  299. )
  300. # 检查冲突策略下的重叠
  301. policy = timeline.get("overlap", {}).get("policy", "error_on_overlap")
  302. if policy == "error_on_overlap":
  303. self._check_period_overlaps(timeline)
  304. def _check_period_overlaps(self, timeline: Dict[str, Any]) -> None:
  305. """
  306. 检查每个日计划中的时间段是否存在重叠
  307. 仅在 overlap.policy == "error_on_overlap" 时调用
  308. """
  309. periods = timeline.get("periods", {})
  310. for plan_key, plan in timeline["day_plans"].items():
  311. period_keys = plan.get("periods", [])
  312. if len(period_keys) <= 1:
  313. continue
  314. # 收集每个时间段的范围
  315. ranges = []
  316. for pk in period_keys:
  317. p = periods.get(pk, {})
  318. if "start" in p and "end" in p:
  319. ranges.append((pk, p["start"], p["end"]))
  320. # 两两检查重叠
  321. for i in range(len(ranges)):
  322. for j in range(i + 1, len(ranges)):
  323. if self._ranges_overlap(
  324. ranges[i][1], ranges[i][2],
  325. ranges[j][1], ranges[j][2],
  326. ):
  327. raise ValueError(
  328. f"day_plan '{plan_key}' 中时间段 '{ranges[i][0]}' "
  329. f"({ranges[i][1]}-{ranges[i][2]}) 与 '{ranges[j][0]}' "
  330. f"({ranges[j][1]}-{ranges[j][2]}) 存在重叠。"
  331. f"请调整时间段,或将 overlap.policy 设为 'last_wins'"
  332. )
  333. @staticmethod
  334. def _ranges_overlap(s1: str, e1: str, s2: str, e2: str) -> bool:
  335. """检查两个时间范围是否重叠(支持跨日)"""
  336. def to_minutes(t: str) -> int:
  337. h, m = t.split(":")
  338. return int(h) * 60 + int(m)
  339. def expand_range(start: str, end: str) -> List[tuple]:
  340. """将时间范围展开为分钟段列表,跨日时拆分为两段"""
  341. s = to_minutes(start)
  342. e = to_minutes(end)
  343. if s <= e:
  344. return [(s, e)]
  345. else:
  346. # 跨日:拆分为 [start, 23:59] 和 [00:00, end]
  347. return [(s, 24 * 60 - 1), (0, e)]
  348. segs1 = expand_range(s1, e1)
  349. segs2 = expand_range(s2, e2)
  350. for a_start, a_end in segs1:
  351. for b_start, b_end in segs2:
  352. # 两个区间有重叠的条件
  353. if a_start <= b_end and b_start <= a_end:
  354. return True
  355. return False
  356. @staticmethod
  357. def _validate_hhmm(value: str, field_name: str) -> None:
  358. """校验 HH:MM 格式"""
  359. if not re.match(r"^\d{2}:\d{2}$", value):
  360. raise ValueError(f"{field_name} 格式错误: '{value}',期望 HH:MM")
  361. h, m = value.split(":")
  362. if not (0 <= int(h) <= 23 and 0 <= int(m) <= 59):
  363. raise ValueError(f"{field_name} 时间值超出范围: '{value}'")