base.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. # coding=utf-8
  2. """
  3. 存储后端抽象基类和数据模型
  4. 定义统一的存储接口,所有存储后端都需要实现这些方法
  5. """
  6. from abc import ABC, abstractmethod
  7. from dataclasses import dataclass, field
  8. from typing import Dict, List, Optional, Any
  9. @dataclass
  10. class NewsItem:
  11. """新闻条目数据模型(热榜数据)"""
  12. title: str # 新闻标题
  13. source_id: str # 来源平台ID(如 toutiao, baidu)
  14. source_name: str = "" # 来源平台名称(运行时使用,数据库不存储)
  15. rank: int = 0 # 排名
  16. url: str = "" # 链接 URL
  17. mobile_url: str = "" # 移动端 URL
  18. crawl_time: str = "" # 抓取时间(HH:MM 格式)
  19. # 统计信息(用于分析)
  20. ranks: List[int] = field(default_factory=list) # 历史排名列表
  21. first_time: str = "" # 首次出现时间
  22. last_time: str = "" # 最后出现时间
  23. count: int = 1 # 出现次数
  24. rank_timeline: List[Dict[str, Any]] = field(default_factory=list) # 完整排名时间线
  25. # 格式: [{"time": "09:30", "rank": 1}, {"time": "10:00", "rank": 2}, ...]
  26. # None 表示脱榜: [{"time": "11:00", "rank": None}]
  27. def to_dict(self) -> Dict[str, Any]:
  28. """转换为字典"""
  29. return {
  30. "title": self.title,
  31. "source_id": self.source_id,
  32. "source_name": self.source_name,
  33. "rank": self.rank,
  34. "url": self.url,
  35. "mobile_url": self.mobile_url,
  36. "crawl_time": self.crawl_time,
  37. "ranks": self.ranks,
  38. "first_time": self.first_time,
  39. "last_time": self.last_time,
  40. "count": self.count,
  41. "rank_timeline": self.rank_timeline,
  42. }
  43. @classmethod
  44. def from_dict(cls, data: Dict[str, Any]) -> "NewsItem":
  45. """从字典创建"""
  46. return cls(
  47. title=data.get("title", ""),
  48. source_id=data.get("source_id", ""),
  49. source_name=data.get("source_name", ""),
  50. rank=data.get("rank", 0),
  51. url=data.get("url", ""),
  52. mobile_url=data.get("mobile_url", ""),
  53. crawl_time=data.get("crawl_time", ""),
  54. ranks=data.get("ranks", []),
  55. first_time=data.get("first_time", ""),
  56. last_time=data.get("last_time", ""),
  57. count=data.get("count", 1),
  58. rank_timeline=data.get("rank_timeline", []),
  59. )
  60. @dataclass
  61. class RSSItem:
  62. """RSS 条目数据模型"""
  63. title: str # 标题
  64. feed_id: str # RSS 源 ID(如 "hacker-news")
  65. feed_name: str = "" # RSS 源名称(运行时使用)
  66. url: str = "" # 文章链接
  67. published_at: str = "" # RSS 发布时间(ISO 格式)
  68. summary: str = "" # 摘要/描述
  69. author: str = "" # 作者
  70. crawl_time: str = "" # 抓取时间(HH:MM 格式)
  71. # 统计信息
  72. first_time: str = "" # 首次抓取时间
  73. last_time: str = "" # 最后抓取时间
  74. count: int = 1 # 抓取次数
  75. def to_dict(self) -> Dict[str, Any]:
  76. """转换为字典"""
  77. return {
  78. "title": self.title,
  79. "feed_id": self.feed_id,
  80. "feed_name": self.feed_name,
  81. "url": self.url,
  82. "published_at": self.published_at,
  83. "summary": self.summary,
  84. "author": self.author,
  85. "crawl_time": self.crawl_time,
  86. "first_time": self.first_time,
  87. "last_time": self.last_time,
  88. "count": self.count,
  89. }
  90. @classmethod
  91. def from_dict(cls, data: Dict[str, Any]) -> "RSSItem":
  92. """从字典创建"""
  93. return cls(
  94. title=data.get("title", ""),
  95. feed_id=data.get("feed_id", ""),
  96. feed_name=data.get("feed_name", ""),
  97. url=data.get("url", ""),
  98. published_at=data.get("published_at", ""),
  99. summary=data.get("summary", ""),
  100. author=data.get("author", ""),
  101. crawl_time=data.get("crawl_time", ""),
  102. first_time=data.get("first_time", ""),
  103. last_time=data.get("last_time", ""),
  104. count=data.get("count", 1),
  105. )
  106. @dataclass
  107. class RSSData:
  108. """
  109. RSS 数据集合
  110. 结构:
  111. - date: 日期(YYYY-MM-DD)
  112. - crawl_time: 抓取时间(HH:MM)
  113. - items: 按 feed_id 分组的 RSS 条目
  114. - id_to_name: feed_id 到名称的映射
  115. - failed_ids: 失败的 feed_id 列表
  116. """
  117. date: str # 日期
  118. crawl_time: str # 抓取时间
  119. items: Dict[str, List[RSSItem]] # 按 feed_id 分组的条目
  120. id_to_name: Dict[str, str] = field(default_factory=dict) # ID到名称映射
  121. failed_ids: List[str] = field(default_factory=list) # 失败的ID
  122. def to_dict(self) -> Dict[str, Any]:
  123. """转换为字典"""
  124. items_dict = {}
  125. for feed_id, rss_list in self.items.items():
  126. items_dict[feed_id] = [item.to_dict() for item in rss_list]
  127. return {
  128. "date": self.date,
  129. "crawl_time": self.crawl_time,
  130. "items": items_dict,
  131. "id_to_name": self.id_to_name,
  132. "failed_ids": self.failed_ids,
  133. }
  134. @classmethod
  135. def from_dict(cls, data: Dict[str, Any]) -> "RSSData":
  136. """从字典创建"""
  137. items = {}
  138. items_data = data.get("items", {})
  139. for feed_id, rss_list in items_data.items():
  140. items[feed_id] = [RSSItem.from_dict(item) for item in rss_list]
  141. return cls(
  142. date=data.get("date", ""),
  143. crawl_time=data.get("crawl_time", ""),
  144. items=items,
  145. id_to_name=data.get("id_to_name", {}),
  146. failed_ids=data.get("failed_ids", []),
  147. )
  148. def get_total_count(self) -> int:
  149. """获取条目总数"""
  150. return sum(len(rss_list) for rss_list in self.items.values())
  151. @dataclass
  152. class NewsData:
  153. """
  154. 新闻数据集合
  155. 结构:
  156. - date: 日期(YYYY-MM-DD)
  157. - crawl_time: 抓取时间(HH时MM分)
  158. - items: 按来源ID分组的新闻条目
  159. - id_to_name: 来源ID到名称的映射
  160. - failed_ids: 失败的来源ID列表
  161. """
  162. date: str # 日期
  163. crawl_time: str # 抓取时间
  164. items: Dict[str, List[NewsItem]] # 按来源分组的新闻
  165. id_to_name: Dict[str, str] = field(default_factory=dict) # ID到名称映射
  166. failed_ids: List[str] = field(default_factory=list) # 失败的ID
  167. def to_dict(self) -> Dict[str, Any]:
  168. """转换为字典"""
  169. items_dict = {}
  170. for source_id, news_list in self.items.items():
  171. items_dict[source_id] = [item.to_dict() for item in news_list]
  172. return {
  173. "date": self.date,
  174. "crawl_time": self.crawl_time,
  175. "items": items_dict,
  176. "id_to_name": self.id_to_name,
  177. "failed_ids": self.failed_ids,
  178. }
  179. @classmethod
  180. def from_dict(cls, data: Dict[str, Any]) -> "NewsData":
  181. """从字典创建"""
  182. items = {}
  183. items_data = data.get("items", {})
  184. for source_id, news_list in items_data.items():
  185. items[source_id] = [NewsItem.from_dict(item) for item in news_list]
  186. return cls(
  187. date=data.get("date", ""),
  188. crawl_time=data.get("crawl_time", ""),
  189. items=items,
  190. id_to_name=data.get("id_to_name", {}),
  191. failed_ids=data.get("failed_ids", []),
  192. )
  193. def get_total_count(self) -> int:
  194. """获取新闻总数"""
  195. return sum(len(news_list) for news_list in self.items.values())
  196. def merge_with(self, other: "NewsData") -> "NewsData":
  197. """
  198. 合并另一个 NewsData 到当前数据
  199. 合并规则:
  200. - 相同 source_id + title 的新闻合并排名历史
  201. - 更新 last_time 和 count
  202. - 保留较早的 first_time
  203. """
  204. merged_items = {}
  205. # 复制当前数据
  206. for source_id, news_list in self.items.items():
  207. merged_items[source_id] = {item.title: item for item in news_list}
  208. # 合并其他数据
  209. for source_id, news_list in other.items.items():
  210. if source_id not in merged_items:
  211. merged_items[source_id] = {}
  212. for item in news_list:
  213. if item.title in merged_items[source_id]:
  214. # 合并已存在的新闻
  215. existing = merged_items[source_id][item.title]
  216. # 合并排名
  217. existing_ranks = set(existing.ranks) if existing.ranks else set()
  218. new_ranks = set(item.ranks) if item.ranks else set()
  219. merged_ranks = sorted(existing_ranks | new_ranks)
  220. existing.ranks = merged_ranks
  221. # 更新时间
  222. if item.first_time and (not existing.first_time or item.first_time < existing.first_time):
  223. existing.first_time = item.first_time
  224. if item.last_time and (not existing.last_time or item.last_time > existing.last_time):
  225. existing.last_time = item.last_time
  226. # 更新计数
  227. existing.count += 1
  228. # 保留URL(如果原来没有)
  229. if not existing.url and item.url:
  230. existing.url = item.url
  231. if not existing.mobile_url and item.mobile_url:
  232. existing.mobile_url = item.mobile_url
  233. else:
  234. # 添加新新闻
  235. merged_items[source_id][item.title] = item
  236. # 转换回列表格式
  237. final_items = {}
  238. for source_id, items_dict in merged_items.items():
  239. final_items[source_id] = list(items_dict.values())
  240. # 合并 id_to_name
  241. merged_id_to_name = {**self.id_to_name, **other.id_to_name}
  242. # 合并 failed_ids(去重)
  243. merged_failed_ids = list(set(self.failed_ids + other.failed_ids))
  244. return NewsData(
  245. date=self.date or other.date,
  246. crawl_time=other.crawl_time, # 使用较新的抓取时间
  247. items=final_items,
  248. id_to_name=merged_id_to_name,
  249. failed_ids=merged_failed_ids,
  250. )
  251. class StorageBackend(ABC):
  252. """
  253. 存储后端抽象基类
  254. 所有存储后端都需要实现这些方法,以支持:
  255. - 保存新闻数据
  256. - 读取当天所有数据
  257. - 检测新增新闻
  258. - 生成报告文件(TXT/HTML)
  259. """
  260. @abstractmethod
  261. def save_news_data(self, data: NewsData) -> bool:
  262. """
  263. 保存新闻数据
  264. Args:
  265. data: 新闻数据
  266. Returns:
  267. 是否保存成功
  268. """
  269. pass
  270. @abstractmethod
  271. def get_today_all_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  272. """
  273. 获取指定日期的所有新闻数据
  274. Args:
  275. date: 日期字符串(YYYY-MM-DD),默认为今天
  276. Returns:
  277. 合并后的新闻数据,如果没有数据返回 None
  278. """
  279. pass
  280. @abstractmethod
  281. def get_latest_crawl_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  282. """
  283. 获取最新一次抓取的数据
  284. Args:
  285. date: 日期字符串,默认为今天
  286. Returns:
  287. 最新抓取的新闻数据
  288. """
  289. pass
  290. @abstractmethod
  291. def detect_new_titles(self, current_data: NewsData) -> Dict[str, Dict]:
  292. """
  293. 检测新增的标题
  294. Args:
  295. current_data: 当前抓取的数据
  296. Returns:
  297. 新增的标题数据,格式: {source_id: {title: title_data}}
  298. """
  299. pass
  300. @abstractmethod
  301. def save_txt_snapshot(self, data: NewsData) -> Optional[str]:
  302. """
  303. 保存 TXT 快照(可选功能,本地环境可用)
  304. Args:
  305. data: 新闻数据
  306. Returns:
  307. 保存的文件路径,如果不支持返回 None
  308. """
  309. pass
  310. @abstractmethod
  311. def save_html_report(self, html_content: str, filename: str, is_summary: bool = False) -> Optional[str]:
  312. """
  313. 保存 HTML 报告
  314. Args:
  315. html_content: HTML 内容
  316. filename: 文件名
  317. is_summary: 是否为汇总报告
  318. Returns:
  319. 保存的文件路径
  320. """
  321. pass
  322. @abstractmethod
  323. def is_first_crawl_today(self, date: Optional[str] = None) -> bool:
  324. """
  325. 检查是否是当天第一次抓取
  326. Args:
  327. date: 日期字符串,默认为今天
  328. Returns:
  329. 是否是第一次抓取
  330. """
  331. pass
  332. @abstractmethod
  333. def cleanup(self) -> None:
  334. """
  335. 清理资源(如临时文件、数据库连接等)
  336. """
  337. pass
  338. @abstractmethod
  339. def cleanup_old_data(self, retention_days: int) -> int:
  340. """
  341. 清理过期数据
  342. Args:
  343. retention_days: 保留天数(0 表示不清理)
  344. Returns:
  345. 删除的日期目录数量
  346. """
  347. pass
  348. @property
  349. @abstractmethod
  350. def backend_name(self) -> str:
  351. """
  352. 存储后端名称
  353. """
  354. pass
  355. @property
  356. @abstractmethod
  357. def supports_txt(self) -> bool:
  358. """
  359. 是否支持生成 TXT 快照
  360. """
  361. pass
  362. # === 推送记录相关方法 ===
  363. @abstractmethod
  364. def has_pushed_today(self, date: Optional[str] = None) -> bool:
  365. """
  366. 检查指定日期是否已推送过
  367. Args:
  368. date: 日期字符串(YYYY-MM-DD),默认为今天
  369. Returns:
  370. 是否已推送
  371. """
  372. pass
  373. @abstractmethod
  374. def record_push(self, report_type: str, date: Optional[str] = None) -> bool:
  375. """
  376. 记录推送
  377. Args:
  378. report_type: 报告类型
  379. date: 日期字符串(YYYY-MM-DD),默认为今天
  380. Returns:
  381. 是否记录成功
  382. """
  383. pass
  384. def convert_crawl_results_to_news_data(
  385. results: Dict[str, Dict],
  386. id_to_name: Dict[str, str],
  387. failed_ids: List[str],
  388. crawl_time: str,
  389. crawl_date: str,
  390. ) -> NewsData:
  391. """
  392. 将爬虫结果转换为 NewsData 格式
  393. Args:
  394. results: 爬虫返回的结果 {source_id: {title: {ranks: [], url: "", mobileUrl: ""}}}
  395. id_to_name: 来源ID到名称的映射
  396. failed_ids: 失败的来源ID
  397. crawl_time: 抓取时间(HH:MM)
  398. crawl_date: 抓取日期(YYYY-MM-DD)
  399. Returns:
  400. NewsData 对象
  401. """
  402. items = {}
  403. for source_id, titles_data in results.items():
  404. source_name = id_to_name.get(source_id, source_id)
  405. news_list = []
  406. for title, data in titles_data.items():
  407. if isinstance(data, dict):
  408. ranks = data.get("ranks", [])
  409. url = data.get("url", "")
  410. mobile_url = data.get("mobileUrl", "")
  411. else:
  412. # 兼容旧格式
  413. ranks = data if isinstance(data, list) else []
  414. url = ""
  415. mobile_url = ""
  416. rank = ranks[0] if ranks else 99
  417. news_item = NewsItem(
  418. title=title,
  419. source_id=source_id,
  420. source_name=source_name,
  421. rank=rank,
  422. url=url,
  423. mobile_url=mobile_url,
  424. crawl_time=crawl_time,
  425. ranks=ranks,
  426. first_time=crawl_time,
  427. last_time=crawl_time,
  428. count=1,
  429. )
  430. news_list.append(news_item)
  431. items[source_id] = news_list
  432. return NewsData(
  433. date=crawl_date,
  434. crawl_time=crawl_time,
  435. items=items,
  436. id_to_name=id_to_name,
  437. failed_ids=failed_ids,
  438. )
  439. def convert_news_data_to_results(data: NewsData) -> tuple:
  440. """
  441. 将 NewsData 转换回原有的 results 格式(用于兼容现有代码)
  442. Args:
  443. data: NewsData 对象
  444. Returns:
  445. (results, id_to_name, title_info) 元组
  446. """
  447. results = {}
  448. title_info = {}
  449. for source_id, news_list in data.items.items():
  450. results[source_id] = {}
  451. title_info[source_id] = {}
  452. for item in news_list:
  453. results[source_id][item.title] = {
  454. "ranks": item.ranks,
  455. "url": item.url,
  456. "mobileUrl": item.mobile_url,
  457. }
  458. title_info[source_id][item.title] = {
  459. "first_time": item.first_time,
  460. "last_time": item.last_time,
  461. "count": item.count,
  462. "ranks": item.ranks,
  463. "url": item.url,
  464. "mobileUrl": item.mobile_url,
  465. }
  466. return results, data.id_to_name, title_info