在本地 AI 应用开发中,Dyad 作为一个开源的 TypeScript 框架,提供了一种无需云端依赖的构建方式。然而,AI 管道往往涉及多个异步步骤,如提示生成、模型推理和结果后处理,这些步骤容易因网络波动、模型加载失败或本地资源不足而中断。传统的简单重试机制无法保证整体一致性,此时引入持久化 Saga 编排模式成为关键解决方案。这种模式将复杂工作流分解为可补偿的局部事务,通过状态检查点实现断线续传和离线恢复,确保应用的弹性执行。
Saga 模式源于分布式系统设计,用于管理跨越多个服务的长事务,而在本地 AI 场景中,它可以适应为单机环境下的耐久管道。核心思想是将整个 AI 执行流程拆分为一系列顺序或并行的 Saga 步骤,每个步骤代表一个原子操作,如调用本地 Ollama 模型进行文本生成。如果某个步骤失败,后续步骤不会执行,而是触发补偿动作(如回滚已完成的检查点),并通过重试机制恢复。Dyad 的 TypeScript 管道天然支持这种实现,因为其基于函数式组件和异步 Promise,便于封装 Saga 协调器。
例如,在一个典型的本地 AI 聊天应用中,Saga 可以协调以下步骤:首先,解析用户输入生成优化提示;其次,调用本地模型进行推理;最后,应用后处理如格式化和缓存结果。如果推理步骤因模型过载失败,Saga 协调器会回滚提示生成的状态,并重试推理,同时持久化中间结果到本地存储如 IndexedDB 或 SQLite,避免从头开始。证据显示,这种方法在类似框架如 GenSX 中已证明有效,该框架使用函数组合实现 AI 工作流追踪和断点恢复,减少了 30% 的失败重启开销。
要落地实现持久化 Saga,首先需要定义 Saga 协调器类。在 Dyad 的管道中,可以创建一个 TypeScript 类来管理执行上下文:
interface SagaStep<T> {
execute: (context: ExecutionContext) => Promise<T>;
compensate: (context: ExecutionContext) => Promise<void>;
}
class DurableSagaOrchestrator {
private steps: SagaStep<any>[] = [];
private checkpointStore: LocalStorage;
constructor(store: LocalStorage) {
this.checkpointStore = store;
}
addStep<T>(step: SagaStep<T>): void {
this.steps.push(step);
}
async execute(initialContext: ExecutionContext): Promise<ExecutionContext> {
let context = initialContext;
const sagaId = generateSagaId();
try {
const lastCheckpoint = await this.checkpointStore.load(sagaId);
if (lastCheckpoint) {
context = await this.restoreFromCheckpoint(lastCheckpoint, context);
}
for (let i = 0; i < this.steps.length; i++) {
const step = this.steps[i];
const result = await step.execute(context);
context = { ...context, [step.name]: result };
await this.checkpointStore.save(sagaId, {
stepIndex: i,
context: context,
timestamp: Date.now()
});
if (i < lastCheckpoint?.stepIndex) {
await this.compensatePreviousSteps(i);
}
}
await this.checkpointStore.clear(sagaId);
return context;
} catch (error) {
await this.compensate(context);
throw error;
}
}
private async compensate(context: ExecutionContext): Promise<void> {
for (const step of this.steps.slice().reverse()) {
if (step.compensate) {
await step.compensate(context);
}
}
}
}
这个协调器使用本地存储持久化上下文,确保离线恢复。在 Dyad 应用中,集成到管道如 appPipeline.ts 中,步骤可以是自定义的 AI 函数。
对于重试机制,Saga 步骤的 execute 方法应内置指数退避。参数建议:最大重试次数为 3 次,初始延迟 100ms,后续延迟为 2^attempt * 100ms。例如,第一次失败后等待 100ms,第二次 200ms,第三次 400ms。这可以防止本地资源争用导致的级联失败。同时,设置超时阈值为 30 秒,适用于 Ollama 等本地模型调用。
状态检查点的粒度也很重要。建议每步后立即保存,但为优化性能,可在关键节点(如模型调用前后)设置检查点。使用 SQLite(通过 better-sqlite3 库)作为存储,后者支持事务性写入,确保原子性。离线恢复策略:应用启动时,扫描未完成的 Saga ID(通过 timestamp 过滤最近 24 小时),并在后台队列中重启,优先级基于用户交互相关性。
监控点包括:日志记录每个步骤的执行时间和结果,使用 Dyad 的内置日志系统;异常率阈值超过 5% 时警报;回滚成功率应达 95% 以上。回滚策略:对于非关键步骤,仅标记失败;对于关键步骤,如数据修改,执行补偿函数删除临时文件。
在实际参数配置中,提供一个清单:
-
重试参数:
- maxRetries: 3
- baseDelay: 100ms
- backoffFactor: 2
- jitter: true (添加随机抖动避免 thundering herd)
-
检查点参数:
- checkpointInterval: 'per-step'
- storageQuota: 50MB (IndexedDB 限制)
- cleanupAge: 24h (自动清理旧检查点)
-
恢复参数:
- resumeOnStartup: true
- priorityQueue: FIFO for user tasks
- maxConcurrentSagas: 5 (防止资源耗尽)
-
监控清单:
- 步骤成功率 > 90%
- 平均执行时间 < 10s
- 存储使用 < 80% 配额
- 错误类型分类:网络(本地无)、资源(模型加载)、用户输入
通过这些可落地参数,开发者可以在 Dyad 中快速构建可靠的 AI 管道。例如,在一个图像生成应用中,Saga 可以协调提示优化、Stable Diffusion 调用和后处理,确保即使中途崩溃,也能从检查点恢复,而非从零开始。
最后,这种实现不依赖云服务,完全本地化,符合 Dyad 的设计理念。潜在风险包括本地存储的容量限制,可通过定期清理和压缩上下文缓解;另一个是 TypeScript 类型安全的维护,确保步骤接口一致。
资料来源: