time.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. # coding=utf-8
  2. """
  3. 时间工具模块
  4. 本模块提供统一的时间处理函数,所有时区相关操作都应使用 DEFAULT_TIMEZONE 常量。
  5. """
  6. from datetime import datetime
  7. from typing import Optional, Tuple
  8. import pytz
  9. # 默认时区常量 - 仅作为 fallback,正常运行时使用 config.yaml 中的 app.timezone
  10. DEFAULT_TIMEZONE = "Asia/Shanghai"
  11. def get_configured_time(timezone: str = DEFAULT_TIMEZONE) -> datetime:
  12. """
  13. 获取配置时区的当前时间
  14. Args:
  15. timezone: 时区名称,如 'Asia/Shanghai', 'America/Los_Angeles'
  16. Returns:
  17. 带时区信息的当前时间
  18. """
  19. try:
  20. tz = pytz.timezone(timezone)
  21. except pytz.UnknownTimeZoneError:
  22. print(f"[警告] 未知时区 '{timezone}',使用默认时区 {DEFAULT_TIMEZONE}")
  23. tz = pytz.timezone(DEFAULT_TIMEZONE)
  24. return datetime.now(tz)
  25. def format_date_folder(
  26. date: Optional[str] = None, timezone: str = DEFAULT_TIMEZONE
  27. ) -> str:
  28. """
  29. 格式化日期文件夹名 (ISO 格式: YYYY-MM-DD)
  30. Args:
  31. date: 指定日期字符串,为 None 则使用当前日期
  32. timezone: 时区名称
  33. Returns:
  34. 格式化后的日期字符串,如 '2025-12-09'
  35. """
  36. if date:
  37. return date
  38. return get_configured_time(timezone).strftime("%Y-%m-%d")
  39. def format_time_filename(timezone: str = DEFAULT_TIMEZONE) -> str:
  40. """
  41. 格式化时间文件名 (格式: HH-MM,用于文件名)
  42. Windows 系统不支持冒号作为文件名,因此使用连字符
  43. Args:
  44. timezone: 时区名称
  45. Returns:
  46. 格式化后的时间字符串,如 '15-30'
  47. """
  48. return get_configured_time(timezone).strftime("%H-%M")
  49. def get_current_time_display(timezone: str = DEFAULT_TIMEZONE) -> str:
  50. """
  51. 获取当前时间显示 (格式: HH:MM,用于显示)
  52. Args:
  53. timezone: 时区名称
  54. Returns:
  55. 格式化后的时间字符串,如 '15:30'
  56. """
  57. return get_configured_time(timezone).strftime("%H:%M")
  58. def convert_time_for_display(time_str: str) -> str:
  59. """
  60. 将 HH-MM 格式转换为 HH:MM 格式用于显示
  61. Args:
  62. time_str: 输入时间字符串,如 '15-30'
  63. Returns:
  64. 转换后的时间字符串,如 '15:30'
  65. """
  66. if time_str and "-" in time_str and len(time_str) == 5:
  67. return time_str.replace("-", ":")
  68. return time_str
  69. def format_iso_time_friendly(
  70. iso_time: str,
  71. timezone: str = DEFAULT_TIMEZONE,
  72. include_date: bool = True,
  73. ) -> str:
  74. """
  75. 将 ISO 格式时间转换为用户时区的友好显示格式
  76. Args:
  77. iso_time: ISO 格式时间字符串,如 '2025-12-29T00:20:00' 或 '2025-12-29T00:20:00+00:00'
  78. timezone: 目标时区名称
  79. include_date: 是否包含日期部分
  80. Returns:
  81. 友好格式的时间字符串,如 '12-29 08:20' 或 '08:20'
  82. """
  83. if not iso_time:
  84. return ""
  85. try:
  86. # 尝试解析各种 ISO 格式
  87. dt = None
  88. # 尝试解析带时区的格式
  89. if "+" in iso_time or iso_time.endswith("Z"):
  90. iso_time = iso_time.replace("Z", "+00:00")
  91. try:
  92. dt = datetime.fromisoformat(iso_time)
  93. except ValueError:
  94. pass
  95. # 尝试解析不带时区的格式(假设为 UTC)
  96. if dt is None:
  97. try:
  98. # 处理 T 分隔符
  99. if "T" in iso_time:
  100. dt = datetime.fromisoformat(iso_time.replace("T", " ").split(".")[0])
  101. else:
  102. dt = datetime.fromisoformat(iso_time.split(".")[0])
  103. # 假设为 UTC 时间
  104. dt = pytz.UTC.localize(dt)
  105. except ValueError:
  106. pass
  107. if dt is None:
  108. # 无法解析,返回原始字符串的简化版本
  109. if "T" in iso_time:
  110. parts = iso_time.split("T")
  111. if len(parts) == 2:
  112. date_part = parts[0][5:] # MM-DD
  113. time_part = parts[1][:5] # HH:MM
  114. return f"{date_part} {time_part}" if include_date else time_part
  115. return iso_time
  116. # 转换到目标时区
  117. try:
  118. target_tz = pytz.timezone(timezone)
  119. except pytz.UnknownTimeZoneError:
  120. target_tz = pytz.timezone(DEFAULT_TIMEZONE)
  121. dt_local = dt.astimezone(target_tz)
  122. # 格式化输出
  123. if include_date:
  124. return dt_local.strftime("%m-%d %H:%M")
  125. else:
  126. return dt_local.strftime("%H:%M")
  127. except Exception:
  128. # 出错时返回原始字符串的简化版本
  129. if "T" in iso_time:
  130. parts = iso_time.split("T")
  131. if len(parts) == 2:
  132. date_part = parts[0][5:] # MM-DD
  133. time_part = parts[1][:5] # HH:MM
  134. return f"{date_part} {time_part}" if include_date else time_part
  135. return iso_time
  136. def is_within_days(
  137. iso_time: str,
  138. max_days: int,
  139. timezone: str = DEFAULT_TIMEZONE,
  140. ) -> bool:
  141. """
  142. 检查 ISO 格式时间是否在指定天数内
  143. 用于 RSS 文章新鲜度过滤,判断文章发布时间是否超过指定天数。
  144. Args:
  145. iso_time: ISO 格式时间字符串(如 '2025-12-29T00:20:00' 或带时区)
  146. max_days: 最大天数(文章发布时间距今不超过此天数则返回 True)
  147. - max_days > 0: 正常过滤,保留 N 天内的文章
  148. - max_days <= 0: 禁用过滤,保留所有文章
  149. timezone: 时区名称(用于获取当前时间)
  150. Returns:
  151. True 如果时间在指定天数内(应保留),False 如果超过指定天数(应过滤)
  152. 如果无法解析时间,返回 True(保留文章)
  153. """
  154. # 无时间戳或禁用过滤时,保留文章
  155. if not iso_time:
  156. return True
  157. if max_days <= 0:
  158. return True # max_days=0 表示禁用过滤
  159. try:
  160. dt = None
  161. # 尝试解析带时区的格式
  162. if "+" in iso_time or iso_time.endswith("Z"):
  163. iso_time_normalized = iso_time.replace("Z", "+00:00")
  164. try:
  165. dt = datetime.fromisoformat(iso_time_normalized)
  166. except ValueError:
  167. pass
  168. # 尝试解析不带时区的格式(假设为 UTC)
  169. if dt is None:
  170. try:
  171. if "T" in iso_time:
  172. dt = datetime.fromisoformat(iso_time.replace("T", " ").split(".")[0])
  173. else:
  174. dt = datetime.fromisoformat(iso_time.split(".")[0])
  175. dt = pytz.UTC.localize(dt)
  176. except ValueError:
  177. pass
  178. if dt is None:
  179. # 无法解析时间,保留文章
  180. return True
  181. # 获取当前时间(配置的时区,带时区信息)
  182. now = get_configured_time(timezone)
  183. # 计算时间差(两个带时区的 datetime 相减会自动处理时区差异)
  184. diff = now - dt
  185. days_diff = diff.total_seconds() / (24 * 60 * 60)
  186. return days_diff <= max_days
  187. except Exception:
  188. # 出错时保留文章
  189. return True
  190. def calculate_days_old(iso_time: str, timezone: str = DEFAULT_TIMEZONE) -> Optional[float]:
  191. """
  192. 计算 ISO 格式时间距今多少天
  193. Args:
  194. iso_time: ISO 格式时间字符串
  195. timezone: 时区名称
  196. Returns:
  197. 距今天数(浮点数),如果无法解析返回 None
  198. """
  199. if not iso_time:
  200. return None
  201. try:
  202. dt = None
  203. # 尝试解析带时区的格式
  204. if "+" in iso_time or iso_time.endswith("Z"):
  205. iso_time_normalized = iso_time.replace("Z", "+00:00")
  206. try:
  207. dt = datetime.fromisoformat(iso_time_normalized)
  208. except ValueError:
  209. pass
  210. # 尝试解析不带时区的格式(假设为 UTC)
  211. if dt is None:
  212. try:
  213. if "T" in iso_time:
  214. dt = datetime.fromisoformat(iso_time.replace("T", " ").split(".")[0])
  215. else:
  216. dt = datetime.fromisoformat(iso_time.split(".")[0])
  217. dt = pytz.UTC.localize(dt)
  218. except ValueError:
  219. pass
  220. if dt is None:
  221. return None
  222. now = get_configured_time(timezone)
  223. diff = now - dt
  224. return diff.total_seconds() / (24 * 60 * 60)
  225. except Exception:
  226. return None
  227. class TimeWindowChecker:
  228. """
  229. 时间窗口检查器
  230. 统一管理时间窗口控制逻辑,支持:
  231. - 推送窗口控制 (push_window)
  232. - AI 分析窗口控制 (analysis_window)
  233. - once_per_day 功能
  234. """
  235. def __init__(
  236. self,
  237. storage_backend,
  238. get_time_func=None,
  239. window_name: str = "时间窗口",
  240. ):
  241. """
  242. 初始化时间窗口检查器
  243. Args:
  244. storage_backend: 存储后端实例
  245. get_time_func: 获取当前时间的函数
  246. window_name: 窗口名称(用于日志输出)
  247. """
  248. self.storage_backend = storage_backend
  249. self.get_time_func = get_time_func or (lambda: get_configured_time(DEFAULT_TIMEZONE))
  250. self.window_name = window_name
  251. def is_in_time_range(self, start_time: str, end_time: str) -> bool:
  252. """
  253. 检查当前时间是否在指定时间范围内
  254. 支持跨日时间窗口,例如:
  255. - 正常窗口:09:00-21:00(当天 9 点到 21 点)
  256. - 跨日窗口:22:00-02:00(当天 22 点到次日 2 点)
  257. Args:
  258. start_time: 开始时间(格式:HH:MM)
  259. end_time: 结束时间(格式:HH:MM)
  260. Returns:
  261. 是否在时间范围内
  262. """
  263. now = self.get_time_func()
  264. current_time = now.strftime("%H:%M")
  265. normalized_start = self._normalize_time(start_time)
  266. normalized_end = self._normalize_time(end_time)
  267. normalized_current = self._normalize_time(current_time)
  268. # 判断是否跨日窗口(start > end 表示跨日,如 22:00-02:00)
  269. if normalized_start <= normalized_end:
  270. # 正常窗口:09:00-21:00
  271. result = normalized_start <= normalized_current <= normalized_end
  272. else:
  273. # 跨日窗口:22:00-02:00
  274. # 当前时间 >= 开始时间(如 23:00 >= 22:00)或 当前时间 <= 结束时间(如 01:00 <= 02:00)
  275. result = normalized_current >= normalized_start or normalized_current <= normalized_end
  276. if not result:
  277. print(f"[{self.window_name}] 当前 {normalized_current},窗口 {normalized_start}-{normalized_end}")
  278. return result
  279. def _normalize_time(self, time_str: str) -> str:
  280. """将时间字符串标准化为 HH:MM 格式"""
  281. try:
  282. parts = time_str.strip().split(":")
  283. if len(parts) != 2:
  284. raise ValueError(f"时间格式错误: {time_str}")
  285. hour = int(parts[0])
  286. minute = int(parts[1])
  287. if not (0 <= hour <= 23 and 0 <= minute <= 59):
  288. raise ValueError(f"时间范围错误: {time_str}")
  289. return f"{hour:02d}:{minute:02d}"
  290. except Exception as e:
  291. print(f"[{self.window_name}] 时间格式化错误 '{time_str}': {e}")
  292. return time_str
  293. def check_window(
  294. self,
  295. window_config: dict,
  296. check_once_per_day_func=None,
  297. record_func=None,
  298. ) -> Tuple[bool, str]:
  299. """
  300. 统一的时间窗口检查逻辑
  301. Args:
  302. window_config: 窗口配置字典,包含:
  303. - ENABLED: 是否启用窗口控制
  304. - TIME_RANGE: {"START": "HH:MM", "END": "HH:MM"}
  305. - ONCE_PER_DAY: 是否每天只执行一次
  306. check_once_per_day_func: 检查今天是否已执行的函数
  307. record_func: 记录执行的函数(成功后调用)
  308. Returns:
  309. (should_proceed, reason) 元组:
  310. - should_proceed: 是否应该继续执行
  311. - reason: 原因说明
  312. """
  313. if not window_config.get("ENABLED", False):
  314. return True, "窗口控制未启用"
  315. time_range = window_config.get("TIME_RANGE", {})
  316. start_time = time_range.get("START", "00:00")
  317. end_time = time_range.get("END", "23:59")
  318. # 检查时间范围
  319. if not self.is_in_time_range(start_time, end_time):
  320. now = self.get_time_func()
  321. return False, f"当前时间 {now.strftime('%H:%M')} 不在窗口 {start_time}-{end_time} 内"
  322. # 检查 once_per_day
  323. if window_config.get("ONCE_PER_DAY", False) and check_once_per_day_func:
  324. if check_once_per_day_func():
  325. return False, "今天已执行过"
  326. else:
  327. print(f"[{self.window_name}] 今天首次执行")
  328. return True, "在窗口内"
  329. def get_status(self, window_config: dict, check_once_per_day_func=None) -> dict:
  330. """
  331. 获取窗口状态信息
  332. Args:
  333. window_config: 窗口配置
  334. check_once_per_day_func: 检查今天是否已执行的函数
  335. Returns:
  336. 状态信息字典
  337. """
  338. now = self.get_time_func()
  339. status = {
  340. "enabled": window_config.get("ENABLED", False),
  341. "current_time": now.strftime("%H:%M:%S"),
  342. "current_date": now.strftime("%Y-%m-%d"),
  343. "timezone": str(now.tzinfo),
  344. }
  345. if status["enabled"]:
  346. time_range = window_config.get("TIME_RANGE", {})
  347. status["window_start"] = time_range.get("START", "00:00")
  348. status["window_end"] = time_range.get("END", "23:59")
  349. status["in_window"] = self.is_in_time_range(
  350. status["window_start"], status["window_end"]
  351. )
  352. status["once_per_day"] = window_config.get("ONCE_PER_DAY", False)
  353. if status["once_per_day"] and check_once_per_day_func:
  354. status["executed_today"] = check_once_per_day_func()
  355. return status