202510
systems

使用 PostgreSQL 实现 DBOS 风格的 Go 持久化工作流:容错重试与检查点

借鉴 DBOS 理念,在 Go 语言中构建容错工作流系统,利用 PostgreSQL 持久化状态,实现自动重试、检查点恢复和精确一次语义,支持分布式任务编排。

在分布式系统中,构建可靠的工作流是确保任务编排稳定性的关键。传统方法往往依赖内存状态或外部队列,但容易因故障丢失进度。借鉴 DBOS(Database-Oriented Operating System)的设计理念,我们可以在 Go 语言中实现一个使用 PostgreSQL 作为状态存储的持久化工作流系统。该系统支持自动重试、检查点机制和精确一次语义(exactly-once semantics),适用于支付处理、数据管道等场景。

为什么选择 PostgreSQL 持久化?

DBOS 的核心在于将所有状态持久化到数据库中,避免了操作系统层面的复杂性。在 Go 项目中,我们可以使用 PostgreSQL 作为单一真相来源(single source of truth)。PostgreSQL 支持 ACID 事务、行级锁和序列化隔离级别,能有效处理并发执行的工作流实例。

关键优势:

  • 容错性:故障时,从最近检查点恢复,无需从头重启。
  • 精确一次:通过唯一执行 ID 和幂等检查,避免重复执行。
  • 可扩展:支持水平扩展,结合连接池处理高并发。
  • 可观测:状态表可直接查询,用于监控和调试。

相比消息队列如 Kafka,PostgreSQL 简化了架构,无需额外组件。但需注意数据库瓶颈,通过索引和分区优化性能。

设置 PostgreSQL schema

首先,设计数据库 schema 来存储工作流状态。核心表包括:

  • workflows:存储工作流定义(名称、版本)。
  • executions:执行实例(ID、状态、当前步骤、输入/输出数据、创建/更新时间)。
  • steps:步骤日志(执行 ID、步骤名、输入/输出、状态、时间戳)。

示例 SQL 脚本(使用 pgx 驱动执行):

CREATE TABLE workflows (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) UNIQUE NOT NULL,
    version INTEGER NOT NULL,
    definition JSONB NOT NULL  -- 步骤序列化
);

CREATE TABLE executions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_id INTEGER REFERENCES workflows(id),
    status VARCHAR(50) NOT NULL,  -- pending, running, completed, failed
    current_step INTEGER,
    input_data JSONB,
    output_data JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE steps (
    id SERIAL PRIMARY KEY,
    execution_id UUID REFERENCES executions(id) ON DELETE CASCADE,
    step_name VARCHAR(255) NOT NULL,
    step_index INTEGER NOT NULL,
    input_data JSONB,
    output_data JSONB,
    status VARCHAR(50) NOT NULL,  -- success, failed, retrying
    retry_count INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(execution_id, step_index)
);

-- 索引优化
CREATE INDEX idx_executions_status ON executions(status);
CREATE INDEX idx_steps_execution ON steps(execution_id);
CREATE INDEX idx_steps_status ON steps(status);

这些表确保每个步骤的原子性更新。使用 JSONB 存储灵活的数据,避免 schema 变更。

在 Go 中定义工作流结构

使用 Go 的结构体和接口定义工作流。核心组件:

  • Workflow:定义步骤序列。
  • Step:可执行单元,支持输入/输出。
  • Executor:管理执行、检查点和恢复。

示例代码:

package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "errors"
    "fmt"
    "log"
    "time"

    "github.com/google/uuid"
    "github.com/jackc/pgx/v5/stdlib"  // PostgreSQL 驱动
    _ "github.com/lib/pq"
)

type Status string

const (
    Pending   Status = "pending"
    Running   Status = "running"
    Completed Status = "completed"
    Failed    Status = "failed"
)

type StepFunc func(ctx context.Context, input interface{}) (interface{}, error)

type Step struct {
    Name   string
    Index  int
    Func   StepFunc
    Retries int  // 最大重试次数
}

type Workflow struct {
    Name    string
    Version int
    Steps   []Step
}

type Execution struct {
    ID          uuid.UUID
    WorkflowID  int
    Status      Status
    CurrentStep int
    InputData   json.RawMessage
    OutputData  json.RawMessage
    db          *sql.DB
}

func NewExecution(w *Workflow, input interface{}, db *sql.DB) (*Execution, error) {
    inputBytes, _ := json.Marshal(input)
    execID := uuid.New()
    // 插入执行记录
    _, err := db.ExecContext(context.Background(),
        "INSERT INTO executions (id, workflow_id, status, input_data, current_step) VALUES ($1, $2, $3, $4, $5)",
        execID, w.WorkflowID, Running, inputBytes, 0)
    if err != nil {
        return nil, err
    }
    return &Execution{ID: execID, WorkflowID: w.WorkflowID, Status: Running, CurrentStep: 0, db: db}, nil
}

// 执行单个步骤并检查点
func (e *Execution) executeStep(ctx context.Context, step Step, input interface{}) error {
    stepInput, _ := json.Marshal(input)
    // 插入步骤记录(幂等)
    _, err := e.db.ExecContext(ctx,
        "INSERT INTO steps (execution_id, step_name, step_index, input_data, status) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (execution_id, step_index) DO NOTHING",
        e.ID, step.Name, step.Index, stepInput, Running)
    if err != nil {
        return err
    }

    output, err := step.Func(ctx, input)
    outputBytes, _ := json.Marshal(output)

    status := Completed
    if err != nil {
        status = Failed
        if e.getRetryCount(step) < step.Retries {
            // 自动重试逻辑
            time.Sleep(time.Second * time.Duration(1<<e.getRetryCount(step)))  // 指数退避
            return e.executeStep(ctx, step, input)  // 递归重试
        }
    }

    // 更新步骤和执行状态
    _, err = e.db.ExecContext(ctx,
        "UPDATE steps SET output_data = $1, status = $2 WHERE execution_id = $3 AND step_index = $4",
        outputBytes, status, e.ID, step.Index)
    if err != nil {
        return err
    }

    if status == Completed {
        e.CurrentStep = step.Index + 1
        _, err = e.db.ExecContext(ctx, "UPDATE executions SET current_step = $1, updated_at = NOW() WHERE id = $2", e.CurrentStep, e.ID)
    }
    return err
}

// 恢复执行
func (e *Execution) Recover(ctx context.Context, w *Workflow) error {
    // 查询当前步骤
    var current int
    err := e.db.QueryRowContext(ctx, "SELECT current_step FROM executions WHERE id = $1", e.ID).Scan(&current)
    if err != nil {
        return err
    }
    e.CurrentStep = current

    // 从检查点继续
    for i := e.CurrentStep; i < len(w.Steps); i++ {
        step := w.Steps[i]
        // 获取上一步输出作为输入
        prevOutput, err := e.getStepOutput(i - 1)
        if err != nil {
            return err
        }
        if err := e.executeStep(ctx, step, prevOutput); err != nil {
            // 更新执行状态为失败
            _, _ = e.db.ExecContext(ctx, "UPDATE executions SET status = $1 WHERE id = $2", Failed, e.ID)
            return err
        }
    }
    // 完成
    _, _ = e.db.ExecContext(ctx, "UPDATE executions SET status = $1, output_data = $2 WHERE id = $3", Completed, e.getFinalOutput(), e.ID)
    return nil
}

func (e *Execution) getRetryCount(step Step) int {
    var count int
    e.db.QueryRowContext(context.Background(), "SELECT retry_count FROM steps WHERE execution_id = $1 AND step_name = $2", e.ID, step.Name).Scan(&count)
    return count
}

func (e *Execution) getStepOutput(index int) interface{} {
    // 实现从 steps 表获取输出,解析 JSON
    // 简化示例,返回 nil
    return nil
}

func (e *Execution) getFinalOutput() json.RawMessage {
    // 从最后一步获取输出
    return nil
}

// 运行工作流
func (e *Execution) Run(ctx context.Context, w *Workflow) error {
    if e.Status == Running && e.CurrentStep > 0 {
        return e.Recover(ctx, w)
    }
    var prevOutput interface{} = e.InputData  // 初始输入
    for i, step := range w.Steps {
        if err := e.executeStep(ctx, step, prevOutput); err != nil {
            return err
        }
        prevOutput = e.getStepOutput(i)  // 更新为当前输出
    }
    return nil
}

实现示例:分布式任务编排

假设一个支付工作流:验证账户 → 扣款 → 发送通知。

func validateAccount(ctx context.Context, input map[string]interface{}) (map[string]interface{}, error) {
    // 模拟验证
    if input["balance"].(float64) < input["amount"].(float64) {
        return nil, errors.New("insufficient balance")
    }
    return map[string]interface{}{"valid": true}, nil
}

func deduct(ctx context.Context, input map[string]interface{}) (map[string]interface{}, error) {
    // 模拟扣款,幂等检查
    return map[string]interface{}{"deducted": input["amount"]}, nil
}

func notify(ctx context.Context, input map[string]interface{}) (map[string]interface{}, error) {
    log.Println("Notification sent")
    return map[string]interface{}{"sent": true}, nil
}

func main() {
    db, err := sql.Open("postgres", "postgres://user:pass@localhost/db?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    w := &Workflow{
        Name:    "payment",
        Version: 1,
        Steps: []Step{
            {Name: "validate", Index: 0, Func: StepFunc(validateAccount), Retries: 3},
            {Name: "deduct", Index: 1, Func: StepFunc(deduct), Retries: 1},
            {Name: "notify", Index: 2, Func: StepFunc(notify), Retries: 0},
        },
    }

    // 假设已插入 workflow 到 DB,获取 ID
    w.WorkflowID = 1

    input := map[string]interface{}{"account_id": 123, "amount": 100.0, "balance": 200.0}
    exec, err := NewExecution(w, input, db)
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()
    if err := exec.Run(ctx, w); err != nil {
        log.Printf("Workflow failed: %v", err)
    } else {
        log.Println("Workflow completed")
    }
}

此示例中,validateAccount 若失败,会自动重试 3 次,使用指数退避。整个执行状态持久化到 DB,若进程崩溃,重启后从 current_step 恢复。

最佳实践与参数配置

  1. 幂等性:每个步骤使用唯一 ID 检查是否已执行,避免重复。
  2. 重试策略:配置最大重试(3-5 次)和退避(初始 1s,倍增至 1min)。
  3. 检查点频率:每个步骤后持久化,但对于长步骤,可分段检查点。
  4. 监控:使用 Prometheus 查询 executions 表,警报失败率 >5%。
  5. 回滚:失败时,使用事务回滚到上一步输出。
  6. 性能调优:连接池大小 20-50,批处理更新;分区表按日期。
  7. 安全:加密敏感数据,使用行级安全(RLS)。

风险:高频写入可能导致锁争用,建议读写分离或使用 CockroachDB 增强分布式能力。

结论

通过 PostgreSQL 实现 DBOS 风格的 Go 持久化工作流,我们获得了可靠的分布式任务编排能力。相比传统框架,此方法轻量且灵活,适用于微服务环境。实际部署中,结合 Kubernetes 运行多个 Executor 实例,实现负载均衡。未来,可扩展支持 Saga 模式处理分布式事务。

(字数:1256)