起步
Celery 是一個(gè)簡(jiǎn)單、靈活且可靠的,處理大量消息的分布式系統(tǒng),并且提供維護(hù)這樣一個(gè)系統(tǒng)的必需工具。它是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列,同時(shí)也支持任務(wù)調(diào)度。
運(yùn)行模式是生產(chǎn)者消費(fèi)者模式:
任務(wù)隊(duì)列:任務(wù)隊(duì)列是一種在線程或機(jī)器間分發(fā)任務(wù)的機(jī)制。
消息隊(duì)列:消息隊(duì)列的輸入是工作的一個(gè)單元,稱為任務(wù),獨(dú)立的職程(Worker)進(jìn)程持續(xù)監(jiān)視隊(duì)列中是否有需要處理的新任務(wù)。
Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個(gè)過程從客戶端向隊(duì)列添加消息開始,之后中間人把消息派送給職程,職程對(duì)消息進(jìn)行處理。
Celery的架構(gòu)由三部分組成,消息中間件(message broker),任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(chǔ)(task result store)組成。
消息中間件:Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ, Redis, MongoDB等,本文使用 redis 。
任務(wù)執(zhí)行單元:Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中
任務(wù)結(jié)果存儲(chǔ):Task result store用來(lái)存儲(chǔ)Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲(chǔ)任務(wù)的結(jié)果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲(chǔ)的,就先選用Redis來(lái)存儲(chǔ)任務(wù)執(zhí)行結(jié)果。
安裝
通過 pip 命令即可安裝:
pip install celery
本文使用 redis 做消息中間件,所以需要在安裝:
pip install redis
redis軟件也要安裝,官網(wǎng)只提供了 linux 版本的下載:https://redis.io/download,windows 的可以到 https://github.com/MicrosoftArchive/redis 下載 exe 安裝包。
簡(jiǎn)單的demo
為了運(yùn)行一個(gè)簡(jiǎn)單的任務(wù),從中說(shuō)明 celery 的使用方式。在項(xiàng)目文件夾內(nèi)創(chuàng)建 app.py 和 tasks.py 。tasks.py 用來(lái)定義任務(wù):
# tasks.pyimport timefrom celery import Celerybroker = 'redis://127.0.0.1:6379/1'backend = 'redis://127.0.0.1:6379/2'app = Celery('my_tasks', broker=broker, backend=backend)@app.taskdef add(x, y): print('enter task') time.sleep(3) return x + y
這些代碼做了什么事。 broker 指定任務(wù)隊(duì)列的消息中間件,backend 指定了任務(wù)執(zhí)行結(jié)果的存儲(chǔ)。app 就是我們創(chuàng)建的 Celery 對(duì)象。通過 app.task 修飾器將 add 函數(shù)變成一個(gè)一部的任務(wù)。
# app.pyfrom tasks import addif __name__ == '__main__': print('start task') result = add.delay(2, 18) print('end task') print(result)
add.delay 函數(shù)將任務(wù)序列化發(fā)送到消息中間件。終端執(zhí)行 python app.py 可以看到輸出一個(gè)任務(wù)的唯一識(shí)別:
新聞熱點(diǎn)
疑難解答
圖片精選