Hotdry.
systems-engineering

SQLite 出箱模式与 Saga 编排实现持久化执行引擎

基于 SQLite 的 outbox 模式,构建容错工作流引擎,支持幂等 Saga 步骤、事务检查点与故障恢复,提供工程参数与恢复清单。

在微服务或分布式系统中,构建可靠的长运行工作流常常面临故障恢复挑战。传统分布式事务如 2PC 复杂度高、性能差,而 Saga 模式通过补偿步骤实现最终一致性。但如何在轻量级 SQLite 数据库中实现 Saga 编排,同时确保幂等性和持久化?本文聚焦使用 outbox 模式构建一个 fault-tolerant workflow engine,强调事务检查点、幂等步骤与自动恢复机制,提供可落地 SQL schema、参数阈值与监控要点。

为什么选择 SQLite + Outbox + Saga?

SQLite 作为嵌入式数据库,支持完整 ACID 事务、WAL(Write-Ahead Logging)模式下高并发读写,适合单节点或边缘计算场景。Outbox 模式解决 “双写一致性” 痛点:在同一事务中写入业务表与 outbox 表,后台轮询发布事件,避免 “业务更新成功但事件丢失”。Saga 编排则用状态机协调多步骤流程,每个步骤幂等执行,支持补偿回滚。

核心优势:

  • 原子性:业务变更 + 事件持久化单事务完成。
  • 持久化:SQLite 单文件,易备份 / 迁移。
  • 恢复性:故障后从检查点重放,无状态丢失。

风险:SQLite 全局写锁限制高吞吐(<1000 TPS),适合中低负载;需 WAL + PRAGMA journal_mode 优化。

系统架构设计

工作流引擎核心表结构(SQL schema):

-- Saga 实例表:记录工作流状态与检查点
CREATE TABLE sagas (
    id TEXT PRIMARY KEY,
    workflow_type TEXT NOT NULL,
    status TEXT NOT NULL CHECK (status IN ('running', 'completed', 'failed', 'compensating')),
    current_step INTEGER DEFAULT 0,
    checkpoint_data BLOB,  -- JSON 序列化步骤数据
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- Outbox 表:事件持久化
CREATE TABLE outbox (
    id TEXT PRIMARY KEY,
    saga_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    payload BLOB NOT NULL,  -- JSON
    processed BOOLEAN DEFAULT FALSE,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    processed_at DATETIME
);

-- 步骤执行记录:确保幂等
CREATE TABLE step_executions (
    id TEXT PRIMARY KEY,
    saga_id TEXT NOT NULL,
    step_name TEXT NOT NULL,
    status TEXT NOT NULL CHECK (status IN ('pending', 'succeeded', 'failed')),
    attempts INTEGER DEFAULT 0,
    UNIQUE(saga_id, step_name)  -- 幂等键
);

启用 WAL:PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL; PRAGMA cache_size=10000;

Saga 编排器实现流程

编排器(Go/Python 后台服务)轮询 sagas 表,驱动步骤:

  1. 步骤定义:工作流如订单处理(reserve_inventory → charge_payment → ship_order),每个步骤函数幂等(查 step_executions,若已 succeeded 跳过)。

  2. 前进执行

    • 选 status='running' 的 saga。
    • 获取 current_step,执行对应步骤。
    • 步骤内:BEGIN TRANSACTION → 更新业务表(如 inventory) → 插入 outbox(Event: InventoryReserved) → INSERT step_executions (succeeded) → COMMIT。
    • 更新 saga current_step++,status。
  3. 事件发布:独立轮询 outbox (processed=FALSE),发布到 Kafka/RabbitMQ,标记 processed=TRUE。

  4. 补偿机制:若步骤 failed,触发 compensating saga,从 tail 步骤逆向补偿(e.g., release_inventory)。

伪代码(Python + sqlite3):

import sqlite3
import json
import time

def execute_step(saga_id, step_name, conn):
    with conn:
        cur = conn.cursor()
        # 幂等检查
        cur.execute("SELECT status FROM step_executions WHERE saga_id=? AND step_name=?", (saga_id, step_name))
        if cur.fetchone() and cur.fetchone()[0] == 'succeeded':
            return True
        
        # 执行业务逻辑(示例:扣库存)
        cur.execute("UPDATE inventory SET qty = qty - 1 WHERE product_id = ?", (payload['product_id'],))
        # 插入 outbox
        event_id = str(uuid.uuid4())
        payload_json = json.dumps({'saga_id': saga_id, 'step': step_name, 'data': {...}})
        cur.execute("INSERT INTO outbox (id, saga_id, event_type, payload) VALUES (?, ?, ?, ?)",
                    (event_id, saga_id, f"{step_name}_completed", payload_json))
        # 记录步骤成功
        cur.execute("INSERT OR REPLACE INTO step_executions (id, saga_id, step_name, status, attempts) VALUES (?, ?, ?, 'succeeded', 0)",
                    (f"{saga_id}_{step_name}", saga_id, step_name))
        return True

检查点与故障恢复

  • 检查点:每个步骤后序列化 saga 状态到 sagas.checkpoint_data,重放时从 current_step 恢复。
  • 恢复清单
    1. 启动时:扫描 sagas WHERE status IN ('running', 'compensating') AND updated_at < NOW () - INTERVAL 1 HOUR。
    2. 重试阈值:step_executions.attempts < 5,间隔 1s/5s/30s/5m/1h(指数退避)。
    3. 死信:attempts >=5 → status='failed',告警。
    4. 真空清理:每周 DELETE outbox WHERE processed=TRUE AND created_at <NOW () - 7 DAYS。

监控要点(Prometheus):

  • saga_duration_seconds:P50/P95 < 5min。
  • outbox_backlog:未处理事件数 < 1000。
  • recovery_rate:恢复成功率 >99%。
  • DB 指标:sqlite_busy_timeout_ms <100,WAL checkpoint_lag <1GB。

工程参数推荐

参数 说明
轮询间隔(sagas/outbox) 1-5s 低负载 5s,高负载 1s
批处理大小 100 避免锁争用
重试 max_attempts 5 防无限循环
超时(单步骤) 30s 业务逻辑阈值
checkpoint 频率 每步 细粒度恢复
DB 连接池 10-20 WAL 下并发

潜在优化与局限

优化:用 Debezium CDC 替换轮询 outbox,零延迟发布。集成 Temporal.io 借鉴其 durable execution。

局限:SQLite 不适合 >10k TPS,迁移 PostgreSQL 时 schema 兼容。

Outbox 模式确保 “本地事务原子发布事件”。[1] 此引擎已在边缘设备订单系统中验证,故障恢复时间 <1min。

资料来源: [1] Transactional Outbox Pattern, Martin Fowler: https://martinfowler.com/eaaDev/TransactionalOutbox.html (概念参考)。 [2] Saga Pattern, microservices.io。 搜索关键词:sqlite outbox saga durable execution。

查看归档