fetcher.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. # coding=utf-8
  2. """
  3. 数据获取器模块
  4. 负责从 NewsNow API 抓取新闻数据,支持:
  5. - 单个平台数据获取
  6. - 批量平台数据爬取
  7. - 自动重试机制
  8. - 代理支持
  9. """
  10. import json
  11. import random
  12. import time
  13. from typing import Dict, List, Tuple, Optional, Union
  14. import requests
  15. class DataFetcher:
  16. """数据获取器"""
  17. # 默认 API 地址
  18. DEFAULT_API_URL = "https://newsnow.busiyi.world/api/s"
  19. # 默认请求头
  20. DEFAULT_HEADERS = {
  21. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
  22. "Accept": "application/json, text/plain, */*",
  23. "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
  24. "Connection": "keep-alive",
  25. "Cache-Control": "no-cache",
  26. }
  27. def __init__(
  28. self,
  29. proxy_url: Optional[str] = None,
  30. api_url: Optional[str] = None,
  31. ):
  32. """
  33. 初始化数据获取器
  34. Args:
  35. proxy_url: 代理服务器 URL(可选)
  36. api_url: API 基础 URL(可选,默认使用 DEFAULT_API_URL)
  37. """
  38. self.proxy_url = proxy_url
  39. self.api_url = api_url or self.DEFAULT_API_URL
  40. def fetch_data(
  41. self,
  42. id_info: Union[str, Tuple[str, str]],
  43. max_retries: int = 2,
  44. min_retry_wait: int = 3,
  45. max_retry_wait: int = 5,
  46. ) -> Tuple[Optional[str], str, str]:
  47. """
  48. 获取指定ID数据,支持重试
  49. Args:
  50. id_info: 平台ID 或 (平台ID, 别名) 元组
  51. max_retries: 最大重试次数
  52. min_retry_wait: 最小重试等待时间(秒)
  53. max_retry_wait: 最大重试等待时间(秒)
  54. Returns:
  55. (响应文本, 平台ID, 别名) 元组,失败时响应文本为 None
  56. """
  57. if isinstance(id_info, tuple):
  58. id_value, alias = id_info
  59. else:
  60. id_value = id_info
  61. alias = id_value
  62. url = f"{self.api_url}?id={id_value}&latest"
  63. proxies = None
  64. if self.proxy_url:
  65. proxies = {"http": self.proxy_url, "https": self.proxy_url}
  66. retries = 0
  67. while retries <= max_retries:
  68. try:
  69. response = requests.get(
  70. url,
  71. proxies=proxies,
  72. headers=self.DEFAULT_HEADERS,
  73. timeout=10,
  74. )
  75. response.raise_for_status()
  76. data_text = response.text
  77. data_json = json.loads(data_text)
  78. status = data_json.get("status", "未知")
  79. if status not in ["success", "cache"]:
  80. raise ValueError(f"响应状态异常: {status}")
  81. status_info = "最新数据" if status == "success" else "缓存数据"
  82. print(f"获取 {id_value} 成功({status_info})")
  83. return data_text, id_value, alias
  84. except Exception as e:
  85. retries += 1
  86. if retries <= max_retries:
  87. base_wait = random.uniform(min_retry_wait, max_retry_wait)
  88. additional_wait = (retries - 1) * random.uniform(1, 2)
  89. wait_time = base_wait + additional_wait
  90. print(f"请求 {id_value} 失败: {e}. {wait_time:.2f}秒后重试...")
  91. time.sleep(wait_time)
  92. else:
  93. print(f"请求 {id_value} 失败: {e}")
  94. return None, id_value, alias
  95. return None, id_value, alias
  96. def crawl_websites(
  97. self,
  98. ids_list: List[Union[str, Tuple[str, str]]],
  99. request_interval: int = 100,
  100. ) -> Tuple[Dict, Dict, List]:
  101. """
  102. 爬取多个网站数据
  103. Args:
  104. ids_list: 平台ID列表,每个元素可以是字符串或 (平台ID, 别名) 元组
  105. request_interval: 请求间隔(毫秒)
  106. Returns:
  107. (结果字典, ID到名称的映射, 失败ID列表) 元组
  108. """
  109. results = {}
  110. id_to_name = {}
  111. failed_ids = []
  112. for i, id_info in enumerate(ids_list):
  113. if isinstance(id_info, tuple):
  114. id_value, name = id_info
  115. else:
  116. id_value = id_info
  117. name = id_value
  118. id_to_name[id_value] = name
  119. response, _, _ = self.fetch_data(id_info)
  120. if response:
  121. try:
  122. data = json.loads(response)
  123. results[id_value] = {}
  124. for index, item in enumerate(data.get("items", []), 1):
  125. title = item.get("title")
  126. # 跳过无效标题(None、float、空字符串)
  127. if title is None or isinstance(title, float) or not str(title).strip():
  128. continue
  129. title = str(title).strip()
  130. url = item.get("url", "")
  131. mobile_url = item.get("mobileUrl", "")
  132. if title in results[id_value]:
  133. results[id_value][title]["ranks"].append(index)
  134. else:
  135. results[id_value][title] = {
  136. "ranks": [index],
  137. "url": url,
  138. "mobileUrl": mobile_url,
  139. }
  140. except json.JSONDecodeError:
  141. print(f"解析 {id_value} 响应失败")
  142. failed_ids.append(id_value)
  143. except Exception as e:
  144. print(f"处理 {id_value} 数据出错: {e}")
  145. failed_ids.append(id_value)
  146. else:
  147. failed_ids.append(id_value)
  148. # 请求间隔(除了最后一个)
  149. if i < len(ids_list) - 1:
  150. actual_interval = request_interval + random.randint(-10, 20)
  151. actual_interval = max(50, actual_interval)
  152. time.sleep(actual_interval / 1000)
  153. print(f"成功: {list(results.keys())}, 失败: {failed_ids}")
  154. return results, id_to_name, failed_ids