| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- # coding=utf-8
- """
- 时间线调度器
- 统一的时间线调度系统,替代分散的 push_window / analysis_window 逻辑。
- 基于 periods + day_plans + week_map 模型实现灵活的时间段调度。
- """
- import copy
- import re
- from dataclasses import dataclass
- from typing import Any, Callable, Dict, List, Optional
- from datetime import datetime
@dataclass
class ResolvedSchedule:
    """Scheduling decision resolved for the current time.

    Attributes:
        period_key: Key of the period that matched; None means the default
            configuration is in effect.
        period_name: Display name of the matched period.
        day_plan: Name of the day plan in effect.
        collect: Flag for the collect action.
        analyze: Flag for the analyze action.
        push: Flag for the push action.
        report_mode: Report mode string.
        ai_mode: AI mode string.
        once_analyze: "once" flag for the analyze action.
        once_push: "once" flag for the push action.
        frequency_file: Path of the frequency-words file; None means use the
            default one.
        filter_method: Filtering strategy, "keyword" or "ai"; None means use
            the global configuration.
        interests_file: Interests file for AI filtering; None means use the
            default one.
    """

    # Which period matched, and under which day plan.
    period_key: Optional[str]
    period_name: Optional[str]
    day_plan: str

    # Action switches.
    collect: bool
    analyze: bool
    push: bool

    # Mode selection.
    report_mode: str
    ai_mode: str

    # "once" switches, kept separately for analyze and push.
    once_analyze: bool
    once_push: bool

    # Optional per-period overrides; None falls back to the default/global
    # setting.
    frequency_file: Optional[str] = None
    filter_method: Optional[str] = None
    interests_file: Optional[str] = None
