iT邦幫忙

0

Python & Airflow 學習筆記_SQLAlchemyOperator

  • 分享至 

  • xImage
  •  

在 Airflow 當中有提供 PostgresOperator 這個東西,他可以直接對資料庫進行 SQL 指令的執行,不過對於已經用習慣 ORM 方式操作資料庫的人,實在是有點難受,筆者這邊有在網路上找到有大神寫了一個 SQLAlchemyOperator 可以在 Airflow 當中進行 sqlalchemy 的操作與使用,下方為簡易的說明

一、參考連結與資料庫連線

  1. 參考連結:
    https://sorokin.engineer/posts/en/apache_airflow_sqlalchemy_operator.html
  2. 資料庫設定:
    這篇文章 第六點

程式僅用於參考、教學用途,若有問題,請留言告知,非常感謝

二、建立 SQLAlchemyOperator

(一)、專案目錄架構

附上簡單的目錄架構

  1. models 用來存放 SQLAlchemy 定義好的資料表結構
  2. operators 用來存放客製的 operator,例如本次使用的 SQLAlchemyOperator
    https://ithelp.ithome.com.tw/upload/images/20220826/201440246LZjuUzxes.jpg

(二)、SQLAlchemyOperator 撰寫

運作邏輯:

  1. 利用 airflow 內提供的 PostgresHook 建立一個取得 session 的函式
  2. 繼承 PythonOperator 並於 init 當中繼承所有屬性以及添加 conn_id 屬性
  3. 修改 execute_callable 函式使其在呼叫 python function 的時候,會傳入一個名為 session 的物件
"""
ref:https://sorokin.engineer/posts/en/apache_airflow_sqlalchemy_operator.html
"""

from airflow.operators.python import PythonOperator
from airflow.utils.decorators import apply_defaults
from sqlalchemy.orm import sessionmaker, Session
from airflow.hooks.postgres_hook import PostgresHook


def get_session(conn_id: str) -> Session:
    hook = PostgresHook(postgres_conn_id=conn_id)
    engine = hook.get_sqlalchemy_engine()
    return sessionmaker(bind=engine)()


class SQLAlchemyOperator(PythonOperator):
    @apply_defaults
    def __init__(
            self,
            conn_id: str,
            *args, **kwargs):
        self.conn_id = conn_id
        super().__init__(*args, **kwargs)

    def execute_callable(self):
        session = get_session(self.conn_id)
        try:
            result = self.python_callable(*self.op_args, 
                                          session=session, 
                                          **self.op_kwargs)
        except Exception:
            session.rollback()
            raise
        session.commit()
        return result

三、實際操作

(一)、import 套件

備註: Exhibitions 為透過 sqlalchemy 建立的資料表,不清楚的人可以參考 這篇文章

from airflow.decorators import dag
from operators.sqlalchemy_operator import SQLAlchemyOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.empty import EmptyOperator
from datetime import datetime
from sqlalchemy.orm import Session
from models.exhibitions import Exhibitions, Base
from sqlalchemy import inspect

(二)、定義會使用到的名稱

class Settings:
    conn_id = "postgres_connection"
    start_task_id = "start_task"
    end_task_id = "end_task"
    create_table_task_id = "create_table"

(三)、建立 create_table 函式

我們可以看到,在參數的部分有準備了一個 session 變數,並且提示為 Session 物件,此函式接到 session 後即會開始執行建立資料表的動作,這部分就不贅述了

def create_table(session: Session):
    insp = inspect(session.get_bind())
    if not insp.has_table(Exhibitions.__tablename__):
        Base.metadata.tables[Exhibitions.__tablename__].create(
            session.get_bind())
        print("資料表建立成功")

(四)、建立 dag 並呼叫 create_table 函式

透過使用 SQLAlchemyOperator 建立 operator 在呼叫指定函式時,會自動傳入一個 session 物件,所以在對應的函示當中必須準備一個名為 session 的參數,或是使用 **kwargs 來接,才不會造成錯誤

@dag(start_date=datetime.today(), tags=['user'])
def create_table_dag():
    start_task = EmptyOperator(task_id=Settings.start_task_id)

    create_table_task = SQLAlchemyOperator(task_id=Settings.create_table_task_id,
                                           python_callable=create_table,
                                           conn_id=Settings.conn_id,
                                           trigger_rule=TriggerRule.ALWAYS)

    end_task = EmptyOperator(task_id=Settings.end_task_id,
                             trigger_rule=TriggerRule.ALWAYS)

    start_task >> create_table_task >> end_task


create_create_table_dag = create_table_dag()

圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言