Преглед изворни кода

fix: 修复增量模式下识别问题

sansan пре 4 месеци
родитељ
комит
7468f8adcc

+ 11 - 4
README-EN.md

@@ -13,7 +13,7 @@
 [![GitHub Stars](https://img.shields.io/github/stars/sansan0/TrendRadar?style=flat-square&logo=github&color=yellow)](https://github.com/sansan0/TrendRadar/stargazers)
 [![GitHub Forks](https://img.shields.io/github/forks/sansan0/TrendRadar?style=flat-square&logo=github&color=blue)](https://github.com/sansan0/TrendRadar/network/members)
 [![License](https://img.shields.io/badge/license-GPL--3.0-blue.svg?style=flat-square)](LICENSE)
-[![Version](https://img.shields.io/badge/version-v4.0.0-blue.svg)](https://github.com/sansan0/TrendRadar)
+[![Version](https://img.shields.io/badge/version-v4.0.3-blue.svg)](https://github.com/sansan0/TrendRadar)
 [![MCP](https://img.shields.io/badge/MCP-v1.1.0-green.svg)](https://github.com/sansan0/TrendRadar)
 
 [![WeWork](https://img.shields.io/badge/WeWork-Notification-00D4AA?style=flat-square)](https://work.weixin.qq.com/)
@@ -330,10 +330,11 @@ Transform from "algorithm recommendation captivity" to "actively getting the inf
 - **Major Version Upgrade**: When upgrading from v1.x to v2.y, we recommend deleting your existing fork and re-forking to save effort and avoid config conflicts
 
 
-### 2025/12/17 - v4.0.1
+### 2025/12/20 - v4.0.3
+
+- Added URL normalization to fix duplicate push issues caused by dynamic parameters (e.g., Weibo's `band_rank`)
+- Fixed incremental mode detection logic to correctly identify historical titles
 
-- StorageManager adds push record proxy methods
-- S3 client switches to virtual-hosted style for better compatibility (supports Tencent Cloud COS and more services)
 
 ### 2025/12/13 - mcp-v1.1.0
 
@@ -349,6 +350,12 @@ Transform from "algorithm recommendation captivity" to "actively getting the inf
 <summary>👉 Click to expand: <strong>Historical Updates</strong></summary>
 
 
+### 2025/12/17 - v4.0.1
+
+- StorageManager adds push record proxy methods
+- S3 client switches to virtual-hosted style for better compatibility (supports Tencent Cloud COS and more services)
+
+
 ### 2025/12/13 - v4.0.0
 
 **🎉 Major Update: Comprehensive Refactoring of Storage and Core Architecture**

+ 11 - 4
README.md

@@ -13,7 +13,7 @@
 [![GitHub Stars](https://img.shields.io/github/stars/sansan0/TrendRadar?style=flat-square&logo=github&color=yellow)](https://github.com/sansan0/TrendRadar/stargazers)
 [![GitHub Forks](https://img.shields.io/github/forks/sansan0/TrendRadar?style=flat-square&logo=github&color=blue)](https://github.com/sansan0/TrendRadar/network/members)
 [![License](https://img.shields.io/badge/license-GPL--3.0-blue.svg?style=flat-square)](LICENSE)
-[![Version](https://img.shields.io/badge/version-v4.0.2-blue.svg)](https://github.com/sansan0/TrendRadar)
+[![Version](https://img.shields.io/badge/version-v4.0.3-blue.svg)](https://github.com/sansan0/TrendRadar)
 [![MCP](https://img.shields.io/badge/MCP-v1.1.0-green.svg)](https://github.com/sansan0/TrendRadar)
 
 [![企业微信通知](https://img.shields.io/badge/企业微信-通知-00D4AA?style=flat-square)](https://work.weixin.qq.com/)
@@ -377,10 +377,10 @@ GitHub 一键 Fork 即可使用,无需编程基础。
 
 
 
-### 2025/12/17 - v4.0.1
+### 2025/12/20 - v4.0.3
 
-- StorageManager 添加推送记录代理方法
-- S3 客户端切换至 virtual-hosted style 以提升兼容性(支持腾讯云 COS 等更多服务)
+- 新增 URL 标准化功能,解决微博等平台因动态参数(如 `band_rank`)导致的重复推送问题
+- 修复增量模式检测逻辑,正确识别历史标题
 
 
 ### 2025/12/13 - mcp-v1.1.0
@@ -397,6 +397,13 @@ GitHub 一键 Fork 即可使用,无需编程基础。
 <summary>👉 点击展开:<strong>历史更新</strong></summary>
 
 
+
+### 2025/12/17 - v4.0.1
+
+- StorageManager 添加推送记录代理方法
+- S3 客户端切换至 virtual-hosted style 以提升兼容性(支持腾讯云 COS 等更多服务)
+
+
 ### 2025/12/13 - v4.0.0
 
 **🎉 重大更新:全面重构存储和核心架构**

+ 1 - 1
config/config.yaml

@@ -73,7 +73,7 @@ crawler:
 
 # 推送模式选择
 report:
-  mode: "daily" # 可选: "daily"|"incremental"|"current"
+  mode: "current" # 可选: "daily"|"incremental"|"current"
   rank_threshold: 5 # 排名高亮阈值
   sort_by_position_first: false # 排序优先级:true=先按配置位置排序,false=先按热点条数排序
   max_news_per_keyword: 0 # 每个关键词最大显示数量,0=不限制

+ 1 - 1
trendradar/__init__.py

@@ -9,5 +9,5 @@ TrendRadar - 热点新闻聚合与分析工具
 
 from trendradar.context import AppContext
 
-__version__ = "4.0.2"
+__version__ = "4.0.3"
 __all__ = ["AppContext", "__version__"]

+ 27 - 12
trendradar/__main__.py

@@ -214,8 +214,14 @@ class NewsAnalyzer:
         self, stats: List[Dict], new_titles: Optional[Dict] = None
     ) -> bool:
         """检查是否有有效的新闻内容"""
-        if self.report_mode in ["incremental", "current"]:
-            # 增量模式和current模式下,只要stats有内容就说明有匹配的新闻
+        if self.report_mode == "incremental":
+            # 增量模式:必须有新增标题才推送
+            has_new_titles = bool(
+                new_titles and any(len(titles) > 0 for titles in new_titles.values())
+            )
+            return has_new_titles
+        elif self.report_mode == "current":
+            # current模式:只要stats有内容就说明有匹配的新闻
             return any(stat["count"] > 0 for stat in stats)
         else:
             # 当日汇总模式下,检查是否有匹配的频率词新闻或新增新闻
@@ -227,15 +233,17 @@ class NewsAnalyzer:
 
     def _load_analysis_data(
         self,
+        quiet: bool = False,
     ) -> Optional[Tuple[Dict, Dict, Dict, Dict, List, List]]:
         """统一的数据加载和预处理,使用当前监控平台列表过滤历史数据"""
         try:
             # 获取当前配置的监控平台ID列表
             current_platform_ids = self.ctx.platform_ids
-            print(f"当前监控平台: {current_platform_ids}")
+            if not quiet:
+                print(f"当前监控平台: {current_platform_ids}")
 
             all_results, id_to_name, title_info = self.ctx.read_today_titles(
-                current_platform_ids
+                current_platform_ids, quiet=quiet
             )
 
             if not all_results:
@@ -243,9 +251,10 @@ class NewsAnalyzer:
                 return None
 
             total_titles = sum(len(titles) for titles in all_results.values())
-            print(f"读取到 {total_titles} 个标题(已按当前监控平台过滤)")
+            if not quiet:
+                print(f"读取到 {total_titles} 个标题(已按当前监控平台过滤)")
 
-            new_titles = self.ctx.detect_new_titles(current_platform_ids)
+            new_titles = self.ctx.detect_new_titles(current_platform_ids, quiet=quiet)
             word_groups, filter_words, global_filters = self.ctx.load_frequency_words()
 
             return (
@@ -293,6 +302,7 @@ class NewsAnalyzer:
         failed_ids: Optional[List] = None,
         is_daily_summary: bool = False,
         global_filters: Optional[List[str]] = None,
+        quiet: bool = False,
     ) -> Tuple[List[Dict], Optional[str]]:
         """统一的分析流水线:数据处理 → 统计计算 → HTML生成"""
 
@@ -306,6 +316,7 @@ class NewsAnalyzer:
             new_titles,
             mode=mode,
             global_filters=global_filters,
+            quiet=quiet,
         )
 
         # HTML生成(如果启用)
@@ -406,9 +417,12 @@ class NewsAnalyzer:
         ):
             mode_strategy = self._get_mode_strategy()
             if "实时" in report_type:
-                print(
-                    f"跳过实时推送通知:{mode_strategy['mode_name']}下未检测到匹配的新闻"
-                )
+                if self.report_mode == "incremental":
+                    print("跳过实时推送通知:增量模式下未检测到新增的新闻")
+                else:
+                    print(
+                        f"跳过实时推送通知:{mode_strategy['mode_name']}下未检测到匹配的新闻"
+                    )
             else:
                 print(
                     f"跳过{mode_strategy['summary_report_type']}通知:未匹配到有效的新闻内容"
@@ -466,8 +480,8 @@ class NewsAnalyzer:
         summary_type = "当前榜单汇总" if mode == "current" else "当日汇总"
         print(f"生成{summary_type}HTML...")
 
-        # 加载分析数据
-        analysis_data = self._load_analysis_data()
+        # 加载分析数据(静默模式,避免重复输出日志)
+        analysis_data = self._load_analysis_data(quiet=True)
         if not analysis_data:
             return None
 
@@ -475,7 +489,7 @@ class NewsAnalyzer:
             analysis_data
         )
 
-        # 运行分析流水线
+        # 运行分析流水线(静默模式,避免重复输出日志)
         _, html_file = self._run_analysis_pipeline(
             all_results,
             mode,
@@ -486,6 +500,7 @@ class NewsAnalyzer:
             id_to_name,
             is_daily_summary=True,
             global_filters=global_filters,
+            quiet=True,
         )
 
         if html_file:

+ 7 - 5
trendradar/context.py

@@ -167,20 +167,20 @@ class AppContext:
         return save_titles_to_file(results, id_to_name, failed_ids, output_path, clean_title)
 
     def read_today_titles(
-        self, platform_ids: Optional[List[str]] = None
+        self, platform_ids: Optional[List[str]] = None, quiet: bool = False
     ) -> Tuple[Dict, Dict, Dict]:
         """读取当天所有标题"""
-        return read_all_today_titles(self.get_storage_manager(), platform_ids)
+        return read_all_today_titles(self.get_storage_manager(), platform_ids, quiet=quiet)
 
     def detect_new_titles(
-        self, platform_ids: Optional[List[str]] = None
+        self, platform_ids: Optional[List[str]] = None, quiet: bool = False
     ) -> Dict:
         """检测最新批次的新增标题"""
-        return detect_latest_new_titles(self.get_storage_manager(), platform_ids)
+        return detect_latest_new_titles(self.get_storage_manager(), platform_ids, quiet=quiet)
 
     def is_first_crawl(self) -> bool:
         """检测是否是当天第一次爬取"""
-        return is_first_crawl_today("output", self.format_date())
+        return self.get_storage_manager().is_first_crawl_today()
 
     # === 频率词处理 ===
 
@@ -212,6 +212,7 @@ class AppContext:
         new_titles: Optional[Dict] = None,
         mode: str = "daily",
         global_filters: Optional[List[str]] = None,
+        quiet: bool = False,
     ) -> Tuple[List[Dict], int]:
         """统计词频"""
         return count_word_frequency(
@@ -229,6 +230,7 @@ class AppContext:
             sort_by_position_first=self.config.get("SORT_BY_POSITION_FIRST", False),
             is_first_crawl_func=self.is_first_crawl,
             convert_time_func=self.convert_time_display,
+            quiet=quiet,
         )
 
     # === 报告生成 ===

+ 6 - 3
trendradar/core/analyzer.py

@@ -102,6 +102,7 @@ def count_word_frequency(
     sort_by_position_first: bool = False,
     is_first_crawl_func: Optional[Callable[[], bool]] = None,
     convert_time_func: Optional[Callable[[str], str]] = None,
+    quiet: bool = False,
 ) -> Tuple[List[Dict], int]:
     """
     统计词频,支持必须词、频率词、过滤词、全局过滤词,并标记新增标题
@@ -121,6 +122,7 @@ def count_word_frequency(
         sort_by_position_first: 是否优先按配置位置排序
         is_first_crawl_func: 检测是否是当天第一次爬取的函数
         convert_time_func: 时间格式转换函数
+        quiet: 是否静默模式(不打印日志)
 
     Returns:
         Tuple[List[Dict], int]: (统计结果列表, 总标题数)
@@ -461,9 +463,10 @@ def count_word_frequency(
         # 先按热点条数,再按配置位置(原逻辑)
         stats.sort(key=lambda x: (-x["count"], x["position"]))
 
-    # 打印过滤后的匹配新闻数(与推送显示一致)
+    # 打印过滤后的匹配新闻数
     matched_news_count = sum(len(stat["titles"]) for stat in stats if stat["count"] > 0)
-    if mode == "daily":
-        print(f"频率词过滤后:{matched_news_count} 条新闻匹配(将显示在推送中)")
+    if not quiet and mode == "daily":
+        print(f"当日汇总模式:处理 {total_titles} 条新闻,模式:频率词过滤")
+        print(f"频率词过滤后:{matched_news_count} 条新闻匹配")
 
     return stats, total_titles

+ 36 - 22
trendradar/core/data.py

@@ -152,6 +152,7 @@ def read_all_today_titles_from_storage(
 def read_all_today_titles(
     storage_manager,
     current_platform_ids: Optional[List[str]] = None,
+    quiet: bool = False,
 ) -> Tuple[Dict, Dict, Dict]:
     """
     读取当天所有标题(从存储后端)
@@ -159,6 +160,7 @@ def read_all_today_titles(
     Args:
         storage_manager: 存储管理器实例
         current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
+        quiet: 是否静默模式(不打印日志)
 
     Returns:
         Tuple[Dict, Dict, Dict]: (all_results, id_to_name, title_info)
@@ -167,11 +169,12 @@ def read_all_today_titles(
         storage_manager, current_platform_ids
     )
 
-    if all_results:
-        total_count = sum(len(titles) for titles in all_results.values())
-        print(f"[存储] 已从存储后端读取 {total_count} 条标题")
-    else:
-        print("[存储] 当天暂无数据")
+    if not quiet:
+        if all_results:
+            total_count = sum(len(titles) for titles in all_results.values())
+            print(f"[存储] 已从存储后端读取 {total_count} 条标题")
+        else:
+            print("[存储] 当天暂无数据")
 
     return all_results, final_id_to_name, title_info
 
@@ -202,19 +205,35 @@ def detect_latest_new_titles_from_storage(
             # 没有历史数据(第一次抓取),不应该有"新增"标题
             return {}
 
-        # 收集历史标题(不包括最新批次的时间)
+        # 获取最新批次时间
         latest_time = latest_data.crawl_time
-        historical_titles = {}
 
+        # 步骤1:收集最新批次的标题(last_crawl_time = latest_time 的标题)
+        latest_titles = {}
+        for source_id, news_list in latest_data.items.items():
+            if current_platform_ids is not None and source_id not in current_platform_ids:
+                continue
+            latest_titles[source_id] = {}
+            for item in news_list:
+                latest_titles[source_id][item.title] = {
+                    "ranks": [item.rank],
+                    "url": item.url or "",
+                    "mobileUrl": item.mobile_url or "",
+                }
+
+        # 步骤2:收集历史标题
+        # 关键逻辑:一个标题只要其 first_crawl_time < latest_time,就是历史标题
+        # 这样即使同一标题有多条记录(URL 不同),只要任何一条是历史的,该标题就算历史
+        historical_titles = {}
         for source_id, news_list in all_data.items.items():
             if current_platform_ids is not None and source_id not in current_platform_ids:
                 continue
 
             historical_titles[source_id] = set()
             for item in news_list:
-                # 只统计非最新批次的标题
                 first_time = getattr(item, 'first_time', item.crawl_time)
-                if first_time != latest_time:
+                # 如果该记录的首次出现时间早于最新批次,则该标题是历史标题
+                if first_time < latest_time:
                     historical_titles[source_id].add(item.title)
 
         # 检查是否是当天第一次抓取(没有任何历史标题)
@@ -223,22 +242,15 @@ def detect_latest_new_titles_from_storage(
         if not has_historical_data:
             return {}
 
-        # 找出新增标题
+        # 步骤3:找出新增标题 = 最新批次标题 - 历史标题
         new_titles = {}
-        for source_id, news_list in latest_data.items.items():
-            if current_platform_ids is not None and source_id not in current_platform_ids:
-                continue
-
+        for source_id, source_latest_titles in latest_titles.items():
             historical_set = historical_titles.get(source_id, set())
             source_new_titles = {}
 
-            for item in news_list:
-                if item.title not in historical_set:
-                    source_new_titles[item.title] = {
-                        "ranks": [item.rank],
-                        "url": item.url or "",
-                        "mobileUrl": item.mobile_url or "",
-                    }
+            for title, title_data in source_latest_titles.items():
+                if title not in historical_set:
+                    source_new_titles[title] = title_data
 
             if source_new_titles:
                 new_titles[source_id] = source_new_titles
@@ -253,6 +265,7 @@ def detect_latest_new_titles_from_storage(
 def detect_latest_new_titles(
     storage_manager,
     current_platform_ids: Optional[List[str]] = None,
+    quiet: bool = False,
 ) -> Dict:
     """
     检测当日最新批次的新增标题(从存储后端)
@@ -260,12 +273,13 @@ def detect_latest_new_titles(
     Args:
         storage_manager: 存储管理器实例
         current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
+        quiet: 是否静默模式(不打印日志)
 
     Returns:
         Dict: 新增标题 {source_id: {title: title_data}}
     """
     new_titles = detect_latest_new_titles_from_storage(storage_manager, current_platform_ids)
-    if new_titles:
+    if new_titles and not quiet:
         total_new = sum(len(titles) for titles in new_titles.values())
         print(f"[存储] 从存储后端检测到 {total_new} 条新增标题")
     return new_titles

+ 1 - 1
trendradar/notification/renderer.py

@@ -6,7 +6,7 @@
 """
 
 from datetime import datetime
-from typing import Dict, List, Optional, Callable
+from typing import Dict, Optional, Callable
 
 from trendradar.report.formatter import format_title_for_platform
 

+ 0 - 2
trendradar/storage/base.py

@@ -7,9 +7,7 @@
 
 from abc import ABC, abstractmethod
 from dataclasses import dataclass, field
-from datetime import datetime
 from typing import Dict, List, Optional, Any
-import json
 
 
 @dataclass

+ 30 - 10
trendradar/storage/local.py

@@ -6,13 +6,12 @@
 """
 
 import sqlite3
-import os
 import shutil
 import pytz
 import re
 from datetime import datetime, timedelta
 from pathlib import Path
-from typing import Dict, List, Optional, Any
+from typing import Dict, List, Optional
 
 from trendradar.storage.base import StorageBackend, NewsItem, NewsData
 from trendradar.utils.time import (
@@ -20,6 +19,7 @@ from trendradar.utils.time import (
     format_date_folder,
     format_time_filename,
 )
+from trendradar.utils.url import normalize_url
 
 
 class LocalStorageBackend(StorageBackend):
@@ -148,12 +148,15 @@ class LocalStorageBackend(StorageBackend):
 
                 for item in news_list:
                     try:
-                        # 检查是否已存在(通过 URL + platform_id)
-                        if item.url:
+                        # 标准化 URL(去除动态参数,如微博的 band_rank)
+                        normalized_url = normalize_url(item.url, source_id) if item.url else ""
+
+                        # 检查是否已存在(通过标准化 URL + platform_id)
+                        if normalized_url:
                             cursor.execute("""
                                 SELECT id, title FROM news_items
                                 WHERE url = ? AND platform_id = ?
-                            """, (item.url, source_id))
+                            """, (normalized_url, source_id))
                             existing = cursor.fetchone()
 
                             if existing:
@@ -191,14 +194,14 @@ class LocalStorageBackend(StorageBackend):
                                       data.crawl_time, now_str, existing_id))
                                 updated_count += 1
                             else:
-                                # 不存在,插入新记录
+                                # 不存在,插入新记录(存储标准化后的 URL)
                                 cursor.execute("""
                                     INSERT INTO news_items
                                     (title, platform_id, rank, url, mobile_url,
                                      first_crawl_time, last_crawl_time, crawl_count,
                                      created_at, updated_at)
                                     VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
-                                """, (item.title, source_id, item.rank, item.url,
+                                """, (item.title, source_id, item.rank, normalized_url,
                                       item.mobile_url, data.crawl_time, data.crawl_time,
                                       now_str, now_str))
                                 new_id = cursor.lastrowid
@@ -217,7 +220,7 @@ class LocalStorageBackend(StorageBackend):
                                  first_crawl_time, last_crawl_time, crawl_count,
                                  created_at, updated_at)
                                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
-                            """, (item.title, source_id, item.rank, item.url,
+                            """, (item.title, source_id, item.rank, "",
                                   item.mobile_url, data.crawl_time, data.crawl_time,
                                   now_str, now_str))
                             new_id = cursor.lastrowid
@@ -524,6 +527,9 @@ class LocalStorageBackend(StorageBackend):
         """
         检测新增的标题
 
+        该方法比较当前抓取数据与历史数据,找出新增的标题。
+        关键逻辑:只有在历史批次中从未出现过的标题才算新增。
+
         Args:
             current_data: 当前抓取的数据
 
@@ -541,10 +547,24 @@ class LocalStorageBackend(StorageBackend):
                     new_titles[source_id] = {item.title: item for item in news_list}
                 return new_titles
 
-            # 收集历史标题
+            # 获取当前批次时间
+            current_time = current_data.crawl_time
+
+            # 收集历史标题(first_time < current_time 的标题)
+            # 这样可以正确处理同一标题因 URL 变化而产生多条记录的情况
             historical_titles: Dict[str, set] = {}
             for source_id, news_list in historical_data.items.items():
-                historical_titles[source_id] = {item.title for item in news_list}
+                historical_titles[source_id] = set()
+                for item in news_list:
+                    first_time = getattr(item, 'first_time', item.crawl_time)
+                    if first_time < current_time:
+                        historical_titles[source_id].add(item.title)
+
+            # 检查是否有历史数据
+            has_historical_data = any(len(titles) > 0 for titles in historical_titles.values())
+            if not has_historical_data:
+                # 第一次抓取,没有"新增"概念
+                return {}
 
             # 检测新增
             new_titles = {}

+ 32 - 8
trendradar/storage/remote.py

@@ -34,6 +34,7 @@ from trendradar.utils.time import (
     format_date_folder,
     format_time_filename,
 )
+from trendradar.utils.url import normalize_url
 
 
 class RemoteStorageBackend(StorageBackend):
@@ -355,12 +356,15 @@ class RemoteStorageBackend(StorageBackend):
 
                 for item in news_list:
                     try:
-                        # 检查是否已存在(通过 URL + platform_id)
-                        if item.url:
+                        # 标准化 URL(去除动态参数,如微博的 band_rank)
+                        normalized_url = normalize_url(item.url, source_id) if item.url else ""
+
+                        # 检查是否已存在(通过标准化 URL + platform_id)
+                        if normalized_url:
                             cursor.execute("""
                                 SELECT id, title FROM news_items
                                 WHERE url = ? AND platform_id = ?
-                            """, (item.url, source_id))
+                            """, (normalized_url, source_id))
                             existing = cursor.fetchone()
 
                             if existing:
@@ -398,14 +402,14 @@ class RemoteStorageBackend(StorageBackend):
                                       data.crawl_time, now_str, existing_id))
                                 updated_count += 1
                             else:
-                                # 不存在,插入新记录
+                                # 不存在,插入新记录(存储标准化后的 URL)
                                 cursor.execute("""
                                     INSERT INTO news_items
                                     (title, platform_id, rank, url, mobile_url,
                                      first_crawl_time, last_crawl_time, crawl_count,
                                      created_at, updated_at)
                                     VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
-                                """, (item.title, source_id, item.rank, item.url,
+                                """, (item.title, source_id, item.rank, normalized_url,
                                       item.mobile_url, data.crawl_time, data.crawl_time,
                                       now_str, now_str))
                                 new_id = cursor.lastrowid
@@ -424,7 +428,7 @@ class RemoteStorageBackend(StorageBackend):
                                  first_crawl_time, last_crawl_time, crawl_count,
                                  created_at, updated_at)
                                 VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
-                            """, (item.title, source_id, item.rank, item.url,
+                            """, (item.title, source_id, item.rank, "",
                                   item.mobile_url, data.crawl_time, data.crawl_time,
                                   now_str, now_str))
                             new_id = cursor.lastrowid
@@ -693,7 +697,12 @@ class RemoteStorageBackend(StorageBackend):
             return None
 
     def detect_new_titles(self, current_data: NewsData) -> Dict[str, Dict]:
-        """检测新增的标题"""
+        """
+        检测新增的标题
+
+        该方法比较当前抓取数据与历史数据,找出新增的标题。
+        关键逻辑:只有在历史批次中从未出现过的标题才算新增。
+        """
         try:
             historical_data = self.get_today_all_data(current_data.date)
 
@@ -703,9 +712,24 @@ class RemoteStorageBackend(StorageBackend):
                     new_titles[source_id] = {item.title: item for item in news_list}
                 return new_titles
 
+            # 获取当前批次时间
+            current_time = current_data.crawl_time
+
+            # 收集历史标题(first_time < current_time 的标题)
+            # 这样可以正确处理同一标题因 URL 变化而产生多条记录的情况
             historical_titles: Dict[str, set] = {}
             for source_id, news_list in historical_data.items.items():
-                historical_titles[source_id] = {item.title for item in news_list}
+                historical_titles[source_id] = set()
+                for item in news_list:
+                    first_time = getattr(item, 'first_time', item.crawl_time)
+                    if first_time < current_time:
+                        historical_titles[source_id].add(item.title)
+
+            # 检查是否有历史数据
+            has_historical_data = any(len(titles) > 0 for titles in historical_titles.values())
+            if not has_historical_data:
+                # 第一次抓取,没有"新增"概念
+                return {}
 
             new_titles = {}
             for source_id, news_list in current_data.items.items():

+ 3 - 0
trendradar/utils/__init__.py

@@ -10,6 +10,7 @@ from trendradar.utils.time import (
     get_current_time_display,
     convert_time_for_display,
 )
+from trendradar.utils.url import normalize_url, get_url_signature
 
 __all__ = [
     "get_configured_time",
@@ -17,4 +18,6 @@ __all__ = [
     "format_time_filename",
     "get_current_time_display",
     "convert_time_for_display",
+    "normalize_url",
+    "get_url_signature",
 ]

+ 146 - 0
trendradar/utils/url.py

@@ -0,0 +1,146 @@
+# coding=utf-8
+"""
+URL 处理工具模块
+
+提供 URL 标准化功能,用于去重时消除动态参数的影响:
+- normalize_url: 标准化 URL,去除动态参数
+"""
+
+from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
+from typing import Dict, Set
+
+
+# 各平台需要移除的特定参数
+#   - weibo: 有 band_rank(排名)、Refer(来源)和 t(时间范围)动态参数
+#   - 其他平台: URL 为路径格式或简单关键词查询,无需平台特定处理(仍会移除通用追踪参数)
+PLATFORM_PARAMS_TO_REMOVE: Dict[str, Set[str]] = {
+    # 微博:band_rank 是动态排名参数,Refer 是来源参数,t 是时间范围参数
+    # 示例:https://s.weibo.com/weibo?q=xxx&t=31&band_rank=1&Refer=top
+    # 保留:q(关键词)
+    # 移除:band_rank, Refer, t
+    "weibo": {"band_rank", "Refer", "t"},
+}
+
+# 通用追踪参数(适用于所有平台)
+# 这些参数通常由分享链接或广告追踪添加,不影响内容识别
+COMMON_TRACKING_PARAMS: Set[str] = {
+    # UTM 追踪参数
+    "utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
+    # 常见追踪参数
+    "ref", "referrer", "source", "channel",
+    # 时间戳和随机参数
+    "_t", "timestamp", "_", "random",
+    # 分享相关
+    "share_token", "share_id", "share_from",
+}
+
+
+def normalize_url(url: str, platform_id: str = "") -> str:
+    """
+    标准化 URL,去除动态参数
+
+    用于数据库去重,确保同一条新闻的不同 URL 变体能被正确识别为同一条。
+
+    处理规则:
+    1. 去除平台特定的动态参数(如微博的 band_rank)
+    2. 去除通用追踪参数(如 utm_*)
+    3. 保留核心查询参数(如搜索关键词 q=, wd=, keyword=)
+    4. 对查询参数按字母序排序(确保一致性),并移除 fragment(仅当 URL 含查询参数时)
+
+    Args:
+        url: 原始 URL
+        platform_id: 平台 ID,用于应用平台特定规则
+
+    Returns:
+        标准化后的 URL
+
+    Examples:
+        >>> normalize_url("https://s.weibo.com/weibo?q=test&band_rank=6&Refer=top", "weibo")
+        'https://s.weibo.com/weibo?q=test'
+
+        >>> normalize_url("https://example.com/page?id=1&utm_source=twitter", "")
+        'https://example.com/page?id=1'
+    """
+    if not url:
+        return url
+
+    try:
+        # 解析 URL
+        parsed = urlparse(url)
+
+        # 如果没有查询参数,直接返回
+        if not parsed.query:
+            return url
+
+        # 解析查询参数
+        params = parse_qs(parsed.query, keep_blank_values=True)
+
+        # 收集需要移除的参数(使用小写进行比较)
+        params_to_remove: Set[str] = set()
+
+        # 添加通用追踪参数
+        params_to_remove.update(COMMON_TRACKING_PARAMS)
+
+        # 添加平台特定参数
+        if platform_id and platform_id in PLATFORM_PARAMS_TO_REMOVE:
+            params_to_remove.update(PLATFORM_PARAMS_TO_REMOVE[platform_id])
+
+        # 过滤参数(参数名转小写进行比较)
+        filtered_params = {
+            key: values
+            for key, values in params.items()
+            if key.lower() not in {p.lower() for p in params_to_remove}
+        }
+
+        # 如果过滤后没有参数了,返回不带查询字符串的 URL
+        if not filtered_params:
+            return urlunparse((
+                parsed.scheme,
+                parsed.netloc,
+                parsed.path,
+                parsed.params,
+                "",  # 空查询字符串
+                ""   # 移除 fragment
+            ))
+
+        # 重建查询字符串(按字母序排序以确保一致性)
+        sorted_params = []
+        for key in sorted(filtered_params.keys()):
+            for value in filtered_params[key]:
+                sorted_params.append((key, value))
+
+        new_query = urlencode(sorted_params)
+
+        # 重建 URL(移除 fragment)
+        normalized = urlunparse((
+            parsed.scheme,
+            parsed.netloc,
+            parsed.path,
+            parsed.params,
+            new_query,
+            ""  # 移除 fragment
+        ))
+
+        return normalized
+
+    except Exception:
+        # 解析失败时返回原始 URL
+        return url
+
+
+def get_url_signature(url: str, platform_id: str = "") -> str:
+    """
+    获取 URL 的签名(用于快速比较)
+
+    基于标准化 URL 生成签名,可用于:
+    - 快速判断两个 URL 是否指向同一内容
+    - 作为缓存键
+
+    Args:
+        url: 原始 URL
+        platform_id: 平台 ID
+
+    Returns:
+        URL 签名字符串
+    """
+    return normalize_url(url, platform_id)

+ 1 - 1
version

@@ -1 +1 @@
-4.0.2
+4.0.3