iT邦幫忙

0

python pandas to bigquery的table schema問題

  • 分享至 

  • xImage

各位前輩好,近日在寫ETL時(嚴格來說是ELT)遇到Schema相關的問題有點棘手,還請有經驗的前輩協助指點一下

流程:

MySQL -> parquet -> BigQuery

  1. 因有大量ETL要處理,因此尋求統一的做法,不可例外狀況例外處理。
  2. 使用pandas下SQL從MySQL中取得DataFrame
  3. 將該DataFrame轉成parquet檔放在GCS中
  4. 讓BigQuery去GCS中拿parquet寫入 (使用GCP-Client的load_from_uri)
  5. 或透過分布式集群使用pandas讀取parquet後再寫入BigQuery (使用GCP-Client的load_from_dataframe)

遇到的問題:
可以確定的是,欄位名稱與目標端BigQuery是一致的,但無論進行上述第4步或第5步會遇到table schema不匹配的問題,大致都是 expected Decimal but got 'float'之類的問題,並且是來自於pyarrow的錯誤,看起來是GCP-Client的API底層會將資料轉成parquet格式再進行後續的寫入操作,但無論資料如何做轉換,pyarrow都還是會一直遇到schema的問題。

奇怪的部分:
經過嘗試後發現,從MySQL算完的資料讀到DataFrame後(ELT),無法直接寫入BigQuery(schema不匹配)但使用pandas從BigQuery取得的資料,卻可以直接寫回BigQuery,並且我已經檢查過兩者的schema根本一模一樣,但有很多是pandas的object類型,不太確定是否因為object類型的關係,可能實際上有區別我也看不出來?

嘗試過的方法:

  1. 先將BigQuery中的table schema讀出來,然後讓DataFrame針對每個欄位去做對應轉換,失敗。
  2. 與1.作法相似,使用SELECT WHERE 1=0取得空的DataFrame,然後再把兩個DataFrame去做astype的mapping,失敗。
  3. 調整load_data的參數,直接將schema帶入,或是指定tableRef,都失敗。
  4. 後來還嘗試過幾個作法,但有點忘了,但都大同小異,就是做df.astype。

補充:
從MySQL到pandas這段,資料類型會經過一次轉換,然後pandas到parquet還會轉一次,pandas再去讀parquet會不會再轉我已經看不出來,有太多object類型,但是使用pandas讀BigQuery資料寫入parquet後,再將parquet讀出來仍然是可以寫回去,但從MySql那邊來的資料,有極少量的table可以成功寫入,但大部分都不行,並且這些可以成功寫入的,他們使用到的資料類型,與不能寫入的是完全一樣的,真心看不懂是發生甚麼事。

問題總結:
如何在 MySql -> parquet -> BigQuery 這個過程中,處理schema mapping的問題? 寫入MySql的話似乎就不太會有這些schema的問題,pandas的類型支援覆蓋率都很不錯,但BigQuery真的卡到陰。

流程其實很簡單,要處理的問題也看似很簡單,但實際簡不簡單還真不知道,因為目前還沒解決,可能會有很多沒說清楚的地方,若需要補充請再跟我說,進公司後會再補上code跟error的截圖。

問題聚焦:
最後補充一個我目前的思路。
與嘗試過的方法2類似,既然從BigQuery讀出來的pandas.DataFrame可以直接寫回,那我是否可以取出一個包含destination table schema的pandas.DataFrame的殼,在不改變他的schema的條件下,將MySQL取出來的資料塞進去,然後就可以寫進去了,我覺得這個方法似乎是可能性比較高的,只是要怎麼控制,基於方法2的思路去變形,要再嘗試看看,

parquet_file.metadata:
https://ithelp.ithome.com.tw/upload/images/20230921/20114492MgvnAYNKxN.png

參考解:


tdf = pd.read_parquet('mypq.parquet')

bq_table_ref = client.get_table(table_id)
schema = bq_table_ref.schema
dtype_map = {}

