System Architecture

架构文档

给想读源码或贡献代码的人写的。假设你有 Python 经验,知道 async/await 是什么。

系统架构总览

整个系统分三层,数据从左到右流动。每一层都可以独立运行和测试。

Layer 1

Collector

数据采集层。从各种数据源抓取原始信息,做初步清洗和去重。

Layer 2

Analyzer

分析引擎。对清洗后的数据做情感分析、趋势识别和交叉验证。

Layer 3

Advisor

建议生成器。把分析结果转化为可执行的投资建议。

层与层之间通过消息队列(Redis Streams)通信。Collector 把抓到的数据写入 stream:raw_articles,Analyzer 消费这个 stream 并把结果写入 stream:insights,Advisor 消费 insights 生成最终建议。

这个设计的好处是你可以单独跑 Collector 攒数据,等攒够了再跑 Analyzer。也可以同时跑多个 Collector 实例抓不同的数据源。

核心模块

Collector 数据采集

每个数据源对应一个 Collector 子类。基类 BaseCollector 定义了 fetch() 接口,子类实现具体的抓取逻辑。内置了限速器(令牌桶算法),防止被数据源封 IP。抓到的数据统一转成 Article 对象,包含标题、正文、来源、时间戳和原始 URL。去重用的是 URL + 标题的 MD5 哈希。

Analyzer 分析引擎

分析管道由多个 Stage 组成,按顺序执行:Preprocessor(分词、去停用词)→ SentimentScorer(情感打分,-1 到 1)→ TrendDetector(趋势识别,基于时间序列的异常检测)→ CrossValidator(交叉验证,多源信息互相印证)。每个 Stage 都是独立的类,可以单独替换。v0.3.0 新增的情感分析引擎用的是 fine-tuned 的 BERT 模型,在 A 股新闻语料上做了微调。

Advisor 建议生成

拿到 Analyzer 的输出后,Advisor 做两件事:一是根据用户配置的风险偏好过滤和排序,二是生成人类可读的建议文本。排序算法综合考虑信心度、时效性和信息源的权威性。文本生成用的是模板 + LLM 混合方案——结构化部分用模板保证格式一致,解释性文字用 LLM 生成。

数据流向

一条数据从进入系统到变成投资建议,经过这些步骤:

# 完整的数据流向

1. 数据源 (新闻网站 / 社交平台 / 公告系统)
   │
   ▼  HTTP 请求,带限速和重试
2. Collector.fetch()
   │  原始 HTML/JSON → 解析 → Article 对象
   │  去重(URL + 标题 MD5)
   │  写入 PostgreSQL articles 表
   ▼  发布到 Redis Stream: raw_articles
3. Analyzer Pipeline
   │  Stage 1: Preprocessor — 分词、去停用词、提取实体
   │  Stage 2: SentimentScorer — 情感打分 [-1, 1]
   │  Stage 3: TrendDetector — 时间序列异常检测
   │  Stage 4: CrossValidator — 多源交叉验证
   │  写入 PostgreSQL insights 表
   ▼  发布到 Redis Stream: insights
4. Advisor.generate()
   │  过滤(按 min_confidence 阈值)
   │  排序(信心度 × 时效性 × 源权威性)
   │  文本生成(模板 + LLM)
   ▼  写入 PostgreSQL recommendations 表
5. API / CLI 输出
   最终用户通过 REST API 或命令行获取建议

插件系统

Collector 和 Analyzer 都支持插件扩展。写法很直接——继承基类,实现接口方法。

自定义 Collector

继承 BaseCollector,实现 async fetch(since: datetime) -> list[Article] 方法。放到 muyuan/collectors/ 目录下,或者通过 config.yaml 的 custom_sources 字段指定外部模块路径。

from muyuan.collectors.base import BaseCollector, Article

class WeChatCollector(BaseCollector):
    """抓取微信公众号文章(需要配合 wechaty)"""

    name = "wechat"
    requires_api_key = True  # 需要 wechaty token

    async def fetch(self, since):
        # 你的抓取逻辑
        articles = await self._get_articles_from_wechaty(since)
        return [
            Article(
                title=a.title,
                content=a.content,
                source=self.name,
                published_at=a.pub_time,
                url=a.url,
                metadata={"account": a.account_name}
            )
            for a in articles
        ]

自定义 Analyzer Stage

继承 BaseStage,实现 process(articles: list[Article]) -> list[Insight]。然后在 config.yaml 的 analyzer.pipeline 里加上你的 Stage。

from muyuan.analyzer.base import BaseStage, Insight

class IndustryCorrelation(BaseStage):
    """分析行业间的关联性"""

    name = "industry_correlation"
    order = 250  # 在 SentimentScorer(200) 之后,TrendDetector(300) 之前

    def process(self, articles, context):
        # context 包含前序 Stage 的输出
        sentiments = context.get("sentiment_scores", {})

        correlations = self._compute_correlations(articles, sentiments)

        return [
            Insight(
                type="correlation",
                sectors=pair,
                strength=score,
                direction=direction,
            )
            for pair, (score, direction) in correlations.items()
            if score > 0.7
        ]

关于 Stage 的 order 字段:数字越小越先执行。内置 Stage 的 order 间隔是 100(Preprocessor=100, SentimentScorer=200, TrendDetector=300, CrossValidator=400),你的自定义 Stage 可以插在任意位置。

技术栈

Python 3.10+ FastAPI PostgreSQL 14+ Redis 7+ SQLAlchemy 2.0 Pydantic v2 httpx transformers jieba pytest Docker Alembic

选 FastAPI 是因为 async 原生支持好,Collector 的 IO 密集型任务受益明显。PostgreSQL 存持久化数据,Redis 做缓存和消息队列(用 Streams,不是 Pub/Sub)。ORM 用 SQLAlchemy 2.0 的新式写法(不是旧的 declarative_base)。

情感分析模型基于 HuggingFace transformers,在 A 股新闻语料上做了 fine-tune。分词用 jieba,因为在金融领域的分词效果比 pkuseg 好(我们测过,jieba 加自定义词典后 F1 高 4 个百分点)。

目录结构

muyuan-ai/
├── muyuan/
│   ├── __init__.py
│   ├── app.py              # FastAPI 应用入口
│   ├── config.py            # 配置加载
│   ├── models/              # SQLAlchemy 模型
│   │   ├── article.py
│   │   ├── insight.py
│   │   └── recommendation.py
│   ├── collectors/          # 数据采集器
│   │   ├── base.py
│   │   ├── sina_news.py
│   │   ├── eastmoney.py
│   │   └── ...
│   ├── analyzer/            # 分析引擎
│   │   ├── base.py
│   │   ├── pipeline.py
│   │   ├── preprocessor.py
│   │   ├── sentiment.py
│   │   ├── trend.py
│   │   └── validator.py
│   ├── advisor/             # 建议生成
│   │   ├── generator.py
│   │   └── templates/
│   ├── api/                 # REST API 路由
│   │   ├── collect.py
│   │   ├── analyze.py
│   │   ├── advise.py
│   │   └── status.py
│   └── utils/
├── tests/
├── alembic/                 # 数据库迁移
├── config.yaml
├── docker-compose.yml
└── pyproject.toml