| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604 |
- """
- 数据访问服务
- 提供统一的数据查询接口,封装数据访问逻辑。
- """
- import re
- from collections import Counter
- from datetime import datetime, timedelta
- from typing import Dict, List, Optional, Tuple
- from .cache_service import get_cache
- from .parser_service import ParserService
- from ..utils.errors import DataNotFoundError
class DataService:
    """Unified data access layer over the parser and the shared cache.

    Every query reads through ``self.parser`` (a ``ParserService``) and,
    where noted, memoises its result in ``self.cache`` (the object returned
    by ``get_cache()``).
    """

    # Date folders under ``output/`` are named like ``2025年10月10日``.
    _DATE_FOLDER_RE = re.compile(r'(\d{4})年(\d{2})月(\d{2})日')

    def __init__(self, project_root: Optional[str] = None):
        """Create the service.

        Args:
            project_root: Project root directory, forwarded to
                ``ParserService``. ``None`` is passed through unchanged.
        """
        self.parser = ParserService(project_root)
        self.cache = get_cache()

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _first_rank(ranks):
        """Return the first recorded rank, or 0 when there are none."""
        return ranks[0] if ranks else 0

    @staticmethod
    def _average_rank(ranks):
        """Return the mean of ``ranks``, or 0 when the list is empty."""
        return sum(ranks) / len(ranks) if ranks else 0

    @staticmethod
    def _calendar_day(value):
        """Reduce a ``datetime`` to its ``date``; pass plain dates through."""
        return value.date() if isinstance(value, datetime) else value

    @staticmethod
    def _section(config: Dict, name: str) -> Dict:
        """Return a config section, treating a missing or null one as empty."""
        return config.get(name) or {}

    @classmethod
    def _parse_date_folder(cls, folder_name: str) -> Optional[datetime]:
        """Parse a ``YYYY年MM月DD日`` folder name into a ``datetime``.

        Returns ``None`` when the name does not start with that pattern or
        the digits do not form a valid calendar date.
        """
        match = cls._DATE_FOLDER_RE.match(folder_name)
        if match is None:
            return None
        year, month, day = (int(part) for part in match.groups())
        try:
            return datetime(year, month, day)
        except ValueError:
            # e.g. month 13 or day 40: matched the pattern, not a real date.
            return None

    # ------------------------------------------------------------------
    # News queries
    # ------------------------------------------------------------------
    def get_latest_news(
        self,
        platforms: Optional[List[str]] = None,
        limit: int = 50,
        include_url: bool = False
    ) -> List[Dict]:
        """Return today's news items ordered by their first recorded rank.

        Args:
            platforms: Platform IDs to include; ``None`` means all platforms.
            limit: Maximum number of items returned.
            include_url: When True, add ``url`` and ``mobileUrl`` to every
                item (off by default to keep the payload small).

        Returns:
            A list of dicts with ``title``, ``platform``, ``platform_name``,
            ``rank`` and ``timestamp`` (plus the URL fields when requested).

        Raises:
            DataNotFoundError: Not raised here directly; presumably
                propagated from the parser when no data exists.
        """
        cache_key = f"latest_news:{','.join(platforms or [])}:{limit}:{include_url}"
        cached = self.cache.get(cache_key, ttl=900)  # 15 minutes
        if cached:
            return cached

        # date=None asks the parser for today's data.
        all_titles, id_to_name, timestamps = self.parser.read_all_titles_for_date(
            date=None,
            platform_ids=platforms
        )

        # Stamp every item with the newest file timestamp; fall back to "now"
        # when the parser reported no timestamps.
        if timestamps:
            fetch_time = datetime.fromtimestamp(max(timestamps.values()))
        else:
            fetch_time = datetime.now()
        fetched_at = fetch_time.strftime("%Y-%m-%d %H:%M:%S")

        news_list = []
        for platform_id, titles in all_titles.items():
            platform_name = id_to_name.get(platform_id, platform_id)
            for title, info in titles.items():
                news_item = {
                    "title": title,
                    "platform": platform_id,
                    "platform_name": platform_name,
                    "rank": self._first_rank(info["ranks"]),
                    "timestamp": fetched_at
                }
                if include_url:
                    news_item["url"] = info.get("url", "")
                    news_item["mobileUrl"] = info.get("mobileUrl", "")
                news_list.append(news_item)

        # Stable sort: items with equal rank keep their parser order.
        news_list.sort(key=lambda item: item["rank"])
        result = news_list[:limit]

        self.cache.set(cache_key, result)
        return result

    def get_news_by_date(
        self,
        target_date: datetime,
        platforms: Optional[List[str]] = None,
        limit: int = 50,
        include_url: bool = False
    ) -> List[Dict]:
        """Return the news items recorded on ``target_date``.

        Args:
            target_date: Day to query.
            platforms: Platform IDs to include; ``None`` means all platforms.
            limit: Maximum number of items returned.
            include_url: When True, add ``url`` and ``mobileUrl`` to every
                item (off by default to keep the payload small).

        Returns:
            A list of dicts with ``title``, ``platform``, ``platform_name``,
            ``rank`` (first recorded rank), ``avg_rank``, ``count`` (number
            of recorded ranks) and ``date``, ordered by ``rank``.

        Raises:
            DataNotFoundError: Not raised here directly; presumably
                propagated from the parser when no data exists.

        Examples:
            >>> service = DataService()
            >>> news = service.get_news_by_date(
            ...     target_date=datetime(2025, 10, 10),
            ...     platforms=['zhihu'],
            ...     limit=20
            ... )
        """
        date_str = target_date.strftime("%Y-%m-%d")
        cache_key = f"news_by_date:{date_str}:{','.join(platforms or [])}:{limit}:{include_url}"
        cached = self.cache.get(cache_key, ttl=1800)  # 30 minutes
        if cached:
            return cached

        all_titles, id_to_name, _ = self.parser.read_all_titles_for_date(
            date=target_date,
            platform_ids=platforms
        )

        news_list = []
        for platform_id, titles in all_titles.items():
            platform_name = id_to_name.get(platform_id, platform_id)
            for title, info in titles.items():
                ranks = info["ranks"]
                news_item = {
                    "title": title,
                    "platform": platform_id,
                    "platform_name": platform_name,
                    "rank": self._first_rank(ranks),
                    "avg_rank": round(self._average_rank(ranks), 2),
                    "count": len(ranks),
                    "date": date_str
                }
                if include_url:
                    news_item["url"] = info.get("url", "")
                    news_item["mobileUrl"] = info.get("mobileUrl", "")
                news_list.append(news_item)

        news_list.sort(key=lambda item: item["rank"])
        result = news_list[:limit]

        self.cache.set(cache_key, result)
        return result

    def search_news_by_keyword(
        self,
        keyword: str,
        date_range: Optional[Tuple[datetime, datetime]] = None,
        platforms: Optional[List[str]] = None,
        limit: Optional[int] = None
    ) -> Dict:
        """Search titles for a case-insensitive substring across a date range.

        Args:
            keyword: Substring to look for in titles.
            date_range: ``(start_date, end_date)``, both inclusive. Defaults
                to today only.
            platforms: Platform IDs to include; ``None`` means all platforms.
            limit: Optional cap on the number of returned results; ignored
                when ``None`` or not positive.

        Returns:
            A dict with ``results``, ``total`` (number returned),
            ``total_found`` (number matched before ``limit``) and
            ``statistics`` (per-platform counts, overall average rank and the
            keyword). Statistics are computed over all matches, not only the
            returned ones.

        Raises:
            DataNotFoundError: No title in the range contains the keyword.
        """
        if date_range:
            start_date, end_date = date_range
        else:
            start_date = end_date = datetime.now()

        # Compare calendar days rather than full datetimes; otherwise a start
        # time later in the day than the end time drops the final day.
        last_day = self._calendar_day(end_date)
        needle = keyword.lower()

        results = []
        platform_distribution = Counter()

        current_date = start_date
        while self._calendar_day(current_date) <= last_day:
            try:
                all_titles, id_to_name, _ = self.parser.read_all_titles_for_date(
                    date=current_date,
                    platform_ids=platforms
                )
            except DataNotFoundError:
                # No data for this day; move on to the next one.
                pass
            else:
                day_str = current_date.strftime("%Y-%m-%d")
                for platform_id, titles in all_titles.items():
                    platform_name = id_to_name.get(platform_id, platform_id)
                    for title, info in titles.items():
                        if needle not in title.lower():
                            continue
                        ranks = info["ranks"]
                        results.append({
                            "title": title,
                            "platform": platform_id,
                            "platform_name": platform_name,
                            "ranks": ranks,
                            "count": len(ranks),
                            "avg_rank": round(self._average_rank(ranks), 2),
                            "url": info.get("url", ""),
                            "mobileUrl": info.get("mobileUrl", ""),
                            "date": day_str
                        })
                        platform_distribution[platform_id] += 1
            current_date += timedelta(days=1)

        if not results:
            raise DataNotFoundError(
                f"未找到包含关键词 '{keyword}' 的新闻",
                suggestion="请尝试其他关键词或扩大日期范围"
            )

        # Overall average across every rank of every match.
        all_ranks = [rank for item in results for rank in item["ranks"]]
        overall_avg = self._average_rank(all_ranks)

        total_found = len(results)
        if limit is not None and limit > 0:
            results = results[:limit]

        return {
            "results": results,
            "total": len(results),
            "total_found": total_found,
            "statistics": {
                "platform_distribution": dict(platform_distribution),
                "avg_rank": round(overall_avg, 2),
                "keyword": keyword
            }
        }

    def get_trending_topics(
        self,
        top_n: int = 10,
        mode: str = "current"
    ) -> Dict:
        """Count how often the configured watch-words appear in today's titles.

        The words come from ``self.parser.parse_frequency_words()`` (the
        user's personal watch list); topics are not extracted from the news
        automatically.

        Args:
            top_n: Number of most frequent words to return.
            mode: ``"daily"`` (cumulative for the day) or ``"current"``
                (latest batch). Both currently process the same merged data
                because the parser offers no per-batch filtering.

        Returns:
            A dict with ``topics``, ``generated_at``, ``mode``,
            ``total_keywords`` and ``description``.

        Raises:
            DataNotFoundError: The parser returned no titles for today.
            ValueError: ``mode`` is neither ``"daily"`` nor ``"current"``.
        """
        cache_key = f"trending_topics:{top_n}:{mode}"
        cached = self.cache.get(cache_key, ttl=1800)  # 30 minutes
        if cached:
            return cached

        all_titles, _, _ = self.parser.read_all_titles_for_date()
        if not all_titles:
            raise DataNotFoundError(
                "未找到今天的新闻数据",
                suggestion="请确保爬虫已经运行并生成了数据"
            )

        word_groups = self.parser.parse_frequency_words()

        if mode not in ("daily", "current"):
            raise ValueError(
                f"不支持的模式: {mode}。支持的模式: daily, current"
            )
        # The original "current" branch re-read the very same merged data, so
        # one read serves both modes until the parser can filter by batch.
        titles_to_process = all_titles

        # Flatten the groups once instead of once per title; empty words are
        # skipped. A word listed in several groups is counted once per listing.
        watch_words = []
        for group in word_groups:
            for word in group.get("required", []) + group.get("normal", []):
                if word:
                    watch_words.append(word)

        word_frequency = Counter()
        keyword_to_news = {}
        for titles in titles_to_process.values():
            for title in titles:
                for word in watch_words:
                    if word in title:
                        word_frequency[word] += 1
                        keyword_to_news.setdefault(word, []).append(title)

        topics = []
        for keyword, frequency in word_frequency.most_common(top_n):
            matched_news = keyword_to_news.get(keyword, [])
            topics.append({
                "keyword": keyword,
                "frequency": frequency,
                "matched_news": len(set(matched_news)),  # distinct titles
                "trend": "stable",  # TODO: needs historical data
                "weight_score": 0.0  # TODO: weight calculation not implemented
            })

        result = {
            "topics": topics,
            "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "mode": mode,
            "total_keywords": len(word_frequency),
            "description": self._get_mode_description(mode)
        }

        self.cache.set(cache_key, result)
        return result

    def _get_mode_description(self, mode: str) -> str:
        """Return the display label for a statistics mode."""
        descriptions = {
            "daily": "当日累计统计",
            "current": "最新一批统计"
        }
        return descriptions.get(mode, "未知模式")

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------
    def _build_crawler_config(self, config_data: Dict) -> Dict:
        """Assemble the ``crawler`` view of the parsed YAML config."""
        crawler = self._section(config_data, "crawler")
        return {
            "enable_crawler": crawler.get("enable_crawler", True),
            "use_proxy": crawler.get("use_proxy", False),
            "request_interval": crawler.get("request_interval", 1),
            "retry_times": 3,  # fixed value, not read from the config
            "platforms": [p["id"] for p in (config_data.get("platforms") or [])]
        }

    def _build_push_config(self, config_data: Dict) -> Dict:
        """Assemble the ``push`` view, listing channels that have a webhook."""
        notification = self._section(config_data, "notification")
        webhooks = notification.get("webhooks") or {}
        channels = [
            channel
            for channel in ("feishu", "dingtalk", "wework")
            if webhooks.get(f"{channel}_url")
        ]
        return {
            "enable_notification": notification.get("enable_notification", True),
            "enabled_channels": channels,
            "message_batch_size": notification.get("message_batch_size", 20),
            "push_window": notification.get("push_window", {})
        }

    def _build_weights_config(self, config_data: Dict) -> Dict:
        """Assemble the ``weights`` view of the parsed YAML config."""
        weight = self._section(config_data, "weight")
        return {
            "rank_weight": weight.get("rank_weight", 0.6),
            "frequency_weight": weight.get("frequency_weight", 0.3),
            "hotness_weight": weight.get("hotness_weight", 0.1)
        }

    def get_current_config(self, section: str = "all") -> Dict:
        """Return the current system configuration.

        Args:
            section: One of ``all``, ``crawler``, ``push``, ``keywords`` or
                ``weights``. Any other value yields an empty dict.

        Returns:
            The requested section, or a dict of all four sections for
            ``"all"``.

        Raises:
            FileParseError: Not raised here directly; presumably propagated
                from the parser when a config file cannot be parsed.
        """
        cache_key = f"config:{section}"
        cached = self.cache.get(cache_key, ttl=3600)  # 1 hour
        if cached:
            return cached

        config_data = self.parser.parse_yaml_config()
        word_groups = self.parser.parse_frequency_words()

        # Builders run lazily so only the requested section is assembled.
        builders = {
            "crawler": lambda: self._build_crawler_config(config_data),
            "push": lambda: self._build_push_config(config_data),
            "keywords": lambda: {
                "word_groups": word_groups,
                "total_groups": len(word_groups)
            },
            "weights": lambda: self._build_weights_config(config_data)
        }

        if section == "all":
            result = {name: build() for name, build in builders.items()}
        elif section in builders:
            result = builders[section]()
        else:
            result = {}

        self.cache.set(cache_key, result)
        return result

    # ------------------------------------------------------------------
    # Storage inspection
    # ------------------------------------------------------------------
    def get_available_date_range(self) -> Tuple[Optional[datetime], Optional[datetime]]:
        """Scan ``output/`` and return the span of dates that have a folder.

        Returns:
            ``(earliest, latest)``, or ``(None, None)`` when the directory is
            missing or contains no date-named folder.

        Examples:
            >>> service = DataService()
            >>> earliest, latest = service.get_available_date_range()
            >>> print(f"可用日期范围:{earliest} 至 {latest}")
        """
        output_dir = self.parser.project_root / "output"
        if not output_dir.exists():
            return (None, None)

        available_dates = []
        for entry in output_dir.iterdir():
            if not entry.is_dir() or entry.name.startswith('.'):
                continue
            folder_date = self._parse_date_folder(entry.name)
            if folder_date is not None:
                available_dates.append(folder_date)

        if not available_dates:
            return (None, None)
        return (min(available_dates), max(available_dates))

    def get_system_status(self) -> Dict:
        """Report version, stored-data span, storage size and cache stats.

        Returns:
            A dict with ``system`` (version, project root), ``data`` (total
            size of files under ``output/`` sub-folders, oldest and latest
            date folder), ``cache`` (``self.cache.get_stats()``) and
            ``health`` (always the literal ``"healthy"``).
        """
        output_dir = self.parser.project_root / "output"

        total_storage = 0
        oldest_record = None
        latest_record = None

        if output_dir.exists():
            for entry in output_dir.iterdir():
                if not entry.is_dir():
                    continue

                folder_date = self._parse_date_folder(entry.name)
                if folder_date is not None:
                    if oldest_record is None or folder_date < oldest_record:
                        oldest_record = folder_date
                    if latest_record is None or folder_date > latest_record:
                        latest_record = folder_date

                # Every sub-folder counts towards storage, dated or not.
                for item in entry.rglob("*"):
                    if item.is_file():
                        total_storage += item.stat().st_size

        # Best effort: a missing or unreadable version file leaves "unknown".
        version = "unknown"
        version_file = self.parser.project_root / "version"
        try:
            version = version_file.read_text(encoding="utf-8").strip()
        except (OSError, UnicodeDecodeError):
            pass

        return {
            "system": {
                "version": version,
                "project_root": str(self.parser.project_root)
            },
            "data": {
                "total_storage": f"{total_storage / 1024 / 1024:.2f} MB",
                "oldest_record": oldest_record.strftime("%Y-%m-%d") if oldest_record else None,
                "latest_record": latest_record.strftime("%Y-%m-%d") if latest_record else None,
            },
            "cache": self.cache.get_stats(),
            "health": "healthy"
        }
|