celery 查询任务状态已经是多线程了,那任务还需要多线程吗

主题信息(必填)
主题描述(最多限制在50个字符)
申请人信息(必填)
申请信息已提交审核,请注意查收邮件,我们会尽快给您反馈。
如有疑问,请联系
一只文艺范的软件攻城狮,Keep Learn,Always.
人生得意须尽欢,莫使金樽空对月。
本人热爱编程,有着很强的兴趣,做事认真
作者:Ethan,Web开发工程师,喜欢开源,热爱分享,从社区学了不少东西,也希望回馈给社区。博客地址:。
责编:陈秋歌,关注前端开发领域,寻求报道或者投稿请发邮件chenqg#csdn.net。可加微信「Rachel_qg」,申请加入“CSDN前端开发者”微信群,参与热点、难点技术交流。申请时请务必注明「公司+职位」。
Celery在程序的运行过程中,我们经常会碰到一些耗时耗资源的操作,为了避免它们阻塞主程序的运行,我们经常会采用多线程或异步任务。比如,在 Web 开发中,对新用户的注册,我们通常会给他发一封激活邮件,而发邮件是个 IO 阻塞式任务,如果直接把它放到应用当中,就需要等邮件发出去之后才能进行下一步操作,此时用户只能等待再等待。更好的方式是在业务逻辑中触发一个发邮件的异步任务,而主程序可以继续往下运行。 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。它的架构组成如下图:可以看到,Celery 主要包含以下几个模块:
任务模块 Task
包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
消息中间件 Broker
Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
任务执行单元 Worker
Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。
任务结果存储 Backend
Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。
异步任务使用 Celery 实现异步任务主要包含三个步骤:
创建一个 Celery 实例
启动 Celery Worker
应用程序调用异步任务
快速入门为了简单起见,对于 Broker 和 Backend,这里都使用 redis。在运行下面的例子之前,请确保 redis 已正确安装,并开启 redis 服务,当然,celery 也是要安装的。可以使用下面的命令来安装 celery 及相关依赖:$ pip install 'celery[redis]'创建 Celery 实例将下面的代码保存为文件 tasks.py:
import time
from celery import Celery
broker = 'redis://127.0.0.1:;
backend = 'redis://127.0.0.1:9;
app = Celery('my_task', broker=broker, backend=backend)
def add(x, y):
time.sleep(5)
return x + y上面的代码做了几件事:
创建了一个 Celery 实例 app,名称为 my_task;
指定消息中间件用 redis,URL 为 redis://127.0.0.1:6379;
指定存储用 redis,URL 为 redis://127.0.0.1:6379/0;
创建了一个 Celery 任务 add,当函数被 @app.task 装饰后,就成为可被 Celery 调度的任务;
启动 Celery Worker在当前目录,使用如下方式启动 Celery Worker:$ celery worker -A tasks --loglevel=info其中:
参数 -A 指定了 Celery 实例的位置,本例是在 tasks.py 中,Celery 会自动在该文件中寻找 Celery 对象实例,当然,我们也可以自己指定,在本例,使用 -A tasks.app;
参数 --loglevel 指定了日志级别,默认为 warning,也可以使用 -l info 来表示;
在生产环境中,我们通常会使用
来控制 Celery Worker 进程。启动成功后,控制台会显示如下输出:调用任务现在,我们可以在应用程序中使用 delay() 或 apply_async() 方法来调用任务。在当前目录打开 Python 控制台,输入以下代码:&&& from tasks import add
&&& add.delay(2, 8)
&AsyncResult: 2272ddce-8be5-493f-b5ff-35a0d9fe600f&在上面,我们从 tasks.py 文件中导入了 add 任务对象,然后使用 delay() 方法将任务发送到消息中间件(Broker),Celery Worker 进程监控到该任务后,就会进行执行。我们将窗口切换到 Worker 的启动窗口,会看到多了两条日志:[ 12:00:50,376: INFO/MainProcess]
: .add[2272ddce-8be5-493f-b5ff-35a0d9fe600f]
[ 12:00:55,385: INFO/PoolWorker-4]
.add[2272ddce-8be5-493f-b5ff-35a0d9fe600f]
5.s: 10这说明任务已经被调度并执行成功。另外,我们如果想获取执行后的结果,可以这样做:&&& result = add.delay(2, 6)
&&& result.ready()
&&& result.ready()
&&& result.ready()
&&& result.get()
8在上面,我们是在 Python 的环境中调用任务。事实上,我们通常在应用程序中调用任务。比如,将下面的代码保存为 client.py:
from tasks import add
add.delay(2, 8)
print 'hello world'运行命令 $ python client.py,可以看到,虽然任务函数 add 需要等待 5 秒才返回执行结果,但由于它是一个异步任务,不会阻塞当前的主程序,因此主程序会往下执行 print 语句,打印出结果。使用配置在上面的例子中,我们直接把 Broker 和 Backend 的配置写在了程序当中,更好的做法是将配置项统一写入到一个配置文件中,通常我们将该文件命名为 celeryconfig.py。Celery 的配置比较多,可以在查询每个配置项的含义。下面,我们再看一个例子。项目结构如下:celery_demo
# 项目根目录
├── celery_app
# 存放 celery 相关文件
├── __init__.py
├── celeryconfig.py
# 配置文件
├── task1.py
# 任务文件 1
└── task2.py
# 任务文件 2
└── client.py
# 应用程序__init__.py 代码如下:
from celery import Celery
app = Celery('demo')
app.config_from_object('celery_app.celeryconfig')
celeryconfig.py 代码如下:BROKER_URL = 'redis://127.0.0.1:;
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:9;
CELERY_TIMEZONE='Asia/Shanghai'
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2'
)task1.py 代码如下:import time
from celery_app import app
def add(x, y):
time.sleep(2)
return x + ytask2.py 代码如下:import time
from celery_app import app
def multiply(x, y):
time.sleep(2)
return x * yclient.py 代码如下:
from celery_app import task1
from celery_app import task2
task1.add.apply_async(args=[2, 8])
task2.multiply.apply_async(args=[3, 7])
print 'hello world'现在,让我们启动 Celery Worker 进程,在项目的根目录下执行下面命令:celery_demo $ celery -A celery_app worker --loglevel=info接着,运行 $ python client.py,它会发送两个异步任务到 Broker,在 Worker 的窗口我们可以看到如下输出:[2016-12-10 13:51:58,939: INFO/MainProcess] Received task: celery_app.task1.add[9ccffad0-aca4-4875-84ce-0ccfce5a83aa]
[2016-12-10 13:51:58,941: INFO/MainProcess] Received task: celery_app.task2.multiply[64b1f889-c892-4333-bd1d-ac667e677a8a]
[2016-12-10 13:52:00,948: INFO/PoolWorker-3] Task celery_app.task1.add[9ccffad0-aca4-4875-84ce-0ccfce5a83aa] succeeded in 2.s: 10
[2016-12-10 13:52:00,949: INFO/PoolWorker-4] Task celery_app.task2.multiply[64b1f889-c892-4333-bd1d-ac667e677a8a] succeeded in 2.s: 21delay 和 apply_async在前面的例子中,我们使用 delay() 或 apply_async() 方法来调用任务。事实上,delay 方法封装了 apply_async,如下:def delay(self, *partial_args, **partial_kwargs):
"""Shortcut to :meth:`apply_async` using star arguments."""
return self.apply_async(partial_args, partial_kwargs)也就是说,delay 是使用 apply_async 的快捷方式。apply_async 支持更多的参数,它的一般形式如下:apply_async(args=(), kwargs={}, route_name=None, **options)apply_async 常用的参数如下:
countdown:指定多少秒后执行任务
task1.apply_async(args=(2, 3), countdown=5)
# 5 秒后执行任务
eta (estimated time of arrival):指定任务被调度的具体时间,参数类型是 datetime
from datetime import datetime, timedelta
task1.multiply.apply_async(args=[3, 7], eta=datetime.utcnow() + timedelta(seconds=10))
expires:任务过期时间,参数类型可以是 int,也可以是 datetime
task1.multiply.apply_async(args=[3, 7], expires=10)
更多的参数列表可以在中查看。定时任务Celery 除了可以执行异步任务,也支持执行周期性任务(Periodic Tasks),或者说定时任务。Celery Beat 进程通过读取配置文件的内容,周期性地将定时任务发往任务队列。让我们看看例子,项目结构如下:celery_demo
# 项目根目录
├── celery_app
# 存放 celery 相关文件
├── __init__.py
├── celeryconfig.py
# 配置文件
├── task1.py
# 任务文件
└── task2.py
# 任务文件__init__.py 代码如下:
from celery import Celery
app = Celery('demo')
app.config_from_object('celery_app.celeryconfig')celeryconfig.py 代码如下:
from datetime import timedelta
from celery.schedules import crontab
BROKER_URL = 'redis://127.0.0.1:;
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:9;
CELERY_TIMEZONE='Asia/Shanghai'
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2'
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'celery_app.task1.add',
'schedule': timedelta(seconds=30),
'args': (5, 8)
'multiply-at-some-time': {
'task': 'celery_app.task2.multiply',
'schedule': crontab(hour=9, minute=50),
'args': (3, 7)
}task1.py 代码如下:import time
from celery_app import app
def add(x, y):
time.sleep(2)
return x + ytask2.py 代码如下:import time
from celery_app import app
def multiply(x, y):
time.sleep(2)
return x * y现在,让我们启动 Celery Worker 进程,在项目的根目录下执行下面命令:celery_demo $ celery -A celery_app worker --loglevel=info接着,启动 Celery Beat 进程,定时将任务发送到 Broker,在项目根目录下执行下面命令:celery_demo $ celery beat -A celery_app
celery beat v4.0.1 (latentcall) is starting.
LocalTime -& 2016-12-11 09:48:16
Configuration -&
. broker -& redis:
. loader -& celery.loaders.app.AppLoader
. scheduler -& celery.beat.PersistentScheduler
. db -& celerybeat-schedule
. logfile -& [stderr]@%WARNING
. maxinterval -& 5.00 minutes (300s)之后,在 Worker 窗口我们可以看到,任务 task1 每 30 秒执行一次,而 task2 每天早上 9 点 50 分执行一次。在上面,我们用两个命令启动了 Worker 进程和 Beat 进程,我们也可以将它们放在一个命令中:$ celery -B -A celery_app worker --loglevel=infoCelery 周期性任务也有多个配置项,可参考。参考资料&&&&&&&&&&&&&
写作背景介绍
最近在做后台图像处理,需要使用到celery这个异步任务框架。但是使用的时候遇到很多技术问题,为了方便日后再遇到相似问题时能够快速解决。写下这篇文章也希望能够帮助共同奋战在同一战线的程序员们。这篇是入门级的文章,如果你已经使用过celery开发过项目完全可以忽略它。当然也非常欢迎你给我留下你宝贵的意见。下一篇文章将会深入一点,期待能再次看到你。
Celery环境搭建
celery是异步处理框架,我们需要一个消息队列来下发我们的任务。使用RabbitMQ是官方特别推荐的方式,因此我也使用它作为我们的broker
安装rabbitmq
我使用的是linuxmint系统,如果是你使用的是ubuntu或者其他的操作系统可以在官网找到安装方案。
sudo apt-get install rabbitmq-server
安装celery
Celery可以通过pip自动安装,如果你喜欢使用虚拟环境安装可以先使用virtualenv创建一个自己的虚拟环境。反正我喜欢使用virtualenv建立自己的环境。
pip install -U celery
第一个例子
按照编程的惯例,我们先设计一个hello world示例。
编写简单的纯python函数
def say(x,y):
return x+y
if __name__ == '__main__':
say('Hello','World')
如果这个函数不是简单的输出两个字符串相加,而是需要查询数据库或者进行复杂的处理。这种处理需要耗费大量的时间,还是这种方式执行会是多么糟糕的事情。为了演示这种现象,可以使用sleep函数来模拟高耗时任务。
import time
def say(x,y):
time.sleep(5)
return x+y
if __name__ == '__main__':
say('Hello','World')
这时候我们可能会思考怎么使用多进程或者多线程去实现这种任务。对于多进程与多线程的不足这里不做讨论。现在我们可以想想celery到底能不能解决这种问题。
import time
from celery import Celery
app = Celery('sample',broker='amqp://guest@localhost//')
def say(x,y):
time.sleep(5)
return x+y
if __name__ == '__main__':
say('Hello','World')
现在来解释一下新加入的几行代码,首先说明一下加入的新代码完全不需要改变原来的代码。导入celery模块就不用解释了,声明一个celery实例app的参数需要解释一下。
第一个参数是这个python文件的名字,注意到我们已经把.py去掉了。
第二个参数是用到了我们的rabbitmq队列啦!可以看到其使用的方式非常简单,因为它是默认的消息队列端口号都不需要指明。
现在我们已经使用了celery框架了,我们需要让它找几个工人帮我们干活。好现在就让他们干活。
celery -A sample worker --loglevel=info
这条命令有些长,我来解释一下吧。
-A代表的是Application的首字母,我们的应用就是在sample里面呢。
worker 就是我们的工人了,他们会努力完成我们的工作的。
-loglevel=info指明了我们的工作后台执行情况,虽然工人们已经向你保证过一定努力完成任务。但是谨慎的你还是希望看看工作进展情况。
回车后你可以看到类似下面这样一个输出,如果是没有红色的输出那么你应该是没有遇到什么错误的。
现在我们的任务已经被加载到了内存中,我们不能再想之前那样执行python sample.py来运行程序了。我们可以通过终端起到python然后通过下面的方式加载任务。
输入python语句
from sample import say
say.delay('hello','world')
我们的函数会立即返回,不需要等待。就那么简单celery解决了我们的问题。可以发现我们的say函数不是直接调用了,它被celery的task装饰器修饰过了。所以多了一些属性。目前我们只需要知道使用delay就行了。
看看我的结果吧!
可怜的我们会发现结果输出在另一个终端上不是我们调用say.delay的终端上。这其实是正常的,因为我们的工作已经交给后台去处理了。
现在,我们知道celery的确可以在后台完成我们交给它的任务。只不过它脾气我们还没摸透。为什么干完事了没有告诉我们,让我们在任务发布终端等了好久。还有就是celery配置在工作函数say一个文件中总是让我们感觉不爽。这些问题我们下一篇文章将会解决它,看看celery到底多强大。
阅读(...) 评论()[Celery]Celery 最佳实践
orangleliu 翻译 原文点击查看
如果你的工作和 Django 相关, 并且有时候需要执行一些长时间的后台任务。可能你已经使用了某种任务队列,Celery就是(和Django)世界中时下解决类似问题最受欢迎的项目。
当在某些项目使用Celery作为任务队列之后,我总结了一些最佳实践,决定把它们些下来。然而,这里也有一些对自己应该做的却没做的反思,还有一些celery提供但是没有充分利用的功能。
不要使用关系型来作为AMQP的代理
让我来解释下我为什么觉得这是错的。
关系型数据库不像RabbitMQ一样专门作为AMQP代理而设计。它会在某个时间点挂掉,可能在生产中没法那么基于 传输/用户。
我猜测人们使用关系型数据库的最大原因是,已经有了一个数据库为web应用工作,为啥不复用呢。配置非常简单并且你不需要在担心其他的(像RabbitMQ)
假设这样的场景:你有4个后台工作的进程,你把这些任务放到数据库中。这意味着有四个进程相当频繁地去数据库轮询,检查是否有新的任务,这还不包括这4个进程本身也是多个进程。在某些时刻你会发现你的任务进程很慢,有些任务还没处理就有更多的任务进来了,你就自然的增加worker来处理任务。大量的worker为了获取新任务轮询数据库,导致数据库突然变慢,磁盘IO达到瓶颈,你的web应用也会受此影响变得越来越慢,因为这些worker正在对数据库进行基本的DDOS 攻击。
当你有一个像RabbitMQ这样的AMQP代理的时候,这些情况就不会发生了,因为这些队列是存在于内存当中,所以也不会伤害到你的硬盘。这些worker不需要频繁的轮询,因为队列会把新的任务推送给worker,如果AMQP因为某些原因不能工作了,至少不会影响到web应用的所有使用。
我不得不说你也不应该在开发环境中使用关系型数据库来作为代理,像Docker和预先建立好的镜像都能给你一个沙盒中的RabbitMQ环境使用。
使用多个Queues(队列),不要只是使用默认的那个(default)
Celery的启动是相当的简单,它会启动一个默认的队列,除非你定义了别的队列否则它就会把所有的任务放到这一个队列中去。最常见的就是像下面这样。
@app.task()
def my_taskA(a, b, c):
print("doing something here...")
@app.task()
def my_taskB(x, y):
print("doing something here...")
两个任务会放到同一个队列中去(如果没有在celeryconfig.py中配置).我能清楚的看到有哪些事发生,因为你那些可人的后台任务上仅仅有那么一个 装饰器。这里我关心的是,也许 taskA 和 taskB做的是完全不同的两件事情,也许其中一个要比另外一个重要的多,那为什么要把它们扔到一个篮子里呢?虽然一个worker可以处理这两个任务,设想某个时间有大量的taskB,然而更重要的 taskA却没有得到worker的足够重视?这种情况下增加了worker以后,所有的worker还是会平等的对待这两种任务,在大量taskB的情况下,taskA还是无法得到应得的重视。
这就把我们带到了下一个要点中。
使用优先级wokers
解决上面问题办法就是把taskA放到一个队列中去,taskB放到另一个队列中去,分配x个workers去处理Q1队列,有于Q2队列有更多的任务需要处理,其他的workers都分配给Q2队列。这种方式你能保证taskB有足够多的workers去,同时维持几个高优先级的队列给taskA,当taskA任务来的时候不需要等待很久就可以被处理掉。
所以,手工的定义队列
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'),
Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'),
你的routes 会决定不同的任务分配到不同的队列
CELERY_ROUTES = {
'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'},
'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'},
然后你可以为每个任务启动不同的workers
celery worker -E -l INFO -n workerA -Q for_task_A
celery worker -E -l INFO -n workerB -Q for_task_B
使用Celery’s的错误处理机制
我见过最多就是,任务根本就没有错误处理的概念。如果一个任务失败了就是失败了。在某些情况下这样处理是不错的,然而我见过最多的是一些第三方API的错误,网络原因,或者资源不可用等造成的。最简单的处理这种错误的办法就是对任务进行重试。因为有一些第三方的API是因为服务或者网络的出了问题,但是很快就可以恢复,我们为什么不试一试呢?
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A():
print("doing stuff here...")
except SomeNetworkException as e:
print("maybe do some clenup here....")
self.retry(e)
我比较喜欢就是给每个任务定义一个重试的间隔和重试的次数(分别是default_retry_delay和max_retries参数)。这是最基本的错误处理方式也是我见过最多的。当然Celery还提供了很多种处理处理但是我把celery的文档地址留给你。
使用Flower
Flower 是一个非常棒的工具,它可以用来监控celery的任务和workers。它是基于web的,所以你可以看到任务进程,详情,worker状态,启动新的workers等。可以通过前面的链接查看它所有的功能。
只有真正需要才追踪task的结果
task状态指的是task执行的结果是成功还是失败。它对于后续的某些分析是有用的。需要注意的一个问题是退出结果并不是任务执行的结果,那些信息更类似于对数据的某些影响(例如更新用户的朋友列表)
项目中我见过最多的是不关心这些任务执行时候的状态,有些只是用默认的sqlite数据库在保存这些信息,更好一点的是花时间保存在常规的数据库中(例如postgres 或者其他数据库)
为什么无缘无故的增加web应用数据库的负担呢?使用CELERY_IGNORE_RESULT = True配置在你的celeryconfig.py配置文件中来丢弃这些执行状态。
不要通过数据库或者ORM对象的方式来执行任务
在一次本地的Python小聚会上发表这个分享之后有几个人建议我把这一条添加到最佳实践的列表中。这个建议是关于什么的呢?不要通过数据库对象(例如你的User model)来执行后台任务,因为对象序列话是包含了一些陈旧的数据。你要作的是把Userid放在任务中,然后任务执行的时候会从数据中获取最新的用户对象。
(window.slotbydup=window.slotbydup || []).push({
id: '2467140',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467141',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467142',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467143',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467148',
container: s,
size: '1000,90',
display: 'inlay-fix'celery定时任务实践
(window.slotbydup=window.slotbydup || []).push({
id: '2611110',
container: s,
size: '240,200',
display: 'inlay-fix'
您当前位置: &
[ 所属分类
| 时间 2015 |
作者 红领巾 ]
说起celery搞的程序员并不陌生,一般做队列任务之类的总是会用到。最近公司新项目用到类似队列的场景但是还要求定时完成,所以一下想到了celery马上搞起来。
看了资料做了需求分析,celery本身能完成队列和异步任务的功能,但是没有定时功能,只有django-celery里面是有crontab的定时任务功能的。正好我们项目就是用的django所以直接django搞起来
安装不用多说直接pip就搞定, the我们使用的是github上的development版本,与相依赖的是 celery==3.1.18
pip install git+/celery/django-celery.gitpip install celery==3.1.18 # 如果出现依赖问题那就安装
django设置
在settings.py中配置:
import djcelerydjcelery.setup_loader()# BROKER_URL = 'django://' # 直接使用django做broker生产环境不建议,建议使用redis或者rabbitMQBROKER_URL = 'redis://::22222/0' # broker使用reids这是要多2CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定时任务CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'CELERY_ENABLE_UTC = False # 不是用UTCCELERY_TIMEZONE = 'Asia/Shanghai' CELERY_TASK_RESULT_EXPIRES = 10 #任务结果的时效时间CELERYD_LOG_FILE = BASE_DIR + "/logs/celery/celery.log" # log路径CELERYBEAT_LOG_FILE = BASE_DIR + "/logs/celery/beat.log" # beat log路径CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] # 允许的格式...INSTALLED_APPS = ( ... 'djcelery', 'kombu.transport.django', ...)
第一二项是必须的,在INSTALLED_APPS中添加的djcelery是必须的. kombu.transport.django则是基于Django的broker,如果使用redis就不需要了。
最后创建Celery所需的数据表(django1.8):
python manage.py migrate
创建一个task
正如前面所说的, 一个task就是一个Pyhton function. 但Celery需要知道这一function是task, 因此我们可以使用celery自带的装饰器decorator: @task. 在django app目录中创建taske.py:
from celery import task@task()def add(x, y): return x + y
当settings.py中的djcelery.setup_loader()运行时, Celery便会查看所有INSTALLED_APPS中app目录中的tasks.py文件, 找到标记为task的function, 并将它们注册为celery task.
将function标注为task并不会妨碍他们的正常执行. 你还是可以像平时那样调用它: z = add(1, 2).
让我们以一个简单的例子作为开始. 例如我们希望在用户发出request后异步执行该task, 马上返回response, 从而不阻塞该request, 使用户有一个流畅的访问过程. 那么, 我们可以使用.delay, 例如在在views.py的一个view中:
from myapp.tasks import add... add.delay(2, 2)...
Celery会将task加入到queue中, 并马上返回. 而在一旁待命的worker看到该task后, 便会按照设定执行它, 并将他从queue中移除. 而worker则会执行以下代码:
import myapp.tasks.addmyapp.tasks.add(2, 2)
启动worker
正如之前说到的, 我们需要worker来执行task. 以下是在开发环境中的如何启动worker:
首先启动terminal, 如同开发django项目一样, 激活virtualenv, 切换到django项目目录. 然后启动django自带web服务器: python manage.py runserver.
然后启动worker:
python manage.py celery worker --loglevel=info
此时, worker将会在该terminal中运行, 并显示输出结果.
打开新的terminal, 激活virtualenv, 并切换到django项目目录:
$ python manage.py shell
&&& from myapp.tasks import add
&&& add.delay(2, 2)
此时, 你可以在worker窗口中看到worker执行该task:
[ 08:47:08,076: INFO/MainProcess] Got task from broker: myapp.tasks.add[e080e047-b2a2-43a7-af74-d7d9d98b02fc][ 08:47:08,299: INFO/MainProcess] Task myapp.tasks.add[e080e047-b2a2-43a7-af74-d7d9d98b02fc] succeeded in 0.s: 4
好了,简单的任务我们已经调通,下面我们还是来看定时任务怎么弄。首先新建一个文件task.py(名字可以随便取)
import datetimeimport jsonfrom djcelery import models as celery_modelsfrom django.utils import timezonedef create_task(name, task, task_args, crontab_time): ''' 创建任务 name # 任务名字 task # 执行的任务 "myapp.tasks.add" task_args # 任务参数 {"x":1, "Y":1} crontab_time # 定时任务时间 格式: { 'month_of_year': 9 # 月份 'day_of_month': 5 # 日期 'hour': 01 # 小时 'minute':05 # 分钟 }
''' # task任务, created是否定时创建 task, created = celery_models.PeriodicTask.objects.get_or_create( name=name, task=task) # 获取 crontab crontab = celery_models.CrontabSchedule.objects.filter( **crontab_time).first() if crontab is None: # 如果没有就创建,有的话就继续复用之前的crontab crontab = celery_models.CrontabSchedule.objects.create(**crontab_time) task.crontab = crontab # 设置crontab task.enabled = True # 开启task task.kwargs = json.dumps(task_args) # 传入task参数 expiration = timezone.now() + datetime.timedelta(day=1) task.expires = expiration # 设置任务过期时间为现在时间的一天以后 task.save() return Truedef disable_task(name): ''' 关闭任务 ''' try: task = celery_models.PeriodicTask.objects.get(name=name) task.enabled = False # 设置关闭 task.save() return True except celery_models.PeriodicTask.DoesNotExist: return True
执行定时任务时, Celery会通过celerybeat进程来完成. Celerybeat会保持运行, 一旦到了某一定时任务需要执行时, Celerybeat便将其加入到queue中. 不像worker进程, Celerybeat只有需要一个即可.
python manage.py celery beat --loglevel=info
其实还有一种简单的启动方式worker和beat一起启动:
python manage.py celery worker --loglevel=info --beat
将定期任务储存在django数据库中. 即是在django和celery都运行的状态, 这一方式也可以让我们方便的修改定时任务. 我们只需要设置settings.py中的一项便能开启这一方式:
# settings.pyCELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
然后我们便可以通过django admin的/admin/djcelery/periodictask/添加定期任务了.也可以直接使用刚才我写的脚本在自己的代码逻辑中自己增加和禁用定时任务
可能大家用了一段时间就会想到,很多任务都是一次执行完就不需要,留在数据库里就是垃圾数据了有没有办法清除。方法肯定有因为django-celery本身就有定时任务功能我们加个任务就解决了。好我们看代码:在django app目录中打开taske.py加入如下代码
from djcelery import models as celery_modelsfrom django.utils import timezone@task()def delete(): ''' 删除任务 从models中过滤出过期时间小于现在的时间然后删除 ''' return celery_models.PeriodicTask.objects.filter( expires__lt=timezone.now()).delete()
大家都记得创建任务脚本里设置了 expires 1天以后过期,这样在filter的时候就能当做条件把过期的任务找到并且删除。
然后我们在django-admin中把这个任务加到定时任务中:
名字为del,任务是myapp.taks.delete,定时为每天的5点执行(crontab的格式,不熟悉的大家可以搜索学习下 crontab格式)
本文开发(python)相关术语:python基础教程 python多线程 web开发工程师 软件开发工程师 软件开发流程
转载请注明本文标题:本站链接:
分享请点击:
1.凡CodeSecTeam转载的文章,均出自其它媒体或其他官网介绍,目的在于传递更多的信息,并不代表本站赞同其观点和其真实性负责;
2.转载的文章仅代表原创作者观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,本站对该文以及其中全部或者部分内容、文字的真实性、完整性、及时性,不作出任何保证或承若;
3.如本站转载稿涉及版权等问题,请作者及时联系本站,我们会及时处理。
登录后可拥有收藏文章、关注作者等权限...
阅读(5351)
一艺之成,当尽毕生之力!
手机客户端
,专注代码审计及安全周边编程,转载请注明出处:http://www.codesec.net
转载文章如有侵权,请邮件 admin[at]codesec.net}

我要回帖

更多关于 celery 周期任务 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信