base.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
# coding=utf-8
"""
Storage backend abstract base class and data models.

Defines the unified storage interface; every storage backend must implement these methods.
"""
import copy
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
  9. @dataclass
  10. class NewsItem:
  11. """新闻条目数据模型(热榜数据)"""
  12. title: str # 新闻标题
  13. source_id: str # 来源平台ID(如 toutiao, baidu)
  14. source_name: str = "" # 来源平台名称(运行时使用,数据库不存储)
  15. rank: int = 0 # 排名
  16. url: str = "" # 链接 URL
  17. mobile_url: str = "" # 移动端 URL
  18. crawl_time: str = "" # 抓取时间(HH:MM 格式)
  19. # 统计信息(用于分析)
  20. ranks: List[int] = field(default_factory=list) # 历史排名列表
  21. first_time: str = "" # 首次出现时间
  22. last_time: str = "" # 最后出现时间
  23. count: int = 1 # 出现次数
  24. def to_dict(self) -> Dict[str, Any]:
  25. """转换为字典"""
  26. return {
  27. "title": self.title,
  28. "source_id": self.source_id,
  29. "source_name": self.source_name,
  30. "rank": self.rank,
  31. "url": self.url,
  32. "mobile_url": self.mobile_url,
  33. "crawl_time": self.crawl_time,
  34. "ranks": self.ranks,
  35. "first_time": self.first_time,
  36. "last_time": self.last_time,
  37. "count": self.count,
  38. }
  39. @classmethod
  40. def from_dict(cls, data: Dict[str, Any]) -> "NewsItem":
  41. """从字典创建"""
  42. return cls(
  43. title=data.get("title", ""),
  44. source_id=data.get("source_id", ""),
  45. source_name=data.get("source_name", ""),
  46. rank=data.get("rank", 0),
  47. url=data.get("url", ""),
  48. mobile_url=data.get("mobile_url", ""),
  49. crawl_time=data.get("crawl_time", ""),
  50. ranks=data.get("ranks", []),
  51. first_time=data.get("first_time", ""),
  52. last_time=data.get("last_time", ""),
  53. count=data.get("count", 1),
  54. )
  55. @dataclass
  56. class RSSItem:
  57. """RSS 条目数据模型"""
  58. title: str # 标题
  59. feed_id: str # RSS 源 ID(如 "hacker-news")
  60. feed_name: str = "" # RSS 源名称(运行时使用)
  61. url: str = "" # 文章链接
  62. published_at: str = "" # RSS 发布时间(ISO 格式)
  63. summary: str = "" # 摘要/描述
  64. author: str = "" # 作者
  65. crawl_time: str = "" # 抓取时间(HH:MM 格式)
  66. # 统计信息
  67. first_time: str = "" # 首次抓取时间
  68. last_time: str = "" # 最后抓取时间
  69. count: int = 1 # 抓取次数
  70. def to_dict(self) -> Dict[str, Any]:
  71. """转换为字典"""
  72. return {
  73. "title": self.title,
  74. "feed_id": self.feed_id,
  75. "feed_name": self.feed_name,
  76. "url": self.url,
  77. "published_at": self.published_at,
  78. "summary": self.summary,
  79. "author": self.author,
  80. "crawl_time": self.crawl_time,
  81. "first_time": self.first_time,
  82. "last_time": self.last_time,
  83. "count": self.count,
  84. }
  85. @classmethod
  86. def from_dict(cls, data: Dict[str, Any]) -> "RSSItem":
  87. """从字典创建"""
  88. return cls(
  89. title=data.get("title", ""),
  90. feed_id=data.get("feed_id", ""),
  91. feed_name=data.get("feed_name", ""),
  92. url=data.get("url", ""),
  93. published_at=data.get("published_at", ""),
  94. summary=data.get("summary", ""),
  95. author=data.get("author", ""),
  96. crawl_time=data.get("crawl_time", ""),
  97. first_time=data.get("first_time", ""),
  98. last_time=data.get("last_time", ""),
  99. count=data.get("count", 1),
  100. )
  101. @dataclass
  102. class RSSData:
  103. """
  104. RSS 数据集合
  105. 结构:
  106. - date: 日期(YYYY-MM-DD)
  107. - crawl_time: 抓取时间(HH:MM)
  108. - items: 按 feed_id 分组的 RSS 条目
  109. - id_to_name: feed_id 到名称的映射
  110. - failed_ids: 失败的 feed_id 列表
  111. """
  112. date: str # 日期
  113. crawl_time: str # 抓取时间
  114. items: Dict[str, List[RSSItem]] # 按 feed_id 分组的条目
  115. id_to_name: Dict[str, str] = field(default_factory=dict) # ID到名称映射
  116. failed_ids: List[str] = field(default_factory=list) # 失败的ID
  117. def to_dict(self) -> Dict[str, Any]:
  118. """转换为字典"""
  119. items_dict = {}
  120. for feed_id, rss_list in self.items.items():
  121. items_dict[feed_id] = [item.to_dict() for item in rss_list]
  122. return {
  123. "date": self.date,
  124. "crawl_time": self.crawl_time,
  125. "items": items_dict,
  126. "id_to_name": self.id_to_name,
  127. "failed_ids": self.failed_ids,
  128. }
  129. @classmethod
  130. def from_dict(cls, data: Dict[str, Any]) -> "RSSData":
  131. """从字典创建"""
  132. items = {}
  133. items_data = data.get("items", {})
  134. for feed_id, rss_list in items_data.items():
  135. items[feed_id] = [RSSItem.from_dict(item) for item in rss_list]
  136. return cls(
  137. date=data.get("date", ""),
  138. crawl_time=data.get("crawl_time", ""),
  139. items=items,
  140. id_to_name=data.get("id_to_name", {}),
  141. failed_ids=data.get("failed_ids", []),
  142. )
  143. def get_total_count(self) -> int:
  144. """获取条目总数"""
  145. return sum(len(rss_list) for rss_list in self.items.values())
  146. @dataclass
  147. class NewsData:
  148. """
  149. 新闻数据集合
  150. 结构:
  151. - date: 日期(YYYY-MM-DD)
  152. - crawl_time: 抓取时间(HH时MM分)
  153. - items: 按来源ID分组的新闻条目
  154. - id_to_name: 来源ID到名称的映射
  155. - failed_ids: 失败的来源ID列表
  156. """
  157. date: str # 日期
  158. crawl_time: str # 抓取时间
  159. items: Dict[str, List[NewsItem]] # 按来源分组的新闻
  160. id_to_name: Dict[str, str] = field(default_factory=dict) # ID到名称映射
  161. failed_ids: List[str] = field(default_factory=list) # 失败的ID
  162. def to_dict(self) -> Dict[str, Any]:
  163. """转换为字典"""
  164. items_dict = {}
  165. for source_id, news_list in self.items.items():
  166. items_dict[source_id] = [item.to_dict() for item in news_list]
  167. return {
  168. "date": self.date,
  169. "crawl_time": self.crawl_time,
  170. "items": items_dict,
  171. "id_to_name": self.id_to_name,
  172. "failed_ids": self.failed_ids,
  173. }
  174. @classmethod
  175. def from_dict(cls, data: Dict[str, Any]) -> "NewsData":
  176. """从字典创建"""
  177. items = {}
  178. items_data = data.get("items", {})
  179. for source_id, news_list in items_data.items():
  180. items[source_id] = [NewsItem.from_dict(item) for item in news_list]
  181. return cls(
  182. date=data.get("date", ""),
  183. crawl_time=data.get("crawl_time", ""),
  184. items=items,
  185. id_to_name=data.get("id_to_name", {}),
  186. failed_ids=data.get("failed_ids", []),
  187. )
  188. def get_total_count(self) -> int:
  189. """获取新闻总数"""
  190. return sum(len(news_list) for news_list in self.items.values())
  191. def merge_with(self, other: "NewsData") -> "NewsData":
  192. """
  193. 合并另一个 NewsData 到当前数据
  194. 合并规则:
  195. - 相同 source_id + title 的新闻合并排名历史
  196. - 更新 last_time 和 count
  197. - 保留较早的 first_time
  198. """
  199. merged_items = {}
  200. # 复制当前数据
  201. for source_id, news_list in self.items.items():
  202. merged_items[source_id] = {item.title: item for item in news_list}
  203. # 合并其他数据
  204. for source_id, news_list in other.items.items():
  205. if source_id not in merged_items:
  206. merged_items[source_id] = {}
  207. for item in news_list:
  208. if item.title in merged_items[source_id]:
  209. # 合并已存在的新闻
  210. existing = merged_items[source_id][item.title]
  211. # 合并排名
  212. existing_ranks = set(existing.ranks) if existing.ranks else set()
  213. new_ranks = set(item.ranks) if item.ranks else set()
  214. merged_ranks = sorted(existing_ranks | new_ranks)
  215. existing.ranks = merged_ranks
  216. # 更新时间
  217. if item.first_time and (not existing.first_time or item.first_time < existing.first_time):
  218. existing.first_time = item.first_time
  219. if item.last_time and (not existing.last_time or item.last_time > existing.last_time):
  220. existing.last_time = item.last_time
  221. # 更新计数
  222. existing.count += 1
  223. # 保留URL(如果原来没有)
  224. if not existing.url and item.url:
  225. existing.url = item.url
  226. if not existing.mobile_url and item.mobile_url:
  227. existing.mobile_url = item.mobile_url
  228. else:
  229. # 添加新新闻
  230. merged_items[source_id][item.title] = item
  231. # 转换回列表格式
  232. final_items = {}
  233. for source_id, items_dict in merged_items.items():
  234. final_items[source_id] = list(items_dict.values())
  235. # 合并 id_to_name
  236. merged_id_to_name = {**self.id_to_name, **other.id_to_name}
  237. # 合并 failed_ids(去重)
  238. merged_failed_ids = list(set(self.failed_ids + other.failed_ids))
  239. return NewsData(
  240. date=self.date or other.date,
  241. crawl_time=other.crawl_time, # 使用较新的抓取时间
  242. items=final_items,
  243. id_to_name=merged_id_to_name,
  244. failed_ids=merged_failed_ids,
  245. )
  246. class StorageBackend(ABC):
  247. """
  248. 存储后端抽象基类
  249. 所有存储后端都需要实现这些方法,以支持:
  250. - 保存新闻数据
  251. - 读取当天所有数据
  252. - 检测新增新闻
  253. - 生成报告文件(TXT/HTML)
  254. """
  255. @abstractmethod
  256. def save_news_data(self, data: NewsData) -> bool:
  257. """
  258. 保存新闻数据
  259. Args:
  260. data: 新闻数据
  261. Returns:
  262. 是否保存成功
  263. """
  264. pass
  265. @abstractmethod
  266. def get_today_all_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  267. """
  268. 获取指定日期的所有新闻数据
  269. Args:
  270. date: 日期字符串(YYYY-MM-DD),默认为今天
  271. Returns:
  272. 合并后的新闻数据,如果没有数据返回 None
  273. """
  274. pass
  275. @abstractmethod
  276. def get_latest_crawl_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  277. """
  278. 获取最新一次抓取的数据
  279. Args:
  280. date: 日期字符串,默认为今天
  281. Returns:
  282. 最新抓取的新闻数据
  283. """
  284. pass
  285. @abstractmethod
  286. def detect_new_titles(self, current_data: NewsData) -> Dict[str, Dict]:
  287. """
  288. 检测新增的标题
  289. Args:
  290. current_data: 当前抓取的数据
  291. Returns:
  292. 新增的标题数据,格式: {source_id: {title: title_data}}
  293. """
  294. pass
  295. @abstractmethod
  296. def save_txt_snapshot(self, data: NewsData) -> Optional[str]:
  297. """
  298. 保存 TXT 快照(可选功能,本地环境可用)
  299. Args:
  300. data: 新闻数据
  301. Returns:
  302. 保存的文件路径,如果不支持返回 None
  303. """
  304. pass
  305. @abstractmethod
  306. def save_html_report(self, html_content: str, filename: str, is_summary: bool = False) -> Optional[str]:
  307. """
  308. 保存 HTML 报告
  309. Args:
  310. html_content: HTML 内容
  311. filename: 文件名
  312. is_summary: 是否为汇总报告
  313. Returns:
  314. 保存的文件路径
  315. """
  316. pass
  317. @abstractmethod
  318. def is_first_crawl_today(self, date: Optional[str] = None) -> bool:
  319. """
  320. 检查是否是当天第一次抓取
  321. Args:
  322. date: 日期字符串,默认为今天
  323. Returns:
  324. 是否是第一次抓取
  325. """
  326. pass
  327. @abstractmethod
  328. def cleanup(self) -> None:
  329. """
  330. 清理资源(如临时文件、数据库连接等)
  331. """
  332. pass
  333. @abstractmethod
  334. def cleanup_old_data(self, retention_days: int) -> int:
  335. """
  336. 清理过期数据
  337. Args:
  338. retention_days: 保留天数(0 表示不清理)
  339. Returns:
  340. 删除的日期目录数量
  341. """
  342. pass
  343. @property
  344. @abstractmethod
  345. def backend_name(self) -> str:
  346. """
  347. 存储后端名称
  348. """
  349. pass
  350. @property
  351. @abstractmethod
  352. def supports_txt(self) -> bool:
  353. """
  354. 是否支持生成 TXT 快照
  355. """
  356. pass
  357. # === 推送记录相关方法 ===
  358. @abstractmethod
  359. def has_pushed_today(self, date: Optional[str] = None) -> bool:
  360. """
  361. 检查指定日期是否已推送过
  362. Args:
  363. date: 日期字符串(YYYY-MM-DD),默认为今天
  364. Returns:
  365. 是否已推送
  366. """
  367. pass
  368. @abstractmethod
  369. def record_push(self, report_type: str, date: Optional[str] = None) -> bool:
  370. """
  371. 记录推送
  372. Args:
  373. report_type: 报告类型
  374. date: 日期字符串(YYYY-MM-DD),默认为今天
  375. Returns:
  376. 是否记录成功
  377. """
  378. pass
  379. def convert_crawl_results_to_news_data(
  380. results: Dict[str, Dict],
  381. id_to_name: Dict[str, str],
  382. failed_ids: List[str],
  383. crawl_time: str,
  384. crawl_date: str,
  385. ) -> NewsData:
  386. """
  387. 将爬虫结果转换为 NewsData 格式
  388. Args:
  389. results: 爬虫返回的结果 {source_id: {title: {ranks: [], url: "", mobileUrl: ""}}}
  390. id_to_name: 来源ID到名称的映射
  391. failed_ids: 失败的来源ID
  392. crawl_time: 抓取时间(HH:MM)
  393. crawl_date: 抓取日期(YYYY-MM-DD)
  394. Returns:
  395. NewsData 对象
  396. """
  397. items = {}
  398. for source_id, titles_data in results.items():
  399. source_name = id_to_name.get(source_id, source_id)
  400. news_list = []
  401. for title, data in titles_data.items():
  402. if isinstance(data, dict):
  403. ranks = data.get("ranks", [])
  404. url = data.get("url", "")
  405. mobile_url = data.get("mobileUrl", "")
  406. else:
  407. # 兼容旧格式
  408. ranks = data if isinstance(data, list) else []
  409. url = ""
  410. mobile_url = ""
  411. rank = ranks[0] if ranks else 99
  412. news_item = NewsItem(
  413. title=title,
  414. source_id=source_id,
  415. source_name=source_name,
  416. rank=rank,
  417. url=url,
  418. mobile_url=mobile_url,
  419. crawl_time=crawl_time,
  420. ranks=ranks,
  421. first_time=crawl_time,
  422. last_time=crawl_time,
  423. count=1,
  424. )
  425. news_list.append(news_item)
  426. items[source_id] = news_list
  427. return NewsData(
  428. date=crawl_date,
  429. crawl_time=crawl_time,
  430. items=items,
  431. id_to_name=id_to_name,
  432. failed_ids=failed_ids,
  433. )
  434. def convert_news_data_to_results(data: NewsData) -> tuple:
  435. """
  436. 将 NewsData 转换回原有的 results 格式(用于兼容现有代码)
  437. Args:
  438. data: NewsData 对象
  439. Returns:
  440. (results, id_to_name, title_info) 元组
  441. """
  442. results = {}
  443. title_info = {}
  444. for source_id, news_list in data.items.items():
  445. results[source_id] = {}
  446. title_info[source_id] = {}
  447. for item in news_list:
  448. results[source_id][item.title] = {
  449. "ranks": item.ranks,
  450. "url": item.url,
  451. "mobileUrl": item.mobile_url,
  452. }
  453. title_info[source_id][item.title] = {
  454. "first_time": item.first_time,
  455. "last_time": item.last_time,
  456. "count": item.count,
  457. "ranks": item.ranks,
  458. "url": item.url,
  459. "mobileUrl": item.mobile_url,
  460. }
  461. return results, data.id_to_name, title_info