data_query.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. """
  2. 数据查询工具
  3. 实现P0核心的数据查询工具。
  4. """
  5. from typing import Dict, List, Optional, Union
  6. from ..services.data_service import DataService
  7. from ..utils.validators import (
  8. validate_platforms,
  9. validate_limit,
  10. validate_keyword,
  11. validate_date_range,
  12. validate_top_n,
  13. validate_mode,
  14. validate_date_query
  15. )
  16. from ..utils.errors import MCPError
  17. class DataQueryTools:
  18. """数据查询工具类"""
  19. def __init__(self, project_root: str = None):
  20. """
  21. 初始化数据查询工具
  22. Args:
  23. project_root: 项目根目录
  24. """
  25. self.data_service = DataService(project_root)
  26. def get_latest_news(
  27. self,
  28. platforms: Optional[List[str]] = None,
  29. limit: Optional[int] = None,
  30. include_url: bool = False
  31. ) -> Dict:
  32. """
  33. 获取最新一批爬取的新闻数据
  34. Args:
  35. platforms: 平台ID列表,如 ['zhihu', 'weibo']
  36. limit: 返回条数限制,默认20
  37. include_url: 是否包含URL链接,默认False(节省token)
  38. Returns:
  39. 新闻列表字典
  40. Example:
  41. >>> tools = DataQueryTools()
  42. >>> result = tools.get_latest_news(platforms=['zhihu'], limit=10)
  43. >>> print(result['total'])
  44. 10
  45. """
  46. try:
  47. # 参数验证
  48. platforms = validate_platforms(platforms)
  49. limit = validate_limit(limit, default=50)
  50. # 获取数据
  51. news_list = self.data_service.get_latest_news(
  52. platforms=platforms,
  53. limit=limit,
  54. include_url=include_url
  55. )
  56. return {
  57. "news": news_list,
  58. "total": len(news_list),
  59. "platforms": platforms,
  60. "success": True
  61. }
  62. except MCPError as e:
  63. return {
  64. "success": False,
  65. "error": e.to_dict()
  66. }
  67. except Exception as e:
  68. return {
  69. "success": False,
  70. "error": {
  71. "code": "INTERNAL_ERROR",
  72. "message": str(e)
  73. }
  74. }
  75. def search_news_by_keyword(
  76. self,
  77. keyword: str,
  78. date_range: Optional[Union[Dict, str]] = None,
  79. platforms: Optional[List[str]] = None,
  80. limit: Optional[int] = None
  81. ) -> Dict:
  82. """
  83. 按关键词搜索历史新闻
  84. Args:
  85. keyword: 搜索关键词(必需)
  86. date_range: 日期范围,格式: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
  87. platforms: 平台过滤列表
  88. limit: 返回条数限制(可选,默认返回所有)
  89. Returns:
  90. 搜索结果字典
  91. Example (假设今天是 2025-11-17):
  92. >>> tools = DataQueryTools()
  93. >>> result = tools.search_news_by_keyword(
  94. ... keyword="人工智能",
  95. ... date_range={"start": "2025-11-08", "end": "2025-11-17"},
  96. ... limit=50
  97. ... )
  98. >>> print(result['total'])
  99. """
  100. try:
  101. # 参数验证
  102. keyword = validate_keyword(keyword)
  103. date_range_tuple = validate_date_range(date_range)
  104. platforms = validate_platforms(platforms)
  105. if limit is not None:
  106. limit = validate_limit(limit, default=100)
  107. # 搜索数据
  108. search_result = self.data_service.search_news_by_keyword(
  109. keyword=keyword,
  110. date_range=date_range_tuple,
  111. platforms=platforms,
  112. limit=limit
  113. )
  114. return {
  115. **search_result,
  116. "success": True
  117. }
  118. except MCPError as e:
  119. return {
  120. "success": False,
  121. "error": e.to_dict()
  122. }
  123. except Exception as e:
  124. return {
  125. "success": False,
  126. "error": {
  127. "code": "INTERNAL_ERROR",
  128. "message": str(e)
  129. }
  130. }
  131. def get_trending_topics(
  132. self,
  133. top_n: Optional[int] = None,
  134. mode: Optional[str] = None,
  135. extract_mode: Optional[str] = None
  136. ) -> Dict:
  137. """
  138. 获取热点话题统计
  139. Args:
  140. top_n: 返回TOP N话题,默认10
  141. mode: 时间模式
  142. - "daily": 当日累计数据统计
  143. - "current": 最新一批数据统计(默认)
  144. extract_mode: 提取模式
  145. - "keywords": 统计预设关注词(基于 config/frequency_words.txt,默认)
  146. - "auto_extract": 自动从新闻标题提取高频词
  147. Returns:
  148. 话题频率统计字典
  149. Example:
  150. >>> tools = DataQueryTools()
  151. >>> # 使用预设关注词
  152. >>> result = tools.get_trending_topics(top_n=5, mode="current")
  153. >>> # 自动提取高频词
  154. >>> result = tools.get_trending_topics(top_n=10, extract_mode="auto_extract")
  155. """
  156. try:
  157. # 参数验证
  158. top_n = validate_top_n(top_n, default=10)
  159. valid_modes = ["daily", "current"]
  160. mode = validate_mode(mode, valid_modes, default="current")
  161. # 验证 extract_mode
  162. if extract_mode is None:
  163. extract_mode = "keywords"
  164. elif extract_mode not in ["keywords", "auto_extract"]:
  165. return {
  166. "success": False,
  167. "error": {
  168. "code": "INVALID_PARAMETER",
  169. "message": f"不支持的提取模式: {extract_mode}",
  170. "suggestion": "支持的模式: keywords, auto_extract"
  171. }
  172. }
  173. # 获取趋势话题
  174. trending_result = self.data_service.get_trending_topics(
  175. top_n=top_n,
  176. mode=mode,
  177. extract_mode=extract_mode
  178. )
  179. return {
  180. **trending_result,
  181. "success": True
  182. }
  183. except MCPError as e:
  184. return {
  185. "success": False,
  186. "error": e.to_dict()
  187. }
  188. except Exception as e:
  189. return {
  190. "success": False,
  191. "error": {
  192. "code": "INTERNAL_ERROR",
  193. "message": str(e)
  194. }
  195. }
  196. def get_news_by_date(
  197. self,
  198. date_range: Optional[Union[Dict[str, str], str]] = None,
  199. platforms: Optional[List[str]] = None,
  200. limit: Optional[int] = None,
  201. include_url: bool = False
  202. ) -> Dict:
  203. """
  204. 按日期查询新闻,支持自然语言日期
  205. Args:
  206. date_range: 日期范围(可选,默认"今天"),支持:
  207. - 范围对象:{"start": "2025-01-01", "end": "2025-01-07"}
  208. - 相对日期:今天、昨天、前天、3天前
  209. - 单日字符串:2025-10-10
  210. platforms: 平台ID列表,如 ['zhihu', 'weibo']
  211. limit: 返回条数限制,默认50
  212. include_url: 是否包含URL链接,默认False(节省token)
  213. Returns:
  214. 新闻列表字典
  215. Example:
  216. >>> tools = DataQueryTools()
  217. >>> # 不指定日期,默认查询今天
  218. >>> result = tools.get_news_by_date(platforms=['zhihu'], limit=20)
  219. >>> # 指定日期
  220. >>> result = tools.get_news_by_date(
  221. ... date_range="昨天",
  222. ... platforms=['zhihu'],
  223. ... limit=20
  224. ... )
  225. >>> print(result['total'])
  226. 20
  227. """
  228. try:
  229. # 参数验证 - 默认今天
  230. if date_range is None:
  231. date_range = "今天"
  232. # 处理 date_range:支持字符串或对象
  233. if isinstance(date_range, dict):
  234. # 范围对象,取 start 日期
  235. date_str = date_range.get('start', '今天')
  236. else:
  237. date_str = date_range
  238. target_date = validate_date_query(date_str)
  239. platforms = validate_platforms(platforms)
  240. limit = validate_limit(limit, default=50)
  241. # 获取数据
  242. news_list = self.data_service.get_news_by_date(
  243. target_date=target_date,
  244. platforms=platforms,
  245. limit=limit,
  246. include_url=include_url
  247. )
  248. return {
  249. "news": news_list,
  250. "total": len(news_list),
  251. "date": target_date.strftime("%Y-%m-%d"),
  252. "date_range": date_range,
  253. "platforms": platforms,
  254. "success": True
  255. }
  256. except MCPError as e:
  257. return {
  258. "success": False,
  259. "error": e.to_dict()
  260. }
  261. except Exception as e:
  262. return {
  263. "success": False,
  264. "error": {
  265. "code": "INTERNAL_ERROR",
  266. "message": str(e)
  267. }
  268. }