Hotdry.
systems-engineering

Go 语言生产级事件溯源实现:持久化事件存储、投影与快照

基于 Go 实现可扩展 CQRS 的事件溯源系统,包括追加式事件存储、聚合重建、高效投影和快照策略,提供生产参数与监控要点。

事件溯源(Event Sourcing)结合 CQRS(Command Query Responsibility Segregation)是构建高可扩展系统架构的核心模式,尤其适用于复杂领域如金融、电商订单系统。在 Go 语言中实现生产级事件溯源,需要关注持久化追加式事件存储、聚合根重建、高效投影构建以及快照优化。本文聚焦单一技术栈:使用 PostgreSQL 作为事件存储,提供完整代码框架、可落地参数和监控策略,确保系统在高并发下稳定运行。

事件存储设计:追加式持久化

事件存储是事件溯源的核心,必须支持原子追加、高吞吐和版本控制。推荐使用 PostgreSQL 的 JSONB 类型存储事件数据,支持索引和事务。

数据库 Schema

CREATE TABLE events (
    id BIGSERIAL PRIMARY KEY,
    aggregate_id UUID NOT NULL,
    type VARCHAR(255) NOT NULL,
    data JSONB NOT NULL,
    version BIGINT NOT NULL,
    timestamp TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_events_aggregate_id_version ON events (aggregate_id, version);
CREATE INDEX idx_events_aggregate_id_type ON events (aggregate_id, type);
  • aggregate_id:聚合根 ID,按流分组事件。
  • version:乐观并发版本号,追加时检查 MAX(version) + 1
  • data:事件负载,如 {"amount": 100}

追加事件 API

使用 sqlc 或 pgx 实现追加逻辑:

type Event struct {
    ID         int64
    AggregateID uuid.UUID
    Type       string
    Data       json.RawMessage
    Version    int64
    Timestamp  time.Time
}

func (es *EventStore) Append(ctx context.Context, aggregateID uuid.UUID, expectedVersion int64, events ...DomainEvent) error {
    tx, err := es.db.BeginTx(ctx, nil)
    if err != nil { return err }
    defer tx.Rollback(ctx)

    var maxVer int64
    err = tx.QueryRow(ctx, "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1", aggregateID).Scan(&maxVer)
    if err != nil { return err }
    if maxVer != expectedVersion { return ErrConcurrency }

    for i, ev := range events {
        _, err := tx.Exec(ctx, `
            INSERT INTO events (aggregate_id, type, data, version) 
            VALUES ($1, $2, $3, $4)`, aggregateID, ev.Type(), ev.Marshal(), maxVer+int64(i+1))
        if err != nil { return err }
    }
    return tx.Commit(ctx)
}

生产参数:

  • 批追加阈值:5-10 个事件 / 批,降低锁竞争。
  • 超时:500ms,结合 context。
  • 监控:追加 QPS > 1000/s,错误率 < 0.1%。

聚合根重建:事件重放

聚合根不存储状态,仅通过事件重放计算。示例:银行账户聚合。

type Account struct {
    ID      uuid.UUID
    Balance int64
    Version int64
    changes []DomainEvent // 变更事件
}

func (a *Account) Apply(e DomainEvent) {
    switch ev := e.(type) {
    case Deposited: a.Balance += ev.Amount
    case Withdrawn: a.Balance -= ev.Amount
    }
}

func (es *EventStore) LoadAggregate(ctx context.Context, id uuid.UUID, snapshotVersion int64) (*Account, error) {
    agg := &Account{ID: id}

    // 加载快照(若存在)
    snapshot, err := es.LoadSnapshot(ctx, id, snapshotVersion)
    if err == nil {
        agg.Unmarshal(snapshot.Data)
        snapshotVersion = snapshot.Version
    }

    // 加载后续事件并重放
    rows, err := es.db.Query(ctx, `
        SELECT type, data FROM events 
        WHERE aggregate_id = $1 AND version > $2 
        ORDER BY version`, id, snapshotVersion)
    // ... 扫描并 agg.Apply(ev)

    return agg, nil
}
  • 重放上限:1000 事件 / 次,超阈使用快照。
  • 风险:事件模式演化,使用 Upcaster 接口转换旧事件。

快照策略:性能优化

快照定期持久化聚合状态,减少重放开销。

CREATE TABLE snapshots (
    aggregate_id UUID PRIMARY KEY,
    version BIGINT NOT NULL,
    data JSONB NOT NULL,
    timestamp TIMESTAMPTZ DEFAULT NOW()
);

快照规则:每 100 事件或 1 小时生成一次。

func (es *EventStore) SaveSnapshot(ctx context.Context, agg Aggregate) error {
    return es.db.Exec(ctx, `
        INSERT INTO snapshots (aggregate_id, version, data) 
        VALUES ($1, $2, $3) 
        ON CONFLICT (aggregate_id) DO UPDATE SET version = $2, data = $3`,
        agg.ID(), agg.Version(), agg.Marshal())
}

生产参数:

  • 快照阈值:50-200 事件,根据聚合复杂度。
  • 存储压缩:GZIP JSONB,TTL 旧快照 7 天。
  • 命中率监控:>90% 查询使用快照。

高效投影:读模型构建

投影将事件转换为查询优化视图,支持 CQRS 读侧。使用 Kafka 或 Postgres 逻辑复制消费事件。

投影器实现

type Projection struct {
    db *sql.DB
    checkpoint int64 // 最后处理版本
}

func (p *Projection) Handle(e DomainEvent) error {
    switch ev := e.(type) {
    case Deposited:
        _, err := p.db.Exec("UPDATE account_balances SET balance = balance + $1 WHERE id = $2", ev.Amount, ev.AccountID)
    }
    // 更新 checkpoint
    return p.db.Exec("UPDATE projections_checkpoint SET position = $1", globalEventID)
}

模式:

  • 实时投影:Inline,命令事务内更新(低延迟)。
  • 异步投影:Daemon 进程 catch-up,HotCold 模式(高吞吐)。 参数:
  • 并行度:per-aggregate 流,10-50 workers。
  • 延迟阈值:<100ms (P99),滞后监控。
  • 幂等:事件 ID 唯一,UPSERT。

可扩展 CQRS 生产实践

  • 命令侧:gRPC/HTTP,验证 → LoadAggregate → ApplyCommand → AppendEvents → Publish (可选 Kafka)。
  • 查询侧:专用读 DB (ClickHouse/Vitess),投影订阅事件。
  • 监控要点:
    指标 阈值 告警
    事件追加延迟 <10ms >50ms
    投影滞后 <1s >10s
    重放事件数 <500 / 次 >1000
    快照命中率 >80% <70%
  • 回滚:事件补偿(反向事件),非删除。
  • 规模:单实例 10k QPS,多副本 + 分区 (by aggregate_id)。

此实现参考 skoredin.pro 事件溯源系列和 thefabric-io/eventsourcing 库,结合 Postgres 生产经验。实际部署时,集成 Watermill 或 Go-Kit 增强可靠性。

(正文字数:1256)

查看归档