各位前輩好,近日在寫ETL時(嚴格來說是ELT)遇到Schema相關的問題有點棘手,還請有經驗的前輩協助指點一下
流程:
MySQL -> parquet -> BigQuery
遇到的問題:
可以確定的是,欄位名稱與目標端BigQuery是一致的,但無論進行上述第4步或第5步會遇到table schema不匹配的問題,大致都是 expected Decimal but got 'float'之類的問題,並且是來自於pyarrow的錯誤,看起來是GCP-Client的API底層會將資料轉成parquet格式再進行後續的寫入操作,但無論資料如何做轉換,pyarrow都還是會一直遇到schema的問題。
奇怪的部分:
經過嘗試後發現,從MySQL算完的資料讀到DataFrame後(ELT),無法直接寫入BigQuery(schema不匹配)但使用pandas從BigQuery取得的資料,卻可以直接寫回BigQuery,並且我已經檢查過兩者的schema根本一模一樣,但有很多是pandas的object類型,不太確定是否因為object類型的關係,可能實際上有區別我也看不出來?
嘗試過的方法:
補充:
從MySQL到pandas這段,資料類型會經過一次轉換,然後pandas到parquet還會轉一次,pandas再去讀parquet會不會再轉我已經看不出來,有太多object類型,但是使用pandas讀BigQuery資料寫入parquet後,再將parquet讀出來仍然是可以寫回去,但從MySql那邊來的資料,有極少量的table可以成功寫入,但大部分都不行,並且這些可以成功寫入的,他們使用到的資料類型,與不能寫入的是完全一樣的,真心看不懂是發生甚麼事。
問題總結:
如何在 MySql -> parquet -> BigQuery 這個過程中,處理schema mapping的問題? 寫入MySql的話似乎就不太會有這些schema的問題,pandas的類型支援覆蓋率都很不錯,但BigQuery真的卡到陰。
流程其實很簡單,要處理的問題也看似很簡單,但實際簡不簡單還真不知道,因為目前還沒解決,可能會有很多沒說清楚的地方,若需要補充請再跟我說,進公司後會再補上code跟error的截圖。
問題聚焦:
最後補充一個我目前的思路。
與嘗試過的方法2類似,既然從BigQuery讀出來的pandas.DataFrame可以直接寫回,那我是否可以取出一個包含destination table schema的pandas.DataFrame的殼,在不改變他的schema的條件下,將MySQL取出來的資料塞進去,然後就可以寫進去了,我覺得這個方法似乎是可能性比較高的,只是要怎麼控制,基於方法2的思路去變形,要再嘗試看看,
parquet_file.metadata:
參考解:
tdf = pd.read_parquet('mypq.parquet')
bq_table_ref = client.get_table(table_id)
schema = bq_table_ref.schema
dtype_map = {}
for field in schema:
if field.field_type in ['INT64', 'INTEGER']:
dtype_map[field.name] = 'int64'
elif field.field_type == 'STRING':
dtype_map[field.name] = str
elif field.field_type == 'FLOAT64':
dtype_map[field.name] = 'float64'
elif field.field_type == 'BOOLEAN':
dtype_map[field.name] = bool
elif field.field_type == 'BYTES':
dtype_map[field.name] = object
elif field.field_type == 'TIMESTAMP':
dtype_map[field.name] = 'datetime64[ns]'
elif field.field_type == 'DATE':
dtype_map[field.name] = object
elif field.field_type == 'DATETIME':
dtype_map[field.name] = 'datetime64[ns]'
elif field.field_type == 'GEOGRAPHY':
dtype_map[field.name] = object
elif field.field_type == 'NUMERIC':
dtype_map[field.name] = object
elif field.field_type == 'BIGNUMERIC':
dtype_map[field.name] = object
elif field.field_type == 'ARRAY':
dtype_map[field.name] = object
elif field.field_type == 'STRUCT':
dtype_map[field.name] = object
else:
dtype_map[field.name] = object
# NUMERIC and BIGNUMERIC expected Decimal in BigQuery
for field in schema:
if field.field_type in ['NUMERIC', 'BIGNUMERIC']:
tdf[field.name].fillna(0, inplace=True)
tdf[field.name] = tdf[field.name].apply(lambda x: decimal.Decimal(str(x)))
converted_df = tdf.astype(dtype_map)
沒遇過這個問題。
大概你得提供某些範例資料吧。
做個沒意義的假表,只放幾個紀錄出來看一下。
就我來看,你看起來型別一樣,大概沒有真的一樣。
幾個問題
bigquery 的在這裡
確認一下
每個表格的在各個地方的型別
column|mysql|pandas|parquet|bigquery
感謝回覆,以下提供更多資訊:
parquet 的版本是?(我指序列化的版本)
>> 11.0.0
載入 bigquery 是用 LOAD 還是 external table?
>> 兩個方法都試過,都會卡schema,應該是pandas資料類型的問題
to_parquet 是用 pandas 的 api 還是自己寫的函數?
>> 用pandas的df.to_parquet()
有沒有辦法提供 pandas to parquet 的型別對應表?
>> 稍後補充在文檔
如果沒有堅持使用 parquet 的理由
可參考這篇的幾種方法
乍看之下很簡單