Following on from yesterday, where we obtained a token through IAM, today we will create an S3 Bucket and then use Airflow's S3Hook to access files on AWS S3.
The settings below basically follow the defaults; I'll explain the parts I can as far as I know: for instance, a Security-Token and a Signature are required to access the files.

Yesterday's first approach was to set up the Connection through the Airflow Web UI. Another way is to open the airflow project and add AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to the environment section of docker-compose.yaml. The official example is below; replace the values with the ones from yesterday's csv file.
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.1}
  # build: .
  environment:
    ...
    AWS_ACCESS_KEY_ID: AKIAIOSFODNN7EXAMPLE
    AWS_SECRET_ACCESS_KEY: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
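Before wiring anything into a DAG, it can help to confirm that the hook really reaches S3 with these credentials. The following is a minimal sketch of my own (the ensure_bucket helper is hypothetical and not part of the original post; ithome-it30-demo is the bucket name used in this series):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def ensure_bucket(bucket_name: str = "ithome-it30-demo") -> None:
    # aws_conn_id="aws_default": if no such Connection is defined, the Amazon
    # provider falls back to boto3's default credential chain, which picks up
    # the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY set in docker-compose.yaml.
    s3 = S3Hook(aws_conn_id="aws_default")
    # Only create the bucket if it does not exist yet.
    if not s3.check_for_bucket(bucket_name):
        s3.create_bucket(bucket_name=bucket_name)

Creating the bucket by hand in the AWS console with the default settings, as described above, works just as well.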
def s3_extract():
    source_s3_key = "s3_demo/s3_extract.txt"
    source_s3_bucket = "ithome-it30-demo"
    dest_file_path = "/opt/airflow/dags/"
    source_s3 = S3Hook(aws_conn_id="aws_default")
    source_s3.download_file(
        key=source_s3_key,
        bucket_name=source_s3_bucket,
        local_path=dest_file_path
    )
This downloads the file at the given key (s3_demo/s3_extract.txt) in the S3 bucket (ithome-it30-demo) to the local directory (/opt/airflow/dags/).

source_s3 = S3Hook(aws_conn_id="aws_default") uses Airflow's S3Hook to interact with AWS S3, through the AWS connection (aws_default) already configured in the Airflow UI.

source_s3.download_file() calls S3Hook's download_file method, which downloads the file from S3 to the specified local path.

def s3_upload():
    source_s3_key = "s3_demo/s3_extract.txt"
    source_s3_bucket = "ithome-it30-demo"
    dest_file_path = "/opt/airflow/dags/s3_extract.txt"
    source_s3 = S3Hook(aws_conn_id="aws_default")
    source_s3.load_file(
        filename=dest_file_path,
        key=source_s3_key,
        bucket_name=source_s3_bucket
    )
Changing download_file to load_file is all it takes to upload the file from local to AWS S3.

Putting the extract task into a complete TaskFlow DAG:

from datetime import datetime
from airflow.decorators import task, dag
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


@task
def s3_extract():
    source_s3_key = "s3_demo/s3_extract.txt"
    source_s3_bucket = "ithome-it30-demo"
    dest_file_path = "/opt/airflow/dags/"
    source_s3 = S3Hook(aws_conn_id="aws_default")
    source_s3.download_file(
        key=source_s3_key,
        bucket_name=source_s3_bucket,
        local_path=dest_file_path
    )


@dag(
    dag_id="s3_extract_taskflow",
    start_date=datetime(2024, 9, 18),
    schedule=None,
    catchup=False,
)
def s3_extract_dag():
    s3_extract()


s3_extract_dag()
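To close the loop, here is a minimal sketch of a DAG that downloads the file and then uploads it back, passing the downloaded path between the two tasks; the s3_upload task signature, the use of replace=True, and the s3_roundtrip_taskflow dag_id are my own illustrative choices, not from the original code:

from datetime import datetime
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

BUCKET = "ithome-it30-demo"
KEY = "s3_demo/s3_extract.txt"


@task
def s3_extract() -> str:
    # download_file returns the path of the downloaded file
    # (by default it gets an autogenerated name under local_path).
    s3 = S3Hook(aws_conn_id="aws_default")
    return s3.download_file(key=KEY, bucket_name=BUCKET, local_path="/opt/airflow/dags/")


@task
def s3_upload(local_file: str) -> None:
    # Upload the file back to the same key; replace=True because the key already exists.
    s3 = S3Hook(aws_conn_id="aws_default")
    s3.load_file(filename=local_file, key=KEY, bucket_name=BUCKET, replace=True)


@dag(
    dag_id="s3_roundtrip_taskflow",
    start_date=datetime(2024, 9, 18),
    schedule=None,
    catchup=False,
)
def s3_roundtrip_dag():
    # Passing the return value creates the extract -> upload dependency via XCom.
    s3_upload(s3_extract())


s3_roundtrip_dag()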