撰寫下載 YouTube 影片並下載模型進行判斷的流程,明天會再修改。
#!/usr/bin/env python3
import yt_dlp
import os
import flytekit as fl
from flytekit import WorkflowFailurePolicy
from flytekit.types.error.error import FlyteError
import typing
from pathlib import Path
# Image used by the download task: Python 3.12 with the yt-dlp package,
# associated with the registry at localhost:30000.
yt_image = fl.ImageSpec(
    registry="localhost:30000",
    python_version="3.12",
    packages=["yt-dlp"],
)
@fl.task(container_image=yt_image)
def download_mp4(url: str, output_filename: str = "yt.mp4") -> fl.FlyteFile:
    """Download a single video with yt-dlp and return it as a Flyte file.

    Args:
        url: URL of the video to download.
        output_filename: Name of the output file. ``.mp4`` is appended when
            the name does not already end with it.

    Returns:
        A ``FlyteFile`` pointing at the downloaded file inside the task's
        working directory.

    Raises:
        FileNotFoundError: If yt-dlp finishes without leaving a file at the
            expected location.
    """
    if not output_filename.endswith(".mp4"):
        output_filename += ".mp4"

    # Same "<working_directory>/<output_filename>" location as before, kept
    # as a Path so it can be checked after the download.
    working_dir = fl.current_context().working_directory
    out_path = Path(f"{working_dir}/{output_filename}")

    ydl_opts = {
        # Prefer a ready-made MP4; otherwise fall back to the best format.
        "format": "best[ext=mp4]/best",
        # outtmpl is an output *template*: escape "%" so the chosen file
        # name is used literally instead of being parsed as a field.
        "outtmpl": str(out_path).replace("%", "%%"),
        # Download only the single video, never a whole playlist.
        "noplaylist": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])

    # Fail here with a clear message rather than later when Flyte tries to
    # upload a file that does not exist.
    if not out_path.is_file():
        raise FileNotFoundError(
            f"yt-dlp did not produce the expected file: {out_path}"
        )
    return fl.FlyteFile(path=str(out_path))
@fl.task
def clean_up(err: typing.Optional[FlyteError] = None) -> None:
    """Log the error that caused a workflow failure.

    The function only prints the error; it performs no other clean-up.

    Args:
        err: The error passed in by the caller, or ``None`` when no error
            is supplied.
    """
    print(f"Disable workflow due to {err}")
# NOTE: failure handling is currently disabled; the options that were
# commented out on the decorator are:
# (on_failure=clean_up, failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
@fl.workflow()
def wf1(url: str, filename: str) -> fl.FlyteFile:
    """Download the video at *url* and return the resulting file.

    Args:
        url: URL of the video to download.
        filename: Output file name forwarded to ``download_mp4``.

    Returns:
        The ``FlyteFile`` produced by ``download_mp4``.
    """
    return download_mp4(url=url, output_filename=filename)
import os
import subprocess
import flytekit as fl
from pathlib import Path
# Image used by the splitting task. split_video runs the `ffmpeg` executable
# through subprocess, so ffmpeg is installed as a system (apt) package; a pip
# package named "ffmpeg" is not expected to provide that executable.
ffmpeg_image = fl.ImageSpec(
    apt_packages=["ffmpeg"],
    python_version="3.12",
    registry="localhost:30000",
)
@fl.task(container_image=ffmpeg_image)
def split_video(input_file_metadata: fl.FlyteFile, segment_duration: int = 60) -> fl.FlyteDirectory:
    """Split a video into fixed-length segments using ffmpeg.

    The streams are copied (``-c copy``), not re-encoded. Segments are
    written to a ``split_video`` directory inside the task's working
    directory and named ``<stem>_000<ext>``, ``<stem>_001<ext>``, ...

    Args:
        input_file_metadata: The video to split; it is downloaded locally
            before processing.
        segment_duration: Requested segment length in seconds.

    Returns:
        A ``FlyteDirectory`` containing the generated segments.

    Raises:
        ValueError: If ``segment_duration`` is not positive.
        FileNotFoundError: If the downloaded input file does not exist.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    if segment_duration <= 0:
        raise ValueError(
            f"segment_duration must be positive, got {segment_duration}"
        )

    input_file_metadata.download()
    source = Path(str(input_file_metadata.path))

    # Raise instead of returning None: the task is declared to return a
    # FlyteDirectory, so a silent bare return would only fail later.
    if not source.is_file():
        raise FileNotFoundError(f"File {source} not found!")

    local_dir = Path(fl.current_context().working_directory) / "split_video"
    local_dir.mkdir(exist_ok=True)

    # Use only the base name of the input. Joining an absolute path onto
    # local_dir would discard local_dir and write the segments elsewhere.
    # "%" is escaped because ffmpeg treats the output name as a pattern.
    stem = source.stem.replace("%", "%%")
    # Fall back to .mp4 when the input has no extension so ffmpeg still gets
    # a usable output file name.
    suffix = source.suffix or ".mp4"
    output_pattern = str(local_dir / f"{stem}_%03d{suffix}")

    cmd = [
        "ffmpeg",
        "-i", str(source),
        "-c", "copy",
        "-map", "0",
        "-segment_time", str(segment_duration),
        "-f", "segment",
        "-reset_timestamps", "1",
        output_pattern,
    ]

    print(f"Splitting {source} into {segment_duration}-second segments...")
    subprocess.run(cmd, check=True)

    print("Done! Files created:")
    print(sorted(os.listdir(local_dir)))
    return fl.FlyteDirectory(path=str(local_dir))
from ultralytics import YOLO
import os
from pathlib import Path
import flytekit as fl
# Image used by the prediction task: Python 3.12 with the ultralytics
# package, associated with the registry at localhost:30000.
model_image = fl.ImageSpec(
    registry="localhost:30000",
    python_version="3.12",
    packages=["ultralytics"],
)
@fl.task(container_image=model_image)
def predict_videos(d: fl.FlyteDirectory) -> fl.FlyteDirectory:
    """Run YOLO prediction on every file in a directory.

    Each file in ``d`` is passed to ``model.predict`` with ``save=True``;
    results are written under ``<working_directory>/output`` using the run
    name ``demo_results``.

    NOTE(review): ``YOLO`` is imported at module level, while the other
    images in this file do not list ultralytics. Presumably that import also
    runs in their containers; confirm it does not break those tasks.

    Args:
        d: Directory of input files (the video segments); downloaded locally
            before processing.

    Returns:
        A ``FlyteDirectory`` pointing at the ``output`` directory.
    """
    model = YOLO("yolo11n.pt")

    local_dir = Path(fl.current_context().working_directory) / "output"
    local_dir.mkdir(exist_ok=True)
    output_path = str(local_dir)

    d.download()
    source_dir = Path(str(d.path))

    # Only regular files are inputs; sorting keeps the order deterministic.
    videos = sorted(p for p in source_dir.iterdir() if p.is_file())
    print([p.name for p in videos])

    for video in videos:
        # Pass the actual file path; the previous code passed the literal
        # string "<source_dir>/f_path" on every iteration.
        model.predict(
            str(video),
            save=True,
            conf=0.5,
            project=output_path,
            name="demo_results",
            exist_ok=True,
        )
    return fl.FlyteDirectory(path=output_path)