浅谈celery实践,目前业界最好的文章
- 编辑时间: 2022-07-23 11:18:35
- 浏览量: 737
- 作者: 追梦人
- 文章分类: python后端
- 评论数: 暂无评论
celery简介
celery是一个基于python开发的。简单、灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在不同的机器上执行任务调度。采用典型的生产者-消费者模型,主要由以下部分组成:
Celery包含如下组件
- Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列(一般用于定时任务使用)。
- Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
- Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(本方案使用redis)。
- Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
- Result Backend:任务处理完后保存状态信息和结果,以供查询。(本方案使用redis来存储结果)
Celery实现分布式的结构图
代码部分
目前演练的使用celery版本:5.0.5 > 配置模块 import yaml from datetime import timedelta from celery.schedules import crontab from guigui_app.utils.common import DEFAULT_CONFIG_PATH from kombu import Queue from guigui_app.utils.typing_check import typeassert from guigui_app.utils.web_jump_url import * from guigui_app.utils.auto_import import auto_import_method with open(DEFAULT_CONFIG_PATH, 'r', encoding='utf-8') as f: env_cfg = f.read() env_setting = yaml.load(env_cfg, Loader=yaml.FullLoader) redis_setting = env_setting.get('redis') broker_url = 'redis://:{password}@{host}:{port}/{broker_db}'.format_map(redis_setting) result_backend = 'redis://:{password}@{host}:{port}/{backend_db}'.format_map(redis_setting) imports = ['celery_task.celery_task', ] # 指定导入的任务模块 accept_content = ['application/json'] timezone = 'Asia/Shanghai' # 并行数量 (进程数量) worker_concurrency = 4 # 防止死锁 celeryd_force_execv = True task_reject_on_worker_lost = True # 预取任务数 worker_prefetch_multiplier = 1 result_expires = 60 * 60 task_default_queue = 'celery_new_auto_save_order' # 任务队列 task_queues = ( # 路由键以“celery_new_auto_save_order.”开头的消息都进default队列 Queue("default", routing_key="celery_new_auto_save_order.#"), # 路由键以“auto.”开头的消息都进tasks_auto队列 Queue("tasks_auto", routing_key="auto.#") ) task_routes = ( [ # 将celery_new_auto_save_order任务分配至队列 default ("celery_task.celery_task.celery_new_auto_save_order", {"queue": "default", "priority": 0}), # 将tasks_auto任务分配至队列 tasks_auto ("celery_task.celery_task.tasks_auto", {"queue": "tasks_auto", "priority": 3}) ], ) # 定时任务config beat_schedule = { 'celery_task.test': { 'task': 'celery_task.celery_task.tasks_auto', 'schedule': 3, 'args': ('guigui_app.test', 'test', [{'a': 1, 'b': 2}]) }, 'celery_task.add': { 'task': 'celery_task.celery_task.tasks_auto', 'schedule': 6, 'args': ('celery_task.celery_task', 'add', [{'a': 1, 'b': 2}]) }, } > 应用模块 import logging from celery import Celery from guigui_app.utils.typing_check import typeassert from guigui_app.utils.auto_import import auto_import_method # 实例化celery 对象 app = Celery('tools_server') # 导入celery 配置 app.config_from_object('celery_task.celery_settings') logger = logging.getLogger('celery_auto_import') @app.task @typeassert(str, str, list) def tasks_auto(mod: str, method_name: str, method_param: list = None): result = auto_import_method(mod, method_name, method_param=method_param) return result
定时任务crontab和 timedelta的使用
- timedelta是datetime中的一个对象,需要from datetime import timedelta引入,有如下几个参数 days:天 seconds:秒 microseconds:微妙 milliseconds:毫秒 minutes:分 hours:小时 - crontab的参数有: month_of_year:月份 day_of_month:日期 day_of_week:周 hour:小时 minute:分钟
自动加载模块部分
import logging log = logging.getLogger('auto_import') __all__ = ['auto_import_method'] def auto_import_method(mod, method_name, method_param: list = None): obj = __import__(mod, fromlist=True) if hasattr(obj, method_name): log.info(f"Start path:{str(mod)} method: {str(method_name)}") func = getattr(obj, method_name) if method_param is None: return func() else: if isinstance(method_param, list): return func(method_param) else: log.warning(f"Error method param: {str(method_param)} method param type must be list") raise TypeError(f"Error method param: {str(method_param)} method param type must be list") else: log.error(f"error method {str(mod)} {str(method_name)}") raise NameError(f"There is no attr {method_name} in {str(mod)}.py")
使用装饰器进行参数类型检查
from inspect import signature from functools import wraps def typeassert(*ty_args, **ty_kwargs): def decorate(func): sig = signature(func) bund_types = sig.bind_partial(*ty_args, **ty_kwargs).arguments @wraps(func) def wrapper(*args, **kwargs): bund_values = sig.bind(*args, **kwargs) for name, value in bund_values.arguments.items(): if name in bund_types: if not isinstance(value, bund_types[name]): raise TypeError('Argument {} must be {}'.format(name, bund_types[name])) return func(*args, **kwargs) return wrapper return decorate
项目结构如图所示
启动方式
方式一 # 启动定时任务 # celery -A celery_task.celery_task beat --loglevel=INFO # 启动worker # celery -A celery_task.celery_task worker --loglevel=INFO 方式二 以守护进程的方式启动 - worker celery multi stop 6 -A celery_task.celery_task --loglevel=INFO --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log - beat celery -A celery_task.celery_task beat --detach -l info -f beat.log 方式三 一起启动 celery -A celery_task.celery_task worker --concurrency=6 --loglevel=INFO -B
启动效果如图所示
方式一启动的效果
方式二启动的效果
调用方式
tasks_auto.apply_async(args=( 'guigui_app.celery_task.add_or_update_room_gltf_data', 'start_gltf_url', [rid_str]), countdown=1) 或者 tasks_auto.delay( 'guigui_app.celery_task.add_or_update_room_gltf_data', 'start_gltf_url', [rid_str])
附加项
# Celery在执行任务时候,提供了钩子方法用于在任务执行完成时候进行对应的操作 # 在Task源码中提供了很多状态钩子函数如: # on_success(成功后执行) # on_failure(失败时候执行) # on_retry(任务重试时候执行) # after_return(任务返回时候执行) # 在进程中使用是我们只需要重写这些方法,完成相应的操作即可 from celery import Task logger = get_task_logger(__name__) class demotask(Task): # 任务成功执行 def on_success(self, retval, task_id, *args, **kwargs): logger.info(f'task id:{task_id}, arg:{args}, successful!') # 任务失败执行 def on_failure(self, exc, task_id, *args, **kwargs, einfo): logger.info(f'task id:{task_id}, arg:{args}, failed! erros:{exc}') # 任务重试执行 def on_retry(self, exc, task_id, *args, **kwargs, einfo): logger.info(f'task id:{task_id}, arg:{args}, retry! einfo:{exc}') @app.task(base=demotask)
异步调用原理
主要解释 delay 和 apply_async 的使用方法和区别
# -------------------------------------------- # delay源码 # -------------------------------------------- def delay(self, *args, **kwargs): """Star argument version of :meth:`apply_async`. Does not support the extra options enabled by :meth:`apply_async`. Arguments: *args (Any): Positional arguments passed on to the task. **kwargs (Any): Keyword arguments passed on to the task. Returns: celery.result.AsyncResult: Future promise. """ return self.apply_async(args, kwargs) # -------------------------------------------- # apply_async源码 # -------------------------------------------- def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options): if self.typing: try: check_arguments = self.__header__ except AttributeError: # pragma: no cover pass else: check_arguments(*(args or ()), **(kwargs or {})) app = self._get_app() if app.conf.task_always_eager: with denied_join_result(): return self.apply(args, kwargs, task_id=task_id or uuid(), link=link, link_error=link_error, **options) if self.__v2_compat__: shadow = shadow or self.shadow_name(self(), args, kwargs, options) else: shadow = shadow or self.shadow_name(args, kwargs, options) preopts = self._get_exec_options() options = dict(preopts, **options) if options else preopts options.setdefault('ignore_result', self.ignore_result) return app.send_task( self.name, args, kwargs, task_id=task_id, producer=producer, link=link, link_error=link_error, result_cls=self.AsyncResult, shadow=shadow, task_type=self, **options )
总结
注意事项 - 如果有启动celery时,报 from . import async, base SyntaxError: invalid syntax 查了资料后了解到是因为celery依赖中的async模块和Python的关键字async存在冲突, 所以在导入时报错。 使用
pip install --upgrade https://github.com/celery/celery/tarball/master
来更新一下就行了。 - Celery 4.x Windows版本不支持(目前最新支持celery 3.1.25) - 对于Windows用户启动命令后面应该要加上 -P gevent
参考手册
上一篇: 海外便宜的云服务器推荐
提交评论