Source code for gluoncv.data.transforms.presets.ssd

"""Transforms described in https://arxiv.org/abs/1512.02325."""
# pylint: disable=not-callable
from __future__ import absolute_import
import numpy as np
import mxnet as mx
from .. import bbox as tbbox
from .. import image as timage
from .. import experimental

from ....utils import try_import_dali

# Handle to the DALI package. `SSDDALIPipeline` below reaches `dali.Pipeline`,
# `dali.ops` and `dali.types` through this name, so it must be bound before
# that class statement executes.
# NOTE(review): presumably `try_import_dali` degrades gracefully when DALI is
# not installed, so importing this module does not fail -- confirm in `utils`.
dali = try_import_dali()

# Names exported by a wildcard import of this module.
__all__ = ['transform_test', 'load_test', 'SSDDefaultTrainTransform', 'SSDDefaultValTransform',
           'SSDDALIPipeline']

def transform_test(imgs, short, max_size=1024, mean=(0.485, 0.456, 0.406),
                   std=(0.229, 0.224, 0.225)):
    """Transform image(s) into normalized tensors usable as network input.

    Supports a single NDArray or any iterable of NDArrays, including one-shot
    iterables such as generators.

    Parameters
    ----------
    imgs : NDArray or iterable of NDArray
        Image(s) to be transformed.
    short : int
        Resize image short side to this `short` and keep aspect ratio.
    max_size : int, optional
        Maximum longer side length to fit image.
        This is to limit the input image shape. Aspect ratio is intact because we
        support arbitrary input size in our SSD implementation.
    mean : iterable of float
        Mean pixel values.
    std : iterable of float
        Standard deviations of pixel values.

    Returns
    -------
    (mxnet.NDArray, numpy.ndarray) or list of such tuple
        A (1, 3, H, W) mxnet NDArray as input to network, and a numpy ndarray as
        original un-normalized color image for display.
        If multiple images are supplied, return two lists (tensors, originals).
        You can use ``zip()`` to pair them up.

    Raises
    ------
    AssertionError
        If any element of `imgs` is not an ``mx.nd.NDArray``.

    """
    if isinstance(imgs, mx.nd.NDArray):
        imgs = [imgs]
    else:
        # The input is walked twice below (validation, then processing).
        # Materialize it once so that a one-shot iterable such as a generator
        # is not exhausted by the validation pass.
        imgs = list(imgs)
    for im in imgs:
        assert isinstance(im, mx.nd.NDArray), "Expect NDArray, got {}".format(type(im))

    tensors = []
    origs = []
    for img in imgs:
        img = timage.resize_short_within(img, short, max_size)
        # Keep a uint8 copy of the resized image before normalization.
        orig_img = img.asnumpy().astype('uint8')
        img = mx.nd.image.to_tensor(img)
        img = mx.nd.image.normalize(img, mean=mean, std=std)
        # Prepend a batch axis.
        tensors.append(img.expand_dims(0))
        origs.append(orig_img)
    # A single image is returned unwrapped rather than as one-element lists.
    if len(tensors) == 1:
        return tensors[0], origs[0]
    return tensors, origs
def load_test(filenames, short, max_size=1024, mean=(0.485, 0.456, 0.406),
              std=(0.229, 0.224, 0.225)):
    """Read image file(s) and hand them to `transform_test`.

    Supports a single filename or an iterable of filenames.

    Parameters
    ----------
    filenames : str or list of str
        Image filename(s) to be loaded.
    short : int
        Resize image short side to this `short` and keep aspect ratio.
    max_size : int, optional
        Maximum longer side length to fit image.
        This is to limit the input image shape. Aspect ratio is intact because we
        support arbitrary input size in our SSD implementation.
    mean : iterable of float
        Mean pixel values.
    std : iterable of float
        Standard deviations of pixel values.

    Returns
    -------
    (mxnet.NDArray, numpy.ndarray) or list of such tuple
        Whatever `transform_test` returns for the loaded images: a single
        (tensor, original image) pair for one image, or two lists for several.

    """
    # A bare string is one path, not a sequence of characters to iterate over.
    paths = [filenames] if isinstance(filenames, str) else filenames
    images = []
    for path in paths:
        images.append(mx.image.imread(path))
    return transform_test(images, short, max_size, mean, std)
