Github地址:https://github.com/pricingassistant/mrq
在Web应用开发中,异步任务处理是提升系统性能和用户体验的关键技术。MRQ(MongoDB Redis Queue)是一个基于MongoDB和Redis构建的Python分布式任务队列库,专为高并发、大规模的任务处理场景而设计。与传统的Celery等任务队列相比,MRQ提供了更简洁的API、更好的监控界面和更高的性能表现。它特别适用于需要处理大量异步任务、批量数据处理和定时任务调度的应用场景,是构建可扩展Python应用的重要工具。
安装
1、安装方法
MRQ依赖MongoDB和Redis,因此需要先安装这些服务。
使用pip安装MRQ:
# 安装MRQ
pip install mrq
# 或安装包含所有依赖的完整版本
pip install mrq[all]
2、验证安装
安装完成后,可以通过以下步骤验证安装是否成功:
import mrq
from mrq.queue import Queue
# 检查MRQ版本
print(f"MRQ版本: {mrq.__version__}")
# 测试基本连接
queue = Queue()
print("MRQ安装验证成功")
主要特性
-
分布式架构:支持多机器部署,可水平扩展处理能力
-
实时监控面板:内置Web界面,可视化监控任务执行状态和性能指标
-
高性能设计:基于gevent异步处理,单个worker可并发处理数千个任务
-
任务重试机制:自动重试失败任务,支持指数退避和自定义重试策略
-
任务优先级:支持设置任务优先级,确保重要任务优先执行
-
定时任务支持:内置cron风格的定时任务调度功能
-
结果存储:任务结果自动存储在MongoDB中,便于查询和分析
-
错误处理:完善的错误追踪和日志记录机制
基本功能
1、定义和执行任务
在MRQ中定义任务非常简单,只需使用装饰器即可。任务函数可以接收任意参数,并且MRQ会自动处理参数的序列化和反序列化。
from mrq.task import Task
from mrq.queue import Queue
"user.send_email")
(def send_email(email, subject, content):
"""发送邮件任务"""
import time
print(f"正在发送邮件到 {email}")
time.sleep(2) # 模拟邮件发送耗时
print(f"邮件发送成功: {subject}")
return {"status": "sent", "email": email}
# 提交任务到队列
queue = Queue()
job = queue.enqueue("user.send_email",
"user@example.com",
"欢迎注册",
"感谢您注册我们的服务")
print(f"任务ID: {job.id}")
2、启动Worker进程
Worker是执行任务的工作进程,MRQ支持启动多个Worker来并行处理任务。通过命令行可以轻松启动Worker进程,并且可以指定处理的队列、并发数量等参数。
# 通过命令行启动worker(在终端中执行)
# mrq-worker --queues default --processes 4 --greenlets 200
# 或者在代码中启动worker
from mrq.worker import Worker
def start_worker():
worker = Worker(
queues=["default", "emails"],
processes=2,
greenlets=100
)
worker.start()
# 在生产环境中调用
if __name__ == "__main__":
start_worker()
3、监控任务状态
MRQ提供了完善的任务状态监控功能,开发者可以通过编程方式查询任务执行情况,也可以使用内置的Web监控面板。任务状态包括排队中、执行中、已完成、失败等多种状态。
from mrq.queue import Queue
from mrq.job import Job
queue = Queue()
# 提交任务并获取任务对象
job = queue.enqueue("user.send_email", "test@example.com", "测试", "内容")
# 检查任务状态
print(f"任务状态: {job.status}")
print(f"任务ID: {job.id}")
# 等待任务完成并获取结果
result = job.wait()
print(f"任务结果: {result}")
# 启动Web监控界面(命令行执行)
# mrq-dashboard --port 5555
高级功能
1、任务重试和错误处理
MRQ提供了强大的任务重试机制,可以自动处理失败的任务。开发者可以自定义重试次数、重试间隔和重试条件,确保临时性错误不会导致任务永久失败。
from mrq.task import Task
from mrq.exceptions import RetryLater
import requests
"api.fetch_data", retry_max=3, retry_delay=60)
(def fetch_external_data(url):
"""获取外部API数据,支持重试"""
try:
response = requests.get(url, timeout=10)
if response.status_code == 200:
return response.json()
elif response.status_code >= 500:
# 服务器错误,重试
raise RetryLater("服务器错误,稍后重试")
else:
# 客户端错误,不重试
raise Exception(f"请求失败: {response.status_code}")
except requests.RequestException as e:
# 网络错误,重试
raise RetryLater(f"网络错误: {str(e)}")
# 使用任务
queue.enqueue("api.fetch_data", "https://api.example.com/data")
2、定时任务调度
MRQ支持cron风格的定时任务调度,可以设置周期性执行的任务,定时任务的配置灵活,支持复杂的时间表达式。
from mrq.scheduler import Scheduler
from mrq.task import Task
"maintenance.cleanup")
(def cleanup_old_logs():
"""清理旧日志文件"""
import os
import time
log_dir = "/var/log/myapp"
cutoff_time = time.time() - (7 * 24 * 3600) # 7天前
for filename in os.listdir(log_dir):
filepath = os.path.join(log_dir, filename)
if os.path.getctime(filepath) < cutoff_time:
os.remove(filepath)
print(f"删除旧日志: {filename}")
return {"cleaned_files": "completed"}
# 配置定时任务
scheduler_config = {
"maintenance.cleanup": {
"cron": "0 2 * * *", # 每天凌晨2点执行
"params": {}
},
"reports.daily": {
"cron": "0 9 * * 1-5", # 工作日上午9点执行
"params": {"report_type": "daily"}
}
}
# 启动调度器(命令行执行)
# mrq-scheduler --config scheduler_config.json
总结
MRQ作为一个现代化的分布式任务队列解决方案,在Python生态系统中占据了重要位置。其基于MongoDB和Redis的架构设计,为高并发场景提供了出色的性能表现。相比传统的任务队列系统,MRQ的API更加简洁直观,监控功能更加完善,部署和维护也更加便捷。从基础的任务定义和执行,到高级的重试机制和定时调度,MRQ都提供了完整的解决方案。其分布式架构支持水平扩展,能够轻松应对业务增长带来的性能需求。内置的Web监控面板让任务状态一目了然,大大简化了运维工作。
AI工具的成熟,让程序员也有了以前不敢想象的能力。海外市场的广阔,给了我们更大的舞台。
如果你也在考虑新的出路,如果你也想尝试AI编程出海这个方向,欢迎加入我们。
扫码或搜索 257735 添加微信,发送暗号「美金」,了解详细信息。
文章评论