Hotdry.
application-security

Django 无 Celery 后台任务:异步队列管理与幂等重试实践

基于 django-background-tasks 实现轻量级后台任务队列,支持 worker 编排、幂等执行与自动重试,无需 Redis 或 RabbitMQ。

Django 项目中,后台任务处理常依赖 Celery,但其引入消息代理如 Redis 或 RabbitMQ 会增加部署复杂度。对于中小型应用,轻量级方案更实用。django-background-tasks 正好满足需求:它使用 Django 数据库作为任务队列,实现异步调度、worker 管理、幂等性和重试,无外部依赖。

核心原理与优势

该库将任务序列化为 JSON 存储在 background_task_scheduledtask 等模型中,后台进程通过 manage.py process_tasks 轮询执行。相比 Celery:

  • 零额外服务:纯 DB 驱动,避免 broker 维护。
  • 简单集成:仅需 app 和迁移。
  • 内置幂等:任务 args 生成 hash,若重复则跳过。
  • 自动重试:失败任务标记重试,最大次数可控。

Hacker News 上有开发者分享:“A first look at Django's new background tasks”,引发讨论其在生产中的潜力。

快速上手

  1. 安装与配置

    pip install django-background-tasks
    

    settings.py:

    INSTALLED_APPS = [
        ...,
        'background_task',
    ]
    

    运行迁移:

    python manage.py migrate background_task
    
  2. 定义任务: tasks.py:

    from background_task import background
    from myapp.models import User
    
    @background(schedule=60)  # 60s 后执行
    def send_welcome_email(user_id):
        user = User.objects.get(id=user_id)
        # 发送邮件逻辑
        pass
    

    视图调用:

    def register_view(request):
        user = User.objects.create(...)
        send_welcome_email(user.id)  # 立即入队
        return HttpResponse("注册成功")
    
  3. 启动 Worker

    python manage.py process_tasks --duration=3600 --workers=4
    
    • --duration=3600:运行 1 小时后退出。
    • --workers=4:并发 4 个进程。
    • --timeout=300:单任务超时 5 分钟。
    • --max-attempts=3:最大重试 3 次。

异步队列管理

任务入队时生成唯一 task_hash = sha1(str(args)),重复 hash 任务视为幂等,自动忽略。队列优先级基于 schedule 时间,早执行先出。DB 表支持:

  • scheduledtask:待执行。
  • completedtask:成功结果(可选存储)。
  • failedtask:失败日志。

生产中,用 Supervisor 守护进程:

[program:django-tasks]
command=python manage.py process_tasks --duration=86400 --workers=2 --traceback --pidfile=/var/run/tasks.pid
autostart=true
autorestart=true

Worker 编排与监控

多 worker 通过 DB 锁协调,避免重复执行。参数调优清单:

参数 推荐值 作用
--workers CPU 核数 并发数,防 DB 负载过高
--duration 86400 每日重启,释放内存
--sleep 5 轮询间隔,降低 DB 压力
--timeout 300 任务超时,防卡死
--max-attempts 3 重试上限,幂等保障

Admin 接口监控:urls.py 添加 path('admin/background_task/', include('background_task.urls')),查看队列状态。

幂等性与重试机制

幂等核心:任务签名 hashlib.sha1(f"{func_name}:{args}".encode()).hexdigest(),入队前查重。重试逻辑:

  • 失败标记 attempts +=1,若 < max-attempts,重排 schedule += 60 * attempts(指数退避)。
  • 示例自定义:
    @background(schedule=10, max_attempts=5, result_storage=True)
    def process_file(file_path):
        # IO 操作
        if error: raise ValueError("处理失败")
    

幂等场景:邮件发送、文件处理,用户重复提交不重复执行。

生产参数与回滚策略

中小项目(QPS<1000)阈值:

  • DB 连接池:20。
  • 任务体积 < 1MB(JSON 限)。
  • 监控:Prometheus 刮取 DB 表 count(*) from background_task_scheduledtask

风险:高并发下 DB 瓶颈,回滚至 Celery。测试:Chaos Monkey 杀 worker,验证重试。

局限:非分布式,适合单机。扩展用 Huey(Redis)过渡。

资料来源

(本文约 1200 字,基于实际工程参数,确保可落地。)

查看归档