
Basic Usage of the Celery Distributed Task Queue

langy1990

Celery Basic Concepts

     Broker

           The broker is a message-transport middleware. Whenever a program calls a celery task asynchronously, a message is sent to the broker, and a celery worker later picks that message up.

           Common broker implementations: redis, rabbitmq, or a database.

     Backend

          The backend stores the current state and the final result of every task celery executes.

     worker

         The basic unit that actually runs task code. It pulls tasks from rabbitmq or redis and executes them; workers can be started on different hosts to achieve distributed execution.

     beat

         Periodically writes tasks into the broker (redis etc.) on a schedule so that workers can pick them up; beat and the workers together implement celery's scheduled tasks.

Basic Celery Operations

pip install celery

[root@node3 jade]# yum install redis
[root@node3 jade]# systemctl start redis

[root@node3 jade]# vi tasks.py

from celery import Celery

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')


@app.task
def add(x, y):
    print("running...", x, y)
    return x + y

Start a worker on the local machine; tasks refers to the tasks.py file in the current directory:
[root@node3 jade]# celery -A tasks worker --loglevel=info


Call the task:
>>> from tasks import add
>>> add.delay(4,5)
<AsyncResult: f1d84db4-91dc-4603-99af-c3047aaa32c4>
>>> 

t = add.delay(4, 5)
t.get()                 # fetch the result synchronously (blocks until done)
t.ready()               # check whether the task has finished
t.get(timeout=1)        # set a timeout when fetching the result
t.get(propagate=False)  # if the task raised, return the exception instead of re-raising it
t.traceback             # the detailed remote traceback of a failed task
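
A short sketch exercising these result-API calls, assuming the worker started above is still running:

from tasks import add

t = add.delay(4, 5)
print(t.ready())          # False while the task is still queued or running
print(t.get(timeout=10))  # 9; raises celery.exceptions.TimeoutError if no result arrives in 10s
print(t.ready())          # True once the result has been stored in the backend

# For a task that raised inside the worker:
#   t.get(propagate=False)  returns the exception object instead of re-raising it
#   t.traceback             then holds the remote traceback string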


1. Create a celery_pro directory under the project root
[root@node3 jade]# ls
appconf     comlib      docs           jadehare.log  media         tools
audit       cron        elk            jobs          monitor  requirements.txt
celery_pro  db.sqlite3  jadehare       logcenter     navi     static
cmdb        deploy      jadehare.conf  manage.py     portals  templates
[root@node3 jade]# cd celery_pro/
2. Create two .py files inside the celery_pro directory
[root@node3 celery_pro]# ls
celery.py  tasks.py

vi celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost',
             backend='redis://localhost',
             include=['celery_pro.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

vi tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app

@app.task
def add(x, y):
    print("running...", x, y)
    return x + y

@app.task
def mul(x, y):
    print("running jianfa.....", x, y)
    return x - y

@app.task
def jihe(x, y):
    import time
    time.sleep(6)
    print("0099990000000")

Start celery; run this from the project root (celery_pro is the directory created above):
[root@node3 jade]# celery -A celery_pro worker -l info
/root/.pyenv/versions/3.5.3/envs/jade/lib/python3.5/site-packages/celery/platforms.py:795: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

[tasks]
  . celery_pro.tasks.add
  . celery_pro.tasks.jihe
  . celery_pro.tasks.mul

Start a celery worker in the background:
[root@node3 jade]# celery multi start w1 -A celery_pro -l info
 celery multi v4.1.0 (latentcall)
  > Starting nodes...
  > w1@node3: OK

Restart workers:
  celery multi restart w1 w2 -A celery_pro
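
Workers started with celery multi can be stopped the same way; stopwait waits for currently running tasks to finish before shutting down:
  celery multi stopwait w1 -A celery_pro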

Workers can be started on different hosts; as long as each host can reach the same redis broker and redis backend, tasks are executed in a distributed fashion, as the sketch below shows.
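
A minimal sketch of the app on a second host (192.168.11.136 stands in for whichever machine runs redis; the celery_pro package must be present on that host too):

app = Celery('proj',
             broker='redis://192.168.11.136:6379/0',
             backend='redis://192.168.11.136:6379/0',
             include=['celery_pro.tasks'])

Running celery -A celery_pro worker -l info on each such host makes them all consume from the same queue.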


Call celery from the project root:
 [root@node3 ~]# cd /mnt/hgfs/jade
 [root@node3 jade]# python
Python 3.5.3 (default, Sep  2 2018, 15:29:35) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from celery_pro import tasks
>>> t = tasks.add.delay(2,5)
>>> t.get()
7

Separating celery configuration from tasks

[root@node3 celery_pro]# vi celery.py 
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost',
             backend='redis://localhost',
             include=['celery_pro.tasks','celery_pro.periodic_task'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()


[root@node3 celery_pro]# vi periodic_task.py

from __future__ import absolute_import, unicode_literals
from celery.schedules import crontab
from .celery import app

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    # test is the task function defined below;
    # test.s('hello') is a signature carrying the argument passed to test
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)


Start beat
[root@node3 jade]# celery -A celery_pro.periodic_task beat -l debug

Start the worker
 [root@node3 jade]# celery -A celery_pro worker -l info

[2018-10-09 19:51:45,939: WARNING/ForkPoolWorker-4] hello
[2018-10-09 19:51:45,951: INFO/MainProcess] Received task: celery_pro.periodic_task.test[f8d493df-a167-40d6-962e-2c74814443d4]   expires:[2018-10-09 11:51:51.135272+00:00]
[2018-10-09 19:51:45,951: INFO/ForkPoolWorker-4] Task celery_pro.periodic_task.test[aa34fad3-7b03-4524-9075-5b3caaf0ae4e] succeeded in 0.0125010800002201s: None

Celery periodic tasks

vi celery.py

app.conf.beat_schedule = {
    'add-every-5-seconds': {
        'task': 'celery_pro.tasks.add',
        'schedule': 5.0,
        'args': (16, 16)
    },
}



vi celery.py
from celery.schedules import crontab
 
app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'celery_pro.tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}


Restart beat
  celery -A celery_pro.periodic_task beat -l debug

The new schedule takes effect without restarting the worker.

Complex schedule configuration
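
crontab accepts full cron-style expressions, so far more elaborate schedules are possible. An illustrative sketch (the entry name and schedule are made up for the example):

from celery.schedules import crontab

app.conf.beat_schedule = {
    # every 15 minutes between 9:00 and 18:00, Monday through Friday
    'add-on-weekday-office-hours': {
        'task': 'celery_pro.tasks.add',
        'schedule': crontab(minute='*/15', hour='9-18', day_of_week='mon-fri'),
        'args': (16, 16),
    },
}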

Integrating ordinary Celery tasks into django

[root@node3 jadehare]# vi settings.py
# Celery settings
CELERY_ENABLE_UTC = False
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_BACKEND = 'django-db'
#CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
CELERY_TIMEZONE = 'Asia/Shanghai'
# restart each worker child process after it has executed this many tasks, to guard against memory leaks
CELERYD_MAX_TASKS_PER_CHILD = 3


INSTALLED_APPS = [
        'django_celery_results',
        'django_celery_beat',
]
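
django_celery_results (backing the 'django-db' result backend) and django_celery_beat both store their data in django's database, so run the migrations once after adding them to INSTALLED_APPS:

[root@node3 jade]# python manage.py migrate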


[root@node3 jadehare]# ls
celery.py  __init__.py   settings.py   urls.py  views.py  wsgi.py
[root@node3 jadehare]# vi celery.py 
#! /usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
#import configparser

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jadehare.settings')
app = Celery('jadehare')

# redis connect code
#BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
#config = ConfigParser.ConfigParser()
#config.read(os.path.join(BASE_DIR, 'adminset.conf'))
#redis_host = config.get('redis', "redis_host")
#redis_port = config.get("redis", "redis_port")
#redis_db = config.get('redis', "redis_db")
#redis_password = config.get('redis', "redis_password")
#if redis_password:
#    app.conf.broker_url = 'redis://:{0}@{1}:{2}/{3}'.format(redis_password, redis_host, redis_port, redis_db)
#else:
#    app.conf.broker_url = 'redis://{0}:{1}/{2}'.format(redis_host, redis_port, redis_db)

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
#app.conf.broker_transport_options = {'visibility_timeout': 3600}
#app.conf.result_backend = 'redis://127.0.0.1:6379/0'
app.config_from_object('django.conf:settings', namespace='CELERY')


# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
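
app.autodiscover_tasks() looks for a module named tasks inside every app listed in INSTALLED_APPS and registers the tasks it finds, which is why the per-app task files below must be named tasks.py.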


[root@node3 jadehare]# vi __init__.py 

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)


Add a tasks.py file under any django app; the file must be named tasks.py.
[root@node3 jade]# pwd
/mnt/hgfs/jade
[root@node3 jade]# cd navi/
[root@node3 navi]# vi tasks.py 

from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add_navi(x, y):
    return x + y

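A quick way to verify the shared task from a django shell (assuming the worker started in the next step is running and has registered navi.tasks.add_navi):

[root@node3 jade]# python manage.py shell
>>> from navi.tasks import add_navi
>>> add_navi.delay(2, 3).get()
5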

[root@node3 jade]# pwd
/mnt/hgfs/jade
[root@node3 jade]# celery -A jadehare worker -l info
/root/.pyenv/versions/3.5.3/envs/jade/lib/python3.5/site-packages/celery/platforms.py:795: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
 
 -------------- celery@node3 v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-3.10.0-862.11.6.el7.x86_64-x86_64-with-centos-7.5.1804-Core 2018-10-10 08:49:14
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         jadehare:0x7f575ac00710
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . appconf.tasks.deploy
  . appconf.tasks.getadminstatus
  . appconf.tasks.getserverstatus
  . appconf.tasks.geturlstatus
  . appconf.tasks.rollback
  . appconf.tasks.task_appconf_updata
  . appconf.tasks.task_nginxconf_updata
  . appconf.tasks.task_restartapp
  . appconf.tasks.task_stopapp
  . cmdb.tasks.add
  . cmdb.tasks.mul
  . cmdb.tasks.xsum
  . jadehare.celery.debug_task
  . navi.tasks.add_navi


[root@node3 jadehare]# pwd
/mnt/hgfs/jade/jadehare

[root@node3 jadehare]# vi urls.py 

urlpatterns = [
    url(r'celery_call/', views.celery_call),
    url(r'celery_back/', views.celery_back),
]

[root@node3 jadehare]# vi views.py 

from django.shortcuts import redirect, HttpResponse
from navi import tasks  # import the tasks module of one of the apps
from celery.result import AsyncResult


def celery_call(request):
    t = tasks.add_navi.delay(10, 20)
    return HttpResponse(t.id)

def celery_back(request):
    task_id = request.GET.get("id")
    res = AsyncResult(id=task_id)
    if res.ready():
        return HttpResponse(res.get())
    else:
        return HttpResponse(res.ready())



http://192.168.11.136:8000/celery_call/
http://192.168.11.136:8000/celery_back/?id=07bd41b6-432b-497d-9dd1-bda8df775e34
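
Requesting /celery_call/ enqueues the task and returns its task id; passing that id to /celery_back/?id=... returns False while the task is still pending and the task's return value once it has finished.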


Integrating Celery periodic tasks into django

     [root@node3 jadehare]# pip install django-celery-beat

INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'django_celery_results',
        'django_celery_beat'
]


Configure django

With django_celery_beat installed, periodic tasks are stored in django's database and can be added or edited through the django admin (Periodic tasks, Intervals, Crontabs).

[django admin screenshots omitted]

Start beat

   Every time a task is added or modified, celery beat must be restarted; otherwise the new configuration will not be picked up by the beat process.

   [root@node3 jade]# celery -A jadehare beat -l info -S django
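
-S django selects the database scheduler that django_celery_beat registers under the alias django; it is equivalent to --scheduler django_celery_beat.schedulers:DatabaseScheduler and is what lets beat read its schedule from django's database instead of a local schedule file.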

Start the worker

   [root@node3 jade]# celery -A jadehare worker -l info
