Hotdry.
systems-engineering

Postgres 表实现异步任务队列:可靠扫取、重试与分布式调度

基于 Postgres 表构建无额外依赖的异步任务系统,利用 FOR UPDATE SKIP LOCKED 实现分布式无锁摄取,支持优先级、重试和监控。

在现代后端系统中,异步任务处理是确保高可用性和用户体验的关键。引入专用队列如 Redis 或 RabbitMQ 会增加运维复杂度、数据一致性和成本开销。Postgres 作为核心数据库,可原生充当可靠的任务队列,利用其 ACID 事务、行级锁和通知机制,实现摄取、重试、监控和分布式调度。本文聚焦单一技术点:通过 “扫取”(sweep)模式,用表驱动 worker 高效处理任务,避免专用队列的额外开销。

核心观点是:Postgres 表的原子更新操作天然支持分布式 worker 协作。传统同步处理易导致 500 错误,而异步队列使容错成为默认设计。证据来自 Postgres 的 FOR UPDATE SKIP LOCKED 特性,它允许并发 worker 竞争任务时,失败者自动跳过已锁行,无需复杂协调。该模式已在生产环境中验证,如 Taylor Troesh 在 POSETTE 2024 分享的实践。

任务表 Schema 设计

设计一张 async_tasks 表,核心字段如下:

CREATE TABLE async_tasks (
    id BIGSERIAL PRIMARY KEY,
    task_type TEXT NOT NULL,  -- e.g., 'send_email', 'process_image'
    payload JSONB NOT NULL,   -- 任务数据
    status TEXT NOT NULL DEFAULT 'pending',  -- pending/processing/succeeded/failed/retrying
    priority INT DEFAULT 0,   -- 高优先级先处理,负值延迟
    scheduled_at TIMESTAMPTZ DEFAULT NOW(),
    attempts INT DEFAULT 0,
    max_attempts INT DEFAULT 5,
    last_error TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- 关键索引,避免全表扫
CREATE INDEX idx_tasks_status_scheduled ON async_tasks (status, scheduled_at, priority) WHERE status = 'pending';
CREATE INDEX idx_tasks_type ON async_tasks (task_type) WHERE status IN ('pending', 'retrying');

此 schema 支持优先级调度(ORDER BY priority DESC, scheduled_at ASC)和重试(status='retrying')。JSONB payload 灵活存储任务参数,减少表扩展。

入队(Enqueue)流程

应用层插入任务:

INSERT INTO async_tasks (task_type, payload, priority, scheduled_at)
VALUES ('send_email', '{"to": "user@example.com", "subject": "Welcome"}'::jsonb, 1, NOW() + INTERVAL '10 seconds');

可选结合 LISTEN/NOTIFY 实时通知 worker:

-- 插入后
NOTIFY task_channel, 'new_task';

Worker 监听频道,高效唤醒而非纯轮询。

Worker 扫取(Sweep)逻辑

分布式 worker 的核心查询,使用 FOR UPDATE SKIP LOCKED 原子认领:

BEGIN;
SELECT * FROM async_tasks 
WHERE status IN ('pending', 'retrying') 
  AND scheduled_at <= NOW()
ORDER BY priority DESC, scheduled_at ASC
LIMIT 10  -- 批处理
FOR UPDATE SKIP LOCKED;

-- 若找到,立即更新
UPDATE async_tasks SET 
    status = 'processing', 
    attempts = attempts + 1,
    updated_at = NOW()
WHERE id = <claimed_id>;

COMMIT;

解释:SKIP LOCKED 确保一个 worker 锁定时,其他跳过该行,继续下一个。无锁竞争,零饥饿。处理后:

-- 成功
UPDATE async_tasks SET status = 'succeeded', updated_at = NOW() WHERE id = ?;

-- 失败,重试
UPDATE async_tasks SET 
    status = CASE WHEN attempts >= max_attempts THEN 'failed' ELSE 'retrying' END,
    last_error = ?,
    scheduled_at = NOW() + (2 ^ attempts) * INTERVAL '1 minute',  -- 指数退避
    updated_at = NOW()
WHERE id = ?;

死信队列:定期移入 dead_tasks 表,手动干预。

可落地参数与清单

生产部署参数:

  • 轮询间隔:1-5 秒(结合 NOTIFY 降至毫秒级)。
  • 批大小:5-50,根据任务耗时调优(长任务小批)。
  • Worker 数:CPU 核数 * 2,避免 DB 过载。
  • 重试策略:max_attempts=5,退避 1min * 2^attempts,上限 1 小时。
  • 优先级:-1(延迟)、0(正常)、1+(紧急)。
  • 分区:队列超 1000w 行,按 task_type 或 created_at 分区。
  • TTL:cron 清理 succeeded > 7 天,VACUUM 优化。

监控要点清单:

  1. 积压:SELECT COUNT(*) FROM async_tasks WHERE status='pending'; > 阈值报警。
  2. 失败率:SELECT AVG(CASE WHEN status='failed' THEN 1 ELSE 0 END) FROM async_tasks WHERE created_at > NOW() - INTERVAL '1 hour';
  3. 吞吐:pg_stat_statements 监控扫取查询 QPS。
  4. 锁等待:pg_locks 检查 SKIP LOCKED 效率。
  5. 回滚:worker 崩溃用 idle_in_transaction_session_timeout=5min 自动回滚。
参数 推荐值 调优依据
poll_interval 2s 平衡延迟与 DB 负载
batch_size 20 任务 <1s 时最大化
max_retries 5 防无限循环
heartbeat 30s worker 存活检测

优势与风险缓解

相较 Redis:零额外组件,强一致性,无网络跳跃。vs Celery:简化依赖。

风险:HOL 阻塞 —— 用多队列(per type)缓解;DB 负载 —— 限 worker 数,读副本 offload 监控。

实际参数落地:从小队列测试,渐增负载,观察 pg_stat_activity

资料来源:Taylor Troesh POSETTE 2024 演讲 “sweep async tasks under Postgres table”;Postgres 文档 FOR UPDATE SKIP LOCKED。

(正文字数约 1250)

查看归档