System Architecture
架构文档
给想读源码或贡献代码的人写的。假设你有 Python 经验,知道 async/await 是什么。
系统架构总览
整个系统分三层,数据从左到右流动。每一层都可以独立运行和测试。
Layer 1
Collector
数据采集层。从各种数据源抓取原始信息,做初步清洗和去重。
Layer 2
Analyzer
分析引擎。对清洗后的数据做情感分析、趋势识别和交叉验证。
Layer 3
Advisor
建议生成器。把分析结果转化为可执行的投资建议。
层与层之间通过消息队列(Redis Streams)通信。Collector 把抓到的数据写入 stream:raw_articles,Analyzer 消费这个 stream 并把结果写入 stream:insights,Advisor 消费 insights 生成最终建议。
这个设计的好处是你可以单独跑 Collector 攒数据,等攒够了再跑 Analyzer。也可以同时跑多个 Collector 实例抓不同的数据源。
核心模块
Collector 数据采集
每个数据源对应一个 Collector 子类。基类 BaseCollector 定义了 fetch() 接口,子类实现具体的抓取逻辑。内置了限速器(令牌桶算法),防止被数据源封 IP。抓到的数据统一转成 Article 对象,包含标题、正文、来源、时间戳和原始 URL。去重用的是 URL + 标题的 MD5 哈希。
Analyzer 分析引擎
分析管道由多个 Stage 组成,按顺序执行:Preprocessor(分词、去停用词)→ SentimentScorer(情感打分,-1 到 1)→ TrendDetector(趋势识别,基于时间序列的异常检测)→ CrossValidator(交叉验证,多源信息互相印证)。每个 Stage 都是独立的类,可以单独替换。v0.3.0 新增的情感分析引擎用的是 fine-tuned 的 BERT 模型,在 A 股新闻语料上做了微调。
Advisor 建议生成
拿到 Analyzer 的输出后,Advisor 做两件事:一是根据用户配置的风险偏好过滤和排序,二是生成人类可读的建议文本。排序算法综合考虑信心度、时效性和信息源的权威性。文本生成用的是模板 + LLM 混合方案——结构化部分用模板保证格式一致,解释性文字用 LLM 生成。
数据流向
一条数据从进入系统到变成投资建议,经过这些步骤:
# 完整的数据流向 1. 数据源 (新闻网站 / 社交平台 / 公告系统) │ ▼ HTTP 请求,带限速和重试 2. Collector.fetch() │ 原始 HTML/JSON → 解析 → Article 对象 │ 去重(URL + 标题 MD5) │ 写入 PostgreSQL articles 表 ▼ 发布到 Redis Stream: raw_articles 3. Analyzer Pipeline │ Stage 1: Preprocessor — 分词、去停用词、提取实体 │ Stage 2: SentimentScorer — 情感打分 [-1, 1] │ Stage 3: TrendDetector — 时间序列异常检测 │ Stage 4: CrossValidator — 多源交叉验证 │ 写入 PostgreSQL insights 表 ▼ 发布到 Redis Stream: insights 4. Advisor.generate() │ 过滤(按 min_confidence 阈值) │ 排序(信心度 × 时效性 × 源权威性) │ 文本生成(模板 + LLM) ▼ 写入 PostgreSQL recommendations 表 5. API / CLI 输出 最终用户通过 REST API 或命令行获取建议
插件系统
Collector 和 Analyzer 都支持插件扩展。写法很直接——继承基类,实现接口方法。
自定义 Collector
继承 BaseCollector,实现 async fetch(since: datetime) -> list[Article] 方法。放到 muyuan/collectors/ 目录下,或者通过 config.yaml 的 custom_sources 字段指定外部模块路径。
from muyuan.collectors.base import BaseCollector, Article class WeChatCollector(BaseCollector): """抓取微信公众号文章(需要配合 wechaty)""" name = "wechat" requires_api_key = True # 需要 wechaty token async def fetch(self, since): # 你的抓取逻辑 articles = await self._get_articles_from_wechaty(since) return [ Article( title=a.title, content=a.content, source=self.name, published_at=a.pub_time, url=a.url, metadata={"account": a.account_name} ) for a in articles ]
自定义 Analyzer Stage
继承 BaseStage,实现 process(articles: list[Article]) -> list[Insight]。然后在 config.yaml 的 analyzer.pipeline 里加上你的 Stage。
from muyuan.analyzer.base import BaseStage, Insight class IndustryCorrelation(BaseStage): """分析行业间的关联性""" name = "industry_correlation" order = 250 # 在 SentimentScorer(200) 之后,TrendDetector(300) 之前 def process(self, articles, context): # context 包含前序 Stage 的输出 sentiments = context.get("sentiment_scores", {}) correlations = self._compute_correlations(articles, sentiments) return [ Insight( type="correlation", sectors=pair, strength=score, direction=direction, ) for pair, (score, direction) in correlations.items() if score > 0.7 ]
关于 Stage 的 order 字段:数字越小越先执行。内置 Stage 的 order 间隔是 100(Preprocessor=100, SentimentScorer=200, TrendDetector=300, CrossValidator=400),你的自定义 Stage 可以插在任意位置。
技术栈
选 FastAPI 是因为 async 原生支持好,Collector 的 IO 密集型任务受益明显。PostgreSQL 存持久化数据,Redis 做缓存和消息队列(用 Streams,不是 Pub/Sub)。ORM 用 SQLAlchemy 2.0 的新式写法(不是旧的 declarative_base)。
情感分析模型基于 HuggingFace transformers,在 A 股新闻语料上做了 fine-tune。分词用 jieba,因为在金融领域的分词效果比 pkuseg 好(我们测过,jieba 加自定义词典后 F1 高 4 个百分点)。
目录结构
muyuan-ai/ ├── muyuan/ │ ├── __init__.py │ ├── app.py # FastAPI 应用入口 │ ├── config.py # 配置加载 │ ├── models/ # SQLAlchemy 模型 │ │ ├── article.py │ │ ├── insight.py │ │ └── recommendation.py │ ├── collectors/ # 数据采集器 │ │ ├── base.py │ │ ├── sina_news.py │ │ ├── eastmoney.py │ │ └── ... │ ├── analyzer/ # 分析引擎 │ │ ├── base.py │ │ ├── pipeline.py │ │ ├── preprocessor.py │ │ ├── sentiment.py │ │ ├── trend.py │ │ └── validator.py │ ├── advisor/ # 建议生成 │ │ ├── generator.py │ │ └── templates/ │ ├── api/ # REST API 路由 │ │ ├── collect.py │ │ ├── analyze.py │ │ ├── advise.py │ │ └── status.py │ └── utils/ ├── tests/ ├── alembic/ # 数据库迁移 ├── config.yaml ├── docker-compose.yml └── pyproject.toml
