# coding=utf-8 """ 时间线调度器 统一的时间线调度系统,替代分散的 push_window / analysis_window 逻辑。 基于 periods + day_plans + week_map 模型实现灵活的时间段调度。 """ import copy import re from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional from datetime import datetime @dataclass class ResolvedSchedule: """当前时间解析后的调度结果""" period_key: Optional[str] # 命中的 period key,None=默认配置 period_name: Optional[str] # 命中的展示名称 day_plan: str # 当前日计划 collect: bool analyze: bool push: bool report_mode: str ai_mode: str once_analyze: bool once_push: bool frequency_file: Optional[str] = None # 频率词文件路径,None=使用默认 filter_method: Optional[str] = None # 筛选策略: "keyword"|"ai",None=使用全局配置 interests_file: Optional[str] = None # AI 筛选兴趣文件,None=使用默认 class Scheduler: """ 时间线调度器 根据 timeline 配置(periods + day_plans + week_map)解析当前时间应执行的行为。 支持: - 预设模板 + 自定义模式 - 跨日时间段(如 22:00-07:00) - 每天 / 每周差异化配置 - once 执行去重(analyze / push 独立维度) - 冲突策略(error_on_overlap / last_wins) """ def __init__( self, schedule_config: Dict[str, Any], timeline_data: Dict[str, Any], storage_backend: Any, get_time_func: Callable[[], datetime], fallback_report_mode: str = "current", ): """ 初始化调度器 Args: schedule_config: config.yaml 中的 schedule 段(含 preset 等) timeline_data: timeline.yaml 的完整数据 storage_backend: 存储后端(用于 once 去重记录) get_time_func: 获取当前时间的函数(应使用配置的时区) fallback_report_mode: 调度未启用时回退使用的 report_mode(来自 config.yaml 的 report.mode) """ self.schedule_config = schedule_config self.storage = storage_backend self.get_time = get_time_func self.enabled = schedule_config.get("enabled", True) self.fallback_report_mode = fallback_report_mode # 加载并构建最终 timeline self.timeline = self._build_timeline(schedule_config, timeline_data) if self.enabled: self._validate_timeline(self.timeline) def _build_timeline( self, schedule_config: Dict[str, Any], timeline_data: Dict[str, Any], ) -> Dict[str, Any]: """从 preset 或 custom 构建 timeline""" preset = schedule_config.get("preset", "always_on") if preset == "custom": timeline = copy.deepcopy(timeline_data.get("custom", {})) else: presets = timeline_data.get("presets", {}) if preset not in presets: raise ValueError( f"未知的预设模板: '{preset}',可选值: " f"{', '.join(presets.keys())}, custom" ) timeline = copy.deepcopy(presets[preset]) # 确保 periods 是 dict(可能为空 {}) if timeline.get("periods") is None: timeline["periods"] = {} return timeline def resolve(self) -> ResolvedSchedule: """ 解析当前时间对应的调度配置 Returns: ResolvedSchedule 包含当前应执行的行为 """ if not self.enabled: # 调度未启用时返回默认的全功能配置,report_mode 回退使用 config.yaml 的 report.mode return ResolvedSchedule( period_key=None, period_name=None, day_plan="disabled", collect=True, analyze=True, push=True, report_mode=self.fallback_report_mode, ai_mode="follow_report", once_analyze=False, once_push=False, ) now = self.get_time() weekday = now.isoweekday() # 1=周一 ... 7=周日 now_hhmm = now.strftime("%H:%M") # 查找当天的日计划 day_plan_key = self.timeline["week_map"].get(weekday) if day_plan_key is None: raise ValueError(f"week_map 缺少星期映射: {weekday}") day_plan = self.timeline["day_plans"].get(day_plan_key) if day_plan is None: raise ValueError(f"week_map[{weekday}] 引用了不存在的 day_plan: {day_plan_key}") # 查找当前活跃的时间段 period_key = self._find_active_period(now_hhmm, day_plan) # 合并默认配置和时间段配置 merged = self._merge_with_default(period_key) # 打印调度日志 weekday_names = {1: "一", 2: "二", 3: "三", 4: "四", 5: "五", 6: "六", 7: "日"} period_display = "默认配置(未命中任何时间段)" if period_key: period_cfg = self.timeline["periods"][period_key] period_name = period_cfg.get("name", period_key) start = period_cfg.get("start", "?") end = period_cfg.get("end", "?") period_display = f"{period_name} ({start}-{end})" print(f"[调度] 星期{weekday_names.get(weekday, '?')},日计划: {day_plan_key}") print(f"[调度] 当前时间段: {period_display}") resolved = ResolvedSchedule( period_key=period_key, period_name=( self.timeline["periods"][period_key].get("name") if period_key else None ), day_plan=day_plan_key, collect=merged.get("collect", True), analyze=merged.get("analyze", False), push=merged.get("push", False), report_mode=merged.get("report_mode", "current"), ai_mode=self._resolve_ai_mode(merged), once_analyze=merged.get("once", {}).get("analyze", False), once_push=merged.get("once", {}).get("push", False), frequency_file=merged.get("frequency_file"), filter_method=merged.get("filter_method"), interests_file=merged.get("interests_file"), ) # 打印行为摘要 actions = [] if resolved.collect: actions.append("采集") if resolved.analyze: actions.append(f"分析(AI:{resolved.ai_mode})") if resolved.push: actions.append(f"推送(模式:{resolved.report_mode})") print(f"[调度] 行为: {', '.join(actions) if actions else '无'}") if resolved.frequency_file: print(f"[调度] 频率词文件: {resolved.frequency_file}") return resolved def _find_active_period( self, now_hhmm: str, day_plan: Dict[str, Any] ) -> Optional[str]: """ 查找当前时间命中的活跃时间段 Args: now_hhmm: 当前时间 HH:MM day_plan: 日计划配置 Returns: 命中的 period key,或 None """ candidates = [] for idx, key in enumerate(day_plan.get("periods", [])): period = self.timeline["periods"].get(key) if period is None: continue if self._in_range(now_hhmm, period["start"], period["end"]): candidates.append((idx, key)) if not candidates: return None # 检查冲突 if len(candidates) > 1: policy = self.timeline.get("overlap", {}).get("policy", "error_on_overlap") conflicting = [c[1] for c in candidates] if policy == "error_on_overlap": raise ValueError( f"检测到时间段重叠冲突: {', '.join(conflicting)} 在 {now_hhmm} 重叠。" f"请调整时间段配置,或将 overlap.policy 设为 'last_wins'" ) # last_wins:输出重叠警告,列表中后面的优先 print( f"[调度] 检测到时间段重叠: {', '.join(conflicting)} 在 {now_hhmm} 重叠" ) winner = candidates[-1] print(f"[调度] 冲突策略: last_wins,生效时间段: {winner[1]}") return winner[1] return candidates[0][1] @staticmethod def _in_range(now_hhmm: str, start: str, end: str) -> bool: """ 检查时间是否在范围内(支持跨日) Args: now_hhmm: 当前时间 HH:MM start: 开始时间 HH:MM end: 结束时间 HH:MM Returns: 是否在范围内 """ if start <= end: # 正常范围,如 08:00-09:00(半开区间 [start, end)) return start <= now_hhmm < end else: # 跨日范围,如 22:00-07:00(半开区间 [start, end)) return now_hhmm >= start or now_hhmm < end def _merge_with_default(self, period_key: Optional[str]) -> Dict[str, Any]: """合并默认配置和时间段配置""" base = copy.deepcopy(self.timeline.get("default", {})) if not period_key: return base period = copy.deepcopy(self.timeline["periods"][period_key]) # 先合并 once 子对象 merged_once = dict(base.get("once", {})) merged_once.update(period.get("once", {})) # 标量字段覆盖 base.update(period) # 恢复合并后的 once if merged_once: base["once"] = merged_once return base @staticmethod def _resolve_ai_mode(cfg: Dict[str, Any]) -> str: """解析最终的 AI 模式""" ai_mode = cfg.get("ai_mode", "follow_report") if ai_mode == "follow_report": return cfg.get("report_mode", "current") return ai_mode def already_executed(self, period_key: str, action: str, date_str: str) -> bool: """ 检查指定时间段的某个 action 今天是否已执行 Args: period_key: 时间段 key action: 动作类型 (analyze / push) date_str: 日期 YYYY-MM-DD Returns: 是否已执行 """ return self.storage.has_period_executed(date_str, period_key, action) def record_execution(self, period_key: str, action: str, date_str: str) -> None: """ 记录时间段的 action 执行 Args: period_key: 时间段 key action: 动作类型 (analyze / push) date_str: 日期 YYYY-MM-DD """ self.storage.record_period_execution(date_str, period_key, action) # ======================================== # 校验 # ======================================== def _validate_timeline(self, timeline: Dict[str, Any]) -> None: """ 启动时校验 timeline 配置 Raises: ValueError: 配置不合法时抛出 """ required_top_keys = ["default", "periods", "day_plans", "week_map"] for key in required_top_keys: if key not in timeline: raise ValueError(f"timeline 缺少必须字段: {key}") # week_map 必须覆盖 1..7 for day in range(1, 8): if day not in timeline["week_map"]: raise ValueError(f"week_map 缺少星期映射: {day}") # day_plan 引用完整性 for day, plan_key in timeline["week_map"].items(): if plan_key not in timeline["day_plans"]: raise ValueError( f"week_map[{day}] 引用了不存在的 day_plan: {plan_key}" ) # period 引用完整性 for plan_key, plan in timeline["day_plans"].items(): for period_key in plan.get("periods", []): if period_key not in timeline["periods"]: raise ValueError( f"day_plan[{plan_key}] 引用了不存在的 period: {period_key}" ) # 时间格式校验 for period_key, period in timeline["periods"].items(): if "start" not in period or "end" not in period: raise ValueError( f"period '{period_key}' 缺少 start 或 end 字段" ) self._validate_hhmm(period["start"], f"{period_key}.start") self._validate_hhmm(period["end"], f"{period_key}.end") if period["start"] == period["end"]: raise ValueError( f"period '{period_key}' 的 start 与 end 不能相同: {period['start']}" ) # 检查冲突策略下的重叠 policy = timeline.get("overlap", {}).get("policy", "error_on_overlap") if policy == "error_on_overlap": self._check_period_overlaps(timeline) def _check_period_overlaps(self, timeline: Dict[str, Any]) -> None: """ 检查每个日计划中的时间段是否存在重叠 仅在 overlap.policy == "error_on_overlap" 时调用 """ periods = timeline.get("periods", {}) for plan_key, plan in timeline["day_plans"].items(): period_keys = plan.get("periods", []) if len(period_keys) <= 1: continue # 收集每个时间段的范围 ranges = [] for pk in period_keys: p = periods.get(pk, {}) if "start" in p and "end" in p: ranges.append((pk, p["start"], p["end"])) # 两两检查重叠 for i in range(len(ranges)): for j in range(i + 1, len(ranges)): if self._ranges_overlap( ranges[i][1], ranges[i][2], ranges[j][1], ranges[j][2], ): raise ValueError( f"day_plan '{plan_key}' 中时间段 '{ranges[i][0]}' " f"({ranges[i][1]}-{ranges[i][2]}) 与 '{ranges[j][0]}' " f"({ranges[j][1]}-{ranges[j][2]}) 存在重叠。" f"请调整时间段,或将 overlap.policy 设为 'last_wins'" ) @staticmethod def _ranges_overlap(s1: str, e1: str, s2: str, e2: str) -> bool: """检查两个时间范围是否重叠(支持跨日)""" def to_minutes(t: str) -> int: h, m = t.split(":") return int(h) * 60 + int(m) def expand_range(start: str, end: str) -> List[tuple]: """将时间范围展开为分钟段列表,跨日时拆分为两段""" s = to_minutes(start) e = to_minutes(end) if s <= e: return [(s, e)] else: # 跨日:拆分为 [start, 24:00) 和 [00:00, end) return [(s, 24 * 60), (0, e)] segs1 = expand_range(s1, e1) segs2 = expand_range(s2, e2) for a_start, a_end in segs1: for b_start, b_end in segs2: # 两个半开区间有重叠的条件 if a_start < b_end and b_start < a_end: return True return False @staticmethod def _validate_hhmm(value: str, field_name: str) -> None: """校验 HH:MM 格式""" if not re.match(r"^\d{2}:\d{2}$", value): raise ValueError(f"{field_name} 格式错误: '{value}',期望 HH:MM") h, m = value.split(":") if not (0 <= int(h) <= 23 and 0 <= int(m) <= 59): raise ValueError(f"{field_name} 时间值超出范围: '{value}'")