Hotdry.

把 AI Agent 的批处理任务(分片总结、夜间工单批改、embedding 回填)放进多个常驻 Worker 或 Kubernetes Job 并行消费时,最常见的两类失败是:同一业务键被两个进程各执行一次,以及Worker 崩溃后任务永远卡在「处理中」。消息中间件可以解决这个问题,但若团队已经把会话状态、工具去重表和审计日志放在 PostgreSQL 里,再引入 Kafka/RabbitMQ 往往只为「抢任务」多一套运维面。

PostgreSQL 自 9.5 起提供的 SELECT … FOR UPDATE SKIP LOCKED 被官方文档描述为:在并发事务已锁定部分行时,当前查询跳过这些行而不是阻塞等待。把「待处理任务行」与「租约截止时间」建模在同一张表上,可以在不新增中间件的前提下实现多 Worker 的 at-least-once 消费;再配合应用层幂等(见既有 MCP 工具去重实践),可以把重复交付收敛到可接受范围。

任务状态:pending 经 SKIP LOCKED 进入 leased,成功 ack 到 done,重试耗尽进入 dead,租约过期回到 pending
租约队列的最小状态机:leased 行在 locked_until 之前对其他 Worker 不可见。

问题背景:Deployment 与「抢行」语义不匹配

Agent 批任务与在线 API 的差异在于:

UPDATE … WHERE status = 'pending' LIMIT 1 不加行锁,会在并发下出现双写;用 FOR UPDATE 不加 SKIP LOCKED,Worker 会在锁上排队,吞吐随竞争加剧而下降。官方 SELECT 文档SKIP LOCKED 的说明是:已被锁定的行不会进入结果集,从而避免等待。

可落地实现:表结构、claim 事务与参数

1. 任务表(建议字段)

CREATE TABLE agent_work_queue (
  id            bigserial PRIMARY KEY,
  tenant_id     text NOT NULL,
  shard_key     text NOT NULL,
  payload       jsonb NOT NULL,
  status        text NOT NULL DEFAULT 'pending'
                CHECK (status IN ('pending','leased','done','dead')),
  run_after     timestamptz NOT NULL DEFAULT now(),
  locked_until  timestamptz,
  lease_owner   text,
  attempts      int NOT NULL DEFAULT 0,
  max_attempts  int NOT NULL DEFAULT 8,
  last_error    text,
  created_at    timestamptz NOT NULL DEFAULT now(),
  updated_at    timestamptz NOT NULL DEFAULT now()
);

CREATE INDEX agent_work_queue_claim_idx
  ON agent_work_queue (run_after, id)
  WHERE status = 'pending';

shard_key 用于把同一租户下的互斥任务串行化(例如同一仓库路径):claim 时在 WHERE 子句中排除「同 shard 已有 leased 且未过期」的行,或在业务层保证同 shard 不并行。索引使用部分索引(WHERE status = 'pending')是为减小热区 bloat;生产环境应监控该索引的扫描次数与表膨胀。

2. Claim:单事务内抢租约

推荐把「选中 + 标记 leased」放在同一事务,租约长度 visibility_timeout 与 Agent 单步超时对齐。

-- 参数示例(按环境调整)
-- visibility_timeout: 10 minutes
-- batch_claim: 1(Agent 通常一次只跑一个重任务)

BEGIN;

WITH picked AS (
  SELECT id
  FROM agent_work_queue
  WHERE status = 'pending'
    AND run_after <= now()
  ORDER BY run_after, id
  FOR UPDATE SKIP LOCKED
  LIMIT 1
)
UPDATE agent_work_queue q
SET status       = 'leased',
    lease_owner  = :worker_id,
    locked_until = now() + interval '10 minutes',
    attempts     = attempts + 1,
    updated_at   = now()
FROM picked
WHERE q.id = picked.id
RETURNING q.*;

COMMIT;

Worker 在拿到 RETURNING 行之后执行 Agent 运行时;不要在持有行锁时调用 LLM(长事务会拖住锁,虽然 leased 行已被更新,但会延长事务并影响连接池)。正确做法是:COMMIT 释放锁后再跑模型,靠 locked_until 与心跳续租保证独占。

3. 心跳续租(长任务必备)

UPDATE agent_work_queue
SET locked_until = now() + interval '10 minutes',
    updated_at   = now()
WHERE id = :task_id
  AND lease_owner = :worker_id
  AND status = 'leased'
  AND locked_until > now();

建议心跳间隔为 visibility_timeout / 3。若 visibility_timeout = 600s,则每 180–200s 续租一次。续租失败(返回 0 行)表示租约已丢失,Worker 应停止副作用并退出,避免双写。

4. 完成、失败与退避

-- 成功
UPDATE agent_work_queue
SET status = 'done', locked_until = NULL, lease_owner = NULL, updated_at = now()
WHERE id = :task_id AND lease_owner = :worker_id AND status = 'leased';

-- 可重试失败:回到 pending,指数退避写入 run_after
UPDATE agent_work_queue
SET status = 'pending',
    locked_until = NULL,
    lease_owner = NULL,
    run_after = now() + (interval '1 second' * least(3600, power(2, attempts))),
    last_error = :err,
    updated_at = now()
WHERE id = :task_id AND lease_owner = :worker_id AND status = 'leased'
  AND attempts < max_attempts;

-- 超过 max_attempts → dead(人工或死信消费者处理)

5. 推荐初始参数(8 核 DB、10 个 Agent Worker)

参数建议值说明
visibility_timeout600s应大于 P95 单任务时长;LLM 长尾任务可调到 1200–1800s
heartbeat_interval200s约为 visibility 的 1/3
max_attempts8与指数退避上限 3600s 配合,避免无限重试耗尽配额
worker_idPod 名 / hostname+pid写入 lease_owner 便于审计
连接池 statement_timeoutclaim: 5s;心跳: 3s防止 claim 轮询占满连接
空闲轮询200–500ms jitter无任务时 sleep,避免空转 SELECT

6. 与 Kubernetes Job 的配合

若 Worker 以 Job 形式运行,Job 的 parallelism 与队列 Worker 数应一致;分片键可映射为 JOB_COMPLETION_INDEX 取模,或统一从队列 claim(更简单)。Job 的 activeDeadlineSeconds大于 visibility_timeout + 缓冲,否则 Pod 被杀死时任务会在租约过期后重新 pending,属于预期的 at-least-once 行为,不是 exactly-once。

风险与边界

验收清单

参考来源