Source code for gluoncv.model_zoo.smot.smot_tracker

"""
MXNet implementation of tracktor in SMOT: Single-Shot Multi Object Tracking
https://arxiv.org/abs/2010.16031
"""
# pylint: disable=line-too-long,logging-format-interpolation,unused-argument,missing-function-docstring
from __future__ import absolute_import
import logging
import time
import numpy as np

import mxnet as mx
from gluoncv.utils.bbox import bbox_iou
from gluoncv.data import COCODetection
from .utils import timeit, Track
from .motion_estimation import FarneBeckFlowMotionEstimator
from .motion_estimation import DummyMotionEstimator


def nms_fallback(boxes, thresh):
    """
    Perform non-maximal suppression and return the kept indices.

    Parameters
    ----------
    boxes: [[x, y, xmax, ymax, score]]
    thresh: IOU threshold above which the lower-scoring box is suppressed

    Returns
    -------
    kept box indices
    """
    order = np.argsort(boxes[:, -1])[::-1]
    iou_mat = bbox_iou(boxes[:, :4], boxes[:, :4])
    keep = []
    while len(order) > 0:
        i = order[0]
        keep.append(i)
        IOU = iou_mat[i, order[1:]]
        remaining = np.where(IOU <= thresh)[0]
        order = order[remaining + 1]
    return keep
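# A minimal usage sketch of nms_fallback, assuming corner-format boxes with a
# trailing confidence score. The first two boxes overlap heavily (IOU ~0.68),
# so the lower-scoring one is suppressed at a threshold of 0.5:
#
#     boxes = np.array([[0., 0., 10., 10., 0.9],
#                       [1., 1., 11., 11., 0.8],
#                       [20., 20., 30., 30., 0.7]])
#     nms_fallback(boxes, 0.5)  # -> [0, 2]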
def gpu_iou(bbox_a_tensor, bbox_b_tensor):
    """
    Compute the pairwise IOU matrix of two sets of corner-format boxes on GPU.

    Parameters
    ----------
    bbox_a_tensor: (N, 4+) NDArray of boxes in (xmin, ymin, xmax, ymax) format
    bbox_b_tensor: (M, 4+) NDArray of boxes in (xmin, ymin, xmax, ymax) format

    Returns
    -------
    (N, M) NDArray of pairwise IOU values
    """
    if bbox_a_tensor.shape[1] < 4 or bbox_b_tensor.shape[1] < 4:
        raise IndexError("Bounding boxes axis 1 must have at least length 4")
    tl = mx.nd.maximum(bbox_a_tensor.reshape((bbox_a_tensor.shape[0], 1, -1))[:, :, :2],
                       bbox_b_tensor[:, :2])
    br = mx.nd.minimum(bbox_a_tensor.reshape((bbox_a_tensor.shape[0], 1, -1))[:, :, 2:4],
                       bbox_b_tensor[:, 2:4])
    # a pair is valid only if the top-left corner lies strictly above-left of
    # the bottom-right corner; invalid pairs get zero intersection area
    valid = mx.nd.prod(tl < br, axis=2)
    area_i = mx.nd.prod(br - tl, axis=2) * valid
    area_a = mx.nd.prod(bbox_a_tensor[:, 2:4] - bbox_a_tensor[:, :2], axis=1)
    area_b = mx.nd.prod(bbox_b_tensor[:, 2:4] - bbox_b_tensor[:, :2], axis=1)
    return area_i / (area_a.expand_dims(1) + area_b.expand_dims(0) - area_i)
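# A minimal usage sketch of gpu_iou, assuming a CUDA device is available.
# The first pair overlaps with area 25 over a union of 175; the second pair
# is disjoint:
#
#     ctx = mx.gpu(0)
#     a = mx.nd.array([[0, 0, 10, 10]], ctx=ctx)
#     b = mx.nd.array([[5, 5, 15, 15], [20, 20, 30, 30]], ctx=ctx)
#     gpu_iou(a, b)  # -> shape (1, 2), roughly [[0.143, 0.0]]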
class SMOTTracker:
    """
    Implementation of the SMOT tracker.

    The steps to use the tracker are:
    0. Set anchors from the SSD
    1. First call tracker.predict(new_frame)
    2. Then get the tracking anchor information
    3. Run the tracktor with the tracking anchor information
    4. Run tracker.update(new_detection, track_info)
    """
    # pylint: disable=dangerous-default-value,unnecessary-comprehension
    def __init__(self,
                 motion_model='no',
                 anchor_array=None,
                 use_motion=True,
                 tracking_classes=[],
                 match_top_k=10,
                 track_keep_alive_thresh=0.1,
                 new_track_iou_thresh=0.3,
                 track_nms_thresh=0.5,
                 gpu_id=0,
                 anchor_assignment_method='iou',
                 joint_linking=False,
                 tracktor=None,
                 ):
        """
        Parameters
        ----------
        motion_model
        anchor_array
        use_motion
        tracking_classes
        match_top_k
        track_keep_alive_thresh
        new_track_iou_thresh
        track_nms_thresh
        gpu_id
        anchor_assignment_method
        joint_linking
        tracktor
        """
        self.use_motion = use_motion
        self.tracks = []
        self.all_track_id = 0
        self.pending_index = []
        self.conf_score_thresh = 0.1
        self.anchor_array = anchor_array
        self.next_frame_pred_index = []
        self.next_frame_pred_weights = []
        self.next_frame_pred_bbox = []
        self.waiting_update_tracks = []
        self.next_frame_ref_bbox = []
        self.last_frame = None
        self.k = match_top_k
        self.keep_alive = track_keep_alive_thresh
        self.new_track_iou_thresh = new_track_iou_thresh
        self.track_nms_thresh = track_nms_thresh
        self.frame_cache = None
        self.mx_ctx = mx.gpu(gpu_id)
        self.anchor_assignment_method = anchor_assignment_method
        self.joint_linking = joint_linking

        if len(tracking_classes) == 0:
            raise ValueError("Unknown tracking classes, please specify which objects you want to track")

        self.coco_class_set = set(COCODetection.CLASSES)
        # maps COCO class index -> class name
        self.coco_class2index_dict = {i: name for i, name in enumerate(COCODetection.CLASSES)}
        self.class_set = set(tracking_classes)
        for class_name in tracking_classes:
            if class_name not in self.coco_class_set:
                raise ValueError("Your custom class {} is not supported, only COCO classes are currently supported, "
                                 "the classes are {}".format(class_name, COCODetection.CLASSES))

        if motion_model == 'farneback':
            self.motion_estimator = FarneBeckFlowMotionEstimator()
        elif motion_model == 'no':
            self.motion_estimator = DummyMotionEstimator()
        else:
            raise ValueError("Unknown motion model: {}".format(motion_model))
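    # A minimal construction sketch: track only people with Farneback optical
    # flow motion estimation; `tracking_classes` must be non-empty COCO class
    # names, otherwise __init__ raises ValueError:
    #
    #     tracker = SMOTTracker(motion_model='farneback',
    #                           tracking_classes=['person'])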
    def process_frame_sequence(self, frame_iterator, tracktor):
        """
        Parameters
        ----------
        frame_iterator: at each step it emits a tuple of (frame_id, frame_data)
        tracktor: the detection model used for joint detection and tracking

        Returns
        -------
        results_iter: a response iterator with one tuple (frame_id, frame_rst) per frame
        """
        for frame_id, frame in frame_iterator:
            logging.info('Processing Frame ID: {}'.format(frame_id))
            t_iter_start = time.time()

            # STEP 0: Prepare the tracktor with the new frame data
            motion_pred_data = tracktor.prepare_for_frame(frame)

            # STEP 1: Predict the new locations of the tracked bounding boxes in the tracker
            tracking_anchor_indices, tracking_anchor_weights, tracking_classes = self.motion_predict(frame, motion_pred_data)

            # STEP 2: Run the tracktor
            detection_bboxes, detection_anchor_indices, tracking_bboxes, extra_info \
                = tracktor.detect_and_track(frame,
                                            tracking_anchor_indices,
                                            tracking_anchor_weights,
                                            tracking_classes)

            if self.anchor_array is None:
                self.set_anchor_array(tracktor.anchors())

            # STEP 3: Update the tracker with detector responses
            self.update(detection_bboxes, tracking_bboxes,
                        detection_anchor_indices, tracking_anchor_indices, tracking_anchor_weights,
                        tracking_classes, extra_info)

            # yield the results of this frame
            results = self._produce_frame_result()
            elapsed = time.time() - t_iter_start
            logging.info("Total Tracking Runtime: {:2.4f} msec, {:.01f} FPS".format(
                elapsed * 1000, 1 / elapsed))
            yield frame_id, results
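    # A usage sketch of the per-frame loop above, assuming `my_tracktor` is
    # any detector exposing prepare_for_frame(), detect_and_track() and
    # anchors() as called in process_frame_sequence, and `frames` yields
    # (frame_id, BGR ndarray) tuples:
    #
    #     for frame_id, result in tracker.process_frame_sequence(frames, my_tracktor):
    #         logging.info("frame %s: %d tracked objects", frame_id, len(result))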
    @timeit
    def set_anchor_array(self, anchor_array):
        self.anchor_array = anchor_array
        self.anchor_tensor = mx.nd.array(self.anchor_array, ctx=self.mx_ctx, dtype=np.float32)

    @timeit
    def motion_predict(self, new_frame: np.ndarray, motion_pred_data):
        """
        Perform motion prediction and assign the predicted track locations
        to corresponding anchors for re-detection.

        It will update the following properties:
            next_frame_pred_index: indices of anchors that bear tracking information.
                Each track will be assigned to several anchors. They will vote in the re-detection process.
            next_frame_pred_weights: weights in the re-detection voting
            next_frame_pred_bbox: motion-predicted locations of the tracked objects
            waiting_update_tracks: active tracks for re-detection
            next_frame_ref_bbox: original locations of the tracked objects

        Parameters
        ----------
        new_frame: BGR frame of this timestep
        motion_pred_data: extra data needed by the motion predictor

        Returns
        -------
        next_frame_pred_index
        next_frame_pred_weights
        next_frame_pred_class
        """
        # STEP 1: Find all active tracks
        active_track_boxes = []
        active_track_indices = []
        active_track_anchor_indices = []
        active_track_anchor_weights = []
        active_track_classes = []
        t_active = time.time()
        for track_idx, track in enumerate(self.tracks):
            if track.is_active():
                active_track_boxes.append(track.mean)
                active_track_indices.append(track_idx)
                src_idx, src_weights = track.source
                active_track_anchor_indices.append(src_idx)
                active_track_anchor_weights.append(src_weights)
                active_track_classes.append([track.class_id])
                logging.debug("active track {} with age: {}".format(track.track_id, track.age))

        active_track_boxes = np.array(active_track_boxes)
        active_track_anchor_indices = np.array(active_track_anchor_indices)
        active_track_anchor_weights = np.array(active_track_anchor_weights)
        e_active = time.time() - t_active
        logging.info('find active runtime: {:.05f}'.format(e_active))

        if len(active_track_boxes) > 0:
            # The following steps only happen if we have something to track
            # STEP 2: Warp the boxes according to flow
            predicted_track_boxes = self._motion_prediction(new_frame, active_track_boxes,
                                                            active_track_anchor_indices,
                                                            active_track_anchor_weights,
                                                            motion_pred_data,
                                                            skip=not self.use_motion)

            # STEP 3: Assign the warped boxes to anchor compositions
            tracking_anchor_indices, tracking_anchor_weights, tracking_anchor_validity = self._assign_box_to_anchors(
                predicted_track_boxes, method=self.anchor_assignment_method)

            # remove tracks that become invalid after motion prediction
            invalid_track_numbers = np.nonzero(1 - tracking_anchor_validity)[0]
            logging.info("{}/{} tracks become invalid after motion prediction".format(
                len(invalid_track_numbers), len(active_track_boxes)))
            for i_invalid in invalid_track_numbers:
                self.tracks[active_track_indices[i_invalid]].mark_missed()

            # keep the valid tracks for re-detection
            valid_track_numbers = np.nonzero(tracking_anchor_validity)[0]
            self.next_frame_pred_index = tracking_anchor_indices[valid_track_numbers, ...]
            self.next_frame_pred_weights = tracking_anchor_weights[valid_track_numbers, ...]
            self.next_frame_pred_bbox = predicted_track_boxes[valid_track_numbers, ...]
            self.next_frame_pred_class = np.array(active_track_classes)[valid_track_numbers, ...]
            active_track_indices = np.array(active_track_indices)[valid_track_numbers, ...]
            active_track_boxes = active_track_boxes[valid_track_numbers, ...]
        else:
            # skip flow computation if there is no active track,
            # just save the frame in the cache
            predicted_track_boxes = self._motion_prediction(new_frame, active_track_boxes,
                                                            active_track_anchor_indices,
                                                            active_track_anchor_weights,
                                                            motion_pred_data,
                                                            skip=True)
            assert len(predicted_track_boxes) == 0
            self.next_frame_pred_index = np.array([])
            self.next_frame_pred_weights = np.array([])
            self.next_frame_pred_bbox = np.array([])
            self.next_frame_pred_class = np.array([])

        self.waiting_update_tracks = active_track_indices
        self.next_frame_ref_bbox = active_track_boxes

        return self.next_frame_pred_index, self.next_frame_pred_weights, self.next_frame_pred_class

    @timeit
    def update(self,
               new_detections: np.ndarray,
               tracking_predictions: np.ndarray,
               detection_anchor_indices: np.ndarray,
               tracking_anchor_indices: np.ndarray,
               tracking_anchor_weights: np.ndarray,
               tracking_classes: np.ndarray,
               extra_info: dict = None):
        """
        Update the tracks according to tracking and detection predictions.

        Parameters
        ----------
        new_detections: Nx5 ndarray
        tracking_predictions: Mx5 ndarray
        detection_anchor_indices: anchor indices of the new detections
        tracking_anchor_indices: anchor indices of the tracking predictions
        tracking_anchor_weights: anchor voting weights of the tracking predictions
        tracking_classes: class ids of the tracking predictions
        extra_info: a dictionary with extra information
        """
        # pylint: disable=too-many-nested-blocks
        t_post_processing = time.time()
        logging.info("tracking predictions' shape is {}".format(tracking_predictions.shape))
        logging.debug(tracking_predictions)
        logging.debug(self.waiting_update_tracks)

        detection_landmarks = extra_info['detection_landmarks'] if 'detection_landmarks' in extra_info else None
        tracking_landmarks = extra_info['tracking_landmarks'] if 'tracking_landmarks' in extra_info else None

        for t in self.tracks:
            t.predict()

        # STEP 1: track level NMS
        still_active_track_pred_indices = []
        still_active_track_indices = []
        if len(tracking_predictions) > 0:
            # class-wise NMS
            keep_set = set()
            for c in set(tracking_classes.ravel().tolist()):
                class_pick = np.nonzero(tracking_classes == c)[0]
                keep_tracking_pred_nms_indices = nms_fallback(tracking_predictions[class_pick, ...], self.track_nms_thresh)
                for i_keep in keep_tracking_pred_nms_indices:
                    keep_set.add(class_pick[i_keep])

            still_active_track_pred_indices = []
            for i_pred, i_track in enumerate(self.waiting_update_tracks):
                if i_pred in keep_set:
                    self.tracks[i_track].update(tracking_predictions[i_pred, :],
                                                (tracking_anchor_indices[i_pred, :], tracking_anchor_weights[i_pred, :]),
                                                tracking_landmarks[i_pred, :] if tracking_landmarks is not None else None)
                else:
                    # tracks suppressed in the track NMS process will be marked as missing
                    self.tracks[i_track].mark_missed()

                if self.tracks[i_track].is_active():
                    still_active_track_pred_indices.append(i_pred)
                    still_active_track_indices.append(i_track)

        # STEP 2: Remove new detections overlapping with tracks
        if len(still_active_track_pred_indices) > 0 and len(new_detections) > 0:
            active_tracking_predictions = tracking_predictions[still_active_track_pred_indices, :]
            det_track_max_iou = bbox_iou(new_detections[:, :4], active_tracking_predictions[:, :4])
            same_class = new_detections[:, -1:] == (tracking_classes[still_active_track_pred_indices, :].T)
            # suppress all new detections that have high IOU with active tracks of the same class
            affinity = (det_track_max_iou * same_class).max(axis=1)
            keep_detection_indices = np.nonzero(affinity <= self.new_track_iou_thresh)[0]
        else:
            # otherwise simply keep all detections
            keep_detection_indices = list(range(len(new_detections)))
            active_tracking_predictions = np.array([])

        # STEP 3: New track initialization
        if len(keep_detection_indices) > 0:
            active_new_detections = new_detections[keep_detection_indices, :]

            # (Optional) STEP 3.a: Perform joint linking of body and head
            if self.joint_linking:
                tracking_classes = np.array(tracking_classes)
                body2face_link, face2body_link = \
                    self._link_face_body(active_new_detections,
                                         extra_info['detection_keypoints'][keep_detection_indices],
                                         active_tracking_predictions,
                                         extra_info['tracking_keypoints'][still_active_track_pred_indices],
                                         tracking_classes[still_active_track_pred_indices])
            else:
                body2face_link, face2body_link = None, None

            new_tracks = []
            for idx, i_new_track in enumerate(keep_detection_indices):
                new_track = Track(new_detections[i_new_track, :4],
                                  self.all_track_id,
                                  (detection_anchor_indices[i_new_track, :], np.array([1])),
                                  keep_alive_thresh=self.keep_alive,
                                  class_id=new_detections[i_new_track, -1],
                                  attributes=detection_landmarks[i_new_track, :] if detection_landmarks is not None else None)

                if self.joint_linking:
                    if new_track.class_id == 0:
                        # new face track
                        if idx in face2body_link[0]:
                            logging.debug("%s %s 0", idx, i_new_track)
                            body_idx = face2body_link[0][idx]
                            if idx > body_idx:
                                new_track.link_to(new_tracks[body_idx])
                        elif idx in face2body_link[2]:
                            logging.debug("%s %s 1", idx, i_new_track)
                            body_idx = face2body_link[2][idx]
                            new_track.link_to(self.tracks[still_active_track_indices[body_idx]])

                    if new_track.class_id == 1:
                        # new body track
                        if idx in body2face_link[0]:
                            face_idx = body2face_link[0][idx]
                            if idx > face_idx:
                                new_track.link_to(new_tracks[face_idx])
                        elif idx in body2face_link[2]:
                            face_idx = body2face_link[2][idx]
                            new_track.link_to(self.tracks[still_active_track_indices[face_idx]])

                self.all_track_id += 1
                self.tracks.append(new_track)
                new_tracks.append(new_track)

        elapsed_post_processing = time.time() - t_post_processing
        logging.info("total tracklets to now is {}, post-processing time: {:.05f} sec".format(
            self.all_track_id, elapsed_post_processing))

    @property
    def active_tracks(self):
        for t in self.tracks:
            if t.is_active():
                yield t

    def _motion_prediction(self, new_frame, tracked_boxes, tracked_boxes_anchor_indices,
                           tracked_boxes_anchor_weights, motion_pred_data, skip=False):
        """
        Perform motion estimation of a set of bounding boxes.
        Use either optical flow or SOT algorithms to predict the locations
        of these bounding boxes in the new frame.

        Parameters
        ----------
        new_frame
        tracked_boxes
        tracked_boxes_anchor_indices
        tracked_boxes_anchor_weights
        motion_pred_data
        skip

        Returns
        -------
        the predicted bounding boxes in the new frame
        """
        if self.frame_cache is None:
            # this is the first frame
            self.frame_cache = self.motion_estimator.initialize(new_frame, motion_pred_data)
            predicted_bboxes = tracked_boxes
        else:
            # this is not the first frame
            predicted_bboxes, self.frame_cache = self.motion_estimator.predict_new_locations(
                self.frame_cache, tracked_boxes, new_frame, motion_pred_data,
                tracked_boxes_anchor_indices=tracked_boxes_anchor_indices,
                tracked_boxes_anchor_weights=tracked_boxes_anchor_weights,
                skip=skip)
        return predicted_bboxes

    def _assign_box_to_anchors(self, boxes: np.ndarray, method: str = 'avg', min_anchor_iou: float = 0.1):
        """
        The actual implementation of the anchor assignment step.
        GPU acceleration is used because the number of anchors is huge.

        Parameters
        ----------
        boxes: (N, 4) array of predicted track boxes, must contain at least one box
        method: anchor assignment method, one of 'max', 'avg' or 'iou'
        min_anchor_iou: minimal IOU with the best anchor for a track to stay valid

        Returns
        -------
        tracking_anchor_indices, tracking_anchor_weights, tracking_anchor_validity
        """
        t_start = time.time()
        t_iou = time.time()
        gpu_boxes = mx.nd.array(boxes, ctx=self.mx_ctx)
        anchor_track_iou = gpu_iou(self.anchor_tensor, gpu_boxes)
        elapsed_iou = time.time() - t_iou
        logging.info("iou computation runtime: {:.05f}".format(elapsed_iou))

        # get the top-k closest anchors instead of 1
        if method == 'max':
            tracking_anchor_ious, tracking_anchor_indices = mx.nd.topk(
                anchor_track_iou, axis=0, k=1, ret_typ='both', dtype='int32')
            tracking_anchor_ious = tracking_anchor_ious.T.asnumpy()
            tracking_anchor_indices = tracking_anchor_indices.T.asnumpy()
            tracking_anchor_weights = np.ones_like(tracking_anchor_indices)
        elif method == 'avg':
            tracking_anchor_ious, tracking_anchor_indices = mx.nd.topk(
                anchor_track_iou, axis=0, k=self.k, ret_typ='both', dtype='int32')
            tracking_anchor_ious = tracking_anchor_ious.T.asnumpy()
            tracking_anchor_indices = tracking_anchor_indices.T.asnumpy()
            tracking_anchor_weights = np.ones_like(tracking_anchor_indices) / self.k
        elif method == 'iou':
            t_sort = time.time()
            tracking_anchor_ious, tracking_anchor_indices = mx.nd.topk(
                anchor_track_iou, axis=0, k=self.k, ret_typ='both', dtype='int32')
            tracking_anchor_ious = tracking_anchor_ious.T.asnumpy()
            tracking_anchor_indices = tracking_anchor_indices.T.asnumpy()
            e_sort = time.time() - t_sort
            logging.info('sorting time: {:.05f}'.format(e_sort))
            tracking_anchor_weights = tracking_anchor_ious / tracking_anchor_ious.sum(axis=1)[..., None]
        else:
            raise ValueError("unknown anchor assignment method")

        max_track_anchor_ious = tracking_anchor_ious.max(axis=1)
        tracking_anchor_validity = max_track_anchor_ious >= min_anchor_iou
        elapsed_assign = time.time() - t_start
        logging.info("assignment runtime: {}".format(elapsed_assign))
        return tracking_anchor_indices, tracking_anchor_weights, tracking_anchor_validity

    @timeit
    def _produce_frame_result(self):
        tracked_objects = []
        for track in self.active_tracks:
            box = {
                'left': track.mean[0],
                'top': track.mean[1],
                'width': track.mean[2] - track.mean[0],
                'height': track.mean[3] - track.mean[1]
            }
            tid = track.display_id
            age = track.age
            classId = track.class_id
            obj = {'bbox': box,
                   'track_id': tid,
                   'age': age,
                   'class_id': classId,
                   'class_name': self.coco_class2index_dict[classId]}
            if track.attributes is not None:
                obj['landmarks'] = track.attributes
            if obj["class_name"] in self.class_set:
                tracked_objects.append(obj)
        return tracked_objects
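# Each per-frame result yielded by process_frame_sequence is a list of dicts
# shaped by _produce_frame_result above, e.g. (values here are illustrative):
#
#     {'bbox': {'left': 12.0, 'top': 40.5, 'width': 55.1, 'height': 120.3},
#      'track_id': 3, 'age': 17, 'class_id': 0, 'class_name': 'person'}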