昨天我們都已經安裝好要使用的redis & celery,今天就是要來詳細介紹基本的一些功能和流程,馬上來給大家看看範例
from flask import Flask
from flask_restx import Api, Resource, fields
from celery import Celery
# create flask instance
app = Flask(__name__)
#create celery instance
celery = Celery(app, broker='redis://localhost:6379',
backend='redis://localhost:6379')
# create flask_restx instance
api = Api(app, version='0.0.1',
title='2022鐵人賽', doc='/api/doc')
add_ns = api.namespace("add_namespace", description='2022鐵人賽_Namespace')
add_payload = add_ns.model('數字加總', {
'number one': fields.Integer(default=1),
'number two': fields.Integer(default=2)
})
add_output = add_ns.model('數字加總結果', {
'number total': fields.Integer(default=3)
})
@add_ns.route('/add')
class Add(Resource):
@add_ns.expect(add_payload)
@add_ns.marshal_with(add_output)
def post(self):
data = add_ns.payload
result = add.delay(data)
# result = add.apply_async([data])
return result
@celery.task()
def add(data):
x = data["number one"]
y = data["number two"]
return x+y
api.add_namespace(add_ns)
if __name__ == '__main__':
app.run(debug=True)
除了前面的flask & flask_restx的流程,首先我在最前面加上了新建celery實例
#create celery instance
celery = Celery(app, broker='redis://localhost:6379',
backend='redis://localhost:6379')
同樣是使用flask的實例作為第一個參數,然後在後方設定celery的 broker & backend
再來是對api的部分做了小修改
@add_ns.route('/add')
class Add(Resource):
@add_ns.expect(add_payload)
@add_ns.marshal_with(add_output)
def post(self):
data = add_ns.payload
result = add.delay(data) # result = add.apply_async([data])
return result
@celery.task()
def add(data):
x = data["number one"]
y = data["number two"]
return x+y
我們可以透過使用add.delay(data)
或者 add.apply_async([data])
來發送一個任務,發送之後,broker接到就會交給worker做@celery.task()
這個裝飾器底下的程式碼。
大致上的流程就是這樣,接著我們馬上就來試著啟動看看,首先我們要檢查flask & redis & celery服務都有啟動
celery -A app.celery worker --loglevel=info
這邊app.celery是我celery實例的位置,有些人也會將Celery取名字,方便下指令都確認啟動之後,我們就可以回到swagger的頁面,嘗試Try it out,看看是不是真的能接到task
按下Execute,一樣可以在我們的cmd上面看紀錄
然後到celery的頁面,可以看到有這一則訊息,就代表任務有接收到,並完成
celery跟redis這邊也先暫告一段落,明天我要跟大家介紹Docker到底為何物