把 AI Agent 的批处理任务(分片总结、夜间工单批改、embedding 回填)放进多个常驻 Worker 或 Kubernetes Job 并行消费时,最常见的两类失败是:同一业务键被两个进程各执行一次,以及Worker 崩溃后任务永远卡在「处理中」。消息中间件可以解决这个问题,但若团队已经把会话状态、工具去重表和审计日志放在 PostgreSQL 里,再引入 Kafka/RabbitMQ 往往只为「抢任务」多一套运维面。
PostgreSQL 自 9.5 起提供的 SELECT … FOR UPDATE SKIP LOCKED 被官方文档描述为:在并发事务已锁定部分行时,当前查询跳过这些行而不是阻塞等待。把「待处理任务行」与「租约截止时间」建模在同一张表上,可以在不新增中间件的前提下实现多 Worker 的 at-least-once 消费;再配合应用层幂等(见既有 MCP 工具去重实践),可以把重复交付收敛到可接受范围。
locked_until 之前对其他 Worker 不可见。问题背景:Deployment 与「抢行」语义不匹配
Agent 批任务与在线 API 的差异在于:
- 执行时长不确定:多轮
tools/call叠加后,单次任务从数十秒到数十分钟都可能出现。 - 副作用不可随便重试:写数据库、发邮件、调用计费 API 在传输层重试下会重复执行。
- 分片并行:Indexed Job 或固定数量 Worker 需要一种「谁拿到行谁处理」的协调方式,而不是靠应用内存里的互斥锁。
用 UPDATE … WHERE status = 'pending' LIMIT 1 不加行锁,会在并发下出现双写;用 FOR UPDATE 不加 SKIP LOCKED,Worker 会在锁上排队,吞吐随竞争加剧而下降。官方 SELECT 文档 对 SKIP LOCKED 的说明是:已被锁定的行不会进入结果集,从而避免等待。
可落地实现:表结构、claim 事务与参数
1. 任务表(建议字段)
CREATE TABLE agent_work_queue (
id bigserial PRIMARY KEY,
tenant_id text NOT NULL,
shard_key text NOT NULL,
payload jsonb NOT NULL,
status text NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','leased','done','dead')),
run_after timestamptz NOT NULL DEFAULT now(),
locked_until timestamptz,
lease_owner text,
attempts int NOT NULL DEFAULT 0,
max_attempts int NOT NULL DEFAULT 8,
last_error text,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX agent_work_queue_claim_idx
ON agent_work_queue (run_after, id)
WHERE status = 'pending';
shard_key 用于把同一租户下的互斥任务串行化(例如同一仓库路径):claim 时在 WHERE 子句中排除「同 shard 已有 leased 且未过期」的行,或在业务层保证同 shard 不并行。索引使用部分索引(WHERE status = 'pending')是为减小热区 bloat;生产环境应监控该索引的扫描次数与表膨胀。
2. Claim:单事务内抢租约
推荐把「选中 + 标记 leased」放在同一事务,租约长度 visibility_timeout 与 Agent 单步超时对齐。
-- 参数示例(按环境调整)
-- visibility_timeout: 10 minutes
-- batch_claim: 1(Agent 通常一次只跑一个重任务)
BEGIN;
WITH picked AS (
SELECT id
FROM agent_work_queue
WHERE status = 'pending'
AND run_after <= now()
ORDER BY run_after, id
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE agent_work_queue q
SET status = 'leased',
lease_owner = :worker_id,
locked_until = now() + interval '10 minutes',
attempts = attempts + 1,
updated_at = now()
FROM picked
WHERE q.id = picked.id
RETURNING q.*;
COMMIT;
Worker 在拿到 RETURNING 行之后执行 Agent 运行时;不要在持有行锁时调用 LLM(长事务会拖住锁,虽然 leased 行已被更新,但会延长事务并影响连接池)。正确做法是:COMMIT 释放锁后再跑模型,靠 locked_until 与心跳续租保证独占。
3. 心跳续租(长任务必备)
UPDATE agent_work_queue
SET locked_until = now() + interval '10 minutes',
updated_at = now()
WHERE id = :task_id
AND lease_owner = :worker_id
AND status = 'leased'
AND locked_until > now();
建议心跳间隔为 visibility_timeout / 3。若 visibility_timeout = 600s,则每 180–200s 续租一次。续租失败(返回 0 行)表示租约已丢失,Worker 应停止副作用并退出,避免双写。
4. 完成、失败与退避
-- 成功
UPDATE agent_work_queue
SET status = 'done', locked_until = NULL, lease_owner = NULL, updated_at = now()
WHERE id = :task_id AND lease_owner = :worker_id AND status = 'leased';
-- 可重试失败:回到 pending,指数退避写入 run_after
UPDATE agent_work_queue
SET status = 'pending',
locked_until = NULL,
lease_owner = NULL,
run_after = now() + (interval '1 second' * least(3600, power(2, attempts))),
last_error = :err,
updated_at = now()
WHERE id = :task_id AND lease_owner = :worker_id AND status = 'leased'
AND attempts < max_attempts;
-- 超过 max_attempts → dead(人工或死信消费者处理)
5. 推荐初始参数(8 核 DB、10 个 Agent Worker)
| 参数 | 建议值 | 说明 |
|---|---|---|
visibility_timeout | 600s | 应大于 P95 单任务时长;LLM 长尾任务可调到 1200–1800s |
heartbeat_interval | 200s | 约为 visibility 的 1/3 |
max_attempts | 8 | 与指数退避上限 3600s 配合,避免无限重试耗尽配额 |
worker_id | Pod 名 / hostname+pid | 写入 lease_owner 便于审计 |
连接池 statement_timeout | claim: 5s;心跳: 3s | 防止 claim 轮询占满连接 |
| 空闲轮询 | 200–500ms jitter | 无任务时 sleep,避免空转 SELECT |
6. 与 Kubernetes Job 的配合
若 Worker 以 Job 形式运行,Job 的 parallelism 与队列 Worker 数应一致;分片键可映射为 JOB_COMPLETION_INDEX 取模,或统一从队列 claim(更简单)。Job 的 activeDeadlineSeconds 应大于 visibility_timeout + 缓冲,否则 Pod 被杀死时任务会在租约过期后重新 pending,属于预期的 at-least-once 行为,不是 exactly-once。
风险与边界
- 语义是 at-least-once,不是 exactly-once:Worker 在副作用已发生但
ack前崩溃,任务会再次变为 pending。写工具必须配合幂等键或去重表(业务层责任,数据库队列不提供保证)。 - 时钟与
now():租约依赖数据库时间;应用服务器与 DB 时钟明显漂移时,可能出现过早或过晚的重复 claim。应使用 NTP,并在监控中对比应用与 DB 的时钟偏移。 - 热表与 VACUUM:高 churn 的
pending ↔ leased更新会产生死元组;需关注n_dead_tup、autovacuum 是否跟得上。极高吞吐场景应评估专用队列(如 Graphile Worker、PgQue)而非自研轮询。 SKIP LOCKED不排序公平性:ORDER BY run_after, id在并发下不保证全局严格 FIFO,只保证「可用行」中的优先顺序;若公平性敏感,需引入分区或单消费者 shard。- 长 LLM 调用不要放在事务里:claim 事务应毫秒级结束;把模型推理包在数据库事务外,否则连接池与锁持有时间会成为瓶颈。
- 多租户隔离:队列表应配合 RLS 或至少
tenant_id强制过滤;Agent 工具层误写tenant_id会导致跨租户任务被 claim(应用与策略层双重校验)。
验收清单
- 并发启动 N 个 Worker,同一批任务最终
done计数等于入队数(允许attempts > 1)。 - 模拟 kill -9 Worker:
locked_until过期后任务重新被 claim,且lease_owner变更可审计。 - 模拟续租失败:Worker 在副作用前自检租约,不继续写外部系统。
- 监控:队列深度(
status=pending)、租约过期重投率、dead比例、claim 事务 P99 延迟。
参考来源
- PostgreSQL 文档:SELECT — FOR UPDATE / SKIP LOCKED
- PostgreSQL 文档:Concurrency Control (MVCC)
- PostgreSQL 文档:UPDATE(与行级锁、事务边界)
- Graphile Worker 文档:Job queue design notes(生产级 PG 队列的租约与重试模型,可与本文模式对照)
- RFC 9110:HTTP Semantics(幂等方法与客户端重试语义,用于对齐 Agent 写工具)