首先我們先去定義 DAG,僅需要定義撈取紙本發票資訊即可
由於採用非同步方式,所以使用 asyncio
# DAG with a single task that fetches paper-invoice details.
# The schedule/start/end strings below are placeholders from the write-up
# ("your cronjob", "your schedule start time", "your schedule end time");
# substitute real values before deploying.
with DAG(
    dag_id='invoice_manual_paper',
    default_args=args,
    schedule_interval='你的 cronjob',
    start_date='你的排程初始時間',
    end_date='你的排程結束時間',
    tags=['Invoice'],
    max_active_runs=1,
    concurrency=1,
    catchup=False,
) as dag:

    def _run_invoice_manual_paper():
        """Run the async invoice-detail ETL to completion from a sync task.

        ``asyncio.run`` creates a fresh event loop, runs the coroutine and
        always closes the loop afterwards, including when the coroutine
        raises, so no loop is leaked on failure.
        """
        asyncio.run(InvoiceETL.get_invoice_detail())

    # The callable has its own name so that binding the operator to
    # ``invoice_manual_paper`` does not shadow the function it wraps.
    invoice_manual_paper = PythonOperator(
        task_id='invoice_manual_paper',
        python_callable=_run_invoice_manual_paper,
    )

    # Single task, no dependencies to wire up.
    invoice_manual_paper
這時候搜尋狀態為「審查中」的發票,就可以抓到這次需要更新的發票
@classmethod
@handle_exceptions
async def get_invoice_detail(cls):
    """Paper e-invoice: look up details for invoices that are under review.

    Finds every record tagged ``invoice_paper`` whose ``data.gov.status`` is
    "審查中" (under review), logs how many were found, and passes them to
    ``process_chunks_with_semaphore`` with ``process_invoice_detail`` as the
    per-invoice handler. The connection taken from ``DataDao`` is closed when
    the work finishes, whether it succeeded or raised.
    """
    data_dao = DataDao()
    conn = data_dao.conn
    try:
        query = {
            "tag": "invoice_paper",
            "data.gov.status": "審查中"
        }
        invoices = await conn.find_all_async(query)
        logging.info("本次更新發票張數為: {} 張".format(len(invoices)))
        await cls.process_chunks_with_semaphore(invoices, cls.process_invoice_detail, conn)
    finally:
        # Release the connection even if the query or the processing raised;
        # previously an exception skipped the close and leaked it.
        conn.close()
接著,我這邊定義一個 function 是 process_chunks_with_semaphore
考量到發票會有很多,因此可以分批做,而分批的方式可以透過以下兩個參數做調整
SEM_LIMIT = 10  # maximum number of invoices processed at the same time
CHUNK_SIZE = 10  # number of invoices handled in one round (batch)


async def process_chunks_with_semaphore(data, process_func, conn, *args, **kwargs):
    """Run ``process_func`` over ``data`` in rounds of ``CHUNK_SIZE`` items.

    Each round schedules one task per item and waits for the whole round to
    finish before starting the next. At most ``SEM_LIMIT`` calls to
    ``process_func`` are in flight at any moment.

    Args:
        data: Sliceable sequence of items to process.
        process_func: Coroutine function called as
            ``process_func(item, conn, *args, **kwargs)``.
        conn: Passed through unchanged to every ``process_func`` call.
        *args: Extra positional arguments forwarded to ``process_func``.
        **kwargs: Extra keyword arguments forwarded to ``process_func``.

    Raises:
        Whatever ``process_func`` raises; ``asyncio.gather`` propagates the
        first exception of a round.
    """
    sem = asyncio.Semaphore(SEM_LIMIT)

    async def _run_limited(item):
        # Take one permit per item. Holding a single permit around the whole
        # loop (as before) never limited how many tasks ran concurrently.
        async with sem:
            return await process_func(item, conn, *args, **kwargs)

    for round_no, start in enumerate(range(0, len(data), CHUNK_SIZE), start=1):
        logging.info("第 {} 輪".format(round_no))
        tasks = []
        for item in data[start:start + CHUNK_SIZE]:
            tasks.append(asyncio.create_task(_run_limited(item)))
            # Stagger task start-up by half a second per item.
            # NOTE(review): the original indentation was lost; this assumes
            # the sleep sat inside the per-item loop -- confirm.
            await asyncio.sleep(0.5)
        await asyncio.gather(*tasks)
接著向財政部呼叫 API,但根據電子發票上傳期限 >>> 如果是近兩天內的發票可能會沒有資訊,所以直接跳過
我們透過回傳的資訊做不同的處理:
拿到 response 之後 ...
4. response['invStatus'] == "該筆發票並無開立" → 使用者亂輸入
5. not 'details' in response → 隨機碼錯誤
try:
# 近兩天的發票先不做
invDate = datetime.strptime(str(my_inv["invDate"]), '%Y%m%d').date()
today = datetime.now(tz).date()
if (today - invDate).days <= 2:
return
# call API
response = await asyncio.to_thread(
twi.get_invoice_detail,
"Barcode",
my_inv["invNum"],
invDate,
my_inv["random_number"],
cls.get_invoice_term(my_inv["invDate"])
)
logging.info("resp'onse:{}".format(response))
except Exception as e:
# 失敗的話...
else:
# 成功的話...
這邊留意 asyncio.to_thread
是 python 3.9 以上才有,也因此我在 Dockerfile 安裝的是 python 3.9 版本
如果使用 3.8 以下的版本,應該會遇到 AttributeError: module 'asyncio' has no attribute 'to_thread'
Stackoverflow:Python module 'asyncio' has no attribute 'to_thread'