Airflow Variables are stored as key-value pairs. We can open any database tool, such as DBeaver or TablePlus, connect directly to the metadata database mentioned earlier (path: /Users/{username}/airflow/airflow.db), then click the Variable table, and we'll see that each variable is stored as one key-value row.
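If you'd rather peek at the table without a GUI tool, here's a minimal sketch reading it with Python's sqlite3, assuming the default SQLite metadata DB at the path above (replace {username} with your own account name):
import sqlite3

# Sketch: read the variable table straight from the default SQLite
# metadata DB; adjust the path for your setup.
conn = sqlite3.connect("/Users/{username}/airflow/airflow.db")
for key, val in conn.execute("SELECT key, val FROM variable"):
    print(key, val)
conn.close()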
Airflow Web UI: Admin > Variables > click +
Here we can either set a single variable or import a JSON file containing a list of variables.
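For the bulk import, the JSON file is just a flat key-to-value mapping; a hypothetical file matching the examples used throughout this post:
{
    "my_key1": "my_value",
    "my_key2": {"key_in_json": "value_in_json"}
}
The same file also works from the command line with airflow variables import <file>. Variables can likewise be set one at a time via the CLI: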
airflow variables set my_key1 my_value
airflow variables set -j my_key2 '{"key_in_json": "value_in_json"}'
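You can verify them right away from the CLI as well:
airflow variables list
airflow variables get my_key1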
If you haven't changed anything, you should see that both the key and value variables from the examples above were saved successfully~
Variable.set
Variables can also be managed directly from Python code:
from airflow.models import Variable
Variable.set(key="my_key1", value="my_value")
Variable.set(key="my_key2", value={"key_in_json": "value_in_json"}, serialize_json=True)
Remember to add serialize_json=True so the variable is stored as JSON~
Variable.get
With Variable.get we can retrieve the variables we set earlier:
from airflow.models import Variable
my_var = Variable.get("my_key1")  # the value stored under my_key1
my_json_var = Variable.get(
    "my_key2", deserialize_json=True
)["key_in_json"]
print(my_var)
print(my_json_var)
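One more handy detail: Variable.get raises a KeyError when the key does not exist, unless you pass default_var. A tiny sketch, using a hypothetical key name for illustration:
# 'not_set_yet' is a hypothetical key used only for this example
maybe_var = Variable.get("not_set_yet", default_var="fallback")
print(maybe_var)  # prints 'fallback' when the key was never set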
**context
By adding **context to the task function, we can pull Variables straight out of the context, without needing from airflow.models import Variable:
def get_var(**context):
    my_var = context["var"]["value"].get("my_key1")
    my_json_var = context["var"]["json"].get("my_key2")["key_in_json"]
    print("context my_var:", my_var)
    print("context my_json_var:", my_json_var)
Curious what else is inside the context? Let's print the whole thing out:
def print_context_func(**context):
    print(context)
Wow~ that's a lot of stuff~
Tidying it up a bit, you'll find several kinds of data:
'conf': ...,                      # Airflow's base configuration
'dag': var_dag,                   # --+
    state: running,               #   |
    'queued_at': 'XXX',           #   |-- DAG state / run times / interval
    'data_interval_end': '',      # --+
    tzinfo=Timezone('UTC'),       # timezone info
'ds': '2023-09-25',
'execution_date': DateTime(2023, 9, 25, ...),
'logical_date': DateTime(2023, 9, 25, ...),
'next_ds': ...,
'next_execution_date': ...,
'prev_ds': ...,
'run_id': run_id,
'task': task,
'task_instance': self,
'task_instance_key_str': ti_key_str,
'test_mode': self.test_mode,
'ti': self,
'tomorrow_ds': tomorrow_ds,
'tomorrow_ds_nodash': tomorrow_ds_nodash,
'ts': ts,
'ts_nodash': ts_nodash,
'ts_nodash_with_tz': ts_nodash_with_tz,
'var': {
'json': VariableJsonAccessor(),
'value': VariableAccessor(),
},
'yesterday_ds': yesterday_ds,
'yesterday_ds_nodash': yesterday_ds_nodash
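A few of the entries you'll reach for most often, as a quick sketch:
def use_context(**context):
    print(context["ds"])              # logical date string, e.g. '2023-09-25'
    print(context["ts_nodash"])       # compact timestamp, e.g. '20230925T000000'
    print(context["dag_run"].run_id)  # id of the current DAG run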
These date fields are a big rabbit hole (timezone issues included) and well worth a separate day's write-up later. Putting everything together, here is the complete DAG:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
def set_var():
    Variable.set(key="my_key1", value="my_value")
    Variable.set(key="my_key2", value={"key_in_json": "value_in_json"}, serialize_json=True)

def get_var():
    my_var = Variable.get("my_key1")
    my_json_var = Variable.get("my_key2", deserialize_json=True)["key_in_json"]
    print("my_var:", my_var)
    print("my_json_var:", my_json_var)

def get_var_by_context(**context):
    my_var = context["var"]["value"].get("my_key1")
    my_json_var = context["var"]["json"].get("my_key2")["key_in_json"]
    print("context my_var:", my_var)
    print("context my_json_var:", my_json_var)

def print_context_func(**context):
    print("context:", context)

with DAG(
    dag_id='var_dag',
    start_date=datetime(2023, 9, 25),
    schedule_interval=None
) as dag:
    task1 = PythonOperator(
        task_id='set_var',
        python_callable=set_var
    )
    task2 = PythonOperator(
        task_id='get_var',
        python_callable=get_var
    )
    task3 = PythonOperator(
        task_id='get_var_by_context',
        python_callable=get_var_by_context
    )
    task4 = PythonOperator(
        task_id='print_context_func',
        python_callable=print_context_func
    )

    task1 >> [task2, task3] >> task4
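To try the DAG without waiting for the scheduler, each task can be run on its own with airflow tasks test (the last argument is the logical date to run with):
airflow tasks test var_dag set_var 2023-09-25
airflow tasks test var_dag get_var 2023-09-25
airflow tasks test var_dag get_var_by_context 2023-09-25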