iT邦幫忙

2022 iThome 鐵人賽

DAY 15
0
import importlib
import shutil
import os

import sentry_sdk

from scarabaeus.enum import Status, ErrorCode, Task
from scarabaeus.interface.runable import RunAble
from scarabaeus.lib.error.error_messenger import ErrorMessenger
from scarabaeus.config import Config
from scarabaeus_task_runner.datas.dependency import Dependency
from scarabaeus_task_runner.frameworks.task_runner_facade import TaskRunnerFacade
from scarabaeus_task_runner.utils.auto_discover import load_module
from scarabaeus_task_runner.utils.query import fetch_task_info_by_id, update_common_task, log_common_task_start


class Main(RunAble):
    _dependency: Dependency
    _task_info: dict
    _task_runner: TaskRunnerFacade
    _origin_cwd: str
    _runtime_path: str

    def run(self):
        self._initialize_runner()
        self._fetch_task_info()
        self._load_tool_runner()
        self._setup_environment()
        self._run_task()
        self._clean_environment()

    def _initialize_runner(self):
        self._dependency = Dependency()

    def _fetch_task_info(self):
        task_id = os.getenv('TASK_ID')
        self._dependency.logger.info('Fetch task info by id: {}'.format(task_id))
        task_info = fetch_task_info_by_id(task_id)
        self._dependency.logger.info('Fetch task info: {}'.format(task_info))
        self._task_info = task_info

    def _load_tool_runner(self):
        module_name = self._task_info['module_name']
        tool_class = load_module(module_name)
        self._dependency.logger.info('Load tool class: {}'.format(tool_class))
        self._task_runner = tool_class(self._dependency, self._task_info)

    def _setup_environment(self):
        task_id = str(self._task_info['id'])
        base_path = self._dependency.config.BASE_PATH
        runtime_path = os.path.join(base_path, task_id)
        self._dependency.logger.info(f'Runtime path: {runtime_path}')
        os.makedirs(runtime_path, exist_ok=True)
        self._origin_cwd = os.getcwd()
        os.chdir(runtime_path)
        self._runtime_path = runtime_path

    def _run_task(self):
        log_common_task_start(self._task_info['id'])
        try:
            self._task_runner()
            output_s3_key = self._task_runner.output_s3_key
            update_common_task(self._task_info['id'], Status.END, output_s3_key)
        except ValueError as e:
            self._dependency.logger.error(e)
            update_common_task(self._task_info['id'], Status.ERROR)
            ErrorMessenger.record(
                ErrorCode.SCAR_BAD_REQUEST.CODE,
                ErrorCode.SCAR_BAD_REQUEST.SUB_CODE,
                str(e),
                Task.AUTO_PACK,
                self._task_info['id']
            )
            raise e
        except Exception as e:
            update_common_task(self._task_info['id'], Status.ERROR)
            raise e

    def _clean_environment(self):
        self._dependency.logger.info('Clean environment')
        os.chdir(self._origin_cwd)
        shutil.rmtree(self._runtime_path)


if __name__ == '__main__':
    sentry_sdk.init(Config.SENTRY_ENDPOINT,
                    traces_sample_rate=1.0, environment=Config.ENV)
    main_process = Main()
    main_process()

上一篇
D15 - 說人話啊,談有實用性的log
下一篇
D16 - Unittest
系列文
寫個好的lib大家用吧!那些好用的lib常見的套路與想法25
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言