終於到了倒數第二天了,那前一天我們介紹了用爬蟲作為資料獲取的手段,那今天就來稍微講解一下有了資料之後需要怎樣進行資料流(data pipeline)的管理吧
ETL是一個縮寫,一共由以下幾個部分所組成
圖片來源:Microsoft
通常是用在資料的倉儲與管理,在越大量複雜的資料就越適合使用這種概念進行管理資料。
一筆原始的數據,通常會需要經過許多不同步驟的處理才會送入到我們最終需要的地方。而有的時候比起實作上是怎麼達成的細節,我們通常會更加在意這些經過不同步驟的階段的資料。因此將資料流在過程中以ETL的方式進行拆解和管理會可以更加有效的去維護和開發。
Web介面,圖片來源:Dagster
安裝方式非常簡單,只要用pip像是安裝一般python套件一樣即可
pip install dagster dagit
我們建立一份Day29-1.py
的檔案,內容如下
from dagster import execute_pipeline, pipeline, solid
@solid
def get_name():
return 'dagster'
@solid
def hello(something):
print(something)
@pipeline
def hello_pipeline():
hello(get_name())
這邊我們一共用了兩種decrorater,其中
在Terminal輸入
dagit -f Day29-1.py
以單檔方式打開這份流程,然後開啟localhost:3000
即可進入介面
dagster可以把一些參數留到Web UI的介面調整,因此我們這邊就來試看看,建立一份Day29-2.py
from dagster import pipeline, solid, InputDefinition, OutputDefinition, PresetDefinition, ModeDefinition
@solid(
config_schema={"input": str}, # config輸入型態的檢查
description="說明文字",
)
def string_from_config(context):
text = context.solid_config["input"] # 參數在.solid_config底下的dict
return text
@solid(
input_defs=[InputDefinition("input_string", dagster_type = str)], # 輸入變數的名稱和形態
)
def show(context, input_string):
context.log.info(input_string)
return 1
@pipeline(
mode_defs=[
ModeDefinition(
name="default1",
),
ModeDefinition(
name="default2",
),
],
preset_defs=[
# 預設config用
PresetDefinition(
"default1",
run_config={
"solids": {
"string_from_config": {"config": {"input": ":default string 1:"}}
}
},
mode="default1",
),
# 第二筆config
PresetDefinition(
"default2",
run_config={
"solids": {
"string_from_config": {"config": {"input": ":default string 2:"}}
}
},
mode="default2",
),
],
)
def my_pipeline():
show(string_from_config())
當初看到的時候是覺得這東西真的挺酷的,雖然上手門檻不低而且需要特地去適應他的邏輯,但習慣之後真的對一些資料流程專案的設計管理上是蠻有幫助的~