Source code for gluoncv.data.transforms.presets.ssd
"""Transforms described in https://arxiv.org/abs/1512.02325."""
# pylint: disable=not-callable
from __future__ import absolute_import
import numpy as np
import mxnet as mx
from .. import bbox as tbbox
from .. import image as timage
from .. import experimental
from ....utils import try_import_dali
# `dali` holds whatever try_import_dali() returns; SSDDALIPipeline below reads its
# `Pipeline`, `ops` and `types` attributes.
dali = try_import_dali()
# Public names exported by this module.
__all__ = ['transform_test', 'load_test', 'SSDDefaultTrainTransform', 'SSDDefaultValTransform',
'SSDDALIPipeline']
def transform_test(imgs, short, max_size=1024, mean=(0.485, 0.456, 0.406),
                   std=(0.229, 0.224, 0.225)):
    """Transform image(s) to normalized tensors usable as network input.

    This function supports a single NDArray or an iterable of NDArrays
    (including one-shot iterables such as generators).

    Parameters
    ----------
    imgs : NDArray or iterable of NDArray
        Image(s) to be transformed.
    short : int
        Resize image short side to this `short` and keep aspect ratio.
    max_size : int, optional
        Maximum longer side length to fit image.
        This is to limit the input image shape. Aspect ratio is intact because we
        support arbitrary input size in our SSD implementation.
    mean : iterable of float
        Mean pixel values.
    std : iterable of float
        Standard deviations of pixel values.

    Returns
    -------
    (mxnet.NDArray, numpy.ndarray) or (list of mxnet.NDArray, list of numpy.ndarray)
        A (1, 3, H, W) mxnet NDArray as input to network, and a numpy ndarray as
        original un-normalized color image for display.
        If exactly one image is supplied, the two values are returned directly.
        Otherwise two lists are returned (tensors, originals); you can use
        `zip()` to pair them up.

    """
    if isinstance(imgs, mx.nd.NDArray):
        imgs = [imgs]
    else:
        # Materialize once: the input is walked twice below (validation, then
        # transformation), which would silently yield nothing for a generator.
        imgs = list(imgs)
    for im in imgs:
        assert isinstance(im, mx.nd.NDArray), "Expect NDArray, got {}".format(type(im))

    tensors = []
    origs = []
    for img in imgs:
        img = timage.resize_short_within(img, short, max_size)
        # Keep the resized, un-normalized image for display purposes.
        orig_img = img.asnumpy().astype('uint8')
        img = mx.nd.image.to_tensor(img)
        img = mx.nd.image.normalize(img, mean=mean, std=std)
        # Add the leading batch axis expected by the network.
        tensors.append(img.expand_dims(0))
        origs.append(orig_img)
    if len(tensors) == 1:
        return tensors[0], origs[0]
    return tensors, origs
def load_test(filenames, short, max_size=1024, mean=(0.485, 0.456, 0.406),
              std=(0.229, 0.224, 0.225)):
    """Read image file(s) from disk and convert them to normalized tensors.

    Accepts either one filename or an iterable of filenames; every image is
    read with ``mx.image.imread`` and then handed to :func:`transform_test`.

    Parameters
    ----------
    filenames : str or list of str
        Image filename(s) to be loaded.
    short : int
        Resize image short side to this `short` and keep aspect ratio.
    max_size : int, optional
        Maximum longer side length to fit image.
        This is to limit the input image shape. Aspect ratio is intact because we
        support arbitrary input size in our SSD implementation.
    mean : iterable of float
        Mean pixel values.
    std : iterable of float
        Standard deviations of pixel values.

    Returns
    -------
    (mxnet.NDArray, numpy.ndarray) or (list of mxnet.NDArray, list of numpy.ndarray)
        A (1, 3, H, W) mxnet NDArray as input to network, and a numpy ndarray as
        original un-normalized color image for display.
        If multiple image names are supplied, return two lists. You can use
        `zip()` to pair them up.

    """
    # A lone filename is wrapped so the loading step always sees a sequence.
    paths = [filenames] if isinstance(filenames, str) else filenames
    loaded = list(map(mx.image.imread, paths))
    return transform_test(loaded, short, max_size, mean, std)
