| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257 |
- """
- TrendRadar MCP Server - FastMCP 2.0 实现
- 使用 FastMCP 2.0 提供生产级 MCP 工具服务器。
- 支持 stdio 和 HTTP 两种传输模式。
- """
- import asyncio
- import json
- from typing import List, Optional, Dict, Union
- from fastmcp import FastMCP
- from .tools.data_query import DataQueryTools
- from .tools.analytics import AnalyticsTools
- from .tools.search_tools import SearchTools
- from .tools.config_mgmt import ConfigManagementTools
- from .tools.system import SystemManagementTools
- from .tools.storage_sync import StorageSyncTools
- from .tools.article_reader import ArticleReaderTools
- from .tools.notification import NotificationTools
- from .utils.date_parser import DateParser
- from .utils.errors import MCPError
- # 创建 FastMCP 2.0 应用
- mcp = FastMCP('trendradar-news')
- # 全局工具实例(在第一次请求时初始化)
- _tools_instances = {}
- def _get_tools(project_root: Optional[str] = None):
- """获取或创建工具实例(单例模式)"""
- if not _tools_instances:
- _tools_instances['data'] = DataQueryTools(project_root)
- _tools_instances['analytics'] = AnalyticsTools(project_root)
- _tools_instances['search'] = SearchTools(project_root)
- _tools_instances['config'] = ConfigManagementTools(project_root)
- _tools_instances['system'] = SystemManagementTools(project_root)
- _tools_instances['storage'] = StorageSyncTools(project_root)
- _tools_instances['article'] = ArticleReaderTools(project_root)
- _tools_instances['notification'] = NotificationTools(project_root)
- return _tools_instances
- # ==================== MCP Resources ====================
- @mcp.resource("config://platforms")
- async def get_platforms_resource() -> str:
- """
- 获取支持的平台列表
- 返回 config.yaml 中配置的所有平台信息,包括 ID 和名称。
- """
- tools = _get_tools()
- config = await asyncio.to_thread(
- tools['config'].get_current_config, section="crawler"
- )
- return json.dumps({
- "platforms": config.get("platforms", []),
- "description": "TrendRadar 支持的热榜平台列表"
- }, ensure_ascii=False, indent=2)
- @mcp.resource("config://rss-feeds")
- async def get_rss_feeds_resource() -> str:
- """
- 获取 RSS 订阅源列表
- 返回当前配置的所有 RSS 源信息。
- """
- tools = _get_tools()
- status = await asyncio.to_thread(tools['data'].get_rss_feeds_status)
- return json.dumps({
- "feeds": status.get("today_feeds", {}),
- "description": "TrendRadar 支持的 RSS 订阅源列表"
- }, ensure_ascii=False, indent=2)
- @mcp.resource("data://available-dates")
- async def get_available_dates_resource() -> str:
- """
- 获取可用的数据日期范围
- 返回本地存储中可查询的日期列表。
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['storage'].list_available_dates, source="local"
- )
- return json.dumps({
- "dates": result.get("data", {}).get("local", {}).get("dates", []),
- "description": "本地存储中可查询的日期列表"
- }, ensure_ascii=False, indent=2)
- @mcp.resource("config://keywords")
- async def get_keywords_resource() -> str:
- """
- 获取关注词配置
- 返回 frequency_words.txt 中配置的关注词分组。
- """
- tools = _get_tools()
- config = await asyncio.to_thread(
- tools['config'].get_current_config, section="keywords"
- )
- return json.dumps({
- "word_groups": config.get("word_groups", []),
- "total_groups": config.get("total_groups", 0),
- "description": "TrendRadar 关注词配置"
- }, ensure_ascii=False, indent=2)
- # ==================== 日期解析工具(优先调用)====================
- @mcp.tool
- async def resolve_date_range(
- expression: str
- ) -> str:
- """
- 【推荐优先调用】将自然语言日期表达式解析为标准日期范围
- **为什么需要这个工具?**
- 用户经常使用"本周"、"最近7天"等自然语言表达日期,但 AI 模型自己计算日期
- 可能导致不一致的结果。此工具在服务器端使用精确的当前时间计算,确保所有
- AI 模型获得一致的日期范围。
- **推荐使用流程:**
- 1. 用户说"分析AI本周的情感倾向"
- 2. AI 调用 resolve_date_range("本周") → 获取精确日期范围
- 3. AI 调用 analyze_sentiment(topic="ai", date_range=上一步返回的date_range)
- Args:
- expression: 自然语言日期表达式,支持:
- - 单日: "今天", "昨天", "today", "yesterday"
- - 周: "本周", "上周", "this week", "last week"
- - 月: "本月", "上月", "this month", "last month"
- - 最近N天: "最近7天", "最近30天", "last 7 days", "last 30 days"
- - 动态: "最近5天", "last 10 days"(任意天数)
- Returns:
- JSON格式的日期范围,可直接用于其他工具的 date_range 参数:
- {
- "success": true,
- "expression": "本周",
- "date_range": {
- "start": "2025-11-18",
- "end": "2025-11-26"
- },
- "current_date": "2025-11-26",
- "description": "本周(周一到周日,11-18 至 11-26)"
- }
- Examples:
- 用户:"分析AI本周的情感倾向"
- AI调用步骤:
- 1. resolve_date_range("本周")
- → {"date_range": {"start": "2025-11-18", "end": "2025-11-26"}, ...}
- 2. analyze_sentiment(topic="ai", date_range={"start": "2025-11-18", "end": "2025-11-26"})
- 用户:"看看最近7天的特斯拉新闻"
- AI调用步骤:
- 1. resolve_date_range("最近7天")
- → {"date_range": {"start": "2025-11-20", "end": "2025-11-26"}, ...}
- 2. search_news(query="特斯拉", date_range={"start": "2025-11-20", "end": "2025-11-26"})
- """
- try:
- result = await asyncio.to_thread(DateParser.resolve_date_range_expression, expression)
- return json.dumps(result, ensure_ascii=False, indent=2)
- except MCPError as e:
- return json.dumps({
- "success": False,
- "error": e.to_dict()
- }, ensure_ascii=False, indent=2)
- except Exception as e:
- return json.dumps({
- "success": False,
- "error": {
- "code": "INTERNAL_ERROR",
- "message": str(e)
- }
- }, ensure_ascii=False, indent=2)
- # ==================== 数据查询工具 ====================
- @mcp.tool
- async def get_latest_news(
- platforms: Optional[List[str]] = None,
- limit: int = 50,
- include_url: bool = False
- ) -> str:
- """
- 获取最新一批爬取的新闻数据,快速了解当前热点
- Args:
- platforms: 平台ID列表,如 ['zhihu', 'weibo'],不指定则使用所有平台
- limit: 返回条数限制,默认50,最大1000
- include_url: 是否包含URL链接,默认False(节省token)
- Returns:
- JSON格式的新闻列表
- **数据展示建议**
- - 默认展示全部返回数据,除非用户明确要求总结
- - 用户说"总结"或"挑重点"时才进行筛选
- - 用户问"为什么只显示部分"说明需要完整数据
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['data'].get_latest_news,
- platforms=platforms, limit=limit, include_url=include_url
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def get_trending_topics(
- top_n: int = 10,
- mode: str = 'current',
- extract_mode: str = 'keywords'
- ) -> str:
- """
- 获取热点话题统计
- Args:
- top_n: 返回TOP N话题,默认10
- mode: 时间模式
- - "daily": 当日累计数据统计
- - "current": 最新一批数据统计(默认)
- extract_mode: 提取模式
- - "keywords": 统计预设关注词(基于 config/frequency_words.txt,默认)
- - "auto_extract": 自动从新闻标题提取高频词(无需预设,自动发现热点)
- Returns:
- JSON格式的话题频率统计列表
- Examples:
- - 使用预设关注词: get_trending_topics(mode="current")
- - 自动提取热点: get_trending_topics(extract_mode="auto_extract", top_n=20)
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['data'].get_trending_topics,
- top_n=top_n, mode=mode, extract_mode=extract_mode
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- # ==================== RSS 数据查询工具 ====================
- @mcp.tool
- async def get_latest_rss(
- feeds: Optional[List[str]] = None,
- days: int = 1,
- limit: int = 50,
- include_summary: bool = False
- ) -> str:
- """
- 获取最新的 RSS 订阅数据(支持多日查询)
- RSS 数据与热榜新闻分开存储,按时间流展示,适合获取特定来源的最新内容。
- Args:
- feeds: RSS 源 ID 列表,如 ['hacker-news', '36kr'],不指定则返回所有源
- days: 获取最近 N 天的数据,默认 1(仅今天),最大 30 天
- limit: 返回条数限制,默认50,最大500
- include_summary: 是否包含文章摘要,默认False(节省token)
- Returns:
- JSON格式的 RSS 条目列表
- Examples:
- - get_latest_rss()
- - get_latest_rss(days=7, feeds=['hacker-news'])
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['data'].get_latest_rss,
- feeds=feeds, days=days, limit=limit, include_summary=include_summary
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def search_rss(
- keyword: str,
- feeds: Optional[List[str]] = None,
- days: int = 7,
- limit: int = 50,
- include_summary: bool = False
- ) -> str:
- """
- 搜索 RSS 数据
- 在 RSS 订阅数据中搜索包含指定关键词的文章。
- Args:
- keyword: 搜索关键词(必需)
- feeds: RSS 源 ID 列表,如 ['hacker-news', '36kr']
- - 不指定时:搜索所有 RSS 源
- days: 搜索最近 N 天的数据,默认 7 天,最大 30 天
- limit: 返回条数限制,默认50
- include_summary: 是否包含文章摘要,默认False
- Returns:
- JSON格式的匹配 RSS 条目列表
- Examples:
- - search_rss(keyword="AI")
- - search_rss(keyword="machine learning", feeds=['hacker-news'], days=14)
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['data'].search_rss,
- keyword=keyword,
- feeds=feeds,
- days=days,
- limit=limit,
- include_summary=include_summary
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def get_rss_feeds_status() -> str:
- """
- 获取 RSS 源状态信息
- 查看当前配置的 RSS 源及其数据统计信息。
- Returns:
- JSON格式的 RSS 源状态,包含:
- - available_dates: 有 RSS 数据的日期列表
- - total_dates: 总日期数
- - today_feeds: 今日各 RSS 源的数据统计
- - {feed_id}: { name, item_count }
- - generated_at: 生成时间
- Examples:
- - get_rss_feeds_status() # 查看所有 RSS 源状态
- """
- tools = _get_tools()
- result = await asyncio.to_thread(tools['data'].get_rss_feeds_status)
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def get_news_by_date(
- date_range: Optional[Union[Dict[str, str], str]] = None,
- platforms: Optional[List[str]] = None,
- limit: int = 50,
- include_url: bool = False
- ) -> str:
- """
- 获取指定日期的新闻数据,用于历史数据分析和对比
- Args:
- date_range: 日期范围,支持多种格式:
- - 范围对象: {"start": "2025-01-01", "end": "2025-01-07"}
- - 自然语言: "今天", "昨天", "本周", "最近7天"
- - 单日字符串: "2025-01-15"
- - 默认值: "今天"
- platforms: 平台ID列表,如 ['zhihu', 'weibo'],不指定则使用所有平台
- limit: 返回条数限制,默认50,最大1000
- include_url: 是否包含URL链接,默认False(节省token)
- Returns:
- JSON格式的新闻列表,包含标题、平台、排名等信息
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['data'].get_news_by_date,
- date_range=date_range,
- platforms=platforms,
- limit=limit,
- include_url=include_url
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- # ==================== 高级数据分析工具 ====================
- @mcp.tool
- async def analyze_topic_trend(
- topic: str,
- analysis_type: str = "trend",
- date_range: Optional[Union[Dict[str, str], str]] = None,
- granularity: str = "day",
- spike_threshold: float = 3.0,
- time_window: int = 24,
- lookahead_hours: int = 6,
- confidence_threshold: float = 0.7
- ) -> str:
- """
- 统一话题趋势分析工具 - 整合多种趋势分析模式
- 建议:使用自然语言日期时,先调用 resolve_date_range 获取精确日期范围。
- Args:
- topic: 话题关键词(必需)
- analysis_type: 分析类型
- - "trend": 热度趋势分析(默认)
- - "lifecycle": 生命周期分析
- - "viral": 异常热度检测
- - "predict": 话题预测
- date_range: 日期范围,格式 {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"},默认最近7天
- granularity: 时间粒度,默认"day"
- spike_threshold: 热度突增倍数阈值(viral模式),默认3.0
- time_window: 检测时间窗口小时数(viral模式),默认24
- lookahead_hours: 预测未来小时数(predict模式),默认6
- confidence_threshold: 置信度阈值(predict模式),默认0.7
- Returns:
- JSON格式的趋势分析结果
- Examples:
- - analyze_topic_trend(topic="AI", date_range={"start": "2025-01-01", "end": "2025-01-07"})
- - analyze_topic_trend(topic="特斯拉", analysis_type="lifecycle")
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['analytics'].analyze_topic_trend_unified,
- topic=topic,
- analysis_type=analysis_type,
- date_range=date_range,
- granularity=granularity,
- threshold=spike_threshold,
- time_window=time_window,
- lookahead_hours=lookahead_hours,
- confidence_threshold=confidence_threshold
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def analyze_data_insights(
- insight_type: str = "platform_compare",
- topic: Optional[str] = None,
- date_range: Optional[Union[Dict[str, str], str]] = None,
- min_frequency: int = 3,
- top_n: int = 20
- ) -> str:
- """
- 统一数据洞察分析工具 - 整合多种数据分析模式
- Args:
- insight_type: 洞察类型,可选值:
- - "platform_compare": 平台对比分析(对比不同平台对话题的关注度)
- - "platform_activity": 平台活跃度统计(统计各平台发布频率和活跃时间)
- - "keyword_cooccur": 关键词共现分析(分析关键词同时出现的模式)
- topic: 话题关键词(可选,platform_compare模式适用)
- date_range: **【对象类型】** 日期范围(可选)
- - **格式**: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
- - **示例**: {"start": "2025-01-01", "end": "2025-01-07"}
- - **重要**: 必须是对象格式,不能传递整数
- min_frequency: 最小共现频次(keyword_cooccur模式),默认3
- top_n: 返回TOP N结果(keyword_cooccur模式),默认20
- Returns:
- JSON格式的数据洞察分析结果
- Examples:
- - analyze_data_insights(insight_type="platform_compare", topic="人工智能")
- - analyze_data_insights(insight_type="platform_activity", date_range={"start": "2025-01-01", "end": "2025-01-07"})
- - analyze_data_insights(insight_type="keyword_cooccur", min_frequency=5, top_n=15)
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['analytics'].analyze_data_insights_unified,
- insight_type=insight_type,
- topic=topic,
- date_range=date_range,
- min_frequency=min_frequency,
- top_n=top_n
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def analyze_sentiment(
- topic: Optional[str] = None,
- platforms: Optional[List[str]] = None,
- date_range: Optional[Union[Dict[str, str], str]] = None,
- limit: int = 50,
- sort_by_weight: bool = True,
- include_url: bool = False
- ) -> str:
- """
- 分析新闻的情感倾向和热度趋势
- 建议:使用自然语言日期时,先调用 resolve_date_range 获取精确日期范围。
- Args:
- topic: 话题关键词(可选)
- platforms: 平台ID列表,如 ['zhihu', 'weibo'],不指定则使用所有平台
- date_range: 日期范围,格式 {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"},默认今天
- limit: 返回新闻数量,默认50,最大100(会对标题去重)
- sort_by_weight: 是否按热度权重排序,默认True
- include_url: 是否包含URL链接,默认False(节省token)
- Returns:
- JSON格式的分析结果,包含情感分布、热度趋势和相关新闻
- Examples:
- - analyze_sentiment(topic="AI", date_range={"start": "2025-01-01", "end": "2025-01-07"})
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['analytics'].analyze_sentiment,
- topic=topic,
- platforms=platforms,
- date_range=date_range,
- limit=limit,
- sort_by_weight=sort_by_weight,
- include_url=include_url
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def find_related_news(
- reference_title: str,
- date_range: Optional[Union[Dict[str, str], str]] = None,
- threshold: float = 0.5,
- limit: int = 50,
- include_url: bool = False
- ) -> str:
- """
- 查找与指定新闻标题相关的其他新闻(支持当天和历史数据)
- Args:
- reference_title: 参考新闻标题(完整或部分)
- date_range: 日期范围(可选)
- - 不指定: 只查询今天的数据
- - "today", "yesterday", "last_week", "last_month": 预设值
- - {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}: 自定义范围
- threshold: 相似度阈值,0-1之间,默认0.5(越高匹配越严格)
- limit: 返回条数限制,默认50
- include_url: 是否包含URL链接,默认False(节省token)
- Returns:
- JSON格式的相关新闻列表,按相似度排序
- Examples:
- - find_related_news(reference_title="特斯拉降价")
- - find_related_news(reference_title="AI突破", date_range="last_week")
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['search'].find_related_news_unified,
- reference_title=reference_title,
- date_range=date_range,
- threshold=threshold,
- limit=limit,
- include_url=include_url
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def generate_summary_report(
- report_type: str = "daily",
- date_range: Optional[Union[Dict[str, str], str]] = None
- ) -> str:
- """
- 每日/每周摘要生成器 - 自动生成热点摘要报告
- Args:
- report_type: 报告类型(daily/weekly)
- date_range: **【对象类型】** 自定义日期范围(可选)
- - **格式**: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
- - **示例**: {"start": "2025-01-01", "end": "2025-01-07"}
- - **重要**: 必须是对象格式,不能传递整数
- Returns:
- JSON格式的摘要报告,包含Markdown格式内容
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['analytics'].generate_summary_report,
- report_type=report_type,
- date_range=date_range
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def aggregate_news(
- date_range: Optional[Union[Dict[str, str], str]] = None,
- platforms: Optional[List[str]] = None,
- similarity_threshold: float = 0.7,
- limit: int = 50,
- include_url: bool = False
- ) -> str:
- """
- 跨平台新闻聚合 - 对相似新闻进行去重合并
- 将不同平台报道的同一事件合并为一条聚合新闻,显示跨平台覆盖情况和综合热度。
- Args:
- date_range: 日期范围,不指定则查询今天
- platforms: 平台ID列表,如 ['zhihu', 'weibo'],不指定则使用所有平台
- similarity_threshold: 相似度阈值,0.3-1.0,默认0.7(越高越严格)
- limit: 返回聚合新闻数量,默认50
- include_url: 是否包含URL链接,默认False
- Returns:
- JSON格式的聚合结果,包含去重统计、聚合新闻列表和平台覆盖统计
- Examples:
- - aggregate_news()
- - aggregate_news(similarity_threshold=0.8)
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['analytics'].aggregate_news,
- date_range=date_range,
- platforms=platforms,
- similarity_threshold=similarity_threshold,
- limit=limit,
- include_url=include_url
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def compare_periods(
- period1: Union[Dict[str, str], str],
- period2: Union[Dict[str, str], str],
- topic: Optional[str] = None,
- compare_type: str = "overview",
- platforms: Optional[List[str]] = None,
- top_n: int = 10
- ) -> str:
- """
- 时期对比分析 - 比较两个时间段的新闻数据
- 对比不同时期的热点话题、平台活跃度、新闻数量等维度。
- **使用场景:**
- - 对比本周和上周的热点变化
- - 分析某个话题在两个时期的热度差异
- - 查看各平台活跃度的周期性变化
- Args:
- period1: 第一个时间段(基准期)
- - {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}: 日期范围
- - "today", "yesterday", "this_week", "last_week", "this_month", "last_month": 预设值
- period2: 第二个时间段(对比期,格式同 period1)
- topic: 可选的话题关键词(聚焦特定话题的对比)
- compare_type: 对比类型
- - "overview": 总体概览(默认)- 新闻数量、关键词变化、TOP新闻
- - "topic_shift": 话题变化分析 - 上升话题、下降话题、新出现话题
- - "platform_activity": 平台活跃度对比 - 各平台新闻数量变化
- platforms: 平台过滤列表,如 ['zhihu', 'weibo']
- top_n: 返回 TOP N 结果,默认10
- Returns:
- JSON格式的对比分析结果,包含:
- - periods: 两个时期的日期范围
- - compare_type: 对比类型
- - overview/topic_shift/platform_comparison: 具体对比结果(根据类型)
- Examples:
- - compare_periods(period1="last_week", period2="this_week") # 周环比
- - compare_periods(period1="last_month", period2="this_month", compare_type="topic_shift")
- - compare_periods(
- period1={"start": "2025-01-01", "end": "2025-01-07"},
- period2={"start": "2025-01-08", "end": "2025-01-14"},
- topic="人工智能"
- )
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['analytics'].compare_periods,
- period1=period1,
- period2=period2,
- topic=topic,
- compare_type=compare_type,
- platforms=platforms,
- top_n=top_n
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- # ==================== 智能检索工具 ====================
- @mcp.tool
- async def search_news(
- query: str,
- search_mode: str = "keyword",
- date_range: Optional[Union[Dict[str, str], str]] = None,
- platforms: Optional[List[str]] = None,
- limit: int = 50,
- sort_by: str = "relevance",
- threshold: float = 0.6,
- include_url: bool = False,
- include_rss: bool = False,
- rss_limit: int = 20
- ) -> str:
- """
- 统一搜索接口,支持多种搜索模式,可同时搜索热榜和RSS
- 建议:使用自然语言日期时,先调用 resolve_date_range 获取精确日期范围。
- Args:
- query: 搜索关键词或内容片段
- search_mode: 搜索模式
- - "keyword": 精确关键词匹配(默认)
- - "fuzzy": 模糊内容匹配
- - "entity": 实体名称搜索(人物/地点/机构)
- date_range: 日期范围,格式 {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"},默认今天
- platforms: 平台ID列表,如 ['zhihu', 'weibo'],不指定则使用所有平台
- limit: 热榜返回条数限制,默认50
- sort_by: 排序方式 - "relevance"(相关度)/ "weight"(权重)/ "date"(日期)
- threshold: 相似度阈值(仅fuzzy模式),0-1,默认0.6
- include_url: 是否包含URL链接,默认False
- include_rss: 是否同时搜索RSS数据,默认False
- rss_limit: RSS返回条数限制,默认20
- Returns:
- JSON格式的搜索结果,包含热榜新闻列表和可选的RSS结果
- Examples:
- - search_news(query="AI")
- - search_news(query="AI", include_rss=True)
- - search_news(query="特斯拉", date_range={"start": "2025-01-01", "end": "2025-01-07"})
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['search'].search_news_unified,
- query=query,
- search_mode=search_mode,
- date_range=date_range,
- platforms=platforms,
- limit=limit,
- sort_by=sort_by,
- threshold=threshold,
- include_url=include_url,
- include_rss=include_rss,
- rss_limit=rss_limit
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- # ==================== 配置与系统管理工具 ====================
- @mcp.tool
- async def get_current_config(
- section: str = "all"
- ) -> str:
- """
- 获取当前系统配置
- Args:
- section: 配置节,可选值:
- - "all": 所有配置(默认)
- - "crawler": 爬虫配置
- - "push": 推送配置
- - "keywords": 关键词配置
- - "weights": 权重配置
- Returns:
- JSON格式的配置信息
- """
- tools = _get_tools()
- result = await asyncio.to_thread(tools['config'].get_current_config, section=section)
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def get_system_status() -> str:
- """
- 获取系统运行状态和健康检查信息
- 返回系统版本、数据统计、缓存状态等信息
- Returns:
- JSON格式的系统状态信息
- """
- tools = _get_tools()
- result = await asyncio.to_thread(tools['system'].get_system_status)
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def check_version(
- proxy_url: Optional[str] = None
- ) -> str:
- """
- 检查版本更新(同时检查 TrendRadar 和 MCP Server)
- 比较本地版本与 GitHub 远程版本,判断是否需要更新。
- Args:
- proxy_url: 可选的代理URL,用于访问 GitHub(如 http://127.0.0.1:7890)
- Returns:
- JSON格式的版本检查结果,包含两个组件的版本对比和是否需要更新
- Examples:
- - check_version()
- - check_version(proxy_url="http://127.0.0.1:7890")
- """
- tools = _get_tools()
- result = await asyncio.to_thread(tools['system'].check_version, proxy_url=proxy_url)
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def trigger_crawl(
- platforms: Optional[List[str]] = None,
- save_to_local: bool = False,
- include_url: bool = False
- ) -> str:
- """
- 手动触发一次爬取任务(可选持久化)
- Args:
- platforms: 平台ID列表,如 ['zhihu', 'weibo'],不指定则使用所有平台
- save_to_local: 是否保存到本地 output 目录,默认 False
- include_url: 是否包含URL链接,默认False(节省token)
- Returns:
- JSON格式的任务状态信息,包含成功/失败平台列表和新闻数据
- Examples:
- - trigger_crawl(platforms=['zhihu'])
- - trigger_crawl(save_to_local=True)
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['system'].trigger_crawl,
- platforms=platforms, save_to_local=save_to_local, include_url=include_url
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- # ==================== 存储同步工具 ====================
- @mcp.tool
- async def sync_from_remote(
- days: int = 7
- ) -> str:
- """
- 从远程存储拉取数据到本地
- 用于 MCP Server 等场景:爬虫存到远程云存储(如 Cloudflare R2),
- MCP Server 拉取到本地进行分析查询。
- Args:
- days: 拉取最近 N 天的数据,默认 7 天
- - 0: 不拉取
- - 7: 拉取最近一周的数据
- - 30: 拉取最近一个月的数据
- Returns:
- JSON格式的同步结果,包含:
- - success: 是否成功
- - synced_files: 成功同步的文件数量
- - synced_dates: 成功同步的日期列表
- - skipped_dates: 跳过的日期(本地已存在)
- - failed_dates: 失败的日期及错误信息
- - message: 操作结果描述
- Examples:
- - sync_from_remote() # 拉取最近7天
- - sync_from_remote(days=30) # 拉取最近30天
- Note:
- 需要在 config/config.yaml 中配置远程存储(storage.remote)或设置环境变量:
- - S3_ENDPOINT_URL: 服务端点
- - S3_BUCKET_NAME: 存储桶名称
- - S3_ACCESS_KEY_ID: 访问密钥 ID
- - S3_SECRET_ACCESS_KEY: 访问密钥
- """
- tools = _get_tools()
- result = await asyncio.to_thread(tools['storage'].sync_from_remote, days=days)
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def get_storage_status() -> str:
- """
- 获取存储配置和状态
- 查看当前存储后端配置、本地和远程存储的状态信息。
- Returns:
- JSON格式的存储状态信息,包含本地/远程存储状态和拉取配置
- """
- tools = _get_tools()
- result = await asyncio.to_thread(tools['storage'].get_storage_status)
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def list_available_dates(
- source: str = "both"
- ) -> str:
- """
- 列出本地/远程可用的日期范围
- 查看本地和远程存储中有哪些日期的数据可用。
- Args:
- source: 数据来源
- - "local": 仅本地
- - "remote": 仅远程
- - "both": 同时列出并对比(默认)
- Returns:
- JSON格式的日期列表,包含各来源的日期信息和对比结果
- Examples:
- - list_available_dates()
- - list_available_dates(source="local")
- """
- tools = _get_tools()
- result = await asyncio.to_thread(tools['storage'].list_available_dates, source=source)
- return json.dumps(result, ensure_ascii=False, indent=2)
- # ==================== 文章内容读取工具 ====================
- @mcp.tool
- async def read_article(
- url: str,
- timeout: int = 30
- ) -> str:
- """
- 读取指定 URL 的文章内容,返回 LLM 友好的 Markdown 格式
- 通过 Jina AI Reader 将网页转换为干净的 Markdown,自动去除广告、导航栏等噪音内容。
- 适合用于:阅读新闻正文、获取文章详情、分析文章内容。
- **典型使用流程:**
- 1. 先用 search_news(include_url=True) 搜索新闻获取链接
- 2. 再用 read_article(url=链接) 读取正文内容
- 3. AI 对 Markdown 正文进行分析、摘要、翻译等
- Args:
- url: 文章链接(必需),以 http:// 或 https:// 开头
- timeout: 请求超时时间(秒),默认 30,最大 60
- Returns:
- JSON格式的文章内容,包含完整 Markdown 正文
- Examples:
- - read_article(url="https://example.com/news/123")
- Note:
- - 使用 Jina AI Reader 免费服务(100 RPM 限制)
- - 每次请求间隔 5 秒(内置速率控制)
- - 部分付费墙/登录墙页面可能无法完整获取
- """
- tools = _get_tools()
- timeout = min(max(timeout, 10), 60)
- result = await asyncio.to_thread(
- tools['article'].read_article,
- url=url, timeout=timeout
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def read_articles_batch(
- urls: List[str],
- timeout: int = 30
- ) -> str:
- """
- 批量读取多篇文章内容(最多 5 篇,间隔 5 秒)
- 逐篇请求文章内容,每篇之间自动间隔 5 秒以遵守速率限制。
- **典型使用流程:**
- 1. 先用 search_news(include_url=True) 搜索新闻获取多个链接
- 2. 再用 read_articles_batch(urls=[...]) 批量读取正文
- 3. AI 对多篇文章进行对比分析、综合报告
- Args:
- urls: 文章链接列表(必需),最多处理 5 篇
- timeout: 每篇的请求超时时间(秒),默认 30
- Returns:
- JSON格式的批量读取结果,包含每篇的完整内容和状态
- Examples:
- - read_articles_batch(urls=["https://a.com/1", "https://b.com/2"])
- Note:
- - 单次最多读取 5 篇,超出部分会被跳过
- - 5 篇约需 25-30 秒(每篇间隔 5 秒)
- - 单篇失败不影响其他篇的读取
- """
- tools = _get_tools()
- timeout = min(max(timeout, 10), 60)
- result = await asyncio.to_thread(
- tools['article'].read_articles_batch,
- urls=urls, timeout=timeout
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- # ==================== 通知推送工具 ====================
- @mcp.tool
- async def get_channel_format_guide(channel: Optional[str] = None) -> str:
- """
- 获取通知渠道的格式化策略指南
- 返回各渠道支持的 Markdown 特性、格式限制和最佳格式化提示词。
- 在调用 send_notification 之前使用此工具,可以了解目标渠道的格式要求,
- 从而生成最佳排版效果的消息内容。
- 各渠道格式差异概览:
- - 飞书:支持 **粗体**、<font color>彩色文本、[链接](url)、--- 分割线
- - 钉钉:支持 ### 标题、**粗体**、> 引用、--- 分割线,不支持颜色
- - 企业微信:仅支持 **粗体**、[链接](url)、> 引用,不支持标题和分割线
- - Telegram:自动转为 HTML,支持粗体/斜体/删除线/代码/链接/引用块
- - ntfy:支持标准 Markdown,不支持颜色
- - Bark:iOS 推送,仅支持粗体和链接,内容需精简
- - Slack:自动转为 mrkdwn,*粗体*、~删除线~、<url|链接>
- - 邮件:自动转为完整 HTML 网页,支持标题/样式/分割线
- - 通用 Webhook:标准 Markdown 或自定义模板
- Args:
- channel: 指定渠道 ID(可选),不指定返回所有渠道策略
- 可选值: feishu, dingtalk, wework, telegram, email, ntfy, bark, slack, generic_webhook
- Returns:
- JSON格式的渠道格式化策略,包含支持特性、限制和格式化提示词
- Examples:
- - get_channel_format_guide() # 获取所有渠道策略
- - get_channel_format_guide(channel="feishu") # 获取飞书策略
- - get_channel_format_guide(channel="telegram") # 获取 Telegram 策略
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['notification'].get_channel_format_guide,
- channel=channel
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def get_notification_channels() -> str:
- """
- 获取所有已配置的通知渠道及其状态
- 检测 config.yaml 和 .env 环境变量中的通知渠道配置。
- 支持 9 个渠道:飞书、钉钉、企业微信、Telegram、邮件、ntfy、Bark、Slack、通用 Webhook。
- Returns:
- JSON格式的渠道状态,包含每个渠道是否已配置及配置来源
- Examples:
- - get_notification_channels()
- """
- tools = _get_tools()
- result = await asyncio.to_thread(tools['notification'].get_notification_channels)
- return json.dumps(result, ensure_ascii=False, indent=2)
- @mcp.tool
- async def send_notification(
- message: str,
- title: str = "TrendRadar 通知",
- channels: Optional[List[str]] = None,
- ) -> str:
- """
- 向已配置的通知渠道发送消息
- 接受 markdown 格式内容,内部自动适配各渠道的格式要求和限制:
- - 飞书:Markdown 卡片消息(支持 **粗体**、<font color>彩色文本、[链接](url)、---)
- - 钉钉:Markdown(自动降级标题为 ###、剥离 <font> 标签和删除线)
- - 企业微信:Markdown(自动剥离 # 标题、---、<font> 标签、删除线)
- - Telegram:HTML(自动转换 **→<b>、*→<i>、~~→<s>、>→<blockquote>)
- - Email:HTML 邮件(完整网页样式,支持 # 标题、---、粗体斜体)
- - ntfy:Markdown(自动剥离 <font> 标签)
- - Bark:Markdown(自动简化为粗体+链接,适配 iOS 推送)
- - Slack:mrkdwn(自动转换 **→*、~~→~、[text](url)→<url|text>)
- - 通用 Webhook:Markdown(支持自定义模板)
- 提示:发送前可调用 get_channel_format_guide 获取目标渠道的详细格式化策略,
- 以生成最佳排版效果的消息内容。
- Args:
- message: markdown 格式的消息内容(必需)
- title: 消息标题,默认 "TrendRadar 通知"
- channels: 指定发送的渠道列表,不指定则发送到所有已配置渠道
- 可选值: feishu, dingtalk, wework, telegram, email, ntfy, bark, slack, generic_webhook
- Returns:
- JSON格式的发送结果,包含每个渠道的发送状态
- Examples:
- - send_notification(message="**测试消息**\\n这是一条测试通知")
- - send_notification(message="紧急通知", title="系统告警", channels=["feishu", "dingtalk"])
- """
- tools = _get_tools()
- result = await asyncio.to_thread(
- tools['notification'].send_notification,
- message=message, title=title, channels=channels
- )
- return json.dumps(result, ensure_ascii=False, indent=2)
- # ==================== 启动入口 ====================
- def run_server(
- project_root: Optional[str] = None,
- transport: str = 'stdio',
- host: str = '0.0.0.0',
- port: int = 3333
- ):
- """
- 启动 MCP 服务器
- Args:
- project_root: 项目根目录路径
- transport: 传输模式,'stdio' 或 'http'
- host: HTTP模式的监听地址,默认 0.0.0.0
- port: HTTP模式的监听端口,默认 3333
- """
- # 初始化工具实例
- _get_tools(project_root)
- # 打印启动信息
- print()
- print("=" * 60)
- print(" TrendRadar MCP Server - FastMCP 2.0")
- print("=" * 60)
- print(f" 传输模式: {transport.upper()}")
- if transport == 'stdio':
- print(" 协议: MCP over stdio (标准输入输出)")
- print(" 说明: 通过标准输入输出与 MCP 客户端通信")
- elif transport == 'http':
- print(f" 协议: MCP over HTTP (生产环境)")
- print(f" 服务器监听: {host}:{port}")
- if project_root:
- print(f" 项目目录: {project_root}")
- else:
- print(" 项目目录: 当前目录")
- print()
- print(" 已注册的工具:")
- print(" === 日期解析工具(推荐优先调用)===")
- print(" 0. resolve_date_range - 解析自然语言日期为标准格式")
- print()
- print(" === 基础数据查询(P0核心)===")
- print(" 1. get_latest_news - 获取最新新闻")
- print(" 2. get_news_by_date - 按日期查询新闻(支持自然语言)")
- print(" 3. get_trending_topics - 获取趋势话题(支持自动提取)")
- print()
- print(" === RSS 数据查询 ===")
- print(" 4. get_latest_rss - 获取最新 RSS 订阅数据")
- print(" 5. search_rss - 搜索 RSS 数据")
- print(" 6. get_rss_feeds_status - 获取 RSS 源状态")
- print()
- print(" === 智能检索工具 ===")
- print(" 7. search_news - 统一新闻搜索(关键词/模糊/实体)")
- print(" 8. find_related_news - 相关新闻查找(支持历史数据)")
- print()
- print(" === 高级数据分析 ===")
- print(" 9. analyze_topic_trend - 统一话题趋势分析(热度/生命周期/爆火/预测)")
- print(" 10. analyze_data_insights - 统一数据洞察分析(平台对比/活跃度/关键词共现)")
- print(" 11. analyze_sentiment - 情感倾向分析")
- print(" 12. aggregate_news - 跨平台新闻聚合去重")
- print(" 13. compare_periods - 时期对比分析(周环比/月环比)")
- print(" 14. generate_summary_report - 每日/每周摘要生成")
- print()
- print(" === 配置与系统管理 ===")
- print(" 15. get_current_config - 获取当前系统配置")
- print(" 16. get_system_status - 获取系统运行状态")
- print(" 17. check_version - 检查版本更新(对比本地与远程版本)")
- print(" 18. trigger_crawl - 手动触发爬取任务")
- print()
- print(" === 存储同步工具 ===")
- print(" 19. sync_from_remote - 从远程存储拉取数据到本地")
- print(" 20. get_storage_status - 获取存储配置和状态")
- print(" 21. list_available_dates - 列出本地/远程可用日期")
- print()
- print(" === 文章内容读取 ===")
- print(" 22. read_article - 读取单篇文章内容(Markdown格式)")
- print(" 23. read_articles_batch - 批量读取多篇文章(自动限速)")
- print()
- print(" === 通知推送工具 ===")
- print(" 24. get_channel_format_guide - 获取渠道格式化策略指南(提示词)")
- print(" 25. get_notification_channels - 获取已配置的通知渠道状态")
- print(" 26. send_notification - 向通知渠道发送消息(自动适配格式)")
- print("=" * 60)
- print()
- # 根据传输模式运行服务器
- if transport == 'stdio':
- mcp.run(transport='stdio')
- elif transport == 'http':
- # HTTP 模式(生产推荐)
- mcp.run(
- transport='http',
- host=host,
- port=port,
- path='/mcp' # HTTP 端点路径
- )
- else:
- raise ValueError(f"不支持的传输模式: {transport}")
- if __name__ == '__main__':
- import argparse
- parser = argparse.ArgumentParser(
- description='TrendRadar MCP Server - 新闻热点聚合 MCP 工具服务器',
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
- 详细配置教程请查看: README-Cherry-Studio.md
- """
- )
- parser.add_argument(
- '--transport',
- choices=['stdio', 'http'],
- default='stdio',
- help='传输模式:stdio (默认) 或 http (生产环境)'
- )
- parser.add_argument(
- '--host',
- default='0.0.0.0',
- help='HTTP模式的监听地址,默认 0.0.0.0'
- )
- parser.add_argument(
- '--port',
- type=int,
- default=3333,
- help='HTTP模式的监听端口,默认 3333'
- )
- parser.add_argument(
- '--project-root',
- help='项目根目录路径'
- )
- args = parser.parse_args()
- run_server(
- project_root=args.project_root,
- transport=args.transport,
- host=args.host,
- port=args.port
- )
|