main.py 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133
  1. # coding=utf-8
import json
import os
import random
import time
import webbrowser
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

import pytz
import requests
# Configuration constants
CONFIG = {
    # Divider placed between frequency-word groups in Feishu messages. Note:
    # other divider styles may be filtered out by Feishu and not displayed.
    "FEISHU_SEPARATOR": "━━━━━━━━━━━━━━━━━━━",
    "REQUEST_INTERVAL": 1000,  # milliseconds
    "FEISHU_REPORT_TYPE": "daily",  # one of: "current", "daily", "both"
    "RANK_THRESHOLD": 5,  # rank threshold; ranks within the top 5 are shown in bold red
    "USE_PROXY": True,  # whether to use the local proxy
    "DEFAULT_PROXY": "http://127.0.0.1:10086",
    # Whether to keep crawling when no Feishu webhook URL is available. If True,
    # crawling still runs and keeps producing the crawled news data on GitHub.
    "CONTINUE_WITHOUT_FEISHU": True,
    # Feishu bot webhook URL, roughly of the form
    # https://www.feishu.cn/flow/api/trigger-webhook/xxxx. Empty by default;
    # setting it through GitHub Secrets is recommended.
    "FEISHU_WEBHOOK_URL": "",
}
  22. class TimeHelper:
  23. """时间相关的辅助功能"""
  24. @staticmethod
  25. def get_beijing_time() -> datetime:
  26. """获取北京时间"""
  27. return datetime.now(pytz.timezone("Asia/Shanghai"))
  28. @staticmethod
  29. def format_date_folder() -> str:
  30. """返回日期文件夹名称格式"""
  31. return TimeHelper.get_beijing_time().strftime("%Y年%m月%d日")
  32. @staticmethod
  33. def format_time_filename() -> str:
  34. """返回时间文件名格式"""
  35. return TimeHelper.get_beijing_time().strftime("%H时%M分")
  36. class FileHelper:
  37. """文件操作相关的辅助功能"""
  38. @staticmethod
  39. def ensure_directory_exists(directory: str) -> None:
  40. """确保目录存在,如果不存在则创建"""
  41. if not os.path.exists(directory):
  42. os.makedirs(directory)
  43. @staticmethod
  44. def get_output_path(subfolder: str, filename: str) -> str:
  45. """获取输出文件路径"""
  46. date_folder = TimeHelper.format_date_folder()
  47. output_dir = os.path.join("output", date_folder, subfolder)
  48. FileHelper.ensure_directory_exists(output_dir)
  49. return os.path.join(output_dir, filename)
  50. class DataFetcher:
  51. """数据获取相关功能"""
  52. def __init__(self, proxy_url: Optional[str] = None):
  53. self.proxy_url = proxy_url
  54. def fetch_data(
  55. self,
  56. id_info: Union[str, Tuple[str, str]],
  57. max_retries: int = 2,
  58. min_retry_wait: int = 3,
  59. max_retry_wait: int = 5,
  60. ) -> Tuple[Optional[str], str, str]:
  61. """
  62. 同步获取指定ID的数据,失败时进行重试
  63. 接受'success'和'cache'两种状态,其他状态才会触发重试
  64. Args:
  65. id_info: ID信息,可以是ID字符串或(ID, 别名)元组
  66. max_retries: 最大重试次数
  67. min_retry_wait: 最小重试等待时间(秒)
  68. max_retry_wait: 最大重试等待时间(秒)
  69. Returns:
  70. (响应数据, ID, 别名)元组,如果请求失败则响应数据为None
  71. """
  72. # 处理ID和别名
  73. if isinstance(id_info, tuple):
  74. id_value, alias = id_info
  75. else:
  76. id_value = id_info
  77. alias = id_value
  78. url = f"https://newsnow.busiyi.world/api/s?id={id_value}&latest"
  79. # 设置代理
  80. proxies = None
  81. if self.proxy_url:
  82. proxies = {"http": self.proxy_url, "https": self.proxy_url}
  83. # 添加随机性模拟真实用户
  84. headers = {
  85. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
  86. "Accept": "application/json, text/plain, */*",
  87. "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
  88. "Connection": "keep-alive",
  89. "Cache-Control": "no-cache",
  90. }
  91. retries = 0
  92. while retries <= max_retries:
  93. try:
  94. print(
  95. f"正在请求 {id_value} 数据... (尝试 {retries + 1}/{max_retries + 1})"
  96. )
  97. response = requests.get(
  98. url, proxies=proxies, headers=headers, timeout=10
  99. )
  100. response.raise_for_status() # 检查HTTP状态码
  101. # 解析JSON并检查响应状态
  102. data_text = response.text
  103. data_json = json.loads(data_text)
  104. # 修改状态检查逻辑:接受success和cache两种状态
  105. status = data_json.get("status", "未知")
  106. if status not in ["success", "cache"]:
  107. raise ValueError(f"响应状态异常: {status}")
  108. # 记录状态信息
  109. status_info = "最新数据" if status == "success" else "缓存数据"
  110. print(f"成功获取 {id_value} 数据({status_info})")
  111. return data_text, id_value, alias
  112. except Exception as e:
  113. retries += 1
  114. if retries <= max_retries:
  115. # 计算重试等待时间:基础3-5秒,每次重试增加1-2秒
  116. base_wait = random.uniform(min_retry_wait, max_retry_wait)
  117. additional_wait = (retries - 1) * random.uniform(1, 2)
  118. wait_time = base_wait + additional_wait
  119. print(
  120. f"请求 {id_value} 失败: {e}. 将在 {wait_time:.2f} 秒后重试..."
  121. )
  122. time.sleep(wait_time)
  123. else:
  124. print(f"请求 {id_value} 失败: {e}. 已达到最大重试次数。")
  125. return None, id_value, alias
  126. return None, id_value, alias
  127. def crawl_websites(
  128. self,
  129. ids_list: List[Union[str, Tuple[str, str]]],
  130. request_interval: int = CONFIG["REQUEST_INTERVAL"],
  131. ) -> Tuple[Dict, Dict, List]:
  132. """
  133. 爬取多个网站的数据,使用同步请求
  134. Args:
  135. ids_list: ID列表,每个元素可以是ID字符串或(ID, 别名)元组
  136. request_interval: 请求间隔(毫秒)
  137. Returns:
  138. (results, id_to_alias, failed_ids)元组
  139. """
  140. results = {}
  141. id_to_alias = {}
  142. failed_ids = []
  143. for i, id_info in enumerate(ids_list):
  144. # 处理ID和别名
  145. if isinstance(id_info, tuple):
  146. id_value, alias = id_info
  147. else:
  148. id_value = id_info
  149. alias = id_value
  150. # 添加到ID-别名映射
  151. id_to_alias[id_value] = alias
  152. # 发送请求
  153. response, _, _ = self.fetch_data(id_info)
  154. # 处理响应
  155. if response:
  156. try:
  157. data = json.loads(response)
  158. # 获取标题列表,同时记录排名
  159. results[id_value] = {}
  160. for index, item in enumerate(data.get("items", []), 1):
  161. title = item["title"]
  162. if title in results[id_value]:
  163. results[id_value][title].append(index)
  164. else:
  165. results[id_value][title] = [index]
  166. except json.JSONDecodeError:
  167. print(f"解析 {id_value} 的响应失败,不是有效的JSON")
  168. failed_ids.append(id_value)
  169. except Exception as e:
  170. print(f"处理 {id_value} 数据时出错: {e}")
  171. failed_ids.append(id_value)
  172. else:
  173. failed_ids.append(id_value)
  174. # 添加间隔时间,除非是最后一个请求
  175. if i < len(ids_list) - 1:
  176. # 添加一些随机性到间隔时间
  177. actual_interval = request_interval + random.randint(-10, 20)
  178. actual_interval = max(50, actual_interval) # 确保至少50毫秒
  179. print(f"等待 {actual_interval} 毫秒后发送下一个请求...")
  180. time.sleep(actual_interval / 1000)
  181. print(f"\n请求总结:")
  182. print(f"- 成功获取数据的ID: {list(results.keys())}")
  183. print(f"- 请求失败的ID: {failed_ids}")
  184. return results, id_to_alias, failed_ids
  185. class DataProcessor:
  186. """数据处理相关功能"""
  187. @staticmethod
  188. def save_titles_to_file(results: Dict, id_to_alias: Dict, failed_ids: List) -> str:
  189. """将标题保存到文件,包括失败的请求信息"""
  190. file_path = FileHelper.get_output_path(
  191. "txt", f"{TimeHelper.format_time_filename()}.txt"
  192. )
  193. with open(file_path, "w", encoding="utf-8") as f:
  194. # 先写入成功获取的数据
  195. for id_value, title_data in results.items():
  196. display_name = id_to_alias.get(id_value, id_value)
  197. f.write(f"{display_name}\n")
  198. for i, (title, ranks) in enumerate(title_data.items(), 1):
  199. rank_str = ",".join(map(str, ranks))
  200. f.write(f"{i}. {title} (排名:{rank_str})\n")
  201. f.write("\n")
  202. # 如果有失败的请求,写入失败信息
  203. if failed_ids:
  204. f.write("==== 以下ID请求失败 ====\n")
  205. for id_value in failed_ids:
  206. display_name = id_to_alias.get(id_value, id_value)
  207. f.write(f"{display_name} (ID: {id_value})\n")
  208. return file_path
  209. @staticmethod
  210. def load_frequency_words(
  211. frequency_file: str = "frequency_words.txt",
  212. ) -> Tuple[List[List[str]], List[str]]:
  213. """
  214. 加载频率词和过滤词,处理关联词
  215. Returns:
  216. (word_groups, filter_words)元组
  217. """
  218. if not os.path.exists(frequency_file):
  219. print(f"频率词文件 {frequency_file} 不存在")
  220. return [], []
  221. with open(frequency_file, "r", encoding="utf-8") as f:
  222. content = f.read()
  223. # 按双空行分割不同的词组
  224. word_groups = [
  225. group.strip() for group in content.split("\n\n") if group.strip()
  226. ]
  227. # 处理每个词组
  228. processed_groups = []
  229. filter_words = [] # 用于存储过滤词
  230. for group in word_groups:
  231. words = [word.strip() for word in group.split("\n") if word.strip()]
  232. # 分离频率词和过滤词
  233. group_frequency_words = []
  234. for word in words:
  235. if word.startswith("!"):
  236. # 去掉感叹号,添加到过滤词列表
  237. filter_words.append(word[1:])
  238. else:
  239. # 正常的频率词
  240. group_frequency_words.append(word)
  241. # 只有当词组中包含频率词时才添加到结果中
  242. if group_frequency_words:
  243. processed_groups.append(group_frequency_words)
  244. return processed_groups, filter_words
  245. @staticmethod
  246. def read_all_today_titles() -> Tuple[Dict, Dict, Dict]:
  247. """
  248. 读取当天所有txt文件的标题,并按来源合并,去除重复,记录时间和出现次数
  249. Returns:
  250. (all_results, id_to_alias, title_info)元组
  251. """
  252. date_folder = TimeHelper.format_date_folder()
  253. txt_dir = os.path.join("output", date_folder, "txt")
  254. if not os.path.exists(txt_dir):
  255. print(f"今日文件夹 {txt_dir} 不存在")
  256. return {}, {}, {}
  257. all_results = {} # 所有源的所有标题 {source_id: {title: [ranks]}}
  258. id_to_alias = {} # ID到别名的映射
  259. title_info = (
  260. {}
  261. ) # 标题信息 {source_id: {title: {"first_time": 首次时间, "last_time": 最后时间, "count": 出现次数, "ranks": [排名列表]}}}
  262. # 读取所有txt文件,按时间排序确保早的时间优先处理
  263. files = sorted([f for f in os.listdir(txt_dir) if f.endswith(".txt")])
  264. for file in files:
  265. # 从文件名提取时间信息 (例如 "12时34分.txt")
  266. time_info = file.replace(".txt", "")
  267. file_path = os.path.join(txt_dir, file)
  268. with open(file_path, "r", encoding="utf-8") as f:
  269. content = f.read()
  270. # 解析内容
  271. sections = content.split("\n\n")
  272. for section in sections:
  273. if not section.strip() or "==== 以下ID请求失败 ====" in section:
  274. continue
  275. lines = section.strip().split("\n")
  276. if len(lines) < 2:
  277. continue
  278. # 第一行是来源名
  279. source_name = lines[0].strip()
  280. # 提取标题和排名
  281. title_ranks = {}
  282. for line in lines[1:]:
  283. if line.strip():
  284. try:
  285. # 提取序号和正文部分
  286. match_num = None
  287. title_part = line.strip()
  288. # 处理格式 "数字. 标题"
  289. if (
  290. ". " in title_part
  291. and title_part.split(". ")[0].isdigit()
  292. ):
  293. parts = title_part.split(". ", 1)
  294. match_num = int(parts[0]) # 序号可能是排名
  295. title_part = parts[1]
  296. # 提取排名信息 "标题 (排名:1,2,3)"
  297. ranks = []
  298. if " (排名:" in title_part:
  299. title, rank_str = title_part.rsplit(" (排名:", 1)
  300. rank_str = rank_str.rstrip(")")
  301. ranks = [
  302. int(r)
  303. for r in rank_str.split(",")
  304. if r.strip() and r.isdigit()
  305. ]
  306. else:
  307. title = title_part
  308. # 如果没找到排名但有序号,则使用序号
  309. if not ranks and match_num is not None:
  310. ranks = [match_num]
  311. # 确保排名列表不为空
  312. if not ranks:
  313. ranks = [99] # 默认排名
  314. title_ranks[title] = ranks
  315. except Exception as e:
  316. print(f"解析标题行出错: {line}, 错误: {e}")
  317. # 处理来源数据
  318. DataProcessor._process_source_data(
  319. source_name,
  320. title_ranks,
  321. time_info,
  322. all_results,
  323. title_info,
  324. id_to_alias,
  325. )
  326. # 将结果从 {source_name: {title: [ranks]}} 转换为 {source_id: {title: [ranks]}}
  327. id_results = {}
  328. id_title_info = {}
  329. for name, titles in all_results.items():
  330. for id_value, alias in id_to_alias.items():
  331. if alias == name:
  332. id_results[id_value] = titles
  333. id_title_info[id_value] = title_info[name]
  334. break
  335. return id_results, id_to_alias, id_title_info
  336. @staticmethod
  337. def _process_source_data(
  338. source_name: str,
  339. title_ranks: Dict,
  340. time_info: str,
  341. all_results: Dict,
  342. title_info: Dict,
  343. id_to_alias: Dict,
  344. ) -> None:
  345. """处理来源数据,更新结果和标题信息"""
  346. if source_name not in all_results:
  347. # 首次遇到此来源
  348. all_results[source_name] = title_ranks
  349. # 初始化标题信息
  350. if source_name not in title_info:
  351. title_info[source_name] = {}
  352. # 记录每个标题的时间、次数和排名
  353. for title, ranks in title_ranks.items():
  354. title_info[source_name][title] = {
  355. "first_time": time_info, # 记录首次时间
  356. "last_time": time_info, # 最后时间初始同首次时间
  357. "count": 1,
  358. "ranks": ranks,
  359. }
  360. # 尝试反向生成ID
  361. reversed_id = source_name.lower().replace(" ", "-")
  362. id_to_alias[reversed_id] = source_name
  363. else:
  364. # 已有此来源,更新标题
  365. for title, ranks in title_ranks.items():
  366. if title not in all_results[source_name]:
  367. all_results[source_name][title] = ranks
  368. title_info[source_name][title] = {
  369. "first_time": time_info, # 新标题的首次和最后时间都设为当前
  370. "last_time": time_info,
  371. "count": 1,
  372. "ranks": ranks,
  373. }
  374. else:
  375. # 已存在的标题,更新最后时间,合并排名信息并增加计数
  376. existing_ranks = title_info[source_name][title]["ranks"]
  377. merged_ranks = existing_ranks.copy()
  378. for rank in ranks:
  379. if rank not in merged_ranks:
  380. merged_ranks.append(rank)
  381. title_info[source_name][title][
  382. "last_time"
  383. ] = time_info # 更新最后时间
  384. title_info[source_name][title]["ranks"] = merged_ranks
  385. title_info[source_name][title]["count"] += 1
  386. class StatisticsCalculator:
  387. """统计计算相关功能"""
  388. @staticmethod
  389. def count_word_frequency(
  390. results: Dict,
  391. word_groups: List[List[str]],
  392. filter_words: List[str],
  393. id_to_alias: Dict,
  394. title_info: Optional[Dict] = None,
  395. rank_threshold: int = CONFIG["RANK_THRESHOLD"],
  396. ) -> Tuple[List[Dict], int]:
  397. """
  398. 统计词频,处理关联词和大小写不敏感,每个标题只计入首个匹配词组,并应用过滤词
  399. Returns:
  400. (stats, total_titles)元组
  401. """
  402. word_stats = {}
  403. total_titles = 0
  404. processed_titles = {} # 用于跟踪已处理标题 {source_id: {title: True}}
  405. # 初始化title_info
  406. if title_info is None:
  407. title_info = {}
  408. # 为每个词组创建统计对象
  409. for group in word_groups:
  410. group_key = " ".join(group)
  411. word_stats[group_key] = {"count": 0, "titles": {}}
  412. # 遍历所有标题并统计
  413. for source_id, titles_data in results.items():
  414. total_titles += len(titles_data)
  415. # 初始化该来源的处理记录
  416. if source_id not in processed_titles:
  417. processed_titles[source_id] = {}
  418. for title, source_ranks in titles_data.items():
  419. # 跳过已处理的标题
  420. if title in processed_titles.get(source_id, {}):
  421. continue
  422. title_lower = title.lower() # 转换为小写以实现大小写不敏感
  423. # 检查是否包含任何过滤词
  424. contains_filter_word = any(
  425. filter_word.lower() in title_lower for filter_word in filter_words
  426. )
  427. # 如果包含过滤词,跳过这个标题
  428. if contains_filter_word:
  429. continue
  430. # 按顺序检查每个词组
  431. for group in word_groups:
  432. group_key = " ".join(group)
  433. # 检查是否有任何一个词在标题中
  434. matched = any(word.lower() in title_lower for word in group)
  435. # 如果匹配,增加计数并添加标题,然后标记为已处理
  436. if matched:
  437. word_stats[group_key]["count"] += 1
  438. if source_id not in word_stats[group_key]["titles"]:
  439. word_stats[group_key]["titles"][source_id] = []
  440. # 获取标题信息
  441. first_time = ""
  442. last_time = ""
  443. count_info = 1
  444. ranks = source_ranks if source_ranks else []
  445. if (
  446. title_info
  447. and source_id in title_info
  448. and title in title_info[source_id]
  449. ):
  450. info = title_info[source_id][title]
  451. first_time = info.get("first_time", "")
  452. last_time = info.get("last_time", "")
  453. count_info = info.get("count", 1)
  454. if "ranks" in info and info["ranks"]:
  455. ranks = info["ranks"]
  456. # 添加带信息的标题
  457. word_stats[group_key]["titles"][source_id].append(
  458. {
  459. "title": title,
  460. "first_time": first_time,
  461. "last_time": last_time,
  462. "count": count_info,
  463. "ranks": ranks,
  464. }
  465. )
  466. # 标记该标题已处理,不再匹配其他词组
  467. if source_id not in processed_titles:
  468. processed_titles[source_id] = {}
  469. processed_titles[source_id][title] = True
  470. break # 找到第一个匹配的词组后退出循环
  471. # 转换统计结果
  472. stats = []
  473. for group_key, data in word_stats.items():
  474. titles_with_info = []
  475. for source_id, title_list in data["titles"].items():
  476. source_alias = id_to_alias.get(source_id, source_id)
  477. for title_data in title_list:
  478. title = title_data["title"]
  479. first_time = title_data["first_time"]
  480. last_time = title_data["last_time"]
  481. count_info = title_data["count"]
  482. ranks = title_data.get("ranks", [])
  483. # 确保排名是有效的
  484. if not ranks:
  485. ranks = [99] # 使用默认排名
  486. # 格式化排名信息
  487. rank_display = StatisticsCalculator._format_rank_display(
  488. ranks, rank_threshold
  489. )
  490. # 格式化时间信息
  491. time_display = StatisticsCalculator._format_time_display(
  492. first_time, last_time
  493. )
  494. # 格式化标题信息
  495. formatted_title = f"[{source_alias}] {title}"
  496. if rank_display:
  497. formatted_title += f" {rank_display}"
  498. if time_display:
  499. formatted_title += (
  500. f" <font color='grey'>- {time_display}</font>"
  501. )
  502. if count_info > 1:
  503. formatted_title += (
  504. f" <font color='green'>({count_info}次)</font>"
  505. )
  506. titles_with_info.append(formatted_title)
  507. stats.append(
  508. {
  509. "word": group_key,
  510. "count": data["count"],
  511. "titles": titles_with_info,
  512. "percentage": (
  513. round(data["count"] / total_titles * 100, 2)
  514. if total_titles > 0
  515. else 0
  516. ),
  517. }
  518. )
  519. # 按出现次数从高到低排序
  520. stats.sort(key=lambda x: x["count"], reverse=True)
  521. return stats, total_titles
  522. @staticmethod
  523. def _format_rank_display(ranks: List[int], rank_threshold: int = 5) -> str:
  524. """格式化排名显示,前5名使用红色数字"""
  525. if not ranks:
  526. return ""
  527. # 排序排名并确保不重复
  528. unique_ranks = sorted(set(ranks))
  529. min_rank = unique_ranks[0]
  530. max_rank = unique_ranks[-1]
  531. # 所有排名都使用[],只有前5名显示红色
  532. if min_rank <= rank_threshold:
  533. if min_rank == max_rank:
  534. # 单一排名且在前5
  535. return f"<font color='red'>**[{min_rank}]**</font>"
  536. else:
  537. return f"<font color='red'>**[{min_rank} - {max_rank}]**</font>"
  538. else:
  539. # 排名在5名之后,使用普通显示
  540. if min_rank == max_rank:
  541. return f"[{min_rank}]"
  542. else:
  543. return f"[{min_rank} - {max_rank}]"
  544. @staticmethod
  545. def _format_time_display(first_time: str, last_time: str) -> str:
  546. """格式化时间显示,单次显示时间,多次显示时间范围"""
  547. if not first_time:
  548. return ""
  549. if first_time == last_time or not last_time:
  550. # 只有一个时间点,直接显示
  551. return first_time
  552. else:
  553. # 有两个时间点,显示范围
  554. return f"[{first_time} ~ {last_time}]"
  555. class ReportGenerator:
  556. """报告生成相关功能"""
  557. @staticmethod
  558. def generate_html_report(
  559. stats: List[Dict],
  560. total_titles: int,
  561. failed_ids: Optional[List] = None,
  562. is_daily: bool = False,
  563. ) -> str:
  564. """
  565. 生成HTML报告,包括失败的请求信息
  566. Returns:
  567. HTML文件路径
  568. """
  569. # 创建文件路径
  570. if is_daily:
  571. filename = "当日统计.html"
  572. else:
  573. filename = f"{TimeHelper.format_time_filename()}.html"
  574. file_path = FileHelper.get_output_path("html", filename)
  575. # HTML模板和内容生成
  576. html_content = ReportGenerator._create_html_content(
  577. stats, total_titles, failed_ids, is_daily
  578. )
  579. # 写入文件
  580. with open(file_path, "w", encoding="utf-8") as f:
  581. f.write(html_content)
  582. # 如果是当日统计,还需要在根目录下生成index.html
  583. if is_daily:
  584. root_file_path = "index.html"
  585. with open(root_file_path, "w", encoding="utf-8") as f:
  586. f.write(html_content)
  587. print(
  588. f"当日统计报告已保存到根目录的index.html: {os.path.abspath(root_file_path)}"
  589. )
  590. return file_path
  591. @staticmethod
  592. def _create_html_content(
  593. stats: List[Dict],
  594. total_titles: int,
  595. failed_ids: Optional[List] = None,
  596. is_daily: bool = False,
  597. ) -> str:
  598. """创建HTML内容"""
  599. # HTML头部
  600. html = """
  601. <!DOCTYPE html>
  602. <html>
  603. <head>
  604. <meta charset="UTF-8">
  605. <title>频率词统计报告</title>
  606. <style>
  607. body { font-family: Arial, sans-serif; margin: 20px; }
  608. h1, h2 { color: #333; }
  609. table { border-collapse: collapse; width: 100%; margin-top: 20px; }
  610. th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
  611. th { background-color: #f2f2f2; }
  612. tr:nth-child(even) { background-color: #f9f9f9; }
  613. .word { font-weight: bold; }
  614. .count { text-align: center; }
  615. .percentage { text-align: center; }
  616. .titles { max-width: 500px; }
  617. .source { color: #666; font-style: italic; }
  618. .error { color: #d9534f; }
  619. </style>
  620. </head>
  621. <body>
  622. <h1>频率词统计报告</h1>
  623. """
  624. # 报告类型
  625. if is_daily:
  626. html += "<p>报告类型: 当日汇总</p>"
  627. # 基本信息
  628. now = TimeHelper.get_beijing_time()
  629. html += f"<p>总标题数: {total_titles}</p>"
  630. html += f"<p>生成时间: {now.strftime('%Y-%m-%d %H:%M:%S')}</p>"
  631. # 失败的请求信息
  632. if failed_ids and len(failed_ids) > 0:
  633. html += """
  634. <div class="error">
  635. <h2>请求失败的平台</h2>
  636. <ul>
  637. """
  638. for id_value in failed_ids:
  639. html += f"<li>{id_value}</li>"
  640. html += """
  641. </ul>
  642. </div>
  643. """
  644. # 表格头部
  645. html += """
  646. <table>
  647. <tr>
  648. <th>排名</th>
  649. <th>频率词</th>
  650. <th>出现次数</th>
  651. <th>占比</th>
  652. <th>相关标题</th>
  653. </tr>
  654. """
  655. # 表格内容
  656. for i, stat in enumerate(stats, 1):
  657. html += f"""
  658. <tr>
  659. <td>{i}</td>
  660. <td class="word">{stat['word']}</td>
  661. <td class="count">{stat['count']}</td>
  662. <td class="percentage">{stat['percentage']}%</td>
  663. <td class="titles">{"<br>".join(stat['titles'])}</td>
  664. </tr>
  665. """
  666. # 表格结尾
  667. html += """
  668. </table>
  669. </body>
  670. </html>
  671. """
  672. return html
  673. @staticmethod
  674. def send_to_feishu(
  675. stats: List[Dict],
  676. failed_ids: Optional[List] = None,
  677. report_type: str = "单次爬取",
  678. ) -> bool:
  679. """
  680. 将频率词统计结果发送到飞书
  681. Returns:
  682. 成功发送返回True,否则返回False
  683. """
  684. # 获取webhook URL,优先使用环境变量,其次使用配置中的URL
  685. webhook_url = os.environ.get("FEISHU_WEBHOOK_URL", CONFIG["FEISHU_WEBHOOK_URL"])
  686. # 检查webhook URL是否有效
  687. if not webhook_url:
  688. print(f"警告: FEISHU_WEBHOOK_URL未设置或无效,跳过发送飞书通知")
  689. return False
  690. headers = {"Content-Type": "application/json"}
  691. # 获取总标题数
  692. total_titles = sum(len(stat["titles"]) for stat in stats if stat["count"] > 0)
  693. # 构建文本内容
  694. text_content = ReportGenerator._build_feishu_content(stats, failed_ids)
  695. # 构造消息体
  696. now = TimeHelper.get_beijing_time()
  697. payload = {
  698. "msg_type": "text",
  699. "content": {
  700. "total_titles": total_titles,
  701. "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
  702. "report_type": report_type,
  703. "text": text_content,
  704. },
  705. }
  706. # 发送请求
  707. try:
  708. response = requests.post(webhook_url, headers=headers, json=payload)
  709. if response.status_code == 200:
  710. print(f"数据发送到飞书成功 [{report_type}]")
  711. return True
  712. else:
  713. print(
  714. f"发送到飞书失败 [{report_type}],状态码:{response.status_code},响应:{response.text}"
  715. )
  716. return False
  717. except Exception as e:
  718. print(f"发送到飞书时出错 [{report_type}]:{e}")
  719. return False
  720. @staticmethod
  721. def _build_feishu_content(
  722. stats: List[Dict], failed_ids: Optional[List] = None
  723. ) -> str:
  724. """构建飞书消息内容,使用富文本格式"""
  725. text_content = ""
  726. # 添加频率词统计信息
  727. filtered_stats = [stat for stat in stats if stat["count"] > 0]
  728. # 如果有统计数据,添加标题
  729. if filtered_stats:
  730. text_content += "📊 **热点词汇统计**\n\n"
  731. for i, stat in enumerate(filtered_stats):
  732. word = stat["word"]
  733. count = stat["count"]
  734. # 关键词加粗,计数和百分比使用不同颜色
  735. if count >= 10:
  736. # 高频词使用红色
  737. text_content += (
  738. f"🔥 **{word}** : <font color='red'>{count}</font> 条\n\n"
  739. )
  740. elif count >= 5:
  741. # 中频词使用橙色
  742. text_content += (
  743. f"📈 **{word}** : <font color='orange'>{count}</font> 条\n\n"
  744. )
  745. else:
  746. # 低频词使用默认颜色
  747. text_content += f"📌 **{word}** : {count} 条\n\n"
  748. # 添加相关标题
  749. for j, title in enumerate(stat["titles"], 1):
  750. # 提取来源信息
  751. if title.startswith("[") and "]" in title:
  752. source_end = title.index("]") + 1
  753. source = title[:source_end]
  754. rest = title[source_end:].strip()
  755. # 使用灰色显示来源
  756. text_content += (
  757. f" {j}. <font color='grey'>{source}</font> {rest}\n"
  758. )
  759. else:
  760. text_content += f" {j}. {title}\n"
  761. # 在每条新闻后添加额外间隔(除了最后一条)
  762. if j < len(stat["titles"]):
  763. text_content += "\n"
  764. # 添加分割线,使用更优雅的样式
  765. if i < len(filtered_stats) - 1:
  766. text_content += f"\n{CONFIG['FEISHU_SEPARATOR']}\n\n"
  767. if not text_content:
  768. text_content = "📭 暂无匹配的热点词汇\n\n"
  769. # 添加失败平台信息
  770. if failed_ids and len(failed_ids) > 0:
  771. if text_content and "暂无匹配" not in text_content:
  772. text_content += f"\n{CONFIG['FEISHU_SEPARATOR']}\n\n"
  773. text_content += "⚠️ **数据获取失败的平台:**\n\n"
  774. for i, id_value in enumerate(failed_ids, 1):
  775. text_content += f" • <font color='red'>{id_value}</font>\n"
  776. # 添加底部时间戳
  777. now = TimeHelper.get_beijing_time()
  778. text_content += f"\n\n<font color='grey'>更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}</font>"
  779. return text_content
  780. class NewsAnalyzer:
  781. """新闻分析主类"""
  782. def __init__(
  783. self,
  784. request_interval: int = CONFIG["REQUEST_INTERVAL"],
  785. feishu_report_type: str = CONFIG["FEISHU_REPORT_TYPE"],
  786. rank_threshold: int = CONFIG["RANK_THRESHOLD"],
  787. ):
  788. """
  789. 初始化新闻分析器
  790. Args:
  791. request_interval: 请求间隔(毫秒)
  792. feishu_report_type: 飞书报告类型,可选值: "current"(当前爬取), "daily"(当日汇总), "both"(两者都发送)
  793. rank_threshold: 排名显示阈值
  794. """
  795. self.request_interval = request_interval
  796. self.feishu_report_type = feishu_report_type
  797. self.rank_threshold = rank_threshold
  798. # 判断是否在GitHub Actions环境中
  799. self.is_github_actions = os.environ.get("GITHUB_ACTIONS") == "true"
  800. # 设置代理
  801. self.proxy_url = None
  802. if not self.is_github_actions and CONFIG["USE_PROXY"]:
  803. # 本地环境且启用代理时使用代理
  804. self.proxy_url = CONFIG["DEFAULT_PROXY"]
  805. print("本地环境,使用代理")
  806. elif not self.is_github_actions and not CONFIG["USE_PROXY"]:
  807. print("本地环境,未启用代理")
  808. else:
  809. print("GitHub Actions环境,不使用代理")
  810. # 初始化数据获取器
  811. self.data_fetcher = DataFetcher(self.proxy_url)
  812. def generate_daily_summary(self) -> Optional[str]:
  813. """
  814. 生成当日统计报告
  815. Returns:
  816. HTML文件路径,如果生成失败则返回None
  817. """
  818. print("开始生成当日统计报告...")
  819. # 读取当天所有标题
  820. all_results, id_to_alias, title_info = DataProcessor.read_all_today_titles()
  821. if not all_results:
  822. print("没有找到当天的数据")
  823. return None
  824. # 计算标题总数
  825. total_titles = sum(len(titles) for titles in all_results.values())
  826. print(f"读取到 {total_titles} 个标题")
  827. # 加载频率词和过滤词
  828. word_groups, filter_words = DataProcessor.load_frequency_words()
  829. # 统计词频
  830. stats, total_titles = StatisticsCalculator.count_word_frequency(
  831. all_results,
  832. word_groups,
  833. filter_words,
  834. id_to_alias,
  835. title_info,
  836. self.rank_threshold,
  837. )
  838. # 生成HTML报告
  839. html_file = ReportGenerator.generate_html_report(
  840. stats, total_titles, is_daily=True
  841. )
  842. print(f"当日HTML统计报告已生成: {html_file}")
  843. # 根据配置决定是否发送当日汇总到飞书
  844. if self.feishu_report_type in ["daily", "both"]:
  845. ReportGenerator.send_to_feishu(stats, [], "当日汇总")
  846. return html_file
  847. def run(self) -> None:
  848. """执行新闻分析流程"""
  849. # 输出当前时间信息
  850. now = TimeHelper.get_beijing_time()
  851. print(f"当前北京时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
  852. # 检查FEISHU_WEBHOOK_URL是否存在
  853. webhook_url = os.environ.get("FEISHU_WEBHOOK_URL", CONFIG["FEISHU_WEBHOOK_URL"])
  854. if not webhook_url and not CONFIG["CONTINUE_WITHOUT_FEISHU"]:
  855. print(
  856. "错误: FEISHU_WEBHOOK_URL未设置或无效,且CONTINUE_WITHOUT_FEISHU为False,程序退出"
  857. )
  858. return
  859. if not webhook_url:
  860. print(
  861. "警告: FEISHU_WEBHOOK_URL未设置或无效,将继续执行爬虫但不发送飞书通知"
  862. )
  863. print(f"飞书报告类型: {self.feishu_report_type}")
  864. print(f"排名阈值: {self.rank_threshold}")
  865. # 要爬取的网站ID列表
  866. ids = [
  867. ("toutiao", "今日头条"),
  868. ("baidu", "百度热搜"),
  869. ("wallstreetcn-hot", "华尔街见闻"),
  870. ("thepaper", "澎湃新闻"),
  871. ("bilibili-hot-search", "bilibili 热搜"),
  872. ("cls-hot", "财联社热门"),
  873. ("ifeng", "凤凰网"),
  874. "tieba",
  875. "weibo",
  876. "douyin",
  877. "zhihu",
  878. ]
  879. print(f"开始爬取数据,请求间隔设置为 {self.request_interval} 毫秒")
  880. # 确保output目录存在
  881. FileHelper.ensure_directory_exists("output")
  882. # 爬取数据
  883. results, id_to_alias, failed_ids = self.data_fetcher.crawl_websites(
  884. ids, self.request_interval
  885. )
  886. # 保存标题到文件
  887. title_file = DataProcessor.save_titles_to_file(results, id_to_alias, failed_ids)
  888. print(f"标题已保存到: {title_file}")
  889. # 从文件名中提取时间信息
  890. time_info = os.path.basename(title_file).replace(".txt", "")
  891. # 创建标题信息字典
  892. title_info = {}
  893. for source_id, titles_data in results.items():
  894. title_info[source_id] = {}
  895. for title, ranks in titles_data.items():
  896. title_info[source_id][title] = {
  897. "first_time": time_info,
  898. "last_time": time_info,
  899. "count": 1,
  900. "ranks": ranks,
  901. }
  902. # 加载频率词和过滤词
  903. word_groups, filter_words = DataProcessor.load_frequency_words()
  904. # 统计词频
  905. stats, total_titles = StatisticsCalculator.count_word_frequency(
  906. results,
  907. word_groups,
  908. filter_words,
  909. id_to_alias,
  910. title_info,
  911. self.rank_threshold,
  912. )
  913. # 根据配置决定发送哪种报告
  914. if self.feishu_report_type in ["current", "both"]:
  915. # 发送当前爬取数据到飞书
  916. ReportGenerator.send_to_feishu(stats, failed_ids, "单次爬取")
  917. # 生成HTML报告
  918. html_file = ReportGenerator.generate_html_report(
  919. stats, total_titles, failed_ids
  920. )
  921. print(f"HTML报告已生成: {html_file}")
  922. # 生成当日统计报告
  923. daily_html = self.generate_daily_summary()
  924. # 在本地环境中自动打开HTML文件
  925. if not self.is_github_actions and html_file:
  926. file_url = "file://" + os.path.abspath(html_file)
  927. print(f"正在打开HTML报告: {file_url}")
  928. webbrowser.open(file_url)
  929. if daily_html:
  930. daily_url = "file://" + os.path.abspath(daily_html)
  931. print(f"正在打开当日统计报告: {daily_url}")
  932. webbrowser.open(daily_url)
  933. def main():
  934. """程序入口点"""
  935. # 初始化并运行新闻分析器
  936. analyzer = NewsAnalyzer(
  937. request_interval=CONFIG["REQUEST_INTERVAL"],
  938. feishu_report_type=CONFIG["FEISHU_REPORT_TYPE"],
  939. rank_threshold=CONFIG["RANK_THRESHOLD"],
  940. )
  941. analyzer.run()
  942. if __name__ == "__main__":
  943. main()