事件溯源(Event Sourcing)结合 CQRS(Command Query Responsibility Segregation)是构建高可扩展系统架构的核心模式,尤其适用于复杂领域如金融、电商订单系统。在 Go 语言中实现生产级事件溯源,需要关注持久化追加式事件存储、聚合根重建、高效投影构建以及快照优化。本文聚焦单一技术栈:使用 PostgreSQL 作为事件存储,提供完整代码框架、可落地参数和监控策略,确保系统在高并发下稳定运行。
事件存储设计:追加式持久化
事件存储是事件溯源的核心,必须支持原子追加、高吞吐和版本控制。推荐使用 PostgreSQL 的 JSONB 类型存储事件数据,支持索引和事务。
数据库 Schema
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
type VARCHAR(255) NOT NULL,
data JSONB NOT NULL,
version BIGINT NOT NULL,
timestamp TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_events_aggregate_id_version ON events (aggregate_id, version);
CREATE INDEX idx_events_aggregate_id_type ON events (aggregate_id, type);
aggregate_id:聚合根 ID,按流分组事件。version:乐观并发版本号,追加时检查MAX(version) + 1。data:事件负载,如{"amount": 100}。
追加事件 API
使用 sqlc 或 pgx 实现追加逻辑:
type Event struct {
ID int64
AggregateID uuid.UUID
Type string
Data json.RawMessage
Version int64
Timestamp time.Time
}
func (es *EventStore) Append(ctx context.Context, aggregateID uuid.UUID, expectedVersion int64, events ...DomainEvent) error {
tx, err := es.db.BeginTx(ctx, nil)
if err != nil { return err }
defer tx.Rollback(ctx)
var maxVer int64
err = tx.QueryRow(ctx, "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1", aggregateID).Scan(&maxVer)
if err != nil { return err }
if maxVer != expectedVersion { return ErrConcurrency }
for i, ev := range events {
_, err := tx.Exec(ctx, `
INSERT INTO events (aggregate_id, type, data, version)
VALUES ($1, $2, $3, $4)`, aggregateID, ev.Type(), ev.Marshal(), maxVer+int64(i+1))
if err != nil { return err }
}
return tx.Commit(ctx)
}
生产参数:
- 批追加阈值:5-10 个事件 / 批,降低锁竞争。
- 超时:500ms,结合 context。
- 监控:追加 QPS > 1000/s,错误率 < 0.1%。
聚合根重建:事件重放
聚合根不存储状态,仅通过事件重放计算。示例:银行账户聚合。
type Account struct {
ID uuid.UUID
Balance int64
Version int64
changes []DomainEvent // 变更事件
}
func (a *Account) Apply(e DomainEvent) {
switch ev := e.(type) {
case Deposited: a.Balance += ev.Amount
case Withdrawn: a.Balance -= ev.Amount
}
}
func (es *EventStore) LoadAggregate(ctx context.Context, id uuid.UUID, snapshotVersion int64) (*Account, error) {
agg := &Account{ID: id}
// 加载快照(若存在)
snapshot, err := es.LoadSnapshot(ctx, id, snapshotVersion)
if err == nil {
agg.Unmarshal(snapshot.Data)
snapshotVersion = snapshot.Version
}
// 加载后续事件并重放
rows, err := es.db.Query(ctx, `
SELECT type, data FROM events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version`, id, snapshotVersion)
// ... 扫描并 agg.Apply(ev)
return agg, nil
}
- 重放上限:1000 事件 / 次,超阈使用快照。
- 风险:事件模式演化,使用 Upcaster 接口转换旧事件。
快照策略:性能优化
快照定期持久化聚合状态,减少重放开销。
CREATE TABLE snapshots (
aggregate_id UUID PRIMARY KEY,
version BIGINT NOT NULL,
data JSONB NOT NULL,
timestamp TIMESTAMPTZ DEFAULT NOW()
);
快照规则:每 100 事件或 1 小时生成一次。
func (es *EventStore) SaveSnapshot(ctx context.Context, agg Aggregate) error {
return es.db.Exec(ctx, `
INSERT INTO snapshots (aggregate_id, version, data)
VALUES ($1, $2, $3)
ON CONFLICT (aggregate_id) DO UPDATE SET version = $2, data = $3`,
agg.ID(), agg.Version(), agg.Marshal())
}
生产参数:
- 快照阈值:50-200 事件,根据聚合复杂度。
- 存储压缩:GZIP JSONB,TTL 旧快照 7 天。
- 命中率监控:>90% 查询使用快照。
高效投影:读模型构建
投影将事件转换为查询优化视图,支持 CQRS 读侧。使用 Kafka 或 Postgres 逻辑复制消费事件。
投影器实现
type Projection struct {
db *sql.DB
checkpoint int64 // 最后处理版本
}
func (p *Projection) Handle(e DomainEvent) error {
switch ev := e.(type) {
case Deposited:
_, err := p.db.Exec("UPDATE account_balances SET balance = balance + $1 WHERE id = $2", ev.Amount, ev.AccountID)
}
// 更新 checkpoint
return p.db.Exec("UPDATE projections_checkpoint SET position = $1", globalEventID)
}
模式:
- 实时投影:Inline,命令事务内更新(低延迟)。
- 异步投影:Daemon 进程 catch-up,HotCold 模式(高吞吐)。 参数:
- 并行度:per-aggregate 流,10-50 workers。
- 延迟阈值:<100ms (P99),滞后监控。
- 幂等:事件 ID 唯一,UPSERT。
可扩展 CQRS 生产实践
- 命令侧:gRPC/HTTP,验证 → LoadAggregate → ApplyCommand → AppendEvents → Publish (可选 Kafka)。
- 查询侧:专用读 DB (ClickHouse/Vitess),投影订阅事件。
- 监控要点:
指标 阈值 告警 事件追加延迟 <10ms >50ms 投影滞后 <1s >10s 重放事件数 <500 / 次 >1000 快照命中率 >80% <70% - 回滚:事件补偿(反向事件),非删除。
- 规模:单实例 10k QPS,多副本 + 分区 (by aggregate_id)。
此实现参考 skoredin.pro 事件溯源系列和 thefabric-io/eventsourcing 库,结合 Postgres 生产经验。实际部署时,集成 Watermill 或 Go-Kit 增强可靠性。
(正文字数:1256)