在信息爆炸的时代,如何从海量资讯中精准捕捉关键热点,成为企业决策和个人信息获取的核心挑战。TrendRadar 作为一个基于 MCP(Model Context Protocol)协议的多平台热点聚合与 AI 分析引擎,通过创新的架构设计,实现了 35 个平台的实时舆情监控、智能筛选与多通道推送。本文将深入解析其技术架构、实现原理及可落地的工程参数。
MCP 协议:AI 分析引擎的标准化接口
MCP 协议由 Anthropic 于 2024 年 11 月推出,旨在解决大模型与外部数据源和工具之间的通信标准化问题。正如 CSDN 技术文章所述,MCP 可以比作 "AI 扩展坞",为不同的大模型提供统一的工具调用接口。
TrendRadar 充分利用 MCP 协议的标准化优势,构建了 C/S(客户端 / 服务器)架构的 AI 分析引擎:
1. MCP 服务器架构设计
TrendRadar 的 MCP 服务器提供 13 种核心分析工具,涵盖:
- 基础查询工具:
get_latest_news、get_news_by_date、get_trending_topics - 智能检索工具:
search_news、search_related_news_history - 高级分析工具:
analyze_topic_trend、analyze_data_insights、analyze_sentiment - 系统管理工具:
get_current_config、get_system_status、trigger_crawl
每个工具都遵循 MCP 协议的标准化接口规范,支持 JSON-RPC 2.0 通信协议,确保与不同 AI 客户端的兼容性。
2. 双容器部署架构
TrendRadar 采用微服务架构,将新闻推送服务与 MCP 分析服务分离:
# docker-compose.yml 配置示例
version: '3.8'
services:
trend-radar:
image: wantcat/trendradar:latest
ports:
- "8080:8080"
volumes:
- ./config:/app/config:ro
- ./output:/app/output
environment:
- TZ=Asia/Shanghai
- ENABLE_CRAWLER=true
- REPORT_MODE=daily
trend-radar-mcp:
image: wantcat/trendradar-mcp:latest
ports:
- "127.0.0.1:3333:3333"
volumes:
- ./config:/app/config:ro
- ./output:/app/output:ro
这种架构设计允许:
- 独立扩展:新闻推送和 AI 分析服务可分别扩容
- 故障隔离:一个服务故障不影响另一个服务
- 资源优化:根据负载需求分配计算资源
多平台聚合的技术挑战与解决方案
监控 35 个平台(包括抖音、知乎、B 站、华尔街见闻、财联社等)面临诸多技术挑战,TrendRadar 通过以下方案应对:
1. 异构 API 的统一适配
不同平台的 API 接口、数据格式、认证方式各不相同。TrendRadar 采用适配器模式构建统一的数据采集层:
# 伪代码示例:平台适配器抽象
class PlatformAdapter:
def fetch_hot_topics(self) -> List[NewsItem]:
pass
class ZhihuAdapter(PlatformAdapter):
def __init__(self):
self.base_url = "https://www.zhihu.com/api/v4"
self.headers = {"User-Agent": "TrendRadar/1.0"}
def fetch_hot_topics(self) -> List[NewsItem]:
# 知乎特定的API调用逻辑
response = requests.get(
f"{self.base_url}/topstory/hot-list",
headers=self.headers,
timeout=10
)
return self._parse_response(response.json())
class WeiboAdapter(PlatformAdapter):
# 微博特定的适配逻辑
pass
2. 速率限制与容错机制
为防止被平台封禁,TrendRadar 实现了智能的速率控制:
# config/config.yaml 配置示例
crawler:
enable_crawler: true
request_timeout: 30
max_retries: 3
retry_delay: 5
rate_limit:
zhihu: "1 request/2s"
weibo: "1 request/3s"
bilibili: "1 request/1s"
关键参数说明:
- 请求超时:30 秒,避免长时间阻塞
- 最大重试:3 次,提高采集成功率
- 重试延迟:5 秒,避免频繁重试
- 平台特定限速:根据平台规则定制化配置
3. 数据去重与标准化
多平台聚合常遇到同一新闻在不同平台重复出现的问题。TrendRadar 采用以下策略:
def deduplicate_news(news_list: List[NewsItem]) -> List[NewsItem]:
"""基于内容相似度的新闻去重"""
seen_titles = set()
deduplicated = []
for news in news_list:
# 1. 标题标准化处理
normalized_title = normalize_title(news.title)
# 2. 基于SimHash的相似度检测
title_hash = simhash(normalized_title)
# 3. 去重逻辑
if title_hash not in seen_titles:
seen_titles.add(title_hash)
deduplicated.append(news)
return deduplicated
智能筛选与推送策略
TrendRadar 提供三种推送模式,满足不同场景需求:
1. 推送模式配置参数
report:
mode: "daily" # 可选: "daily" | "incremental" | "current"
rank_threshold: 5
max_news_per_keyword: 10
reverse_content_order: false
notification:
push_window:
enabled: true
time_range:
start: "09:00"
end: "18:00"
once_per_day: false
模式对比分析:
- 当日汇总模式(daily):适合需要全天完整报告的用户,如企业管理者
- 增量监控模式(incremental):适合关注新动态的用户,如投资者、交易员
- 当前榜单模式(current):适合实时追踪热度的用户,如自媒体人
2. 关键词筛选高级语法
TrendRadar 支持 5 种关键词语法,实现精准内容筛选:
# frequency_words.txt 配置示例
[GLOBAL_FILTER]
广告
推广
营销
震惊
标题党
[WORD_GROUPS]
# 第1组:科技AI类
科技
AI
ChatGPT
+技术
!培训
@15 # 最多显示15条
# 第2组:新能源汽车类
特斯拉
比亚迪
蔚来
+销量
!股价预测
# 第3组:股市投资类
A股
上证
深证
+涨跌
!专家预测
@10 # 最多显示10条
语法说明:
- 普通词:基础匹配,包含任意一个即可
- 必须词(+):限定范围,必须同时包含
- 过滤词(!):排除干扰,包含则直接排除
- 数量限制(@):控制显示数量
- 全局过滤:任何情况下都过滤指定内容
存储架构的工程化设计
v4.0.0 版本对存储架构进行了全面重构,支持多存储后端:
1. 存储配置参数
storage:
backend: "auto" # auto | local | remote
formats:
sqlite: true # SQLite数据库存储
txt: true # 文本快照
html: true # HTML报告
local:
data_dir: "output"
retention_days: 30 # 本地保留30天数据
remote:
endpoint_url: "https://<account-id>.r2.cloudflarestorage.com"
bucket_name: "trendradar-data"
access_key_id: "${S3_ACCESS_KEY_ID}"
secret_access_key: "${S3_SECRET_ACCESS_KEY}"
region: "auto"
retention_days: 30 # 远程保留30天数据
2. 后端选择策略
- GitHub Actions 环境:自动选择远程存储,避免污染 Git 仓库
- Docker / 本地环境:自动选择本地 SQLite,数据完全可控
- 手动指定:根据需求强制使用特定后端
3. 数据同步机制
class StorageManager:
def sync_data(self, local_data: Dict, remote_data: Dict) -> Dict:
"""本地与远程数据同步"""
# 1. 冲突检测与解决
conflicts = self.detect_conflicts(local_data, remote_data)
# 2. 基于时间戳的合并策略
merged_data = self.merge_by_timestamp(
local_data,
remote_data,
conflict_strategy="newer_wins"
)
# 3. 数据完整性验证
self.validate_integrity(merged_data)
return merged_data
多通道推送的容错设计
支持 8 种推送渠道,每种渠道都有特定的容错机制:
1. 推送配置参数
notification:
max_accounts_per_channel: 3
webhooks:
feishu_url: "https://hook1.feishu.cn/xxx;https://hook2.feishu.cn/yyy"
wework_url: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx"
dingtalk_url: "https://oapi.dingtalk.com/robot/send?access_token=xxx"
telegram_bot_token: "123456:AAA-BBB;789012:CCC-DDD"
telegram_chat_id: "-100111;-100222"
2. 分批推送与重试机制
class PushService:
def send_message(self, content: str, channel_config: Dict) -> bool:
"""智能分批推送"""
# 1. 内容分批处理
batches = self.split_content(content, channel_config["max_size"])
success_count = 0
for i, batch in enumerate(batches):
# 2. 带重试的发送逻辑
for attempt in range(3):
try:
result = self._send_single_batch(batch, channel_config)
if result:
success_count += 1
break
except Exception as e:
if attempt == 2: # 最后一次尝试
self.log_error(f"推送失败: {e}")
continue
time.sleep(2 ** attempt) # 指数退避
# 3. 部分成功判定
return success_count > 0
3. 推送优先级策略
- 企业微信 / 个人微信:最高优先级,支持 Markdown 格式
- 飞书 / 钉钉:中等优先级,支持分批推送
- Telegram / 邮件:标准优先级,支持大容量内容
- ntfy/Bark/Slack:补充渠道,适合特定场景
监控与运维要点
1. 健康检查端点
# 健康检查API设计
@app.route("/health")
def health_check():
return {
"status": "healthy",
"version": "4.0.3",
"services": {
"crawler": check_crawler_status(),
"storage": check_storage_status(),
"mcp_server": check_mcp_server_status(),
"push_service": check_push_service_status()
},
"timestamp": datetime.now().isoformat()
}
2. 关键监控指标
# Prometheus监控指标示例
metrics:
crawler:
requests_total: "trendradar_crawler_requests_total"
success_rate: "trendradar_crawler_success_rate"
response_time: "trendradar_crawler_response_time_seconds"
storage:
used_space: "trendradar_storage_used_bytes"
sync_latency: "trendradar_storage_sync_latency_seconds"
push:
success_count: "trendradar_push_success_total"
failure_count: "trendradar_push_failure_total"
latency: "trendradar_push_latency_seconds"
3. 告警阈值配置
alerts:
crawler:
failure_rate: ">0.1" # 失败率超过10%
response_time: ">30" # 响应时间超过30秒
storage:
used_space: ">0.8" # 存储使用超过80%
sync_failure: ">3" # 连续同步失败3次
push:
failure_rate: ">0.2" # 推送失败率超过20%
部署最佳实践
1. GitHub Actions 部署参数
# .github/workflows/crawler.yml
name: Get Hot News
on:
schedule:
- cron: "0 */2 * * *" # 每2小时运行一次
workflow_dispatch: # 支持手动触发
jobs:
crawl:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run TrendRadar
env:
S3_BUCKET_NAME: ${{ secrets.S3_BUCKET_NAME }}
S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }}
S3_SECRET_ACCESS_KEY: ${{ secrets.S3_SECRET_ACCESS_KEY }}
FEISHU_WEBHOOK_URL: ${{ secrets.FEISHU_WEBHOOK_URL }}
run: |
python main.py
2. Docker 生产环境配置
# 生产环境启动脚本
#!/bin/bash
# 设置资源限制
docker run -d \
--name trend-radar \
--restart unless-stopped \
--memory="512m" \
--cpus="1.0" \
--log-opt max-size=10m \
--log-opt max-file=3 \
-p 8080:8080 \
-v /data/trendradar/config:/app/config:ro \
-v /data/trendradar/output:/app/output \
-e TZ=Asia/Shanghai \
-e REPORT_MODE=incremental \
-e PUSH_WINDOW_ENABLED=true \
-e PUSH_WINDOW_START=09:00 \
-e PUSH_WINDOW_END=18:00 \
wantcat/trendradar:latest
总结与展望
TrendRadar 通过 MCP 协议的标准化接口、多平台聚合的智能适配、分层存储架构的工程化设计,构建了一个高效、可靠的热点监控与分析系统。其核心价值在于:
- 标准化与开放性:基于 MCP 协议,可与任何支持该协议的 AI 客户端无缝集成
- 可扩展性:模块化设计支持快速添加新平台、新分析工具
- 可靠性:多层次容错机制确保系统稳定运行
- 易用性:丰富的配置选项满足不同用户需求
随着 MCP 生态的不断完善,TrendRadar 这类基于标准化协议的系统将在 AI 应用开发中发挥越来越重要的作用。未来可期待更多专业领域的 MCP 服务器出现,形成丰富的 AI 工具生态系统。
资料来源:
- TrendRadar GitHub 仓库:https://github.com/sansan0/TrendRadar
- MCP 协议技术解析:https://blog.csdn.net/garyond/article/details/149020954