for field in schema:
    if field.field_type in ['INT64', 'INTEGER']:
        dtype_map[field.name] = 'int64'
    elif field.field_type == 'STRING':
        dtype_map[field.name] = str
    elif field.field_type == 'FLOAT64':
        dtype_map[field.name] = 'float64'
    elif field.field_type == 'BOOLEAN':
        dtype_map[field.name] = bool
    elif field.field_type == 'BYTES':
        dtype_map[field.name] = object
    elif field.field_type == 'TIMESTAMP':
        dtype_map[field.name] = 'datetime64[ns]'
    elif field.field_type == 'DATE':
        dtype_map[field.name] = object
    elif field.field_type == 'DATETIME':
        dtype_map[field.name] = 'datetime64[ns]'
    elif field.field_type == 'GEOGRAPHY':
        dtype_map[field.name] = object
    elif field.field_type == 'NUMERIC':
        dtype_map[field.name] = object
    elif field.field_type == 'BIGNUMERIC':
        dtype_map[field.name] = object
    elif field.field_type == 'ARRAY':
        dtype_map[field.name] = object
    elif field.field_type == 'STRUCT':
        dtype_map[field.name] = object
    else:
        dtype_map[field.name] = object

# NUMERIC and BIGNUMERIC expected Decimal in BigQuery
for field in schema:
    if field.field_type in ['NUMERIC', 'BIGNUMERIC']:
        tdf[field.name].fillna(0, inplace=True)
        tdf[field.name] = tdf[field.name].apply(lambda x: decimal.Decimal(str(x)))
        
converted_df = tdf.astype(dtype_map)
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

2 個回答

1
obarisk
iT邦研究生 2 級 ‧ 2023-09-21 08:02:12
最佳解答

沒遇過這個問題。
大概你得提供某些範例資料吧。

做個沒意義的假表,只放幾個紀錄出來看一下。
就我來看,你看起來型別一樣,大概沒有真的一樣。

幾個問題

  • parquet 的版本是?(我指序列化的版本)
  • 載入 bigquery 是用 LOAD 還是 external table?
  • to_parquet 是用 pandas 的 api 還是自己寫的函數?
  • 有沒有辦法提供 pandas to parquet 的型別對應表?

bigquery 的在這裡

確認一下

每個表格的在各個地方的型別

column|mysql|pandas|parquet|bigquery

看更多先前的回應...收起先前的回應...
Peter iT邦新手 4 級 ‧ 2023-09-21 09:23:24 檢舉

感謝回覆,以下提供更多資訊:
parquet 的版本是?(我指序列化的版本)
>> 11.0.0
載入 bigquery 是用 LOAD 還是 external table?
>> 兩個方法都試過,都會卡schema,應該是pandas資料類型的問題
to_parquet 是用 pandas 的 api 還是自己寫的函數?
>> 用pandas的df.to_parquet()
有沒有辦法提供 pandas to parquet 的型別對應表?
>> 稍後補充在文檔

obarisk iT邦研究生 2 級 ‧ 2023-09-21 09:34:42 檢舉
obarisk iT邦研究生 2 級 ‧ 2023-09-21 09:36:38 檢舉

我之前都是用 pyarrow.parquet 直接寫檔案

null 跟 float 的問題請參考上面的連結

Peter iT邦新手 4 級 ‧ 2023-09-21 20:37:04 檢舉

抱歉今天開一整天會,沒能即時回應,你的方向是對的,就是很多類似的類別問題需要轉,我後來一個一個去試,然後調校出一個目前還堪用的mapping list。除了Null的問題之外,還有pandas的int64轉成object之後其實還會是int64(也就是根本就沒轉,只是類別名稱變了),還有Decimal在pandas原生中不支援的的問題,等等等的一堆轉換

obarisk iT邦研究生 2 級 ‧ 2023-09-22 10:30:37 檢舉

所以我大概不會用 pandas 做 :)

1
海綿寶寶
iT邦大神 1 級 ‧ 2023-09-21 07:29:07

如果沒有堅持使用 parquet 的理由
可參考這篇的幾種方法
乍看之下很簡單

Peter iT邦新手 4 級 ‧ 2023-09-21 09:18:13 檢舉

感謝回覆,根據官方document中的方法,我應該全試過了,schema的部分還是會出錯,目前看似繞不過去,必須得轉換成與BigQuery完全相同的類型

我要發表回答

立即登入回答