class SSDDefaultTrainTransform(object):
    """Default SSD training transform with random image augmentations.

    Parameters
    ----------
    width : int
        Image width.
    height : int
        Image height.
    anchors : mxnet.nd.NDArray, optional
        Anchors generated from SSD networks, the shape must be ``(1, N, 4)``.
        Since anchors are shared in the entire batch so it is ``1`` for the first dimension.
        ``N`` is the number of anchors for each image.

        .. hint::

            If anchors is ``None``, the transformation will not generate training targets.
            Otherwise it will generate training targets to accelerate the training phase
            since we push some workload to CPU workers instead of GPUs.

    mean : array-like of size 3
        Mean pixel values to be subtracted from image tensor. Default is [0.485, 0.456, 0.406].
    std : array-like of size 3
        Standard deviation to be divided from image. Default is [0.229, 0.224, 0.225].
    iou_thresh : float
        IOU overlap threshold for maximum matching, default is 0.5.
    box_norm : array-like of size 4, default is (0.1, 0.1, 0.2, 0.2)
        Std value to be divided from encoded values.
    **kwargs
        Extra keyword arguments forwarded to ``SSDTargetGenerator`` when it is built.

    """

    def __init__(self, width, height, anchors=None, mean=(0.485, 0.456, 0.406),
                 std=(0.229, 0.224, 0.225), iou_thresh=0.5, box_norm=(0.1, 0.1, 0.2, 0.2),
                 **kwargs):
        # Output geometry.
        self._width = width
        self._height = height
        # Normalization statistics.
        self._mean = mean
        self._std = std
        # Target-generation settings; the generator itself is created on first use.
        self._anchors = anchors
        self._anchors_none = anchors is None
        self._iou_thresh = iou_thresh
        self._box_norm = box_norm
        self._kwargs = kwargs
        self._internal_target_generator = None

    @property
    def _target_generator(self):
        """Return the cached SSD target generator, building it on first access.

        Returns ``None`` when the transform was constructed without anchors.
        """
        cached = self._internal_target_generator
        if cached is not None:
            return cached
        if self._anchors_none:
            return None
        # Imported lazily, only once a generator is really needed.
        from ....model_zoo.ssd.target import SSDTargetGenerator
        # since we do not have predictions yet, so we ignore sampling here
        cached = SSDTargetGenerator(
            iou_thresh=self._iou_thresh, stds=self._box_norm, negative_mining_ratio=-1,
            **self._kwargs)
        self._internal_target_generator = cached
        return cached

    def __call__(self, src, label):
        """Apply transform to training image/label.

        Returns ``(image, boxes)`` when no anchors were given, otherwise
        ``(image, cls_targets, box_targets)`` for the single input sample.
        """
        # 1) random color jittering
        image = experimental.image.random_color_distort(src)

        # 2) random expansion, taken when the uniform draw exceeds 0.5
        if np.random.uniform(0, 1) > 0.5:
            fill_value = [channel_mean * 255 for channel_mean in self._mean]
            image, offsets = timage.random_expand(image, fill=fill_value)
            boxes = tbbox.translate(label, x_offset=offsets[0], y_offset=offsets[1])
        else:
            boxes = label

        # 3) random cropping constrained by the boxes
        img_h, img_w, _ = image.shape
        boxes, crop_window = experimental.bbox.random_crop_with_constraints(
            boxes, (img_w, img_h))
        left, top, crop_w, crop_h = crop_window
        image = mx.image.fixed_crop(image, left, top, crop_w, crop_h)

        # 4) resize to the target size with a randomly chosen interpolation
        img_h, img_w, _ = image.shape
        interp = np.random.randint(0, 5)
        image = timage.imresize(image, self._width, self._height, interp=interp)
        boxes = tbbox.resize(boxes, (img_w, img_h), (self._width, self._height))

        # 5) random horizontal flip, mirrored onto the boxes
        img_h, img_w, _ = image.shape
        image, flips = timage.random_flip(image, px=0.5)
        boxes = tbbox.flip(boxes, (img_w, img_h), flip_x=flips[0])

        # 6) to tensor + normalization
        image = mx.nd.image.to_tensor(image)
        image = mx.nd.image.normalize(image, mean=self._mean, std=self._std)

        if self._anchors is None:
            return image, boxes.astype(image.dtype)

        # generate training target so cpu workers can help reduce the workload on gpu
        # first four columns are used as box coordinates, the fifth as the class id
        gt_bboxes = mx.nd.array(boxes[np.newaxis, :, :4])
        gt_ids = mx.nd.array(boxes[np.newaxis, :, 4:5])
        cls_targets, box_targets, _ = self._target_generator(
            self._anchors, None, gt_bboxes, gt_ids)
        return image, cls_targets[0], box_targets[0]
