# system.py
"""
System management tools.

Implements system status queries and on-demand crawl triggering.
"""
import html
import json
import logging
import random
import re
import time
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

from ..services.data_service import DataService
from ..utils.validators import validate_platforms
from ..utils.errors import MCPError, CrawlTaskError
  10. class SystemManagementTools:
  11. """系统管理工具类"""
  12. def __init__(self, project_root: str = None):
  13. """
  14. 初始化系统管理工具
  15. Args:
  16. project_root: 项目根目录
  17. """
  18. self.data_service = DataService(project_root)
  19. if project_root:
  20. self.project_root = Path(project_root)
  21. else:
  22. # 获取项目根目录
  23. current_file = Path(__file__)
  24. self.project_root = current_file.parent.parent.parent
  25. def get_system_status(self) -> Dict:
  26. """
  27. 获取系统运行状态和健康检查信息
  28. Returns:
  29. 系统状态字典
  30. Example:
  31. >>> tools = SystemManagementTools()
  32. >>> result = tools.get_system_status()
  33. >>> print(result['system']['version'])
  34. """
  35. try:
  36. # 获取系统状态
  37. status = self.data_service.get_system_status()
  38. return {
  39. **status,
  40. "success": True
  41. }
  42. except MCPError as e:
  43. return {
  44. "success": False,
  45. "error": e.to_dict()
  46. }
  47. except Exception as e:
  48. return {
  49. "success": False,
  50. "error": {
  51. "code": "INTERNAL_ERROR",
  52. "message": str(e)
  53. }
  54. }
  55. def trigger_crawl(self, platforms: Optional[List[str]] = None, save_to_local: bool = False, include_url: bool = False) -> Dict:
  56. """
  57. 手动触发一次临时爬取任务(可选持久化)
  58. Args:
  59. platforms: 指定平台列表,为空则爬取所有平台
  60. save_to_local: 是否保存到本地 output 目录,默认 False
  61. include_url: 是否包含URL链接,默认False(节省token)
  62. Returns:
  63. 爬取结果字典,包含新闻数据和保存路径(如果保存)
  64. Example:
  65. >>> tools = SystemManagementTools()
  66. >>> # 临时爬取,不保存
  67. >>> result = tools.trigger_crawl(platforms=['zhihu', 'weibo'])
  68. >>> print(result['data'])
  69. >>> # 爬取并保存到本地
  70. >>> result = tools.trigger_crawl(platforms=['zhihu'], save_to_local=True)
  71. >>> print(result['saved_files'])
  72. """
  73. try:
  74. import json
  75. import time
  76. import random
  77. import requests
  78. from datetime import datetime
  79. import pytz
  80. import yaml
  81. # 参数验证
  82. platforms = validate_platforms(platforms)
  83. # 加载配置文件
  84. config_path = self.project_root / "config" / "config.yaml"
  85. if not config_path.exists():
  86. raise CrawlTaskError(
  87. "配置文件不存在",
  88. suggestion=f"请确保配置文件存在: {config_path}"
  89. )
  90. # 读取配置
  91. with open(config_path, "r", encoding="utf-8") as f:
  92. config_data = yaml.safe_load(f)
  93. # 获取平台配置
  94. all_platforms = config_data.get("platforms", [])
  95. if not all_platforms:
  96. raise CrawlTaskError(
  97. "配置文件中没有平台配置",
  98. suggestion="请检查 config/config.yaml 中的 platforms 配置"
  99. )
  100. # 过滤平台
  101. if platforms:
  102. target_platforms = [p for p in all_platforms if p["id"] in platforms]
  103. if not target_platforms:
  104. raise CrawlTaskError(
  105. f"指定的平台不存在: {platforms}",
  106. suggestion=f"可用平台: {[p['id'] for p in all_platforms]}"
  107. )
  108. else:
  109. target_platforms = all_platforms
  110. # 获取请求间隔
  111. request_interval = config_data.get("crawler", {}).get("request_interval", 100)
  112. # 构建平台ID列表
  113. ids = []
  114. for platform in target_platforms:
  115. if "name" in platform:
  116. ids.append((platform["id"], platform["name"]))
  117. else:
  118. ids.append(platform["id"])
  119. print(f"开始临时爬取,平台: {[p.get('name', p['id']) for p in target_platforms]}")
  120. # 爬取数据
  121. results = {}
  122. id_to_name = {}
  123. failed_ids = []
  124. for i, id_info in enumerate(ids):
  125. if isinstance(id_info, tuple):
  126. id_value, name = id_info
  127. else:
  128. id_value = id_info
  129. name = id_value
  130. id_to_name[id_value] = name
  131. # 构建请求URL
  132. url = f"https://newsnow.busiyi.world/api/s?id={id_value}&latest"
  133. headers = {
  134. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
  135. "Accept": "application/json, text/plain, */*",
  136. "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
  137. "Connection": "keep-alive",
  138. "Cache-Control": "no-cache",
  139. }
  140. # 重试机制
  141. max_retries = 2
  142. retries = 0
  143. success = False
  144. while retries <= max_retries and not success:
  145. try:
  146. response = requests.get(url, headers=headers, timeout=10)
  147. response.raise_for_status()
  148. data_text = response.text
  149. data_json = json.loads(data_text)
  150. status = data_json.get("status", "未知")
  151. if status not in ["success", "cache"]:
  152. raise ValueError(f"响应状态异常: {status}")
  153. status_info = "最新数据" if status == "success" else "缓存数据"
  154. print(f"获取 {id_value} 成功({status_info})")
  155. # 解析数据
  156. results[id_value] = {}
  157. for index, item in enumerate(data_json.get("items", []), 1):
  158. title = item["title"]
  159. url_link = item.get("url", "")
  160. mobile_url = item.get("mobileUrl", "")
  161. if title in results[id_value]:
  162. results[id_value][title]["ranks"].append(index)
  163. else:
  164. results[id_value][title] = {
  165. "ranks": [index],
  166. "url": url_link,
  167. "mobileUrl": mobile_url,
  168. }
  169. success = True
  170. except Exception as e:
  171. retries += 1
  172. if retries <= max_retries:
  173. wait_time = random.uniform(3, 5)
  174. print(f"请求 {id_value} 失败: {e}. {wait_time:.2f}秒后重试...")
  175. time.sleep(wait_time)
  176. else:
  177. print(f"请求 {id_value} 失败: {e}")
  178. failed_ids.append(id_value)
  179. # 请求间隔
  180. if i < len(ids) - 1:
  181. actual_interval = request_interval + random.randint(-10, 20)
  182. actual_interval = max(50, actual_interval)
  183. time.sleep(actual_interval / 1000)
  184. # 格式化返回数据
  185. news_data = []
  186. for platform_id, titles_data in results.items():
  187. platform_name = id_to_name.get(platform_id, platform_id)
  188. for title, info in titles_data.items():
  189. news_item = {
  190. "platform_id": platform_id,
  191. "platform_name": platform_name,
  192. "title": title,
  193. "ranks": info["ranks"]
  194. }
  195. # 条件性添加 URL 字段
  196. if include_url:
  197. news_item["url"] = info.get("url", "")
  198. news_item["mobile_url"] = info.get("mobileUrl", "")
  199. news_data.append(news_item)
  200. # 获取北京时间
  201. beijing_tz = pytz.timezone("Asia/Shanghai")
  202. now = datetime.now(beijing_tz)
  203. # 构建返回结果
  204. result = {
  205. "success": True,
  206. "task_id": f"crawl_{int(time.time())}",
  207. "status": "completed",
  208. "crawl_time": now.strftime("%Y-%m-%d %H:%M:%S"),
  209. "platforms": list(results.keys()),
  210. "total_news": len(news_data),
  211. "failed_platforms": failed_ids,
  212. "data": news_data,
  213. "saved_to_local": save_to_local
  214. }
  215. # 如果需要持久化,调用保存逻辑
  216. if save_to_local:
  217. try:
  218. import re
  219. # 辅助函数:清理标题
  220. def clean_title(title: str) -> str:
  221. """清理标题中的特殊字符"""
  222. if not isinstance(title, str):
  223. title = str(title)
  224. cleaned_title = title.replace("\n", " ").replace("\r", " ")
  225. cleaned_title = re.sub(r"\s+", " ", cleaned_title)
  226. cleaned_title = cleaned_title.strip()
  227. return cleaned_title
  228. # 辅助函数:创建目录
  229. def ensure_directory_exists(directory: str):
  230. """确保目录存在"""
  231. Path(directory).mkdir(parents=True, exist_ok=True)
  232. # 格式化日期和时间
  233. date_folder = now.strftime("%Y年%m月%d日")
  234. time_filename = now.strftime("%H时%M分")
  235. # 创建 txt 文件路径
  236. txt_dir = self.project_root / "output" / date_folder / "txt"
  237. ensure_directory_exists(str(txt_dir))
  238. txt_file_path = txt_dir / f"{time_filename}.txt"
  239. # 创建 html 文件路径
  240. html_dir = self.project_root / "output" / date_folder / "html"
  241. ensure_directory_exists(str(html_dir))
  242. html_file_path = html_dir / f"{time_filename}.html"
  243. # 保存 txt 文件(按照 main.py 的格式)
  244. with open(txt_file_path, "w", encoding="utf-8") as f:
  245. for id_value, title_data in results.items():
  246. # id | name 或 id
  247. name = id_to_name.get(id_value)
  248. if name and name != id_value:
  249. f.write(f"{id_value} | {name}\n")
  250. else:
  251. f.write(f"{id_value}\n")
  252. # 按排名排序标题
  253. sorted_titles = []
  254. for title, info in title_data.items():
  255. cleaned = clean_title(title)
  256. if isinstance(info, dict):
  257. ranks = info.get("ranks", [])
  258. url = info.get("url", "")
  259. mobile_url = info.get("mobileUrl", "")
  260. else:
  261. ranks = info if isinstance(info, list) else []
  262. url = ""
  263. mobile_url = ""
  264. rank = ranks[0] if ranks else 1
  265. sorted_titles.append((rank, cleaned, url, mobile_url))
  266. sorted_titles.sort(key=lambda x: x[0])
  267. for rank, cleaned, url, mobile_url in sorted_titles:
  268. line = f"{rank}. {cleaned}"
  269. if url:
  270. line += f" [URL:{url}]"
  271. if mobile_url:
  272. line += f" [MOBILE:{mobile_url}]"
  273. f.write(line + "\n")
  274. f.write("\n")
  275. if failed_ids:
  276. f.write("==== 以下ID请求失败 ====\n")
  277. for id_value in failed_ids:
  278. f.write(f"{id_value}\n")
  279. # 保存 html 文件(简化版)
  280. html_content = self._generate_simple_html(results, id_to_name, failed_ids, now)
  281. with open(html_file_path, "w", encoding="utf-8") as f:
  282. f.write(html_content)
  283. print(f"数据已保存到:")
  284. print(f" TXT: {txt_file_path}")
  285. print(f" HTML: {html_file_path}")
  286. result["saved_files"] = {
  287. "txt": str(txt_file_path),
  288. "html": str(html_file_path)
  289. }
  290. result["note"] = "数据已持久化到 output 文件夹"
  291. except Exception as e:
  292. print(f"保存文件失败: {e}")
  293. result["save_error"] = str(e)
  294. result["note"] = "爬取成功但保存失败,数据仅在内存中"
  295. else:
  296. result["note"] = "临时爬取结果,未持久化到output文件夹"
  297. return result
  298. except MCPError as e:
  299. return {
  300. "success": False,
  301. "error": e.to_dict()
  302. }
  303. except Exception as e:
  304. import traceback
  305. return {
  306. "success": False,
  307. "error": {
  308. "code": "INTERNAL_ERROR",
  309. "message": str(e),
  310. "traceback": traceback.format_exc()
  311. }
  312. }
  313. def _generate_simple_html(self, results: Dict, id_to_name: Dict, failed_ids: List, now) -> str:
  314. """生成简化的 HTML 报告"""
  315. html = """<!DOCTYPE html>
  316. <html>
  317. <head>
  318. <meta charset="UTF-8">
  319. <meta name="viewport" content="width=device-width, initial-scale=1.0">
  320. <title>MCP 爬取结果</title>
  321. <style>
  322. body { font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }
  323. .container { max-width: 900px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; }
  324. h1 { color: #333; border-bottom: 2px solid #4CAF50; padding-bottom: 10px; }
  325. .platform { margin-bottom: 30px; }
  326. .platform-name { background: #4CAF50; color: white; padding: 10px; border-radius: 5px; margin-bottom: 10px; }
  327. .news-item { padding: 8px; border-bottom: 1px solid #eee; }
  328. .rank { color: #666; font-weight: bold; margin-right: 10px; }
  329. .title { color: #333; }
  330. .link { color: #1976D2; text-decoration: none; margin-left: 10px; font-size: 0.9em; }
  331. .link:hover { text-decoration: underline; }
  332. .failed { background: #ffebee; padding: 10px; border-radius: 5px; margin-top: 20px; }
  333. .failed h3 { color: #c62828; margin-top: 0; }
  334. .timestamp { color: #666; font-size: 0.9em; text-align: right; margin-top: 20px; }
  335. </style>
  336. </head>
  337. <body>
  338. <div class="container">
  339. <h1>MCP 爬取结果</h1>
  340. """
  341. # 添加时间戳
  342. html += f' <p class="timestamp">爬取时间: {now.strftime("%Y-%m-%d %H:%M:%S")}</p>\n\n'
  343. # 遍历每个平台
  344. for platform_id, titles_data in results.items():
  345. platform_name = id_to_name.get(platform_id, platform_id)
  346. html += f' <div class="platform">\n'
  347. html += f' <div class="platform-name">{platform_name}</div>\n'
  348. # 排序标题
  349. sorted_items = []
  350. for title, info in titles_data.items():
  351. ranks = info.get("ranks", [])
  352. url = info.get("url", "")
  353. mobile_url = info.get("mobileUrl", "")
  354. rank = ranks[0] if ranks else 999
  355. sorted_items.append((rank, title, url, mobile_url))
  356. sorted_items.sort(key=lambda x: x[0])
  357. # 显示新闻
  358. for rank, title, url, mobile_url in sorted_items:
  359. html += f' <div class="news-item">\n'
  360. html += f' <span class="rank">{rank}.</span>\n'
  361. html += f' <span class="title">{self._html_escape(title)}</span>\n'
  362. if url:
  363. html += f' <a class="link" href="{self._html_escape(url)}" target="_blank">链接</a>\n'
  364. if mobile_url and mobile_url != url:
  365. html += f' <a class="link" href="{self._html_escape(mobile_url)}" target="_blank">移动版</a>\n'
  366. html += ' </div>\n'
  367. html += ' </div>\n\n'
  368. # 失败的平台
  369. if failed_ids:
  370. html += ' <div class="failed">\n'
  371. html += ' <h3>请求失败的平台</h3>\n'
  372. html += ' <ul>\n'
  373. for platform_id in failed_ids:
  374. html += f' <li>{self._html_escape(platform_id)}</li>\n'
  375. html += ' </ul>\n'
  376. html += ' </div>\n'
  377. html += """ </div>
  378. </body>
  379. </html>"""
  380. return html
  381. def _html_escape(self, text: str) -> str:
  382. """HTML 转义"""
  383. if not isinstance(text, str):
  384. text = str(text)
  385. return (
  386. text.replace("&", "&amp;")
  387. .replace("<", "&lt;")
  388. .replace(">", "&gt;")
  389. .replace('"', "&quot;")
  390. .replace("'", "&#x27;")
  391. )