iT邦幫忙

2023 iThome 鐵人賽

DAY 26
0

如果昨天設計 DAG 都了解,今天的課題基本上就是 copy paste 改一改 /images/emoticon/emoticon82.gif

Exception

這邊先補充昨天沒有提到的 @handle_exceptions

由於太多地方會寫 try-except,這時候我們可以善用裝飾器

這樣程式碼可以更簡潔、更俐落

def handle_exceptions(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            logging.error(e)
            traceback.print_exc()
    return wrapper

DAG

一樣要先去定義 Dag,查詢載具只需要一個步驟就可以完成

with DAG(
        dag_id='invoice_valid_carrier',
        default_args=args,
				schedule_interval='你的 cronjob',
        start_date='你的排程初始時間',
        end_date='你的排程結束時間',
        tags=['Invoice'],
        max_active_runs=1,
        concurrency=1,
        catchup=False
) as dag:
    def get_valid_carrier():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(InvoiceETL.get_valid_carrier())
        loop.close()

    get_valid_carrier = PythonOperator(
            task_id='get_carrier_invoices_header',
            python_callable=get_valid_carrier,
    )

    get_valid_carrier

ETL

Dag 會呼叫 get_valid_carrier可抓出需要進行審查的載具

@classmethod
@handle_exceptions
async def get_valid_carrier(cls):
    """
    載具 - 判斷載具正確性與否
    """
    data_dao = DataDao()
    conn = data_dao.conn
    query = {
        "tag": "carrier",
        "data.gov.status": "審查中",
    }
    users = await conn.find_all_async(query)
    logging.info("本次須判斷載具正確性與否人數為: {} 人".format(len(users)))
    await cls.process_chunks_with_semaphore(users, cls.process_aggregate_carrier, conn)
    conn.close()

https://ithelp.ithome.com.tw/upload/images/20230927/20114380smwhfxkgfn.png

我們透過回傳的資訊做不同的處理:

  1. APIError → 參數驗證碼錯誤 → 綁定失敗
  2. APIError → 不是參數驗證碼錯誤 → 總之遇到錯誤,讓排程下一輪可以繼續跑
  3. JSONDecodeError → 總之遇到錯誤,讓排程下一輪可以繼續跑
  4. Exception → 總之遇到錯誤,讓排程下一輪可以繼續跑

拿到 response 之後...
5. response['msg'] → 載具成功綁定
6. 不是 response['msg'] → 總之遇到錯誤,讓排程下一輪可以繼續跑

try:
    response = await asyncio.to_thread(
                twi.get_aggregate_carrier,
                CARD_TYPE,
                my_user["card_no"],
                decrypt_with_salt(my_user["card_encrypt"])
    )
    logging.info("response:{}".format(response))
except Exception as e:
    # 失敗的話...
else:
    # 成功的話...

https://ithelp.ithome.com.tw/upload/images/20230927/20114380xw2rjgQUZO.png

錯誤分類

這裡亦補充說明的錯誤分類有遇到幾種:

  1. APIError :這是引用 tw_invoice 定義的錯誤
  2. JSONDecodeError:from json.decoder import JSONDecodeError
  3. ValidationError:from pydantic import ValidationError

若還有遇到其他錯誤也可以交流一下~


上一篇
Day 25:設計紙本電子發票查詢的 DAG
下一篇
Day 27:設計查詢載具發票 DAG - 1
系列文
透過 python 建立發票系統 - 自己的發票自己查30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言