class SSDDefaultValTransform(object):
    """Default SSD validation transform: resize, convert to tensor, normalize.

    Parameters
    ----------
    width : int
        Image width.
    height : int
        Image height.
    mean : array-like of size 3
        Mean pixel values to be subtracted from image tensor. Default is [0.485, 0.456, 0.406].
    std : array-like of size 3
        Standard deviation to be divided from image. Default is [0.229, 0.224, 0.225].

    """

    def __init__(self, width, height, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
        self._width, self._height = width, height
        self._mean, self._std = mean, std

    def __call__(self, src, label):
        """Apply transform to validation image/label.

        Returns the normalized image tensor and the boxes rescaled to the
        target size, cast to the tensor's dtype.
        """
        src_h, src_w, _ = src.shape
        target_size = (self._width, self._height)

        # Image and boxes are rescaled to the same fixed output size.
        resized = timage.imresize(src, self._width, self._height, interp=9)
        boxes = tbbox.resize(label, in_size=(src_w, src_h), out_size=target_size)

        tensor = mx.nd.image.to_tensor(resized)
        tensor = mx.nd.image.normalize(tensor, mean=self._mean, std=self._std)
        return tensor, boxes.astype(tensor.dtype)
class SSDDALIPipeline(dali.Pipeline):
    """DALI Pipeline with SSD training transform.

    Parameters
    ----------
    num_workers : int
        DALI pipeline arg - Number of CPU workers (forwarded as ``num_threads``).
    device_id : int
        DALI pipeline arg - Device id.
    batch_size : int
        Batch size.
    data_shape : int
        Height and width length. (height==width in SSD)
    anchors : list of float, or array-like exposing ``asnumpy()``
        A ``list`` is used as-is and must already be the flat, normalized [ltrb]
        anchor list of length ``N*4`` (4 floats for each of the N anchors).
        Any other value is converted with ``asnumpy()``, read as N rows of
        (center_x, center_y, width, height), turned into ltrb corners and
        divided by ``data_shape``.
    dataset_reader : callable
        Partial pipeline object, which __call__ function has to return
        (images, bboxes, labels) DALI EdgeReference tuple.
    seed : int
        Random seed. Default value is -1, which corresponds to no seed.

    """

    def __init__(self, num_workers, device_id, batch_size, data_shape,
                 anchors, dataset_reader, seed=-1):
        super(SSDDALIPipeline, self).__init__(
            batch_size=batch_size,
            device_id=device_id,
            num_threads=num_workers,
            seed=seed)

        self.dataset_reader = dataset_reader

        # Augmentation techniques
        self.crop = dali.ops.RandomBBoxCrop(
            device="cpu",
            aspect_ratio=[0.5, 2.0],
            thresholds=[0, 0.1, 0.3, 0.5, 0.7, 0.9],
            scaling=[0.3, 1.0],
            ltrb=True,
            allow_no_crop=True,
            num_attempts=1)
        self.slice = dali.ops.Slice(device="cpu")
        self.twist = dali.ops.ColorTwist(device="gpu")
        self.resize = dali.ops.Resize(
            device="cpu",
            resize_x=data_shape,
            resize_y=data_shape,
            min_filter=dali.types.DALIInterpType.INTERP_TRIANGULAR)

        # Output tensors are always produced as FLOAT here.
        output_dtype = dali.types.FLOAT
        self.normalize = dali.ops.CropMirrorNormalize(
            device="gpu",
            crop=(data_shape, data_shape),
            # mean/std are scaled by 255 relative to the [0, 1]-range constants
            mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
            std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
            mirror=0,
            output_dtype=output_dtype,
            output_layout=dali.types.NCHW,
            pad_output=False)

        # Random variables
        self.rng1 = dali.ops.Uniform(range=[0.5, 1.5])
        self.rng2 = dali.ops.Uniform(range=[0.875, 1.125])
        self.rng3 = dali.ops.Uniform(range=[-0.5, 0.5])

        self.flip = dali.ops.Flip(device="cpu")
        self.bbflip = dali.ops.BbFlip(device="cpu", ltrb=True)
        self.flip_coin = dali.ops.CoinFlip(probability=0.5)

        self.box_encoder = dali.ops.BoxEncoder(
            device="cpu",
            criteria=0.5,
            anchors=self._to_normalized_ltrb_list(anchors, data_shape),
            offset=True,
            stds=[0.1, 0.1, 0.2, 0.2],
            scale=data_shape)

    def _to_normalized_ltrb_list(self, anchors, size):
        """Prepare anchors into ltrb (normalized DALI anchors format list).

        Parameters
        ----------
        anchors : list or array-like exposing ``asnumpy()``
            A ``list`` is returned unchanged. Otherwise the value is read as
            rows of (center_x, center_y, width, height).
        size : int or float
            Value the corner coordinates are divided by to normalize them.

        Returns
        -------
        list of float
            Flat ``[l, t, r, b, l, t, r, b, ...]`` list.

        """
        if isinstance(anchors, list):
            return anchors
        # reshape(-1, 4) rather than squeeze(): squeeze() would also drop the
        # anchor axis for a single anchor and break the column indexing below.
        anchors_np = anchors.asnumpy().reshape(-1, 4)
        anchors_np_ltrb = anchors_np.copy()
        # center/size -> left, top, right, bottom
        anchors_np_ltrb[:, 0] = anchors_np[:, 0] - 0.5 * anchors_np[:, 2]
        anchors_np_ltrb[:, 1] = anchors_np[:, 1] - 0.5 * anchors_np[:, 3]
        anchors_np_ltrb[:, 2] = anchors_np[:, 0] + 0.5 * anchors_np[:, 2]
        anchors_np_ltrb[:, 3] = anchors_np[:, 1] + 0.5 * anchors_np[:, 3]
        anchors_np_ltrb /= size
        return anchors_np_ltrb.flatten().tolist()

    def define_graph(self):
        """Define the DALI graph.

        Returns
        -------
        tuple
            ``(images, bboxes, labels)`` graph outputs; the encoded boxes and
            labels are moved to GPU with ``.gpu()``.

        """
        # Per-sample random parameters for color twisting and flipping.
        saturation = self.rng1()
        contrast = self.rng1()
        brightness = self.rng2()
        hue = self.rng3()
        coin_rnd = self.flip_coin()

        images, bboxes, labels = self.dataset_reader()

        # Crop the boxes first, then cut the matching window out of the image.
        crop_begin, crop_size, bboxes, labels = self.crop(bboxes, labels)
        images = self.slice(images, crop_begin, crop_size)

        # The same coin decides the flip of both image and boxes.
        images = self.flip(images, horizontal=coin_rnd)
        bboxes = self.bbflip(bboxes, horizontal=coin_rnd)
        images = self.resize(images)
        # Color twist and normalization are configured with device="gpu".
        images = images.gpu()
        images = self.twist(
            images,
            saturation=saturation,
            contrast=contrast,
            brightness=brightness,
            hue=hue)
        images = self.normalize(images)
        bboxes, labels = self.box_encoder(bboxes, labels)

        return (images, bboxes.gpu(), labels.gpu())