iT邦幫忙

2025 iThome 鐵人賽

DAY 18
0
Cloud Native

新興k8s工作流flyte與MLOps。系列 第 18

Day 18: 撰寫範例(修改中)

  • 分享至 

  • xImage
  •  

撰寫下載 YouTube 影片、再下載模型進行判斷的流程,明天會再修改。

#!/usr/bin/env python3
import yt_dlp
import os
import flytekit as fl
from flytekit import WorkflowFailurePolicy
from flytekit.types.error.error import FlyteError
import typing
from pathlib import Path

# Image definition for the download task: Python 3.12 with the yt-dlp
# package installed, targeting the registry at localhost:30000.
yt_image = fl.ImageSpec(
    registry="localhost:30000",
    python_version="3.12",
    packages=["yt-dlp"],
)


@fl.task(container_image=yt_image)
def download_mp4(url: str, output_filename: str = "yt.mp4") -> fl.FlyteFile:
    """
    Download a single video with yt-dlp and return it as a Flyte file.

    Args:
        url (str): Video URL
        output_filename (str): Output filename (with or without .mp4 extension)

    Returns:
        fl.FlyteFile: The downloaded file, written inside the task's
        working directory.
    """
    # Compare case-insensitively so "clip.MP4" does not become "clip.MP4.mp4".
    if not output_filename.lower().endswith('.mp4'):
        output_filename += '.mp4'

    out_path = Path(fl.current_context().working_directory) / output_filename

    ydl_opts = {
        # Prefer a ready-made MP4; otherwise fall back to the best single file.
        'format': 'best[ext=mp4]/best',
        'outtmpl': str(out_path),        # Custom filename
        'noplaylist': True,              # Single video only
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])
    return fl.FlyteFile(path=str(out_path))

@fl.task
def clean_up(err: typing.Optional[FlyteError] = None) -> None:
    """Print the error that caused a workflow to fail.

    Written so that it can be plugged in as a workflow ``on_failure``
    handler; it performs no cleanup beyond logging.

    Args:
        err: The failure reported for the workflow, or None when absent.
    """
    print(f"Disable workflow due to {err}")

# NOTE: failure handling is currently disabled. The intended decorator
# arguments were:
#   on_failure=clean_up,
#   failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE
@fl.workflow()
def wf1(url: str, filename: str) -> fl.FlyteFile:
    """Download the video at *url* and return it as a Flyte file.

    Args:
        url: Video URL handed to the download task.
        filename: Name of the output file; forwarded as the task's
            ``output_filename`` input.

    Returns:
        The file produced by ``download_mp4``.
    """
    # Keyword arguments make the filename -> output_filename mapping explicit
    # and are accepted by every flytekit version.
    return download_mp4(url=url, output_filename=filename)
import os
import subprocess
import flytekit as fl
from pathlib import Path

# Image definition for the video-splitting task. The task invokes the
# `ffmpeg` executable through subprocess, which is a system package, so it
# is installed via apt; `packages` only installs Python distributions.
ffmpeg_image = fl.ImageSpec(
    apt_packages=["ffmpeg"],
    python_version="3.12",
    registry="localhost:30000"
)

@fl.task(container_image=ffmpeg_image)
def split_video(input_file_metadata: fl.FlyteFile, segment_duration: int=60) -> fl.FlyteDirectory:
    """Split a video into fixed-length segments using ffmpeg.

    Args:
        input_file_metadata: Video to split; it is downloaded locally first.
        segment_duration: Target length of each segment, in seconds.

    Returns:
        A directory holding the segments, named ``<stem>_000<ext>``,
        ``<stem>_001<ext>``, and so on.

    Raises:
        ValueError: If ``segment_duration`` is not positive.
        FileNotFoundError: If the input file is missing after download.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    if segment_duration <= 0:
        raise ValueError(f"segment_duration must be positive, got {segment_duration}")

    input_file_metadata.download()
    input_file = Path(input_file_metadata.path)

    # Fail loudly: the task is declared to return a FlyteDirectory, so a bare
    # `return` (None) would only surface later as an unclear type error.
    if not input_file.exists():
        raise FileNotFoundError(f"File {input_file} not found!")

    # Use only the base name. Splitting the full path on "." keeps the
    # directory prefix, and joining an absolute path onto local_dir discards
    # local_dir, so the segments would be written outside the returned folder.
    stem = input_file.stem
    # Without an extension ffmpeg cannot infer the segment container;
    # fall back to MP4 in that case.
    suffix = input_file.suffix or ".mp4"

    local_dir = Path(fl.current_context().working_directory) / "split_video"
    local_dir.mkdir(parents=True, exist_ok=True)
    output_pattern = str(local_dir / f"{stem}_%03d{suffix}")

    # Stream copy (no re-encode): ffmpeg can only cut on keyframes, so the
    # actual segment lengths are approximate.
    cmd = [
        'ffmpeg',
        '-i', str(input_file),
        '-c', 'copy',
        '-map', '0',
        '-segment_time', str(segment_duration),
        '-f', 'segment',
        '-reset_timestamps', '1',
        output_pattern
    ]

    print(f"Splitting {input_file} into {segment_duration}-second segments...")

    subprocess.run(cmd, check=True)
    print("Done! Files created:")
    print(sorted(os.listdir(local_dir)))
    return fl.FlyteDirectory(path=str(local_dir))
from ultralytics import YOLO
import os
from pathlib import Path
import flytekit as fl

# Image definition for the inference task: Python 3.12 with the ultralytics
# package installed, targeting the registry at localhost:30000.
model_image = fl.ImageSpec(
    registry="localhost:30000",
    python_version="3.12",
    packages=["ultralytics"],
)

@fl.task(container_image=model_image)
def predict_videos(d: fl.FlyteDirectory) -> fl.FlyteDirectory:
    """Run YOLO prediction on every file in a directory.

    Args:
        d: Directory of input videos; it is downloaded locally first.

    Returns:
        The ``output`` directory inside the task's working directory.
        Prediction results are saved beneath it under ``demo_results``.
    """
    model = YOLO("yolo11n.pt")

    working_dir = fl.current_context().working_directory
    local_dir = Path(working_dir) / "output"
    local_dir.mkdir(parents=True, exist_ok=True)
    output_path = str(local_dir)

    d.download()
    source_dir = Path(d.path)
    # Sorted for a deterministic processing order; skip anything not a file.
    video_files = sorted(p for p in source_dir.iterdir() if p.is_file())
    print([p.name for p in video_files])
    for video in video_files:
        # Pass the real file path; the literal "/f_path" used previously never
        # pointed at any of the listed files.
        model.predict(
            str(video),
            save=True,
            conf=0.5,
            project=output_path,
            name="demo_results",
            exist_ok=True,
        )

    return fl.FlyteDirectory(path=output_path)

上一篇
Day 17 : ONNX 轉換plugin example
下一篇
Day 19: imperative workflow
系列文
新興k8s工作流flyte與MLOps。19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言