| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- """
- 文件解析服务
- 提供txt格式新闻数据和YAML配置文件的解析功能。
- """
- import re
- from pathlib import Path
- from typing import Dict, List, Tuple, Optional
- from datetime import datetime
- import yaml
- from ..utils.errors import FileParseError, DataNotFoundError
- from .cache_service import get_cache
class ParserService:
    """File parsing service.

    Parses crawler ``.txt`` news dumps, the YAML configuration file and the
    keyword ("frequency words") file. All default locations are resolved
    relative to ``self.project_root``.
    """

    # A section containing this marker lists platform IDs whose request
    # failed; such sections carry no titles and are skipped.
    _FAILED_IDS_MARKER = "==== 以下ID请求失败 ===="

    def __init__(self, project_root: Optional[str] = None):
        """Initialise the service.

        Args:
            project_root: Project root directory. When omitted, the directory
                three levels above this source file is used.
        """
        if project_root is None:
            self.project_root = Path(__file__).parent.parent.parent
        else:
            self.project_root = Path(project_root)

        # Cache used by read_all_titles_for_date.
        self.cache = get_cache()

    @staticmethod
    def clean_title(title: str) -> str:
        """Normalise the whitespace of a title.

        Args:
            title: Raw title text.

        Returns:
            The title with every run of whitespace collapsed to a single
            space and leading/trailing whitespace removed.
        """
        return re.sub(r"\s+", " ", title).strip()

    @staticmethod
    def _split_trailing_tag(text: str, marker: str) -> Tuple[str, str]:
        """Split off the last ``marker`` tag of *text*; return (head, value).

        The value is the tag content without its closing ``]``. When the
        marker is present but the tail does not end with ``]``, the tail is
        still removed from the head and the value is the empty string.
        """
        if marker not in text:
            return text, ""
        head, tail = text.rsplit(marker, 1)
        return head, (tail[:-1] if tail.endswith("]") else "")

    @classmethod
    def _parse_title_line(cls, line: str) -> Tuple[str, Optional[int], str, str]:
        """Parse one title line into (title, rank, url, mobile_url).

        Expected shape: ``[<rank>. ]<title>[ [URL:...]][ [MOBILE:...]]``.
        """
        text = line.strip()

        rank = None
        prefix, separator, remainder = text.partition(". ")
        # isdecimal() (unlike isdigit()) only accepts characters int() can
        # convert, so prefixes such as "²" are kept as part of the title
        # instead of making the whole line unparsable.
        if separator and prefix.isdecimal():
            rank = int(prefix)
            text = remainder

        # The mobile tag is stripped first, then the URL tag.
        text, mobile_url = cls._split_trailing_tag(text, " [MOBILE:")
        text, url = cls._split_trailing_tag(text, " [URL:")

        return cls.clean_title(text.strip()), rank, url, mobile_url

    def parse_txt_file(self, file_path: Path) -> Tuple[Dict, Dict]:
        """Parse the title data of a single txt file.

        The file consists of sections separated by a blank line. The first
        line of a section is the header (``id | name`` or just ``id``); the
        remaining lines are titles. Sections with fewer than two lines and
        the "failed IDs" section are skipped. A title repeated within one
        section overwrites the earlier entry, and a platform repeated within
        one file restarts with an empty title mapping.

        Args:
            file_path: Path of the txt file.

        Returns:
            A ``(titles_by_id, id_to_name)`` tuple:
            - titles_by_id: {platform_id: {title: {ranks, url, mobileUrl}}}
            - id_to_name: {platform_id: platform_name}
            Titles without a rank prefix get the rank list ``[1]``.

        Raises:
            FileParseError: The file does not exist or cannot be read/parsed.
        """
        if not file_path.exists():
            raise FileParseError(str(file_path), "文件不存在")

        titles_by_id: Dict[str, Dict[str, Dict]] = {}
        id_to_name: Dict[str, str] = {}

        try:
            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()

            for section in content.split("\n\n"):
                if not section.strip() or self._FAILED_IDS_MARKER in section:
                    continue

                lines = section.strip().split("\n")
                if len(lines) < 2:
                    continue

                # Header: "id | name", or a bare id that doubles as the name.
                header = lines[0].strip()
                if " | " in header:
                    raw_id, raw_name = header.split(" | ", 1)
                    source_id, name = raw_id.strip(), raw_name.strip()
                else:
                    source_id = name = header
                id_to_name[source_id] = name

                entries: Dict[str, Dict] = {}
                titles_by_id[source_id] = entries

                for raw_line in lines[1:]:
                    if not raw_line.strip():
                        continue
                    try:
                        title, rank, url, mobile_url = self._parse_title_line(raw_line)
                    except ValueError:
                        # Best effort: a single malformed line is skipped.
                        continue
                    entries[title] = {
                        "ranks": [rank] if rank is not None else [1],
                        "url": url,
                        "mobileUrl": mobile_url,
                    }
        except Exception as e:
            # Keep the original cause attached for debugging.
            raise FileParseError(str(file_path), str(e)) from e

        return titles_by_id, id_to_name

    def get_date_folder_name(self, date: Optional[datetime] = None) -> str:
        """Return the name of the per-day output folder.

        Args:
            date: Date to format; defaults to the current local time.

        Returns:
            Folder name in the form ``YYYY年MM月DD日``.
        """
        if date is None:
            date = datetime.now()
        return date.strftime("%Y年%m月%d日")

    def read_all_titles_for_date(
        self,
        date: Optional[datetime] = None,
        platform_ids: Optional[List[str]] = None,
    ) -> Tuple[Dict, Dict, Dict]:
        """Read and merge every title file of one day (cached).

        Files are read from ``<project_root>/output/<date folder>/txt/*.txt``
        in sorted order. Ranks of a title that appears in several files are
        concatenated; ``url``/``mobileUrl`` come from the first occurrence.
        A file that fails to parse is reported with a printed warning and
        skipped.

        Args:
            date: Day to read; defaults to today.
            platform_ids: Platform IDs to keep. ``None`` or an empty list
                keeps all platforms.

        Returns:
            A ``(all_titles, id_to_name, all_timestamps)`` tuple:
            - all_titles: {platform_id: {title: {ranks, url, mobileUrl}}}
            - id_to_name: {platform_id: platform_name}; this mapping is not
              filtered by ``platform_ids``
            - all_timestamps: {file name: modification time (st_mtime)}

        Raises:
            DataNotFoundError: The day's directory is missing, contains no
                txt files, or yields no platform data.
        """
        # Take one snapshot of "now" so that the cache key, the TTL decision
        # and the directory that is read always refer to the same day.
        now = datetime.now()
        target = date if date is not None else now
        date_folder = self.get_date_folder_name(target)

        platform_key = ','.join(sorted(platform_ids)) if platform_ids else 'all'
        cache_key = f"read_all_titles:{date_folder}:{platform_key}"

        # Today's data may still grow, so it is cached for 15 minutes;
        # other days are cached for one hour.
        is_today = target.date() == now.date()
        ttl = 900 if is_today else 3600

        cached = self.cache.get(cache_key, ttl=ttl)
        if cached:
            return cached

        txt_dir = self.project_root / "output" / date_folder / "txt"
        if not txt_dir.exists():
            raise DataNotFoundError(
                f"未找到 {date_folder} 的数据目录",
                suggestion="请先运行爬虫或检查日期是否正确"
            )

        txt_files = sorted(txt_dir.glob("*.txt"))
        if not txt_files:
            raise DataNotFoundError(
                f"{date_folder} 没有数据文件",
                suggestion="请等待爬虫任务完成"
            )

        # Build the filter once; set membership is O(1) per platform.
        wanted = set(platform_ids) if platform_ids else None

        all_titles: Dict[str, Dict[str, Dict]] = {}
        id_to_name: Dict[str, str] = {}
        all_timestamps: Dict[str, float] = {}

        for txt_file in txt_files:
            try:
                titles_by_id, file_id_to_name = self.parse_txt_file(txt_file)
                id_to_name.update(file_id_to_name)

                for platform_id, titles in titles_by_id.items():
                    if wanted is not None and platform_id not in wanted:
                        continue

                    merged = all_titles.setdefault(platform_id, {})
                    for title, info in titles.items():
                        if title in merged:
                            merged[title]["ranks"].extend(info["ranks"])
                        else:
                            merged[title] = info.copy()

                all_timestamps[txt_file.name] = txt_file.stat().st_mtime
            except Exception as e:
                # Best effort: one bad file must not hide the other files.
                print(f"Warning: 解析文件 {txt_file} 失败: {e}")
                continue

        if not all_titles:
            raise DataNotFoundError(
                f"{date_folder} 没有有效的数据",
                suggestion="请检查数据文件格式或重新运行爬虫"
            )

        result = (all_titles, id_to_name, all_timestamps)
        self.cache.set(cache_key, result)
        return result

    def parse_yaml_config(self, config_path: Optional[str] = None) -> dict:
        """Parse the YAML configuration file.

        Args:
            config_path: Path of the configuration file; defaults to
                ``<project_root>/config/config.yaml``.

        Returns:
            The parsed configuration. An empty YAML document yields ``{}``.

        Raises:
            FileParseError: The file does not exist or cannot be parsed.
        """
        if config_path is None:
            config_path = self.project_root / "config" / "config.yaml"
        else:
            config_path = Path(config_path)

        if not config_path.exists():
            raise FileParseError(str(config_path), "配置文件不存在")

        try:
            with open(config_path, "r", encoding="utf-8") as f:
                config_data = yaml.safe_load(f)
        except Exception as e:
            raise FileParseError(str(config_path), str(e)) from e

        # safe_load returns None for an empty document; honour the declared
        # dict return type instead of leaking None to callers.
        return {} if config_data is None else config_data

    def parse_frequency_words(self, words_file: Optional[str] = None) -> List[Dict]:
        """Parse the keyword configuration file.

        Each non-empty line that does not start with ``#`` describes one
        group. Keywords are separated by ``|`` and/or ``,``. A trailing ``+``
        marks a required word, a trailing ``!`` marks a filter word, anything
        else is a normal word. Tokens that are empty once the marker is
        removed are ignored. Groups without any required or normal word are
        dropped.

        Args:
            words_file: Path of the keyword file; defaults to
                ``<project_root>/config/frequency_words.txt``.

        Returns:
            A list of ``{"required": [...], "normal": [...],
            "filter_words": [...]}`` dicts; empty when the file is missing.

        Raises:
            FileParseError: The file exists but cannot be read.
        """
        if words_file is None:
            words_file = self.project_root / "config" / "frequency_words.txt"
        else:
            words_file = Path(words_file)

        if not words_file.exists():
            return []

        word_groups: List[Dict] = []
        try:
            with open(words_file, "r", encoding="utf-8") as f:
                for raw_line in f:
                    line = raw_line.strip()
                    if not line or line.startswith("#"):
                        continue

                    group: Dict[str, List[str]] = {
                        "required": [],
                        "normal": [],
                        "filter_words": []
                    }

                    for part in line.split("|"):
                        for token in part.split(","):
                            word = token.strip()
                            if not word:
                                continue
                            if word.endswith("+"):
                                bucket, keyword = "required", word[:-1].strip()
                            elif word.endswith("!"):
                                bucket, keyword = "filter_words", word[:-1].strip()
                            else:
                                bucket, keyword = "normal", word
                            # A bare "+" or "!" would otherwise register an
                            # empty keyword.
                            if keyword:
                                group[bucket].append(keyword)

                    if group["required"] or group["normal"]:
                        word_groups.append(group)
        except Exception as e:
            raise FileParseError(str(words_file), str(e)) from e

        return word_groups
|