local.py 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340
  1. # coding=utf-8
  2. """
  3. 本地存储后端 - SQLite + TXT/HTML
  4. 使用 SQLite 作为主存储,支持可选的 TXT 快照和 HTML 报告
  5. """
  6. import sqlite3
  7. import shutil
  8. import pytz
  9. import re
  10. from datetime import datetime, timedelta
  11. from pathlib import Path
  12. from typing import Dict, List, Optional
  13. from trendradar.storage.base import StorageBackend, NewsItem, NewsData, RSSItem, RSSData
  14. from trendradar.utils.time import (
  15. get_configured_time,
  16. format_date_folder,
  17. format_time_filename,
  18. )
  19. from trendradar.utils.url import normalize_url
  20. class LocalStorageBackend(StorageBackend):
  21. """
  22. 本地存储后端
  23. 使用 SQLite 数据库存储新闻数据,支持:
  24. - 按日期组织的 SQLite 数据库文件
  25. - 可选的 TXT 快照(用于调试)
  26. - HTML 报告生成
  27. """
  28. def __init__(
  29. self,
  30. data_dir: str = "output",
  31. enable_txt: bool = True,
  32. enable_html: bool = True,
  33. timezone: str = "Asia/Shanghai",
  34. ):
  35. """
  36. 初始化本地存储后端
  37. Args:
  38. data_dir: 数据目录路径
  39. enable_txt: 是否启用 TXT 快照
  40. enable_html: 是否启用 HTML 报告
  41. timezone: 时区配置(默认 Asia/Shanghai)
  42. """
  43. self.data_dir = Path(data_dir)
  44. self.enable_txt = enable_txt
  45. self.enable_html = enable_html
  46. self.timezone = timezone
  47. self._db_connections: Dict[str, sqlite3.Connection] = {}
  @property
  def backend_name(self) -> str:
      """Short identifier of this storage backend: always "local"."""
      backend_id = "local"
      return backend_id
  @property
  def supports_txt(self) -> bool:
      """Whether this backend writes TXT snapshots (mirrors ``enable_txt``)."""
      txt_enabled = self.enable_txt
      return txt_enabled
  def _get_configured_time(self) -> datetime:
      """Return "now" as computed by ``get_configured_time`` for this backend's timezone."""
      configured_zone = self.timezone
      return get_configured_time(configured_zone)
  def _format_date_folder(self, date: Optional[str] = None) -> str:
      """Return the date-folder name in ISO form (YYYY-MM-DD).

      ``date`` is handed to ``format_date_folder`` unchanged, together with
      this backend's timezone.
      """
      configured_zone = self.timezone
      return format_date_folder(date, configured_zone)
  def _format_time_filename(self) -> str:
      """Return the time-of-day file name (HH-MM) for this backend's timezone."""
      configured_zone = self.timezone
      return format_time_filename(configured_zone)
  63. def _get_db_path(self, date: Optional[str] = None, db_type: str = "news") -> Path:
  64. """
  65. 获取 SQLite 数据库路径
  66. 新结构(扁平):output/{type}/{date}.db
  67. - output/news/2025-12-28.db
  68. - output/rss/2025-12-28.db
  69. Args:
  70. date: 日期字符串
  71. db_type: 数据库类型 ("news" 或 "rss")
  72. Returns:
  73. 数据库文件路径
  74. """
  75. date_str = self._format_date_folder(date)
  76. db_dir = self.data_dir / db_type
  77. db_dir.mkdir(parents=True, exist_ok=True)
  78. return db_dir / f"{date_str}.db"
  79. def _get_connection(self, date: Optional[str] = None, db_type: str = "news") -> sqlite3.Connection:
  80. """
  81. 获取数据库连接(带缓存)
  82. Args:
  83. date: 日期字符串
  84. db_type: 数据库类型 ("news" 或 "rss")
  85. Returns:
  86. 数据库连接
  87. """
  88. db_path = str(self._get_db_path(date, db_type))
  89. if db_path not in self._db_connections:
  90. conn = sqlite3.connect(db_path)
  91. conn.row_factory = sqlite3.Row
  92. self._init_tables(conn, db_type)
  93. self._db_connections[db_path] = conn
  94. return self._db_connections[db_path]
  95. def _get_schema_path(self, db_type: str = "news") -> Path:
  96. """
  97. 获取 schema.sql 文件路径
  98. Args:
  99. db_type: 数据库类型 ("news" 或 "rss")
  100. Returns:
  101. schema 文件路径
  102. """
  103. if db_type == "rss":
  104. return Path(__file__).parent / "rss_schema.sql"
  105. return Path(__file__).parent / "schema.sql"
  106. def _init_tables(self, conn: sqlite3.Connection, db_type: str = "news") -> None:
  107. """
  108. 从 schema.sql 初始化数据库表结构
  109. Args:
  110. conn: 数据库连接
  111. db_type: 数据库类型 ("news" 或 "rss")
  112. """
  113. schema_path = self._get_schema_path(db_type)
  114. if schema_path.exists():
  115. with open(schema_path, "r", encoding="utf-8") as f:
  116. schema_sql = f.read()
  117. conn.executescript(schema_sql)
  118. else:
  119. raise FileNotFoundError(f"Schema file not found: {schema_path}")
  120. conn.commit()
  def save_news_data(self, data: NewsData) -> bool:
      """Persist one crawl batch into the day's news SQLite database.

      Items are deduplicated by (normalized URL, platform_id):
        - an existing row is updated (title, rank, mobile URL, last crawl
          time, crawl_count + 1), and a title change is logged to
          ``title_changes``;
        - otherwise a new row is inserted;
        - items without a (normalized) URL are always inserted.
      Every stored item also gets a ``rank_history`` row for this crawl.
      Afterwards a ``crawl_records`` row and per-source success/failure rows
      in ``crawl_source_status`` are written.

      A ``sqlite3.Error`` on a single item is printed and that item skipped.
      Any other failure rolls the whole batch back, so no partial write stays
      pending on the cached connection (a later commit would persist it).

      Args:
          data: News data of one crawl.

      Returns:
          True on success; False if the batch failed (and was rolled back).
      """
      conn = None
      try:
          conn = self._get_connection(data.date)
          cursor = conn.cursor()
          # Current time in the configured timezone, for the audit columns.
          now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")

          def record_rank(news_item_id, rank) -> None:
              """Append this crawl's rank for one news item."""
              cursor.execute("""
                  INSERT INTO rank_history
                  (news_item_id, rank, crawl_time, created_at)
                  VALUES (?, ?, ?, ?)
              """, (news_item_id, rank, data.crawl_time, now_str))

          def insert_item(item, source_id: str, url: str) -> None:
              """Insert a brand-new news_items row plus its initial rank."""
              cursor.execute("""
                  INSERT INTO news_items
                  (title, platform_id, rank, url, mobile_url,
                  first_crawl_time, last_crawl_time, crawl_count,
                  created_at, updated_at)
                  VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
              """, (item.title, source_id, item.rank, url,
                    item.mobile_url, data.crawl_time, data.crawl_time,
                    now_str, now_str))
              record_rank(cursor.lastrowid, item.rank)

          # Sync platform names into the platforms table first.
          for source_id, source_name in data.id_to_name.items():
              cursor.execute("""
                  INSERT INTO platforms (id, name, updated_at)
                  VALUES (?, ?, ?)
                  ON CONFLICT(id) DO UPDATE SET
                  name = excluded.name,
                  updated_at = excluded.updated_at
              """, (source_id, source_name, now_str))

          new_count = 0
          updated_count = 0
          title_changed_count = 0
          success_sources = []

          for source_id, news_list in data.items.items():
              success_sources.append(source_id)
              for item in news_list:
                  try:
                      # Normalize the URL (drops volatile params such as Weibo's
                      # band_rank) so the same story maps to a single row.
                      normalized_url = normalize_url(item.url, source_id) if item.url else ""
                      if not normalized_url:
                          # No URL to deduplicate on: always insert.
                          insert_item(item, source_id, "")
                          new_count += 1
                          continue

                      cursor.execute("""
                          SELECT id, title FROM news_items
                          WHERE url = ? AND platform_id = ?
                      """, (normalized_url, source_id))
                      existing = cursor.fetchone()
                      if existing is None:
                          insert_item(item, source_id, normalized_url)
                          new_count += 1
                          continue

                      existing_id, existing_title = existing
                      if existing_title != item.title:
                          cursor.execute("""
                              INSERT INTO title_changes
                              (news_item_id, old_title, new_title, changed_at)
                              VALUES (?, ?, ?, ?)
                          """, (existing_id, existing_title, item.title, now_str))
                          title_changed_count += 1
                      record_rank(existing_id, item.rank)
                      cursor.execute("""
                          UPDATE news_items SET
                          title = ?,
                          rank = ?,
                          mobile_url = ?,
                          last_crawl_time = ?,
                          crawl_count = crawl_count + 1,
                          updated_at = ?
                          WHERE id = ?
                      """, (item.title, item.rank, item.mobile_url,
                            data.crawl_time, now_str, existing_id))
                      updated_count += 1
                  except sqlite3.Error as e:
                      print(f"保存新闻条目失败 [{item.title[:30]}...]: {e}")

          total_items = new_count + updated_count
          # NOTE(review): if crawl_time is UNIQUE, OR REPLACE deletes and
          # re-inserts the row under a new id, so status rows written for the
          # old id by an earlier save of the same crawl_time are not touched
          # below — confirm against schema.sql.
          cursor.execute("""
              INSERT OR REPLACE INTO crawl_records
              (crawl_time, total_items, created_at)
              VALUES (?, ?, ?)
          """, (data.crawl_time, total_items, now_str))
          cursor.execute("""
              SELECT id FROM crawl_records WHERE crawl_time = ?
          """, (data.crawl_time,))
          record_row = cursor.fetchone()
          if record_row:
              crawl_record_id = record_row[0]
              for source_id in success_sources:
                  cursor.execute("""
                      INSERT OR REPLACE INTO crawl_source_status
                      (crawl_record_id, platform_id, status)
                      VALUES (?, ?, 'success')
                  """, (crawl_record_id, source_id))
              for failed_id in data.failed_ids:
                  # Make sure failed platforms exist in the platforms table too.
                  cursor.execute("""
                      INSERT OR IGNORE INTO platforms (id, name, updated_at)
                      VALUES (?, ?, ?)
                  """, (failed_id, failed_id, now_str))
                  cursor.execute("""
                      INSERT OR REPLACE INTO crawl_source_status
                      (crawl_record_id, platform_id, status)
                      VALUES (?, ?, 'failed')
                  """, (crawl_record_id, failed_id))

          conn.commit()

          log_parts = [f"[本地存储] 处理完成:新增 {new_count} 条"]
          if updated_count > 0:
              log_parts.append(f"更新 {updated_count} 条")
          if title_changed_count > 0:
              log_parts.append(f"标题变更 {title_changed_count} 条")
          print(",".join(log_parts))
          return True
      except Exception as e:
          if conn is not None:
              try:
                  # Discard the partial batch so it cannot be committed later.
                  conn.rollback()
              except sqlite3.Error:
                  pass  # best effort: the original failure is reported below
          print(f"[本地存储] 保存失败: {e}")
          return False
  def get_today_all_data(self, date: Optional[str] = None) -> Optional[NewsData]:
      """Load every news item stored for a day, grouped by platform.

      Args:
          date: Date string; passed to the path/date helpers unchanged
              (callers use None for "today").

      Returns:
          A ``NewsData`` where:
            - ``items`` maps platform_id to a list of ``NewsItem``; each item
              carries its de-duplicated rank history in crawl order, or
              ``[rank]`` if no history exists;
            - ``id_to_name`` comes from ``platforms`` (falling back to the id);
            - ``failed_ids`` lists platforms that failed in any crawl of the day;
            - ``crawl_time`` is the latest recorded crawl time, or the current
              time-of-day file name when no crawl is recorded.
          None when the database file is missing, holds no news items, or an
          error occurs (the error is printed).
      """
      try:
          db_path = self._get_db_path(date)
          if not db_path.exists():
              return None

          conn = self._get_connection(date)
          cursor = conn.cursor()

          cursor.execute("""
              SELECT n.id, n.title, n.platform_id, p.name as platform_name,
                     n.rank, n.url, n.mobile_url,
                     n.first_crawl_time, n.last_crawl_time, n.crawl_count
              FROM news_items n
              LEFT JOIN platforms p ON n.platform_id = p.id
              ORDER BY n.platform_id, n.last_crawl_time
          """)
          rows = cursor.fetchall()
          if not rows:
              return None

          # Every news item of the day is loaded, so read the whole history
          # table instead of binding one "?" per id. A long IN (...) list can
          # exceed SQLite's bound-parameter limit (999 before SQLite 3.32) on a
          # busy day, which made this method fail and return None.
          cursor.execute("""
              SELECT news_item_id, rank FROM rank_history
              ORDER BY news_item_id, crawl_time
          """)
          rank_history_map: Dict[int, List[int]] = {}
          for news_id, rank in cursor.fetchall():
              ranks_seen = rank_history_map.setdefault(news_id, [])
              if rank not in ranks_seen:
                  ranks_seen.append(rank)

          items: Dict[str, List[NewsItem]] = {}
          id_to_name: Dict[str, str] = {}
          crawl_date = self._format_date_folder(date)

          for row in rows:
              news_id, platform_id = row[0], row[2]
              platform_name = row[3] or platform_id
              id_to_name[platform_id] = platform_name
              items.setdefault(platform_id, []).append(NewsItem(
                  title=row[1],
                  source_id=platform_id,
                  source_name=platform_name,
                  rank=row[4],
                  url=row[5] or "",
                  mobile_url=row[6] or "",
                  crawl_time=row[8],  # last_crawl_time
                  # Fall back to the current rank when no history was recorded.
                  ranks=rank_history_map.get(news_id, [row[4]]),
                  first_time=row[7],  # first_crawl_time
                  last_time=row[8],  # last_crawl_time
                  count=row[9],  # crawl_count
              ))

          # Platforms that failed in any crawl of the day.
          cursor.execute("""
              SELECT DISTINCT css.platform_id
              FROM crawl_source_status css
              JOIN crawl_records cr ON css.crawl_record_id = cr.id
              WHERE css.status = 'failed'
          """)
          failed_ids = [row[0] for row in cursor.fetchall()]

          cursor.execute("""
              SELECT crawl_time FROM crawl_records
              ORDER BY crawl_time DESC
              LIMIT 1
          """)
          time_row = cursor.fetchone()
          crawl_time = time_row[0] if time_row else self._format_time_filename()

          return NewsData(
              date=crawl_date,
              crawl_time=crawl_time,
              items=items,
              id_to_name=id_to_name,
              failed_ids=failed_ids,
          )
      except Exception as e:
          print(f"[本地存储] 读取数据失败: {e}")
          return None
  def get_latest_crawl_data(self, date: Optional[str] = None) -> Optional[NewsData]:
      """Load the news items of the most recent crawl of a day.

      The most recent crawl is the greatest ``crawl_time`` in
      ``crawl_records``. The items returned are those whose
      ``last_crawl_time`` equals it.

      Args:
          date: Date string; passed to the path/date helpers unchanged
              (callers use None for "today").

      Returns:
          A ``NewsData`` for that crawl. Items are grouped by platform, each
          with its de-duplicated rank history in crawl order (or ``[rank]``
          if none), and ``failed_ids`` is limited to that crawl.
          None when the database file is missing, no crawl is recorded, no
          item matches, or an error occurs (the error is printed).
      """
      try:
          db_path = self._get_db_path(date)
          if not db_path.exists():
              return None

          conn = self._get_connection(date)
          cursor = conn.cursor()

          cursor.execute("""
              SELECT crawl_time FROM crawl_records
              ORDER BY crawl_time DESC
              LIMIT 1
          """)
          time_row = cursor.fetchone()
          if not time_row:
              return None
          latest_time = time_row[0]

          cursor.execute("""
              SELECT n.id, n.title, n.platform_id, p.name as platform_name,
                     n.rank, n.url, n.mobile_url,
                     n.first_crawl_time, n.last_crawl_time, n.crawl_count
              FROM news_items n
              LEFT JOIN platforms p ON n.platform_id = p.id
              WHERE n.last_crawl_time = ?
          """, (latest_time,))
          rows = cursor.fetchall()
          if not rows:
              return None

          # Select the history with a sub-query on the same filter instead of
          # binding one "?" per item id. A long IN (...) list can exceed
          # SQLite's bound-parameter limit (999 before SQLite 3.32).
          cursor.execute("""
              SELECT news_item_id, rank FROM rank_history
              WHERE news_item_id IN (
                  SELECT id FROM news_items WHERE last_crawl_time = ?
              )
              ORDER BY news_item_id, crawl_time
          """, (latest_time,))
          rank_history_map: Dict[int, List[int]] = {}
          for news_id, rank in cursor.fetchall():
              ranks_seen = rank_history_map.setdefault(news_id, [])
              if rank not in ranks_seen:
                  ranks_seen.append(rank)

          items: Dict[str, List[NewsItem]] = {}
          id_to_name: Dict[str, str] = {}
          crawl_date = self._format_date_folder(date)

          for row in rows:
              news_id, platform_id = row[0], row[2]
              platform_name = row[3] or platform_id
              id_to_name[platform_id] = platform_name
              items.setdefault(platform_id, []).append(NewsItem(
                  title=row[1],
                  source_id=platform_id,
                  source_name=platform_name,
                  rank=row[4],
                  url=row[5] or "",
                  mobile_url=row[6] or "",
                  crawl_time=row[8],  # last_crawl_time
                  # Fall back to the current rank when no history was recorded.
                  ranks=rank_history_map.get(news_id, [row[4]]),
                  first_time=row[7],  # first_crawl_time
                  last_time=row[8],  # last_crawl_time
                  count=row[9],  # crawl_count
              ))

          # Sources that failed in this latest crawl only.
          cursor.execute("""
              SELECT css.platform_id
              FROM crawl_source_status css
              JOIN crawl_records cr ON css.crawl_record_id = cr.id
              WHERE cr.crawl_time = ? AND css.status = 'failed'
          """, (latest_time,))
          failed_ids = [row[0] for row in cursor.fetchall()]

          return NewsData(
              date=crawl_date,
              crawl_time=latest_time,
              items=items,
              id_to_name=id_to_name,
              failed_ids=failed_ids,
          )
      except Exception as e:
          print(f"[本地存储] 获取最新数据失败: {e}")
          return None
  def detect_new_titles(self, current_data: NewsData) -> Dict[str, Dict]:
      """Find titles in the current crawl that were not seen earlier today.

      A title is new for a platform when no item stored today for that
      platform was first seen before ``current_data.crawl_time``. Titles (not
      URLs) are compared, so a title whose URL changed is not reported again.

      Special cases:
        - nothing stored for the day (``get_today_all_data`` is falsy): every
          current title is new, and every current source gets an entry;
        - data is stored, but none of it predates the current batch (the
          first crawl of the day): ``{}``, since there is no "new" yet.

      Args:
          current_data: Data of the crawl being evaluated.

      Returns:
          ``{source_id: {title: NewsItem}}`` of new titles. Sources without
          new titles are omitted (except in the "nothing stored" case).
          ``{}`` on error (the error is printed).
      """
      try:
          historical_data = self.get_today_all_data(current_data.date)
          if not historical_data:
              return {
                  source_id: {item.title: item for item in news_list}
                  for source_id, news_list in current_data.items.items()
              }

          current_time = current_data.crawl_time

          # Titles per source that were first seen before the current batch.
          historical_titles: Dict[str, set] = {}
          for source_id, news_list in historical_data.items.items():
              titles = set()
              for item in news_list:
                  # Fall back to crawl_time when first_time is missing OR None:
                  # comparing None with a str raises TypeError, which would abort
                  # the whole detection and return {}.
                  first_time = getattr(item, "first_time", None) or item.crawl_time
                  if first_time < current_time:
                      titles.add(item.title)
              historical_titles[source_id] = titles

          if not any(historical_titles.values()):
              # First crawl of the day: no "new" titles yet.
              return {}

          new_titles: Dict[str, Dict] = {}
          for source_id, news_list in current_data.items.items():
              seen = historical_titles.get(source_id, set())
              for item in news_list:
                  if item.title not in seen:
                      new_titles.setdefault(source_id, {})[item.title] = item
          return new_titles
      except Exception as e:
          print(f"[本地存储] 检测新标题失败: {e}")
          return {}
  def save_txt_snapshot(self, data: NewsData) -> Optional[str]:
      """Write a plain-text snapshot of a crawl.

      Location: ``{data_dir}/txt/{date}/{crawl_time}.txt``.
      Each source gets a header line (``id | name`` when the name differs
      from the id, else just ``id``). Its items follow, sorted by rank, as
      ``rank. title`` plus optional ``[URL:...]`` / ``[MOBILE:...]``, then a
      blank line. Failed source ids are listed last.

      Args:
          data: News data of one crawl.

      Returns:
          Path of the written file as a string, or None when TXT output is
          disabled or writing fails (the error is printed).
      """
      if not self.enable_txt:
          return None

      try:
          snapshot_dir = self.data_dir / "txt" / self._format_date_folder(data.date)
          snapshot_dir.mkdir(parents=True, exist_ok=True)
          target = snapshot_dir / f"{data.crawl_time}.txt"

          with open(target, "w", encoding="utf-8") as out:
              for source_id, news_list in data.items.items():
                  display_name = data.id_to_name.get(source_id, source_id)
                  has_label = bool(display_name) and display_name != source_id
                  out.write(f"{source_id} | {display_name}\n" if has_label else f"{source_id}\n")

                  for entry in sorted(news_list, key=lambda n: n.rank):
                      pieces = [f"{entry.rank}. {entry.title}"]
                      if entry.url:
                          pieces.append(f" [URL:{entry.url}]")
                      if entry.mobile_url:
                          pieces.append(f" [MOBILE:{entry.mobile_url}]")
                      out.write("".join(pieces) + "\n")
                  out.write("\n")

              if data.failed_ids:
                  out.write("==== 以下ID请求失败 ====\n")
                  out.writelines(f"{failed_id}\n" for failed_id in data.failed_ids)

          print(f"[本地存储] TXT 快照已保存: {target}")
          return str(target)
      except Exception as e:
          print(f"[本地存储] 保存 TXT 快照失败: {e}")
          return None
  def save_html_report(self, html_content: str, filename: str, is_summary: bool = False) -> Optional[str]:
      """Write an HTML report to ``{data_dir}/html/{date}/{filename}``.

      The date folder comes from ``_format_date_folder()`` called without a
      date (presumably today's folder). ``is_summary`` is accepted for
      interface compatibility but not used here.

      Args:
          html_content: Full HTML text to write.
          filename: File name inside the date folder.
          is_summary: Whether this is a summary report (unused).

      Returns:
          Path of the written file as a string, or None when HTML output is
          disabled or writing fails (the error is printed).
      """
      if not self.enable_html:
          return None

      try:
          report_dir = self.data_dir / "html" / self._format_date_folder()
          report_dir.mkdir(parents=True, exist_ok=True)
          report_path = report_dir / filename
          report_path.write_text(html_content, encoding="utf-8")
          print(f"[本地存储] HTML 报告已保存: {report_path}")
          return str(report_path)
      except Exception as e:
          print(f"[本地存储] 保存 HTML 报告失败: {e}")
          return None
  def is_first_crawl_today(self, date: Optional[str] = None) -> bool:
      """Tell whether the day has had at most one recorded crawl.

      Args:
          date: Date string; passed to the path/date helpers unchanged
              (callers use None for "today").

      Returns:
          True when the database file is missing, when ``crawl_records`` has
          zero or one row, or when the check fails (the error is printed).
          False otherwise.
      """
      try:
          if not self._get_db_path(date).exists():
              return True

          cursor = self._get_connection(date).cursor()
          cursor.execute("""
              SELECT COUNT(*) as count FROM crawl_records
          """)
          result_row = cursor.fetchone()
          crawl_count = result_row[0] if result_row else 0
          # One record still counts as "first": presumably the current crawl
          # has already been saved when this is called.
          return crawl_count <= 1
      except Exception as e:
          print(f"[本地存储] 检查首次抓取失败: {e}")
          return True
  def get_crawl_times(self, date: Optional[str] = None) -> List[str]:
      """Return every recorded crawl time of a day in ascending order.

      Args:
          date: Date string; passed to the path/date helpers unchanged
              (callers use None for "today").

      Returns:
          ``crawl_records.crawl_time`` values sorted ascending. Returns an
          empty list when the database file is missing or the query fails
          (the error is printed).
      """
      try:
          if not self._get_db_path(date).exists():
              return []

          cursor = self._get_connection(date).cursor()
          cursor.execute("""
              SELECT crawl_time FROM crawl_records
              ORDER BY crawl_time
          """)
          return [record[0] for record in cursor.fetchall()]
      except Exception as e:
          print(f"[本地存储] 获取抓取时间列表失败: {e}")
          return []
  def cleanup(self) -> None:
      """Close every cached database connection, then empty the cache.

      A connection that fails to close is reported and skipped; the others
      are still closed and the cache is cleared regardless.
      """
      open_connections = list(self._db_connections.items())
      for path_key, connection in open_connections:
          try:
              connection.close()
              print(f"[本地存储] 关闭数据库连接: {path_key}")
          except Exception as e:
              print(f"[本地存储] 关闭连接失败 {path_key}: {e}")
      self._db_connections.clear()
  def cleanup_old_data(self, retention_days: int) -> int:
      """Delete stored data that falls outside the retention window.

      Candidates, all under ``data_dir``:
        - ``news/{date}.db`` and ``rss/{date}.db`` database files;
        - ``txt/{date}/`` and ``html/{date}/`` snapshot directories (hidden
          entries and non-directories are skipped).
      The date is parsed from the start of the file/directory name, in either
      ISO ``YYYY-MM-DD`` or the legacy ``YYYY年MM月DD日`` form. Names that do
      not parse to a valid date are left alone. An entry is deleted when its
      date is on or before the calendar date of ``now - retention_days``
      (``now`` taken from ``_get_configured_time``). A cached connection to a
      database being deleted is removed from the cache and closed first.

      Args:
          retention_days: Days of data to keep; ``<= 0`` disables cleanup.

      Returns:
          Number of files/directories deleted. If the scan itself fails, the
          count reached so far is returned (the error is printed).
      """
      if retention_days <= 0:
          return 0

      # Local import: the module imports only the ``datetime`` class.
      from datetime import date as calendar_date

      # The names already encode local calendar dates, so compare dates rather
      # than aware datetimes. This follows the configured timezone and avoids
      # the pytz ``tzinfo=`` pitfall, which selects the zone's LMT offset
      # (e.g. +08:06 for Asia/Shanghai).
      cutoff_day = (self._get_configured_time() - timedelta(days=retention_days)).date()
      name_patterns = (
          re.compile(r'(\d{4})-(\d{2})-(\d{2})'),    # ISO: YYYY-MM-DD
          re.compile(r'(\d{4})年(\d{2})月(\d{2})日'),  # legacy: YYYY年MM月DD日
      )

      def parse_date_from_name(name: str) -> Optional[calendar_date]:
          """Return the date encoded at the start of ``name``, or None."""
          for pattern in name_patterns:
              match = pattern.match(name)
              if match:
                  try:
                      return calendar_date(*(int(part) for part in match.groups()))
                  except ValueError:
                      # Date-shaped but not a real date, e.g. "2025-13-40".
                      return None
          return None

      deleted_count = 0
      try:
          if not self.data_dir.exists():
              return 0

          # Database files (news/, rss/).
          for db_type in ("news", "rss"):
              db_dir = self.data_dir / db_type
              if not db_dir.exists():
                  continue
              # Materialise the listing before deleting entries from the directory.
              for db_file in sorted(db_dir.glob("*.db")):
                  file_day = parse_date_from_name(db_file.name)
                  if file_day is None or file_day > cutoff_day:
                      continue
                  # Remove the cache entry before closing, so a failing close()
                  # cannot leave a stale connection cached.
                  cached_conn = self._db_connections.pop(str(db_file), None)
                  if cached_conn is not None:
                      try:
                          cached_conn.close()
                      except Exception as e:
                          print(f"[本地存储] 关闭连接失败 {db_file}: {e}")
                  try:
                      db_file.unlink()
                      deleted_count += 1
                      print(f"[本地存储] 清理过期数据: {db_type}/{db_file.name}")
                  except Exception as e:
                      print(f"[本地存储] 删除文件失败 {db_file}: {e}")

          # Snapshot directories (txt/, html/).
          for snapshot_type in ("txt", "html"):
              snapshot_dir = self.data_dir / snapshot_type
              if not snapshot_dir.exists():
                  continue
              for date_folder in sorted(snapshot_dir.iterdir()):
                  if not date_folder.is_dir() or date_folder.name.startswith('.'):
                      continue
                  folder_day = parse_date_from_name(date_folder.name)
                  if folder_day is None or folder_day > cutoff_day:
                      continue
                  try:
                      shutil.rmtree(date_folder)
                      deleted_count += 1
                      print(f"[本地存储] 清理过期数据: {snapshot_type}/{date_folder.name}")
                  except Exception as e:
                      print(f"[本地存储] 删除目录失败 {date_folder}: {e}")

          if deleted_count > 0:
              print(f"[本地存储] 共清理 {deleted_count} 个过期文件/目录")
          return deleted_count
      except Exception as e:
          print(f"[本地存储] 清理过期数据失败: {e}")
          return deleted_count
  def has_pushed_today(self, date: Optional[str] = None) -> bool:
      """Tell whether a push has been recorded for a date.

      Reads ``push_records.pushed`` for the formatted date from that date's
      news database.

      Args:
          date: Date string (YYYY-MM-DD); passed to the date helpers unchanged
              (callers use None for "today").

      Returns:
          True if a record exists with a truthy ``pushed`` value. False if
          there is none, or if the lookup fails (the error is printed).
      """
      try:
          cursor = self._get_connection(date).cursor()
          cursor.execute("""
              SELECT pushed FROM push_records WHERE date = ?
          """, (self._format_date_folder(date),))
          record = cursor.fetchone()
          return bool(record[0]) if record else False
      except Exception as e:
          print(f"[本地存储] 检查推送记录失败: {e}")
          return False
  def record_push(self, report_type: str, date: Optional[str] = None) -> bool:
      """Record that a push of ``report_type`` happened for a date.

      Upserts the ``push_records`` row for the formatted date. The row gets
      ``pushed = 1`` and the current configured-timezone time as
      ``push_time``; ``created_at`` is only set when the row is first
      inserted. The change is committed immediately.

      Args:
          report_type: Kind of report that was pushed.
          date: Date string (YYYY-MM-DD); passed to the date helpers unchanged
              (callers use None for "today").

      Returns:
          True on success; False if the write fails (the error is printed).
      """
      try:
          cursor = self._get_connection(date).cursor()
          push_date = self._format_date_folder(date)
          timestamp = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
          params = (push_date, timestamp, report_type, timestamp)
          cursor.execute("""
              INSERT INTO push_records (date, pushed, push_time, report_type, created_at)
              VALUES (?, 1, ?, ?, ?)
              ON CONFLICT(date) DO UPDATE SET
                  pushed = 1,
                  push_time = excluded.push_time,
                  report_type = excluded.report_type
          """, params)
          cursor.connection.commit()
          print(f"[本地存储] 推送记录已保存: {report_type} at {timestamp}")
          return True
      except Exception as e:
          print(f"[本地存储] 记录推送失败: {e}")
          return False
  784. # ========================================
  785. # RSS 数据存储方法
  786. # ========================================
  def save_rss_data(self, data: RSSData) -> bool:
      """
      Persist one RSS crawl batch to the per-date SQLite database.

      Items are de-duplicated by ``(url, feed_id)``. An item whose URL already
      exists for the same feed has its content fields refreshed, its
      ``last_crawl_time`` set to this crawl and its ``crawl_count``
      incremented. Every other item, including items with an empty URL
      (stored as ``""``), is inserted as a new row. The crawl itself is
      recorded in ``rss_crawl_records``, with a per-feed ``success`` /
      ``failed`` status in ``rss_crawl_status``.

      All writes are committed together at the end. If an unexpected error
      occurs, the pending writes are rolled back, so a later commit on the
      same connection cannot persist a half-written batch.

      Args:
          data: RSS data of one crawl (items per feed, feed id -> name map,
              failed feed ids, crawl date and crawl time).

      Returns:
          True if the batch was committed, False otherwise.
      """
      conn = None
      try:
          conn = self._get_connection(data.date, db_type="rss")
          cursor = conn.cursor()
          now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")

          # Sync the feed id -> name mapping into rss_feeds (upsert by id).
          for feed_id, feed_name in data.id_to_name.items():
              cursor.execute("""
                  INSERT INTO rss_feeds (id, name, updated_at)
                  VALUES (?, ?, ?)
                  ON CONFLICT(id) DO UPDATE SET
                      name = excluded.name,
                      updated_at = excluded.updated_at
              """, (feed_id, feed_name, now_str))

          new_count = 0
          updated_count = 0
          for feed_id, rss_list in data.items.items():
              for item in rss_list:
                  try:
                      existing = None
                      if item.url:
                          # Only items with a URL can be matched against
                          # rows written by earlier crawls.
                          cursor.execute("""
                              SELECT id FROM rss_items
                              WHERE url = ? AND feed_id = ?
                          """, (item.url, feed_id))
                          existing = cursor.fetchone()

                      if existing:
                          # Seen before: refresh content and bump the counter.
                          cursor.execute("""
                              UPDATE rss_items SET
                                  title = ?,
                                  published_at = ?,
                                  summary = ?,
                                  author = ?,
                                  last_crawl_time = ?,
                                  crawl_count = crawl_count + 1,
                                  updated_at = ?
                              WHERE id = ?
                          """, (item.title, item.published_at, item.summary,
                                item.author, data.crawl_time, now_str,
                                existing[0]))
                          updated_count += 1
                      else:
                          # Unknown URL, or no URL at all (stored as "").
                          cursor.execute("""
                              INSERT INTO rss_items
                              (title, feed_id, url, published_at, summary, author,
                               first_crawl_time, last_crawl_time, crawl_count,
                               created_at, updated_at)
                              VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
                          """, (item.title, feed_id, item.url or "",
                                item.published_at, item.summary, item.author,
                                data.crawl_time, data.crawl_time,
                                now_str, now_str))
                          new_count += 1
                  except sqlite3.Error as e:
                      # Best effort per item: one bad row must not abort the
                      # batch. Guard the preview so a missing title cannot
                      # raise from inside this handler.
                      title_preview = (item.title or "")[:30]
                      print(f"[本地存储] 保存 RSS 条目失败 [{title_preview}...]: {e}")

          total_items = new_count + updated_count

          # Record this crawl, then look its row id up for the status rows.
          cursor.execute("""
              INSERT OR REPLACE INTO rss_crawl_records
              (crawl_time, total_items, created_at)
              VALUES (?, ?, ?)
          """, (data.crawl_time, total_items, now_str))
          cursor.execute("""
              SELECT id FROM rss_crawl_records WHERE crawl_time = ?
          """, (data.crawl_time,))
          record_row = cursor.fetchone()
          if record_row:
              crawl_record_id = record_row[0]
              # Every feed that produced an item list counts as successful.
              for feed_id in data.items:
                  cursor.execute("""
                      INSERT OR REPLACE INTO rss_crawl_status
                      (crawl_record_id, feed_id, status)
                      VALUES (?, ?, 'success')
                  """, (crawl_record_id, feed_id))
              for failed_id in data.failed_ids:
                  # Make sure the failed feed has an rss_feeds row; its id
                  # doubles as its name when nothing better is known.
                  cursor.execute("""
                      INSERT OR IGNORE INTO rss_feeds (id, name, updated_at)
                      VALUES (?, ?, ?)
                  """, (failed_id, failed_id, now_str))
                  cursor.execute("""
                      INSERT OR REPLACE INTO rss_crawl_status
                      (crawl_record_id, feed_id, status)
                      VALUES (?, ?, 'failed')
                  """, (crawl_record_id, failed_id))

          conn.commit()

          log_parts = [f"[本地存储] RSS 处理完成:新增 {new_count} 条"]
          if updated_count > 0:
              log_parts.append(f"更新 {updated_count} 条")
          print(",".join(log_parts))
          return True
      except Exception as e:
          # Discard the uncommitted part of this batch so it cannot be
          # committed later by another operation on the same connection.
          if conn is not None:
              try:
                  conn.rollback()
              except sqlite3.Error:
                  # Rollback is best effort; the failure is reported below.
                  pass
          print(f"[本地存储] 保存 RSS 数据失败: {e}")
          return False
  def get_rss_data(self, date: Optional[str] = None) -> Optional[RSSData]:
      """
      Load every RSS item stored for one date, grouped by feed.

      This returns all items accumulated over every crawl of the day, not
      just the latest crawl. Within each feed, items are ordered by
      ``published_at``, newest first.

      Args:
          date: Date string (YYYY-MM-DD); defaults to today when None.

      Returns:
          An ``RSSData`` whose ``crawl_time`` is the most recent recorded
          crawl time. When no crawl record exists, it falls back to
          ``self._format_time_filename()``. Returns None when the database
          file for the date does not exist, holds no items, or cannot be
          read.
      """
      try:
          # Same guard as get_latest_rss_data: do not open a connection for a
          # date whose database file does not exist.
          # NOTE(review): presumably _get_connection would otherwise create an
          # empty database file as a side effect of this read.
          db_path = self._get_db_path(date, db_type="rss")
          if not db_path.exists():
              return None

          conn = self._get_connection(date, db_type="rss")
          cursor = conn.cursor()

          cursor.execute("""
              SELECT i.id, i.title, i.feed_id, f.name AS feed_name,
                     i.url, i.published_at, i.summary, i.author,
                     i.first_crawl_time, i.last_crawl_time, i.crawl_count
              FROM rss_items i
              LEFT JOIN rss_feeds f ON i.feed_id = f.id
              ORDER BY i.published_at DESC
          """)
          rows = cursor.fetchall()
          if not rows:
              return None

          items: Dict[str, List[RSSItem]] = {}
          id_to_name: Dict[str, str] = {}
          crawl_date = self._format_date_folder(date)
          for (_item_id, title, feed_id, feed_name, url, published_at,
               summary, author, first_time, last_time, count) in rows:
              # A feed without an rss_feeds row (LEFT JOIN miss) is named by its id.
              display_name = feed_name or feed_id
              id_to_name[feed_id] = display_name
              items.setdefault(feed_id, []).append(RSSItem(
                  title=title,
                  feed_id=feed_id,
                  feed_name=display_name,
                  url=url or "",
                  published_at=published_at or "",
                  summary=summary or "",
                  author=author or "",
                  crawl_time=last_time,
                  first_time=first_time,
                  last_time=last_time,
                  count=count,
              ))

          # Most recent crawl time recorded for this date.
          cursor.execute("""
              SELECT crawl_time FROM rss_crawl_records
              ORDER BY crawl_time DESC
              LIMIT 1
          """)
          time_row = cursor.fetchone()
          crawl_time = time_row[0] if time_row else self._format_time_filename()

          # NOTE(review): there is no crawl_time filter here. A feed that
          # failed in any crawl of the day is listed, even if a later crawl
          # of the same feed succeeded.
          cursor.execute("""
              SELECT DISTINCT cs.feed_id
              FROM rss_crawl_status cs
              JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
              WHERE cs.status = 'failed'
          """)
          failed_ids = [failed_id for (failed_id,) in cursor.fetchall()]

          return RSSData(
              date=crawl_date,
              crawl_time=crawl_time,
              items=items,
              id_to_name=id_to_name,
              failed_ids=failed_ids,
          )
      except Exception as e:
          print(f"[本地存储] 读取 RSS 数据失败: {e}")
          return None
  def detect_new_rss_items(self, current_data: RSSData) -> Dict[str, List[RSSItem]]:
      """
      Find items in the current crawl whose URL was not seen in an earlier
      crawl of the same day (incremental mode).

      A URL is "historical" when a stored item for the same feed has a first
      crawl time strictly earlier than ``current_data.crawl_time``. Items
      first stored by this very crawl therefore still count as new.

      Special cases:
        * No stored data for the date at all: every current item is returned.
          The lists are fresh copies, so callers can modify them without
          touching ``current_data.items``.
        * Stored data exists, but none of it predates the current crawl: this
          is the first crawl of the day, so there is nothing to compare
          against and ``{}`` is returned.

      Outside the first case, items without a URL are never reported as new.

      Args:
          current_data: RSS data of the current crawl.

      Returns:
          ``{feed_id: [RSSItem, ...]}`` with only the feeds that have at least
          one new item (the no-stored-data case mirrors every feed of
          ``current_data.items``). Returns ``{}`` on error.
      """
      try:
          historical_data = self.get_rss_data(current_data.date)
          if not historical_data:
              # Nothing stored yet: everything is new. Copy each list so the
              # result does not alias the caller's data.
              return {
                  feed_id: list(rss_list)
                  for feed_id, rss_list in current_data.items.items()
              }

          current_time = current_data.crawl_time

          # URLs per feed that were first seen before the current crawl.
          historical_urls: Dict[str, set] = {}
          for feed_id, rss_list in historical_data.items.items():
              urls = set()
              for item in rss_list:
                  # Fall back to crawl_time when first_time is missing or
                  # empty. A None here would otherwise make the comparison
                  # raise and abort detection for every feed.
                  first_time = getattr(item, "first_time", None) or item.crawl_time
                  if item.url and first_time < current_time:
                      urls.add(item.url)
              historical_urls[feed_id] = urls

          if not any(historical_urls.values()):
              # First crawl of the day: there is no notion of "new" yet.
              return {}

          new_items: Dict[str, List[RSSItem]] = {}
          for feed_id, rss_list in current_data.items.items():
              seen = historical_urls.get(feed_id, set())
              fresh = [item for item in rss_list if item.url and item.url not in seen]
              if fresh:
                  new_items[feed_id] = fresh
          return new_items
      except Exception as e:
          print(f"[本地存储] 检测新 RSS 条目失败: {e}")
          return {}
  def get_latest_rss_data(self, date: Optional[str] = None) -> Optional[RSSData]:
      """
      Return only the items seen by the most recent crawl of a date
      ("current ranking" mode).

      The most recent crawl is the greatest ``crawl_time`` in
      ``rss_crawl_records``. An item belongs to that crawl when its
      ``last_crawl_time`` equals that value. ``failed_ids`` lists only the
      feeds recorded as failed in that same crawl.

      Args:
          date: Date string (YYYY-MM-DD); defaults to today when None.

      Returns:
          The latest crawl as ``RSSData``. Returns None when the date has no
          database file, no crawl record, or no items for the latest crawl,
          or when reading fails.
      """
      try:
          if not self._get_db_path(date, db_type="rss").exists():
              return None

          connection = self._get_connection(date, db_type="rss")
          cur = connection.cursor()

          # Locate the most recent crawl of the day.
          cur.execute("""
              SELECT crawl_time FROM rss_crawl_records
              ORDER BY crawl_time DESC
              LIMIT 1
          """)
          latest_row = cur.fetchone()
          if latest_row is None:
              return None
          latest_time = latest_row[0]

          # Items whose last sighting is exactly that crawl.
          cur.execute("""
              SELECT i.id, i.title, i.feed_id, f.name AS feed_name,
                     i.url, i.published_at, i.summary, i.author,
                     i.first_crawl_time, i.last_crawl_time, i.crawl_count
              FROM rss_items i
              LEFT JOIN rss_feeds f ON i.feed_id = f.id
              WHERE i.last_crawl_time = ?
              ORDER BY i.published_at DESC
          """, (latest_time,))
          rows = cur.fetchall()
          if not rows:
              return None

          feed_items: Dict[str, List[RSSItem]] = {}
          feed_names: Dict[str, str] = {}
          crawl_date = self._format_date_folder(date)
          for (_row_id, title, fid, joined_name, url, published_at,
               summary, author, first_seen, last_seen, seen_count) in rows:
              name = joined_name or fid
              feed_names[fid] = name
              bucket = feed_items.setdefault(fid, [])
              bucket.append(RSSItem(
                  title=title,
                  feed_id=fid,
                  feed_name=name,
                  url=url or "",
                  published_at=published_at or "",
                  summary=summary or "",
                  author=author or "",
                  crawl_time=last_seen,
                  first_time=first_seen,
                  last_time=last_seen,
                  count=seen_count,
              ))

          # Feeds that failed in that same crawl.
          cur.execute("""
              SELECT cs.feed_id
              FROM rss_crawl_status cs
              JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
              WHERE cr.crawl_time = ? AND cs.status = 'failed'
          """, (latest_time,))
          failed = [failed_id for (failed_id,) in cur.fetchall()]

          return RSSData(
              date=crawl_date,
              crawl_time=latest_time,
              items=feed_items,
              id_to_name=feed_names,
              failed_ids=failed,
          )
      except Exception as exc:
          print(f"[本地存储] 获取最新 RSS 数据失败: {exc}")
          return None
  def __del__(self):
      """
      Finalizer hook: when the instance is garbage-collected, delegate to
      ``cleanup()``. Per the original intent, this ensures open connections
      get closed.
      """
      release_resources = self.cleanup
      release_resources()