iT邦幫忙

2023 iThome 鐵人賽

DAY 25
0

DAG

首先我們先去定義 DAG,僅需要定義撈取紙本發票資訊即可

由於採用非同步方式,所以使用 asynico

with DAG(
        dag_id='invoice_manual_paper',
        default_args=args,
        schedule_interval='你的 cronjob',
        start_date='你的排程初始時間',
        end_date='你的排程結束時間',
        tags=['Invoice'],
        max_active_runs=1,
        concurrency=1,
        catchup=False
) as dag:
    def invoice_manual_paper():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(InvoiceETL.get_invoice_detail())
        loop.close()

    invoice_manual_paper = PythonOperator(
            task_id='invoice_manual_paper',
            python_callable=invoice_manual_paper,
    )

    invoice_manual_paper

ETL

這時候搜尋 審查中 的發票,就可以抓到這次需要更新的發票

@classmethod
@handle_exceptions
async def get_invoice_detail(cls):
    """
    紙本電子發票 - 查詢發票明細
    """
    data_dao = DataDao()
    conn = data_dao.conn
    query = {
        "tag": "invoice_paper",
        "data.gov.status": "審查中"
    }
    invoices = await conn.find_all_async(query)
    logging.info("本次更新發票張數為: {} 張".format(len(invoices)))
    await cls.process_chunks_with_semaphore(invoices, cls.process_invoice_detail, conn)
    conn.close()

https://ithelp.ithome.com.tw/upload/images/20230927/201143803LKSO3EzfW.png

接著,我這邊定義一個 function 是 process_chunks_with_semaphore

考量到發票會有很多,因此可以分批做,而這個參數可以透過這兩個參數做調整

SEM_LIMIT = 10  # 同時最多可以跑幾張發票
CHUNK_SIZE = 10 # 將發票數量進行分割,一輪可以有幾張發票進行
async def process_chunks_with_semaphore(data, process_func, conn, *args, **kwargs):
        sem = asyncio.Semaphore(SEM_LIMIT)
        async with sem:
            chunk_size = CHUNK_SIZE
            num_items = len(data)
            for i in range(0, num_items, CHUNK_SIZE):
                logging.info("第 {} 輪".format(i // chunk_size + 1))
                chunk = data[i:i + chunk_size]
                tasks = []
                for item in chunk:
                    task = asyncio.create_task(process_func(item, conn, *args, **kwargs))
                    tasks.append(task)
                    await asyncio.sleep(0.5)
                await asyncio.gather(*tasks)

接著向財政部呼叫 API,但根據電子發票上傳期限 >>> 如果是近兩天內的發票可能會沒有資訊,所以直接跳過

https://ithelp.ithome.com.tw/upload/images/20230927/201143800vTd50YafD.png

我們透過回傳的資訊做不同的處理:

  1. ValidationError → 發票有誤
  2. JSONDecodeError → 總之遇到錯誤,讓排程下一輪可以繼續跑
  3. Exception → 總之遇到錯誤,讓排程下一輪可以繼續跑

拿到 response 之後 ...
4. response['invStatus'] == "該筆發票並無開立" → 使用者亂輸入
5. not 'details' in response → 隨機碼錯誤

try:
    # 近兩天的發票先不做
    invDate = datetime.strptime(str(my_inv["invDate"]), '%Y%m%d').date()
    today = datetime.now(tz).date()
    if (today - invDate).days <= 2:
        return
    # call API
    response = await asyncio.to_thread(
        twi.get_invoice_detail,
        "Barcode",
        my_inv["invNum"],
        invDate,
        my_inv["random_number"],
        cls.get_invoice_term(my_inv["invDate"])
    )
    logging.info("resp'onse:{}".format(response))
except Exception as e:
    # 失敗的話...
else:
    # 成功的話...

注意 python 版本

這邊留意 asyncio.to_thread 是 python 3.9 以上才有,也因此我在 Dockerfile 安裝的是 python 3.9 版本

如果使用 3.8 以下的版本,應該會遇到 AttributeError: module 'asyncio' has no attribute 'to_thread'

Stackoverflow:Python module 'asyncio' has no attribute 'to_thread'


上一篇
Day 24:設計 MongoDB Dao
下一篇
Day 26:設計查詢載具有效的 DAG
系列文
透過 python 建立發票系統 - 自己的發票自己查30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言