# Source code for gluoncv.model_zoo.smot.motion_estimation
"""
Motion estimation module for MOT
"""
# pylint: disable=unused-argument,arguments-differ,missing-class-docstring
import time
import logging
from abc import ABC, abstractmethod
import numpy as np
class MotionEstimator(ABC):
    """
    Abstract interface of a motion estimator for multi-object tracking.

    A concrete estimator builds a per-frame cache via ``initialize()`` /
    ``predict_new_locations()`` and shifts tracked bounding boxes from the
    previous frame onto the new frame.
    """

    @abstractmethod
    def initialize(self, first_frame, first_frame_motion_pred_data):
        """
        Initialize the motion estimator by feeding the first frame.

        Parameters
        ----------
        first_frame: data of the first frame
        first_frame_motion_pred_data: additional data for motion prediction

        Returns
        -------
        cache_information: estimator-specific cache passed back on the next call
        """

    @abstractmethod
    def predict_new_locations(self,
                              prev_frame_cache: np.ndarray,
                              prev_bboxes: np.ndarray,
                              new_frame: np.ndarray,
                              new_frame_motion_pred_data,
                              tracked_boxes_anchor_indices=None,
                              tracked_boxes_anchor_weights=None,
                              skip: bool = False,
                              **kwargs):
        """
        The abstract method for predicting movement of bounding boxes given the two frames.

        Parameters
        ----------
        prev_frame_cache: cached image from motion estimation, numpy.ndarray
        prev_bboxes: Nx4 numpy.ndarray, bounding boxes in (left, top, right, bottom) format
        new_frame: BGR image, numpy.ndarray
        new_frame_motion_pred_data: additional data for motion prediction
        tracked_boxes_anchor_indices: anchor indices used to build the prev_bboxes
        tracked_boxes_anchor_weights: voting weights of anchors used to build prev_bboxes
        skip: whether to just skip motion estimation for this frame
        kwargs: other information

        Returns
        -------
        new_boxes: Nx4 numpy.ndarray
        cache_information: cache to be fed back on the next call
        """
        # BUGFIX: the original raised the `NotImplemented` singleton, which is
        # not an exception and fails with TypeError at raise time; raise the
        # proper NotImplementedError instead.
        raise NotImplementedError
class DummyMotionEstimator(MotionEstimator):
    """
    A no-op motion estimator: predicted locations are simply the previous
    boxes. Useful as a baseline or when motion estimation is disabled.
    """

    def initialize(self, first_frame, first_frame_motion_pred_data):
        """No state to build for the dummy estimator; the cache is empty.

        BUGFIX: without this override the abstract ``initialize`` was left
        unimplemented, so the class could never be instantiated.
        """
        return None

    def predict_new_locations(self,
                              prev_frame_cache: np.ndarray,
                              prev_bboxes: np.ndarray,
                              new_frame: np.ndarray,
                              skip: bool = False,
                              **kwargs):
        """Return the previous boxes unchanged, with an empty cache."""
        return prev_bboxes, None
