Step 7: Iteration Loop#
Now you have a trained model and know where it fails. This step shows how to:
Run a Golden QA check to detect annotation drift
Select the next batch using a hybrid strategy
The hybrid strategy balances:
30% Coverage - Diversity sampling to avoid tunnel vision
70% Targeted - Samples similar to failures
This balance is critical. Only chasing failures creates a model that’s great at edge cases and terrible at normal cases.
[ ]:
!pip install -q scikit-learn
[ ]:
import fiftyone as fo
import fiftyone.brain as fob
from fiftyone import ViewField as F
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from collections import Counter
LABEL_FIELD_2D = "human_detections"
dataset = fo.load_dataset("annotation_tutorial")
# Get schema classes
if "annotation_schema_2d" in dataset.info:
SCHEMA_CLASSES = set(dataset.info["annotation_schema_2d"]["classes"])
else:
SCHEMA_CLASSES = {"Car", "Van", "Truck", "Pedestrian", "Person_sitting", "Cyclist", "Tram", "Misc"}
Golden QA Check#
Before selecting the next batch, verify annotation quality hasn’t drifted. The golden set is a small, carefully reviewed sample we check each iteration.
What to look for:
Label count distribution staying stable
No unexpected empty samples
Class distribution roughly matching earlier rounds
[ ]:
# Load golden QA set (left camera slice)
golden = dataset.load_saved_view("golden_qa").select_group_slices(["left"])
# For tutorial, copy ground_truth to human_detections if not present
for sample in golden:
if sample.ground_truth and not sample[LABEL_FIELD_2D]:
filtered_dets = [
fo.Detection(label=d.label, bounding_box=d.bounding_box)
for d in sample.ground_truth.detections
if d.label in SCHEMA_CLASSES
]
sample[LABEL_FIELD_2D] = fo.Detections(detections=filtered_dets)
sample.save()
print(f"Golden QA set (left camera): {len(golden)} samples")
[ ]:
# Golden QA Check: Compute baseline stats
golden_stats = {
"total_samples": len(golden),
"samples_with_labels": 0,
"total_detections": 0,
"class_counts": Counter()
}
for sample in golden:
if sample[LABEL_FIELD_2D] and len(sample[LABEL_FIELD_2D].detections) > 0:
golden_stats["samples_with_labels"] += 1
golden_stats["total_detections"] += len(sample[LABEL_FIELD_2D].detections)
for det in sample[LABEL_FIELD_2D].detections:
golden_stats["class_counts"][det.label] += 1
print("=" * 40)
print("GOLDEN QA CHECK")
print("=" * 40)
print(f"Samples with labels: {golden_stats['samples_with_labels']}/{golden_stats['total_samples']}")
print(f"Total detections: {golden_stats['total_detections']}")
print(f"Avg detections/sample: {golden_stats['total_detections']/max(1,golden_stats['samples_with_labels']):.1f}")
print(f"\nTop classes:")
for cls, count in golden_stats["class_counts"].most_common(5):
print(f" {cls}: {count}")
print("=" * 40)
print("\nIf these numbers change unexpectedly between iterations,")
print("investigate annotation consistency before continuing.")
[ ]:
# Store golden stats for comparison in future iterations
if "golden_qa_history" not in dataset.info:
dataset.info["golden_qa_history"] = []
dataset.info["golden_qa_history"].append({
"iteration": len(dataset.info["golden_qa_history"]),
"samples_with_labels": golden_stats["samples_with_labels"],
"total_detections": golden_stats["total_detections"],
"top_classes": dict(golden_stats["class_counts"].most_common(5))
})
dataset.save()
print(f"Saved golden QA stats (iteration {len(dataset.info['golden_qa_history'])-1})")
Prepare for Next Batch Selection#
[ ]:
# Get unlabeled groups from pool
pool = dataset.load_saved_view("pool")
pool_left = pool.select_group_slices(["left"])
# Find samples still unlabeled
remaining = pool_left.match(F("annotation_status") == "unlabeled")
remaining_groups = remaining.distinct("group.id")
print(f"Pool (left camera): {len(pool_left)} samples")
print(f"Remaining unlabeled: {len(remaining)} samples ({len(remaining_groups)} groups)")
[ ]:
# Get failure samples from evaluation
try:
failures = dataset.load_saved_view("eval_v0_failures")
print(f"Failure samples: {len(failures)}")
except Exception:
failures = dataset.limit(0) # Empty view
print("No failure view found. Run Step 6 first, or continue with coverage-only selection.")
Define Acquisition Budget#
Batch sizing guidance:
Size batches to your labeling capacity
For this tutorial, we’ll select ~20% of remaining groups
[ ]:
# Select batch size based on remaining pool
batch_size = max(10, int(0.20 * len(remaining_groups)))
# Split: 30% coverage (ZCore), 70% targeted
coverage_budget = int(0.30 * batch_size)
targeted_budget = batch_size - coverage_budget
print(f"Batch v1 budget: {batch_size} groups")
print(f" Coverage (diversity): {coverage_budget} (30%)")
print(f" Targeted (failures): {targeted_budget} (70%)")
Part 1: Coverage Selection (30%)#
Use ZCore scores computed in Step 3 to select diverse groups from remaining pool.
[ ]:
# Get samples with ZCore scores from remaining pool
remaining_with_scores = remaining.match(F("zcore") != None)
if len(remaining_with_scores) == 0:
print("No ZCore scores found in remaining pool. Using random coverage selection.")
# Random fallback
coverage_groups = list(remaining_groups)[:coverage_budget]
else:
# Build group -> score mapping
group_scores = {}
for sample in remaining_with_scores:
group_scores[sample.group.id] = sample.zcore
# Sort and select top groups
sorted_groups = sorted(group_scores.items(), key=lambda x: x[1], reverse=True)
coverage_groups = [gid for gid, _ in sorted_groups[:coverage_budget]]
print(f"Coverage selection: {len(coverage_groups)} groups")
Part 2: Targeted Selection (70%)#
Find groups similar to failures using embedding-based neighbor search.
[ ]:
def find_neighbor_groups(query_embs, query_group_ids, pool_embs, pool_group_ids, n_per_query=3):
"""Find nearest neighbor groups in embedding space."""
if len(query_embs) == 0 or len(pool_embs) == 0:
return []
sims = cosine_similarity(query_embs, pool_embs)
neighbor_groups = set()
for sim_row in sims:
top_idx = np.argsort(sim_row)[-n_per_query:]
for idx in top_idx:
neighbor_groups.add(pool_group_ids[idx])
return list(neighbor_groups)
[ ]:
# Get embeddings for remaining samples
remaining_samples = list(remaining)
remaining_embs = np.array([s.embeddings for s in remaining_samples if s.embeddings is not None])
remaining_group_ids = [s.group.id for s in remaining_samples if s.embeddings is not None]
if len(failures) > 0 and len(remaining_embs) > 0:
failure_embs = np.array([s.embeddings for s in failures if s.embeddings is not None])
failure_group_ids = [s.group.id for s in failures if s.embeddings is not None]
print(f"Finding neighbors of {len(failure_embs)} failure samples...")
# Find neighbor groups (excluding already-selected coverage groups)
targeted_groups = find_neighbor_groups(
failure_embs, failure_group_ids,
remaining_embs, remaining_group_ids,
n_per_query=5
)
targeted_groups = [gid for gid in targeted_groups if gid not in coverage_groups][:targeted_budget]
print(f"Targeted selection: {len(targeted_groups)} groups")
else:
print("No failures to target or no embeddings. Using coverage-only selection.")
# Fall back to more coverage
if len(remaining_with_scores) > coverage_budget:
extra_groups = [gid for gid, _ in sorted_groups[coverage_budget:coverage_budget + targeted_budget]]
targeted_groups = [gid for gid in extra_groups if gid not in coverage_groups]
else:
targeted_groups = []
print(f"Additional coverage selection: {len(targeted_groups)} groups")
Combine and Tag Batch v1#
[ ]:
# Combine selections
batch_v1_groups = list(set(coverage_groups + targeted_groups))
if len(batch_v1_groups) == 0:
print("No groups selected. Check that Steps 3 and 6 completed successfully.")
else:
# Select ALL samples in these groups (all slices)
batch_v1 = dataset.match(F("group.id").is_in(batch_v1_groups))
# Tag
batch_v1.tag_samples("batch:v1")
batch_v1.tag_samples("to_annotate")
batch_v1.set_values("annotation_status", ["selected"] * len(batch_v1))
# Track source for analysis
dataset.match(F("group.id").is_in(coverage_groups)).tag_samples("source:coverage")
dataset.match(F("group.id").is_in(targeted_groups)).tag_samples("source:targeted")
# Save view
dataset.save_view("batch_v1", dataset.match_tags("batch:v1"))
print(f"\nBatch v1: {len(batch_v1_groups)} groups")
print(f" Coverage: {len(coverage_groups)}")
print(f" Targeted: {len(targeted_groups)}")
print(f" Total samples (all slices): {len(batch_v1)}")
The Complete Loop#
You now have the full iteration recipe:
1. Run Golden QA check (detect drift)
2. Annotate the current batch:
- Step 4: 2D detections on left camera
- Step 5: 3D cuboids on point cloud
3. Train on all annotated data (Step 6)
4. Evaluate on val set, tag failures
5. Select next batch: 30% coverage + 70% targeted
6. Repeat until stopping criteria
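To make the "repeat until stopping criteria" step measurable, it helps to record a small history entry at the end of every iteration, mirroring the golden_qa_history pattern above. The iteration_history field and its keys below are assumptions introduced for illustration; the mAP value would come from your Step 6 evaluation.
[ ]:
# Illustrative per-iteration bookkeeping (the "iteration_history" field is an assumption,
# not something created earlier in this tutorial).
if "iteration_history" not in dataset.info:
    dataset.info["iteration_history"] = []

dataset.info["iteration_history"].append({
    "iteration": len(dataset.info["iteration_history"]),
    "labeled_groups": len(dataset.match_tags("annotated:v0").distinct("group.id")),
    "mAP": None,  # fill in from your Step 6 evaluation results
})
dataset.save()
print("Recorded iteration snapshot:", dataset.info["iteration_history"][-1])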
Stopping Criteria#
Stop when:
Gains per labeled sample flatten (diminishing returns)
Remaining failures are mostly label ambiguity
Val metrics hit your target threshold
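The "gains flatten" criterion can be made concrete by computing the mAP gain per newly labeled group between consecutive iterations. The sketch below assumes you have filled in the iteration_history entries from the previous cell (with mAP on a 0-1 scale); the stopping threshold is purely illustrative.
[ ]:
def marginal_gain(history):
    """mAP gain per newly labeled group between the last two completed iterations.

    `history` is a list of dicts with "labeled_groups" and "mAP" keys (see the
    iteration_history sketch above). Returns None if there is not enough data.
    """
    complete = [h for h in history if h.get("mAP") is not None]
    if len(complete) < 2:
        return None
    prev, curr = complete[-2], complete[-1]
    new_groups = curr["labeled_groups"] - prev["labeled_groups"]
    if new_groups <= 0:
        return None
    return (curr["mAP"] - prev["mAP"]) / new_groups

gain = marginal_gain(dataset.info.get("iteration_history", []))
if gain is None:
    print("Not enough completed iterations to assess diminishing returns yet.")
elif gain < 1e-4:  # illustrative threshold, assuming mAP on a 0-1 scale
    print(f"Gain per labeled group: {gain:.6f} -- returns are flattening; consider stopping.")
else:
    print(f"Gain per labeled group: {gain:.6f} -- another iteration still looks worthwhile.")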
The 30% Coverage Rule#
Don’t skip the coverage budget. Only chasing failures leads to:
Overfitting to edge cases
Distorted class priors
Models that fail on “normal” inputs
Coverage keeps you honest.
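A quick way to keep yourself honest is to verify the realized coverage/targeted split of the batch you just built, using the source:* tags applied above. This is only a sanity-check sketch; it counts tagged groups and enforces nothing.
[ ]:
# Sanity check: what fraction of batch v1 came from coverage vs. targeted selection?
coverage_selected = len(dataset.match_tags("source:coverage").distinct("group.id"))
targeted_selected = len(dataset.match_tags("source:targeted").distinct("group.id"))
total_selected = coverage_selected + targeted_selected

if total_selected > 0:
    print(f"Coverage share of batch v1: {100 * coverage_selected / total_selected:.0f}%")
    print(f"Targeted share of batch v1: {100 * targeted_selected / total_selected:.0f}%")
else:
    print("No batch selected yet -- run the selection cells above first.")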
[ ]:
# Progress summary
pool = dataset.load_saved_view("pool")
pool_groups = pool.distinct("group.id")
total_pool_groups = len(pool_groups)
annotated_groups = len(dataset.match_tags("annotated:v0").distinct("group.id"))
selected_v1_groups = len(dataset.match_tags("batch:v1").distinct("group.id"))
pool_left = pool.select_group_slices(["left"])
still_unlabeled = len(pool_left.match(F("annotation_status") == "unlabeled").distinct("group.id"))
print("=" * 40)
print("ANNOTATION PROGRESS (by group/scene)")
print("=" * 40)
print(f"Pool total: {total_pool_groups} groups")
print(f"Annotated (v0): {annotated_groups} groups ({100*annotated_groups/total_pool_groups:.0f}%)")
print(f"Selected (v1): {selected_v1_groups} groups ({100*selected_v1_groups/total_pool_groups:.0f}%)")
print(f"Still unlabeled: {still_unlabeled} groups ({100*still_unlabeled/total_pool_groups:.0f}%)")
print("=" * 40)
Summary#
You implemented the iteration loop:
Golden QA check to detect annotation drift
Hybrid acquisition: 30% coverage + 70% targeted
Tagged batch:v1, ready for annotation (all slices: left, right, pcd)
Why this works:
Coverage prevents overfitting to edge cases
Targeting fixes known failures
Golden QA catches annotation drift early
The combination improves faster than either strategy alone
Your turn: Repeat Steps 4-7 with batch_v1, then batch_v2, etc.
Congratulations!#
You’ve completed the Full Loop annotation tutorial. You now know how to:
Setup - Create group-level splits for multimodal data
Select - Use ZCore for diversity-based sample selection
Annotate 2D - Label detections on camera images
Annotate 3D - Label cuboids on point clouds
Train + Evaluate - Train a model and analyze failures
Iterate - Use hybrid acquisition to select the next batch
This workflow scales from small experiments to production annotation pipelines.