iT邦幫忙

2022 iThome 鐵人賽

DAY 16
0

import importlib
import shutil
import os

import sentry_sdk

from scarabaeus.enum import Status, ErrorCode, Task
from scarabaeus.interface.runable import RunAble
from scarabaeus.lib.error.error_messenger import ErrorMessenger
from scarabaeus.config import Config
from scarabaeus_task_runner.datas.dependency import Dependency
from scarabaeus_task_runner.frameworks.task_runner_facade import TaskRunnerFacade
from scarabaeus_task_runner.utils.auto_discover import load_module
from scarabaeus_task_runner.utils.query import fetch_task_info_by_id, update_common_task, log_common_task_start

class Main(RunAble):
_dependency: Dependency
_task_info: dict
_task_runner: TaskRunnerFacade
_origin_cwd: str
_runtime_path: str

def run(self):
    self._initialize_runner()
    self._fetch_task_info()
    self._load_tool_runner()
    self._setup_environment()
    self._run_task()
    self._clean_environment()

def _initialize_runner(self):
    self._dependency = Dependency()

def _fetch_task_info(self):
    task_id = os.getenv('TASK_ID')
    self._dependency.logger.info('Fetch task info by id: {}'.format(task_id))
    task_info = fetch_task_info_by_id(task_id)
    self._dependency.logger.info('Fetch task info: {}'.format(task_info))
    self._task_info = task_info

def _load_tool_runner(self):
    module_name = self._task_info['module_name']
    tool_class = load_module(module_name)
    self._dependency.logger.info('Load tool class: {}'.format(tool_class))
    self._task_runner = tool_class(self._dependency, self._task_info)

def _setup_environment(self):
    task_id = str(self._task_info['id'])
    base_path = self._dependency.config.BASE_PATH
    runtime_path = os.path.join(base_path, task_id)
    self._dependency.logger.info(f'Runtime path: {runtime_path}')
    os.makedirs(runtime_path, exist_ok=True)
    self._origin_cwd = os.getcwd()
    os.chdir(runtime_path)
    self._runtime_path = runtime_path

def _run_task(self):
    log_common_task_start(self._task_info['id'])
    try:
        self._task_runner()
        output_s3_key = self._task_runner.output_s3_key
        update_common_task(self._task_info['id'], Status.END, output_s3_key)
    except ValueError as e:
        self._dependency.logger.error(e)
        update_common_task(self._task_info['id'], Status.ERROR)
        ErrorMessenger.record(
            ErrorCode.SCAR_BAD_REQUEST.CODE,
            ErrorCode.SCAR_BAD_REQUEST.SUB_CODE,
            str(e),
            Task.AUTO_PACK,
            self._task_info['id']
        )
        raise e
    except Exception as e:
        update_common_task(self._task_info['id'], Status.ERROR)
        raise e

def _clean_environment(self):
    self._dependency.logger.info('Clean environment')
    os.chdir(self._origin_cwd)
    shutil.rmtree(self._runtime_path)

if name == 'main':
sentry_sdk.init(Config.SENTRY_ENDPOINT,
traces_sample_rate=1.0, environment=Config.ENV)
main_process = Main()
main_process()


上一篇
D16 - Design Pattern
下一篇
D17-高低階分離
系列文
寫個好的lib大家用吧!那些好用的lib常見的套路與想法25
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言