0%

Celery-动态路由

Pre:

在使用celery的时候,可以根据任务情况(任务类型,任务参数),更灵活的把任务分发到不同的队列里.


任务的去向

在Celery中, 任务的目的是由下列因素决定(按顺序):

  1. task_routes 中定义的路由

  2. Task.apply_async() 方法的路由参数

  3. Task 本身定义的路由相关属性

最佳实践是不写死这些设置,而是通过 Routers 将它作为配置选项;这是最灵活的方式,但是合理的默认值仍然可以设置称任务属性。


手动路由做法:

一般都会在celery的配置文件里静态写死队列和路由即默认值.如下

1
CELERY_QUEUES=(Queue("fingerprint", Exchange("fingerprint"),routing_key="fingerprint"),  # 指纹识别)
1
CELERY_ROUTES={ 'fingerprint_all_scanner.tasks.startDetectUrl': {"queue": "fingerprint", "routing_key": "fingerprint"},}

动态路由做法:

自定义一个Router类

1
2
3
4
5
6
7
8
9
10
11
12
13

class MyRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
print task # 任务类型
print args # 列表任务参数
print kwargs # 键值对任务参数

# 在此处可以自定义代码,可以根据任务参数或者任务类型更灵活的将任务发到不同的队列里
if task.startswith('fingerprint_all_scanner'):
return {"queue": "fingerprint", "routing_key": "fingerprint"}
else:
return None

然后把配置替换为如下即可

1
CELERY_ROUTES=(MyRouter(),)

Refs: