查詢載具發票的 DAG 是比較複雜的邏輯,還記得在 Day 2:電子發票應用 API 規格 - 1
有提到「抓載具發票是分成兩段式,也因此發票 APP 會先拿到所有載具存入的發票,僅接著才可以再拿載具條碼、發票日期、發票號碼等等資訊再去拿到發票購買明細」。
由於內容較多,今天會先介紹前半部 - 怎麼抓到發票表頭,明天會繼續介紹抓發票購買細項哦!
因此 DAG 也必須設計兩段抓取發票資料
with DAG(
dag_id='invoice_auto_carrier',
default_args=args,
schedule_interval='你的 cronjob',
start_date='你的排程初始時間',
end_date='你的排程結束時間',
tags=['Invoice'],
max_active_runs=1,
concurrency=1,
catchup=False
) as dag:
def get_carrier_invoices_header():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(InvoiceETL.get_carrier_invoices_header())
loop.close()
def get_carrier_invoices_detail():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(InvoiceETL.get_carrier_invoices_detail())
loop.close()
get_carrier_invoices_header = PythonOperator(
task_id='get_carrier_invoices_header',
python_callable=get_carrier_invoices_header,
)
get_carrier_invoices_detail = PythonOperator(
task_id='get_carrier_invoices_detail',
python_callable=get_carrier_invoices_detail,
)
get_carrier_invoices_header >> get_carrier_invoices_detail
在 Airflow 的 Graph 呈現如下
@classmethod
@handle_exceptions
async def get_carrier_invoices_header(cls):
"""
載具 - 載具發票表頭查詢
"""
data_dao = DataDao()
conn = data_dao.conn
query = {
"tag": "carrier",
"data.gov.status": "已通過",
}
carriers = await conn.find_all_async(query)
logging.info("本次可更新載具使用者人數為: {} 人".format(len(carriers)))
await cls.process_chunks_with_semaphore(carriers, cls.process_carrier_invoice, conn)
conn.close()
如果你曾經試過打財政部的載具 API,應該不難發現時間超過一個多月可能會報錯,並且也只能打最多半年前的日期,因此設計抓的日期的時候必須要先做檢查。
所以 data_list 定義為 [[5/1, 5/31]],如下圖
invoice_start_date = datetime.strptime(str("2023-05-01"), '%Y-%m-%d').date()
invoice_end_date = datetime.strptime(str("2023-05-31"), '%Y-%m-%d').date()
date_list = [[invoice_start_date, invoice_end_date]]
logging.info("呼叫載具開始")
logging.info("財政部的載具發票表頭查詢 API")
async with cls.semaphore:
for start_date, end_date in date_list:
try:
response = await asyncio.to_thread(
twi.get_carrier_invoices_header,
CARD_TYPE,
my_user["card_no"],
start_date,
end_date,
decrypt_with_salt(my_user["card_encrypt"])
)
except Exception as e:
...
# 如果成功的話 ...
new_invoice = {
"category": "invoice",
"tag": "invoice_carrier",
"created_timestamp": datetime.now(tz),
"updated_timestamp": datetime.now(tz),
"data": {
"gov": {
"response": inv,
"gov_status": "未執行",
"status": "審查中",
"carrier_id": str(user["_id"]),
},
}
}
new_invoices.append(new_invoice)
對了,我在 gov 埋了 carrier_id
,為什麼呢?
因為等等查詢載具發票詳細的 API 也需要載具條碼和載具驗證碼,因此有兩個做法:
由於我不太想要寫重複的載具條碼和載具驗證碼,因此我採用前者,屆時需要多做一次查詢
屬於節省空間但增加查詢時間的做法
拿到載具發票一開始如下,抓到發票號碼和其他基本資訊,但還沒有抓細項
由於 MongoDB 沒有 PK 值,那要怎麼避免重複呢?
這裡避免重複的方式就是在塞進 DB 之前先去查詢
若有更好的方法,也可以來交流~~
query_find = {
"tag": "invoice_carrier",
"data.gov.response.invNum": inv["invNum"]
}
existing_record = conn.find_one(query_find)
if existing_record is None:
# 才去 insert