class SSDDefaultTrainTransform(object):
    """Default SSD training transform with random image augmentations.

    Parameters
    ----------
    width : int
        Image width.
    height : int
        Image height.
    anchors : mxnet.nd.NDArray, optional
        Anchors generated from SSD networks, the shape must be ``(1, N, 4)``.
        Since anchors are shared in the entire batch so it is ``1`` for the first dimension.
        ``N`` is the number of anchors for each image.

        .. hint::

            If anchors is ``None``, the transformation will not generate training targets.
            Otherwise it will generate training targets to accelerate the training phase
            since we push some workload to CPU workers instead of GPUs.

    mean : array-like of size 3
        Mean pixel values to be subtracted from image tensor. Default is [0.485, 0.456, 0.406].
    std : array-like of size 3
        Standard deviation to be divided from image. Default is [0.229, 0.224, 0.225].
    iou_thresh : float
        IOU overlap threshold for maximum matching, default is 0.5.
    box_norm : array-like of size 4, default is (0.1, 0.1, 0.2, 0.2)
        Std value to be divided from encoded values.
    **kwargs
        Extra keyword arguments forwarded to ``SSDTargetGenerator`` when the
        target generator is first created.

    """

    def __init__(self, width, height, anchors=None, mean=(0.485, 0.456, 0.406),
                 std=(0.229, 0.224, 0.225), iou_thresh=0.5, box_norm=(0.1, 0.1, 0.2, 0.2),
                 **kwargs):
        self._width = width
        self._height = height
        self._anchors = anchors
        self._mean = mean
        self._std = std
        # The generator is built on first access of `_target_generator`.
        self._internal_target_generator = None
        self._iou_thresh = iou_thresh
        self._box_norm = box_norm
        self._kwargs = kwargs
        self._anchors_none = anchors is None

    @property
    def _target_generator(self):
        """Return the cached target generator, building it on first use.

        Returns ``None`` when the transform was created without anchors.
        """
        cached = self._internal_target_generator
        if cached is not None:
            return cached
        if self._anchors_none:
            return None
        # since we do not have predictions yet, so we ignore sampling here
        from ....model_zoo.ssd.target import SSDTargetGenerator
        built = SSDTargetGenerator(
            iou_thresh=self._iou_thresh,
            stds=self._box_norm,
            negative_mining_ratio=-1,
            **self._kwargs)
        self._internal_target_generator = built
        return built

    def __call__(self, src, label):
        """Apply transform to training image/label.

        Returns ``(img, bbox)`` when no anchors were given, otherwise
        ``(img, cls_targets, box_targets)`` for the single input sample.
        """
        # 1) random color jittering
        img = experimental.image.random_color_distort(src)

        # 2) random expansion with prob 0.5; boxes are shifted by the expansion offset
        bbox = label
        if np.random.uniform(0, 1) > 0.5:
            fill_value = [m * 255 for m in self._mean]
            img, expand = timage.random_expand(img, fill=fill_value)
            bbox = tbbox.translate(label, x_offset=expand[0], y_offset=expand[1])

        # 3) random cropping
        cur_h, cur_w, _ = img.shape
        bbox, crop = experimental.bbox.random_crop_with_constraints(bbox, (cur_w, cur_h))
        left, top, crop_w, crop_h = crop
        img = mx.image.fixed_crop(img, left, top, crop_w, crop_h)

        # 4) resize to the target size with a randomly drawn interpolation code
        cur_h, cur_w, _ = img.shape
        interp = np.random.randint(0, 5)
        img = timage.imresize(img, self._width, self._height, interp=interp)
        bbox = tbbox.resize(bbox, (cur_w, cur_h), (self._width, self._height))

        # 5) random horizontal flip
        cur_h, cur_w, _ = img.shape
        img, flips = timage.random_flip(img, px=0.5)
        bbox = tbbox.flip(bbox, (cur_w, cur_h), flip_x=flips[0])

        # 6) to tensor + normalization
        img = mx.nd.image.to_tensor(img)
        img = mx.nd.image.normalize(img, mean=self._mean, std=self._std)

        if self._anchors is None:
            return img, bbox.astype(img.dtype)

        # generate training target so cpu workers can help reduce the workload on gpu
        # columns 0-3 are passed as boxes and column 4 as ids, each with a leading batch axis
        gt_bboxes = mx.nd.array(bbox[np.newaxis, :, :4])
        gt_ids = mx.nd.array(bbox[np.newaxis, :, 4:5])
        cls_targets, box_targets, _ = self._target_generator(
            self._anchors, None, gt_bboxes, gt_ids)
        return img, cls_targets[0], box_targets[0]
