remote.py 58 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557
  1. # coding=utf-8
  2. """
  3. 远程存储后端(S3 兼容协议)
  4. 支持 Cloudflare R2、阿里云 OSS、腾讯云 COS、AWS S3、MinIO 等
  5. 使用 S3 兼容 API (boto3) 访问对象存储
  6. 数据流程:下载当天 SQLite → 合并新数据 → 上传回远程
  7. """
  8. import pytz
  9. import re
  10. import shutil
  11. import sys
  12. import tempfile
  13. import sqlite3
  14. from datetime import datetime, timedelta
  15. from pathlib import Path
  16. from typing import Dict, List, Optional
  17. try:
  18. import boto3
  19. from botocore.config import Config as BotoConfig
  20. from botocore.exceptions import ClientError
  21. HAS_BOTO3 = True
  22. except ImportError:
  23. HAS_BOTO3 = False
  24. boto3 = None
  25. BotoConfig = None
  26. ClientError = Exception
  27. from trendradar.storage.base import StorageBackend, NewsItem, NewsData, RSSItem, RSSData
  28. from trendradar.utils.time import (
  29. get_configured_time,
  30. format_date_folder,
  31. format_time_filename,
  32. )
  33. from trendradar.utils.url import normalize_url
  34. class RemoteStorageBackend(StorageBackend):
  35. """
  36. 远程云存储后端(S3 兼容协议)
  37. 特点:
  38. - 使用 S3 兼容 API 访问远程存储
  39. - 支持 Cloudflare R2、阿里云 OSS、腾讯云 COS、AWS S3、MinIO 等
  40. - 下载 SQLite 到临时目录进行操作
  41. - 支持数据合并和上传
  42. - 支持从远程拉取历史数据到本地
  43. - 运行结束后自动清理临时文件
  44. """
  def __init__(
      self,
      bucket_name: str,
      access_key_id: str,
      secret_access_key: str,
      endpoint_url: str,
      region: str = "",
      enable_txt: bool = False,  # TXT snapshots are off by default in remote mode
      enable_html: bool = True,
      temp_dir: Optional[str] = None,
      timezone: str = "Asia/Shanghai",
  ):
      """
      Set up the remote (S3-compatible) storage backend.

      Args:
          bucket_name: Bucket name.
          access_key_id: Access key ID.
          secret_access_key: Secret access key.
          endpoint_url: Service endpoint URL.
          region: Region name (optional; only passed to boto3 when non-empty).
          enable_txt: Whether TXT snapshots are written (off by default).
          enable_html: Whether HTML reports are written.
          temp_dir: Working directory for downloaded databases; when omitted a
              fresh ``trendradar_*`` directory is created in the system temp dir.
          timezone: Timezone name used for dates/times (default Asia/Shanghai).

      Raises:
          ImportError: boto3 is not installed.
      """
      if not HAS_BOTO3:
          raise ImportError("远程存储后端需要安装 boto3: pip install boto3")

      # Plain configuration.
      self.bucket_name = bucket_name
      self.endpoint_url = endpoint_url
      self.region = region
      self.enable_txt = enable_txt
      self.enable_html = enable_html
      self.timezone = timezone

      # Local working directory for the downloaded SQLite files.
      if temp_dir:
          self.temp_dir = Path(temp_dir)
      else:
          self.temp_dir = Path(tempfile.mkdtemp(prefix="trendradar_"))
      self.temp_dir.mkdir(parents=True, exist_ok=True)

      # Tencent Cloud COS (*.myqcloud.com) is signed with SigV2 ("s3") to avoid
      # chunked-encoding problems; every other provider (AWS S3, Cloudflare R2,
      # Aliyun OSS, MinIO, ...) uses SigV4 ("s3v4"). Virtual-hosted style
      # addressing is used for all of them.
      signature_version = "s3" if "myqcloud.com" in endpoint_url.lower() else "s3v4"
      client_kwargs = dict(
          endpoint_url=endpoint_url,
          aws_access_key_id=access_key_id,
          aws_secret_access_key=secret_access_key,
          config=BotoConfig(
              s3={"addressing_style": "virtual"},
              signature_version=signature_version,
          ),
      )
      if region:
          client_kwargs["region_name"] = region
      self.s3_client = boto3.client("s3", **client_kwargs)

      # Bookkeeping read back by cleanup().
      self._downloaded_files: List[Path] = []
      self._db_connections: Dict[str, sqlite3.Connection] = {}

      print(f"[远程存储] 初始化完成,存储桶: {bucket_name},签名版本: {signature_version}")
  @property
  def backend_name(self) -> str:
      """Short identifier of this storage backend."""
      name = "remote"
      return name
  @property
  def supports_txt(self) -> bool:
      """Whether TXT snapshots are enabled for this backend (mirrors ``enable_txt``)."""
      enabled = self.enable_txt
      return enabled
  def _get_configured_time(self) -> datetime:
      """Return the current time in the configured timezone (``self.timezone``)."""
      now = get_configured_time(self.timezone)
      return now
  def _format_date_folder(self, date: Optional[str] = None) -> str:
      """Return the date folder name (ISO ``YYYY-MM-DD``) for *date* in the configured timezone."""
      tz_name = self.timezone
      return format_date_folder(date, tz_name)
  def _format_time_filename(self) -> str:
      """Return the time-based file name (``HH-MM``) in the configured timezone."""
      stamp = format_time_filename(self.timezone)
      return stamp
  120. def _get_remote_db_key(self, date: Optional[str] = None, db_type: str = "news") -> str:
  121. """
  122. 获取远程存储中 SQLite 文件的对象键
  123. Args:
  124. date: 日期字符串
  125. db_type: 数据库类型 ("news" 或 "rss")
  126. Returns:
  127. 远程对象键,如 "news/2025-12-28.db" 或 "rss/2025-12-28.db"
  128. """
  129. date_folder = self._format_date_folder(date)
  130. return f"{db_type}/{date_folder}.db"
  131. def _get_local_db_path(self, date: Optional[str] = None, db_type: str = "news") -> Path:
  132. """
  133. 获取本地临时 SQLite 文件路径
  134. Args:
  135. date: 日期字符串
  136. db_type: 数据库类型 ("news" 或 "rss")
  137. Returns:
  138. 本地临时文件路径
  139. """
  140. date_folder = self._format_date_folder(date)
  141. db_dir = self.temp_dir / db_type
  142. db_dir.mkdir(parents=True, exist_ok=True)
  143. return db_dir / f"{date_folder}.db"
  144. def _check_object_exists(self, r2_key: str) -> bool:
  145. """
  146. 检查远程存储中对象是否存在
  147. Args:
  148. r2_key: 远程对象键
  149. Returns:
  150. 是否存在
  151. """
  152. try:
  153. self.s3_client.head_object(Bucket=self.bucket_name, Key=r2_key)
  154. return True
  155. except ClientError as e:
  156. error_code = e.response.get("Error", {}).get("Code", "")
  157. # S3 兼容存储可能返回 404, NoSuchKey, 或其他变体
  158. if error_code in ("404", "NoSuchKey", "Not Found"):
  159. return False
  160. # 其他错误(如权限问题)也视为不存在,但打印警告
  161. print(f"[远程存储] 检查对象存在性失败 ({r2_key}): {e}")
  162. return False
  163. except Exception as e:
  164. print(f"[远程存储] 检查对象存在性异常 ({r2_key}): {e}")
  165. return False
  166. def _download_sqlite(self, date: Optional[str] = None, db_type: str = "news") -> Optional[Path]:
  167. """
  168. 从远程存储下载当天的 SQLite 文件到本地临时目录
  169. 使用 get_object + iter_chunks 替代 download_file,
  170. 以正确处理腾讯云 COS 的 chunked transfer encoding。
  171. Args:
  172. date: 日期字符串
  173. db_type: 数据库类型 ("news" 或 "rss")
  174. Returns:
  175. 本地文件路径,如果不存在返回 None
  176. """
  177. r2_key = self._get_remote_db_key(date, db_type)
  178. local_path = self._get_local_db_path(date, db_type)
  179. # 确保目录存在
  180. local_path.parent.mkdir(parents=True, exist_ok=True)
  181. # 先检查文件是否存在
  182. if not self._check_object_exists(r2_key):
  183. print(f"[远程存储] 文件不存在,将创建新数据库: {r2_key}")
  184. return None
  185. try:
  186. # 使用 get_object + iter_chunks 替代 download_file
  187. # iter_chunks 会自动处理 chunked transfer encoding
  188. response = self.s3_client.get_object(Bucket=self.bucket_name, Key=r2_key)
  189. with open(local_path, 'wb') as f:
  190. for chunk in response['Body'].iter_chunks(chunk_size=1024*1024):
  191. f.write(chunk)
  192. self._downloaded_files.append(local_path)
  193. print(f"[远程存储] 已下载: {r2_key} -> {local_path}")
  194. return local_path
  195. except ClientError as e:
  196. error_code = e.response.get("Error", {}).get("Code", "")
  197. # S3 兼容存储可能返回不同的错误码
  198. if error_code in ("404", "NoSuchKey", "Not Found"):
  199. print(f"[远程存储] 文件不存在,将创建新数据库: {r2_key}")
  200. return None
  201. else:
  202. print(f"[远程存储] 下载失败 (错误码: {error_code}): {e}")
  203. raise
  204. except Exception as e:
  205. print(f"[远程存储] 下载异常: {e}")
  206. raise
  207. def _upload_sqlite(self, date: Optional[str] = None, db_type: str = "news") -> bool:
  208. """
  209. 上传本地 SQLite 文件到远程存储
  210. Args:
  211. date: 日期字符串
  212. db_type: 数据库类型 ("news" 或 "rss")
  213. Returns:
  214. 是否上传成功
  215. """
  216. local_path = self._get_local_db_path(date, db_type)
  217. r2_key = self._get_remote_db_key(date, db_type)
  218. if not local_path.exists():
  219. print(f"[远程存储] 本地文件不存在,无法上传: {local_path}")
  220. return False
  221. try:
  222. # 获取本地文件大小
  223. local_size = local_path.stat().st_size
  224. print(f"[远程存储] 准备上传: {local_path} ({local_size} bytes) -> {r2_key}")
  225. # 读取文件内容为 bytes 后上传
  226. # 避免传入文件对象时 requests 库使用 chunked transfer encoding
  227. # 腾讯云 COS 等 S3 兼容服务可能无法正确处理 chunked encoding
  228. with open(local_path, 'rb') as f:
  229. file_content = f.read()
  230. # 使用 put_object 并明确设置 ContentLength,确保不使用 chunked encoding
  231. self.s3_client.put_object(
  232. Bucket=self.bucket_name,
  233. Key=r2_key,
  234. Body=file_content,
  235. ContentLength=local_size,
  236. ContentType='application/x-sqlite3',
  237. )
  238. print(f"[远程存储] 已上传: {local_path} -> {r2_key}")
  239. # 验证上传成功
  240. if self._check_object_exists(r2_key):
  241. print(f"[远程存储] 上传验证成功: {r2_key}")
  242. return True
  243. else:
  244. print(f"[远程存储] 上传验证失败: 文件未在远程存储中找到")
  245. return False
  246. except Exception as e:
  247. print(f"[远程存储] 上传失败: {e}")
  248. return False
  249. def _get_connection(self, date: Optional[str] = None, db_type: str = "news") -> sqlite3.Connection:
  250. """
  251. 获取数据库连接
  252. Args:
  253. date: 日期字符串
  254. db_type: 数据库类型 ("news" 或 "rss")
  255. Returns:
  256. 数据库连接
  257. """
  258. local_path = self._get_local_db_path(date, db_type)
  259. db_path = str(local_path)
  260. if db_path not in self._db_connections:
  261. # 确保目录存在
  262. local_path.parent.mkdir(parents=True, exist_ok=True)
  263. # 如果本地不存在,尝试从远程存储下载
  264. if not local_path.exists():
  265. self._download_sqlite(date, db_type)
  266. conn = sqlite3.connect(db_path)
  267. conn.row_factory = sqlite3.Row
  268. self._init_tables(conn, db_type)
  269. self._db_connections[db_path] = conn
  270. return self._db_connections[db_path]
  271. def _get_schema_path(self, db_type: str = "news") -> Path:
  272. """
  273. 获取 schema.sql 文件路径
  274. Args:
  275. db_type: 数据库类型 ("news" 或 "rss")
  276. Returns:
  277. schema 文件路径
  278. """
  279. if db_type == "rss":
  280. return Path(__file__).parent / "rss_schema.sql"
  281. return Path(__file__).parent / "schema.sql"
  282. def _init_tables(self, conn: sqlite3.Connection, db_type: str = "news") -> None:
  283. """
  284. 从 schema.sql 初始化数据库表结构
  285. Args:
  286. conn: 数据库连接
  287. db_type: 数据库类型 ("news" 或 "rss")
  288. """
  289. schema_path = self._get_schema_path(db_type)
  290. if schema_path.exists():
  291. with open(schema_path, "r", encoding="utf-8") as f:
  292. schema_sql = f.read()
  293. conn.executescript(schema_sql)
  294. else:
  295. raise FileNotFoundError(f"Schema file not found: {schema_path}")
  296. conn.commit()
  297. def save_news_data(self, data: NewsData) -> bool:
  298. """
  299. 保存新闻数据到远程存储(以 URL 为唯一标识,支持标题更新检测)
  300. 流程:下载现有数据库 → 插入/更新数据 → 上传回远程存储
  301. Args:
  302. data: 新闻数据
  303. Returns:
  304. 是否保存成功
  305. """
  306. try:
  307. conn = self._get_connection(data.date)
  308. cursor = conn.cursor()
  309. # 查询已有记录数
  310. cursor.execute("SELECT COUNT(*) as count FROM news_items")
  311. row = cursor.fetchone()
  312. existing_count = row[0] if row else 0
  313. if existing_count > 0:
  314. print(f"[远程存储] 已有 {existing_count} 条历史记录,将合并新数据")
  315. # 获取配置时区的当前时间
  316. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  317. # 首先同步平台信息到 platforms 表
  318. for source_id, source_name in data.id_to_name.items():
  319. cursor.execute("""
  320. INSERT INTO platforms (id, name, updated_at)
  321. VALUES (?, ?, ?)
  322. ON CONFLICT(id) DO UPDATE SET
  323. name = excluded.name,
  324. updated_at = excluded.updated_at
  325. """, (source_id, source_name, now_str))
  326. # 统计计数器
  327. new_count = 0
  328. updated_count = 0
  329. title_changed_count = 0
  330. success_sources = []
  331. for source_id, news_list in data.items.items():
  332. success_sources.append(source_id)
  333. for item in news_list:
  334. try:
  335. # 标准化 URL(去除动态参数,如微博的 band_rank)
  336. normalized_url = normalize_url(item.url, source_id) if item.url else ""
  337. # 检查是否已存在(通过标准化 URL + platform_id)
  338. if normalized_url:
  339. cursor.execute("""
  340. SELECT id, title FROM news_items
  341. WHERE url = ? AND platform_id = ?
  342. """, (normalized_url, source_id))
  343. existing = cursor.fetchone()
  344. if existing:
  345. # 已存在,更新记录
  346. existing_id, existing_title = existing
  347. # 检查标题是否变化
  348. if existing_title != item.title:
  349. # 记录标题变更
  350. cursor.execute("""
  351. INSERT INTO title_changes
  352. (news_item_id, old_title, new_title, changed_at)
  353. VALUES (?, ?, ?, ?)
  354. """, (existing_id, existing_title, item.title, now_str))
  355. title_changed_count += 1
  356. # 记录排名历史
  357. cursor.execute("""
  358. INSERT INTO rank_history
  359. (news_item_id, rank, crawl_time, created_at)
  360. VALUES (?, ?, ?, ?)
  361. """, (existing_id, item.rank, data.crawl_time, now_str))
  362. # 更新现有记录
  363. cursor.execute("""
  364. UPDATE news_items SET
  365. title = ?,
  366. rank = ?,
  367. mobile_url = ?,
  368. last_crawl_time = ?,
  369. crawl_count = crawl_count + 1,
  370. updated_at = ?
  371. WHERE id = ?
  372. """, (item.title, item.rank, item.mobile_url,
  373. data.crawl_time, now_str, existing_id))
  374. updated_count += 1
  375. else:
  376. # 不存在,插入新记录(存储标准化后的 URL)
  377. cursor.execute("""
  378. INSERT INTO news_items
  379. (title, platform_id, rank, url, mobile_url,
  380. first_crawl_time, last_crawl_time, crawl_count,
  381. created_at, updated_at)
  382. VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  383. """, (item.title, source_id, item.rank, normalized_url,
  384. item.mobile_url, data.crawl_time, data.crawl_time,
  385. now_str, now_str))
  386. new_id = cursor.lastrowid
  387. # 记录初始排名
  388. cursor.execute("""
  389. INSERT INTO rank_history
  390. (news_item_id, rank, crawl_time, created_at)
  391. VALUES (?, ?, ?, ?)
  392. """, (new_id, item.rank, data.crawl_time, now_str))
  393. new_count += 1
  394. else:
  395. # URL 为空的情况,直接插入(不做去重)
  396. cursor.execute("""
  397. INSERT INTO news_items
  398. (title, platform_id, rank, url, mobile_url,
  399. first_crawl_time, last_crawl_time, crawl_count,
  400. created_at, updated_at)
  401. VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  402. """, (item.title, source_id, item.rank, "",
  403. item.mobile_url, data.crawl_time, data.crawl_time,
  404. now_str, now_str))
  405. new_id = cursor.lastrowid
  406. # 记录初始排名
  407. cursor.execute("""
  408. INSERT INTO rank_history
  409. (news_item_id, rank, crawl_time, created_at)
  410. VALUES (?, ?, ?, ?)
  411. """, (new_id, item.rank, data.crawl_time, now_str))
  412. new_count += 1
  413. except sqlite3.Error as e:
  414. print(f"[远程存储] 保存新闻条目失败 [{item.title[:30]}...]: {e}")
  415. total_items = new_count + updated_count
  416. # 记录抓取信息
  417. cursor.execute("""
  418. INSERT OR REPLACE INTO crawl_records
  419. (crawl_time, total_items, created_at)
  420. VALUES (?, ?, ?)
  421. """, (data.crawl_time, total_items, now_str))
  422. # 获取刚插入的 crawl_record 的 ID
  423. cursor.execute("""
  424. SELECT id FROM crawl_records WHERE crawl_time = ?
  425. """, (data.crawl_time,))
  426. record_row = cursor.fetchone()
  427. if record_row:
  428. crawl_record_id = record_row[0]
  429. # 记录成功的来源
  430. for source_id in success_sources:
  431. cursor.execute("""
  432. INSERT OR REPLACE INTO crawl_source_status
  433. (crawl_record_id, platform_id, status)
  434. VALUES (?, ?, 'success')
  435. """, (crawl_record_id, source_id))
  436. # 记录失败的来源
  437. for failed_id in data.failed_ids:
  438. # 确保失败的平台也在 platforms 表中
  439. cursor.execute("""
  440. INSERT OR IGNORE INTO platforms (id, name, updated_at)
  441. VALUES (?, ?, ?)
  442. """, (failed_id, failed_id, now_str))
  443. cursor.execute("""
  444. INSERT OR REPLACE INTO crawl_source_status
  445. (crawl_record_id, platform_id, status)
  446. VALUES (?, ?, 'failed')
  447. """, (crawl_record_id, failed_id))
  448. conn.commit()
  449. # 查询合并后的总记录数
  450. cursor.execute("SELECT COUNT(*) as count FROM news_items")
  451. row = cursor.fetchone()
  452. final_count = row[0] if row else 0
  453. # 输出详细的存储统计日志
  454. log_parts = [f"[远程存储] 处理完成:新增 {new_count} 条"]
  455. if updated_count > 0:
  456. log_parts.append(f"更新 {updated_count} 条")
  457. if title_changed_count > 0:
  458. log_parts.append(f"标题变更 {title_changed_count} 条")
  459. log_parts.append(f"(去重后总计: {final_count} 条)")
  460. print(",".join(log_parts))
  461. # 上传到远程存储
  462. if self._upload_sqlite(data.date):
  463. print(f"[远程存储] 数据已同步到远程存储")
  464. return True
  465. else:
  466. print(f"[远程存储] 上传远程存储失败")
  467. return False
  468. except Exception as e:
  469. print(f"[远程存储] 保存失败: {e}")
  470. return False
  def get_today_all_data(self, date: Optional[str] = None) -> Optional[NewsData]:
      """
      Return all news stored for a day, merged across every crawl of that day.

      Args:
          date: Date string (``None`` = today in the configured timezone).

      Returns:
          NewsData with:
            - items grouped by platform_id, each carrying its de-duplicated
              rank history (or just its current rank when no history exists);
            - ``failed_ids``: every platform that failed in any crawl of the day;
            - ``crawl_time``: the latest recorded crawl time.
          ``None`` when the day has no items or on error.
      """
      try:
          conn = self._get_connection(date)
          cursor = conn.cursor()

          # All news rows (including id, used to attach rank history below).
          cursor.execute("""
              SELECT n.id, n.title, n.platform_id, p.name as platform_name,
                     n.rank, n.url, n.mobile_url,
                     n.first_crawl_time, n.last_crawl_time, n.crawl_count
              FROM news_items n
              LEFT JOIN platforms p ON n.platform_id = p.id
              ORDER BY n.platform_id, n.last_crawl_time
          """)
          rows = cursor.fetchall()
          if not rows:
              return None

          # Read the day's rank history in a single query. The old code bound
          # every news id into "IN (?, ?, ...)". That fails with "too many SQL
          # variables" once the day holds more items than SQLite's
          # host-parameter limit (SQLITE_MAX_VARIABLE_NUMBER, 999 before SQLite
          # 3.32), and the method then returned None. History rows whose
          # news_item_id is not among `rows` are simply never looked up.
          rank_history_map: Dict[int, List[int]] = {}
          cursor.execute("""
              SELECT news_item_id, rank FROM rank_history
              ORDER BY news_item_id, crawl_time
          """)
          for news_id, rank in cursor.fetchall():
              ranks = rank_history_map.setdefault(news_id, [])
              if rank not in ranks:  # keep first-seen order, drop repeats
                  ranks.append(rank)

          # Group by platform_id.
          items: Dict[str, List[NewsItem]] = {}
          id_to_name: Dict[str, str] = {}
          crawl_date = self._format_date_folder(date)
          for (news_id, title, platform_id, platform_name, rank, url, mobile_url,
               first_crawl_time, last_crawl_time, crawl_count) in rows:
              platform_name = platform_name or platform_id
              id_to_name[platform_id] = platform_name
              items.setdefault(platform_id, []).append(NewsItem(
                  title=title,
                  source_id=platform_id,
                  source_name=platform_name,
                  rank=rank,
                  url=url or "",
                  mobile_url=mobile_url or "",
                  crawl_time=last_crawl_time,
                  # Fall back to the current rank when no history row exists.
                  ranks=rank_history_map.get(news_id, [rank]),
                  first_time=first_crawl_time,
                  last_time=last_crawl_time,
                  count=crawl_count,
              ))

          # Platforms that failed in any crawl of the day.
          cursor.execute("""
              SELECT DISTINCT css.platform_id
              FROM crawl_source_status css
              JOIN crawl_records cr ON css.crawl_record_id = cr.id
              WHERE css.status = 'failed'
          """)
          failed_ids = [row[0] for row in cursor.fetchall()]

          # Latest crawl time of the day.
          cursor.execute("""
              SELECT crawl_time FROM crawl_records
              ORDER BY crawl_time DESC
              LIMIT 1
          """)
          time_row = cursor.fetchone()
          crawl_time = time_row[0] if time_row else self._format_time_filename()

          return NewsData(
              date=crawl_date,
              crawl_time=crawl_time,
              items=items,
              id_to_name=id_to_name,
              failed_ids=failed_ids,
          )

      except Exception as e:
          print(f"[远程存储] 读取数据失败: {e}")
          return None
  def get_latest_crawl_data(self, date: Optional[str] = None) -> Optional[NewsData]:
      """
      Return the news items touched by the most recent crawl of a day.

      "Touched" means the item's ``last_crawl_time`` equals the newest
      ``crawl_records.crawl_time``. Each item's ``ranks`` holds only its
      current rank.

      Args:
          date: Date string (``None`` = today in the configured timezone).

      Returns:
          NewsData for that crawl (``failed_ids`` limited to that crawl), or
          ``None`` when there is no crawl, no matching item, or on error.
      """
      try:
          cursor = self._get_connection(date).cursor()

          cursor.execute("""
              SELECT crawl_time FROM crawl_records
              ORDER BY crawl_time DESC
              LIMIT 1
          """)
          newest = cursor.fetchone()
          if not newest:
              return None
          latest_time = newest[0]

          # News rows from that crawl, with the platform display name.
          cursor.execute("""
              SELECT n.title, n.platform_id, p.name as platform_name,
                     n.rank, n.url, n.mobile_url,
                     n.first_crawl_time, n.last_crawl_time, n.crawl_count
              FROM news_items n
              LEFT JOIN platforms p ON n.platform_id = p.id
              WHERE n.last_crawl_time = ?
          """, (latest_time,))
          rows = cursor.fetchall()
          if not rows:
              return None

          grouped: Dict[str, List[NewsItem]] = {}
          names: Dict[str, str] = {}
          crawl_date = self._format_date_folder(date)
          for (title, platform_id, platform_name, rank, url, mobile_url,
               first_time, last_time, crawl_count) in rows:
              display_name = platform_name or platform_id
              names[platform_id] = display_name
              grouped.setdefault(platform_id, []).append(NewsItem(
                  title=title,
                  source_id=platform_id,
                  source_name=display_name,
                  rank=rank,
                  url=url or "",
                  mobile_url=mobile_url or "",
                  crawl_time=last_time,
                  ranks=[rank],
                  first_time=first_time,
                  last_time=last_time,
                  count=crawl_count,
              ))

          # Platforms that failed in this particular crawl.
          cursor.execute("""
              SELECT css.platform_id
              FROM crawl_source_status css
              JOIN crawl_records cr ON css.crawl_record_id = cr.id
              WHERE cr.crawl_time = ? AND css.status = 'failed'
          """, (latest_time,))
          failed = [r[0] for r in cursor.fetchall()]

          return NewsData(
              date=crawl_date,
              crawl_time=latest_time,
              items=grouped,
              id_to_name=names,
              failed_ids=failed,
          )
      except Exception as e:
          print(f"[远程存储] 获取最新数据失败: {e}")
          return None
  626. def detect_new_titles(self, current_data: NewsData) -> Dict[str, Dict]:
  627. """
  628. 检测新增的标题
  629. 该方法比较当前抓取数据与历史数据,找出新增的标题。
  630. 关键逻辑:只有在历史批次中从未出现过的标题才算新增。
  631. """
  632. try:
  633. historical_data = self.get_today_all_data(current_data.date)
  634. if not historical_data:
  635. new_titles = {}
  636. for source_id, news_list in current_data.items.items():
  637. new_titles[source_id] = {item.title: item for item in news_list}
  638. return new_titles
  639. # 获取当前批次时间
  640. current_time = current_data.crawl_time
  641. # 收集历史标题(first_time < current_time 的标题)
  642. # 这样可以正确处理同一标题因 URL 变化而产生多条记录的情况
  643. historical_titles: Dict[str, set] = {}
  644. for source_id, news_list in historical_data.items.items():
  645. historical_titles[source_id] = set()
  646. for item in news_list:
  647. first_time = getattr(item, 'first_time', item.crawl_time)
  648. if first_time < current_time:
  649. historical_titles[source_id].add(item.title)
  650. # 检查是否有历史数据
  651. has_historical_data = any(len(titles) > 0 for titles in historical_titles.values())
  652. if not has_historical_data:
  653. # 第一次抓取,没有"新增"概念
  654. return {}
  655. new_titles = {}
  656. for source_id, news_list in current_data.items.items():
  657. hist_set = historical_titles.get(source_id, set())
  658. for item in news_list:
  659. if item.title not in hist_set:
  660. if source_id not in new_titles:
  661. new_titles[source_id] = {}
  662. new_titles[source_id][item.title] = item
  663. return new_titles
  664. except Exception as e:
  665. print(f"[远程存储] 检测新标题失败: {e}")
  666. return {}
  667. def save_txt_snapshot(self, data: NewsData) -> Optional[str]:
  668. """保存 TXT 快照(远程存储模式下默认不支持)"""
  669. if not self.enable_txt:
  670. return None
  671. # 如果启用,保存到本地临时目录
  672. try:
  673. date_folder = self._format_date_folder(data.date)
  674. txt_dir = self.temp_dir / date_folder / "txt"
  675. txt_dir.mkdir(parents=True, exist_ok=True)
  676. file_path = txt_dir / f"{data.crawl_time}.txt"
  677. with open(file_path, "w", encoding="utf-8") as f:
  678. for source_id, news_list in data.items.items():
  679. source_name = data.id_to_name.get(source_id, source_id)
  680. if source_name and source_name != source_id:
  681. f.write(f"{source_id} | {source_name}\n")
  682. else:
  683. f.write(f"{source_id}\n")
  684. sorted_news = sorted(news_list, key=lambda x: x.rank)
  685. for item in sorted_news:
  686. line = f"{item.rank}. {item.title}"
  687. if item.url:
  688. line += f" [URL:{item.url}]"
  689. if item.mobile_url:
  690. line += f" [MOBILE:{item.mobile_url}]"
  691. f.write(line + "\n")
  692. f.write("\n")
  693. if data.failed_ids:
  694. f.write("==== 以下ID请求失败 ====\n")
  695. for failed_id in data.failed_ids:
  696. f.write(f"{failed_id}\n")
  697. print(f"[远程存储] TXT 快照已保存: {file_path}")
  698. return str(file_path)
  699. except Exception as e:
  700. print(f"[远程存储] 保存 TXT 快照失败: {e}")
  701. return None
  def save_html_report(self, html_content: str, filename: str, is_summary: bool = False) -> Optional[str]:
      """
      Write an HTML report into ``<temp_dir>/<today>/html/<filename>``.

      Args:
          html_content: Report markup.
          filename: File name inside the html folder.
          is_summary: Accepted for interface compatibility; not used here.

      Returns:
          The written file path as a string, or ``None`` when HTML output is
          disabled or on error.
      """
      if not self.enable_html:
          return None

      try:
          report_dir = self.temp_dir / self._format_date_folder() / "html"
          report_dir.mkdir(parents=True, exist_ok=True)
          report_path = report_dir / filename
          report_path.write_text(html_content, encoding="utf-8")
          print(f"[远程存储] HTML 报告已保存: {report_path}")
          return str(report_path)
      except Exception as e:
          print(f"[远程存储] 保存 HTML 报告失败: {e}")
          return None
  def is_first_crawl_today(self, date: Optional[str] = None) -> bool:
      """
      Tell whether the day has at most one recorded crawl.

      The threshold is ``<= 1`` rather than ``== 0``. Presumably this is
      because it is checked after the current crawl was saved; verify against
      the callers.

      Args:
          date: Date string (``None`` = today in the configured timezone).

      Returns:
          True if ``crawl_records`` holds 0 or 1 rows, or if the check fails.
      """
      try:
          cursor = self._get_connection(date).cursor()
          cursor.execute("""
              SELECT COUNT(*) as count FROM crawl_records
          """)
          result = cursor.fetchone()
          total = 0 if not result else result[0]
          return total <= 1
      except Exception as e:
          print(f"[远程存储] 检查首次抓取失败: {e}")
          return True
  def cleanup(self) -> None:
      """
      Release resources: close cached SQLite connections and delete the temp directory.

      Safe to call on a partially initialised object (attributes are read with
      ``getattr`` defaults). It returns immediately while the interpreter is
      shutting down (``sys.meta_path`` is ``None``).

      NOTE(review): ``temp_dir`` is removed even when it was supplied by the
      caller rather than created by ``__init__`` — confirm that is intended.
      """
      # Interpreter shutdown: module state may already be torn down.
      if sys.meta_path is None:
          return

      connections = getattr(self, "_db_connections", {})
      for path_key, connection in list(connections.items()):
          try:
              connection.close()
              print(f"[远程存储] 关闭数据库连接: {path_key}")
          except Exception as exc:
              print(f"[远程存储] 关闭连接失败 {path_key}: {exc}")
      if connections:
          connections.clear()

      scratch = getattr(self, "temp_dir", None)
      if scratch:
          try:
              if scratch.exists():
                  shutil.rmtree(scratch)
                  print(f"[远程存储] 临时目录已清理: {scratch}")
          except Exception as exc:
              # Stay quiet if the interpreter started shutting down meanwhile.
              if sys.meta_path is not None:
                  print(f"[远程存储] 清理临时目录失败: {exc}")

      tracked = getattr(self, "_downloaded_files", None)
      if tracked:
          tracked.clear()
  def cleanup_old_data(self, retention_days: int) -> int:
      """Delete expired per-day database objects from remote storage.

      Every object under the ``news/`` prefix is inspected. Two key layouts are
      recognised: ``news/YYYY-MM-DD.db`` and the older ``news/YYYY年MM月DD日.db``.
      Other keys, and keys naming an impossible date, are ignored. A key is
      expired when midnight (Asia/Shanghai) of its date is earlier than
      ``self._get_configured_time() - timedelta(days=retention_days)``.

      Args:
          retention_days: Number of days to keep; ``0`` or less disables cleanup.

      Returns:
          The number of distinct date files actually deleted. Keys are not counted
          if their batch raised, or if the ``delete_objects`` response lists them
          under ``Errors``. On an unexpected error the message is printed and the
          count of dates deleted up to that point is returned.
      """
      if retention_days <= 0:
          return 0

      deleted_count = 0
      cutoff_date = self._get_configured_time() - timedelta(days=retention_days)
      # A pytz zone must be attached with localize(). Passing it as tzinfo= to the
      # datetime constructor silently applies the zone's LMT offset (+08:06 for
      # Asia/Shanghai) instead of the real +08:00.
      shanghai_tz = pytz.timezone("Asia/Shanghai")
      # (key pattern, format used to rebuild the date label for log output)
      key_formats = (
          (re.compile(r'news/(\d{4})-(\d{2})-(\d{2})\.db$'), "{}-{}-{}"),
          (re.compile(r'news/(\d{4})年(\d{2})月(\d{2})日\.db$'), "{}年{}月{}日"),
      )

      try:
          paginator = self.s3_client.get_paginator('list_objects_v2')
          pages = paginator.paginate(Bucket=self.bucket_name, Prefix="news/")

          # expired object key -> date label used in the log output
          expired = {}
          for page in pages:
              for obj in page.get('Contents', []):
                  key = obj['Key']
                  for pattern, label_format in key_formats:
                      match = pattern.match(key)
                      if match:
                          break
                  else:
                      continue  # not a per-day database key
                  year, month, day = match.groups()
                  try:
                      folder_date = shanghai_tz.localize(
                          datetime(int(year), int(month), int(day))
                      )
                  except (ValueError, OverflowError):
                      continue  # e.g. news/2024-02-30.db
                  if folder_date < cutoff_date:
                      expired[key] = label_format.format(year, month, day)

          keys = list(expired)
          deleted_dates = set()
          batch_size = 1000  # DeleteObjects accepts at most 1000 keys per request
          for start in range(0, len(keys), batch_size):
              batch = keys[start:start + batch_size]
              try:
                  response = self.s3_client.delete_objects(
                      Bucket=self.bucket_name,
                      Delete={'Objects': [{'Key': key} for key in batch]},
                  )
              except Exception as e:
                  print(f"[远程存储] 批量删除失败: {e}")
                  continue
              # Per-key failures are reported in the response instead of raised.
              failed_keys = set()
              for error in (response or {}).get('Errors', []):
                  failed_keys.add(error.get('Key'))
                  print(f"[远程存储] 删除对象失败 {error.get('Key')}: {error.get('Message')}")
              succeeded = [key for key in batch if key not in failed_keys]
              deleted_dates.update(expired[key] for key in succeeded)
              deleted_count = len(deleted_dates)
              print(f"[远程存储] 删除 {len(succeeded)} 个对象")

          for date_str in sorted(deleted_dates):
              print(f"[远程存储] 清理过期数据: news/{date_str}.db")
          print(f"[远程存储] 共清理 {deleted_count} 个过期日期数据库文件")
          return deleted_count
      except Exception as e:
          print(f"[远程存储] 清理过期数据失败: {e}")
          return deleted_count
  def has_pushed_today(self, date: Optional[str] = None) -> bool:
      """Report whether ``push_records`` marks the given day as pushed.

      Args:
          date: Date string (YYYY-MM-DD); ``None`` means today.

      Returns:
          The truthiness of the stored ``pushed`` flag. Returns False when no row
          exists or the lookup fails (the error is printed).
      """
      try:
          cursor = self._get_connection(date).cursor()
          day_key = self._format_date_folder(date)
          cursor.execute("""
              SELECT pushed FROM push_records WHERE date = ?
          """, (day_key,))
          record = cursor.fetchone()
          return bool(record[0]) if record else False
      except Exception as e:
          print(f"[远程存储] 检查推送记录失败: {e}")
          return False
  def record_push(self, report_type: str, date: Optional[str] = None) -> bool:
      """Mark a day as pushed in ``push_records`` and upload the database.

      The row keyed by the day is inserted, or updated on conflict with
      ``pushed = 1`` and the new push time and report type. ``created_at`` is only
      written on first insert. The SQLite file is then uploaded with
      ``self._upload_sqlite`` so the record persists remotely.

      Args:
          report_type: Report type label stored with the record.
          date: Date string (YYYY-MM-DD); ``None`` means today.

      Returns:
          True when the row was committed and the upload succeeded. False when the
          upload failed or any exception occurred (the error is printed).
      """
      upsert_sql = """
          INSERT INTO push_records (date, pushed, push_time, report_type, created_at)
          VALUES (?, 1, ?, ?, ?)
          ON CONFLICT(date) DO UPDATE SET
              pushed = 1,
              push_time = excluded.push_time,
              report_type = excluded.report_type
      """
      try:
          connection = self._get_connection(date)
          cur = connection.cursor()
          day_key = self._format_date_folder(date)
          timestamp = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
          cur.execute(upsert_sql, (day_key, timestamp, report_type, timestamp))
          connection.commit()
          print(f"[远程存储] 推送记录已保存: {report_type} at {timestamp}")

          # Sync right away so the push record is persisted remotely.
          if not self._upload_sqlite(date):
              print("[远程存储] 推送记录同步到远程存储失败")
              return False
          print("[远程存储] 推送记录已同步到远程存储")
          return True
      except Exception as e:
          print(f"[远程存储] 记录推送失败: {e}")
          return False
  891. # ========================================
  892. # RSS 数据存储方法
  893. # ========================================
  def save_rss_data(self, data: RSSData) -> bool:
      """Store one RSS crawl in the day's RSS database and upload it.

      Steps: upsert feed names into ``rss_feeds``; upsert each item into
      ``rss_items``; record the crawl in ``rss_crawl_records`` and per-feed
      success/failure in ``rss_crawl_status``; commit; upload via
      ``self._upload_sqlite(data.date, db_type="rss")``.

      Item upsert rules:
        * An item with a URL is matched on ``(url, feed_id)``. A match is updated
          (title/published_at/summary/author refreshed, ``crawl_count``
          incremented); otherwise a new row is inserted.
        * An item without a URL is always inserted, with ``url`` stored as ``""``.
        * A per-item ``sqlite3.Error`` is printed and the item skipped.

      Args:
          data: One crawl's RSS data. Uses ``date``, ``crawl_time``, ``items``,
              ``id_to_name`` and ``failed_ids``.

      Returns:
          True if the data was committed and uploaded. False if the upload failed
          or any other error occurred. On error the open transaction is rolled
          back so partial writes are not committed later on the same connection.
      """
      conn = None
      try:
          conn = self._get_connection(data.date, db_type="rss")
          cursor = conn.cursor()
          now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")

          # Keep the feed id -> display name table in sync with this crawl.
          for feed_id, feed_name in data.id_to_name.items():
              cursor.execute("""
                  INSERT INTO rss_feeds (id, name, updated_at)
                  VALUES (?, ?, ?)
                  ON CONFLICT(id) DO UPDATE SET
                      name = excluded.name,
                      updated_at = excluded.updated_at
              """, (feed_id, feed_name, now_str))

          new_count = 0
          updated_count = 0
          for feed_id, rss_list in data.items.items():
              for item in rss_list:
                  try:
                      # Only items with a URL can be matched against stored rows.
                      existing = None
                      if item.url:
                          cursor.execute("""
                              SELECT id, title FROM rss_items
                              WHERE url = ? AND feed_id = ?
                          """, (item.url, feed_id))
                          existing = cursor.fetchone()

                      if existing:
                          cursor.execute("""
                              UPDATE rss_items SET
                                  title = ?,
                                  published_at = ?,
                                  summary = ?,
                                  author = ?,
                                  last_crawl_time = ?,
                                  crawl_count = crawl_count + 1,
                                  updated_at = ?
                              WHERE id = ?
                          """, (item.title, item.published_at, item.summary,
                                item.author, data.crawl_time, now_str, existing[0]))
                          updated_count += 1
                      else:
                          cursor.execute("""
                              INSERT INTO rss_items
                              (title, feed_id, url, published_at, summary, author,
                               first_crawl_time, last_crawl_time, crawl_count,
                               created_at, updated_at)
                              VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
                          """, (item.title, feed_id, item.url or "", item.published_at,
                                item.summary, item.author, data.crawl_time,
                                data.crawl_time, now_str, now_str))
                          new_count += 1
                  except sqlite3.Error as e:
                      # Guard against a missing title so logging cannot raise here.
                      title_preview = (item.title or "")[:30]
                      print(f"[远程存储] 保存 RSS 条目失败 [{title_preview}...]: {e}")

          total_items = new_count + updated_count

          # NOTE(review): if crawl_time is UNIQUE, INSERT OR REPLACE deletes and
          # re-inserts the row, which may give it a new id. Status rows written
          # earlier for the old id would then be orphaned. Confirm against schema.
          cursor.execute("""
              INSERT OR REPLACE INTO rss_crawl_records
              (crawl_time, total_items, created_at)
              VALUES (?, ?, ?)
          """, (data.crawl_time, total_items, now_str))

          cursor.execute("""
              SELECT id FROM rss_crawl_records WHERE crawl_time = ?
          """, (data.crawl_time,))
          record_row = cursor.fetchone()
          if record_row:
              crawl_record_id = record_row[0]
              for feed_id in data.items:
                  cursor.execute("""
                      INSERT OR REPLACE INTO rss_crawl_status
                      (crawl_record_id, feed_id, status)
                      VALUES (?, ?, 'success')
                  """, (crawl_record_id, feed_id))
              for failed_id in data.failed_ids:
                  # A failed feed may be unknown so far; use its id as its name.
                  cursor.execute("""
                      INSERT OR IGNORE INTO rss_feeds (id, name, updated_at)
                      VALUES (?, ?, ?)
                  """, (failed_id, failed_id, now_str))
                  cursor.execute("""
                      INSERT OR REPLACE INTO rss_crawl_status
                      (crawl_record_id, feed_id, status)
                      VALUES (?, ?, 'failed')
                  """, (crawl_record_id, failed_id))

          conn.commit()

          log_parts = [f"[远程存储] RSS 处理完成:新增 {new_count} 条"]
          if updated_count > 0:
              log_parts.append(f"更新 {updated_count} 条")
          print(",".join(log_parts))

          if self._upload_sqlite(data.date, db_type="rss"):
              print("[远程存储] RSS 数据已同步到远程存储")
              return True
          print("[远程存储] RSS 上传远程存储失败")
          return False
      except Exception as e:
          # Discard uncommitted writes. The connection is possibly cached and
          # reused, and a later commit on it would otherwise persist a
          # half-saved batch.
          if conn is not None:
              try:
                  conn.rollback()
              except Exception as rollback_error:
                  print(f"[远程存储] 回滚 RSS 事务失败: {rollback_error}")
          print(f"[远程存储] 保存 RSS 数据失败: {e}")
          return False
  def get_rss_data(self, date: Optional[str] = None) -> Optional[RSSData]:
      """Load every RSS item stored in one day's RSS database.

      Args:
          date: Date string (YYYY-MM-DD); ``None`` means today.

      Returns:
          An ``RSSData`` with items grouped by feed, newest ``published_at``
          first. Returns ``None`` when there are no items or reading fails (the
          error is printed).
          ``crawl_time`` is the newest value in ``rss_crawl_records``, falling
          back to ``self._format_time_filename()``. ``failed_ids`` lists every
          feed marked failed in any crawl recorded in this database.
      """
      try:
          cursor = self._get_connection(date, db_type="rss").cursor()
          cursor.execute("""
              SELECT i.id, i.title, i.feed_id, f.name as feed_name,
                     i.url, i.published_at, i.summary, i.author,
                     i.first_crawl_time, i.last_crawl_time, i.crawl_count
              FROM rss_items i
              LEFT JOIN rss_feeds f ON i.feed_id = f.id
              ORDER BY i.published_at DESC
          """)
          records = cursor.fetchall()
          if not records:
              return None

          by_feed: Dict[str, List[RSSItem]] = {}
          names: Dict[str, str] = {}
          day_folder = self._format_date_folder(date)
          for (_row_id, title, feed_id, stored_name, url, published_at, summary,
               author, first_crawl, last_crawl, crawl_count) in records:
              # Feeds without a name row fall back to their id.
              display_name = stored_name or feed_id
              names[feed_id] = display_name
              by_feed.setdefault(feed_id, []).append(RSSItem(
                  title=title,
                  feed_id=feed_id,
                  feed_name=display_name,
                  url=url or "",
                  published_at=published_at or "",
                  summary=summary or "",
                  author=author or "",
                  crawl_time=last_crawl,
                  first_time=first_crawl,
                  last_time=last_crawl,
                  count=crawl_count,
              ))

          cursor.execute("""
              SELECT crawl_time FROM rss_crawl_records
              ORDER BY crawl_time DESC
              LIMIT 1
          """)
          newest = cursor.fetchone()
          if newest:
              latest_crawl = newest[0]
          else:
              latest_crawl = self._format_time_filename()

          cursor.execute("""
              SELECT DISTINCT cs.feed_id
              FROM rss_crawl_status cs
              JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
              WHERE cs.status = 'failed'
          """)
          failed = [feed for (feed,) in cursor.fetchall()]

          return RSSData(
              date=day_folder,
              crawl_time=latest_crawl,
              items=by_feed,
              id_to_name=names,
              failed_ids=failed,
          )
      except Exception as e:
          print(f"[远程存储] 读取 RSS 数据失败: {e}")
          return None
  def detect_new_rss_items(self, current_data: RSSData) -> Dict[str, List[RSSItem]]:
      """Find RSS items in the current crawl whose URL no earlier crawl of the day saw.

      History comes from ``self.get_rss_data(current_data.date)``. A stored item
      counts as history only if its ``first_time`` (or ``crawl_time`` when the
      attribute is absent) sorts before ``current_data.crawl_time`` and it has a
      URL.

      Args:
          current_data: The RSS data of the crawl being evaluated.

      Returns:
          ``{feed_id: [RSSItem, ...]}`` of new items, built as follows:
            * No stored data at all: a shallow copy of ``current_data.items``.
            * Stored data but no earlier URLs: ``{}``.
            * On error: ``{}`` (the error is printed).
          Items without a URL are never reported as new.
      """
      try:
          history = self.get_rss_data(current_data.date)
          if not history:
              return current_data.items.copy()

          batch_time = current_data.crawl_time
          seen_urls: Dict[str, set] = {
              feed_id: {
                  entry.url
                  for entry in entries
                  if getattr(entry, 'first_time', entry.crawl_time) < batch_time and entry.url
              }
              for feed_id, entries in history.items.items()
          }

          # Nothing recorded before this batch: it is the day's first crawl, so
          # there is no notion of "new" yet.
          if not any(seen_urls.values()):
              return {}

          fresh: Dict[str, List[RSSItem]] = {}
          for feed_id, entries in current_data.items.items():
              known = seen_urls.get(feed_id, set())
              novel = [entry for entry in entries if entry.url and entry.url not in known]
              if novel:
                  fresh[feed_id] = novel
          return fresh
      except Exception as e:
          print(f"[远程存储] 检测新 RSS 条目失败: {e}")
          return {}
  def get_latest_rss_data(self, date: Optional[str] = None) -> Optional[RSSData]:
      """Load the RSS items seen in the most recent crawl of a day.

      The latest crawl is the greatest ``crawl_time`` in ``rss_crawl_records``.
      Items are those whose ``last_crawl_time`` equals it.

      Args:
          date: Date string (YYYY-MM-DD); ``None`` means today.

      Returns:
          An ``RSSData`` grouped by feed (newest ``published_at`` first), whose
          ``failed_ids`` are the feeds marked failed in that same crawl. Returns
          ``None`` when there is no crawl record, no matching item, or an error
          (the error is printed).
      """
      try:
          cursor = self._get_connection(date, db_type="rss").cursor()

          cursor.execute("""
              SELECT crawl_time FROM rss_crawl_records
              ORDER BY crawl_time DESC
              LIMIT 1
          """)
          newest = cursor.fetchone()
          if not newest:
              return None
          latest_crawl = newest[0]

          cursor.execute("""
              SELECT i.id, i.title, i.feed_id, f.name as feed_name,
                     i.url, i.published_at, i.summary, i.author,
                     i.first_crawl_time, i.last_crawl_time, i.crawl_count
              FROM rss_items i
              LEFT JOIN rss_feeds f ON i.feed_id = f.id
              WHERE i.last_crawl_time = ?
              ORDER BY i.published_at DESC
          """, (latest_crawl,))
          records = cursor.fetchall()
          if not records:
              return None

          by_feed: Dict[str, List[RSSItem]] = {}
          names: Dict[str, str] = {}
          day_folder = self._format_date_folder(date)
          for (_row_id, title, feed_id, stored_name, url, published_at, summary,
               author, first_crawl, last_crawl, crawl_count) in records:
              display_name = stored_name or feed_id
              names[feed_id] = display_name
              by_feed.setdefault(feed_id, []).append(RSSItem(
                  title=title,
                  feed_id=feed_id,
                  feed_name=display_name,
                  url=url or "",
                  published_at=published_at or "",
                  summary=summary or "",
                  author=author or "",
                  crawl_time=last_crawl,
                  first_time=first_crawl,
                  last_time=last_crawl,
                  count=crawl_count,
              ))

          # Failures are reported for the latest crawl only.
          cursor.execute("""
              SELECT cs.feed_id
              FROM rss_crawl_status cs
              JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
              WHERE cr.crawl_time = ? AND cs.status = 'failed'
          """, (latest_crawl,))
          failed = [feed for (feed,) in cursor.fetchall()]

          return RSSData(
              date=day_folder,
              crawl_time=latest_crawl,
              items=by_feed,
              id_to_name=names,
              failed_ids=failed,
          )
      except Exception as e:
          print(f"[远程存储] 获取最新 RSS 数据失败: {e}")
          return None
  def __del__(self):
      """Finalizer: run ``cleanup()`` on a best-effort basis."""
      # During interpreter shutdown sys.meta_path is None; do nothing then.
      if sys.meta_path is not None:
          try:
              self.cleanup()
          except Exception:
              # A finalizer cannot report errors usefully (e.g. during shutdown);
              # swallowing them here is deliberate.
              pass
  def pull_recent_days(self, days: int, local_data_dir: str = "output") -> int:
      """Download the daily news databases of the last ``days`` days.

      Walks backwards from today, where today is ``self._get_configured_time()``.
      For each day it fetches ``news/YYYY-MM-DD.db`` into
      ``<local_data_dir>/YYYY-MM-DD/news.db``. Days that already exist locally,
      or whose remote object does not exist, are skipped.

      The object is streamed into a sibling ``news.db.part`` file, which is
      renamed to ``news.db`` only after the transfer completes. An interrupted
      download therefore never leaves a truncated database behind that later
      runs would skip as "already present".

      Args:
          days: Number of days to pull, including today; ``<= 0`` pulls nothing.
          local_data_dir: Local root directory for the per-day folders.

      Returns:
          Number of database files downloaded successfully.
      """
      if days <= 0:
          return 0

      local_dir = Path(local_data_dir)
      local_dir.mkdir(parents=True, exist_ok=True)
      pulled_count = 0
      now = self._get_configured_time()

      print(f"[远程存储] 开始拉取最近 {days} 天的数据...")
      for offset in range(days):
          date_str = (now - timedelta(days=offset)).strftime("%Y-%m-%d")
          local_date_dir = local_dir / date_str
          local_db_path = local_date_dir / "news.db"

          if local_db_path.exists():
              print(f"[远程存储] 跳过(本地已存在): {date_str}")
              continue

          remote_key = f"news/{date_str}.db"
          if not self._check_object_exists(remote_key):
              print(f"[远程存储] 跳过(远程不存在): {date_str}")
              continue

          tmp_path = local_db_path.with_name(local_db_path.name + ".part")
          try:
              local_date_dir.mkdir(parents=True, exist_ok=True)
              response = self.s3_client.get_object(Bucket=self.bucket_name, Key=remote_key)
              body = response['Body']
              try:
                  with open(tmp_path, 'wb') as f:
                      # Stream with get_object + iter_chunks, as the original code
                      # did to handle chunked transfer encoding.
                      for chunk in body.iter_chunks(chunk_size=1024 * 1024):
                          f.write(chunk)
              finally:
                  body.close()
              # Publish the file only once it is complete.
              tmp_path.replace(local_db_path)
              print(f"[远程存储] 已拉取: {remote_key} -> {local_db_path}")
              pulled_count += 1
          except Exception as e:
              print(f"[远程存储] 拉取失败 ({date_str}): {e}")
              # Best-effort removal of the partial download. A leftover .part
              # file is harmless because the next attempt opens it with 'wb'.
              try:
                  if tmp_path.exists():
                      tmp_path.unlink()
              except OSError:
                  pass

      print(f"[远程存储] 拉取完成,共下载 {pulled_count} 个数据库文件")
      return pulled_count
  def list_remote_dates(self) -> List[str]:
      """List the dates that have an ISO-named daily database in remote storage.

      Only keys of the form ``news/YYYY-MM-DD.db`` are recognised; any other key
      under ``news/`` is ignored.

      Returns:
          Date strings (``YYYY-MM-DD``) sorted newest first. Returns an empty list
          if listing fails (the error is printed).
      """
      iso_key = re.compile(r'news/(\d{4}-\d{2}-\d{2})\.db$')
      try:
          listing = self.s3_client.get_paginator('list_objects_v2').paginate(
              Bucket=self.bucket_name, Prefix="news/"
          )
          found = []
          for page in listing:
              # Pages without matching objects carry no 'Contents' entry.
              for obj in page.get('Contents', ()):
                  hit = iso_key.match(obj['Key'])
                  if hit:
                      found.append(hit.group(1))
          found.sort(reverse=True)
          return found
      except Exception as e:
          print(f"[远程存储] 列出远程日期失败: {e}")
          return []