當運行workflow後,UI顯示提交的mean_value_wf的workflow與task版本(sha)。
亦可透過flytectl獲取task tag驗證。
flytectl get tasks -p flytesnacks -d development
當我認為該workflow與task是穩妥的,可自行指定workflow與task版本。
# -p 方案名稱,預設flytesnacks
# -d stage,預設development
pyflyte register mean_value_wf.py --version v1
當我撰寫另一隻workflow時,直接引用gen_nums,不須重複實作。
隨後新增median task計算中位數,並運行
import flytekit as fl
from typing import List
@fl.reference_task(
project="flytesnacks",
domain="development",
name="mean_value_wf.gen_nums",
version="MM8KuoLF9BMRH5bWskuf1Q", #或是剛剛註冊的v1
)
def gen_nums(low: int, high: int, length: int) -> List[int]:
...
@fl.task()
def median(nums: List[int]) -> int:
nums_sorted = sorted(nums)
n = len(nums_sorted)
mid = n // 2
if n % 2 == 1:
return nums_sorted[mid]
return round((nums_sorted[mid - 1] + nums_sorted[mid]) / 2)
@fl.workflow()
def calculate_median_num_wf(low: int, high: int, length: int):
nums = gen_nums(low, high, length)
median_num = median(nums)
return median
提交新workflow,gen_nums版本不會有新版本,應維持兩種。並新增median與
calculate_median_num_wf兩者之tag。
pyflyte run --remote median_wf.py calculate_median_num_wf --low="0" --high="100" --length="100"