# Copyright (c) Facebook, Inc. and its affiliates.
import numpy as np
from typing import List
import pycocotools.mask as mask_util
from detectron2.structures import Instances
from detectron2.utils.visualizer import (
ColorMode,
Visualizer,
_create_text_labels,
_PanopticPrediction,
)
from .colormap import random_color, random_colors
class _DetectedInstance:
"""
Used to store data about detected objects in video frame,
in order to transfer color to objects in the future frames.
Attributes:
label (int):
bbox (tuple[float]):
mask_rle (dict):
color (tuple[float]): RGB colors in range (0, 1)
ttl (int): time-to-live for the instance. For example, if ttl=2,
the instance color can be transferred to objects in the next two frames.
"""
__slots__ = ["label", "bbox", "mask_rle", "color", "ttl"]
def __init__(self, label, bbox, mask_rle, color, ttl):
self.label = label
self.bbox = bbox
self.mask_rle = mask_rle
self.color = color
self.ttl = ttl
[docs]class VideoVisualizer:
[docs] def __init__(self, metadata, instance_mode=ColorMode.IMAGE):
"""
Args:
metadata (MetadataCatalog): image metadata.
"""
self.metadata = metadata
self._old_instances = []
assert instance_mode in [
ColorMode.IMAGE,
ColorMode.IMAGE_BW,
], "Other mode not supported yet."
self._instance_mode = instance_mode
self._max_num_instances = self.metadata.get("max_num_instances", 74)
self._assigned_colors = {}
self._color_pool = random_colors(self._max_num_instances, rgb=True, maximum=1)
self._color_idx_set = set(range(len(self._color_pool)))
[docs] def draw_instance_predictions(self, frame, predictions):
"""
Draw instance-level prediction results on an image.
Args:
frame (ndarray): an RGB image of shape (H, W, C), in the range [0, 255].
predictions (Instances): the output of an instance detection/segmentation
model. Following fields will be used to draw:
"pred_boxes", "pred_classes", "scores", "pred_masks" (or "pred_masks_rle").
Returns:
output (VisImage): image object with visualizations.
"""
frame_visualizer = Visualizer(frame, self.metadata)
num_instances = len(predictions)
if num_instances == 0:
return frame_visualizer.output
boxes = predictions.pred_boxes.tensor.numpy() if predictions.has("pred_boxes") else None
scores = predictions.scores if predictions.has("scores") else None
classes = predictions.pred_classes.numpy() if predictions.has("pred_classes") else None
keypoints = predictions.pred_keypoints if predictions.has("pred_keypoints") else None
colors = predictions.COLOR if predictions.has("COLOR") else [None] * len(predictions)
periods = predictions.ID_period if predictions.has("ID_period") else None
period_threshold = self.metadata.get("period_threshold", 0)
visibilities = (
[True] * len(predictions)
if periods is None
else [x > period_threshold for x in periods]
)
if predictions.has("pred_masks"):
masks = predictions.pred_masks
# mask IOU is not yet enabled
# masks_rles = mask_util.encode(np.asarray(masks.permute(1, 2, 0), order="F"))
# assert len(masks_rles) == num_instances
else:
masks = None
if not predictions.has("COLOR"):
if predictions.has("ID"):
colors = self._assign_colors_by_id(predictions)
else:
# ToDo: clean old assign color method and use a default tracker to assign id
detected = [
_DetectedInstance(classes[i], boxes[i], mask_rle=None, color=colors[i], ttl=8)
for i in range(num_instances)
]
colors = self._assign_colors(detected)
labels = _create_text_labels(classes, scores, self.metadata.get("thing_classes", None))
if self._instance_mode == ColorMode.IMAGE_BW:
# any() returns uint8 tensor
frame_visualizer.output.reset_image(
frame_visualizer._create_grayscale_image(
(masks.any(dim=0) > 0).numpy() if masks is not None else None
)
)
alpha = 0.3
else:
alpha = 0.5
labels = (
None
if labels is None
else [y[0] for y in filter(lambda x: x[1], zip(labels, visibilities))]
) # noqa
assigned_colors = (
None
if colors is None
else [y[0] for y in filter(lambda x: x[1], zip(colors, visibilities))]
) # noqa
frame_visualizer.overlay_instances(
boxes=None if masks is not None else boxes[visibilities], # boxes are a bit distracting
masks=None if masks is None else masks[visibilities],
labels=labels,
keypoints=None if keypoints is None else keypoints[visibilities],
assigned_colors=assigned_colors,
alpha=alpha,
)
return frame_visualizer.output
[docs] def draw_sem_seg(self, frame, sem_seg, area_threshold=None):
"""
Args:
sem_seg (ndarray or Tensor): semantic segmentation of shape (H, W),
each value is the integer label.
area_threshold (Optional[int]): only draw segmentations larger than the threshold
"""
# don't need to do anything special
frame_visualizer = Visualizer(frame, self.metadata)
frame_visualizer.draw_sem_seg(sem_seg, area_threshold=None)
return frame_visualizer.output
def draw_panoptic_seg_predictions(
self, frame, panoptic_seg, segments_info, area_threshold=None, alpha=0.5
):
frame_visualizer = Visualizer(frame, self.metadata)
pred = _PanopticPrediction(panoptic_seg, segments_info, self.metadata)
if self._instance_mode == ColorMode.IMAGE_BW:
frame_visualizer.output.reset_image(
frame_visualizer._create_grayscale_image(pred.non_empty_mask())
)
# draw mask for all semantic segments first i.e. "stuff"
for mask, sinfo in pred.semantic_masks():
category_idx = sinfo["category_id"]
try:
mask_color = [x / 255 for x in self.metadata.stuff_colors[category_idx]]
except AttributeError:
mask_color = None
frame_visualizer.draw_binary_mask(
mask,
color=mask_color,
text=self.metadata.stuff_classes[category_idx],
alpha=alpha,
area_threshold=area_threshold,
)
all_instances = list(pred.instance_masks())
if len(all_instances) == 0:
return frame_visualizer.output
# draw mask for all instances second
masks, sinfo = list(zip(*all_instances))
num_instances = len(masks)
masks_rles = mask_util.encode(
np.asarray(np.asarray(masks).transpose(1, 2, 0), dtype=np.uint8, order="F")
)
assert len(masks_rles) == num_instances
category_ids = [x["category_id"] for x in sinfo]
detected = [
_DetectedInstance(category_ids[i], bbox=None, mask_rle=masks_rles[i], color=None, ttl=8)
for i in range(num_instances)
]
colors = self._assign_colors(detected)
labels = [self.metadata.thing_classes[k] for k in category_ids]
frame_visualizer.overlay_instances(
boxes=None,
masks=masks,
labels=labels,
keypoints=None,
assigned_colors=colors,
alpha=alpha,
)
return frame_visualizer.output
def _assign_colors(self, instances):
"""
Naive tracking heuristics to assign same color to the same instance,
will update the internal state of tracked instances.
Returns:
list[tuple[float]]: list of colors.
"""
# Compute iou with either boxes or masks:
is_crowd = np.zeros((len(instances),), dtype=np.bool)
if instances[0].bbox is None:
assert instances[0].mask_rle is not None
# use mask iou only when box iou is None
# because box seems good enough
rles_old = [x.mask_rle for x in self._old_instances]
rles_new = [x.mask_rle for x in instances]
ious = mask_util.iou(rles_old, rles_new, is_crowd)
threshold = 0.5
else:
boxes_old = [x.bbox for x in self._old_instances]
boxes_new = [x.bbox for x in instances]
ious = mask_util.iou(boxes_old, boxes_new, is_crowd)
threshold = 0.6
if len(ious) == 0:
ious = np.zeros((len(self._old_instances), len(instances)), dtype="float32")
# Only allow matching instances of the same label:
for old_idx, old in enumerate(self._old_instances):
for new_idx, new in enumerate(instances):
if old.label != new.label:
ious[old_idx, new_idx] = 0
matched_new_per_old = np.asarray(ious).argmax(axis=1)
max_iou_per_old = np.asarray(ious).max(axis=1)
# Try to find match for each old instance:
extra_instances = []
for idx, inst in enumerate(self._old_instances):
if max_iou_per_old[idx] > threshold:
newidx = matched_new_per_old[idx]
if instances[newidx].color is None:
instances[newidx].color = inst.color
continue
# If an old instance does not match any new instances,
# keep it for the next frame in case it is just missed by the detector
inst.ttl -= 1
if inst.ttl > 0:
extra_instances.append(inst)
# Assign random color to newly-detected instances:
for inst in instances:
if inst.color is None:
inst.color = random_color(rgb=True, maximum=1)
self._old_instances = instances[:] + extra_instances
return [d.color for d in instances]
def _assign_colors_by_id(self, instances: Instances) -> List:
colors = []
untracked_ids = set(self._assigned_colors.keys())
for id in instances.ID:
if id in self._assigned_colors:
colors.append(self._color_pool[self._assigned_colors[id]])
untracked_ids.remove(id)
else:
assert (
len(self._color_idx_set) >= 1
), f"Number of id exceeded maximum, \
max = {self._max_num_instances}"
idx = self._color_idx_set.pop()
color = self._color_pool[idx]
self._assigned_colors[id] = idx
colors.append(color)
for id in untracked_ids:
self._color_idx_set.add(self._assigned_colors[id])
del self._assigned_colors[id]
return colors