在使用 celery 的时候发现有的时候 celery 会将同一个任务执行两遍,我遇到的情况是相同的任务在不同的 worker 中被分别执行,并且时间只相差几毫秒。这问题我一直以为是自己哪里处理的逻辑有问题,后来发现其他人 也有类似的问题,然后基本上出问题的都是使用 redis 作为 broker 的,而我这边一方面不想将 redis 替换掉,就只能在 task 执行的时候加分布式锁了。

不过在 celery 的 issue 中搜索了一下,有人使用 redis 实现了分布式锁,然后也有人使用了 celery once。 大致看了一下 celery once ,发现非常符合现在的情况,就用了下。

celery once 也是利用 redis 加锁来实现, celery once 在 task 类基础上实现了 queueonce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 queueonce 设置为 base

@task(base=queueonce, once={‘graceful’: true})

后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 false,那样 celery 会抛出 alreadyqueued 异常,手动设置为 true,则静默处理。

另外如果要手动设置任务的 key,可以指定 keys 参数

@celery.task(base=queueonce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

总得来说,分为几步

第一步,安装

pip install -u celery_once

第二步,增加配置

from celery import celery
from celery_once import queueonce
from time import sleep

celery = celery('tasks', broker='amqp://guest@localhost//')
celery.conf.once = {
  'backend': 'celery_once.backends.redis',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}

第三步,修改 delay 方法

example.delay(10)
# 修改为
result = example.apply_async(args=(10))

第四步,修改 task 参数

@celery.task(base=queueonce, once={'graceful': true, keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

参考链接 https://github.com/cameronmaske/celery-once

到此这篇关于使用 celery once 来防止 celery 重复执行同一个任务的文章就介绍到这了,更多相关 celery 重复执行同一个任务内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!