在现代后端系统中,异步任务处理是确保高可用性和用户体验的关键。引入专用队列如 Redis 或 RabbitMQ 会增加运维复杂度、数据一致性和成本开销。Postgres 作为核心数据库,可原生充当可靠的任务队列,利用其 ACID 事务、行级锁和通知机制,实现摄取、重试、监控和分布式调度。本文聚焦单一技术点:通过 “扫取”(sweep)模式,用表驱动 worker 高效处理任务,避免专用队列的额外开销。
核心观点是:Postgres 表的原子更新操作天然支持分布式 worker 协作。传统同步处理易导致 500 错误,而异步队列使容错成为默认设计。证据来自 Postgres 的 FOR UPDATE SKIP LOCKED 特性,它允许并发 worker 竞争任务时,失败者自动跳过已锁行,无需复杂协调。该模式已在生产环境中验证,如 Taylor Troesh 在 POSETTE 2024 分享的实践。
任务表 Schema 设计
设计一张 async_tasks 表,核心字段如下:
CREATE TABLE async_tasks (
id BIGSERIAL PRIMARY KEY,
task_type TEXT NOT NULL, -- e.g., 'send_email', 'process_image'
payload JSONB NOT NULL, -- 任务数据
status TEXT NOT NULL DEFAULT 'pending', -- pending/processing/succeeded/failed/retrying
priority INT DEFAULT 0, -- 高优先级先处理,负值延迟
scheduled_at TIMESTAMPTZ DEFAULT NOW(),
attempts INT DEFAULT 0,
max_attempts INT DEFAULT 5,
last_error TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- 关键索引,避免全表扫
CREATE INDEX idx_tasks_status_scheduled ON async_tasks (status, scheduled_at, priority) WHERE status = 'pending';
CREATE INDEX idx_tasks_type ON async_tasks (task_type) WHERE status IN ('pending', 'retrying');
此 schema 支持优先级调度(ORDER BY priority DESC, scheduled_at ASC)和重试(status='retrying')。JSONB payload 灵活存储任务参数,减少表扩展。
入队(Enqueue)流程
应用层插入任务:
INSERT INTO async_tasks (task_type, payload, priority, scheduled_at)
VALUES ('send_email', '{"to": "user@example.com", "subject": "Welcome"}'::jsonb, 1, NOW() + INTERVAL '10 seconds');
可选结合 LISTEN/NOTIFY 实时通知 worker:
-- 插入后
NOTIFY task_channel, 'new_task';
Worker 监听频道,高效唤醒而非纯轮询。
Worker 扫取(Sweep)逻辑
分布式 worker 的核心查询,使用 FOR UPDATE SKIP LOCKED 原子认领:
BEGIN;
SELECT * FROM async_tasks
WHERE status IN ('pending', 'retrying')
AND scheduled_at <= NOW()
ORDER BY priority DESC, scheduled_at ASC
LIMIT 10 -- 批处理
FOR UPDATE SKIP LOCKED;
-- 若找到,立即更新
UPDATE async_tasks SET
status = 'processing',
attempts = attempts + 1,
updated_at = NOW()
WHERE id = <claimed_id>;
COMMIT;
解释:SKIP LOCKED 确保一个 worker 锁定时,其他跳过该行,继续下一个。无锁竞争,零饥饿。处理后:
-- 成功
UPDATE async_tasks SET status = 'succeeded', updated_at = NOW() WHERE id = ?;
-- 失败,重试
UPDATE async_tasks SET
status = CASE WHEN attempts >= max_attempts THEN 'failed' ELSE 'retrying' END,
last_error = ?,
scheduled_at = NOW() + (2 ^ attempts) * INTERVAL '1 minute', -- 指数退避
updated_at = NOW()
WHERE id = ?;
死信队列:定期移入 dead_tasks 表,手动干预。
可落地参数与清单
生产部署参数:
- 轮询间隔:1-5 秒(结合 NOTIFY 降至毫秒级)。
- 批大小:5-50,根据任务耗时调优(长任务小批)。
- Worker 数:CPU 核数 * 2,避免 DB 过载。
- 重试策略:max_attempts=5,退避 1min * 2^attempts,上限 1 小时。
- 优先级:-1(延迟)、0(正常)、1+(紧急)。
- 分区:队列超 1000w 行,按 task_type 或 created_at 分区。
- TTL:cron 清理 succeeded > 7 天,VACUUM 优化。
监控要点清单:
- 积压:
SELECT COUNT(*) FROM async_tasks WHERE status='pending';> 阈值报警。 - 失败率:
SELECT AVG(CASE WHEN status='failed' THEN 1 ELSE 0 END) FROM async_tasks WHERE created_at > NOW() - INTERVAL '1 hour'; - 吞吐:
pg_stat_statements监控扫取查询 QPS。 - 锁等待:
pg_locks检查 SKIP LOCKED 效率。 - 回滚:worker 崩溃用
idle_in_transaction_session_timeout=5min自动回滚。
| 参数 | 推荐值 | 调优依据 |
|---|---|---|
| poll_interval | 2s | 平衡延迟与 DB 负载 |
| batch_size | 20 | 任务 <1s 时最大化 |
| max_retries | 5 | 防无限循环 |
| heartbeat | 30s | worker 存活检测 |
优势与风险缓解
相较 Redis:零额外组件,强一致性,无网络跳跃。vs Celery:简化依赖。
风险:HOL 阻塞 —— 用多队列(per type)缓解;DB 负载 —— 限 worker 数,读副本 offload 监控。
实际参数落地:从小队列测试,渐增负载,观察 pg_stat_activity。
资料来源:Taylor Troesh POSETTE 2024 演讲 “sweep async tasks under Postgres table”;Postgres 文档 FOR UPDATE SKIP LOCKED。
(正文字数约 1250)