base.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586
  1. # coding=utf-8
  2. """
  3. 存储后端抽象基类和数据模型
  4. 定义统一的存储接口,所有存储后端都需要实现这些方法
  5. """
  6. from abc import ABC, abstractmethod
  7. from dataclasses import dataclass, field
  8. from typing import Dict, List, Optional, Any, Set
  9. @dataclass
  10. class NewsItem:
  11. """新闻条目数据模型(热榜数据)"""
  12. title: str # 新闻标题
  13. source_id: str # 来源平台ID(如 toutiao, baidu)
  14. source_name: str = "" # 来源平台名称(运行时使用,数据库不存储)
  15. rank: int = 0 # 排名
  16. url: str = "" # 链接 URL
  17. mobile_url: str = "" # 移动端 URL
  18. crawl_time: str = "" # 抓取时间(HH:MM 格式)
  19. # 统计信息(用于分析)
  20. ranks: List[int] = field(default_factory=list) # 历史排名列表
  21. first_time: str = "" # 首次出现时间
  22. last_time: str = "" # 最后出现时间
  23. count: int = 1 # 出现次数
  24. rank_timeline: List[Dict[str, Any]] = field(default_factory=list) # 完整排名时间线
  25. # 格式: [{"time": "09:30", "rank": 1}, {"time": "10:00", "rank": 2}, ...]
  26. # None 表示脱榜: [{"time": "11:00", "rank": None}]
  27. def to_dict(self) -> Dict[str, Any]:
  28. """转换为字典"""
  29. return {
  30. "title": self.title,
  31. "source_id": self.source_id,
  32. "source_name": self.source_name,
  33. "rank": self.rank,
  34. "url": self.url,
  35. "mobile_url": self.mobile_url,
  36. "crawl_time": self.crawl_time,
  37. "ranks": self.ranks,
  38. "first_time": self.first_time,
  39. "last_time": self.last_time,
  40. "count": self.count,
  41. "rank_timeline": self.rank_timeline,
  42. }
  43. @classmethod
  44. def from_dict(cls, data: Dict[str, Any]) -> "NewsItem":
  45. """从字典创建"""
  46. return cls(
  47. title=data.get("title", ""),
  48. source_id=data.get("source_id", ""),
  49. source_name=data.get("source_name", ""),
  50. rank=data.get("rank", 0),
  51. url=data.get("url", ""),
  52. mobile_url=data.get("mobile_url", ""),
  53. crawl_time=data.get("crawl_time", ""),
  54. ranks=data.get("ranks", []),
  55. first_time=data.get("first_time", ""),
  56. last_time=data.get("last_time", ""),
  57. count=data.get("count", 1),
  58. rank_timeline=data.get("rank_timeline", []),
  59. )
  60. @dataclass
  61. class RSSItem:
  62. """RSS 条目数据模型"""
  63. title: str # 标题
  64. feed_id: str # RSS 源 ID(如 "hacker-news")
  65. feed_name: str = "" # RSS 源名称(运行时使用)
  66. url: str = "" # 文章链接
  67. guid: str = "" # GUID/ID(RSS guid 或 Atom id)
  68. published_at: str = "" # RSS 发布时间(ISO 格式)
  69. summary: str = "" # 摘要/描述
  70. author: str = "" # 作者
  71. crawl_time: str = "" # 抓取时间(HH:MM 格式)
  72. # 统计信息
  73. first_time: str = "" # 首次抓取时间
  74. last_time: str = "" # 最后抓取时间
  75. count: int = 1 # 抓取次数
  76. def to_dict(self) -> Dict[str, Any]:
  77. """转换为字典"""
  78. return {
  79. "title": self.title,
  80. "feed_id": self.feed_id,
  81. "feed_name": self.feed_name,
  82. "url": self.url,
  83. "published_at": self.published_at,
  84. "summary": self.summary,
  85. "author": self.author,
  86. "crawl_time": self.crawl_time,
  87. "first_time": self.first_time,
  88. "last_time": self.last_time,
  89. "count": self.count,
  90. }
  91. @classmethod
  92. def from_dict(cls, data: Dict[str, Any]) -> "RSSItem":
  93. """从字典创建"""
  94. return cls(
  95. title=data.get("title", ""),
  96. feed_id=data.get("feed_id", ""),
  97. feed_name=data.get("feed_name", ""),
  98. url=data.get("url", ""),
  99. published_at=data.get("published_at", ""),
  100. summary=data.get("summary", ""),
  101. author=data.get("author", ""),
  102. crawl_time=data.get("crawl_time", ""),
  103. first_time=data.get("first_time", ""),
  104. last_time=data.get("last_time", ""),
  105. count=data.get("count", 1),
  106. )
  107. @dataclass
  108. class RSSData:
  109. """
  110. RSS 数据集合
  111. 结构:
  112. - date: 日期(YYYY-MM-DD)
  113. - crawl_time: 抓取时间(HH:MM)
  114. - items: 按 feed_id 分组的 RSS 条目
  115. - id_to_name: feed_id 到名称的映射
  116. - failed_ids: 失败的 feed_id 列表
  117. """
  118. date: str # 日期
  119. crawl_time: str # 抓取时间
  120. items: Dict[str, List[RSSItem]] # 按 feed_id 分组的条目
  121. id_to_name: Dict[str, str] = field(default_factory=dict) # ID到名称映射
  122. failed_ids: List[str] = field(default_factory=list) # 失败的ID
  123. def to_dict(self) -> Dict[str, Any]:
  124. """转换为字典"""
  125. items_dict = {}
  126. for feed_id, rss_list in self.items.items():
  127. items_dict[feed_id] = [item.to_dict() for item in rss_list]
  128. return {
  129. "date": self.date,
  130. "crawl_time": self.crawl_time,
  131. "items": items_dict,
  132. "id_to_name": self.id_to_name,
  133. "failed_ids": self.failed_ids,
  134. }
  135. @classmethod
  136. def from_dict(cls, data: Dict[str, Any]) -> "RSSData":
  137. """从字典创建"""
  138. items = {}
  139. items_data = data.get("items", {})
  140. for feed_id, rss_list in items_data.items():
  141. items[feed_id] = [RSSItem.from_dict(item) for item in rss_list]
  142. return cls(
  143. date=data.get("date", ""),
  144. crawl_time=data.get("crawl_time", ""),
  145. items=items,
  146. id_to_name=data.get("id_to_name", {}),
  147. failed_ids=data.get("failed_ids", []),
  148. )
  149. def get_total_count(self) -> int:
  150. """获取条目总数"""
  151. return sum(len(rss_list) for rss_list in self.items.values())
  152. @dataclass
  153. class NewsData:
  154. """
  155. 新闻数据集合
  156. 结构:
  157. - date: 日期(YYYY-MM-DD)
  158. - crawl_time: 抓取时间(HH时MM分)
  159. - items: 按来源ID分组的新闻条目
  160. - id_to_name: 来源ID到名称的映射
  161. - failed_ids: 失败的来源ID列表
  162. """
  163. date: str # 日期
  164. crawl_time: str # 抓取时间
  165. items: Dict[str, List[NewsItem]] # 按来源分组的新闻
  166. id_to_name: Dict[str, str] = field(default_factory=dict) # ID到名称映射
  167. failed_ids: List[str] = field(default_factory=list) # 失败的ID
  168. def to_dict(self) -> Dict[str, Any]:
  169. """转换为字典"""
  170. items_dict = {}
  171. for source_id, news_list in self.items.items():
  172. items_dict[source_id] = [item.to_dict() for item in news_list]
  173. return {
  174. "date": self.date,
  175. "crawl_time": self.crawl_time,
  176. "items": items_dict,
  177. "id_to_name": self.id_to_name,
  178. "failed_ids": self.failed_ids,
  179. }
  180. @classmethod
  181. def from_dict(cls, data: Dict[str, Any]) -> "NewsData":
  182. """从字典创建"""
  183. items = {}
  184. items_data = data.get("items", {})
  185. for source_id, news_list in items_data.items():
  186. items[source_id] = [NewsItem.from_dict(item) for item in news_list]
  187. return cls(
  188. date=data.get("date", ""),
  189. crawl_time=data.get("crawl_time", ""),
  190. items=items,
  191. id_to_name=data.get("id_to_name", {}),
  192. failed_ids=data.get("failed_ids", []),
  193. )
  194. def get_total_count(self) -> int:
  195. """获取新闻总数"""
  196. return sum(len(news_list) for news_list in self.items.values())
  197. def merge_with(self, other: "NewsData") -> "NewsData":
  198. """
  199. 合并另一个 NewsData 到当前数据
  200. 合并规则:
  201. - 相同 source_id + title 的新闻合并排名历史
  202. - 更新 last_time 和 count
  203. - 保留较早的 first_time
  204. """
  205. merged_items = {}
  206. # 复制当前数据
  207. for source_id, news_list in self.items.items():
  208. merged_items[source_id] = {item.title: item for item in news_list}
  209. # 合并其他数据
  210. for source_id, news_list in other.items.items():
  211. if source_id not in merged_items:
  212. merged_items[source_id] = {}
  213. for item in news_list:
  214. if item.title in merged_items[source_id]:
  215. # 合并已存在的新闻
  216. existing = merged_items[source_id][item.title]
  217. # 合并排名
  218. existing_ranks = set(existing.ranks) if existing.ranks else set()
  219. new_ranks = set(item.ranks) if item.ranks else set()
  220. merged_ranks = sorted(existing_ranks | new_ranks)
  221. existing.ranks = merged_ranks
  222. # 更新时间
  223. if item.first_time and (not existing.first_time or item.first_time < existing.first_time):
  224. existing.first_time = item.first_time
  225. if item.last_time and (not existing.last_time or item.last_time > existing.last_time):
  226. existing.last_time = item.last_time
  227. # 更新计数
  228. existing.count += 1
  229. # 保留URL(如果原来没有)
  230. if not existing.url and item.url:
  231. existing.url = item.url
  232. if not existing.mobile_url and item.mobile_url:
  233. existing.mobile_url = item.mobile_url
  234. else:
  235. # 添加新新闻
  236. merged_items[source_id][item.title] = item
  237. # 转换回列表格式
  238. final_items = {}
  239. for source_id, items_dict in merged_items.items():
  240. final_items[source_id] = list(items_dict.values())
  241. # 合并 id_to_name
  242. merged_id_to_name = {**self.id_to_name, **other.id_to_name}
  243. # 合并 failed_ids(去重)
  244. merged_failed_ids = list(set(self.failed_ids + other.failed_ids))
  245. return NewsData(
  246. date=self.date or other.date,
  247. crawl_time=other.crawl_time, # 使用较新的抓取时间
  248. items=final_items,
  249. id_to_name=merged_id_to_name,
  250. failed_ids=merged_failed_ids,
  251. )
  252. class StorageBackend(ABC):
  253. """
  254. 存储后端抽象基类
  255. 所有存储后端都需要实现这些方法,以支持:
  256. - 保存新闻数据
  257. - 读取当天所有数据
  258. - 检测新增新闻
  259. - 生成报告文件(TXT/HTML)
  260. """
  261. @abstractmethod
  262. def save_news_data(self, data: NewsData) -> bool:
  263. """
  264. 保存新闻数据
  265. Args:
  266. data: 新闻数据
  267. Returns:
  268. 是否保存成功
  269. """
  270. pass
  271. @abstractmethod
  272. def get_today_all_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  273. """
  274. 获取指定日期的所有新闻数据
  275. Args:
  276. date: 日期字符串(YYYY-MM-DD),默认为今天
  277. Returns:
  278. 合并后的新闻数据,如果没有数据返回 None
  279. """
  280. pass
  281. @abstractmethod
  282. def get_latest_crawl_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  283. """
  284. 获取最新一次抓取的数据
  285. Args:
  286. date: 日期字符串,默认为今天
  287. Returns:
  288. 最新抓取的新闻数据
  289. """
  290. pass
  291. @abstractmethod
  292. def detect_new_titles(self, current_data: NewsData) -> Dict[str, Dict]:
  293. """
  294. 检测新增的标题
  295. Args:
  296. current_data: 当前抓取的数据
  297. Returns:
  298. 新增的标题数据,格式: {source_id: {title: title_data}}
  299. """
  300. pass
  301. @abstractmethod
  302. def save_txt_snapshot(self, data: NewsData) -> Optional[str]:
  303. """
  304. 保存 TXT 快照(可选功能,本地环境可用)
  305. Args:
  306. data: 新闻数据
  307. Returns:
  308. 保存的文件路径,如果不支持返回 None
  309. """
  310. pass
  311. @abstractmethod
  312. def save_html_report(self, html_content: str, filename: str) -> Optional[str]:
  313. """
  314. 保存 HTML 报告
  315. Args:
  316. html_content: HTML 内容
  317. filename: 文件名
  318. Returns:
  319. 保存的文件路径
  320. """
  321. pass
  322. @abstractmethod
  323. def is_first_crawl_today(self, date: Optional[str] = None) -> bool:
  324. """
  325. 检查是否是当天第一次抓取
  326. Args:
  327. date: 日期字符串,默认为今天
  328. Returns:
  329. 是否是第一次抓取
  330. """
  331. pass
  332. @abstractmethod
  333. def cleanup(self) -> None:
  334. """
  335. 清理资源(如临时文件、数据库连接等)
  336. """
  337. pass
  338. @abstractmethod
  339. def cleanup_old_data(self, retention_days: int) -> int:
  340. """
  341. 清理过期数据
  342. Args:
  343. retention_days: 保留天数(0 表示不清理)
  344. Returns:
  345. 删除的日期目录数量
  346. """
  347. pass
  348. @property
  349. @abstractmethod
  350. def backend_name(self) -> str:
  351. """
  352. 存储后端名称
  353. """
  354. pass
  355. @property
  356. @abstractmethod
  357. def supports_txt(self) -> bool:
  358. """
  359. 是否支持生成 TXT 快照
  360. """
  361. pass
  362. # === 时间段执行记录(调度系统)===
  363. def has_period_executed(self, date_str: str, period_key: str, action: str) -> bool:
  364. """
  365. 检查指定时间段的某个 action 是否已执行
  366. Args:
  367. date_str: 日期字符串 YYYY-MM-DD
  368. period_key: 时间段 key
  369. action: 动作类型 (analyze / push)
  370. Returns:
  371. 是否已执行
  372. """
  373. return False
  374. def record_period_execution(self, date_str: str, period_key: str, action: str) -> bool:
  375. """
  376. 记录时间段的 action 执行
  377. Args:
  378. date_str: 日期字符串 YYYY-MM-DD
  379. period_key: 时间段 key
  380. action: 动作类型 (analyze / push)
  381. Returns:
  382. 是否记录成功
  383. """
  384. return False
  385. # === AI 智能筛选(默认实现,子类通过 mixin 覆盖) ===
  386. def begin_batch(self) -> None:
  387. """开启批量模式(远程后端延迟上传,本地后端无操作)"""
  388. pass
  389. def end_batch(self) -> None:
  390. """结束批量模式"""
  391. pass
  392. def get_active_ai_filter_tags(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> List[Dict]:
  393. return []
  394. def get_latest_prompt_hash(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> Optional[str]:
  395. return None
  396. def get_latest_ai_filter_tag_version(self, date: Optional[str] = None) -> int:
  397. return 0
  398. def deprecate_all_ai_filter_tags(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> int:
  399. return 0
  400. def save_ai_filter_tags(self, tags: List[Dict], version: int, prompt_hash: str, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> int:
  401. return 0
  402. def save_ai_filter_results(self, results: List[Dict], date: Optional[str] = None) -> int:
  403. return 0
  404. def get_active_ai_filter_results(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> List[Dict]:
  405. return []
  406. def deprecate_specific_ai_filter_tags(self, tag_ids: List[int], date: Optional[str] = None) -> int:
  407. return 0
  408. def update_ai_filter_tags_hash(self, interests_file: str, new_hash: str, date: Optional[str] = None) -> int:
  409. return 0
  410. def update_ai_filter_tag_descriptions(self, tag_updates: List[Dict], date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> int:
  411. return 0
  412. def update_ai_filter_tag_priorities(self, tag_priorities: List[Dict], date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> int:
  413. return 0
  414. def save_analyzed_news(self, news_ids: List[str], source_type: str, interests_file: str, prompt_hash: str, matched_ids: Set[str], date: Optional[str] = None) -> int:
  415. return 0
  416. def get_analyzed_news_ids(self, source_type: str = "hotlist", date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> Set[str]:
  417. return set()
  418. def clear_analyzed_news(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> int:
  419. return 0
  420. def clear_unmatched_analyzed_news(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> int:
  421. return 0
  422. def get_all_news_ids(self, date: Optional[str] = None) -> List[Dict]:
  423. return []
  424. def get_all_rss_ids(self, date: Optional[str] = None) -> List[Dict]:
  425. return []
  426. def convert_crawl_results_to_news_data(
  427. results: Dict[str, Dict],
  428. id_to_name: Dict[str, str],
  429. failed_ids: List[str],
  430. crawl_time: str,
  431. crawl_date: str,
  432. ) -> NewsData:
  433. """
  434. 将爬虫结果转换为 NewsData 格式
  435. Args:
  436. results: 爬虫返回的结果 {source_id: {title: {ranks: [], url: "", mobileUrl: ""}}}
  437. id_to_name: 来源ID到名称的映射
  438. failed_ids: 失败的来源ID
  439. crawl_time: 抓取时间(HH:MM)
  440. crawl_date: 抓取日期(YYYY-MM-DD)
  441. Returns:
  442. NewsData 对象
  443. """
  444. items = {}
  445. for source_id, titles_data in results.items():
  446. source_name = id_to_name.get(source_id, source_id)
  447. news_list = []
  448. for title, data in titles_data.items():
  449. ranks = data.get("ranks", [])
  450. url = data.get("url", "")
  451. mobile_url = data.get("mobileUrl", "")
  452. rank = ranks[0] if ranks else 99
  453. news_item = NewsItem(
  454. title=title,
  455. source_id=source_id,
  456. source_name=source_name,
  457. rank=rank,
  458. url=url,
  459. mobile_url=mobile_url,
  460. crawl_time=crawl_time,
  461. ranks=ranks,
  462. first_time=crawl_time,
  463. last_time=crawl_time,
  464. count=1,
  465. )
  466. news_list.append(news_item)
  467. items[source_id] = news_list
  468. return NewsData(
  469. date=crawl_date,
  470. crawl_time=crawl_time,
  471. items=items,
  472. id_to_name=id_to_name,
  473. failed_ids=failed_ids,
  474. )