#!/usr/bin/env python3 from __future__ import annotations import argparse import csv import hashlib import json import logging import math import os import shlex import shutil import subprocess import sys import time from dataclasses import asdict, dataclass from datetime import UTC, datetime from pathlib import Path from typing import Any DEFAULT_HOST_FFMPEG = "/mnt/vol1/work/compress_test/ffmpeg-master-latest-linux64-gpl/bin/ffmpeg" DEFAULT_HOST_FFPROBE = "/mnt/vol1/work/compress_test/ffmpeg-master-latest-linux64-gpl/bin/ffprobe" DEFAULT_SOURCE_DIR = "/mnt/vol1/work/compress_test/sources" DEFAULT_RESULTS_ROOT = "/mnt/vol1/work/compress_test/results" DEFAULT_CONTAINER_NAME = "immich_server" DEFAULT_CONTAINER_WORKDIR = "/tmp/qsv_vmaf_benchmark" DEFAULT_QUALITIES = [20, 24, 28, 32] DEFAULT_PRESETS = [1, 3, 5, 7] DEFAULT_CODECS = ["h264", "hevc"] DEFAULT_RESOLUTIONS = ["source", "max4k", "max1440p", "max1080p", "max720p"] DEFAULT_RESOLUTION_CAPS = { "source": None, "max4k": (3840, 2160), "max1440p": (2560, 1440), "max1080p": (1920, 1080), "max720p": (1280, 720), } TERMINAL_STATUSES = { "completed", "encode_failed", "copy_back_failed", "probe_failed", "vmaf_failed", "cleanup_failed", } @dataclass(frozen=True) class SourceInfo: path: Path stem: str width: int height: int fps: float duration: float size_bytes: int has_audio: bool @dataclass(frozen=True) class ResolutionTarget: label: str width: int height: int duplicate_of: str | None = None @dataclass(frozen=True) class Job: job_id: str source_path: str source_stem: str codec: str encoder: str resolution_label: str target_width: int target_height: int global_quality: int preset: int clip_seconds: float has_audio: bool class CommandError(RuntimeError): def __init__(self, message: str, result: dict[str, Any]): super().__init__(message) self.result = result def utc_now() -> str: return datetime.now(UTC).replace(microsecond=0).isoformat() def parse_csv_ints(value: str) -> list[int]: items = [] for part in value.split(","): part = part.strip() if not part: continue items.append(int(part)) if not items: raise argparse.ArgumentTypeError("expected at least one integer") return items def parse_csv_strings(value: str) -> list[str]: items = [part.strip() for part in value.split(",") if part.strip()] if not items: raise argparse.ArgumentTypeError("expected at least one value") return items def default_run_name() -> str: return f"qsv-vmaf-{datetime.now().strftime('%Y%m%d-%H%M%S')}" def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Run unattended QSV encode benchmarks in an Immich container and compute host-side VMAF." ) parser.add_argument("--source-dir", default=DEFAULT_SOURCE_DIR) parser.add_argument("--results-root", default=DEFAULT_RESULTS_ROOT) parser.add_argument("--run-name", default=default_run_name()) parser.add_argument("--host-ffmpeg", default=DEFAULT_HOST_FFMPEG) parser.add_argument("--host-ffprobe", default=DEFAULT_HOST_FFPROBE) parser.add_argument("--container-name", default=DEFAULT_CONTAINER_NAME) parser.add_argument("--container-workdir", default=DEFAULT_CONTAINER_WORKDIR) parser.add_argument("--clip-seconds", type=float, default=20.0) parser.add_argument("--qualities", type=parse_csv_ints, default=DEFAULT_QUALITIES) parser.add_argument("--presets", type=parse_csv_ints, default=DEFAULT_PRESETS) parser.add_argument("--codecs", type=parse_csv_strings, default=DEFAULT_CODECS) parser.add_argument("--resolutions", type=parse_csv_strings, default=DEFAULT_RESOLUTIONS) parser.add_argument("--vmaf-threads", type=int, default=4) parser.add_argument("--max-jobs", type=int, default=None) parser.add_argument("--source-limit", type=int, default=None) parser.add_argument("--retry-failed", action="store_true") parser.add_argument("--keep-encoded", action="store_true") parser.add_argument("--dry-run", action="store_true") return parser def shjoin(command: list[str]) -> str: return shlex.join(command) def run_command( command: list[str], *, cwd: Path | None = None, logger: logging.Logger | None = None, log_path: Path | None = None, check: bool = True, ) -> dict[str, Any]: start = time.monotonic() completed = subprocess.run( command, cwd=str(cwd) if cwd else None, capture_output=True, text=True, encoding="utf-8", errors="replace", ) elapsed = time.monotonic() - start result = { "command": command, "command_text": shjoin(command), "returncode": completed.returncode, "stdout": completed.stdout, "stderr": completed.stderr, "elapsed_seconds": elapsed, } if log_path is not None: log_path.parent.mkdir(parents=True, exist_ok=True) with log_path.open("w", encoding="utf-8") as handle: handle.write(f"started_at: {utc_now()}\n") handle.write(f"command: {result['command_text']}\n") handle.write(f"elapsed_seconds: {elapsed:.6f}\n") handle.write(f"returncode: {completed.returncode}\n") handle.write("\n[stdout]\n") handle.write(completed.stdout) handle.write("\n[stderr]\n") handle.write(completed.stderr) if logger is not None: logger.debug("ran command in %.3fs: %s", elapsed, result["command_text"]) if check and completed.returncode != 0: raise CommandError(f"command failed with exit code {completed.returncode}", result) return result def setup_logging(run_dir: Path) -> logging.Logger: logger = logging.getLogger("qsv_vmaf_benchmark") logger.setLevel(logging.INFO) logger.handlers.clear() formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") file_handler = logging.FileHandler(run_dir / "run.log", encoding="utf-8") file_handler.setFormatter(formatter) console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger def ensure_file_exists(path: Path, description: str) -> None: if not path.is_file(): raise FileNotFoundError(f"{description} not found: {path}") def ensure_directory(path: Path, description: str) -> None: if not path.is_dir(): raise FileNotFoundError(f"{description} not found: {path}") def load_json(path: Path) -> Any: with path.open("r", encoding="utf-8") as handle: return json.load(handle) def write_json(path: Path, payload: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as handle: json.dump(payload, handle, ensure_ascii=True, indent=2, sort_keys=True) handle.write("\n") def append_jsonl(path: Path, payload: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(payload, ensure_ascii=True, sort_keys=True)) handle.write("\n") def load_jsonl_by_job_id(path: Path) -> dict[str, dict[str, Any]]: if not path.exists(): return {} records: dict[str, dict[str, Any]] = {} with path.open("r", encoding="utf-8") as handle: for line in handle: line = line.strip() if not line: continue record = json.loads(line) records[record["job_id"]] = record return records def write_results_csv(path: Path, records: dict[str, dict[str, Any]]) -> None: fieldnames = [ "job_id", "status", "source_path", "source_stem", "codec", "encoder", "resolution_label", "target_width", "target_height", "global_quality", "preset", "clip_seconds", "has_audio", "encode_elapsed_seconds", "encode_realtime_factor", "encode_fps", "vmaf_elapsed_seconds", "total_elapsed_seconds", "output_size_bytes", "output_bitrate_bps", "output_duration_seconds", "output_nb_frames", "vmaf_mean", "vmaf_harmonic_mean", "encode_log_path", "vmaf_log_path", "output_path", "error_summary", "completed_at", ] path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8", newline="") as handle: writer = csv.DictWriter(handle, fieldnames=fieldnames) writer.writeheader() for record in sorted(records.values(), key=lambda item: item["job_id"]): row = {key: record.get(key) for key in fieldnames} writer.writerow(row) def get_host_ffmpeg_version(ffmpeg_path: Path) -> str: result = run_command([str(ffmpeg_path), "-hide_banner", "-version"], check=True) return result["stdout"].splitlines()[0].strip() def get_container_ffmpeg_version(container_name: str) -> str: result = run_command(["docker", "exec", container_name, "ffmpeg", "-hide_banner", "-version"], check=True) return result["stdout"].splitlines()[0].strip() def ensure_prerequisites(args: argparse.Namespace, logger: logging.Logger) -> None: ensure_directory(Path(args.source_dir), "source directory") ensure_file_exists(Path(args.host_ffmpeg), "host ffmpeg") ensure_file_exists(Path(args.host_ffprobe), "host ffprobe") docker_check = run_command(["docker", "ps", "--format", "{{.Names}}"], check=True) container_names = {line.strip() for line in docker_check["stdout"].splitlines() if line.strip()} if args.container_name not in container_names: raise RuntimeError(f"container not running: {args.container_name}") host_filters = run_command([args.host_ffmpeg, "-hide_banner", "-filters"], check=True) if "libvmaf" not in host_filters["stdout"]: raise RuntimeError("host ffmpeg does not provide libvmaf") container_encoders = run_command( ["docker", "exec", args.container_name, "ffmpeg", "-hide_banner", "-encoders"], check=True, ) for required_encoder in ("h264_qsv", "hevc_qsv"): if required_encoder not in container_encoders["stdout"]: raise RuntimeError(f"container ffmpeg does not provide {required_encoder}") run_command( [ "docker", "exec", args.container_name, "sh", "-lc", f"mkdir -p {shlex.quote(args.container_workdir)}/sources {shlex.quote(args.container_workdir)}/outputs", ], check=True, ) logger.info("prerequisites verified") def ffprobe_json(command: list[str]) -> dict[str, Any]: result = run_command(command, check=True) return json.loads(result["stdout"]) def probe_source(path: Path, ffprobe_path: Path) -> SourceInfo: payload = ffprobe_json( [ str(ffprobe_path), "-hide_banner", "-v", "error", "-print_format", "json", "-show_streams", "-show_format", str(path), ] ) video_stream = next(stream for stream in payload["streams"] if stream.get("codec_type") == "video") has_audio = any(stream.get("codec_type") == "audio" for stream in payload["streams"]) fps_num, fps_den = video_stream["r_frame_rate"].split("/") fps = float(fps_num) / float(fps_den) return SourceInfo( path=path, stem=path.stem, width=int(video_stream["width"]), height=int(video_stream["height"]), fps=fps, duration=float(payload["format"]["duration"]), size_bytes=int(payload["format"]["size"]), has_audio=has_audio, ) def even_floor(value: float) -> int: floored = int(math.floor(value)) if floored % 2 == 1: floored -= 1 return max(2, floored) def compute_resolution_targets(source: SourceInfo, labels: list[str]) -> tuple[list[ResolutionTarget], list[dict[str, Any]]]: targets: list[ResolutionTarget] = [] skipped: list[dict[str, Any]] = [] seen: dict[tuple[int, int], str] = {} for label in labels: if label not in DEFAULT_RESOLUTION_CAPS: raise ValueError(f"unsupported resolution label: {label}") cap = DEFAULT_RESOLUTION_CAPS[label] if cap is None: width = source.width height = source.height else: max_w, max_h = cap scale = min(1.0, max_w / source.width, max_h / source.height) width = even_floor(source.width * scale) height = even_floor(source.height * scale) dims = (width, height) if dims in seen: skipped.append( { "source_path": str(source.path), "source_stem": source.stem, "requested_label": label, "duplicate_of": seen[dims], "width": width, "height": height, "reason": "duplicate_resolution", "recorded_at": utc_now(), } ) continue seen[dims] = label targets.append(ResolutionTarget(label=label, width=width, height=height)) return targets, skipped def encoder_for_codec(codec: str) -> str: if codec == "h264": return "h264_qsv" if codec == "hevc": return "hevc_qsv" raise ValueError(f"unsupported codec: {codec}") def build_job(source: SourceInfo, target: ResolutionTarget, codec: str, quality: int, preset: int, clip_seconds: float) -> Job: encoder = encoder_for_codec(codec) identity = "|".join( [ str(source.path), codec, target.label, f"{target.width}x{target.height}", str(quality), str(preset), f"{clip_seconds:.3f}", ] ) job_id = hashlib.sha1(identity.encode("utf-8")).hexdigest()[:16] return Job( job_id=job_id, source_path=str(source.path), source_stem=source.stem, codec=codec, encoder=encoder, resolution_label=target.label, target_width=target.width, target_height=target.height, global_quality=quality, preset=preset, clip_seconds=clip_seconds, has_audio=source.has_audio, ) def build_jobs(args: argparse.Namespace, sources: list[SourceInfo]) -> tuple[list[Job], list[dict[str, Any]]]: jobs: list[Job] = [] skipped_records: list[dict[str, Any]] = [] for source in sources: targets, skipped = compute_resolution_targets(source, args.resolutions) skipped_records.extend(skipped) for codec in args.codecs: for target in targets: for quality in args.qualities: for preset in args.presets: jobs.append(build_job(source, target, codec, quality, preset, args.clip_seconds)) if args.max_jobs is not None: jobs = jobs[: args.max_jobs] return jobs, skipped_records def make_source_index(sources: list[SourceInfo]) -> dict[str, SourceInfo]: return {str(source.path): source for source in sources} def container_source_name(source: SourceInfo) -> str: digest = hashlib.sha1(str(source.path).encode("utf-8")).hexdigest()[:10] return f"{source.stem}-{digest}{source.path.suffix.lower()}" def append_result( record: dict[str, Any], *, results_jsonl: Path, results_csv: Path, records_by_job: dict[str, dict[str, Any]], ) -> None: records_by_job[record["job_id"]] = record append_jsonl(results_jsonl, record) write_results_csv(results_csv, records_by_job) def short_error_summary(stderr: str, limit: int = 400) -> str: text = stderr.strip().splitlines() if not text: return "" summary = text[-1].strip() if len(summary) > limit: summary = summary[: limit - 3] + "..." return summary def probe_output_video(ffprobe_path: Path, output_path: Path) -> dict[str, Any]: payload = ffprobe_json( [ str(ffprobe_path), "-hide_banner", "-v", "error", "-print_format", "json", "-show_streams", "-show_format", str(output_path), ] ) video_stream = next(stream for stream in payload["streams"] if stream.get("codec_type") == "video") nb_frames_raw = video_stream.get("nb_frames") return { "width": int(video_stream["width"]), "height": int(video_stream["height"]), "duration_seconds": float(payload["format"]["duration"]), "size_bytes": int(payload["format"]["size"]), "bit_rate_bps": int(payload["format"].get("bit_rate") or 0), "nb_frames": int(nb_frames_raw) if nb_frames_raw and nb_frames_raw.isdigit() else None, } def parse_vmaf_metrics(vmaf_json_path: Path) -> tuple[float | None, float | None]: payload = load_json(vmaf_json_path) pooled = payload.get("pooled_metrics", {}) vmaf = pooled.get("vmaf", {}) mean = vmaf.get("mean") harmonic_mean = vmaf.get("harmonic_mean") return ( float(mean) if mean is not None else None, float(harmonic_mean) if harmonic_mean is not None else None, ) def job_output_filename(job: Job) -> str: return ( f"{job.source_stem}__{job.codec}__{job.resolution_label}__" f"{job.target_width}x{job.target_height}__gq{job.global_quality}__p{job.preset}.mp4" ) def run_job( job: Job, *, args: argparse.Namespace, source: SourceInfo, host_output_dir: Path, job_dir: Path, logger: logging.Logger, ) -> dict[str, Any]: started_at = utc_now() encode_log_path = job_dir / f"{job.job_id}.encode.log" vmaf_log_path = job_dir / f"{job.job_id}.vmaf.log" vmaf_json_path = job_dir / f"{job.job_id}.vmaf.json" host_output_path = host_output_dir / job_output_filename(job) container_source_path = ( Path(args.container_workdir) / "sources" / container_source_name(source) ).as_posix() container_output_path = ( Path(args.container_workdir) / "outputs" / f"{job.job_id}.mp4" ).as_posix() output_probe: dict[str, Any] | None = None encode_result: dict[str, Any] | None = None vmaf_result: dict[str, Any] | None = None vmaf_mean: float | None = None vmaf_harmonic_mean: float | None = None clip_seconds = min(source.duration, job.clip_seconds) scale_needed = source.width != job.target_width or source.height != job.target_height def cleanup_container_output() -> None: run_command( ["docker", "exec", args.container_name, "rm", "-f", container_output_path], logger=logger, check=False, ) def cleanup_host_output_best_effort() -> None: if args.keep_encoded: return try: host_output_path.unlink(missing_ok=True) except OSError as exc: logger.warning("failed to delete host artifact %s: %s", host_output_path, exc) encode_command = [ "docker", "exec", args.container_name, "ffmpeg", "-hide_banner", "-y", "-v", "verbose", "-hwaccel", "qsv", "-hwaccel_output_format", "qsv", "-async_depth", "4", "-noautorotate", "-qsv_device", "/dev/dri/renderD128", "-threads", "1", "-t", f"{clip_seconds:.3f}", "-i", container_source_path, "-movflags", "faststart", "-fps_mode", "passthrough", "-map", "0:v:0", "-map_metadata", "-1", ] if job.has_audio: encode_command.extend(["-map", "0:a:0", "-c:a", "aac"]) if scale_needed: encode_command.extend(["-vf", f"scale_qsv=w={job.target_width}:h={job.target_height}"]) encode_command.extend( [ "-c:v", job.encoder, "-bf", "7", "-refs", "5", "-g", "256", "-preset", str(job.preset), "-global_quality:v", str(job.global_quality), ] ) if job.codec == "hevc": encode_command.extend(["-tag:v", "hvc1"]) encode_command.append(container_output_path) try: encode_result = run_command(encode_command, logger=logger, log_path=encode_log_path, check=True) except CommandError as exc: cleanup_container_output() return { **asdict(job), "status": "encode_failed", "started_at": started_at, "completed_at": utc_now(), "encode_elapsed_seconds": exc.result["elapsed_seconds"], "total_elapsed_seconds": exc.result["elapsed_seconds"], "encode_log_path": str(encode_log_path), "vmaf_log_path": str(vmaf_log_path), "output_path": str(host_output_path), "error_summary": short_error_summary(exc.result["stderr"]), } try: run_command( ["docker", "cp", f"{args.container_name}:{container_output_path}", str(host_output_path)], logger=logger, check=True, ) except CommandError as exc: cleanup_container_output() return { **asdict(job), "status": "copy_back_failed", "started_at": started_at, "completed_at": utc_now(), "encode_elapsed_seconds": encode_result["elapsed_seconds"], "total_elapsed_seconds": encode_result["elapsed_seconds"] + exc.result["elapsed_seconds"], "encode_log_path": str(encode_log_path), "vmaf_log_path": str(vmaf_log_path), "output_path": str(host_output_path), "error_summary": short_error_summary(exc.result["stderr"]), } try: output_probe = probe_output_video(Path(args.host_ffprobe), host_output_path) except Exception as exc: cleanup_container_output() cleanup_host_output_best_effort() return { **asdict(job), "status": "probe_failed", "started_at": started_at, "completed_at": utc_now(), "encode_elapsed_seconds": encode_result["elapsed_seconds"], "total_elapsed_seconds": encode_result["elapsed_seconds"], "encode_log_path": str(encode_log_path), "vmaf_log_path": str(vmaf_log_path), "output_path": str(host_output_path), "error_summary": str(exc), } output_width = output_probe["width"] output_height = output_probe["height"] filtergraph = ( f"[0:v]settb=AVTB,setpts=PTS-STARTPTS[dist];" f"[1:v]trim=duration={clip_seconds:.3f},settb=AVTB,setpts=PTS-STARTPTS," f"scale={output_width}:{output_height}:flags=bicubic[ref];" f"[dist][ref]libvmaf=log_fmt=json:log_path={str(vmaf_json_path)}:n_threads={args.vmaf_threads}[vmafout]" ) vmaf_command = [ args.host_ffmpeg, "-hide_banner", "-v", "info", "-noautorotate", "-i", str(host_output_path), "-noautorotate", "-i", str(source.path), "-lavfi", filtergraph, "-map", "[vmafout]", "-f", "null", "-", ] try: vmaf_result = run_command(vmaf_command, logger=logger, log_path=vmaf_log_path, check=True) vmaf_mean, vmaf_harmonic_mean = parse_vmaf_metrics(vmaf_json_path) except Exception as exc: error_summary = str(exc) if isinstance(exc, CommandError): error_summary = short_error_summary(exc.result["stderr"]) cleanup_container_output() cleanup_host_output_best_effort() return { **asdict(job), "status": "vmaf_failed", "started_at": started_at, "completed_at": utc_now(), "encode_elapsed_seconds": encode_result["elapsed_seconds"], "vmaf_elapsed_seconds": vmaf_result["elapsed_seconds"] if vmaf_result else None, "total_elapsed_seconds": encode_result["elapsed_seconds"] + (vmaf_result["elapsed_seconds"] if vmaf_result else 0.0), "output_size_bytes": output_probe["size_bytes"], "output_bitrate_bps": output_probe["bit_rate_bps"], "output_duration_seconds": output_probe["duration_seconds"], "output_nb_frames": output_probe["nb_frames"], "encode_log_path": str(encode_log_path), "vmaf_log_path": str(vmaf_log_path), "output_path": str(host_output_path), "error_summary": error_summary, } output_frames = output_probe["nb_frames"] if output_frames is None: output_frames = int(round(output_probe["duration_seconds"] * source.fps)) encode_fps = output_frames / encode_result["elapsed_seconds"] if encode_result["elapsed_seconds"] > 0 else None encode_realtime = output_probe["duration_seconds"] / encode_result["elapsed_seconds"] if encode_result["elapsed_seconds"] > 0 else None cleanup_error = None if not args.keep_encoded: try: host_output_path.unlink(missing_ok=True) except OSError as exc: cleanup_error = str(exc) status = "completed" if cleanup_error is None else "cleanup_failed" error_summary = cleanup_error or "" cleanup_container_output() return { **asdict(job), "status": status, "started_at": started_at, "completed_at": utc_now(), "encode_elapsed_seconds": encode_result["elapsed_seconds"], "encode_realtime_factor": encode_realtime, "encode_fps": encode_fps, "vmaf_elapsed_seconds": vmaf_result["elapsed_seconds"], "total_elapsed_seconds": encode_result["elapsed_seconds"] + vmaf_result["elapsed_seconds"], "output_size_bytes": output_probe["size_bytes"], "output_bitrate_bps": output_probe["bit_rate_bps"], "output_duration_seconds": output_probe["duration_seconds"], "output_nb_frames": output_probe["nb_frames"], "vmaf_mean": vmaf_mean, "vmaf_harmonic_mean": vmaf_harmonic_mean, "encode_log_path": str(encode_log_path), "vmaf_log_path": str(vmaf_log_path), "output_path": str(host_output_path), "error_summary": error_summary, } def sync_source_to_container(args: argparse.Namespace, source: SourceInfo, logger: logging.Logger) -> str: source_name = container_source_name(source) container_target = (Path(args.container_workdir) / "sources" / source_name).as_posix() run_command( ["docker", "exec", args.container_name, "rm", "-f", container_target], logger=logger, check=False, ) run_command( ["docker", "cp", str(source.path), f"{args.container_name}:{container_target}"], logger=logger, check=True, ) return container_target def remove_source_from_container(args: argparse.Namespace, source: SourceInfo, logger: logging.Logger) -> None: container_target = (Path(args.container_workdir) / "sources" / container_source_name(source)).as_posix() run_command( ["docker", "exec", args.container_name, "rm", "-f", container_target], logger=logger, check=False, ) def validate_manifest(existing: dict[str, Any], expected: dict[str, Any]) -> None: comparable_keys = [ "source_dir", "clip_seconds", "qualities", "presets", "codecs", "resolutions", "container_name", "container_workdir", "host_ffmpeg", "host_ffprobe", ] for key in comparable_keys: if existing.get("config", {}).get(key) != expected.get("config", {}).get(key): raise RuntimeError(f"existing run manifest does not match current configuration for {key}") def create_manifest(args: argparse.Namespace, sources: list[SourceInfo], jobs: list[Job]) -> dict[str, Any]: return { "created_at": utc_now(), "config": { "source_dir": str(Path(args.source_dir).resolve()), "clip_seconds": args.clip_seconds, "qualities": args.qualities, "presets": args.presets, "codecs": args.codecs, "resolutions": args.resolutions, "container_name": args.container_name, "container_workdir": args.container_workdir, "host_ffmpeg": str(Path(args.host_ffmpeg).resolve()), "host_ffprobe": str(Path(args.host_ffprobe).resolve()), "keep_encoded": args.keep_encoded, }, "environment": { "host_ffmpeg_version": get_host_ffmpeg_version(Path(args.host_ffmpeg)), "container_ffmpeg_version": get_container_ffmpeg_version(args.container_name), "python": sys.version, }, "sources": [ { "path": str(source.path), "stem": source.stem, "width": source.width, "height": source.height, "fps": source.fps, "duration": source.duration, "size_bytes": source.size_bytes, "has_audio": source.has_audio, } for source in sources ], "job_count": len(jobs), } def collect_sources(args: argparse.Namespace) -> list[SourceInfo]: source_dir = Path(args.source_dir) files = sorted(source_dir.glob("*.mp4")) if args.source_limit is not None: files = files[: args.source_limit] if not files: raise RuntimeError(f"no source videos found in {source_dir}") ffprobe_path = Path(args.host_ffprobe) return [probe_source(path, ffprobe_path) for path in files] def should_skip_existing(record: dict[str, Any], retry_failed: bool) -> bool: if not record: return False if record["status"] == "completed": return True if record["status"] in TERMINAL_STATUSES and not retry_failed: return True return False def main() -> int: args = build_parser().parse_args() run_dir = Path(args.results_root) / args.run_name jobs_dir = run_dir / "jobs" artifacts_dir = run_dir / "artifacts" manifest_path = run_dir / "manifest.json" results_jsonl = run_dir / "results.jsonl" results_csv = run_dir / "results.csv" skipped_jsonl = run_dir / "skipped.jsonl" run_dir.mkdir(parents=True, exist_ok=True) jobs_dir.mkdir(parents=True, exist_ok=True) artifacts_dir.mkdir(parents=True, exist_ok=True) logger = setup_logging(run_dir) ensure_prerequisites(args, logger) sources = collect_sources(args) source_index = make_source_index(sources) jobs, skipped_records = build_jobs(args, sources) manifest = create_manifest(args, sources, jobs) if manifest_path.exists(): validate_manifest(load_json(manifest_path), manifest) else: write_json(manifest_path, manifest) for record in skipped_records: append_jsonl(skipped_jsonl, record) existing_records = load_jsonl_by_job_id(results_jsonl) write_results_csv(results_csv, existing_records) total_pending_jobs = sum( 1 for job in jobs if not should_skip_existing(existing_records.get(job.job_id), args.retry_failed) ) logger.info("run directory: %s", run_dir) logger.info("sources: %d, jobs: %d", len(sources), len(jobs)) if args.dry_run: logger.info("dry run requested; exiting before executing jobs") return 0 processed = 0 skipped_existing = 0 source_to_jobs: dict[str, list[Job]] = {} for job in jobs: source_to_jobs.setdefault(job.source_path, []).append(job) for source_path, source_jobs in source_to_jobs.items(): source = source_index[source_path] pending_jobs = [ job for job in source_jobs if not should_skip_existing(existing_records.get(job.job_id), args.retry_failed) ] skipped_existing += len(source_jobs) - len(pending_jobs) if not pending_jobs: logger.info("skipping source with no pending jobs: %s", source.path.name) continue logger.info("syncing source into container: %s", source.path.name) sync_source_to_container(args, source, logger) try: for index, job in enumerate(pending_jobs, start=1): processed += 1 logger.info( "job %d/%d source=%s codec=%s resolution=%s quality=%s preset=%s", processed, total_pending_jobs, source.path.name, job.codec, f"{job.target_width}x{job.target_height}", job.global_quality, job.preset, ) record = run_job( job, args=args, source=source, host_output_dir=artifacts_dir, job_dir=jobs_dir, logger=logger, ) append_result(record, results_jsonl=results_jsonl, results_csv=results_csv, records_by_job=existing_records) logger.info( "job finished status=%s vmaf=%s size=%s bytes", record["status"], record.get("vmaf_mean"), record.get("output_size_bytes"), ) finally: remove_source_from_container(args, source, logger) logger.info("done; completed or recorded jobs: %d", len(existing_records)) return 0 if __name__ == "__main__": try: raise SystemExit(main()) except KeyboardInterrupt: print("interrupted", file=sys.stderr) raise SystemExit(130)