""" Unified CLI for the YOLO detection pipeline. Subcommands (all take a positional boss name): build kulemak [--ratio 0.85] [--seed 42] Split raw/ -> dataset/ train kulemak [--epochs 200] [--name X] Train model (auto-increments name) runs kulemak List training runs + metrics table annotate kulemak [dir] Launch annotation GUI prelabel kulemak [dir] [--model boss-kulemak] Auto-label unlabeled images export kulemak [--imgsz 640] Export .pt to ONNX format """ import argparse import csv import os import random import re import shutil import sys # ── Shared constants ───────────────────────────────────────────── SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) REPO_ROOT = os.path.abspath(os.path.join(SCRIPT_DIR, "..", "..")) RUNS_DIR = os.path.join(REPO_ROOT, "runs", "detect") MODELS_DIR = os.path.join(SCRIPT_DIR, "models") def raw_dir(boss): return os.path.join(REPO_ROOT, "training-data", boss, "raw") def dataset_dir(boss): return os.path.join(REPO_ROOT, "training-data", boss, "dataset") def boss_classes(boss): """Single-class list for the given boss.""" return [boss] # ── build ──────────────────────────────────────────────────────── def cmd_build(args): """Scan raw/, split labeled+empty images into dataset train/valid.""" import glob as g boss = args.boss raw = raw_dir(boss) dataset = dataset_dir(boss) classes = boss_classes(boss) images = sorted(g.glob(os.path.join(raw, "*.jpg"))) labeled, empty = [], [] skipped = 0 for img in images: txt = os.path.splitext(img)[0] + ".txt" if not os.path.exists(txt): skipped += 1 continue with open(txt) as f: content = f.read().strip() if content: labeled.append(img) else: empty.append(img) print(f"Raw: {len(images)} images -- {len(labeled)} labeled, " f"{len(empty)} empty (negative), {skipped} unlabeled (skipped)") if not labeled and not empty: print("Nothing to build.") return rng = random.Random(args.seed) rng.shuffle(labeled) rng.shuffle(empty) def split(lst, ratio): n = max(1, round(len(lst) * ratio)) if lst else 0 return lst[:n], lst[n:] train_labeled, valid_labeled = split(labeled, args.ratio) train_empty, valid_empty = split(empty, args.ratio) train_files = train_labeled + train_empty valid_files = valid_labeled + valid_empty # Wipe and recreate for sub in ("train/images", "train/labels", "valid/images", "valid/labels"): d = os.path.join(dataset, sub) if os.path.exists(d): shutil.rmtree(d) os.makedirs(d) def copy_files(file_list, split_name): for img in file_list: txt = os.path.splitext(img)[0] + ".txt" base = os.path.basename(img) base_txt = os.path.splitext(base)[0] + ".txt" shutil.copy2(img, os.path.join(dataset, split_name, "images", base)) shutil.copy2(txt, os.path.join(dataset, split_name, "labels", base_txt)) copy_files(train_files, "train") copy_files(valid_files, "valid") # Write data.yaml yaml_path = os.path.join(dataset, "data.yaml") with open(yaml_path, "w") as f: f.write(f"train: {os.path.join(dataset, 'train', 'images')}\n") f.write(f"val: {os.path.join(dataset, 'valid', 'images')}\n\n") f.write(f"nc: {len(classes)}\n") f.write(f"names: {classes}\n") # Delete stale label caches for root, dirs, files in os.walk(dataset): for fn in files: if fn == "labels.cache": os.remove(os.path.join(root, fn)) print(f"\nTrain: {len(train_files)} ({len(train_labeled)} labeled + {len(train_empty)} empty)") print(f"Valid: {len(valid_files)} ({len(valid_labeled)} labeled + {len(valid_empty)} empty)") print(f"data.yaml: {yaml_path}") # ── runs ───────────────────────────────────────────────────────── def _parse_simple_yaml(path): """Parse flat key: value YAML without requiring PyYAML.""" result = {} if not os.path.exists(path): return result with open(path) as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if ": " in line: key, val = line.split(": ", 1) # Try to cast to int/float/bool/null val = val.strip() if val == "null" or val == "~": val = None elif val == "true": val = True elif val == "false": val = False else: try: val = int(val) except ValueError: try: val = float(val) except ValueError: pass result[key.strip()] = val return result def cmd_runs(args): """List training runs with metrics for the given boss.""" boss = args.boss prefix = f"{boss}-v" if not os.path.isdir(RUNS_DIR): print(f"No runs directory: {RUNS_DIR}") return run_dirs = sorted( [d for d in os.listdir(RUNS_DIR) if os.path.isdir(os.path.join(RUNS_DIR, d)) and d.startswith(prefix)], key=lambda d: _run_sort_key(d) ) if not run_dirs: print(f"No {prefix}* runs found.") return rows = [] for name in run_dirs: run_path = os.path.join(RUNS_DIR, name) args_file = os.path.join(run_path, "args.yaml") csv_file = os.path.join(run_path, "results.csv") model = epochs_cfg = imgsz = "?" if os.path.exists(args_file): cfg = _parse_simple_yaml(args_file) model = os.path.splitext(os.path.basename(str(cfg.get("model", "?"))))[0] epochs_cfg = str(cfg.get("epochs", "?")) imgsz = str(cfg.get("imgsz", "?")) mAP50 = mAP50_95 = prec = rec = "-" actual_epochs = "?" status = "unknown" if os.path.exists(csv_file): with open(csv_file) as f: reader = csv.DictReader(f) best_map = -1 best_row = None last_epoch = 0 for row in reader: row = {k.strip(): v.strip() for k, v in row.items()} ep = int(row.get("epoch", 0)) last_epoch = max(last_epoch, ep) val = float(row.get("metrics/mAP50(B)", 0)) if val > best_map: best_map = val best_row = row if best_row: mAP50 = f"{float(best_row.get('metrics/mAP50(B)', 0)):.3f}" mAP50_95 = f"{float(best_row.get('metrics/mAP50-95(B)', 0)):.3f}" prec = f"{float(best_row.get('metrics/precision(B)', 0)):.3f}" rec = f"{float(best_row.get('metrics/recall(B)', 0)):.3f}" actual_epochs = str(last_epoch) try: if int(epochs_cfg) > last_epoch + 1: status = "early-stop" else: status = "done" except ValueError: status = "?" epoch_str = f"{actual_epochs}/{epochs_cfg}" rows.append((name, model, epoch_str, imgsz, mAP50, mAP50_95, prec, rec, status)) headers = ("Run", "Model", "Epochs", "ImgSz", "mAP50", "mAP50-95", "P", "R", "Status") widths = [max(len(h), max(len(r[i]) for r in rows)) for i, h in enumerate(headers)] header_line = " ".join(h.ljust(w) for h, w in zip(headers, widths)) print(header_line) print(" ".join("-" * w for w in widths)) for row in rows: print(" ".join(val.ljust(w) for val, w in zip(row, widths))) def _run_sort_key(name): m = re.search(r"(\d+)", name) return int(m.group(1)) if m else 0 # ── train ──────────────────────────────────────────────────────── def cmd_train(args): """Train a YOLO model, auto-incrementing run name per boss.""" boss = args.boss # Auto-increment name: {boss}-v1, {boss}-v2, ... if args.name is None: prefix = f"{boss}-v" highest = 0 if os.path.isdir(RUNS_DIR): for d in os.listdir(RUNS_DIR): m = re.match(re.escape(prefix) + r"(\d+)", d) if m: highest = max(highest, int(m.group(1))) args.name = f"{prefix}{highest + 1}" print(f"Auto-assigned run name: {args.name}") if args.data is None: args.data = os.path.join(dataset_dir(boss), "data.yaml") if not os.path.exists(args.data): print(f"data.yaml not found: {args.data}") print(f"Run 'python manage.py build {boss}' first.") return # Pass boss name so train.py can name the output model args.boss = boss from train import run_training run_training(args) # ── annotate ───────────────────────────────────────────────────── def cmd_annotate(args): """Launch annotation GUI for the given boss.""" boss = args.boss img_dir = args.dir or raw_dir(boss) classes = boss_classes(boss) from annotate import run_annotator run_annotator(img_dir, classes) # ── prelabel ───────────────────────────────────────────────────── def cmd_prelabel(args): """Auto-label unlabeled images.""" boss = args.boss if args.img_dir is None: args.img_dir = raw_dir(boss) if args.model == _PRELABEL_MODEL_DEFAULT: args.model = f"boss-{boss}" from prelabel import run_prelabel run_prelabel(args) _PRELABEL_MODEL_DEFAULT = "__auto__" # ── export ────────────────────────────────────────────────────── def cmd_export(args): """Export .pt model to .onnx format for ONNX Runtime inference.""" boss = args.boss model_name = f"boss-{boss}" pt_path = os.path.join(MODELS_DIR, f"{model_name}.pt") if not os.path.exists(pt_path): print(f"Model not found: {pt_path}") return from ultralytics import YOLO model = YOLO(pt_path) print(f"Exporting {pt_path} -> ONNX (imgsz={args.imgsz})...") model.export(format="onnx", imgsz=args.imgsz, opset=17, simplify=True, dynamic=False) # ultralytics writes the .onnx next to the .pt file onnx_src = os.path.join(MODELS_DIR, f"{model_name}.onnx") if os.path.exists(onnx_src): size_mb = os.path.getsize(onnx_src) / (1024 * 1024) print(f"\nExported: {onnx_src} ({size_mb:.1f} MB)") else: print(f"\nWarning: expected output not found at {onnx_src}") # ── CLI ────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="YOLO detection pipeline manager", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) sub = parser.add_subparsers(dest="command") # annotate p = sub.add_parser("annotate", help="Launch annotation GUI") p.add_argument("boss", help="Boss name (e.g. kulemak)") p.add_argument("dir", nargs="?", default=None, help="Image directory (default: training-data/{boss}/raw)") # build p = sub.add_parser("build", help="Build dataset from raw/") p.add_argument("boss", help="Boss name (e.g. kulemak)") p.add_argument("--ratio", type=float, default=0.85, help="Train ratio (default 0.85)") p.add_argument("--seed", type=int, default=42, help="Random seed") # train p = sub.add_parser("train", help="Train YOLO model") p.add_argument("boss", help="Boss name (e.g. kulemak)") p.add_argument("--data", default=None, help="Path to data.yaml") p.add_argument("--model", default="yolo11n", help="YOLO model variant") p.add_argument("--epochs", type=int, default=200, help="Training epochs") p.add_argument("--imgsz", type=int, default=640, help="Image size") p.add_argument("--batch", type=int, default=16, help="Batch size") p.add_argument("--device", default="0", help="CUDA device") p.add_argument("--name", default=None, help="Run name (auto-increments if omitted)") # runs p = sub.add_parser("runs", help="List training runs with metrics") p.add_argument("boss", help="Boss name (e.g. kulemak)") # prelabel p = sub.add_parser("prelabel", help="Pre-label unlabeled images") p.add_argument("boss", help="Boss name (e.g. kulemak)") p.add_argument("img_dir", nargs="?", default=None, help="Image directory") p.add_argument("--model", default=_PRELABEL_MODEL_DEFAULT, help="Model name in models/ (default: boss-{boss})") p.add_argument("--conf", type=float, default=0.20, help="Confidence threshold") # export p = sub.add_parser("export", help="Export .pt model to ONNX format") p.add_argument("boss", help="Boss name (e.g. kulemak)") p.add_argument("--imgsz", type=int, default=640, help="Image size for export") args = parser.parse_args() if args.command is None: parser.print_help() return commands = { "annotate": cmd_annotate, "build": cmd_build, "train": cmd_train, "runs": cmd_runs, "prelabel": cmd_prelabel, "export": cmd_export, } commands[args.command](args) if __name__ == "__main__": main()