RabbitMQ+Celery实现分布式消息队列

6/2/2021 pythonrabbitmqcelery

# 环境搭建

  • 系统:macOS 11.3.1
  • 芯片:Apple M1「ARM64」

# RabbitMQ

  • 使用homebrew安装
  • brew install rabbitmq
  • 找到安装目录将其加入环境变量「修改.zshrc文件」
  • nano ~/.zshrc
# RabbitMQ
export PATH="$PATH:/opt/homebrew/Cellar/rabbitmq/3.8.16/sbin"
source ~/.bash_profile
  • 重启终端,使配置生效
  • 配置RabbitMQ
# 添加用户跟密码
$ rabbitmqctl add_user test test123
# 添加虚拟主机
$ rabbitmqctl add_vhost test_vhost
# 为用户添加标签「给test设置标签test_tag」
$ rabbitmqctl set_user_tags test test_tag
# 设置用户权限「这里是表示用户test将拥有所有配置、写入和读取权限」
$ rabbitmqctl set_permissions -p test_vhost test ".*" ".*" ".*"

# Celery

  • 安装 pip install celery
  • 创建一个Demo,项目结构如下图「__init__.py文件也很重要,不可缺」
  • image.png
  • celery.py
from __future__ import absolute_import
from celery import Celery

app = Celery('test_celery',
broker='amqp://test:test123@localhost/test_vhost',
backend='rpc://',
include=['test_celery.tasks'])
  • run_tasks.py
from .tasks import add_longtime
import time

if __name__ == '__main__':
    result = add_longtime.delay(1,2)
 #此时,任务还未完成,它将返回False
    print('Task finished? ', result.ready())
    print('Task result: ', result.result)
    # 延长到10秒以确保任务已经完成
    time.sleep(10)
    # 现在任务完成,ready方法将返回True
    print('Task finished? ', result.ready())
    print('Task result: ', result.result)

  • tasks.py
from __future__ import absolute_import
from test_celery.celery import app
import time

@app.task
def add_longtime(a, b):
    print('long time task begins')
    # sleep 5 seconds
    time.sleep(5)
    print('long time task finished')
    return a + b
  • 命令行运行celery -A test_celery worker --loglevel=info,启动成功则可以看到以下输出
  • image.png
  • 项目内执行python -m test_celery.run_tasks
  • celery接收到消息
  • image.png
  • 代码执行结果,先输出false,10秒后获得结果
  • image.png

# flower 实时监控

  • 安装 pip install flower
  • 启动 celery -A test_celery flower
  • image.png
  • 可以看到运行状态