Today I want to introduce airflow, a handy tool that can replace cron jobs. With cron you have to estimate how long each job takes and build the schedule around that, and when the jobs are spread across several machines there is no way to see the overall flow; you have to log in to each machine and read its crontab to figure out what runs at what time. With airflow these jobs can be defined as a single flow, so the whole workflow is easy to see at a glance.
Install airflow
pip3 install apache-airflow
airflow needs a database to store its metadata; here we use the default SQLite db.
> airflow initdb
[2019-10-09 11:04:41,590] {__init__.py:51} INFO - Using executor SequentialExecutor
DB: sqlite:////Users/daniel/airflow/airflow.db
[2019-10-09 11:04:41,994] {db.py:369} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> e3a246e0dc1, current schema
INFO [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/alembic/ddl/sqlite.py:39: UserWarning: Skipping unsupported ALTER for creation of implicit constraint
"Skipping unsupported ALTER for "
INFO [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
......
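The SQLite path shown in the output comes from the metadata database setting in airflow.cfg. As a rough illustration (the exact path depends on your AIRFLOW_HOME), the relevant line looks like the following, and it could be pointed at another SQLAlchemy-supported database instead of SQLite:
# in /Users/daniel/airflow/airflow.cfg, under the [core] section
sql_alchemy_conn = sqlite:////Users/daniel/airflow/airflow.db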
airflow already ships with sample DAGs that can be used for testing; the tutorial example looks like this.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
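set_upstream makes print_date run before the sleep and templated tasks. The same dependencies can also be written with the bitshift operators used later in this post; a minimal equivalent sketch:
# equivalent to t2.set_upstream(t1) and t3.set_upstream(t1)
t1 >> t2
t1 >> t3
# recent Airflow versions also accept a list: t1 >> [t2, t3]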
Run the test with the airflow command. Since this is only a test, the first argument is test, the second argument tutorial is the DAG ID defined in the code above, the third argument print_date is a task_id defined in the code, and the fourth argument 2015-06-01 is the execution date. The run fails with a ValueError: unknown locale: UTF-8 error.
> airflow test tutorial print_date 2015-06-01
[2019-10-09 11:28:07,867] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-10-09 11:28:08,188] {dagbag.py:90} INFO - Filling up the DagBag from /Users/daniel/airflow/dags
[2019-10-09 11:28:08,234] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: tutorial.print_date 2015-06-01T00:00:00+00:00 [None]>
[2019-10-09 11:28:08,238] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: tutorial.print_date 2015-06-01T00:00:00+00:00 [None]>
[2019-10-09 11:28:08,238] {taskinstance.py:838} INFO -
--------------------------------------------------------------------------------
[2019-10-09 11:28:08,238] {taskinstance.py:839} INFO - Starting attempt 1 of 2
[2019-10-09 11:28:08,238] {taskinstance.py:840} INFO -
--------------------------------------------------------------------------------
[2019-10-09 11:28:08,239] {taskinstance.py:859} INFO - Executing <Task(BashOperator): print_date> on 2015-06-01T00:00:00+00:00
[2019-10-09 11:28:08,247] {taskinstance.py:1051} ERROR - unknown locale: UTF-8
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 895, in _run_raw_task
context = self.get_template_context()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1133, in get_template_context
ds = self.execution_date.strftime('%Y-%m-%d')
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pendulum/mixins/default.py", line 124, in strftime
return self.format(fmt, _locale.getlocale()[0], 'classic')
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/locale.py", line 587, in getlocale
return _parse_localename(localename)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/locale.py", line 495, in _parse_localename
raise ValueError('unknown locale: %s' % localename)
ValueError: unknown locale: UTF-8
Add the following two lines to ~/.bash_profile, then source ~/.bash_profile.
export LC_ALL=en_US.UTF-8
export LANG=en_US.UTF-8
Re-run the test and it works.
> airflow test tutorial print_date 2015-06-01
[2019-10-09 12:14:08,800] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-10-09 12:14:09,117] {dagbag.py:90} INFO - Filling up the DagBag from /Users/daniel/airflow/dags
[2019-10-09 12:14:09,163] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: tutorial.print_date 2015-06-01T00:00:00+00:00 [None]>
[2019-10-09 12:14:09,167] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: tutorial.print_date 2015-06-01T00:00:00+00:00 [None]>
[2019-10-09 12:14:09,167] {taskinstance.py:838} INFO -
--------------------------------------------------------------------------------
[2019-10-09 12:14:09,167] {taskinstance.py:839} INFO - Starting attempt 1 of 2
[2019-10-09 12:14:09,167] {taskinstance.py:840} INFO -
--------------------------------------------------------------------------------
[2019-10-09 12:14:09,168] {taskinstance.py:859} INFO - Executing <Task(BashOperator): print_date> on 2015-06-01T00:00:00+00:00
[2019-10-09 12:14:09,180] {bash_operator.py:81} INFO - Tmp dir root location:
/var/folders/8v/9r5mf0y506734gvj_j337w7w0000gn/T
[2019-10-09 12:14:09,181] {bash_operator.py:91} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=tutorial
AIRFLOW_CTX_TASK_ID=print_date
AIRFLOW_CTX_EXECUTION_DATE=2015-06-01T00:00:00+00:00
[2019-10-09 12:14:09,182] {bash_operator.py:105} INFO - Temporary script location: /var/folders/8v/9r5mf0y506734gvj_j337w7w0000gn/T/airflowtmpquddjput/print_datef1o0u8do
[2019-10-09 12:14:09,182] {bash_operator.py:115} INFO - Running command: date
[2019-10-09 12:14:09,194] {bash_operator.py:124} INFO - Output:
[2019-10-09 12:14:09,200] {bash_operator.py:128} INFO - Tue Oct 8 12:14:09 CST 2019
[2019-10-09 12:14:09,201] {bash_operator.py:132} INFO - Command exited with return code 0
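The other two tasks in the tutorial DAG can be tested the same way, just with a different task_id, for example:
> airflow test tutorial sleep 2015-06-01
> airflow test tutorial templated 2015-06-01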
Next, let's build our own workflow. First there are three Python scripts, each of which simply prints its own message.
work_flow1.py
print('this is local workflow1')
work_flow2.py
print('this is remote workflow2')
work_flow3.py
print('this is local workflow3')
The workflow to test is as follows. Once the flow is decided, start writing the DAG definition file by creating a work_flow.py; four objects are involved here: SSHHook, BashOperator, SFTPOperator and SSHOperator.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.sftp_operator import SFTPOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
# Define the default arguments. `owner` must be given, and `start_date` is the date
# the DAG starts running from. If it were set to datetime(2015, 6, 1) with a daily
# schedule_interval, the scheduler would backfill runs for June 1, 2, 3 of 2015 and
# so on, all the way up to the current system date.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 10, 8),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG('myworkflowtest', default_args=default_args, schedule_interval=timedelta(days=1))
# Define an SSHHook holding the connection info for the remote host
myhook = SSHHook(
    remote_host='192.168.1.11',
    username='test',
    password='test123',
    port='22',
)
# 1. First run work_flow1.py on the local machine (using BashOperator)
my_flow1 = BashOperator(
    task_id='run_work_flow1',
    bash_command='python3 /Volumes/Transcend/pylearn/work_flow1.py',
    dag=dag)
# 2. Then copy work_flow2.py over to the other host (using SSHHook and SFTPOperator)
my_flow2_1 = SFTPOperator(
    task_id="copy_work_flow2",
    ssh_hook=myhook,
    local_filepath="/Volumes/Transcend/pylearn/work_flow2.py",
    remote_filepath="/home/miuser/pylearn/work_flow2.py",
    operation="put",
    create_intermediate_dirs=True,
    dag=dag
)
# 3. Run work_flow2.py on the other host (using SSHHook and SSHOperator)
my_flow2_2 = SSHOperator(
    task_id="run_work_flow2",
    ssh_hook=myhook,
    command='python3 /home/miuser/pylearn/work_flow2.py',
    dag=dag
)
# 4. Then run work_flow3.py on the local machine (using BashOperator)
my_flow3 = BashOperator(
    task_id='run_work_flow3',
    bash_command='python3 /Volumes/Transcend/pylearn/work_flow3.py',
    dag=dag)
# 5. Chain steps 1, 2, 3 and 4 together.
my_flow1 >> my_flow2_1 >> my_flow2_2 >> my_flow3
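Hardcoding the username and password in SSHHook is fine for a quick test, but the hook also accepts an ssh_conn_id that points at a connection stored in Airflow (created under Admin > Connections in the web UI). A minimal sketch, assuming a connection with the made-up conn_id my_ssh_conn has already been created:
# sketch: use a stored Airflow connection instead of hardcoded credentials
# assumes an SSH connection named 'my_ssh_conn' exists in Airflow
myhook = SSHHook(ssh_conn_id='my_ssh_conn')

my_flow2_2 = SSHOperator(
    task_id="run_work_flow2",
    ssh_hook=myhook,
    command='python3 /home/miuser/pylearn/work_flow2.py',
    dag=dag
)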
After building the file, running it hits No module named 'paramiko'.
> python3 work_flow.py
Traceback (most recent call last):
File "work_flow.py", line 3, in <module>
from airflow.contrib.operators.sftp_operator import SFTPOperator
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/contrib/operators/sftp_operator.py", line 21, in <module>
from airflow.contrib.hooks.ssh_hook import SSHHook
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/contrib/hooks/ssh_hook.py", line 24, in <module>
import paramiko
ModuleNotFoundError: No module named 'paramiko'
Install the paramiko module
pip3 install paramiko
Running it again hits another error, No module named 'sshtunnel'.
> python3 work_flow.py
Traceback (most recent call last):
File "work_flow.py", line 3, in <module>
from airflow.contrib.operators.sftp_operator import SFTPOperator
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/contrib/operators/sftp_operator.py", line 21, in <module>
from airflow.contrib.hooks.ssh_hook import SSHHook
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/contrib/hooks/ssh_hook.py", line 26, in <module>
from sshtunnel import SSHTunnelForwarder
ModuleNotFoundError: No module named 'sshtunnel'
Install the sshtunnel module
pip3 install sshtunnel
After that it finally runs without errors; running the DAG file directly with python3 is just a quick way to confirm it parses and imports cleanly.
python3 work_flow.py
Next, copy the file into airflow's dags directory; if the directory does not exist, create /Users/daniel/airflow/dags yourself.
> cp work_flow.py /Users/daniel/airflow/dags
The path above comes from the dags_folder setting in the /Users/daniel/airflow/airflow.cfg configuration file.
dags_folder = /Users/daniel/airflow/dags
Run the airflow list_dags command to see which DAGs are currently available. The DAG we defined, myworkflowtest, shows up, which means it was created successfully.
> airflow list_dags
[2019-10-09 14:13:51,691] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-10-09 14:13:52,028] {dagbag.py:90} INFO - Filling up the DagBag from /Users/daniel/airflow/dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
......
latest_only
latest_only_with_trigger
myworkflowtest
test_utils
tutorial
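Before turning the schedule on, the individual tasks of the new DAG can also be listed and tested from the CLI, for example:
> airflow list_tasks myworkflowtest
> airflow test myworkflowtest run_work_flow1 2019-10-08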
Once everything is defined, start the airflow scheduler; the scheduler manages the DAG schedules.
> airflow scheduler
Start the airflow webserver; after it is up, open http://localhost:8080 in a browser to see the web UI.
> airflow webserver
[2019-10-09 11:16:50,303] {__init__.py:51} INFO - Using executor SequentialExecutor
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2019-10-09 11:16:50,962] {dagbag.py:90} INFO - Filling up the DagBag from /Users/daniel/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
=================================================================
[2019-10-09 11:16:51 +0800] [8013] [INFO] Starting gunicorn 19.9.0
[2019-10-09 11:16:51 +0800] [8013] [INFO] Listening at: http://0.0.0.0:8080 (8013)
[2019-10-09 11:16:51 +0800] [8013] [INFO] Using worker: sync
......
Toggle the defined DAG on and trigger it; in the end all four tasks turn green, meaning they all succeeded.
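Besides clicking the trigger button in the web UI, the DAG can also be triggered from the command line while the scheduler is running:
> airflow trigger_dag myworkflowtest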
With airflow, the different programs are combined into one data flow without having to define any cron jobs, and they are executed in the defined order, which is very convenient. airflow has many more features and details that are worth digging into.