class BaseFlowMotionEstimator(MotionEstimator):
    """
    The basic structure of a flow based motion estimator.

    To implement your own flow tracker, extend the following methods:
    compute_flow(): given preprocessed information, compute optical flow map
    prepare_frame(): for preprocessing, the results will be stored and provided
    to next frame's inference
    """

    def initialize(self, first_frame, first_frame_motion_pred_data):
        """Preprocess the first frame; the result is cached for the next call."""
        prepared_first_frame = self.prepare_frame(first_frame)
        return prepared_first_frame

    def predict_new_locations(self,
                              prev_frame_cache: np.ndarray,
                              prev_bboxes: np.ndarray,
                              new_frame: np.ndarray,
                              new_frame_motion_pred_data,
                              skip: bool = False,
                              **kwargs):
        """
        Predict new box locations by computing dense optical flow between the
        cached previous frame and the new frame, then warping the boxes.

        Parameters
        ----------
        prev_frame_cache: preprocessed previous frame, numpy.ndarray
        prev_bboxes: Nx4 numpy.ndarray, (left, top, right, bottom) boxes
        new_frame: BGR image, numpy.ndarray
        new_frame_motion_pred_data: additional data for motion prediction
        skip: when True, skip flow computation and return the boxes unchanged
        kwargs: other information

        Returns
        -------
        new_boxes: Nx4 numpy.ndarray
        cache_information: the preprocessed new frame, to be cached by the caller
        """
        t_start_prepare = time.time()
        prepared_new_frame = self.prepare_frame(new_frame)
        assert prev_frame_cache.shape == prepared_new_frame.shape
        e_prepare = time.time() - t_start_prepare
        logging.info("flow preparation runtime: {:.05f}".format(e_prepare))
        if not skip:
            t_start_flow = time.time()
            flow_map = self.compute_flow(prev_frame_cache, prepared_new_frame)
            assert flow_map.shape[-1] == 2, ValueError("flow map elements must be 2 element vectors!")
            e_flow = time.time() - t_start_flow
            logging.info("flow computation runtime: {:.05f}".format(e_flow))
            t_start_pred = time.time()
            predicted_bboxes = self._warp_bbox(new_frame.shape, prev_bboxes, flow_map)
            e_pred = time.time() - t_start_pred
            logging.info("bounding box prediction runtime: {:.05f}".format(e_pred))
            return predicted_bboxes, prepared_new_frame
        else:
            # Skipping: assume the objects did not move this frame.
            return prev_bboxes, prepared_new_frame

    @abstractmethod
    def compute_flow(self, prev_frame_cache, prepared_new_frame):
        """
        Compute dense optical flow.

        Parameters
        ----------
        prev_frame_cache: preprocessed previous frame
        prepared_new_frame: preprocessed new frame

        Returns
        -------
        flow_map: a NxMx2 map. each spatial local contains a 2-element vector
        specifying the delta in x and y directions. The unit of delta is pixel
        in this flow_map's coordinate space
        """
        raise NotImplementedError

    @abstractmethod
    def prepare_frame(self, frame):
        """Preprocess a raw frame into the representation compute_flow expects."""
        # BUGFIX: the original had `raise not NotImplementedError`, which
        # raises the boolean False and fails with TypeError.
        raise NotImplementedError

    @classmethod
    def _warp_bbox(cls, image_shape: tuple, bboxes: np.ndarray, flow_map: np.ndarray):
        """
        Use the computed denseflow map to estimate the movement of the object
        bounding box, then warp the boxes to their new locations.

        Parameters
        ----------
        image_shape: tuple, (height, width, ...) of the full-size image
        bboxes: numpy.ndarray shape: (n, 4), in image coordinates
        flow_map: numpy.ndarray shape: (h, w, 2); NOT modified (the original
            scaled it in place, mutating the caller's array)

        Returns
        -------
        predicted new locations of the boxes, numpy.ndarray shape: (n, 4)
        """
        # pylint: disable=logging-format-interpolation
        flow_h, flow_w, _ = flow_map.shape
        t0 = time.time()
        image_h = image_shape[0]
        image_w = image_shape[1]
        image2flow_ratio_h = image_h / flow_h
        image2flow_ratio_w = image_w / flow_w
        logging.info("ratio: {}x{}".format(image2flow_ratio_w, image2flow_ratio_h))
        # Remap the deltas to image coordinate space on a copy so the caller's
        # flow_map is left untouched (BUGFIX: the original mutated it in place).
        scaled_flow = flow_map.copy()
        scaled_flow[..., 0] *= image2flow_ratio_w
        scaled_flow[..., 1] *= image2flow_ratio_h
        # Project the boxes into flow-map coordinates.
        flow_map_bboxes = bboxes.copy()
        flow_map_bboxes[:, 0] /= image2flow_ratio_w
        flow_map_bboxes[:, 2] /= image2flow_ratio_w
        flow_map_bboxes[:, 1] /= image2flow_ratio_h
        flow_map_bboxes[:, 3] /= image2flow_ratio_h
        warped_bboxes = []
        for flow_bbox, raw_bbox in zip(flow_map_bboxes, bboxes):
            # BUGFIX: clamp to 0 so partially off-screen boxes do not produce
            # negative indices that wrap around to the far side of the map.
            x0 = max(int(flow_bbox[0]), 0)
            y0 = max(int(flow_bbox[1]), 0)
            # Guarantee at least a 1x1 patch for degenerate boxes.
            x1 = max(int(flow_bbox[2]), x0 + 1)
            y1 = max(int(flow_bbox[3]), y0 + 1)
            dx = np.mean(scaled_flow[y0:y1, x0:x1, 0])
            dy = np.mean(scaled_flow[y0:y1, x0:x1, 1])
            if np.isnan(dx) or np.isnan(dy):
                # Box lies entirely outside the flow map: keep the old location.
                warped_bboxes.append(raw_bbox)
            else:
                warped_bboxes.append([raw_bbox[0] + dx, raw_bbox[1] + dy,
                                      raw_bbox[2] + dx, raw_bbox[3] + dy])
        logging.info("bbox warping runtime cost {}".format(time.time() - t0))
        return np.array(warped_bboxes)
class FarneBeckFlowMotionEstimator(BaseFlowMotionEstimator):
    """
    Flow-based motion estimator using OpenCV's Farneback dense optical flow.

    Parameters
    ----------
    flow_scale: int
        Frames are downscaled so their longer side is at most ``flow_scale``
        pixels before flow is computed; smaller values trade accuracy for speed.
    """

    def __init__(self, flow_scale=256):
        self.flow_scale = flow_scale
        # Imported lazily so the rest of the module can be used without OpenCV.
        import cv2
        self._cv2 = cv2

    def prepare_frame(self, frame):
        """Downscale the BGR frame (longer side -> flow_scale) and convert to grayscale."""
        img_h, img_w, _ = frame.shape
        # min() keeps the aspect ratio: the larger dimension maps to flow_scale.
        ratio = min(self.flow_scale / img_w, self.flow_scale / img_h)
        flow_h, flow_w = int(img_h * ratio), int(img_w * ratio)
        cv2 = self._cv2
        resized_gray_frame = cv2.cvtColor(
            cv2.resize(frame, (flow_w, flow_h), interpolation=cv2.INTER_NEAREST),
            cv2.COLOR_BGR2GRAY)
        return resized_gray_frame

    def compute_flow(self, prev_frame_cache, prepared_new_frame):
        """Compute Farneback dense optical flow between two prepared gray frames."""
        cv2 = self._cv2
        flow_map = cv2.calcOpticalFlowFarneback(
            prev_frame_cache, prepared_new_frame, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        return flow_map