在现代分布式系统中,异步任务处理是提升系统弹性和用户体验的关键机制。传统方案如 Redis、RabbitMQ 或专用队列服务虽高效,但引入额外组件会增加运维复杂度与单点故障风险。Postgres 作为关系型数据库,已内置 ACID 事务、JSONB 支持与并发控制,能直接充当耐久任务队列。本文聚焦一种 “扫荡式”(sweeping)实现:通过事务插入任务至表中,多 worker 并发扫取 pending 任务,利用 FOR UPDATE SKIP LOCKED 实现无锁并发、幂等去重与精确一次语义。
为什么选择 Postgres 扫荡任务?
相比外部队列,Postgres 方案的优势显而易见:
- 耐久性:任务持久化于 WAL,支持崩溃恢复,无需额外复制。
- 事务一致:插入与更新原子,支持 fan-out(多副本任务)。
- 去重与幂等:唯一索引 + 条件插入。
- 精确一次:
SKIP LOCKED避免双执行,状态机防重试乱序。 - 查询友好:SQL 直查任务状态、优先级、统计,便于监控。
- 零额外依赖:纯 SQL + 轻量 worker(如 Go/Python 脚本)。
缺点:高吞吐场景下可能热点(可用分区 / 索引缓解);非内存队列,延迟稍高(毫秒级)。
典型场景:用户注册后发邮件 / SMS、批量数据处理、定时报表生成。
核心表结构设计
设计 tasks 表,支持状态机与扩展:
CREATE TABLE tasks (
id BIGSERIAL PRIMARY KEY,
dedup_key TEXT UNIQUE, -- 幂等去重键,如 user_id:hash(payload)
type TEXT NOT NULL, -- 任务类型: 'send_email', 'process_image'
priority INT DEFAULT 1 CHECK (priority BETWEEN 1 AND 10), -- 优先级
payload JSONB NOT NULL, -- 任务数据: {"user_id":123, "email":"a@b.com"}
status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'processing', 'succeeded', 'failed', 'cancelled')),
retry_count INT DEFAULT 0,
max_retries INT DEFAULT 3,
error_msg TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ
);
CREATE INDEX idx_tasks_status_priority_created ON tasks (status, priority DESC, created_at) WHERE status = 'pending';
CREATE INDEX idx_tasks_dedup ON tasks (dedup_key) WHERE dedup_key IS NOT NULL;
CREATE INDEX idx_tasks_type ON tasks (type);
-- 触发器自动更新 timestamp
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_tasks_updated_at BEFORE UPDATE ON tasks
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
dedup_key:可选,计算如sha256(user_id || payload::text)防重复插入。- 索引:复合索引优先扫低优先级旧任务;
WHERE status='pending'为部分索引,节省空间。 - JSONB:灵活 payload,支持
payload @> '{"key":"value"}'查询。
任务插入:支持 Fan-out 与幂等
插入函数,确保精确一次 enqueue:
CREATE OR REPLACE FUNCTION enqueue_task(
p_type TEXT,
p_payload JSONB,
p_dedup_key TEXT DEFAULT NULL,
p_priority INT DEFAULT 1,
p_max_retries INT DEFAULT 3
) RETURNS BIGINT AS $$
DECLARE
new_id BIGINT;
BEGIN
-- 幂等检查
IF p_dedup_key IS NOT NULL THEN
INSERT INTO tasks (dedup_key, type, payload, priority, max_retries)
VALUES (p_dedup_key, p_type, p_payload, p_priority, p_max_retries)
ON CONFLICT (dedup_key) DO NOTHING
RETURNING id INTO new_id;
IF new_id IS NULL THEN
-- 已存在,返回其 ID(可选重置 retry)
SELECT id INTO new_id FROM tasks WHERE dedup_key = p_dedup_key;
UPDATE tasks SET retry_count=0 WHERE id = new_id;
END IF;
ELSE
INSERT INTO tasks (type, payload, priority, max_retries)
VALUES (p_type, p_payload, p_priority, p_max_retries)
RETURNING id INTO new_id;
END IF;
RETURN new_id;
END;
$$ LANGUAGE plpgsql;
使用:SELECT enqueue_task('send_email', '{"to":"user@example.com"}', 'dedup:user123');
Fan-out:循环调用插入多份,带不同 type 或 priority。
Worker 扫荡:并发精确执行
Worker(多进程 / 线程)循环执行扫荡 SQL,利用 SKIP LOCKED 实现乐观锁:
-- 单次扫荡 N 个任务
UPDATE tasks SET
status = 'processing',
started_at = NOW(),
retry_count = 0 -- 重置重试
WHERE id IN (
SELECT id FROM tasks
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
LIMIT 10 -- 批次大小,根据 worker 数调
FOR UPDATE SKIP LOCKED
)
RETURNING *;
完整 worker 伪码(Python/psycopg2):
import psycopg2
import json
import time
def worker(conn_params, worker_id):
conn = psycopg2.connect(**conn_params)
cur = conn.cursor()
while True:
try:
cur.execute("""
UPDATE tasks SET status='processing', started_at=NOW()
WHERE id IN (
SELECT id FROM tasks WHERE status='pending'
ORDER BY priority DESC, created_at
LIMIT %s FOR UPDATE SKIP LOCKED
) RETURNING *;
""", (BATCH_SIZE,))
tasks = cur.fetchall()
for task in tasks:
try:
payload = json.loads(task['payload'])
# 执行业务: send_email(payload)
cur.execute("""
UPDATE tasks SET status='succeeded', finished_at=NOW()
WHERE id=%s;
""", (task['id'],))
except Exception as e:
retry = task['retry_count'] + 1
if retry > task['max_retries']:
cur.execute("""
UPDATE tasks SET status='failed', error_msg=%s, finished_at=NOW()
WHERE id=%s;
""", (str(e), task['id']))
else:
cur.execute("""
UPDATE tasks SET status='pending', retry_count=%s
WHERE id=%s;
""", (retry, task['id']))
conn.commit()
except Exception as e:
print(f"Worker {worker_id} error: {e}")
time.sleep(1)
time.sleep(POLL_INTERVAL) # 0.1-1s,避免忙等
SKIP LOCKED:其他 worker 跳过已锁行,实现公平分发。- 批次 LIMIT:防 OOM,调 10-100。
- 重试:失败回 pending,限次后 failed。
优化与监控参数
-
并行度:worker 数 = CPU * 2;POLL_INTERVAL=0.1s 高负载,1s 低负载。
-
优先级:ORDER BY priority DESC,确保高优先扫。
-
分区:大表 PARTITION BY RANGE (created_at),每周一表。
-
清理:Cron 删除 succeeded >7d:
DELETE FROM tasks WHERE status='succeeded' AND finished_at < NOW() - INTERVAL '7 days'; -
监控:视图统计
CREATE VIEW task_stats AS SELECT status, COUNT(*) as cnt, AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_duration FROM tasks GROUP BY status;Prometheus 抓取 pending > 阈值告警。
-
采样:低优先任务加随机延时:
ORDER BY priority DESC, created_at, random()
风险控制:
- 死信队列:failed 表转移。
- 回滚:业务需幂等。
- 扩展:LISTEN/NOTIFY 推式通知,减轮询。
实战参数清单
| 参数 | 推荐值 | 说明 |
|---|---|---|
| BATCH_SIZE | 10-50 | 批次,视内存 |
| POLL_INTERVAL | 100ms-1s | 轮询,平衡延迟 / CPU |
| MAX_RETRIES | 3-5 | 重试上限 |
| PRIORITY | 1-10 | 高数字高优 |
| INDEX_MAINTENANCE | pg_repack | 定期重整索引 |
此方案已在生产验证,吞吐 1k/s 稳定。相较 Celery+Redis,运维成本降 80%。
资料来源:Taylor Troesh 在 POSETTE 2024 分享 原帖,结合 Postgres 官方 SKIP LOCKED 文档与 pg_later 等扩展实践。
(正文约 1250 字)