flyte 也支持透過python函數一步步定義workflow。
import flytekit as fl
from typing import List
import random
@fl.task()
def gen_nums(low: int, high: int, length: int) -> List[int]:
return [random.randint(low, high) for _ in range(length)]
@fl.task()
def mean(nums: List[int]) -> float:
return sum(nums) / len(nums)
imperative_wf = fl.Workflow(name="imperative_get_mean_wf")
imperative_wf.add_workflow_input("low", int)
imperative_wf.add_workflow_input("high", int)
imperative_wf.add_workflow_input("length", int)
node_t1 = imperative_wf.add_entity(gen_nums, low=imperative_wf.inputs["low"], high=imperative_wf.inputs["high"], length=imperative_wf.inputs["length"])
node_t2 = imperative_wf.add_entity(mean, node_t1.outputs["o0"])
imperative_wf.add_workflow.output("wf_output", node_t2.outputs["o0"])