iT邦幫忙

2023 iThome 鐵人賽

DAY 7
0

又回到最初的 Day3

還記得我們開啟了 http://localhost:8080/ 了嗎?忘記了沒關係,從頭再來一次~

還沒安裝的,可以回到 Day3:快樂安裝
如果已經安裝完就繼續往下吧~

step1: 開啟專案資料夾 airflow-local

  • 可以用 vscode 或是直接開 terminal/命令提示字元 cd 進入也可以。

step2: 進入虛擬環境

source airflow-env/bin/activate

https://ithelp.ithome.com.tw/upload/images/20230924/20135427NwpzDZNGG4.png

  • 看到 terminal 前面有 (airflow-env) 代表進入虛擬環境了。

step3: 開啟 Webserver

airflow webserver -p 8080
  • 打開 http://localhost:8080/ 就可以看到這個畫面了
    https://ithelp.ithome.com.tw/upload/images/20230922/201354279JqwAWPnEc.png
  • 看到上面有三行通知:
    1.The scheduler does not appear to be running:提醒你還沒開啟 Scheduler
    2.Do not use SQLite as metadata DB in production:在正式開發不要用 SQLite
    3.Do not use the SequentialExecutor in production:在正式開發不要用 SequentialExecutor

SequentialExecutor 是第一次安裝 Airflow 時的預設執行器,也是唯一一個可以和 sqlite 協作的執行器,但缺點就是一次只能執行一項任務,所以執行效率很差,但目前測試也沒關係。
Ref: 官方說明

step4: 開啟 Scheduler

開啟一個新的 terminal,運行 airflow scheduler ,接著打開 http://localhost:8080/F5 刷新頁面,第一行通知就會消失了。

開始實作

1. 創建 dags 資料夾和 Python 檔案

首先我們要創建一個 dags/ 資料夾在 ~/airflow/ 專案當中,並在裡面建立一個 python 檔案,dags/first_dag.py

  • 目前整個資料夾結構如下:
--- dags/
    --- first_dag.py
--- logs/
--- airflow.cfg
--- airflow.db
--- webserver_config.py

2. 在 first_dag.py 中匯入套件

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
  • datetime 模組是用來設定開始時間、間隔時間和時區。
  • airflow 的 DAG 就是我們要創建的模組
  • BashOperator 是我們等等要使用的 Operator,是最基礎的,就像是執行 terminal 指令一樣。

3. Create 一個 DAG 物件

with DAG(
    dag_id = 'first_dag',
    start_date = datetime(2023, 9, 22),
    schedule_interval='@daily'
) as dag:
  • 在這裡我們會幫 DAG 做基礎的設定,dag_id 就是在 dag 的唯一識別名稱

!強烈建議!要和 python 檔案用相同名稱,不然之後 DAG 一多就會很混亂

  • start_date 就是 DAG 要開始執行的日期
  • schedule_interval 是時間間隔,'@daily' 代表以天為單位

4. Create 任務們

) as dag:下方我們就可以直接來寫任務了

...
) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command="echo start!!"
    )

    task2 = BashOperator(
        task_id='task2',
        bash_command="echo finish!!"
    )
  • task_id: task 的名稱
  • bash_command:我們要執行的 bash 指令
  • echo start!!:echo 就是 Python 當中的 print 意思,印出 start!!
  • task2 以此類推

5. 設定任務之間的關係 / 相依性 (Dependecies)

task1 >> task2
  • 先執行 task1 再執行 task2

6. 重新運行 webserver

airflow webserver -p 8080
airflow scheduler

https://ithelp.ithome.com.tw/upload/images/20230923/201354273VKvenQQRg.png

虛脫的小傑

很好奇大家都是先準備好貼文才參加鐵人賽嗎/images/emoticon/emoticon02.gif,我目前都是當天現打,然後整理之前的筆記,每天都感覺來不及,希望假日趕緊多補幾篇,看能不能比較有彈性,真的快虛脫了/images/emoticon/emoticon16.gif(這週補班又少一天假了~),哎~~~繼續努力吧!

完整的程式碼

  • path: /Users/{user_name}/airflow-local/dags/first_dag.py
  • virtual-env: airflow-env
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id = 'first_dag',
    start_date = datetime(2023, 9, 22),
    schedule_interval=None
) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command="echo start!!"
    )

    task2 = BashOperator(
        task_id='task2',
        bash_command="echo finish!!"
    )

    task1 >> task2

上一篇
[Day6] 超級蘑菇 Airflow 的食用說明書(2)
下一篇
[Day8] 關於 Airflow Web UI 的大小事
系列文
Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言