0%

Celery-任务调度框架实践

celery架构图:

  1. Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。

  2. Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。

  3. Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。

  4. Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

  5. Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。


初始化:

1.运行celery

mac环境下的celery一下载就能直接运行。但是centos6.9 环境下好像下载之后并不能直接运行,估计是环境变量没有配好。需要加上celery的安装路径。使用/usr/local/bin/celery -A your_app worker --loglevel=info


2.以守护进程运行celery:

需要一个初始化脚本:celeryd

  • 使用方法:/etc/init.d/celeryd {start|stop|restart|status}

  • 配置文件:/etc/default/celeryd

参考:


3.使用redis:

mac下安装redis

  • brew install redis

  • 如果需要后台运行 redis 服务,使用命令 brew services start redis

  • 如果不需要后台服务,则使用命令 redis-server /usr/local/etc/redis.conf。

  • mac os 安装 redis

安装redis后,启动时指定配置文件

  • redis-server ./redis.conf

检测后台进程是否存在

  • ps -ef |grep redis

使用分布式时,其他worker机子无法访问redis去取任务:


celery基本操作命令:

参考:


celery worker:

1.启动worker:

  • export PYTHONOPTIMIZE=1 && /usr/local/bin/celery -A your_app worker --loglevel=debug --workdir=/your_dir/your_dir/your_dir/


2.停止worker:

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9


3.在celery中使用多进程:

我用的是from multiprocessing import Pool来实现多进程。但是在运行过程中会直接报出这个错误
AssertionError: daemonic processes are not allowed to have children

解决方法:

  • 重写一个Mypoolhttps://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic

  • 设置环境变量
    export PYTHONOPTIMIZE=1

由于过段时间就会失效。。所以每次启动worker的时候的时候:
export PYTHONOPTIMIZE=1 && /usr/local/bin/celery -A your_app worker --loglevel=debug --workdir=/your_dir/your_dir/your_dir/

以下 的方法没试过:

there are two method to solve this problem ,disable assert:
1.where celery starts set export PYTHONOPTIMIZE=1 OR start celery with this parameter -O OPTIMIZATION
2.disable python packet multiprocessing process.py line 102:
assert not _current_process._config.get(‘daemon’), \ ‘daemonic processes are not allowed to have children’


4.调用worker、添加任务:

delay()和apply_async()

我们之前调用任务使用了”delay()”方法,它其实是对”apply_async()”方法的封装,使得你只要传入任务所需的参数即可。对于特殊的任务调度需求,你需要使用”apply_async()”,其常用的参数有:

  • countdown: 指定多少秒后任务才被执行

  • eta: 指定任务被调度的时间,参数类型是datetime

  • expires: 任务过期时间,参数类型可以是int(秒),也可以是datetime

  • retry: 任务发送失败的重试次数

  • priority: 任务优先级,范围是0-9

  • serializer: 参数和返回值的序列化方式


celery beat 定时任务

一个时间的Bug:

  • 当前使用pip安装的celery,默认是安装的最新版本4.1.0,但是在这个版本中在获取当前时间的逻辑中存在bug,会导致定时任务配置后并不能在指定的时间被执行

  • 回退版本 到4.0.2才行

  • celery 4.1.0 版本定时任务执行时间 bug

另一种可行但麻烦的思路:

celery的定时任务会有一定时间的延迟。比如,我规定模拟登陆新浪微博任务每隔10个小时执行一次,那么定时任务第一次执行就会在开启定时任务之后的10个小时后才会执行。而我抓取微博需要马上执行,需要带上cookie,所以不能等那1个小时。这个没有一个比较好的解决方法,可以使用celery的crontab()来代替schdule做定时,它会在启动的时候就执行。我采用的方法是第一次手动执行该任务,然后再通过schedule执行。


使用命令:

进入到对应your_app对应的目录下:cd /your_dir/your_dir/your_dir再执行:/usr/local/bin/celery -A your_app beat -l info

最方便的是,在命令中指定工作目录,一条命令即可:

/usr/local/bin/celery -A your_dir beat -l info --workdir=/your_dir/your_dir/your_dir/


定时任务参数参考表:


动态管理定时任务:

查过挺多资料,有两种解决方法。跟celery运行的调度器(schedule)息息相关的。

  • 使用第三方schedule:如django-celery-beat库会将定时任务的规则存入到数据库中,而不用通过配置文件来定义。

try to install django-celery instead of django-celery-beat. django-celery works with Celery 3 (unlike django-celery-beat). You can then, for example import PeriodicTask from djcelery.models instead of from django_celery_beat.models . This allows you to add/delete/manipulate tasks both dynamically AND PROGRAMMATICALLY (not only from the Django admin site). The drawback to this workaround is that if one doesn’t need django-celery for anything other than this, then it bloats one’s app. Thus, It would be better to have Celery 4.0 included in cookiecutter-django so that django-celery-beat models can be used

【celery进阶】定时任务和优先级

  • 使用框架默认schedule

我使用的是这一种方法。只能管理celery的配置文件了,每次增加或减少定时任务的时候,都要对配置文件进行相应的修改。每次修改完都要重启celerybeat,试过多种方法,发现用supervisor来管理celery beat的进程是比较好的。


管理celery beat进程

用supervisor来管理celery beat进程。

  • 安装:pip install supervisor

  • Supervisor配置

  • /usr/local/bin/echo_supervisord_conf > /etc/supervisord.conf

  • 操作:

  • 开启:supervisord -c /etc/supervisord.conf

  • 重启:supervisorctl -c /etc/supervisord.conf reload

  • 关闭:supervisorctl -c /etc/supervisord.conf shutdown


other:

分布式:


队列:


flower:

图形化管理celery界面:

/usr/local/bin/celery -A celery_app flower --port=5555


参考: