data_query.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. """
  2. 数据查询工具
  3. 实现P0核心的数据查询工具。
  4. """
  5. from typing import Dict, List, Optional, Union
  6. from ..services.data_service import DataService
  7. from ..utils.validators import (
  8. validate_platforms,
  9. validate_limit,
  10. validate_keyword,
  11. validate_date_range,
  12. validate_top_n,
  13. validate_mode,
  14. validate_date_query,
  15. normalize_date_range
  16. )
  17. from ..utils.errors import MCPError
  18. class DataQueryTools:
  19. """数据查询工具类"""
  20. def __init__(self, project_root: str = None):
  21. """
  22. 初始化数据查询工具
  23. Args:
  24. project_root: 项目根目录
  25. """
  26. self.data_service = DataService(project_root)
  27. def get_latest_news(
  28. self,
  29. platforms: Optional[List[str]] = None,
  30. limit: Optional[int] = None,
  31. include_url: bool = False
  32. ) -> Dict:
  33. """
  34. 获取最新一批爬取的新闻数据
  35. Args:
  36. platforms: 平台ID列表,如 ['zhihu', 'weibo']
  37. limit: 返回条数限制,默认20
  38. include_url: 是否包含URL链接,默认False(节省token)
  39. Returns:
  40. 新闻列表字典
  41. Example:
  42. >>> tools = DataQueryTools()
  43. >>> result = tools.get_latest_news(platforms=['zhihu'], limit=10)
  44. >>> print(result['total'])
  45. 10
  46. """
  47. try:
  48. # 参数验证
  49. platforms = validate_platforms(platforms)
  50. limit = validate_limit(limit, default=50)
  51. # 获取数据
  52. news_list = self.data_service.get_latest_news(
  53. platforms=platforms,
  54. limit=limit,
  55. include_url=include_url
  56. )
  57. return {
  58. "success": True,
  59. "summary": {
  60. "description": "最新一批爬取的新闻数据",
  61. "total": len(news_list),
  62. "returned": len(news_list),
  63. "platforms": platforms or "全部平台"
  64. },
  65. "data": news_list
  66. }
  67. except MCPError as e:
  68. return {
  69. "success": False,
  70. "error": e.to_dict()
  71. }
  72. except Exception as e:
  73. return {
  74. "success": False,
  75. "error": {
  76. "code": "INTERNAL_ERROR",
  77. "message": str(e)
  78. }
  79. }
  80. def search_news_by_keyword(
  81. self,
  82. keyword: str,
  83. date_range: Optional[Union[Dict, str]] = None,
  84. platforms: Optional[List[str]] = None,
  85. limit: Optional[int] = None
  86. ) -> Dict:
  87. """
  88. 按关键词搜索历史新闻
  89. Args:
  90. keyword: 搜索关键词(必需)
  91. date_range: 日期范围,格式: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
  92. platforms: 平台过滤列表
  93. limit: 返回条数限制(可选,默认返回所有)
  94. Returns:
  95. 搜索结果字典
  96. Example (假设今天是 2025-11-17):
  97. >>> tools = DataQueryTools()
  98. >>> result = tools.search_news_by_keyword(
  99. ... keyword="人工智能",
  100. ... date_range={"start": "2025-11-08", "end": "2025-11-17"},
  101. ... limit=50
  102. ... )
  103. >>> print(result['total'])
  104. """
  105. try:
  106. # 参数验证
  107. keyword = validate_keyword(keyword)
  108. date_range_tuple = validate_date_range(date_range)
  109. platforms = validate_platforms(platforms)
  110. if limit is not None:
  111. limit = validate_limit(limit, default=100)
  112. # 搜索数据
  113. search_result = self.data_service.search_news_by_keyword(
  114. keyword=keyword,
  115. date_range=date_range_tuple,
  116. platforms=platforms,
  117. limit=limit
  118. )
  119. return {
  120. **search_result,
  121. "success": True
  122. }
  123. except MCPError as e:
  124. return {
  125. "success": False,
  126. "error": e.to_dict()
  127. }
  128. except Exception as e:
  129. return {
  130. "success": False,
  131. "error": {
  132. "code": "INTERNAL_ERROR",
  133. "message": str(e)
  134. }
  135. }
  136. def get_trending_topics(
  137. self,
  138. top_n: Optional[int] = None,
  139. mode: Optional[str] = None,
  140. extract_mode: Optional[str] = None
  141. ) -> Dict:
  142. """
  143. 获取热点话题统计
  144. Args:
  145. top_n: 返回TOP N话题,默认10
  146. mode: 时间模式
  147. - "daily": 当日累计数据统计
  148. - "current": 最新一批数据统计(默认)
  149. extract_mode: 提取模式
  150. - "keywords": 统计预设关注词(基于 config/frequency_words.txt,默认)
  151. - "auto_extract": 自动从新闻标题提取高频词
  152. Returns:
  153. 话题频率统计字典
  154. Example:
  155. >>> tools = DataQueryTools()
  156. >>> # 使用预设关注词
  157. >>> result = tools.get_trending_topics(top_n=5, mode="current")
  158. >>> # 自动提取高频词
  159. >>> result = tools.get_trending_topics(top_n=10, extract_mode="auto_extract")
  160. """
  161. try:
  162. # 参数验证
  163. top_n = validate_top_n(top_n, default=10)
  164. valid_modes = ["daily", "current"]
  165. mode = validate_mode(mode, valid_modes, default="current")
  166. # 验证 extract_mode
  167. if extract_mode is None:
  168. extract_mode = "keywords"
  169. elif extract_mode not in ["keywords", "auto_extract"]:
  170. return {
  171. "success": False,
  172. "error": {
  173. "code": "INVALID_PARAMETER",
  174. "message": f"不支持的提取模式: {extract_mode}",
  175. "suggestion": "支持的模式: keywords, auto_extract"
  176. }
  177. }
  178. # 获取趋势话题
  179. trending_result = self.data_service.get_trending_topics(
  180. top_n=top_n,
  181. mode=mode,
  182. extract_mode=extract_mode
  183. )
  184. return {
  185. **trending_result,
  186. "success": True
  187. }
  188. except MCPError as e:
  189. return {
  190. "success": False,
  191. "error": e.to_dict()
  192. }
  193. except Exception as e:
  194. return {
  195. "success": False,
  196. "error": {
  197. "code": "INTERNAL_ERROR",
  198. "message": str(e)
  199. }
  200. }
  201. def get_news_by_date(
  202. self,
  203. date_range: Optional[Union[Dict[str, str], str]] = None,
  204. platforms: Optional[List[str]] = None,
  205. limit: Optional[int] = None,
  206. include_url: bool = False
  207. ) -> Dict:
  208. """
  209. 按日期查询新闻,支持自然语言日期
  210. Args:
  211. date_range: 日期范围(可选,默认"今天"),支持:
  212. - 范围对象:{"start": "2025-01-01", "end": "2025-01-07"}
  213. - 相对日期:今天、昨天、前天、3天前
  214. - 单日字符串:2025-10-10
  215. platforms: 平台ID列表,如 ['zhihu', 'weibo']
  216. limit: 返回条数限制,默认50
  217. include_url: 是否包含URL链接,默认False(节省token)
  218. Returns:
  219. 新闻列表字典
  220. Example:
  221. >>> tools = DataQueryTools()
  222. >>> # 不指定日期,默认查询今天
  223. >>> result = tools.get_news_by_date(platforms=['zhihu'], limit=20)
  224. >>> # 指定日期
  225. >>> result = tools.get_news_by_date(
  226. ... date_range="昨天",
  227. ... platforms=['zhihu'],
  228. ... limit=20
  229. ... )
  230. >>> print(result['total'])
  231. 20
  232. """
  233. try:
  234. # 参数验证 - 默认今天
  235. if date_range is None:
  236. date_range = "今天"
  237. # 规范化 date_range(处理 JSON 字符串序列化问题)
  238. date_range = normalize_date_range(date_range)
  239. # 处理 date_range:支持字符串或对象
  240. if isinstance(date_range, dict):
  241. # 范围对象,取 start 日期
  242. date_str = date_range.get('start', '今天')
  243. else:
  244. date_str = date_range
  245. target_date = validate_date_query(date_str)
  246. platforms = validate_platforms(platforms)
  247. limit = validate_limit(limit, default=50)
  248. # 获取数据
  249. news_list = self.data_service.get_news_by_date(
  250. target_date=target_date,
  251. platforms=platforms,
  252. limit=limit,
  253. include_url=include_url
  254. )
  255. return {
  256. "success": True,
  257. "summary": {
  258. "description": f"按日期查询的新闻({target_date.strftime('%Y-%m-%d')})",
  259. "total": len(news_list),
  260. "returned": len(news_list),
  261. "date": target_date.strftime("%Y-%m-%d"),
  262. "date_range": date_range,
  263. "platforms": platforms or "全部平台"
  264. },
  265. "data": news_list
  266. }
  267. except MCPError as e:
  268. return {
  269. "success": False,
  270. "error": e.to_dict()
  271. }
  272. except Exception as e:
  273. return {
  274. "success": False,
  275. "error": {
  276. "code": "INTERNAL_ERROR",
  277. "message": str(e)
  278. }
  279. }
  280. # ========================================
  281. # RSS 数据查询方法
  282. # ========================================
  283. def get_latest_rss(
  284. self,
  285. feeds: Optional[List[str]] = None,
  286. days: int = 1,
  287. limit: Optional[int] = None,
  288. include_summary: bool = False
  289. ) -> Dict:
  290. """
  291. 获取最新的 RSS 数据(支持多日查询)
  292. Args:
  293. feeds: RSS 源 ID 列表,如 ['hacker-news', '36kr']
  294. days: 获取最近 N 天的数据,默认 1(仅今天),最大 30 天
  295. limit: 返回条数限制,默认50
  296. include_summary: 是否包含摘要,默认False(节省token)
  297. Returns:
  298. RSS 条目列表字典
  299. """
  300. try:
  301. limit = validate_limit(limit, default=50)
  302. rss_list = self.data_service.get_latest_rss(
  303. feeds=feeds,
  304. days=days,
  305. limit=limit,
  306. include_summary=include_summary
  307. )
  308. return {
  309. "success": True,
  310. "summary": {
  311. "description": f"最近 {days} 天的 RSS 订阅数据" if days > 1 else "最新的 RSS 订阅数据",
  312. "total": len(rss_list),
  313. "returned": len(rss_list),
  314. "days": days,
  315. "feeds": feeds or "全部订阅源"
  316. },
  317. "data": rss_list
  318. }
  319. except MCPError as e:
  320. return {
  321. "success": False,
  322. "error": e.to_dict()
  323. }
  324. except Exception as e:
  325. return {
  326. "success": False,
  327. "error": {
  328. "code": "INTERNAL_ERROR",
  329. "message": str(e)
  330. }
  331. }
  332. def search_rss(
  333. self,
  334. keyword: str,
  335. feeds: Optional[List[str]] = None,
  336. days: int = 7,
  337. limit: Optional[int] = None,
  338. include_summary: bool = False
  339. ) -> Dict:
  340. """
  341. 搜索 RSS 数据
  342. Args:
  343. keyword: 搜索关键词
  344. feeds: RSS 源 ID 列表
  345. days: 搜索最近 N 天的数据,默认 7 天
  346. limit: 返回条数限制,默认50
  347. include_summary: 是否包含摘要
  348. Returns:
  349. 匹配的 RSS 条目列表
  350. """
  351. try:
  352. keyword = validate_keyword(keyword)
  353. limit = validate_limit(limit, default=50)
  354. if days < 1 or days > 30:
  355. days = 7
  356. rss_list = self.data_service.search_rss(
  357. keyword=keyword,
  358. feeds=feeds,
  359. days=days,
  360. limit=limit,
  361. include_summary=include_summary
  362. )
  363. return {
  364. "success": True,
  365. "summary": {
  366. "description": f"RSS 搜索结果(关键词: {keyword})",
  367. "total": len(rss_list),
  368. "returned": len(rss_list),
  369. "keyword": keyword,
  370. "feeds": feeds or "全部订阅源",
  371. "days": days
  372. },
  373. "data": rss_list
  374. }
  375. except MCPError as e:
  376. return {
  377. "success": False,
  378. "error": e.to_dict()
  379. }
  380. except Exception as e:
  381. return {
  382. "success": False,
  383. "error": {
  384. "code": "INTERNAL_ERROR",
  385. "message": str(e)
  386. }
  387. }
  388. def get_rss_feeds_status(self) -> Dict:
  389. """
  390. 获取 RSS 源状态
  391. Returns:
  392. RSS 源状态信息
  393. """
  394. try:
  395. status = self.data_service.get_rss_feeds_status()
  396. return {
  397. **status,
  398. "success": True
  399. }
  400. except MCPError as e:
  401. return {
  402. "success": False,
  403. "error": e.to_dict()
  404. }
  405. except Exception as e:
  406. return {
  407. "success": False,
  408. "error": {
  409. "code": "INTERNAL_ERROR",
  410. "message": str(e)
  411. }
  412. }