"""
|
|
Unified CLI for the YOLO detection pipeline.
|
|
|
|
Subcommands (all take a positional boss name):
|
|
build kulemak [--ratio 0.85] [--seed 42] Split raw/ -> dataset/
|
|
train kulemak [--epochs 200] [--name X] Train model (auto-increments name)
|
|
runs kulemak List training runs + metrics table
|
|
annotate kulemak [dir] Launch annotation GUI
|
|
prelabel kulemak [dir] [--model boss-kulemak] Auto-label unlabeled images
|
|
export kulemak [--imgsz 640] Export .pt to ONNX format
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import os
|
|
import random
|
|
import re
|
|
import shutil
|
|
import sys
|
|
|
|
# ── Shared constants ─────────────────────────────────────────────
# Absolute directory containing this script.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Repository root: two levels above this script.
REPO_ROOT = os.path.abspath(os.path.join(SCRIPT_DIR, "..", ".."))
# Where training runs (and their args.yaml / results.csv) are written.
RUNS_DIR = os.path.join(REPO_ROOT, "runs", "detect")
# Trained model weights (.pt / exported .onnx) live next to this script.
MODELS_DIR = os.path.join(SCRIPT_DIR, "models")
|
|
|
|
|
|
def raw_dir(boss):
    """Directory of raw (unsplit) images for *boss*."""
    parts = (REPO_ROOT, "training-data", boss, "raw")
    return os.path.join(*parts)
|
|
|
|
def dataset_dir(boss):
    """Directory of the built train/valid dataset for *boss*."""
    parts = (REPO_ROOT, "training-data", boss, "dataset")
    return os.path.join(*parts)
|
|
|
|
def boss_classes(boss):
    """Return the class-name list for *boss* (one class: the boss itself)."""
    names = [boss]
    return names
|
|
|
|
|
|
# ── build ────────────────────────────────────────────────────────
|
|
def cmd_build(args):
    """Scan raw/, split labeled+empty images into dataset train/valid."""
    import glob as g

    boss = args.boss
    src = raw_dir(boss)
    dst = dataset_dir(boss)
    names = boss_classes(boss)

    all_imgs = sorted(g.glob(os.path.join(src, "*.jpg")))

    # Bucket images: annotated (has boxes), negative (empty .txt), no .txt at all.
    positives, negatives = [], []
    missing = 0
    for img_path in all_imgs:
        label_path = os.path.splitext(img_path)[0] + ".txt"
        if not os.path.exists(label_path):
            missing += 1
            continue
        with open(label_path) as fh:
            has_boxes = bool(fh.read().strip())
        (positives if has_boxes else negatives).append(img_path)

    print(f"Raw: {len(all_imgs)} images -- {len(positives)} labeled, "
          f"{len(negatives)} empty (negative), {missing} unlabeled (skipped)")

    if not positives and not negatives:
        print("Nothing to build.")
        return

    # Deterministic shuffle so the same seed reproduces the same split.
    rng = random.Random(args.seed)
    rng.shuffle(positives)
    rng.shuffle(negatives)

    def take(lst, ratio):
        # Keep at least one training sample whenever the list is non-empty.
        cut = max(1, round(len(lst) * ratio)) if lst else 0
        return lst[:cut], lst[cut:]

    tr_pos, va_pos = take(positives, args.ratio)
    tr_neg, va_neg = take(negatives, args.ratio)

    train_files = tr_pos + tr_neg
    valid_files = va_pos + va_neg

    # Wipe and recreate the dataset layout.
    for sub in ("train/images", "train/labels", "valid/images", "valid/labels"):
        target = os.path.join(dst, sub)
        if os.path.exists(target):
            shutil.rmtree(target)
        os.makedirs(target)

    def place(file_list, split_name):
        # Copy each image plus its sidecar .txt label into the given split.
        for img_path in file_list:
            base = os.path.basename(img_path)
            stem = os.path.splitext(base)[0]
            label_path = os.path.splitext(img_path)[0] + ".txt"
            shutil.copy2(img_path, os.path.join(dst, split_name, "images", base))
            shutil.copy2(label_path, os.path.join(dst, split_name, "labels", stem + ".txt"))

    place(train_files, "train")
    place(valid_files, "valid")

    # Write data.yaml pointing at the freshly built split.
    yaml_path = os.path.join(dst, "data.yaml")
    with open(yaml_path, "w") as fh:
        fh.write(f"train: {os.path.join(dst, 'train', 'images')}\n")
        fh.write(f"val: {os.path.join(dst, 'valid', 'images')}\n\n")
        fh.write(f"nc: {len(names)}\n")
        fh.write(f"names: {names}\n")

    # Delete stale label caches so the trainer re-scans the new labels.
    for root, _dirs, files in os.walk(dst):
        for fn in files:
            if fn == "labels.cache":
                os.remove(os.path.join(root, fn))

    print(f"\nTrain: {len(train_files)} ({len(tr_pos)} labeled + {len(tr_neg)} empty)")
    print(f"\nValid: {len(valid_files)} ({len(va_pos)} labeled + {len(va_neg)} empty)".replace("\n", "", 1) if False else f"Valid: {len(valid_files)} ({len(va_pos)} labeled + {len(va_neg)} empty)")
    print(f"data.yaml: {yaml_path}")
|
|
|
|
|
|
# ── runs ─────────────────────────────────────────────────────────
|
|
def _parse_simple_yaml(path):
|
|
"""Parse flat key: value YAML without requiring PyYAML."""
|
|
result = {}
|
|
if not os.path.exists(path):
|
|
return result
|
|
with open(path) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
if ": " in line:
|
|
key, val = line.split(": ", 1)
|
|
# Try to cast to int/float/bool/null
|
|
val = val.strip()
|
|
if val == "null" or val == "~":
|
|
val = None
|
|
elif val == "true":
|
|
val = True
|
|
elif val == "false":
|
|
val = False
|
|
else:
|
|
try:
|
|
val = int(val)
|
|
except ValueError:
|
|
try:
|
|
val = float(val)
|
|
except ValueError:
|
|
pass
|
|
result[key.strip()] = val
|
|
return result
|
|
|
|
|
|
def cmd_runs(args):
    """List training runs with metrics for the given boss."""
    boss = args.boss
    prefix = f"{boss}-v"

    if not os.path.isdir(RUNS_DIR):
        print(f"No runs directory: {RUNS_DIR}")
        return

    run_dirs = sorted(
        (d for d in os.listdir(RUNS_DIR)
         if d.startswith(prefix) and os.path.isdir(os.path.join(RUNS_DIR, d))),
        key=_run_sort_key,
    )

    if not run_dirs:
        print(f"No {prefix}* runs found.")
        return

    table = []
    for name in run_dirs:
        run_path = os.path.join(RUNS_DIR, name)

        # Training configuration (model / epochs / imgsz) from args.yaml.
        model = epochs_cfg = imgsz = "?"
        cfg_path = os.path.join(run_path, "args.yaml")
        if os.path.exists(cfg_path):
            cfg = _parse_simple_yaml(cfg_path)
            model = os.path.splitext(os.path.basename(str(cfg.get("model", "?"))))[0]
            epochs_cfg = str(cfg.get("epochs", "?"))
            imgsz = str(cfg.get("imgsz", "?"))

        # Best-epoch metrics from results.csv, if training produced one.
        mAP50 = mAP50_95 = prec = rec = "-"
        actual_epochs = "?"
        status = "unknown"
        results_path = os.path.join(run_path, "results.csv")
        if os.path.exists(results_path):
            best_row = None
            best_map = -1
            last_epoch = 0
            with open(results_path) as fh:
                for raw in csv.DictReader(fh):
                    row = {k.strip(): v.strip() for k, v in raw.items()}
                    last_epoch = max(last_epoch, int(row.get("epoch", 0)))
                    score = float(row.get("metrics/mAP50(B)", 0))
                    if score > best_map:
                        best_map = score
                        best_row = row
                if best_row:
                    mAP50 = f"{float(best_row.get('metrics/mAP50(B)', 0)):.3f}"
                    mAP50_95 = f"{float(best_row.get('metrics/mAP50-95(B)', 0)):.3f}"
                    prec = f"{float(best_row.get('metrics/precision(B)', 0)):.3f}"
                    rec = f"{float(best_row.get('metrics/recall(B)', 0)):.3f}"

            actual_epochs = str(last_epoch)
            # Fewer epochs than configured means early stopping kicked in.
            try:
                status = "early-stop" if int(epochs_cfg) > last_epoch + 1 else "done"
            except ValueError:
                status = "?"

        table.append((name, model, f"{actual_epochs}/{epochs_cfg}", imgsz,
                      mAP50, mAP50_95, prec, rec, status))

    # Column-aligned table: widths sized to the widest cell per column.
    headers = ("Run", "Model", "Epochs", "ImgSz", "mAP50", "mAP50-95", "P", "R", "Status")
    widths = [max(len(h), max(len(r[i]) for r in table)) for i, h in enumerate(headers)]

    print(" ".join(h.ljust(w) for h, w in zip(headers, widths)))
    print(" ".join("-" * w for w in widths))
    for entry in table:
        print(" ".join(cell.ljust(w) for cell, w in zip(entry, widths)))
|
|
|
|
|
|
def _run_sort_key(name):
|
|
m = re.search(r"(\d+)", name)
|
|
return int(m.group(1)) if m else 0
|
|
|
|
|
|
# ── train ────────────────────────────────────────────────────────
|
|
def cmd_train(args):
    """Train a YOLO model, auto-incrementing run name per boss."""
    boss = args.boss

    # No explicit name: pick the next free {boss}-vN by scanning RUNS_DIR.
    if args.name is None:
        prefix = f"{boss}-v"
        highest_seen = 0
        if os.path.isdir(RUNS_DIR):
            for entry in os.listdir(RUNS_DIR):
                hit = re.match(re.escape(prefix) + r"(\d+)", entry)
                if hit:
                    highest_seen = max(highest_seen, int(hit.group(1)))
        args.name = f"{prefix}{highest_seen + 1}"
        print(f"Auto-assigned run name: {args.name}")

    # Default the dataset config to the boss's built dataset.
    if args.data is None:
        args.data = os.path.join(dataset_dir(boss), "data.yaml")

    if not os.path.exists(args.data):
        print(f"data.yaml not found: {args.data}")
        print(f"Run 'python manage.py build {boss}' first.")
        return

    # Pass boss name so train.py can name the output model
    args.boss = boss

    from train import run_training
    run_training(args)
|
|
|
|
|
|
# ── annotate ─────────────────────────────────────────────────────
|
|
def cmd_annotate(args):
    """Launch annotation GUI for the given boss."""
    boss = args.boss
    # Default to the boss's raw/ directory unless one was supplied.
    target_dir = args.dir or raw_dir(boss)
    from annotate import run_annotator
    run_annotator(target_dir, boss_classes(boss))
|
|
|
|
|
|
# ── prelabel ─────────────────────────────────────────────────────
|
|
def cmd_prelabel(args):
    """Auto-label unlabeled images."""
    boss = args.boss
    # Fill per-boss defaults for anything the user left unset.
    if args.img_dir is None:
        args.img_dir = raw_dir(boss)
    if args.model == _PRELABEL_MODEL_DEFAULT:
        args.model = f"boss-{boss}"

    from prelabel import run_prelabel
    run_prelabel(args)
|
|
|
|
# Sentinel default for --model; cmd_prelabel replaces it with "boss-{boss}".
_PRELABEL_MODEL_DEFAULT = "__auto__"
|
|
|
|
|
|
# ── export ──────────────────────────────────────────────────────
|
|
def cmd_export(args):
    """Export .pt model to .onnx format for ONNX Runtime inference."""
    model_name = f"boss-{args.boss}"
    pt_path = os.path.join(MODELS_DIR, f"{model_name}.pt")

    if not os.path.exists(pt_path):
        print(f"Model not found: {pt_path}")
        return

    from ultralytics import YOLO
    model = YOLO(pt_path)

    print(f"Exporting {pt_path} -> ONNX (imgsz={args.imgsz})...")
    model.export(format="onnx", imgsz=args.imgsz, opset=17, simplify=True, dynamic=False)

    # ultralytics writes the .onnx next to the .pt file
    onnx_src = os.path.join(MODELS_DIR, f"{model_name}.onnx")
    if not os.path.exists(onnx_src):
        print(f"\nWarning: expected output not found at {onnx_src}")
        return
    size_mb = os.path.getsize(onnx_src) / (1024 * 1024)
    print(f"\nExported: {onnx_src} ({size_mb:.1f} MB)")
|
|
|
|
|
|
# ── CLI ──────────────────────────────────────────────────────────
|
|
def main():
    """Parse the command line and dispatch to the chosen subcommand."""
    parser = argparse.ArgumentParser(
        description="YOLO detection pipeline manager",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    sub = parser.add_subparsers(dest="command")

    # annotate
    sp = sub.add_parser("annotate", help="Launch annotation GUI")
    sp.add_argument("boss", help="Boss name (e.g. kulemak)")
    sp.add_argument("dir", nargs="?", default=None, help="Image directory (default: training-data/{boss}/raw)")

    # build
    sp = sub.add_parser("build", help="Build dataset from raw/")
    sp.add_argument("boss", help="Boss name (e.g. kulemak)")
    sp.add_argument("--ratio", type=float, default=0.85, help="Train ratio (default 0.85)")
    sp.add_argument("--seed", type=int, default=42, help="Random seed")

    # train
    sp = sub.add_parser("train", help="Train YOLO model")
    sp.add_argument("boss", help="Boss name (e.g. kulemak)")
    sp.add_argument("--data", default=None, help="Path to data.yaml")
    sp.add_argument("--model", default="yolo11n", help="YOLO model variant")
    sp.add_argument("--epochs", type=int, default=200, help="Training epochs")
    sp.add_argument("--imgsz", type=int, default=640, help="Image size")
    sp.add_argument("--batch", type=int, default=16, help="Batch size")
    sp.add_argument("--device", default="0", help="CUDA device")
    sp.add_argument("--name", default=None, help="Run name (auto-increments if omitted)")

    # runs
    sp = sub.add_parser("runs", help="List training runs with metrics")
    sp.add_argument("boss", help="Boss name (e.g. kulemak)")

    # prelabel
    sp = sub.add_parser("prelabel", help="Pre-label unlabeled images")
    sp.add_argument("boss", help="Boss name (e.g. kulemak)")
    sp.add_argument("img_dir", nargs="?", default=None, help="Image directory")
    sp.add_argument("--model", default=_PRELABEL_MODEL_DEFAULT, help="Model name in models/ (default: boss-{boss})")
    sp.add_argument("--conf", type=float, default=0.20, help="Confidence threshold")

    # export
    sp = sub.add_parser("export", help="Export .pt model to ONNX format")
    sp.add_argument("boss", help="Boss name (e.g. kulemak)")
    sp.add_argument("--imgsz", type=int, default=640, help="Image size for export")

    args = parser.parse_args()

    # No subcommand given: show help instead of crashing on dispatch.
    if args.command is None:
        parser.print_help()
        return

    dispatch = {
        "annotate": cmd_annotate,
        "build": cmd_build,
        "train": cmd_train,
        "runs": cmd_runs,
        "prelabel": cmd_prelabel,
        "export": cmd_export,
    }
    dispatch[args.command](args)
|
|
|
|
|
|
# Script entry point.
if __name__ == "__main__":
    main()
|