正在加载中...

浅谈celery实践,目前业界最好的文章

  • 编辑时间: 2022-07-23 11:18:35
  • 浏览量: 737
  • 作者: 追梦人
  • 所有标签: python
  • 文章分类: python后端
  • 评论数: 暂无评论

celery简介

celery是一个基于python开发的。简单、灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在不同的机器上执行任务调度。采用典型的生产者-消费者模型,主要由以下部分组成:

Celery包含如下组件
  • Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列(一般用于定时任务使用)。
  • Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
  • Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(本方案使用redis)。
  • Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
  • Result Backend:任务处理完后保存状态信息和结果,以供查询。(本方案使用redis来存储结果)

Celery实现分布式的结构图

celery

linux曾经说过

代码部分

目前演练的使用celery版本5.0.5
> 配置模块

import yaml
from datetime import timedelta
from celery.schedules import crontab
from guigui_app.utils.common import DEFAULT_CONFIG_PATH
from kombu import Queue
from guigui_app.utils.typing_check import typeassert
from guigui_app.utils.web_jump_url import *
from guigui_app.utils.auto_import import auto_import_method

with open(DEFAULT_CONFIG_PATH, 'r', encoding='utf-8') as f:
    env_cfg = f.read()
env_setting = yaml.load(env_cfg, Loader=yaml.FullLoader)
redis_setting = env_setting.get('redis')

broker_url = 'redis://:{password}@{host}:{port}/{broker_db}'.format_map(redis_setting)
result_backend = 'redis://:{password}@{host}:{port}/{backend_db}'.format_map(redis_setting)
imports = ['celery_task.celery_task', ]  # 指定导入的任务模块
accept_content = ['application/json']
timezone = 'Asia/Shanghai'
# 并行数量 (进程数量)
worker_concurrency = 4
# 防止死锁
celeryd_force_execv = True
task_reject_on_worker_lost = True
# 预取任务数
worker_prefetch_multiplier = 1
result_expires = 60 * 60
task_default_queue = 'celery_new_auto_save_order'
# 任务队列

task_queues = (
    # 路由键以“celery_new_auto_save_order.”开头的消息都进default队列
    Queue("default", routing_key="celery_new_auto_save_order.#"),
    # 路由键以“auto.”开头的消息都进tasks_auto队列
    Queue("tasks_auto", routing_key="auto.#")
)

task_routes = (
    [  # 将celery_new_auto_save_order任务分配至队列 default
        ("celery_task.celery_task.celery_new_auto_save_order", {"queue": "default", "priority": 0}),
        # 将tasks_auto任务分配至队列 tasks_auto
        ("celery_task.celery_task.tasks_auto", {"queue": "tasks_auto", "priority": 3})
    ],
)

# 定时任务config
beat_schedule = {
    'celery_task.test': {
        'task': 'celery_task.celery_task.tasks_auto',
        'schedule': 3,
        'args': ('guigui_app.test', 'test', [{'a': 1, 'b': 2}])
    },
    'celery_task.add': {
        'task': 'celery_task.celery_task.tasks_auto',
        'schedule': 6,
        'args': ('celery_task.celery_task', 'add', [{'a': 1, 'b': 2}])

    },
}

> 应用模块

import logging
from celery import Celery
from guigui_app.utils.typing_check import typeassert
from guigui_app.utils.auto_import import auto_import_method

# 实例化celery 对象
app = Celery('tools_server')
# 导入celery 配置
app.config_from_object('celery_task.celery_settings')

logger = logging.getLogger('celery_auto_import')

@app.task
@typeassert(str, str, list)
def tasks_auto(mod: str, method_name: str, method_param: list = None):
    result = auto_import_method(mod, method_name, method_param=method_param)
    return result

定时任务crontab和 timedelta的使用

 - timedelta是datetime中的一个对象需要from datetime import timedelta引入有如下几个参数
    days
    seconds
    microseconds微妙
    milliseconds毫秒
    minutes
    hours小时
- crontab的参数有
    month_of_year月份
    day_of_month日期
    day_of_week
    hour小时
    minute分钟

定时任务参数配置参考

自动加载模块部分

import logging

log = logging.getLogger('auto_import')

__all__ = ['auto_import_method']


def auto_import_method(mod, method_name, method_param: list = None):
    obj = __import__(mod, fromlist=True)
    if hasattr(obj, method_name):
        log.info(f"Start path:{str(mod)} method: {str(method_name)}")
        func = getattr(obj, method_name)
        if method_param is None:
            return func()
        else:
            if isinstance(method_param, list):
                return func(method_param)
            else:
                log.warning(f"Error method param: {str(method_param)} method param type must be list")
                raise TypeError(f"Error method param: {str(method_param)} method param type must be list")
    else:
        log.error(f"error method {str(mod)} {str(method_name)}")
        raise NameError(f"There is no attr {method_name}  in {str(mod)}.py")

使用装饰器进行参数类型检查

from inspect import signature
from functools import wraps


