Celery分布式任务队列基本使用
Celery基本概念
Broker
broker是一个消息传输的中间件。每当程序调用celery的异步任务时，会向broker发送一条消息，随后celery的worker会从broker中取出该消息并执行
broker的实现方案有 redis rabbitmq 数据库
Backend
backend是用来存储celery执行任务的当前状态和最终结果
worker
具体执行代码的基础单元 负责从rabbitmq或者redis中拉取任务执行 可以在不同的主机上启动worker 实现分布式执行
beat
负责定时或者循环的向redis等broker中写入任务 以便让worker从broker中获取任务执行 两者配合实现celery定时任务
Celery基础操作
pip install celery
[root@node3 jade]# yum install redis
[root@node3 jade]# systemctl start redis
[root@node3 jade]# vi tasks.py
from celery import Celery
app=Celery('tasks',broker='redis://localhost',backend='redis://localhost')
@app.task
def add(x, y):
    """Celery task: log the operands and return their sum."""
    print("running...", x, y)
    # The '+' operator was lost in the published listing ("return x y").
    return x + y
在本机启动worker，其中tasks代表当前目录下的tasks.py文件
[root@node3 jade]# celery -A tasks worker --loglevel=info
调用
>>> from tasks import add
>>> add.delay(4,5)
<AsyncResult: f1d84db4-91dc-4603-99af-c3047aaa32c4>
>>>
t=add.delay(4,5)
t.get()  # 同步获取结果
t.ready()  # 检查任务是否完成
t.get(timeout=1)  # 设置获取结果的超时时间
t.get(propagate=False)  # 如果执行方法出错 不触发异常 只显示异常结果
t.traceback  # 输出错误详细信息
View Code
1.在项目的根目录下创建一个目录celery_pro目录
[root@node3 jade]# ls
appconf comlib docs jadehare.log media tools
audit cron elk jobs monitor requirements.txt
celery_pro db.sqlite3 jadehare logcenter navi static
cmdb deploy jadehare.conf manage.py portals templates
[root@node3 jade]# cd celery_pro/
2.在celery_pro目录下创建两个py文件
[root@node3 celery_pro]# ls
celery.py tasks.py
vi celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
# Celery application for the celery_pro package: Redis is used both as the
# message broker and as the result backend; `include` lists the task modules
# the worker imports on start-up.
app = Celery('proj',
             broker='redis://localhost',
             backend='redis://localhost',
             include=['celery_pro.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)
if __name__ == '__main__':
    # The body of the `if` must be indented; the listing had lost it.
    app.start()
vi tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
@app.task
def add(x, y):
    """Celery task: log the operands and return their sum."""
    print("running...", x, y)
    # The '+' operator was lost in the published listing ("return x y").
    return x + y
@app.task
def mul(x, y):
    """Celery task: log the operands and return ``x - y``."""
    # NOTE(review): the task is named `mul` but subtracts ("jianfa" means
    # subtraction) -- confirm whether the name or the operator is intended.
    print("running jianfa.....", x, y)
    return x - y
@app.task
def jihe(x, y):
    """Celery task simulating slow work: sleep 6 seconds, then print a marker.

    The arguments are accepted but not used; the task returns None.
    """
    import time
    time.sleep(6)
    print("0099990000000")
启动celery 在项目的根目录下执行 celery_pro表示刚才上面创建的目录
[root@node3 jade]# celery -A celery_pro worker -l info
/root/.pyenv/versions/3.5.3/envs/jade/lib/python3.5/site-packages/celery/platforms.py:795: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
[tasks]
. celery_pro.tasks.add
. celery_pro.tasks.jihe
. celery_pro.tasks.mul
后台启动celery worker
[root@node3 jade]# celery multi start w1 -A celery_pro -l info
celery multi v4.1.0 (latentcall)
> Starting nodes...
> w1@node3: OK
重启worker
celery multi restart w1 w2 -A celery_pro
可以在不同主机上启动celery的worker 只需要这些主机都能连接到redis broker和redis backend 这样就实现了celery的分布式执行
调用celery 在项目的根目录下
[root@node3 ~]# cd /mnt/hgfs/jade
[root@node3 jade]# python
Python 3.5.3 (default, Sep 2 2018, 15:29:35)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from celery_pro import tasks
>>> t = tasks.add.delay(2,5)
>>> t.get()
7
celery配置和任务分离
[root@node3 celery_pro]# vi celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
# Celery application for the celery_pro package: Redis is used both as the
# message broker and as the result backend; `include` lists the task modules
# to import, now also the periodic task module.
app = Celery('proj',
             broker='redis://localhost',
             backend='redis://localhost',
             include=['celery_pro.tasks', 'celery_pro.periodic_task'])
# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)
if __name__ == '__main__':
    # The body of the `if` must be indented; the listing had lost it.
    app.start()
[root@node3 celery_pro]# vi periodic_task.py
from __future__ import absolute_import, unicode_literals
from .celery import app
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register the periodic schedules for ``test`` once the app is configured.

    ``sender`` is the object that emitted the signal; the schedules are added
    through its ``add_periodic_task`` method.
    """
    # crontab was used without being imported anywhere in this module.
    from celery.schedules import crontab

    # Calls test('hello') every 10 seconds.
    # `test` is the task defined below in this module;
    # .s('hello') builds a signature carrying the argument passed to it.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)
    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )
@app.task
def test(arg):
    """Celery task used by the periodic schedules: print its argument."""
    print(arg)
启动beat
[root@node3 jade]# celery -A celery_pro.periodic_task beat -l debug
启动worker
[root@node3 jade]# celery -A celery_pro worker -l info
[2018-10-09 19:51:45,939: WARNING/ForkPoolWorker-4] hello
[2018-10-09 19:51:45,951: INFO/MainProcess] Received task: celery_pro.periodic_task.test[f8d493df-a167-40d6-962e-2c74814443d4] expires:[2018-10-09 11:51:51.135272+00:00]
[2018-10-09 19:51:45,951: INFO/ForkPoolWorker-4] Task celery_pro.periodic_task.test[aa34fad3-7b03-4524-9075-5b3caaf0ae4e] succeeded in 0.0125010800002201s: None
celery定时任务
vi celery.py
# Static beat schedule: one entry, named 'add-every-5-seconds', that runs the
# celery_pro.tasks.add task with arguments (16, 16) on a 5.0 second interval.
app.conf.beat_schedule = {
'add-every-5-seconds': {
'task': 'celery_pro.tasks.add',
'schedule': 5.0,
'args': (16, 16)
},
}
vi celery.py
from celery.schedules import crontab
# Static beat schedule using a crontab expression instead of a fixed interval;
# the single entry runs celery_pro.tasks.add with arguments (16, 16).
app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
'add-every-monday-morning': {
'task': 'celery_pro.tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
重启beat
celery -A celery_pro.periodic_task beat -l debug
不需要重启worker便可直接生效
复杂定时任务配置
Celery普通task集成到django
[root@node3 jadehare]# vi settings.py
# Celery settings.
# jadehare/celery.py loads these with namespace='CELERY', so every key must be
# the upper-cased Celery setting name prefixed with CELERY_.
CELERY_ENABLE_UTC = False
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_BACKEND = 'django-db'
#CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/1'
CELERY_TIMEZONE = 'Asia/Shanghai'
# Recycle each worker child process after 3 tasks to limit memory growth.
# The old-style name CELERYD_MAX_TASKS_PER_CHILD lacks the CELERY_ prefix and
# is not picked up under the namespace; the namespaced form of
# worker_max_tasks_per_child is used instead.
CELERY_WORKER_MAX_TASKS_PER_CHILD = 3
INSTALLED_APPS = [
'django_celery_results',
'django_celery_beat',
]
settings.py
[root@node3 jadehare]# ls
celery.py __init__.py settings.py urls.py views.py wsgi.py
[root@node3 jadehare]# vi celery.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
#import configparser
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jadehare.settings')
app = Celery('jadehare')
# redis connect code
#BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
#config = ConfigParser.ConfigParser()
#config.read(os.path.join(BASE_DIR, 'adminset.conf'))
#redis_host = config.get('redis', "redis_host")
#redis_port = config.get("redis", "redis_port")
#redis_db = config.get('redis', "redis_db")
#redis_password = config.get('redis', "redis_password")
#redis_port = config.get("redis", "redis_port")
#redis_db = config.get('redis', "redis_db")
#redis_password = config.get('redis', "redis_password")
from celery import Celery
#import configparser
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jadehare.settings')
app = Celery('jadehare')
# redis connect code
#BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
#config = ConfigParser.ConfigParser()
#config.read(os.path.join(BASE_DIR, 'adminset.conf'))
#redis_host = config.get('redis', "redis_host")
#redis_port = config.get("redis", "redis_port")
#redis_db = config.get('redis', "redis_db")
#redis_password = config.get('redis', "redis_password")
#if redis_password:
# app.conf.broker_url = 'redis://:{0}@{1}:{2}/{3}'.format(redis_password, redis_host, redis_port, redis_db)
#else:
# app.conf.broker_url = 'redis://{0}:{1}/{2}'.format(redis_host, redis_port, redis_db)
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
#app.conf.broker_transport_options = {'visibility_timeout': 3600}
#app.conf.result_backend = 'redis://127.0.0.1:6379/0'
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
celery.py
[root@node3 jadehare]# vi __init__.py
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
init.py
在任何一个app下添加tasks.py文件 这个文件名称必须叫tasks.py
[root@node3 jade]# pwd
/mnt/hgfs/jade
[root@node3 jade]# cd navi/
[root@node3 navi]# vi tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add_navi(x, y):
    """Shared Celery task of the navi app: return the sum of x and y."""
    # The '+' operator was lost in the published listing ("return x y").
    return x + y
tasks.py
[root@node3 jade]# pwd
/mnt/hgfs/jade
[root@node3 jade]# celery -A jadehare worker -l info
/root/.pyenv/versions/3.5.3/envs/jade/lib/python3.5/site-packages/celery/platforms.py:795: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the -u option.
User information: uid=0 euid=0 gid=0 egid=0
uid=uid, euid=euid, gid=gid, egid=egid,
-------------- celery@node3 v4.1.0 (latentcall)
---- **** -----
--- * *** * -- Linux-3.10.0-862.11.6.el7.x86_64-x86_64-with-centos-7.5.1804-Core 2018-10-10 08:49:14
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: jadehare:0x7f575ac00710
- ** ---------- .> transport: redis://127.0.0.1:6379/0
- ** ---------- .> results:
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. appconf.tasks.deploy
. appconf.tasks.getadminstatus
. appconf.tasks.getserverstatus
. appconf.tasks.geturlstatus
. appconf.tasks.rollback
. appconf.tasks.task_appconf_updata
. appconf.tasks.task_nginxconf_updata
. appconf.tasks.task_restartapp
. appconf.tasks.task_stopapp
. cmdb.tasks.add
. cmdb.tasks.mul
. cmdb.tasks.xsum
. jadehare.celery.debug_task
. navi.tasks.add_navi
启动worker
[root@node3 jadehare]# pwd
/mnt/hgfs/jade/jadehare
[root@node3 jadehare]# vi urls.py
# Routes for the Celery demo views: one queues a task, the other polls its
# result by task id.
urlpatterns = [
    url(r'celery_call/', views.celery_call),
    url(r'celery_back/', views.celery_back),
]  # closing bracket was missing from the listing
django的urls.py
[root@node3 jadehare]# vi views.py
from django.shortcuts import redirect,HttpResponse
from navi import tasks  # 导入某个app下面的tasks模块文件
from celery.result import AsyncResult
def celery_call(request):
    """Queue the navi addition task with (10, 20) and respond with its task id."""
    # navi/tasks.py defines `add_navi` (registered as navi.tasks.add_navi);
    # it has no task named `add`, so `tasks.add` would raise AttributeError.
    t = tasks.add_navi.delay(10, 20)
    return HttpResponse(t.id)
def celery_back(request):
    """Report the state of the task whose id is passed as the ``id`` query parameter.

    Responds with the task result when it is ready, otherwise with the
    readiness flag ("False"). A request without ``id`` gets a 400 response.
    """
    task_id = request.GET.get("id")
    if not task_id:
        # Without an id there is no task to look up.
        return HttpResponse("missing task id", status=400)
    res = AsyncResult(id=task_id)
    if res.ready():
        return HttpResponse(res.get())
    else:
        return HttpResponse(res.ready())
http://192.168.11.136:8000/celery_call/
http://192.168.11.136:8000/celery_back/?id=07bd41b6-432b-497d-9dd1-bda8df775e34
django的views.py
Celery定时任务集成到django
[root@node3 jadehare]# pip install django-celery-beat
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_celery_results',
'django_celery_beat'
]
settings.py
配置django
启动beat
每添加或修改一个任务,celery beat都需要重启一次,要不然新的配置不会被celery beat进程读到
[root@node3 jade]# celery -A jadehare beat -l info -S django
启动worker
[root@node3 jade]# celery -A jadehare worker -l info
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhibgcke
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
微信运动停用后别人还能看到步数吗
PHP中文网 07-22 -
excel打印预览压线压字怎么办
PHP中文网 06-22