push_manager.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. # coding=utf-8
  2. """
  3. 推送记录管理模块
  4. 管理推送记录,支持每日只推送一次和时间窗口控制
  5. 通过 storage_backend 统一存储,支持本地 SQLite 和远程云存储
  6. """
  7. from datetime import datetime
  8. from typing import Callable, Optional, Any
  9. import pytz
  10. class PushRecordManager:
  11. """
  12. 推送记录管理器
  13. 通过 storage_backend 统一管理推送记录:
  14. - 本地环境:使用 LocalStorageBackend,数据存储在本地 SQLite
  15. - GitHub Actions:使用 RemoteStorageBackend,数据存储在云端
  16. 这样 once_per_day 功能在 GitHub Actions 上也能正常工作。
  17. """
  18. def __init__(
  19. self,
  20. storage_backend: Any,
  21. get_time_func: Optional[Callable[[], datetime]] = None,
  22. ):
  23. """
  24. 初始化推送记录管理器
  25. Args:
  26. storage_backend: 存储后端实例(LocalStorageBackend 或 RemoteStorageBackend)
  27. get_time_func: 获取当前时间的函数(应使用配置的时区)
  28. """
  29. self.storage_backend = storage_backend
  30. self.get_time = get_time_func or self._default_get_time
  31. print(f"[推送记录] 使用 {storage_backend.backend_name} 存储后端")
  32. def _default_get_time(self) -> datetime:
  33. """默认时间获取函数(使用 storage_backend 的时区配置)"""
  34. timezone = getattr(self.storage_backend, 'timezone', 'Asia/Shanghai')
  35. return datetime.now(pytz.timezone(timezone))
  36. def has_pushed_today(self) -> bool:
  37. """
  38. 检查今天是否已经推送过
  39. Returns:
  40. 是否已推送
  41. """
  42. return self.storage_backend.has_pushed_today()
  43. def record_push(self, report_type: str) -> bool:
  44. """
  45. 记录推送
  46. Args:
  47. report_type: 报告类型
  48. Returns:
  49. 是否记录成功
  50. """
  51. return self.storage_backend.record_push(report_type)
  52. def is_in_time_range(self, start_time: str, end_time: str) -> bool:
  53. """
  54. 检查当前时间是否在指定时间范围内
  55. Args:
  56. start_time: 开始时间(格式:HH:MM)
  57. end_time: 结束时间(格式:HH:MM)
  58. Returns:
  59. 是否在时间范围内
  60. """
  61. now = self.get_time()
  62. current_time = now.strftime("%H:%M")
  63. def normalize_time(time_str: str) -> str:
  64. """将时间字符串标准化为 HH:MM 格式"""
  65. try:
  66. parts = time_str.strip().split(":")
  67. if len(parts) != 2:
  68. raise ValueError(f"时间格式错误: {time_str}")
  69. hour = int(parts[0])
  70. minute = int(parts[1])
  71. if not (0 <= hour <= 23 and 0 <= minute <= 59):
  72. raise ValueError(f"时间范围错误: {time_str}")
  73. return f"{hour:02d}:{minute:02d}"
  74. except Exception as e:
  75. print(f"时间格式化错误 '{time_str}': {e}")
  76. return time_str
  77. normalized_start = normalize_time(start_time)
  78. normalized_end = normalize_time(end_time)
  79. normalized_current = normalize_time(current_time)
  80. result = normalized_start <= normalized_current <= normalized_end
  81. if not result:
  82. print(f"时间窗口判断:当前 {normalized_current},窗口 {normalized_start}-{normalized_end}")
  83. return result