Source code for gluoncv.data.dataloader
"""DataLoader utils."""
import io
import pickle
import multiprocessing
from multiprocessing.reduction import ForkingPickler
import numpy as np
from mxnet import nd
from mxnet import context
from mxnet.gluon.data.dataloader import DataLoader, _MultiWorkerIter
from mxnet.gluon.data.dataloader import default_mp_batchify_fn, default_batchify_fn
def default_pad_batchify_fn(data):
    """Collate data into batch, labels are padded to same shape"""
    if isinstance(data[0], nd.NDArray):
        return nd.stack(*data)
    elif isinstance(data[0], tuple):
        data = zip(*data)
        return [default_pad_batchify_fn(i) for i in data]
    else:
        data = np.asarray(data)
        # pad to the longest label list in the batch (at least 1 row, so empty
        # label lists still yield a valid buffer)
        pad = max([l.shape[0] for l in data] + [1,])
        # fill with -1 so padded rows are easy to distinguish from real labels
        buf = np.full((len(data), pad, data[0].shape[-1]), -1, dtype=data[0].dtype)
        for i, l in enumerate(data):
            buf[i][:l.shape[0], :] = l
        return nd.array(buf, dtype=data[0].dtype)
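# A minimal usage sketch (hypothetical label arrays, not part of the original
# source): two images with 1 and 2 box labels collate into a (2, 2, 5) batch,
# where the missing row of the first sample is filled with -1.
#
#   labels = [np.array([[0, 10, 10, 20, 20]], dtype='float32'),
#             np.array([[1, 5, 5, 15, 15], [2, 8, 8, 18, 18]], dtype='float32')]
#   batch = default_pad_batchify_fn(labels)
#   # batch.shape == (2, 2, 5); batch[0][1] is a row of -1 padding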
def default_mp_pad_batchify_fn(data):
    """Use shared memory for collating data into batch, labels are padded to same shape"""
    if isinstance(data[0], nd.NDArray):
        out = nd.empty((len(data),) + data[0].shape, dtype=data[0].dtype,
                       ctx=context.Context('cpu_shared', 0))
        return nd.stack(*data, out=out)
    elif isinstance(data[0], tuple):
        data = zip(*data)
        return [default_mp_pad_batchify_fn(i) for i in data]
    else:
        data = np.asarray(data)
        batch_size = len(data)
        # same padding scheme as default_pad_batchify_fn, but the result is
        # allocated on the shared-memory CPU context
        pad = max([l.shape[0] for l in data] + [1,])
        buf = np.full((batch_size, pad, data[0].shape[-1]), -1, dtype=data[0].dtype)
        for i, l in enumerate(data):
            buf[i][:l.shape[0], :] = l
        return nd.array(buf, dtype=data[0].dtype, ctx=context.Context('cpu_shared', 0))
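# Sketch of the shared-memory variant (assumes an MXNet build with the
# 'cpu_shared' context): the collated batch is allocated in shared memory so a
# worker process can hand it to the main process without an extra copy.
#
#   batch = default_mp_pad_batchify_fn(labels)  # labels as in the sketch above
#   # batch.context == context.Context('cpu_shared', 0)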
def tsn_mp_batchify_fn(data):
    """Collate data into batch. Use shared memory for stacking.
    Modify default batchify function for temporal segment networks.
    Change `nd.stack` to `nd.concat` since batch dimension already exists.
    """
    if isinstance(data[0], nd.NDArray):
        return nd.concat(*data, dim=0)
    elif isinstance(data[0], tuple):
        data = zip(*data)
        return [tsn_mp_batchify_fn(i) for i in data]
    else:
        data = np.asarray(data)
        return nd.array(data, dtype=data.dtype,
                        ctx=context.Context('cpu_shared', 0))
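# Illustration (hypothetical shapes): each TSN sample already stacks its
# segments along axis 0, e.g. (num_segments, C, H, W), so concatenating along
# dim 0 yields (batch_size * num_segments, C, H, W) instead of adding a new
# batch axis the way `nd.stack` would.
#
#   clips = [nd.zeros((3, 3, 224, 224)), nd.zeros((3, 3, 224, 224))]
#   tsn_mp_batchify_fn(clips).shape  # -> (6, 3, 224, 224)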
class DetectionDataLoader(DataLoader):
    """Data loader for detection dataset.
    .. deprecated:: 0.2.0
        :py:class:`DetectionDataLoader` is deprecated,
        please use :py:class:`mxnet.gluon.data.DataLoader` with
        batchify functions listed in `gluoncv.data.batchify` directly.
    It loads data batches from a dataset and then applies data
    transformations. It's a subclass of :py:class:`mxnet.gluon.data.DataLoader`,
    and therefore has very similar APIs.
    The main purpose of this DataLoader is to pad the variable-length labels from
    each image, since images contain different numbers of objects.
    Parameters
    ----------
    dataset : mxnet.gluon.data.Dataset or numpy.ndarray or mxnet.ndarray.NDArray
        The source dataset.
    batch_size : int
        The size of mini-batch.
    shuffle : bool, default False
        Whether to randomly shuffle the samples. Typically set to True for the
        training dataset and False for validation/test datasets.
    sampler : mxnet.gluon.data.Sampler, default None
        The sampler to use. We should either specify a sampler or enable
        shuffle, not both, because random shuffling is a sampling method.
    last_batch : {'keep', 'discard', 'rollover'}, default is keep
        How to handle the last batch if the batch size does not evenly divide
        the number of examples in the dataset. There are three options to deal
        with the last batch if its size is smaller than the specified batch
        size.
        - keep: keep it
        - discard: throw it away
        - rollover: insert the examples to the beginning of the next batch
    batch_sampler : mxnet.gluon.data.BatchSampler
        A sampler that returns mini-batches. Do not specify batch_size,
        shuffle, sampler, and last_batch if batch_sampler is specified.
    batchify_fn : callable
        Callback function to allow users to specify how to merge samples
        into a batch.
        Defaults to :py:meth:`gluoncv.data.dataloader.default_pad_batchify_fn`::
            def default_pad_batchify_fn(data):
                if isinstance(data[0], nd.NDArray):
                    return nd.stack(*data)
                elif isinstance(data[0], tuple):
                    data = zip(*data)
                    return [default_pad_batchify_fn(i) for i in data]
                else:
                    data = np.asarray(data)
                    pad = max([l.shape[0] for l in data] + [1,])
                    buf = np.full((len(data), pad, data[0].shape[-1]),
                                  -1, dtype=data[0].dtype)
                    for i, l in enumerate(data):
                        buf[i][:l.shape[0], :] = l
                    return nd.array(buf, dtype=data[0].dtype)
    num_workers : int, default 0
        The number of multiprocessing workers to use for data preprocessing.
        If ``num_workers`` = 0, multiprocessing is disabled.
        Otherwise, ``num_workers`` multiprocessing workers are used to process data.
    """
    def __init__(self, dataset, batch_size=None, shuffle=False, sampler=None,
                 last_batch=None, batch_sampler=None, batchify_fn=None,
                 num_workers=0):
        import warnings
        warnings.warn('DetectionDataLoader is deprecated. ' +
                      'Please use mxnet.gluon.data.DataLoader '
                      'with batchify functions directly.')
        if batchify_fn is None:
            if num_workers > 0:
                batchify_fn = default_mp_pad_batchify_fn
            else:
                batchify_fn = default_pad_batchify_fn
        super(DetectionDataLoader, self).__init__(
            dataset, batch_size, shuffle, sampler, last_batch,
            batch_sampler, batchify_fn, num_workers)
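# A sketch of the recommended replacement for the deprecated class above,
# assuming a detection dataset `train_dataset` that yields (image, label)
# pairs; `Tuple`, `Stack` and `Pad` come from `gluoncv.data.batchify`.
#
#   from mxnet.gluon.data import DataLoader as GluonDataLoader
#   from gluoncv.data.batchify import Tuple, Stack, Pad
#   batchify = Tuple(Stack(), Pad(pad_val=-1))  # stack images, pad labels with -1
#   loader = GluonDataLoader(train_dataset, batch_size=16, shuffle=True,
#                            batchify_fn=batchify, num_workers=4)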
_worker_dataset = None
def _worker_initializer(dataset):
    """Initializer for processing pool."""
    # global dataset is per-process based and only available in worker processes
    # this is only necessary to handle MXIndexedRecordIO because otherwise dataset
    # can be passed as argument
    global _worker_dataset
    _worker_dataset = dataset
def _worker_fn(samples, transform_fn, batchify_fn):
    """Function for processing data in worker process."""
    # each worker process must hold its own MXIndexedRecordIO handle, hence the fork
    # keeping the dataset as a global variable avoids repeated serialization
    # overhead and is safe in the new process
    global _worker_dataset
    t_dataset = _worker_dataset.transform(transform_fn)
    batch = batchify_fn([t_dataset[i] for i in samples])
    buf = io.BytesIO()
    ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(batch)
    return buf.getvalue()
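# On the consumer side the returned bytes are restored with a plain
# `pickle.loads`, which reconstructs NDArrays backed by shared memory. A
# sketch, assuming this runs in a worker process where `_worker_initializer`
# has already set `_worker_dataset`, and `my_transform` is some transform fn:
#
#   payload = _worker_fn([0, 1], my_transform, default_mp_pad_batchify_fn)
#   batch = pickle.loads(payload)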
class _RandomTransformMultiWorkerIter(_MultiWorkerIter):
    """Internal multi-worker iterator for DataLoader."""
    def __init__(self, transform_fns, interval, worker_pool, batchify_fn, batch_sampler,
                 pin_memory=False, pin_device_id=0, worker_fn=_worker_fn, prefetch=0):
        super(_RandomTransformMultiWorkerIter, self).__init__(
            worker_pool, batchify_fn, batch_sampler, pin_memory=pin_memory,
            worker_fn=worker_fn, prefetch=0)
        self._transform_fns = transform_fns
        self._current_fn = np.random.choice(self._transform_fns)
        self._interval = max(int(interval), 1)
        self._pin_device_id = pin_device_id
        # pre-fetch here: the super class is initialized with prefetch=0 because
        # its _push_next would run before self._transform_fns is set
        for _ in range(prefetch):
            self._push_next()
    def _push_next(self):
        """Assign next batch workload to workers."""
        r = next(self._iter, None)
        if r is None:
            return
        if self._sent_idx % self._interval == 0:
            self._current_fn = np.random.choice(self._transform_fns)
        async_ret = self._worker_pool.apply_async(
            self._worker_fn, (r, self._current_fn, self._batchify_fn))
        self._data_buffer[self._sent_idx] = async_ret
        self._sent_idx += 1
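# Selection cadence example: with interval=3, batches 0-2 share one randomly
# chosen transform, batches 3-5 the next, and so on, since the transform is
# re-drawn whenever `_sent_idx % interval == 0`.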
class RandomTransformDataLoader(DataLoader):
    """DataLoader that support random transform function applied to dataset.
    Parameters
    ----------
    transform_fns : iterable of callables
        Transform functions, each taking a sample as input and returning the
        transformed sample. One of them is randomly selected during iteration.
    dataset : mxnet.gluon.data.Dataset or numpy.ndarray or mxnet.ndarray.NDArray
        The source dataset. The original (untransformed) dataset is recommended
        here, since a transform from the candidates is applied during iteration.
    interval : int, default is 1
        Every `interval` batches, a transform function is randomly re-selected
        from the candidates.
    batch_size : int
        The size of mini-batch.
    shuffle : bool, default False
        Whether to randomly shuffle the samples. Typically set to True for the
        training dataset and False for validation/test datasets.
    sampler : mxnet.gluon.data.Sampler, default None
        The sampler to use. We should either specify a sampler or enable
        shuffle, not both, because random shuffling is a sampling method.
    last_batch : {'keep', 'discard', 'rollover'}, default is keep
        How to handle the last batch if the batch size does not evenly divide
        the number of examples in the dataset. There are three options to deal
        with the last batch if its size is smaller than the specified batch
        size.
        - keep: keep it
        - discard: throw it away
        - rollover: insert the examples to the beginning of the next batch
    batch_sampler : mxnet.gluon.data.BatchSampler
        A sampler that returns mini-batches. Do not specify batch_size,
        shuffle, sampler, and last_batch if batch_sampler is specified.
    batchify_fn : callable
        Callback function to allow users to specify how to merge samples
        into a batch.
        Defaults to :py:meth:`gluoncv.data.dataloader.default_pad_batchify_fn`::
            def default_pad_batchify_fn(data):
                if isinstance(data[0], nd.NDArray):
                    return nd.stack(*data)
                elif isinstance(data[0], tuple):
                    data = zip(*data)
                    return [default_pad_batchify_fn(i) for i in data]
                else:
                    data = np.asarray(data)
                    pad = max([l.shape[0] for l in data] + [1,])
                    buf = np.full((len(data), pad, data[0].shape[-1]),
                                  -1, dtype=data[0].dtype)
                    for i, l in enumerate(data):
                        buf[i][:l.shape[0], :] = l
                    return nd.array(buf, dtype=data[0].dtype)
    num_workers : int, default 0
        The number of multiprocessing workers to use for data preprocessing.
        If ``num_workers`` = 0, multiprocessing is disabled.
        Otherwise, ``num_workers`` multiprocessing workers are used to process data.
    pin_memory : boolean, default False
        If ``True``, the dataloader will copy NDArrays into pinned memory
        before returning them. Copying from CPU pinned memory to GPU is faster
        than from normal CPU memory.
    pin_device_id : int, default 0
        The device id to use for allocating pinned memory if pin_memory is ``True``
    prefetch : int, default is `num_workers * 2`
        The number of batches to prefetch; only effective when `num_workers` > 0.
        If `prefetch` > 0, worker processes prefetch that many batches before
        data is requested from the iterator.
        Note that a larger prefetch count gives a smoother start-up, but
        consumes more shared memory. Too small a value may defeat the purpose
        of using multiple worker processes; consider reducing `num_workers`
        in that case.
    """
    def __init__(self, transform_fns, dataset, interval=1, batch_size=None, shuffle=False,
                 sampler=None, last_batch=None, batch_sampler=None, batchify_fn=None,
                 num_workers=0, pin_memory=False, pin_device_id=0, prefetch=None):
        super(RandomTransformDataLoader, self).__init__(
            dataset=dataset, batch_size=batch_size, shuffle=shuffle, sampler=sampler,
            last_batch=last_batch, batch_sampler=batch_sampler, batchify_fn=batchify_fn,
            num_workers=0, pin_memory=pin_memory)
        self._transform_fns = transform_fns
        assert len(self._transform_fns) > 0
        self._interval = max(int(interval), 1)
        # override attributes from the super class, which was initialized with
        # num_workers=0 so it does not spawn its own worker pool
        self._pin_device_id = pin_device_id
        self._num_workers = num_workers if num_workers >= 0 else 0
        self._worker_pool = None
        self._prefetch = max(0, int(prefetch) if prefetch is not None else 2 * self._num_workers)
        if self._num_workers > 0:
            self._worker_pool = multiprocessing.Pool(
                self._num_workers, initializer=_worker_initializer, initargs=[self._dataset])
        if batchify_fn is None:
            if num_workers > 0:
                self._batchify_fn = default_mp_batchify_fn
            else:
                self._batchify_fn = default_batchify_fn
        else:
            self._batchify_fn = batchify_fn
    def __iter__(self):
        if self._num_workers == 0:
            def same_process_iter():
                t = np.random.choice(self._transform_fns)
                for ib, batch in enumerate(self._batch_sampler):
                    if ib % self._interval == 0:
                        t = np.random.choice(self._transform_fns)
                    yield self._batchify_fn([self._dataset.transform(t)[idx] for idx in batch])
            return same_process_iter()
        else:
            return _RandomTransformMultiWorkerIter(
                self._transform_fns, self._interval, self._worker_pool, self._batchify_fn,
                self._batch_sampler, pin_memory=self._pin_memory, pin_device_id=self._pin_device_id,
                worker_fn=_worker_fn, prefetch=self._prefetch)
    def __del__(self):
        if self._worker_pool:
            # manually terminate due to a bug where the pool is not automatically terminated
            assert isinstance(self._worker_pool, multiprocessing.pool.Pool)
            self._worker_pool.terminate()
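# A usage sketch (hypothetical transforms and dataset): alternate randomly
# between two training transforms every 10 batches.
#
#   train_loader = RandomTransformDataLoader(
#       [transform_a, transform_b], train_dataset, interval=10,
#       batch_size=32, shuffle=True, num_workers=4)
#   for batch in train_loader:
#       pass  # training step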