base.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. # coding=utf-8
  2. """
  3. 存储后端抽象基类和数据模型
  4. 定义统一的存储接口,所有存储后端都需要实现这些方法
  5. """
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass, field, replace
from datetime import datetime
from typing import Dict, List, Optional, Any
  11. @dataclass
  12. class NewsItem:
  13. """新闻条目数据模型"""
  14. title: str # 新闻标题
  15. source_id: str # 来源平台ID(如 toutiao, baidu)
  16. source_name: str = "" # 来源平台名称(运行时使用,数据库不存储)
  17. rank: int = 0 # 排名
  18. url: str = "" # 链接 URL
  19. mobile_url: str = "" # 移动端 URL
  20. crawl_time: str = "" # 抓取时间(HH:MM 格式)
  21. # 统计信息(用于分析)
  22. ranks: List[int] = field(default_factory=list) # 历史排名列表
  23. first_time: str = "" # 首次出现时间
  24. last_time: str = "" # 最后出现时间
  25. count: int = 1 # 出现次数
  26. def to_dict(self) -> Dict[str, Any]:
  27. """转换为字典"""
  28. return {
  29. "title": self.title,
  30. "source_id": self.source_id,
  31. "source_name": self.source_name,
  32. "rank": self.rank,
  33. "url": self.url,
  34. "mobile_url": self.mobile_url,
  35. "crawl_time": self.crawl_time,
  36. "ranks": self.ranks,
  37. "first_time": self.first_time,
  38. "last_time": self.last_time,
  39. "count": self.count,
  40. }
  41. @classmethod
  42. def from_dict(cls, data: Dict[str, Any]) -> "NewsItem":
  43. """从字典创建"""
  44. return cls(
  45. title=data.get("title", ""),
  46. source_id=data.get("source_id", ""),
  47. source_name=data.get("source_name", ""),
  48. rank=data.get("rank", 0),
  49. url=data.get("url", ""),
  50. mobile_url=data.get("mobile_url", ""),
  51. crawl_time=data.get("crawl_time", ""),
  52. ranks=data.get("ranks", []),
  53. first_time=data.get("first_time", ""),
  54. last_time=data.get("last_time", ""),
  55. count=data.get("count", 1),
  56. )
  57. @dataclass
  58. class NewsData:
  59. """
  60. 新闻数据集合
  61. 结构:
  62. - date: 日期(YYYY-MM-DD)
  63. - crawl_time: 抓取时间(HH时MM分)
  64. - items: 按来源ID分组的新闻条目
  65. - id_to_name: 来源ID到名称的映射
  66. - failed_ids: 失败的来源ID列表
  67. """
  68. date: str # 日期
  69. crawl_time: str # 抓取时间
  70. items: Dict[str, List[NewsItem]] # 按来源分组的新闻
  71. id_to_name: Dict[str, str] = field(default_factory=dict) # ID到名称映射
  72. failed_ids: List[str] = field(default_factory=list) # 失败的ID
  73. def to_dict(self) -> Dict[str, Any]:
  74. """转换为字典"""
  75. items_dict = {}
  76. for source_id, news_list in self.items.items():
  77. items_dict[source_id] = [item.to_dict() for item in news_list]
  78. return {
  79. "date": self.date,
  80. "crawl_time": self.crawl_time,
  81. "items": items_dict,
  82. "id_to_name": self.id_to_name,
  83. "failed_ids": self.failed_ids,
  84. }
  85. @classmethod
  86. def from_dict(cls, data: Dict[str, Any]) -> "NewsData":
  87. """从字典创建"""
  88. items = {}
  89. items_data = data.get("items", {})
  90. for source_id, news_list in items_data.items():
  91. items[source_id] = [NewsItem.from_dict(item) for item in news_list]
  92. return cls(
  93. date=data.get("date", ""),
  94. crawl_time=data.get("crawl_time", ""),
  95. items=items,
  96. id_to_name=data.get("id_to_name", {}),
  97. failed_ids=data.get("failed_ids", []),
  98. )
  99. def get_total_count(self) -> int:
  100. """获取新闻总数"""
  101. return sum(len(news_list) for news_list in self.items.values())
  102. def merge_with(self, other: "NewsData") -> "NewsData":
  103. """
  104. 合并另一个 NewsData 到当前数据
  105. 合并规则:
  106. - 相同 source_id + title 的新闻合并排名历史
  107. - 更新 last_time 和 count
  108. - 保留较早的 first_time
  109. """
  110. merged_items = {}
  111. # 复制当前数据
  112. for source_id, news_list in self.items.items():
  113. merged_items[source_id] = {item.title: item for item in news_list}
  114. # 合并其他数据
  115. for source_id, news_list in other.items.items():
  116. if source_id not in merged_items:
  117. merged_items[source_id] = {}
  118. for item in news_list:
  119. if item.title in merged_items[source_id]:
  120. # 合并已存在的新闻
  121. existing = merged_items[source_id][item.title]
  122. # 合并排名
  123. existing_ranks = set(existing.ranks) if existing.ranks else set()
  124. new_ranks = set(item.ranks) if item.ranks else set()
  125. merged_ranks = sorted(existing_ranks | new_ranks)
  126. existing.ranks = merged_ranks
  127. # 更新时间
  128. if item.first_time and (not existing.first_time or item.first_time < existing.first_time):
  129. existing.first_time = item.first_time
  130. if item.last_time and (not existing.last_time or item.last_time > existing.last_time):
  131. existing.last_time = item.last_time
  132. # 更新计数
  133. existing.count += 1
  134. # 保留URL(如果原来没有)
  135. if not existing.url and item.url:
  136. existing.url = item.url
  137. if not existing.mobile_url and item.mobile_url:
  138. existing.mobile_url = item.mobile_url
  139. else:
  140. # 添加新新闻
  141. merged_items[source_id][item.title] = item
  142. # 转换回列表格式
  143. final_items = {}
  144. for source_id, items_dict in merged_items.items():
  145. final_items[source_id] = list(items_dict.values())
  146. # 合并 id_to_name
  147. merged_id_to_name = {**self.id_to_name, **other.id_to_name}
  148. # 合并 failed_ids(去重)
  149. merged_failed_ids = list(set(self.failed_ids + other.failed_ids))
  150. return NewsData(
  151. date=self.date or other.date,
  152. crawl_time=other.crawl_time, # 使用较新的抓取时间
  153. items=final_items,
  154. id_to_name=merged_id_to_name,
  155. failed_ids=merged_failed_ids,
  156. )
  157. class StorageBackend(ABC):
  158. """
  159. 存储后端抽象基类
  160. 所有存储后端都需要实现这些方法,以支持:
  161. - 保存新闻数据
  162. - 读取当天所有数据
  163. - 检测新增新闻
  164. - 生成报告文件(TXT/HTML)
  165. """
  166. @abstractmethod
  167. def save_news_data(self, data: NewsData) -> bool:
  168. """
  169. 保存新闻数据
  170. Args:
  171. data: 新闻数据
  172. Returns:
  173. 是否保存成功
  174. """
  175. pass
  176. @abstractmethod
  177. def get_today_all_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  178. """
  179. 获取指定日期的所有新闻数据
  180. Args:
  181. date: 日期字符串(YYYY-MM-DD),默认为今天
  182. Returns:
  183. 合并后的新闻数据,如果没有数据返回 None
  184. """
  185. pass
  186. @abstractmethod
  187. def get_latest_crawl_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  188. """
  189. 获取最新一次抓取的数据
  190. Args:
  191. date: 日期字符串,默认为今天
  192. Returns:
  193. 最新抓取的新闻数据
  194. """
  195. pass
  196. @abstractmethod
  197. def detect_new_titles(self, current_data: NewsData) -> Dict[str, Dict]:
  198. """
  199. 检测新增的标题
  200. Args:
  201. current_data: 当前抓取的数据
  202. Returns:
  203. 新增的标题数据,格式: {source_id: {title: title_data}}
  204. """
  205. pass
  206. @abstractmethod
  207. def save_txt_snapshot(self, data: NewsData) -> Optional[str]:
  208. """
  209. 保存 TXT 快照(可选功能,本地环境可用)
  210. Args:
  211. data: 新闻数据
  212. Returns:
  213. 保存的文件路径,如果不支持返回 None
  214. """
  215. pass
  216. @abstractmethod
  217. def save_html_report(self, html_content: str, filename: str, is_summary: bool = False) -> Optional[str]:
  218. """
  219. 保存 HTML 报告
  220. Args:
  221. html_content: HTML 内容
  222. filename: 文件名
  223. is_summary: 是否为汇总报告
  224. Returns:
  225. 保存的文件路径
  226. """
  227. pass
  228. @abstractmethod
  229. def is_first_crawl_today(self, date: Optional[str] = None) -> bool:
  230. """
  231. 检查是否是当天第一次抓取
  232. Args:
  233. date: 日期字符串,默认为今天
  234. Returns:
  235. 是否是第一次抓取
  236. """
  237. pass
  238. @abstractmethod
  239. def cleanup(self) -> None:
  240. """
  241. 清理资源(如临时文件、数据库连接等)
  242. """
  243. pass
  244. @abstractmethod
  245. def cleanup_old_data(self, retention_days: int) -> int:
  246. """
  247. 清理过期数据
  248. Args:
  249. retention_days: 保留天数(0 表示不清理)
  250. Returns:
  251. 删除的日期目录数量
  252. """
  253. pass
  254. @property
  255. @abstractmethod
  256. def backend_name(self) -> str:
  257. """
  258. 存储后端名称
  259. """
  260. pass
  261. @property
  262. @abstractmethod
  263. def supports_txt(self) -> bool:
  264. """
  265. 是否支持生成 TXT 快照
  266. """
  267. pass
  268. # === 推送记录相关方法 ===
  269. @abstractmethod
  270. def has_pushed_today(self, date: Optional[str] = None) -> bool:
  271. """
  272. 检查指定日期是否已推送过
  273. Args:
  274. date: 日期字符串(YYYY-MM-DD),默认为今天
  275. Returns:
  276. 是否已推送
  277. """
  278. pass
  279. @abstractmethod
  280. def record_push(self, report_type: str, date: Optional[str] = None) -> bool:
  281. """
  282. 记录推送
  283. Args:
  284. report_type: 报告类型
  285. date: 日期字符串(YYYY-MM-DD),默认为今天
  286. Returns:
  287. 是否记录成功
  288. """
  289. pass
  290. def convert_crawl_results_to_news_data(
  291. results: Dict[str, Dict],
  292. id_to_name: Dict[str, str],
  293. failed_ids: List[str],
  294. crawl_time: str,
  295. crawl_date: str,
  296. ) -> NewsData:
  297. """
  298. 将爬虫结果转换为 NewsData 格式
  299. Args:
  300. results: 爬虫返回的结果 {source_id: {title: {ranks: [], url: "", mobileUrl: ""}}}
  301. id_to_name: 来源ID到名称的映射
  302. failed_ids: 失败的来源ID
  303. crawl_time: 抓取时间(HH:MM)
  304. crawl_date: 抓取日期(YYYY-MM-DD)
  305. Returns:
  306. NewsData 对象
  307. """
  308. items = {}
  309. for source_id, titles_data in results.items():
  310. source_name = id_to_name.get(source_id, source_id)
  311. news_list = []
  312. for title, data in titles_data.items():
  313. if isinstance(data, dict):
  314. ranks = data.get("ranks", [])
  315. url = data.get("url", "")
  316. mobile_url = data.get("mobileUrl", "")
  317. else:
  318. # 兼容旧格式
  319. ranks = data if isinstance(data, list) else []
  320. url = ""
  321. mobile_url = ""
  322. rank = ranks[0] if ranks else 99
  323. news_item = NewsItem(
  324. title=title,
  325. source_id=source_id,
  326. source_name=source_name,
  327. rank=rank,
  328. url=url,
  329. mobile_url=mobile_url,
  330. crawl_time=crawl_time,
  331. ranks=ranks,
  332. first_time=crawl_time,
  333. last_time=crawl_time,
  334. count=1,
  335. )
  336. news_list.append(news_item)
  337. items[source_id] = news_list
  338. return NewsData(
  339. date=crawl_date,
  340. crawl_time=crawl_time,
  341. items=items,
  342. id_to_name=id_to_name,
  343. failed_ids=failed_ids,
  344. )
  345. def convert_news_data_to_results(data: NewsData) -> tuple:
  346. """
  347. 将 NewsData 转换回原有的 results 格式(用于兼容现有代码)
  348. Args:
  349. data: NewsData 对象
  350. Returns:
  351. (results, id_to_name, title_info) 元组
  352. """
  353. results = {}
  354. title_info = {}
  355. for source_id, news_list in data.items.items():
  356. results[source_id] = {}
  357. title_info[source_id] = {}
  358. for item in news_list:
  359. results[source_id][item.title] = {
  360. "ranks": item.ranks,
  361. "url": item.url,
  362. "mobileUrl": item.mobile_url,
  363. }
  364. title_info[source_id][item.title] = {
  365. "first_time": item.first_time,
  366. "last_time": item.last_time,
  367. "count": item.count,
  368. "ranks": item.ranks,
  369. "url": item.url,
  370. "mobileUrl": item.mobile_url,
  371. }
  372. return results, data.id_to_name, title_info