base.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. # coding=utf-8
  2. """
  3. 存储后端抽象基类和数据模型
  4. 定义统一的存储接口,所有存储后端都需要实现这些方法
  5. """
  6. from abc import ABC, abstractmethod
  7. from dataclasses import dataclass, field
  8. from typing import Dict, List, Optional, Any
  9. @dataclass
  10. class NewsItem:
  11. """新闻条目数据模型"""
  12. title: str # 新闻标题
  13. source_id: str # 来源平台ID(如 toutiao, baidu)
  14. source_name: str = "" # 来源平台名称(运行时使用,数据库不存储)
  15. rank: int = 0 # 排名
  16. url: str = "" # 链接 URL
  17. mobile_url: str = "" # 移动端 URL
  18. crawl_time: str = "" # 抓取时间(HH:MM 格式)
  19. # 统计信息(用于分析)
  20. ranks: List[int] = field(default_factory=list) # 历史排名列表
  21. first_time: str = "" # 首次出现时间
  22. last_time: str = "" # 最后出现时间
  23. count: int = 1 # 出现次数
  24. def to_dict(self) -> Dict[str, Any]:
  25. """转换为字典"""
  26. return {
  27. "title": self.title,
  28. "source_id": self.source_id,
  29. "source_name": self.source_name,
  30. "rank": self.rank,
  31. "url": self.url,
  32. "mobile_url": self.mobile_url,
  33. "crawl_time": self.crawl_time,
  34. "ranks": self.ranks,
  35. "first_time": self.first_time,
  36. "last_time": self.last_time,
  37. "count": self.count,
  38. }
  39. @classmethod
  40. def from_dict(cls, data: Dict[str, Any]) -> "NewsItem":
  41. """从字典创建"""
  42. return cls(
  43. title=data.get("title", ""),
  44. source_id=data.get("source_id", ""),
  45. source_name=data.get("source_name", ""),
  46. rank=data.get("rank", 0),
  47. url=data.get("url", ""),
  48. mobile_url=data.get("mobile_url", ""),
  49. crawl_time=data.get("crawl_time", ""),
  50. ranks=data.get("ranks", []),
  51. first_time=data.get("first_time", ""),
  52. last_time=data.get("last_time", ""),
  53. count=data.get("count", 1),
  54. )
  55. @dataclass
  56. class NewsData:
  57. """
  58. 新闻数据集合
  59. 结构:
  60. - date: 日期(YYYY-MM-DD)
  61. - crawl_time: 抓取时间(HH时MM分)
  62. - items: 按来源ID分组的新闻条目
  63. - id_to_name: 来源ID到名称的映射
  64. - failed_ids: 失败的来源ID列表
  65. """
  66. date: str # 日期
  67. crawl_time: str # 抓取时间
  68. items: Dict[str, List[NewsItem]] # 按来源分组的新闻
  69. id_to_name: Dict[str, str] = field(default_factory=dict) # ID到名称映射
  70. failed_ids: List[str] = field(default_factory=list) # 失败的ID
  71. def to_dict(self) -> Dict[str, Any]:
  72. """转换为字典"""
  73. items_dict = {}
  74. for source_id, news_list in self.items.items():
  75. items_dict[source_id] = [item.to_dict() for item in news_list]
  76. return {
  77. "date": self.date,
  78. "crawl_time": self.crawl_time,
  79. "items": items_dict,
  80. "id_to_name": self.id_to_name,
  81. "failed_ids": self.failed_ids,
  82. }
  83. @classmethod
  84. def from_dict(cls, data: Dict[str, Any]) -> "NewsData":
  85. """从字典创建"""
  86. items = {}
  87. items_data = data.get("items", {})
  88. for source_id, news_list in items_data.items():
  89. items[source_id] = [NewsItem.from_dict(item) for item in news_list]
  90. return cls(
  91. date=data.get("date", ""),
  92. crawl_time=data.get("crawl_time", ""),
  93. items=items,
  94. id_to_name=data.get("id_to_name", {}),
  95. failed_ids=data.get("failed_ids", []),
  96. )
  97. def get_total_count(self) -> int:
  98. """获取新闻总数"""
  99. return sum(len(news_list) for news_list in self.items.values())
  100. def merge_with(self, other: "NewsData") -> "NewsData":
  101. """
  102. 合并另一个 NewsData 到当前数据
  103. 合并规则:
  104. - 相同 source_id + title 的新闻合并排名历史
  105. - 更新 last_time 和 count
  106. - 保留较早的 first_time
  107. """
  108. merged_items = {}
  109. # 复制当前数据
  110. for source_id, news_list in self.items.items():
  111. merged_items[source_id] = {item.title: item for item in news_list}
  112. # 合并其他数据
  113. for source_id, news_list in other.items.items():
  114. if source_id not in merged_items:
  115. merged_items[source_id] = {}
  116. for item in news_list:
  117. if item.title in merged_items[source_id]:
  118. # 合并已存在的新闻
  119. existing = merged_items[source_id][item.title]
  120. # 合并排名
  121. existing_ranks = set(existing.ranks) if existing.ranks else set()
  122. new_ranks = set(item.ranks) if item.ranks else set()
  123. merged_ranks = sorted(existing_ranks | new_ranks)
  124. existing.ranks = merged_ranks
  125. # 更新时间
  126. if item.first_time and (not existing.first_time or item.first_time < existing.first_time):
  127. existing.first_time = item.first_time
  128. if item.last_time and (not existing.last_time or item.last_time > existing.last_time):
  129. existing.last_time = item.last_time
  130. # 更新计数
  131. existing.count += 1
  132. # 保留URL(如果原来没有)
  133. if not existing.url and item.url:
  134. existing.url = item.url
  135. if not existing.mobile_url and item.mobile_url:
  136. existing.mobile_url = item.mobile_url
  137. else:
  138. # 添加新新闻
  139. merged_items[source_id][item.title] = item
  140. # 转换回列表格式
  141. final_items = {}
  142. for source_id, items_dict in merged_items.items():
  143. final_items[source_id] = list(items_dict.values())
  144. # 合并 id_to_name
  145. merged_id_to_name = {**self.id_to_name, **other.id_to_name}
  146. # 合并 failed_ids(去重)
  147. merged_failed_ids = list(set(self.failed_ids + other.failed_ids))
  148. return NewsData(
  149. date=self.date or other.date,
  150. crawl_time=other.crawl_time, # 使用较新的抓取时间
  151. items=final_items,
  152. id_to_name=merged_id_to_name,
  153. failed_ids=merged_failed_ids,
  154. )
  155. class StorageBackend(ABC):
  156. """
  157. 存储后端抽象基类
  158. 所有存储后端都需要实现这些方法,以支持:
  159. - 保存新闻数据
  160. - 读取当天所有数据
  161. - 检测新增新闻
  162. - 生成报告文件(TXT/HTML)
  163. """
  164. @abstractmethod
  165. def save_news_data(self, data: NewsData) -> bool:
  166. """
  167. 保存新闻数据
  168. Args:
  169. data: 新闻数据
  170. Returns:
  171. 是否保存成功
  172. """
  173. pass
  174. @abstractmethod
  175. def get_today_all_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  176. """
  177. 获取指定日期的所有新闻数据
  178. Args:
  179. date: 日期字符串(YYYY-MM-DD),默认为今天
  180. Returns:
  181. 合并后的新闻数据,如果没有数据返回 None
  182. """
  183. pass
  184. @abstractmethod
  185. def get_latest_crawl_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  186. """
  187. 获取最新一次抓取的数据
  188. Args:
  189. date: 日期字符串,默认为今天
  190. Returns:
  191. 最新抓取的新闻数据
  192. """
  193. pass
  194. @abstractmethod
  195. def detect_new_titles(self, current_data: NewsData) -> Dict[str, Dict]:
  196. """
  197. 检测新增的标题
  198. Args:
  199. current_data: 当前抓取的数据
  200. Returns:
  201. 新增的标题数据,格式: {source_id: {title: title_data}}
  202. """
  203. pass
  204. @abstractmethod
  205. def save_txt_snapshot(self, data: NewsData) -> Optional[str]:
  206. """
  207. 保存 TXT 快照(可选功能,本地环境可用)
  208. Args:
  209. data: 新闻数据
  210. Returns:
  211. 保存的文件路径,如果不支持返回 None
  212. """
  213. pass
  214. @abstractmethod
  215. def save_html_report(self, html_content: str, filename: str, is_summary: bool = False) -> Optional[str]:
  216. """
  217. 保存 HTML 报告
  218. Args:
  219. html_content: HTML 内容
  220. filename: 文件名
  221. is_summary: 是否为汇总报告
  222. Returns:
  223. 保存的文件路径
  224. """
  225. pass
  226. @abstractmethod
  227. def is_first_crawl_today(self, date: Optional[str] = None) -> bool:
  228. """
  229. 检查是否是当天第一次抓取
  230. Args:
  231. date: 日期字符串,默认为今天
  232. Returns:
  233. 是否是第一次抓取
  234. """
  235. pass
  236. @abstractmethod
  237. def cleanup(self) -> None:
  238. """
  239. 清理资源(如临时文件、数据库连接等)
  240. """
  241. pass
  242. @abstractmethod
  243. def cleanup_old_data(self, retention_days: int) -> int:
  244. """
  245. 清理过期数据
  246. Args:
  247. retention_days: 保留天数(0 表示不清理)
  248. Returns:
  249. 删除的日期目录数量
  250. """
  251. pass
  252. @property
  253. @abstractmethod
  254. def backend_name(self) -> str:
  255. """
  256. 存储后端名称
  257. """
  258. pass
  259. @property
  260. @abstractmethod
  261. def supports_txt(self) -> bool:
  262. """
  263. 是否支持生成 TXT 快照
  264. """
  265. pass
  266. # === 推送记录相关方法 ===
  267. @abstractmethod
  268. def has_pushed_today(self, date: Optional[str] = None) -> bool:
  269. """
  270. 检查指定日期是否已推送过
  271. Args:
  272. date: 日期字符串(YYYY-MM-DD),默认为今天
  273. Returns:
  274. 是否已推送
  275. """
  276. pass
  277. @abstractmethod
  278. def record_push(self, report_type: str, date: Optional[str] = None) -> bool:
  279. """
  280. 记录推送
  281. Args:
  282. report_type: 报告类型
  283. date: 日期字符串(YYYY-MM-DD),默认为今天
  284. Returns:
  285. 是否记录成功
  286. """
  287. pass
  288. def convert_crawl_results_to_news_data(
  289. results: Dict[str, Dict],
  290. id_to_name: Dict[str, str],
  291. failed_ids: List[str],
  292. crawl_time: str,
  293. crawl_date: str,
  294. ) -> NewsData:
  295. """
  296. 将爬虫结果转换为 NewsData 格式
  297. Args:
  298. results: 爬虫返回的结果 {source_id: {title: {ranks: [], url: "", mobileUrl: ""}}}
  299. id_to_name: 来源ID到名称的映射
  300. failed_ids: 失败的来源ID
  301. crawl_time: 抓取时间(HH:MM)
  302. crawl_date: 抓取日期(YYYY-MM-DD)
  303. Returns:
  304. NewsData 对象
  305. """
  306. items = {}
  307. for source_id, titles_data in results.items():
  308. source_name = id_to_name.get(source_id, source_id)
  309. news_list = []
  310. for title, data in titles_data.items():
  311. if isinstance(data, dict):
  312. ranks = data.get("ranks", [])
  313. url = data.get("url", "")
  314. mobile_url = data.get("mobileUrl", "")
  315. else:
  316. # 兼容旧格式
  317. ranks = data if isinstance(data, list) else []
  318. url = ""
  319. mobile_url = ""
  320. rank = ranks[0] if ranks else 99
  321. news_item = NewsItem(
  322. title=title,
  323. source_id=source_id,
  324. source_name=source_name,
  325. rank=rank,
  326. url=url,
  327. mobile_url=mobile_url,
  328. crawl_time=crawl_time,
  329. ranks=ranks,
  330. first_time=crawl_time,
  331. last_time=crawl_time,
  332. count=1,
  333. )
  334. news_list.append(news_item)
  335. items[source_id] = news_list
  336. return NewsData(
  337. date=crawl_date,
  338. crawl_time=crawl_time,
  339. items=items,
  340. id_to_name=id_to_name,
  341. failed_ids=failed_ids,
  342. )
  343. def convert_news_data_to_results(data: NewsData) -> tuple:
  344. """
  345. 将 NewsData 转换回原有的 results 格式(用于兼容现有代码)
  346. Args:
  347. data: NewsData 对象
  348. Returns:
  349. (results, id_to_name, title_info) 元组
  350. """
  351. results = {}
  352. title_info = {}
  353. for source_id, news_list in data.items.items():
  354. results[source_id] = {}
  355. title_info[source_id] = {}
  356. for item in news_list:
  357. results[source_id][item.title] = {
  358. "ranks": item.ranks,
  359. "url": item.url,
  360. "mobileUrl": item.mobile_url,
  361. }
  362. title_info[source_id][item.title] = {
  363. "first_time": item.first_time,
  364. "last_time": item.last_time,
  365. "count": item.count,
  366. "ranks": item.ranks,
  367. "url": item.url,
  368. "mobileUrl": item.mobile_url,
  369. }
  370. return results, data.id_to_name, title_info