iT邦幫忙

2023 iThome 鐵人賽

DAY 10
0

Airflow 的變數是如何儲存的?

Airflow 的變數是使用 Key-Value 的方式儲存,我們可以打開任何一種資料庫工具,例如:DBeaver、TablePlus,就可以直接連接到之前提過的 metadata 資料庫,資料庫路徑 /Users/{username}/airflow/airflow.db,接著點擊 Variable Table,就會發現它是用這種方式儲存:

  • id(int): 資料的識別 id
  • key(varchar): 要拿取 value 時要使用的值
  • value(text): 實際儲存的值
  • description(text): 這組 key-value 的介紹
  • is_encrypted(boolean): 是否加密,如果是密碼、API KEY、Token 類型的資料要記得加密

Airflow 的變數操作

設定變數

直接在 Web UI 設定:

Airflow Web UI:Admin 管理 > Variables 變數 > 點擊 +
我們可以在這裡單獨設定變數值或匯入變數清單的 JSON。

https://ithelp.ithome.com.tw/upload/images/20230925/20135427iUIbZ4R1Wj.png
https://ithelp.ithome.com.tw/upload/images/20230925/20135427F9hjJgKLNa.png

用 Terminal 設定:

  • 普通變數:
airflow variables set my_key1 my_value
  • json 變數
airflow variables set -j my_key2 '{"key_in_json": "value_in_json"}'

如果都沒修改,應該可以看到上面範例的 keyvalue 變數都成功儲存了~
https://ithelp.ithome.com.tw/upload/images/20230925/20135427lhqrYgIHN1.png

用 Python 設定

from airflow.models import Variable
Variable.set(key="my_key1", value="my_value")
Variable.set(key="my_key2", value={"key_in_json": "value_in_json"}, serialize_json=True)
  • 記得 serialize_json=True 才能將變數用 JSON 儲存喔~

取得變數

使用 Variable.get

  • Variable.get 就能取得之前設定的變數了
from airflow.models import Variable
my_var = Variable.get("my_key1") # my_key 傳回的變數值
my_json_var = Variable.get(
    "my_key2", deserialize_json=True
)["key_in_json"]
print(my_var)
print(my_json_var)

取得變數的神秘魔法 **context

  • context 也是一個字典(key-value)的資料
  • 儲存了運行中 DAG 和 Airflow 環境的變數
  • 在 function 中使用 context,就能直接拿到 Variable ,不需要再使用 from airflow.models import Variable
  • 裡面最有名
def get_var(**context):
    my_var = context["var"]["value"].get("my_key1")
    my_json_var = context["var"]["json"].get("my_key2")["key_in_json"]
    print("context my_var:",my_var)
    print("context my_json_var:",my_json_var)

context 裡面還有什麼

def print_context_func(**context):
    print(context)

https://ithelp.ithome.com.tw/upload/images/20230925/20135427C5xwr4wvPT.png
哇~也太多東西了吧~/images/emoticon/emoticon19.gif
稍微整理一下,會發現有好幾種資料

conf:  #airflow 的基本設定
dag: var_dag,            --
state:running,             |___ => DAG 狀態/執行時間/間隔
"queued_at":"XXX",         |
"data_interval_end":"",  --
"tzinfo=Timezone(""UTC""))", # 時區資料
"ds":"2023-09-25",
"execution_date":DateTime(2023,9,25 ....),
"logical_date":DateTime(2023,9,25 ....),
"next_ds":...,
"next_execution_date":...,
"prev_ds":...
'run_id': run_id,
'task': task,
'task_instance': self,
'task_instance_key_str': ti_key_str,
'test_mode': self.test_mode,
'ti': self,
'tomorrow_ds': tomorrow_ds,
'tomorrow_ds_nodash': tomorrow_ds_nodash,
'ts': ts,
'ts_nodash': ts_nodash,
'ts_nodash_with_tz': ts_nodash_with_tz,
'var': {
    'json': VariableJsonAccessor(),
    'value': VariableAccessor(),
},
'yesterday_ds': yesterday_ds,
'yesterday_ds_nodash': yesterday_ds_nodash
  • 大部分都是在記錄 DAG執行的時間和狀態,上次和下次也都會紀錄。
  • ds、execution_date、logical_date 是很重要的三個變數,都是在記錄執行的當下時間,和我們預計的執行時間又有點不一樣,airflow 的時間是一個 大坑,還有時區問題等等,很值得之後另外花一天說明。

完整的code

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable


def set_var():
    Variable.set(key="my_key1", value="my_value")
    Variable.set(key="my_key2", value={"key_in_json": "value_in_json"}, serialize_json=True)

def get_var():
    my_var = Variable.get("my_key1")
    my_json_var = Variable.get("my_key2", deserialize_json=True)["key_in_json"]
    print("my_var:",my_var)
    print("my_json_var:",my_json_var)

def get_var_by_context(**context):
    my_var = context["var"]["value"].get("my_key1")
    my_json_var = context["var"]["json"].get("my_key2")["key_in_json"]
    print("context my_var:",my_var)
    print("context my_json_var:",my_json_var)

def print_context_func(**context):
    print("context:",context)

with DAG(
    dag_id = 'var_dag',
    start_date = datetime(2023, 9, 25),
    schedule_interval=None
) as dag:
    task1 = PythonOperator(
    task_id='set_var',
    python_callable=set_var
    )
    task2 = PythonOperator(
    task_id='get_var',
    python_callable=get_var
    )
    task3 = PythonOperator(
    task_id='get_var_by_context',
    python_callable=get_var_by_context
    )
    task4 = PythonOperator(
    task_id='print_context_func',
    python_callable=print_context_func
    )

    task1 >> [task2, task3] >> task4

上一篇
[Day9] Airflow Tasks 之間的資料間諜 XComs
下一篇
[Day11] 當我們同在一起 - Airflow Task Group
系列文
Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言