| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449 |
- """
- 数据解析服务
- v2.0.0: 仅支持 SQLite 数据库,移除 TXT 文件支持
- 新存储结构:output/{type}/{date}.db
- """
- import re
- import sqlite3
- from pathlib import Path
- from typing import Dict, List, Tuple, Optional
- from datetime import datetime
- import yaml
- from ..utils.errors import FileParseError, DataNotFoundError
- from .cache_service import get_cache
- class ParserService:
- """数据解析服务类"""
- def __init__(self, project_root: str = None):
- """
- 初始化解析服务
- Args:
- project_root: 项目根目录,默认为当前目录的父目录
- """
- if project_root is None:
- current_file = Path(__file__)
- self.project_root = current_file.parent.parent.parent
- else:
- self.project_root = Path(project_root)
- self.cache = get_cache()
- @staticmethod
- def clean_title(title: str) -> str:
- """清理标题文本"""
- title = re.sub(r'\s+', ' ', title)
- title = title.strip()
- return title
- def get_date_folder_name(self, date: datetime = None) -> str:
- """
- 获取日期字符串(ISO 格式)
- Args:
- date: 日期对象,默认为今天
- Returns:
- 日期字符串(YYYY-MM-DD)
- """
- if date is None:
- date = datetime.now()
- return date.strftime("%Y-%m-%d")
- def _get_db_path(self, date: datetime = None, db_type: str = "news") -> Optional[Path]:
- """
- 获取数据库文件路径
- 新结构:output/{type}/{date}.db
- Args:
- date: 日期对象,默认为今天
- db_type: 数据库类型 ("news" 或 "rss")
- Returns:
- 数据库文件路径,如果不存在则返回 None
- """
- date_str = self.get_date_folder_name(date)
- db_path = self.project_root / "output" / db_type / f"{date_str}.db"
- if db_path.exists():
- return db_path
- return None
- def _read_from_sqlite(
- self,
- date: datetime = None,
- platform_ids: Optional[List[str]] = None,
- db_type: str = "news"
- ) -> Optional[Tuple[Dict, Dict, Dict]]:
- """
- 从 SQLite 数据库读取数据
- Args:
- date: 日期对象,默认为今天
- platform_ids: 平台ID列表,None表示所有平台
- db_type: 数据库类型 ("news" 或 "rss")
- Returns:
- (all_titles, id_to_name, all_timestamps) 元组,如果数据库不存在返回 None
- """
- db_path = self._get_db_path(date, db_type)
- if db_path is None:
- return None
- all_titles = {}
- id_to_name = {}
- all_timestamps = {}
- try:
- conn = sqlite3.connect(str(db_path))
- conn.row_factory = sqlite3.Row
- cursor = conn.cursor()
- if db_type == "news":
- return self._read_news_from_sqlite(cursor, platform_ids, all_titles, id_to_name, all_timestamps)
- elif db_type == "rss":
- return self._read_rss_from_sqlite(cursor, platform_ids, all_titles, id_to_name, all_timestamps)
- except Exception as e:
- print(f"Warning: 从 SQLite 读取数据失败: {e}")
- return None
- finally:
- if 'conn' in locals():
- conn.close()
- def _read_news_from_sqlite(
- self,
- cursor,
- platform_ids: Optional[List[str]],
- all_titles: Dict,
- id_to_name: Dict,
- all_timestamps: Dict
- ) -> Optional[Tuple[Dict, Dict, Dict]]:
- """从热榜数据库读取数据"""
- # 检查表是否存在
- cursor.execute("""
- SELECT name FROM sqlite_master
- WHERE type='table' AND name='news_items'
- """)
- if not cursor.fetchone():
- return None
- # 构建查询
- if platform_ids:
- placeholders = ','.join(['?' for _ in platform_ids])
- query = f"""
- SELECT n.id, n.platform_id, p.name as platform_name, n.title,
- n.rank, n.url, n.mobile_url,
- n.first_crawl_time, n.last_crawl_time, n.crawl_count
- FROM news_items n
- LEFT JOIN platforms p ON n.platform_id = p.id
- WHERE n.platform_id IN ({placeholders})
- """
- cursor.execute(query, platform_ids)
- else:
- cursor.execute("""
- SELECT n.id, n.platform_id, p.name as platform_name, n.title,
- n.rank, n.url, n.mobile_url,
- n.first_crawl_time, n.last_crawl_time, n.crawl_count
- FROM news_items n
- LEFT JOIN platforms p ON n.platform_id = p.id
- """)
- rows = cursor.fetchall()
- # 收集所有 news_item_id 用于查询历史排名
- news_ids = [row['id'] for row in rows]
- rank_history_map = {}
- if news_ids:
- placeholders = ",".join("?" * len(news_ids))
- cursor.execute(f"""
- SELECT news_item_id, rank FROM rank_history
- WHERE news_item_id IN ({placeholders})
- ORDER BY news_item_id, crawl_time
- """, news_ids)
- for rh_row in cursor.fetchall():
- news_id = rh_row['news_item_id']
- rank = rh_row['rank']
- if news_id not in rank_history_map:
- rank_history_map[news_id] = []
- rank_history_map[news_id].append(rank)
- for row in rows:
- news_id = row['id']
- platform_id = row['platform_id']
- platform_name = row['platform_name'] or platform_id
- title = row['title']
- if platform_id not in id_to_name:
- id_to_name[platform_id] = platform_name
- if platform_id not in all_titles:
- all_titles[platform_id] = {}
- ranks = rank_history_map.get(news_id, [row['rank']])
- all_titles[platform_id][title] = {
- "ranks": ranks,
- "url": row['url'] or "",
- "mobileUrl": row['mobile_url'] or "",
- "first_time": row['first_crawl_time'] or "",
- "last_time": row['last_crawl_time'] or "",
- "count": row['crawl_count'] or 1,
- }
- # 获取抓取时间作为 timestamps
- cursor.execute("""
- SELECT crawl_time, created_at FROM crawl_records
- ORDER BY crawl_time
- """)
- for row in cursor.fetchall():
- crawl_time = row['crawl_time']
- created_at = row['created_at']
- try:
- ts = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").timestamp()
- except (ValueError, TypeError):
- ts = datetime.now().timestamp()
- all_timestamps[f"{crawl_time}.db"] = ts
- if not all_titles:
- return None
- return (all_titles, id_to_name, all_timestamps)
- def _read_rss_from_sqlite(
- self,
- cursor,
- feed_ids: Optional[List[str]],
- all_items: Dict,
- id_to_name: Dict,
- all_timestamps: Dict
- ) -> Optional[Tuple[Dict, Dict, Dict]]:
- """从 RSS 数据库读取数据"""
- # 检查表是否存在
- cursor.execute("""
- SELECT name FROM sqlite_master
- WHERE type='table' AND name='rss_items'
- """)
- if not cursor.fetchone():
- return None
- # 构建查询
- if feed_ids:
- placeholders = ','.join(['?' for _ in feed_ids])
- query = f"""
- SELECT i.id, i.feed_id, f.name as feed_name, i.title,
- i.url, i.published_at, i.summary, i.author,
- i.first_crawl_time, i.last_crawl_time, i.crawl_count
- FROM rss_items i
- LEFT JOIN rss_feeds f ON i.feed_id = f.id
- WHERE i.feed_id IN ({placeholders})
- ORDER BY i.published_at DESC
- """
- cursor.execute(query, feed_ids)
- else:
- cursor.execute("""
- SELECT i.id, i.feed_id, f.name as feed_name, i.title,
- i.url, i.published_at, i.summary, i.author,
- i.first_crawl_time, i.last_crawl_time, i.crawl_count
- FROM rss_items i
- LEFT JOIN rss_feeds f ON i.feed_id = f.id
- ORDER BY i.published_at DESC
- """)
- rows = cursor.fetchall()
- for row in rows:
- feed_id = row['feed_id']
- feed_name = row['feed_name'] or feed_id
- title = row['title']
- if feed_id not in id_to_name:
- id_to_name[feed_id] = feed_name
- if feed_id not in all_items:
- all_items[feed_id] = {}
- all_items[feed_id][title] = {
- "url": row['url'] or "",
- "published_at": row['published_at'] or "",
- "summary": row['summary'] or "",
- "author": row['author'] or "",
- "first_time": row['first_crawl_time'] or "",
- "last_time": row['last_crawl_time'] or "",
- "count": row['crawl_count'] or 1,
- }
- # 获取抓取时间
- cursor.execute("""
- SELECT crawl_time, created_at FROM rss_crawl_records
- ORDER BY crawl_time
- """)
- for row in cursor.fetchall():
- crawl_time = row['crawl_time']
- created_at = row['created_at']
- try:
- ts = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").timestamp()
- except (ValueError, TypeError):
- ts = datetime.now().timestamp()
- all_timestamps[f"{crawl_time}.db"] = ts
- if not all_items:
- return None
- return (all_items, id_to_name, all_timestamps)
- def read_all_titles_for_date(
- self,
- date: datetime = None,
- platform_ids: Optional[List[str]] = None,
- db_type: str = "news"
- ) -> Tuple[Dict, Dict, Dict]:
- """
- 读取指定日期的所有数据(带缓存)
- Args:
- date: 日期对象,默认为今天
- platform_ids: 平台/Feed ID列表,None表示所有
- db_type: 数据库类型 ("news" 或 "rss")
- Returns:
- (all_titles, id_to_name, all_timestamps) 元组
- Raises:
- DataNotFoundError: 数据不存在
- """
- date_str = self.get_date_folder_name(date)
- platform_key = ','.join(sorted(platform_ids)) if platform_ids else 'all'
- cache_key = f"read_all:{db_type}:{date_str}:{platform_key}"
- is_today = (date is None) or (date.date() == datetime.now().date())
- ttl = 900 if is_today else 3600
- cached = self.cache.get(cache_key, ttl=ttl)
- if cached:
- return cached
- result = self._read_from_sqlite(date, platform_ids, db_type)
- if result:
- self.cache.set(cache_key, result)
- return result
- raise DataNotFoundError(
- f"未找到 {date_str} 的 {db_type} 数据",
- suggestion="请先运行爬虫或检查日期是否正确"
- )
- def parse_yaml_config(self, config_path: str = None) -> dict:
- """
- 解析YAML配置文件
- Args:
- config_path: 配置文件路径,默认为 config/config.yaml
- Returns:
- 配置字典
- Raises:
- FileParseError: 配置文件解析错误
- """
- if config_path is None:
- config_path = self.project_root / "config" / "config.yaml"
- else:
- config_path = Path(config_path)
- if not config_path.exists():
- raise FileParseError(str(config_path), "配置文件不存在")
- try:
- with open(config_path, "r", encoding="utf-8") as f:
- config_data = yaml.safe_load(f)
- return config_data
- except Exception as e:
- raise FileParseError(str(config_path), str(e))
- def parse_frequency_words(self, words_file: str = None) -> List[Dict]:
- """
- 解析关键词配置文件
- 复用 trendradar.core.frequency 的解析逻辑,支持:
- - # 开头的注释行
- - 空行分隔词组
- - [组别名] 作为词组第一行,给整组指定别名
- - +前缀必须词、!前缀过滤词、@数量限制
- - /pattern/ 正则表达式语法
- - => 别名 显示名称语法
- - [GLOBAL_FILTER] 全局过滤区域
- 显示名称优先级:组别名 > 行别名拼接 > 关键词拼接
- Args:
- words_file: 关键词文件路径,默认为 config/frequency_words.txt
- Returns:
- 词组列表
- Raises:
- FileParseError: 文件解析错误
- """
- from trendradar.core.frequency import load_frequency_words
- if words_file is None:
- words_file = str(self.project_root / "config" / "frequency_words.txt")
- else:
- words_file = str(words_file)
- try:
- word_groups, filter_words, global_filters = load_frequency_words(words_file)
- return word_groups
- except FileNotFoundError:
- return []
- except Exception as e:
- raise FileParseError(words_file, str(e))
- def get_available_dates(self, db_type: str = "news") -> List[str]:
- """
- 获取可用的日期列表
- Args:
- db_type: 数据库类型 ("news" 或 "rss")
- Returns:
- 日期字符串列表(YYYY-MM-DD 格式,降序排列)
- """
- db_dir = self.project_root / "output" / db_type
- if not db_dir.exists():
- return []
- dates = []
- for db_file in db_dir.glob("*.db"):
- date_match = re.match(r'(\d{4}-\d{2}-\d{2})\.db$', db_file.name)
- if date_match:
- dates.append(date_match.group(1))
- return sorted(dates, reverse=True)
- def get_available_date_range(self, db_type: str = "news") -> Tuple[Optional[datetime], Optional[datetime]]:
- """
- 获取可用的日期范围
- Args:
- db_type: 数据库类型 ("news" 或 "rss")
- Returns:
- (最早日期, 最新日期) 元组,如果没有数据则返回 (None, None)
- """
- dates = self.get_available_dates(db_type)
- if not dates:
- return (None, None)
- earliest = datetime.strptime(dates[-1], "%Y-%m-%d")
- latest = datetime.strptime(dates[0], "%Y-%m-%d")
- return (earliest, latest)
|