class SSDDefaultValTransform(object):
    """Default SSD validation transform: resize, to-tensor and normalize.

    Parameters
    ----------
    width : int
        Image width.
    height : int
        Image height.
    mean : array-like of size 3
        Mean pixel values to be subtracted from image tensor. Default is [0.485, 0.456, 0.406].
    std : array-like of size 3
        Standard deviation to be divided from image. Default is [0.229, 0.224, 0.225].

    """

    def __init__(self, width, height, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
        self._width, self._height = width, height
        self._mean, self._std = mean, std

    def __call__(self, src, label):
        """Apply transform to validation image/label.

        Returns the normalized image tensor and the boxes rescaled to the
        target size, cast to the tensor's dtype.
        """
        src_h, src_w, _ = src.shape
        target_size = (self._width, self._height)

        # resize image and boxes to the fixed target size
        # NOTE(review): interp=9 is an interpolation code interpreted by
        # `timage.imresize` -- confirm its meaning there.
        resized = timage.imresize(src, self._width, self._height, interp=9)
        bbox = tbbox.resize(label, in_size=(src_w, src_h), out_size=target_size)

        tensor = mx.nd.image.to_tensor(resized)
        tensor = mx.nd.image.normalize(tensor, mean=self._mean, std=self._std)
        return tensor, bbox.astype(tensor.dtype)
class SSDDALIPipeline(dali.Pipeline):
    """DALI Pipeline with SSD training transform.

    Parameters
    ----------
    num_workers : int
        DALI pipeline arg - Number of CPU workers (passed as ``num_threads``).
    device_id : int
        DALI pipeline arg - Device id.
    batch_size : int
        Batch size.
    data_shape : int
        Height and width length. (height==width in SSD)
    anchors : list of float, or array-like with ``squeeze()``/``asnumpy()``
        Anchors generated from SSD networks. A ``list`` is used as-is and is
        expected to already hold normalized [ltrb] values, ``N*4`` floats for
        N anchors. Any other value is converted by `_to_normalized_ltrb_list`.
    dataset_reader : callable
        Partial pipeline object, which __call__ function has to return
        (images, bboxes, labels) DALI EdgeReference tuple.
    seed : int
        Random seed. Default value is -1, which corresponds to no seed.

    """

    def __init__(self, num_workers, device_id, batch_size, data_shape,
                 anchors, dataset_reader, seed=-1):
        super(SSDDALIPipeline, self).__init__(
            batch_size=batch_size,
            device_id=device_id,
            num_threads=num_workers,
            seed=seed)

        self.dataset_reader = dataset_reader

        # Augmentation techniques
        self.crop = dali.ops.RandomBBoxCrop(
            device="cpu",
            aspect_ratio=[0.5, 2.0],
            thresholds=[0, 0.1, 0.3, 0.5, 0.7, 0.9],
            scaling=[0.3, 1.0],
            ltrb=True,
            allow_no_crop=True,
            num_attempts=1)
        self.slice = dali.ops.Slice(device="cpu")
        self.twist = dali.ops.ColorTwist(device="gpu")
        self.resize = dali.ops.Resize(
            device="cpu",
            resize_x=data_shape,
            resize_y=data_shape,
            min_filter=dali.types.DALIInterpType.INTERP_TRIANGULAR)

        # Output is always produced as FLOAT here.
        out_dtype = dali.types.FLOAT
        # Mean/std are given on the 0-1 scale and multiplied by 255 for this operator.
        pixel_mean = [0.485 * 255, 0.456 * 255, 0.406 * 255]
        pixel_std = [0.229 * 255, 0.224 * 255, 0.225 * 255]
        self.normalize = dali.ops.CropMirrorNormalize(
            device="gpu",
            crop=(data_shape, data_shape),
            mean=pixel_mean,
            std=pixel_std,
            mirror=0,
            output_dtype=out_dtype,
            output_layout=dali.types.NCHW,
            pad_output=False)

        # Random variables
        self.rng1 = dali.ops.Uniform(range=[0.5, 1.5])
        self.rng2 = dali.ops.Uniform(range=[0.875, 1.125])
        self.rng3 = dali.ops.Uniform(range=[-0.5, 0.5])

        self.flip = dali.ops.Flip(device="cpu")
        self.bbflip = dali.ops.BbFlip(device="cpu", ltrb=True)
        self.flip_coin = dali.ops.CoinFlip(probability=0.5)

        self.box_encoder = dali.ops.BoxEncoder(
            device="cpu",
            criteria=0.5,
            anchors=self._to_normalized_ltrb_list(anchors, data_shape),
            offset=True,
            stds=[0.1, 0.1, 0.2, 0.2],
            scale=data_shape)

    def _to_normalized_ltrb_list(self, anchors, size):
        """Prepare anchors into ltrb (normalized DALI anchors format list).

        A ``list`` is returned unchanged. Otherwise the anchors are squeezed
        and converted to numpy, columns 0/1 are treated as a center and
        columns 2/3 as an extent to build (left, top, right, bottom), and the
        result is divided by `size` and flattened into a plain list.
        """
        if isinstance(anchors, list):
            return anchors

        centered = anchors.squeeze().asnumpy()
        ltrb = centered.copy()
        half_w = 0.5 * centered[:, 2]
        half_h = 0.5 * centered[:, 3]
        ltrb[:, 0] = centered[:, 0] - half_w
        ltrb[:, 1] = centered[:, 1] - half_h
        ltrb[:, 2] = centered[:, 0] + half_w
        ltrb[:, 3] = centered[:, 1] + half_h
        ltrb /= size
        return ltrb.flatten().tolist()

    def define_graph(self):
        """Define the DALI graph.

        Returns
        -------
        tuple
            ``(images, bboxes, labels)`` where images went through the GPU
            color-twist/normalize operators and the encoded boxes and labels
            are moved to GPU with ``.gpu()``.
        """
        # Random draws are requested in a fixed order:
        # saturation, contrast, brightness, hue, then the flip coin.
        jitter = dict(
            saturation=self.rng1(),
            contrast=self.rng1(),
            brightness=self.rng2(),
            hue=self.rng3())
        mirror = self.flip_coin()

        images, bboxes, labels = self.dataset_reader()

        # CPU stage: bbox-aware crop, optional horizontal flip, resize.
        crop_begin, crop_size, bboxes, labels = self.crop(bboxes, labels)
        images = self.slice(images, crop_begin, crop_size)
        images = self.flip(images, horizontal=mirror)
        bboxes = self.bbflip(bboxes, horizontal=mirror)
        images = self.resize(images)

        # GPU stage: color twist and normalization.
        images = self.twist(images.gpu(), **jitter)
        images = self.normalize(images)

        bboxes, labels = self.box_encoder(bboxes, labels)
        return (images, bboxes.gpu(), labels.gpu())