def typeassert(*ty_args, **ty_kwargs):
    def decorate(func):

        sig = signature(func)
        bund_types = sig.bind_partial(*ty_args, **ty_kwargs).arguments

        @wraps(func)
        def wrapper(*args, **kwargs):
            bund_values = sig.bind(*args, **kwargs)
            for name, value in bund_values.arguments.items():
                if name in bund_types:
                    if not isinstance(value, bund_types[name]):
                        raise TypeError('Argument {} must be {}'.format(name, bund_types[name]))
            return func(*args, **kwargs)

        return wrapper

    return decorate

项目结构如图所示

结构图

启动方式

方式一
# 启动定时任务
# celery -A celery_task.celery_task beat --loglevel=INFO
# 启动worker
# celery -A celery_task.celery_task worker --loglevel=INFO

方式二
以守护进程的方式启动
- worker
celery multi stop 6 -A celery_task.celery_task --loglevel=INFO --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log
- beat
celery -A celery_task.celery_task beat --detach -l info -f beat.log

方式三
一起启动 
celery -A celery_task.celery_task worker --concurrency=6 --loglevel=INFO -B

启动效果如图所示

方式一启动的效果

启动结果图

方式二启动的效果

守护进程的方式

调用方式

tasks_auto.apply_async(args=(
    'guigui_app.celery_task.add_or_update_room_gltf_data', 'start_gltf_url',
    [rid_str]),
    countdown=1)
或者
tasks_auto.delay(
    'guigui_app.celery_task.add_or_update_room_gltf_data', 'start_gltf_url',
    [rid_str])

附加项

# Celery在执行任务时候,提供了钩子方法用于在任务执行完成时候进行对应的操作
# 在Task源码中提供了很多状态钩子函数如:
#   on_success(成功后执行)
#   on_failure(失败时候执行)
#   on_retry(任务重试时候执行)
#   after_return(任务返回时候执行)
# 在进程中使用是我们只需要重写这些方法,完成相应的操作即可

from celery import Task

logger = get_task_logger(__name__)

class demotask(Task):
    # 任务成功执行
    def on_success(self, retval, task_id, *args, **kwargs):
        logger.info(f'task id:{task_id}, arg:{args}, successful!')

    # 任务失败执行
    def on_failure(self, exc, task_id, *args, **kwargs, einfo):
        logger.info(f'task id:{task_id}, arg:{args}, failed! erros:{exc}')

    # 任务重试执行
    def on_retry(self, exc, task_id, *args, **kwargs, einfo):
        logger.info(f'task id:{task_id}, arg:{args}, retry! einfo:{exc}')

@app.task(base=demotask)

异步调用原理

主要解释 delay 和 apply_async 的使用方法和区别

# --------------------------------------------
# delay源码
# --------------------------------------------
def delay(self, *args, **kwargs):
        """Star argument version of :meth:`apply_async`.

        Does not support the extra options enabled by :meth:`apply_async`.

        Arguments:
            *args (Any): Positional arguments passed on to the task.
            **kwargs (Any): Keyword arguments passed on to the task.
        Returns:
            celery.result.AsyncResult: Future promise.
        """
        return self.apply_async(args, kwargs)
# --------------------------------------------
# apply_async源码
# --------------------------------------------
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        if self.typing:
            try:
                check_arguments = self.__header__
            except AttributeError:  # pragma: no cover
                pass
            else:
                check_arguments(*(args or ()), **(kwargs or {}))

        app = self._get_app()
        if app.conf.task_always_eager:
            with denied_join_result():
                return self.apply(args, kwargs, task_id=task_id or uuid(),
                                  link=link, link_error=link_error, **options)

        if self.__v2_compat__:
            shadow = shadow or self.shadow_name(self(), args, kwargs, options)
        else:
            shadow = shadow or self.shadow_name(args, kwargs, options)

        preopts = self._get_exec_options()
        options = dict(preopts, **options) if options else preopts

        options.setdefault('ignore_result', self.ignore_result)

        return app.send_task(
            self.name, args, kwargs, task_id=task_id, producer=producer,
            link=link, link_error=link_error, result_cls=self.AsyncResult,
            shadow=shadow, task_type=self,
            **options
        )

总结

注意事项 - 如果有启动celery时,报 from . import async, base SyntaxError: invalid syntax 查了资料后了解到是因为celery依赖中的async模块和Python的关键字async存在冲突, 所以在导入时报错。 使用 pip install --upgrade https://github.com/celery/celery/tarball/master来更新一下就行了。 - Celery 4.x Windows版本不支持(目前最新支持celery 3.1.25) - 对于Windows用户启动命令后面应该要加上 -P gevent

参考手册

Celery中文手册

上一篇: 海外便宜的云服务器推荐

下一篇: 一句话对于for...else...语句的理解

提交评论

评论列表

暂无评论

关于本站

1.记录生活

2.建站时间2019-04-05

3.主要技术Django2、Bootstrap...