iT邦幫忙

0

自我筆記 - django 系列 [任務列隊celery篇]

django celery

celery是什麼

  • 一種讓異步處理任務的工具
  • 主要應用情境為以下:
    • 任務調度 (例如: 寄信這種會比較需要等待的事情)
    • 定時任務

此篇主要解講定時任務的部份

定時任務

執行環境
* python 3.7.4
* django 2.1.7
* celery 4.4.7
* django-redis 4.12.1 (本篇用redis當作中間人)

必須先安裝redis服務,再進行以下

目錄結構

目錄結構
    |   manage.py
    \---ProjectName
            asgi.py
            settings.py
            urls.py
            wsgi.py
            __init__.py
            ★ celery.py
    \---AppName
        task.py (名稱task為預設)
  • celery.py: 基本設定
    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery, platforms
    from celery.schedules import crontab
    from datetime import timedelta
    from kombu import Queue # 用於多任務列隊

    # django_DRF 為專案名稱
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Django_DRF.settings')

    # 使用redis作為中間人(broker)
    # 使用redis作為定時任務存取地方(backend)
    # 預設redis為本地127.0.0.1:6379/1(redis)
    # ★★★ 此處須注意(若本地沒有redis卻設置redis預設 -> 則會顯示10061連線錯誤)
    app = Celery('Django_DRF', backend='redis://127.0.0.1:6379/14', broker='redis://127.0.0.1:6379/15')

    # Using a string here means the worker don't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')

    # 預設任務名稱為tasks用意為此,此行會加載已經有註冊的app.task.py檔案
    app.autodiscover_tasks()
    # 設定任務進入哪個列隊 (在開啟worker時,可以分別進行)
    app.conf.task_routes = {
        'AppName1.tasks.TF_postingJob': {
            'queue': 'tasks_one'
        },
        'AppName2.tasks.MLO_dailyDataJob': {
            'queue': 'tasks_two'
        }
    }
    # 設定列隊路由
    app.conf.task_queues = (
        Queue('tasks_one', routing_key='tasks_one'),
        Queue('tasks_two', routing_key='tasks_two'),
    )
    # 允許root用戶運行celery
    platforms.C_FORCE_ROOT = True
    # 有出錯會顯示於此
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))

    # 固定的定時任務
    app.conf.update(
        CELERYBEAT_SCHEDULE = {
            'JobName': {                            # 任務名稱
                'task': 'app.tasks.functionName',   # 執行的任務
                'schedule':  timedelta(seconds=5),  # 週期多久一次
                'args': ()                          # 此處可帶參數
            }
        }
    )
  • app.tasks.py: 任務編寫位置
# celery指定的任務檔案,只能叫做task.py
# 於此加入所需要執行的程序
from __future__ import absolute_import
from celery import shared_task
from channels.layers import get_channel_layer
from .timelyService import *
@shared_task
    def MLC_postingJob():
        print('test')

如何開始執行

  1. 定時任務需在開啟beat服務(borker)

    • 於專案目錄底下輸入celery beat -A projectName -l info
      https://ithelp.ithome.com.tw/upload/images/20210524/20132538uKTX7xCJNw.png
    • 看到上圖已經成功開始定時器了,任務也有成功發送,於redis(下圖)可看見以下任務成功進行 排隊排隊排隊 (很重要所以說三次),此時任務是沒有被成功執行的。
      https://ithelp.ithome.com.tw/upload/images/20210524/20132538kkGXR0K6c7.png
  2. 開啟worker執行broker下達的任務

    • 於專案目錄底下輸入celery -A Django_DRF worker --pool=solo -l info
      https://ithelp.ithome.com.tw/upload/images/20210524/20132538gSJYwZFw2Q.png
    • 看到上圖已經成功開啟,若有任務需處理則會執行,執行結果會存於backend
      https://ithelp.ithome.com.tw/upload/images/20210524/201325382GWxNt5ll5.png
    • 若有執行時間較長的任務,可利用多列隊解決,如下指令
    # 1.開啟CMD輸入以下
    celery -A Django_DRF worker --pool=solo -l info -Q tasks_one # tasks_one 為該worker需要執行的列隊內容。
    

尚未有邦友留言

立即登入留言