"""Per-dataset index builders and the registry that dispatches to them.

Each private ``_build_*`` function translates a dataset root directory into
the folder layout that dataset uses on disk and hands the actual record
construction to one of the helpers imported from ``.indexers``.
"""

from __future__ import annotations

from pathlib import Path
from typing import Callable

from .indexers import (
    build_filename_matched_records,
    build_paired_folder_records,
    build_prefixed_paired_records,
    build_pre_split_records,
    build_stem_paired_records,
    build_xml_annotation_records,
)
from .records import SegSampleRecord


def _build_bus_uclm(root: Path) -> list[SegSampleRecord]:
    """Index BUS-UCLM from ``images/`` and ``masks/`` directly under *root*."""
    images = root / "images"
    masks = root / "masks"
    return build_paired_folder_records(
        dataset_name="BUS-UCLM",
        image_dir=images,
        mask_dir=masks,
    )


def _build_tg3k(root: Path) -> list[SegSampleRecord]:
    """Index TG3K from ``thyroid-image/`` and ``thyroid-mask/`` under *root*."""
    images = root / "thyroid-image"
    masks = root / "thyroid-mask"
    return build_paired_folder_records(
        dataset_name="TG3K",
        image_dir=images,
        mask_dir=masks,
    )


def _build_tn3k(root: Path) -> list[SegSampleRecord]:
    """Index TN3K, which ships separate train and test folders.

    The ``trainval-*`` folders are passed as the train directories and the
    ``test-*`` folders as the test directories.
    """
    return build_pre_split_records(
        dataset_name="TN3K",
        train_image_dir=root / "trainval-image",
        train_mask_dir=root / "trainval-mask",
        test_image_dir=root / "test-image",
        test_mask_dir=root / "test-mask",
    )


def _build_otu_2d(root: Path) -> list[SegSampleRecord]:
    """Index OTU_2d, taking images from ``images/`` and masks from ``annotations/``."""
    images = root / "images"
    masks = root / "annotations"
    return build_stem_paired_records(
        dataset_name="OTU_2d",
        image_dir=images,
        mask_dir=masks,
    )


def _build_busi(root: Path) -> list[SegSampleRecord]:
    """Index BUSI by visiting one sub-folder per class.

    The class folders live under ``Dataset_BUSI_with_GT/`` and are visited in
    the order benign, malignant, normal; each folder name doubles as the
    ``class_name`` passed to the indexer.
    """
    busi_root = root / "Dataset_BUSI_with_GT"
    return [
        record
        for label in ("benign", "malignant", "normal")
        for record in build_filename_matched_records(
            dataset_name="BUSI",
            folder=busi_root / label,
            class_name=label,
        )
    ]


def _build_bus_bra(root: Path) -> list[SegSampleRecord]:
    """Index BUS-BRA from the doubly nested ``BUSBRA/BUSBRA`` folder.

    The ``bus_`` and ``mask_`` prefixes are handed to the indexer as the
    image and mask prefixes to strip.
    """
    data_root = root / "BUSBRA" / "BUSBRA"
    return build_prefixed_paired_records(
        dataset_name="BUS-BRA",
        image_dir=data_root / "Images",
        mask_dir=data_root / "Masks",
        image_prefix_to_strip="bus_",
        mask_prefix_to_strip="mask_",
    )


def _build_bus_uc(root: Path) -> list[SegSampleRecord]:
    """Index BUS_UC from its ``All``, ``Benign`` and ``Malignant`` folders.

    Records from ``All`` come first and are tagged with ``split="all"`` and
    ``class_name="all"``. The two per-class folders follow and pass only a
    ``class_name``, leaving ``split`` at the indexer's default.
    """
    data_root = root / "BUS_UC" / "BUS_UC"

    records: list[SegSampleRecord] = list(
        build_paired_folder_records(
            dataset_name="BUS_UC",
            image_dir=data_root / "All" / "images",
            mask_dir=data_root / "All" / "masks",
            split="all",
            class_name="all",
        )
    )

    # Folder names are capitalised on disk; the class labels are lowercase.
    for folder_name, label in (("Benign", "benign"), ("Malignant", "malignant")):
        class_root = data_root / folder_name
        records.extend(
            build_paired_folder_records(
                dataset_name="BUS_UC",
                image_dir=class_root / "images",
                mask_dir=class_root / "masks",
                class_name=label,
            )
        )

    return records


def _build_ccaui(root: Path) -> list[SegSampleRecord]:
    """Index CCAUI from ``Common Carotid Artery Ultrasound Images/``."""
    data_root = root / "Common Carotid Artery Ultrasound Images"
    return build_paired_folder_records(
        dataset_name="CCAUI",
        image_dir=data_root / "US images",
        mask_dir=data_root / "Expert mask images",
    )


def _build_ddti(root: Path) -> list[SegSampleRecord]:
    """Index DDTI by passing *root* itself to the XML-annotation indexer."""
    return build_xml_annotation_records(dataset_name="DDTI", root=root)


# Maps each supported dataset name to its builder. Insertion order matters:
# it is the order in which names are listed in the "Unsupported dataset" error.
DATASET_REGISTRY: dict[str, Callable[[Path], list[SegSampleRecord]]] = {
    "BUS-UCLM": _build_bus_uclm,
    "TG3K": _build_tg3k,
    "TN3K": _build_tn3k,
    "OTU_2d": _build_otu_2d,
    "BUSI": _build_busi,
    "BUS-BRA": _build_bus_bra,
    "BUS_UC": _build_bus_uc,
    "CCAUI": _build_ccaui,
    "DDTI": _build_ddti,
}


def build_dataset_index(dataset_name: str, root: str | Path) -> list[SegSampleRecord]:
    """Build the sample index for one registered dataset.

    Args:
        dataset_name: Key into ``DATASET_REGISTRY``; matched exactly
            (case-sensitive).
        root: Dataset root directory, as a string or ``Path``.

    Returns:
        The records produced by the builder registered for *dataset_name*.

    Raises:
        ValueError: If *dataset_name* is not a registry key. This is checked
            before the filesystem is touched.
        FileNotFoundError: If *root* does not exist.
    """
    if dataset_name not in DATASET_REGISTRY:
        raise ValueError(
            f"Unsupported dataset '{dataset_name}'. Expected one of: {', '.join(DATASET_REGISTRY)}."
        )

    dataset_root = Path(root)
    if not dataset_root.exists():
        raise FileNotFoundError(f"Dataset root not found: {dataset_root}")

    build_records = DATASET_REGISTRY[dataset_name]
    return build_records(dataset_root)


__all__ = ["DATASET_REGISTRY", "build_dataset_index"]