data_query.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. """
  2. 数据查询工具
  3. 实现P0核心的数据查询工具。
  4. """
  5. from typing import Dict, List, Optional
  6. from ..services.data_service import DataService
  7. from ..utils.validators import (
  8. validate_platforms,
  9. validate_limit,
  10. validate_keyword,
  11. validate_date_range,
  12. validate_top_n,
  13. validate_mode,
  14. validate_date_query
  15. )
  16. from ..utils.errors import MCPError
  17. class DataQueryTools:
  18. """数据查询工具类"""
  19. def __init__(self, project_root: str = None):
  20. """
  21. 初始化数据查询工具
  22. Args:
  23. project_root: 项目根目录
  24. """
  25. self.data_service = DataService(project_root)
  26. def get_latest_news(
  27. self,
  28. platforms: Optional[List[str]] = None,
  29. limit: Optional[int] = None,
  30. include_url: bool = False
  31. ) -> Dict:
  32. """
  33. 获取最新一批爬取的新闻数据
  34. Args:
  35. platforms: 平台ID列表,如 ['zhihu', 'weibo']
  36. limit: 返回条数限制,默认20
  37. include_url: 是否包含URL链接,默认False(节省token)
  38. Returns:
  39. 新闻列表字典
  40. Example:
  41. >>> tools = DataQueryTools()
  42. >>> result = tools.get_latest_news(platforms=['zhihu'], limit=10)
  43. >>> print(result['total'])
  44. 10
  45. """
  46. try:
  47. # 参数验证
  48. platforms = validate_platforms(platforms)
  49. limit = validate_limit(limit, default=50)
  50. # 获取数据
  51. news_list = self.data_service.get_latest_news(
  52. platforms=platforms,
  53. limit=limit,
  54. include_url=include_url
  55. )
  56. return {
  57. "news": news_list,
  58. "total": len(news_list),
  59. "platforms": platforms,
  60. "success": True
  61. }
  62. except MCPError as e:
  63. return {
  64. "success": False,
  65. "error": e.to_dict()
  66. }
  67. except Exception as e:
  68. return {
  69. "success": False,
  70. "error": {
  71. "code": "INTERNAL_ERROR",
  72. "message": str(e)
  73. }
  74. }
  75. def search_news_by_keyword(
  76. self,
  77. keyword: str,
  78. date_range: Optional[Dict] = None,
  79. platforms: Optional[List[str]] = None,
  80. limit: Optional[int] = None
  81. ) -> Dict:
  82. """
  83. 按关键词搜索历史新闻
  84. Args:
  85. keyword: 搜索关键词(必需)
  86. date_range: 日期范围,格式: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
  87. platforms: 平台过滤列表
  88. limit: 返回条数限制(可选,默认返回所有)
  89. Returns:
  90. 搜索结果字典
  91. Example:
  92. >>> tools = DataQueryTools()
  93. >>> result = tools.search_news_by_keyword(
  94. ... keyword="人工智能",
  95. ... date_range={"start": "2025-10-01", "end": "2025-10-11"},
  96. ... limit=50
  97. ... )
  98. >>> print(result['total'])
  99. """
  100. try:
  101. # 参数验证
  102. keyword = validate_keyword(keyword)
  103. date_range_tuple = validate_date_range(date_range)
  104. platforms = validate_platforms(platforms)
  105. if limit is not None:
  106. limit = validate_limit(limit, default=100)
  107. # 搜索数据
  108. search_result = self.data_service.search_news_by_keyword(
  109. keyword=keyword,
  110. date_range=date_range_tuple,
  111. platforms=platforms,
  112. limit=limit
  113. )
  114. return {
  115. **search_result,
  116. "success": True
  117. }
  118. except MCPError as e:
  119. return {
  120. "success": False,
  121. "error": e.to_dict()
  122. }
  123. except Exception as e:
  124. return {
  125. "success": False,
  126. "error": {
  127. "code": "INTERNAL_ERROR",
  128. "message": str(e)
  129. }
  130. }
  131. def get_trending_topics(
  132. self,
  133. top_n: Optional[int] = None,
  134. mode: Optional[str] = None
  135. ) -> Dict:
  136. """
  137. 获取个人关注词的新闻出现频率统计
  138. 注意:本工具基于 config/frequency_words.txt 中的个人关注词列表进行统计,
  139. 而不是自动从新闻中提取热点话题。这是一个个人可定制的关注词列表,
  140. 用户可以根据自己的兴趣添加或删除关注词。
  141. Args:
  142. top_n: 返回TOP N关注词,默认10
  143. mode: 模式 - daily(当日累计), current(最新一批), incremental(增量)
  144. Returns:
  145. 关注词频率统计字典,包含每个关注词在新闻中出现的次数
  146. Example:
  147. >>> tools = DataQueryTools()
  148. >>> result = tools.get_trending_topics(top_n=5, mode="current")
  149. >>> print(len(result['topics']))
  150. 5
  151. >>> # 返回的是你在 frequency_words.txt 中设置的关注词的频率统计
  152. """
  153. try:
  154. # 参数验证
  155. top_n = validate_top_n(top_n, default=10)
  156. valid_modes = ["daily", "current", "incremental"]
  157. mode = validate_mode(mode, valid_modes, default="current")
  158. # 获取趋势话题
  159. trending_result = self.data_service.get_trending_topics(
  160. top_n=top_n,
  161. mode=mode
  162. )
  163. return {
  164. **trending_result,
  165. "success": True
  166. }
  167. except MCPError as e:
  168. return {
  169. "success": False,
  170. "error": e.to_dict()
  171. }
  172. except Exception as e:
  173. return {
  174. "success": False,
  175. "error": {
  176. "code": "INTERNAL_ERROR",
  177. "message": str(e)
  178. }
  179. }
  180. def get_news_by_date(
  181. self,
  182. date_query: Optional[str] = None,
  183. platforms: Optional[List[str]] = None,
  184. limit: Optional[int] = None,
  185. include_url: bool = False
  186. ) -> Dict:
  187. """
  188. 按日期查询新闻,支持自然语言日期
  189. Args:
  190. date_query: 日期查询字符串(可选,默认"今天"),支持:
  191. - 相对日期:今天、昨天、前天、3天前、yesterday、3 days ago
  192. - 星期:上周一、本周三、last monday、this friday
  193. - 绝对日期:2025-10-10、10月10日、2025年10月10日
  194. platforms: 平台ID列表,如 ['zhihu', 'weibo']
  195. limit: 返回条数限制,默认50
  196. include_url: 是否包含URL链接,默认False(节省token)
  197. Returns:
  198. 新闻列表字典
  199. Example:
  200. >>> tools = DataQueryTools()
  201. >>> # 不指定日期,默认查询今天
  202. >>> result = tools.get_news_by_date(platforms=['zhihu'], limit=20)
  203. >>> # 指定日期
  204. >>> result = tools.get_news_by_date(
  205. ... date_query="昨天",
  206. ... platforms=['zhihu'],
  207. ... limit=20
  208. ... )
  209. >>> print(result['total'])
  210. 20
  211. """
  212. try:
  213. # 参数验证 - 默认今天
  214. if date_query is None:
  215. date_query = "今天"
  216. target_date = validate_date_query(date_query)
  217. platforms = validate_platforms(platforms)
  218. limit = validate_limit(limit, default=50)
  219. # 获取数据
  220. news_list = self.data_service.get_news_by_date(
  221. target_date=target_date,
  222. platforms=platforms,
  223. limit=limit,
  224. include_url=include_url
  225. )
  226. return {
  227. "news": news_list,
  228. "total": len(news_list),
  229. "date": target_date.strftime("%Y-%m-%d"),
  230. "date_query": date_query,
  231. "platforms": platforms,
  232. "success": True
  233. }
  234. except MCPError as e:
  235. return {
  236. "success": False,
  237. "error": e.to_dict()
  238. }
  239. except Exception as e:
  240. return {
  241. "success": False,
  242. "error": {
  243. "code": "INTERNAL_ERROR",
  244. "message": str(e)
  245. }
  246. }