push_manager.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. # coding=utf-8
  2. """
  3. 推送记录管理模块
  4. 管理推送记录,支持每日只推送一次和时间窗口控制
  5. 通过 storage_backend 统一存储,支持本地 SQLite 和远程云存储
  6. """
  7. from datetime import datetime
  8. from typing import Callable, Optional, Any, Tuple
  9. import pytz
  10. from trendradar.utils.time import DEFAULT_TIMEZONE, TimeWindowChecker
  11. class PushRecordManager:
  12. """
  13. 推送记录管理器
  14. 通过 storage_backend 统一管理推送记录:
  15. - 本地环境:使用 LocalStorageBackend,数据存储在本地 SQLite
  16. - GitHub Actions:使用 RemoteStorageBackend,数据存储在云端
  17. 这样 once_per_day 功能在 GitHub Actions 上也能正常工作。
  18. """
  19. def __init__(
  20. self,
  21. storage_backend: Any,
  22. get_time_func: Optional[Callable[[], datetime]] = None,
  23. ):
  24. """
  25. 初始化推送记录管理器
  26. Args:
  27. storage_backend: 存储后端实例(LocalStorageBackend 或 RemoteStorageBackend)
  28. get_time_func: 获取当前时间的函数(应使用配置的时区)
  29. """
  30. self.storage_backend = storage_backend
  31. self.get_time = get_time_func or self._default_get_time
  32. print(f"[推送记录] 使用 {storage_backend.backend_name} 存储后端")
  33. def _default_get_time(self) -> datetime:
  34. """默认时间获取函数(使用 storage_backend 的时区配置)"""
  35. timezone = getattr(self.storage_backend, 'timezone', DEFAULT_TIMEZONE)
  36. return datetime.now(pytz.timezone(timezone))
  37. def has_pushed_today(self) -> bool:
  38. """
  39. 检查今天是否已经推送过
  40. Returns:
  41. 是否已推送
  42. """
  43. return self.storage_backend.has_pushed_today()
  44. def record_push(self, report_type: str) -> bool:
  45. """
  46. 记录推送
  47. Args:
  48. report_type: 报告类型
  49. Returns:
  50. 是否记录成功
  51. """
  52. return self.storage_backend.record_push(report_type)
  53. def is_in_time_range(self, start_time: str, end_time: str) -> bool:
  54. """
  55. 检查当前时间是否在指定时间范围内
  56. Args:
  57. start_time: 开始时间(格式:HH:MM)
  58. end_time: 结束时间(格式:HH:MM)
  59. Returns:
  60. 是否在时间范围内
  61. """
  62. checker = TimeWindowChecker(
  63. storage_backend=self.storage_backend,
  64. get_time_func=self.get_time,
  65. window_name="推送窗口",
  66. )
  67. return checker.is_in_time_range(start_time, end_time)
  68. def check_push_window(self, window_config: dict) -> Tuple[bool, str]:
  69. """
  70. 检查推送窗口控制
  71. Args:
  72. window_config: 推送窗口配置
  73. Returns:
  74. (should_push, reason) 元组
  75. """
  76. checker = TimeWindowChecker(
  77. storage_backend=self.storage_backend,
  78. get_time_func=self.get_time,
  79. window_name="推送窗口",
  80. )
  81. return checker.check_window(
  82. window_config=window_config,
  83. check_once_per_day_func=self.has_pushed_today,
  84. )
  85. def check_ai_analysis_window(self, window_config: dict) -> Tuple[bool, str]:
  86. """
  87. 检查 AI 分析窗口控制
  88. Args:
  89. window_config: AI 分析窗口配置
  90. Returns:
  91. (should_analyze, reason) 元组
  92. """
  93. checker = TimeWindowChecker(
  94. storage_backend=self.storage_backend,
  95. get_time_func=self.get_time,
  96. window_name="AI 分析窗口",
  97. )
  98. return checker.check_window(
  99. window_config=window_config,
  100. check_once_per_day_func=self.storage_backend.has_ai_analyzed_today,
  101. )
  102. def get_push_status(self, window_config: dict) -> dict:
  103. """
  104. 获取推送状态信息
  105. Args:
  106. window_config: 推送窗口配置
  107. Returns:
  108. 状态信息字典
  109. """
  110. checker = TimeWindowChecker(
  111. storage_backend=self.storage_backend,
  112. get_time_func=self.get_time,
  113. window_name="推送窗口",
  114. )
  115. status = checker.get_status(
  116. window_config=window_config,
  117. check_once_per_day_func=self.has_pushed_today,
  118. )
  119. status["window_type"] = "push"
  120. return status
  121. def get_ai_analysis_status(self, window_config: dict) -> dict:
  122. """
  123. 获取 AI 分析状态信息
  124. Args:
  125. window_config: AI 分析窗口配置
  126. Returns:
  127. 状态信息字典
  128. """
  129. checker = TimeWindowChecker(
  130. storage_backend=self.storage_backend,
  131. get_time_func=self.get_time,
  132. window_name="AI 分析窗口",
  133. )
  134. status = checker.get_status(
  135. window_config=window_config,
  136. check_once_per_day_func=self.storage_backend.has_ai_analyzed_today,
  137. )
  138. status["window_type"] = "ai_analysis"
  139. return status
  140. def reset_push_state(self) -> bool:
  141. """
  142. 重置今日推送状态
  143. Returns:
  144. 是否重置成功
  145. """
  146. try:
  147. # 通过存储后端重置推送记录
  148. if hasattr(self.storage_backend, 'reset_push_state'):
  149. return self.storage_backend.reset_push_state()
  150. else:
  151. print("[推送记录] 存储后端不支持重置推送状态")
  152. return False
  153. except Exception as e:
  154. print(f"[推送记录] 重置推送状态失败: {e}")
  155. return False
  156. def reset_ai_analysis_state(self) -> bool:
  157. """
  158. 重置今日 AI 分析状态
  159. Returns:
  160. 是否重置成功
  161. """
  162. try:
  163. if hasattr(self.storage_backend, 'reset_ai_analysis_state'):
  164. return self.storage_backend.reset_ai_analysis_state()
  165. else:
  166. print("[推送记录] 存储后端不支持重置 AI 分析状态")
  167. return False
  168. except Exception as e:
  169. print(f"[推送记录] 重置 AI 分析状态失败: {e}")
  170. return False