如果昨天設計 DAG 都了解,今天的課題基本上就是 copy paste 改一改
這邊先補充昨天沒有提到的 @handle_exceptions
由於太多地方會寫 try-except,這時候我們可以善用裝飾器
這樣程式碼可以更簡潔、更俐落
def handle_exceptions(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
logging.error(e)
traceback.print_exc()
return wrapper
一樣要先去定義 Dag,查詢載具只需要一個步驟就可以完成
with DAG(
dag_id='invoice_valid_carrier',
default_args=args,
schedule_interval='你的 cronjob',
start_date='你的排程初始時間',
end_date='你的排程結束時間',
tags=['Invoice'],
max_active_runs=1,
concurrency=1,
catchup=False
) as dag:
def get_valid_carrier():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(InvoiceETL.get_valid_carrier())
loop.close()
get_valid_carrier = PythonOperator(
task_id='get_carrier_invoices_header',
python_callable=get_valid_carrier,
)
get_valid_carrier
Dag 會呼叫 get_valid_carrier
可抓出需要進行審查的載具
@classmethod
@handle_exceptions
async def get_valid_carrier(cls):
"""
載具 - 判斷載具正確性與否
"""
data_dao = DataDao()
conn = data_dao.conn
query = {
"tag": "carrier",
"data.gov.status": "審查中",
}
users = await conn.find_all_async(query)
logging.info("本次須判斷載具正確性與否人數為: {} 人".format(len(users)))
await cls.process_chunks_with_semaphore(users, cls.process_aggregate_carrier, conn)
conn.close()
我們透過回傳的資訊做不同的處理:
拿到 response 之後...
5. response['msg'] → 載具成功綁定
6. 不是 response['msg'] → 總之遇到錯誤,讓排程下一輪可以繼續跑
try:
response = await asyncio.to_thread(
twi.get_aggregate_carrier,
CARD_TYPE,
my_user["card_no"],
decrypt_with_salt(my_user["card_encrypt"])
)
logging.info("response:{}".format(response))
except Exception as e:
# 失敗的話...
else:
# 成功的話...
這裡亦補充說明的錯誤分類有遇到幾種:
若還有遇到其他錯誤也可以交流一下~