class Scheduler:
    """Timeline scheduler.

    Resolves, from a timeline configuration made of ``periods``,
    ``day_plans`` and ``week_map``, which behaviour applies at the current
    time.

    Supported features:
    - preset templates or a custom timeline
    - periods that cross midnight (e.g. 22:00-07:00)
    - per-weekday day plans
    - "once" execution de-duplication, tracked separately for analyze / push
    - overlap policies (``error_on_overlap`` / ``last_wins``)
    """

    def __init__(
        self,
        schedule_config: Dict[str, Any],
        timeline_data: Dict[str, Any],
        storage_backend: Any,
        get_time_func: Callable[[], datetime],
        fallback_report_mode: str = "current",
    ) -> None:
        """Initialise the scheduler.

        Args:
            schedule_config: The ``schedule`` section of config.yaml
                (contains ``preset`` and related keys).
            timeline_data: Full contents of timeline.yaml.
            storage_backend: Storage backend used for the "once"
                execution records.
            get_time_func: Callable returning the current time (should use
                the configured time zone).
            fallback_report_mode: ``report_mode`` returned by ``resolve()``
                when scheduling is disabled.

        Raises:
            ValueError: If the preset is unknown, or scheduling is enabled
                and the timeline fails validation.
        """
        self.schedule_config = schedule_config
        self.storage = storage_backend
        self.get_time = get_time_func
        self.enabled = schedule_config.get("enabled", True)
        self.fallback_report_mode = fallback_report_mode

        # The timeline is always built, but only validated when scheduling
        # is enabled: resolve() returns before reading it otherwise.
        self.timeline = self._build_timeline(schedule_config, timeline_data)
        if self.enabled:
            self._validate_timeline(self.timeline)

    def _build_timeline(
        self,
        schedule_config: Dict[str, Any],
        timeline_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Build the effective timeline from a preset or the custom section.

        Args:
            schedule_config: Schedule settings; ``preset`` selects the
                template and defaults to ``"always_on"``.
            timeline_data: Mapping with a ``custom`` section and a
                ``presets`` mapping.

        Returns:
            A deep copy of the selected timeline, with ``periods`` guaranteed
            not to be None.

        Raises:
            ValueError: If ``preset`` is neither ``"custom"`` nor a key of
                ``presets``.
        """
        preset = schedule_config.get("preset", "always_on")
        # Sections present without a value come back as None; `or {}` keeps
        # such input from crashing with AttributeError/TypeError.
        timeline_data = timeline_data or {}

        if preset == "custom":
            timeline = copy.deepcopy(timeline_data.get("custom") or {})
        else:
            presets = timeline_data.get("presets") or {}
            if preset not in presets:
                raise ValueError(
                    f"未知的预设模板: '{preset}',可选值: "
                    f"{', '.join(presets.keys())}, custom"
                )
            timeline = copy.deepcopy(presets[preset] or {})

        # Make sure periods is a dict (it may legitimately be empty).
        if timeline.get("periods") is None:
            timeline["periods"] = {}
        return timeline

    def resolve(self) -> "ResolvedSchedule":
        """Resolve the schedule that applies at the current time.

        Prints a few ``[调度]`` log lines describing the decision.

        Returns:
            A ResolvedSchedule describing the behaviour to execute now.

        Raises:
            ValueError: If the current weekday has no ``week_map`` entry, the
                mapped day plan does not exist, or several periods match and
                the overlap policy is ``error_on_overlap``.
        """
        if not self.enabled:
            # Scheduling disabled: everything is switched on and the report
            # mode falls back to the value passed to the constructor.
            return ResolvedSchedule(
                period_key=None,
                period_name=None,
                day_plan="disabled",
                collect=True,
                analyze=True,
                push=True,
                report_mode=self.fallback_report_mode,
                ai_mode="follow_report",
                once_analyze=False,
                once_push=False,
            )

        now = self.get_time()
        weekday = now.isoweekday()  # 1 = Monday ... 7 = Sunday
        now_hhmm = now.strftime("%H:%M")

        # Look up today's day plan.
        day_plan_key = self.timeline["week_map"].get(weekday)
        if day_plan_key is None:
            raise ValueError(f"week_map 缺少星期映射: {weekday}")
        day_plan = self.timeline["day_plans"].get(day_plan_key)
        if day_plan is None:
            raise ValueError(
                f"week_map[{weekday}] 引用了不存在的 day_plan: {day_plan_key}"
            )

        # Find the active period and overlay it on the defaults.
        period_key = self._find_active_period(now_hhmm, day_plan)
        merged = self._merge_with_default(period_key)

        # Scheduling log.
        weekday_names = {1: "一", 2: "二", 3: "三", 4: "四", 5: "五", 6: "六", 7: "日"}
        period_display = "默认配置(未命中任何时间段)"
        if period_key:
            period_cfg = self.timeline["periods"][period_key]
            period_name = period_cfg.get("name", period_key)
            start = period_cfg.get("start", "?")
            end = period_cfg.get("end", "?")
            period_display = f"{period_name} ({start}-{end})"
        print(f"[调度] 星期{weekday_names.get(weekday, '?')},日计划: {day_plan_key}")
        print(f"[调度] 当前时间段: {period_display}")

        # `once` may be present but empty (None); treat that as no flags.
        once_cfg = merged.get("once") or {}
        resolved = ResolvedSchedule(
            period_key=period_key,
            period_name=(
                self.timeline["periods"][period_key].get("name")
                if period_key
                else None
            ),
            day_plan=day_plan_key,
            collect=merged.get("collect", True),
            analyze=merged.get("analyze", False),
            push=merged.get("push", False),
            report_mode=merged.get("report_mode", "current"),
            ai_mode=self._resolve_ai_mode(merged),
            once_analyze=once_cfg.get("analyze", False),
            once_push=once_cfg.get("push", False),
            frequency_file=merged.get("frequency_file"),
            filter_method=merged.get("filter_method"),
            interests_file=merged.get("interests_file"),
        )

        # Behaviour summary.
        actions = []
        if resolved.collect:
            actions.append("采集")
        if resolved.analyze:
            actions.append(f"分析(AI:{resolved.ai_mode})")
        if resolved.push:
            actions.append(f"推送(模式:{resolved.report_mode})")
        print(f"[调度] 行为: {', '.join(actions) if actions else '无'}")
        if resolved.frequency_file:
            print(f"[调度] 频率词文件: {resolved.frequency_file}")
        return resolved

    @staticmethod
    def _plan_periods(plan: Dict[str, Any]) -> List[str]:
        """Return the period keys listed by a day plan ([] if none/None)."""
        return plan.get("periods") or []

    @staticmethod
    def _overlap_policy(timeline: Dict[str, Any]) -> str:
        """Return ``overlap.policy``, defaulting to ``error_on_overlap``."""
        return (timeline.get("overlap") or {}).get("policy", "error_on_overlap")

    def _find_active_period(
        self, now_hhmm: str, day_plan: Dict[str, Any]
    ) -> Optional[str]:
        """Find the period of ``day_plan`` that contains ``now_hhmm``.

        Args:
            now_hhmm: Current time as HH:MM.
            day_plan: Day plan configuration; its ``periods`` entry lists
                period keys.

        Returns:
            The matching period key, or None when no period matches. When
            several match and the policy is not ``error_on_overlap``, the one
            listed last in the day plan wins.

        Raises:
            ValueError: If several periods match and the overlap policy is
                ``error_on_overlap``.
        """
        candidates = []
        for key in self._plan_periods(day_plan):
            period = self.timeline["periods"].get(key)
            if period is None:
                continue
            if self._in_range(now_hhmm, period["start"], period["end"]):
                candidates.append(key)

        if not candidates:
            return None
        if len(candidates) == 1:
            return candidates[0]

        # More than one period matches: apply the overlap policy.
        if self._overlap_policy(self.timeline) == "error_on_overlap":
            raise ValueError(
                f"检测到时间段重叠冲突: {', '.join(candidates)} 在 {now_hhmm} 重叠。"
                f"请调整时间段配置,或将 overlap.policy 设为 'last_wins'"
            )

        # last_wins: warn about the overlap; the later list entry wins.
        print(
            f"[调度] 检测到时间段重叠: {', '.join(candidates)} 在 {now_hhmm} 重叠"
        )
        winner = candidates[-1]
        print(f"[调度] 冲突策略: last_wins,生效时间段: {winner}")
        return winner

    @staticmethod
    def _in_range(now_hhmm: str, start: str, end: str) -> bool:
        """Check whether a time lies in the half-open range [start, end).

        All values are compared as strings, which orders correctly only for
        zero-padded HH:MM values. A range whose start is later than its end
        is treated as crossing midnight.

        Args:
            now_hhmm: Current time as HH:MM.
            start: Range start as HH:MM.
            end: Range end as HH:MM.

        Returns:
            True if ``now_hhmm`` is inside the range.
        """
        if start <= end:
            # Same-day range, e.g. 08:00-09:00.
            return start <= now_hhmm < end
        # Range crossing midnight, e.g. 22:00-07:00.
        return now_hhmm >= start or now_hhmm < end

    def _merge_with_default(self, period_key: Optional[str]) -> Dict[str, Any]:
        """Overlay a period's settings on the timeline's ``default`` section.

        Top-level keys of the period replace those of the default; the
        ``once`` sub-mapping is merged key by key instead of being replaced.

        Args:
            period_key: Key of the period to apply, or None/empty for the
                defaults alone.

        Returns:
            A new dict; the timeline itself is not modified.
        """
        base = copy.deepcopy(self.timeline.get("default") or {})
        if not period_key:
            return base

        period = copy.deepcopy(self.timeline["periods"][period_key])

        # Merge the `once` sub-mapping first; a key given without a value
        # (None) counts as empty.
        merged_once = dict(base.get("once") or {})
        merged_once.update(period.get("once") or {})

        # Scalar fields: the period overrides the default.
        base.update(period)

        # Put the merged `once` back, since update() replaced it wholesale.
        if merged_once:
            base["once"] = merged_once
        return base

    @staticmethod
    def _resolve_ai_mode(cfg: Dict[str, Any]) -> str:
        """Return the effective AI mode for a merged configuration.

        ``follow_report`` (also the default when ``ai_mode`` is absent) means
        "use ``report_mode``", which itself defaults to ``"current"``.
        """
        ai_mode = cfg.get("ai_mode", "follow_report")
        if ai_mode == "follow_report":
            return cfg.get("report_mode", "current")
        return ai_mode

    def already_executed(self, period_key: str, action: str, date_str: str) -> bool:
        """Check whether an action of a period was already executed on a date.

        Delegates to ``storage.has_period_executed``.

        Args:
            period_key: Period key.
            action: Action type (analyze / push).
            date_str: Date as YYYY-MM-DD.

        Returns:
            Whatever the storage backend reports.
        """
        return self.storage.has_period_executed(date_str, period_key, action)

    def record_execution(self, period_key: str, action: str, date_str: str) -> None:
        """Record that an action of a period was executed on a date.

        Delegates to ``storage.record_period_execution``.

        Args:
            period_key: Period key.
            action: Action type (analyze / push).
            date_str: Date as YYYY-MM-DD.
        """
        self.storage.record_period_execution(date_str, period_key, action)

    # ========================================
    # Validation
    # ========================================

    def _validate_timeline(self, timeline: Dict[str, Any]) -> None:
        """Validate a timeline configuration at start-up.

        Checks required top-level keys, that ``week_map`` covers days 1-7,
        that day plans and periods referenced exist, that every period has
        well-formed and distinct ``start`` / ``end`` times, and (under the
        ``error_on_overlap`` policy) that periods of a day plan do not
        overlap.

        Raises:
            ValueError: If the configuration is invalid.
        """
        for key in ("default", "periods", "day_plans", "week_map"):
            if key not in timeline:
                raise ValueError(f"timeline 缺少必须字段: {key}")

        # Sections present without a value are None; treat them as empty so
        # the checks below report a ValueError instead of crashing.
        week_map = timeline["week_map"] or {}
        day_plans = timeline["day_plans"] or {}
        periods = timeline["periods"] or {}

        # week_map must cover 1..7.
        for day in range(1, 8):
            if day not in week_map:
                raise ValueError(f"week_map 缺少星期映射: {day}")

        # Every mapped day plan must exist.
        for day, plan_key in week_map.items():
            if plan_key not in day_plans:
                raise ValueError(
                    f"week_map[{day}] 引用了不存在的 day_plan: {plan_key}"
                )

        # Every period referenced by a day plan must exist.
        for plan_key, plan in day_plans.items():
            for period_key in self._plan_periods(plan):
                if period_key not in periods:
                    raise ValueError(
                        f"day_plan[{plan_key}] 引用了不存在的 period: {period_key}"
                    )

        # Time fields: present, well-formed, and not identical.
        for period_key, period in periods.items():
            if (
                not isinstance(period, dict)
                or "start" not in period
                or "end" not in period
            ):
                raise ValueError(
                    f"period '{period_key}' 缺少 start 或 end 字段"
                )
            self._validate_hhmm(period["start"], f"{period_key}.start")
            self._validate_hhmm(period["end"], f"{period_key}.end")
            if period["start"] == period["end"]:
                raise ValueError(
                    f"period '{period_key}' 的 start 与 end 不能相同: {period['start']}"
                )

        # Overlaps are only an error under the error_on_overlap policy.
        if self._overlap_policy(timeline) == "error_on_overlap":
            self._check_period_overlaps(timeline)

    def _check_period_overlaps(self, timeline: Dict[str, Any]) -> None:
        """Check every day plan for overlapping periods.

        Only called when ``overlap.policy`` is ``error_on_overlap``.

        Raises:
            ValueError: On the first pair of overlapping periods found.
        """
        periods = timeline.get("periods") or {}
        for plan_key, plan in timeline["day_plans"].items():
            period_keys = self._plan_periods(plan)
            if len(period_keys) <= 1:
                continue

            # Collect (key, start, end) for the periods that have both ends.
            ranges = []
            for pk in period_keys:
                p = periods.get(pk) or {}
                if "start" in p and "end" in p:
                    ranges.append((pk, p["start"], p["end"]))

            # Compare every unordered pair once.
            for idx, (key_a, start_a, end_a) in enumerate(ranges):
                for key_b, start_b, end_b in ranges[idx + 1:]:
                    if self._ranges_overlap(start_a, end_a, start_b, end_b):
                        raise ValueError(
                            f"day_plan '{plan_key}' 中时间段 '{key_a}' "
                            f"({start_a}-{end_a}) 与 '{key_b}' "
                            f"({start_b}-{end_b}) 存在重叠。"
                            f"请调整时间段,或将 overlap.policy 设为 'last_wins'"
                        )

    @staticmethod
    def _ranges_overlap(s1: str, e1: str, s2: str, e2: str) -> bool:
        """Check whether two HH:MM ranges overlap (midnight crossing allowed).

        Ranges are half-open, so ranges that merely touch (one ends where the
        other starts) do not overlap.
        """

        def to_minutes(t: str) -> int:
            h, m = t.split(":")
            return int(h) * 60 + int(m)

        def expand_range(start: str, end: str) -> List[tuple]:
            """Expand a range into minute segments; split it at midnight."""
            s = to_minutes(start)
            e = to_minutes(end)
            if s <= e:
                return [(s, e)]
            # Crossing midnight: [start, 24:00) and [00:00, end).
            return [(s, 24 * 60), (0, e)]

        segs2 = expand_range(s2, e2)
        # Two half-open intervals overlap iff each starts before the other
        # ends.
        return any(
            a_start < b_end and b_start < a_end
            for a_start, a_end in expand_range(s1, e1)
            for b_start, b_end in segs2
        )

    @staticmethod
    def _validate_hhmm(value: str, field_name: str) -> None:
        """Validate that ``value`` is a zero-padded HH:MM string.

        Args:
            value: Value to check.
            field_name: Name used in the error message.

        Raises:
            ValueError: If ``value`` is not a string of the form HH:MM or is
                outside 00:00-23:59.
        """
        # Non-string values are rejected explicitly (NOTE(review): an
        # unquoted time such as 22:00 can be loaded as a number by YAML 1.1
        # loaders - confirm against the loader in use). fullmatch with
        # [0-9] also rejects a trailing newline and non-ASCII digits, which
        # would break the string comparison done in _in_range.
        if not isinstance(value, str) or not re.fullmatch(
            r"[0-9]{2}:[0-9]{2}", value
        ):
            raise ValueError(f"{field_name} 格式错误: '{value}',期望 HH:MM")
        hours, minutes = value.split(":")
        if not (0 <= int(hours) <= 23 and 0 <= int(minutes) <= 59):
            raise ValueError(f"{field_name} 时间值超出范围: '{value}'")
|