正在加载中...

celery分布式设计

  • 编辑时间: 2019-09-18 21:30:44
  • 浏览量: 1980
  • 作者: makerblog
  • 所有标签: 服务器 python
  • 文章分类: python后端
  • 评论数: 1

Celery的架构

  • Celery包含如下组件:
  • Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列(一般用于定时任务使用)。
  • Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
  • Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(本方案使用redis)。
  • Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
  • Result Backend:任务处理完后保存状态信息和结果,以供查询。(本方案使用redis来存储结果)

Celery的架构图如图所示。

celery架构图

根据celery架构图将分布式设计方案如下。

  1. 任务发布者:产品在后台添加股票完成之后,点击回测,然后分发任务都不同的机器。
  2. 任务调度:按照设定的时间调用delay方法。
  3. 消息代理:使用redis来存储股票代码。每次执行的股票代码都是从redis读取。
  4. 任务消费者:在不同的机器上开启worker,执行调度的股票进行回测。
  5. 回测结果:存放在redis中,然后读取出渲染在后台上展示出来。

需要用到

from kombu import Queue
from flask import Flask

部分代码如下:

CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_QUEUES = (  # 定义任务队列
    Queue("default", routing_key="distributed.#"),
    Queue("tasks_A", routing_key="A.#"),
    Queue("tasks_B", routing_key="B.#"), 
)

CELERY_ROUTES = (
    [
        ("web_management.web.trade.distributed.add", {"queue": "default"}),  
        ("web_management.web.trade.distributed.taskA", {"queue": "tasks_A"}), 
        ("web_management.web.trade.distributed.taskB", {"queue": "tasks_B"}), 
    ],
)

CELERY_RESULT_SERIALIZER = "json"  

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://' + ppt.redis_ip + ':' + ppt.redis_port + '/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://' + ppt.redis_ip + ':' + ppt.redis_port + '/1'
app.config['CELERY_QUEUES'] = CELERY_QUEUES
app.config['CELERY_TIMEZONE'] = CELERY_TIMEZONE
app.config['CELERY_ROUTES'] = CELERY_ROUTES
app.config['CELERY_RESULT_SERIALIZER'] = CELERY_RESULT_SERIALIZER
app.config['CELERY_TASK_RESULT_EXPIRES'] = CELERY_TASK_RESULT_EXPIRES
celery = Celery('distributed', backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

开启work的方式

celery worker -A web_management.web.trade.distributed.celery -Q tasks_A --concurrency=30 -l info -Q后面为方法对应的队列名称 --concurrency= 后面 为开启的worker数目,也可以使用 -c= 具体开启是数目根据你的电脑CPU个数确定,小于等于cpu个数即可

备注

  • celery 使用4.1.1版本
  • kombu 使用4.2.0版本 *先安装kombu,然后安装celery
  • 可以解决如图问题
  • 版本选择

上一篇: Git协同工作流

下一篇: git的基本使用

提交评论

评论列表

makerblog (2019-12-08 18:30:04):
96

回复

关于本站

1.记录生活

2.建站时间2019-04-05

3.主要技术Django2、Bootstrap...