Source code for gluoncv.data.dataloader
"""DataLoader utils."""
import io
import pickle
import multiprocessing
from multiprocessing.reduction import ForkingPickler
import numpy as np
from mxnet import nd
from mxnet import context
from mxnet.gluon.data.dataloader import DataLoader, _MultiWorkerIter
from mxnet.gluon.data.dataloader import default_mp_batchify_fn, default_batchify_fn
def default_pad_batchify_fn(data):
    """Collate data into batch, labels are padded to same shape.

    Parameters
    ----------
    data : sequence
        Samples of one batch. Three layouts are handled, decided by the
        type of ``data[0]``:

        - ``nd.NDArray``: all samples are stacked along a new first axis.
        - ``tuple``: the samples are transposed with ``zip`` and every
          field is collated recursively.
        - anything else: every sample is converted with ``np.asarray`` and
          padded along its first axis with ``-1``.

    Returns
    -------
    mxnet.ndarray.NDArray or list
        A single NDArray, or a list with one collated entry per tuple field.
        Padded output has shape ``(len(data), pad, last_dim)`` where ``pad``
        is the largest first-axis length in the batch (at least 1) and
        ``last_dim`` is the size of the last axis of the first sample.
    """
    if isinstance(data[0], nd.NDArray):
        return nd.stack(*data)
    if isinstance(data[0], tuple):
        return [default_pad_batchify_fn(field) for field in zip(*data)]

    # Convert each sample on its own. Calling np.asarray on the whole batch
    # builds a ragged array when samples differ in length, which NumPy >= 1.24
    # rejects with a ValueError -- and ragged labels are the case to support.
    labels = [np.asarray(label) for label in data]
    # Promote over the distinct dtypes so that no sample is truncated when it
    # is written into the shared buffer.
    dtype = np.result_type(*{label.dtype for label in labels})
    # The floor of 1 keeps a padded row even when every sample is empty.
    pad = max(max(label.shape[0] for label in labels), 1)
    buf = np.full((len(labels), pad, labels[0].shape[-1]), -1, dtype=dtype)
    for i, label in enumerate(labels):
        buf[i][:label.shape[0], :] = label
    return nd.array(buf, dtype=dtype)
def default_mp_pad_batchify_fn(data):
    """Use shared memory for collating data into batch, labels are padded to same shape.

    Parameters
    ----------
    data : sequence
        Samples of one batch. Three layouts are handled, decided by the
        type of ``data[0]``:

        - ``nd.NDArray``: all samples are stacked into an NDArray allocated
          on the ``cpu_shared`` context.
        - ``tuple``: the samples are transposed with ``zip`` and every
          field is collated recursively.
        - anything else: every sample is converted with ``np.asarray``,
          padded along its first axis with ``-1`` and copied into an
          NDArray on the ``cpu_shared`` context.

    Returns
    -------
    mxnet.ndarray.NDArray or list
        A single NDArray, or a list with one collated entry per tuple field.
        Padded output has shape ``(len(data), pad, last_dim)`` where ``pad``
        is the largest first-axis length in the batch (at least 1) and
        ``last_dim`` is the size of the last axis of the first sample.
    """
    if isinstance(data[0], nd.NDArray):
        out = nd.empty((len(data),) + data[0].shape, dtype=data[0].dtype,
                       ctx=context.Context('cpu_shared', 0))
        return nd.stack(*data, out=out)
    if isinstance(data[0], tuple):
        return [default_mp_pad_batchify_fn(field) for field in zip(*data)]

    # Convert each sample on its own. Calling np.asarray on the whole batch
    # builds a ragged array when samples differ in length, which NumPy >= 1.24
    # rejects with a ValueError -- and ragged labels are the case to support.
    labels = [np.asarray(label) for label in data]
    # Promote over the distinct dtypes so that no sample is truncated when it
    # is written into the shared buffer.
    dtype = np.result_type(*{label.dtype for label in labels})
    batch_size = len(labels)
    # The floor of 1 keeps a padded row even when every sample is empty.
    pad = max(max(label.shape[0] for label in labels), 1)
    buf = np.full((batch_size, pad, labels[0].shape[-1]), -1, dtype=dtype)
    for i, label in enumerate(labels):
        buf[i][:label.shape[0], :] = label
    return nd.array(buf, dtype=dtype, ctx=context.Context('cpu_shared', 0))
def tsn_mp_batchify_fn(data):
    """Collate samples for temporal segment networks.

    Variant of the default multi-worker batchify function in which NDArray
    samples are joined with `nd.concat` along axis 0 rather than `nd.stack`,
    because each sample already carries a leading batch dimension.

    Parameters
    ----------
    data : sequence
        Samples of one batch; the type of ``data[0]`` selects the branch.

    Returns
    -------
    mxnet.ndarray.NDArray or list
        The concatenated NDArray, a list with one collated entry per tuple
        field, or an NDArray on the ``cpu_shared`` context built from
        ``np.asarray(data)``.

    Notes
    -----
    Only the fallback branch requests the ``cpu_shared`` context explicitly;
    the NDArray branch calls `nd.concat` without an ``out`` argument.
    """
    first = data[0]
    if isinstance(first, nd.NDArray):
        return nd.concat(*data, dim=0)
    if isinstance(first, tuple):
        # Transpose samples -> fields, then collate every field separately.
        return [tsn_mp_batchify_fn(column) for column in zip(*data)]
    stacked = np.asarray(data)
    return nd.array(stacked, dtype=stacked.dtype,
                    ctx=context.Context('cpu_shared', 0))
class DetectionDataLoader(DataLoader):
    """Data loader for detection dataset.

    .. deprecated:: 0.2.0
        :py:class:`DetectionDataLoader` is deprecated,
        please use :py:class:`mxnet.gluon.data.DataLoader` with
        batchify functions listed in `gluoncv.data.batchify` directly.

    It loads data batches from a dataset and then apply data
    transformations. It's a subclass of :py:class:`mxnet.gluon.data.DataLoader`,
    and therefore has very similar APIs.

    The main purpose of the DataLoader is to pad variable length of labels from
    each image, because they have different amount of objects.

    Parameters
    ----------
    dataset : mxnet.gluon.data.Dataset or numpy.ndarray or mxnet.ndarray.NDArray
        The source dataset.
    batch_size : int
        The size of mini-batch.
    shuffle : bool, default False
        If or not randomly shuffle the samples. Often use True for training
        dataset and False for validation/test datasets
    sampler : mxnet.gluon.data.Sampler, default None
        The sampler to use. We should either specify a sampler or enable
        shuffle, not both, because random shuffling is a sampling method.
    last_batch : {'keep', 'discard', 'rollover'}, default is keep
        How to handle the last batch if the batch size does not evenly divide by
        the number of examples in the dataset. There are three options to deal
        with the last batch if its size is smaller than the specified batch
        size.

        - keep: keep it
        - discard: throw it away
        - rollover: insert the examples to the beginning of the next batch
    batch_sampler : mxnet.gluon.data.BatchSampler
        A sampler that returns mini-batches. Do not specify batch_size,
        shuffle, sampler, and last_batch if batch_sampler is specified.
    batchify_fn : callable
        Callback function to allow users to specify how to merge samples
        into a batch.
        Defaults to :py:meth:`gluoncv.data.dataloader.default_pad_batchify_fn`
        (or its shared-memory counterpart ``default_mp_pad_batchify_fn`` when
        ``num_workers`` > 0)::

            def default_pad_batchify_fn(data):
                if isinstance(data[0], nd.NDArray):
                    return nd.stack(*data)
                elif isinstance(data[0], tuple):
                    data = zip(*data)
                    return [default_pad_batchify_fn(i) for i in data]
                else:
                    data = np.asarray(data)
                    pad = max([l.shape[0] for l in data] + [1,])
                    buf = np.full((len(data), pad, data[0].shape[-1]),
                                  -1, dtype=data[0].dtype)
                    for i, l in enumerate(data):
                        buf[i][:l.shape[0], :] = l
                    return nd.array(buf, dtype=data[0].dtype)
    num_workers : int, default 0
        The number of multiprocessing workers to use for data preprocessing.
        If ``num_workers`` = 0, multiprocessing is disabled.
        Otherwise ``num_workers`` multiprocessing worker is used to process data.

    """

    def __init__(self, dataset, batch_size=None, shuffle=False, sampler=None,
                 last_batch=None, batch_sampler=None, batchify_fn=None,
                 num_workers=0):
        import warnings
        # stacklevel=2 attributes the warning to the code that instantiates
        # the loader instead of to this constructor.
        warnings.warn('DetectionDataLoader is deprecated. ' +
                      'Please use mxnet.gluon.data.DataLoader '
                      'with batchify functions directly.', stacklevel=2)
        if batchify_fn is None:
            # Worker processes hand batches back through shared memory, so
            # they need the shared-memory flavour of the padding collate.
            if num_workers > 0:
                batchify_fn = default_mp_pad_batchify_fn
            else:
                batchify_fn = default_pad_batchify_fn
        super(DetectionDataLoader, self).__init__(
            dataset, batch_size, shuffle, sampler, last_batch,
            batch_sampler, batchify_fn, num_workers)
# Dataset handle owned by each worker process; populated by `_worker_initializer`.
_worker_dataset = None


def _worker_initializer(dataset):
    """Store `dataset` in the module-level slot of the current process.

    Intended to be used as the ``initializer`` of a processing pool.

    Parameters
    ----------
    dataset : object
        The dataset the worker will read samples from.
    """
    # The global is per-process and only meaningful inside worker processes.
    # Keeping the dataset here is only required for MXIndexedRecordIO;
    # any other dataset could simply be passed along as an argument.
    global _worker_dataset
    _worker_dataset = dataset
def _worker_fn(samples, transform_fn, batchify_fn):
    """Build one batch inside a worker process and return it pickled.

    Parameters
    ----------
    samples : iterable
        Indices of the samples that form the batch.
    transform_fn : callable
        Transform handed to ``_worker_dataset.transform``.
    batchify_fn : callable
        Merges the list of transformed samples into a batch.

    Returns
    -------
    bytes
        The batch serialised with `ForkingPickler` at
        ``pickle.HIGHEST_PROTOCOL``.
    """
    # Each worker process has to own a freshly forked MXIndexedRecordIO handle;
    # reading the dataset from the per-process global avoids a lot of overhead
    # and is safe because the process is new.
    transformed = _worker_dataset.transform(transform_fn)
    items = [transformed[index] for index in samples]
    stream = io.BytesIO()
    pickler = ForkingPickler(stream, pickle.HIGHEST_PROTOCOL)
    pickler.dump(batchify_fn(items))
    return stream.getvalue()
class _RandomTransformMultiWorkerIter(_MultiWorkerIter):
    """Multi-worker iterator that re-draws the transform every `interval` batches.

    Parameters
    ----------
    transform_fns : sequence of callables
        Candidates passed to ``np.random.choice``.
    interval : int
        Number of dispatched batches between two draws; values below 1 are
        raised to 1.
    worker_pool, batchify_fn, batch_sampler, pin_memory, worker_fn
        Forwarded to the parent `_MultiWorkerIter` constructor.
    pin_device_id : int, default 0
        Stored on the instance as ``_pin_device_id``.
    prefetch : int, default 0
        Number of batches dispatched right after construction.
    """

    def __init__(self, transform_fns, interval, worker_pool, batchify_fn, batch_sampler,
                 pin_memory=False, pin_device_id=0, worker_fn=_worker_fn, prefetch=0):
        # The parent is built with prefetch=0 on purpose: `_push_next` reads
        # attributes that are only assigned below, so prefetching is deferred
        # until this constructor has finished its own setup.
        super(_RandomTransformMultiWorkerIter, self).__init__(
            worker_pool, batchify_fn, batch_sampler, pin_memory=pin_memory,
            worker_fn=worker_fn, prefetch=0)
        self._transform_fns = transform_fns
        self._current_fn = np.random.choice(self._transform_fns)
        self._interval = max(int(interval), 1)
        self._pin_device_id = pin_device_id
        for _ in range(prefetch):
            self._push_next()

    def _push_next(self):
        """Dispatch the next batch of indices to the worker pool, if any is left."""
        # NOTE(review): `_iter`, `_sent_idx`, `_data_buffer`, `_worker_pool`,
        # `_worker_fn` and `_batchify_fn` are presumably set up by the parent
        # class -- confirm against the installed mxnet version.
        batch_indices = next(self._iter, None)
        if batch_indices is not None:
            slot = self._sent_idx
            if slot % self._interval == 0:
                self._current_fn = np.random.choice(self._transform_fns)
            task_args = (batch_indices, self._current_fn, self._batchify_fn)
            self._data_buffer[slot] = self._worker_pool.apply_async(
                self._worker_fn, task_args)
            self._sent_idx = slot + 1
class RandomTransformDataLoader(DataLoader):
    """DataLoader that support random transform function applied to dataset.

    Parameters
    ----------
    transform_fns : iterable of callables
        Transform functions that takes a sample as input and returns the transformed sample.
        They will be randomly selected during the dataloader iteration.
    dataset : mxnet.gluon.data.Dataset or numpy.ndarray or mxnet.ndarray.NDArray
        The source dataset. Original dataset is recommended here since we will apply transform
        function from candidates again during the iteration.
    interval : int, default is 1
        For every `interval` batches, transform function is randomly selected from candidates.
    batch_size : int
        The size of mini-batch.
    shuffle : bool, default False
        If or not randomly shuffle the samples. Often use True for training
        dataset and False for validation/test datasets
    sampler : mxnet.gluon.data.Sampler, default None
        The sampler to use. We should either specify a sampler or enable
        shuffle, not both, because random shuffling is a sampling method.
    last_batch : {'keep', 'discard', 'rollover'}, default is keep
        How to handle the last batch if the batch size does not evenly divide by
        the number of examples in the dataset. There are three options to deal
        with the last batch if its size is smaller than the specified batch
        size.

        - keep: keep it
        - discard: throw it away
        - rollover: insert the examples to the beginning of the next batch
    batch_sampler : mxnet.gluon.data.BatchSampler
        A sampler that returns mini-batches. Do not specify batch_size,
        shuffle, sampler, and last_batch if batch_sampler is specified.
    batchify_fn : callable
        Callback function to allow users to specify how to merge samples
        into a batch.
        Defaults to ``default_mp_batchify_fn`` when ``num_workers`` > 0 and to
        ``default_batchify_fn`` otherwise, both imported from
        ``mxnet.gluon.data.dataloader``.
    num_workers : int, default 0
        The number of multiprocessing workers to use for data preprocessing.
        If ``num_workers`` = 0, multiprocessing is disabled.
        Otherwise ``num_workers`` multiprocessing worker is used to process data.
    pin_memory : boolean, default False
        If ``True``, the dataloader will copy NDArrays into pinned memory
        before returning them. Copying from CPU pinned memory to GPU is faster
        than from normal CPU memory.
    pin_device_id : int, default 0
        The device id to use for allocating pinned memory if pin_memory is ``True``
    prefetch : int, default is `num_workers * 2`
        The number of prefetching batches only works if `num_workers` > 0.
        If `prefetch` > 0, it allow worker process to prefetch certain batches before
        acquiring data from iterators.
        Note that using large prefetching batch will provide smoother bootstrapping performance,
        but will consume more shared_memory. Using smaller number may forfeit the purpose of using
        multiple worker processes, try reduce `num_workers` in this case.
        By default it defaults to `num_workers * 2`.

    """

    def __init__(self, transform_fns, dataset, interval=1, batch_size=None, shuffle=False,
                 sampler=None, last_batch=None, batch_sampler=None, batchify_fn=None,
                 num_workers=0, pin_memory=False, pin_device_id=0, prefetch=None):
        # The parent is always built with num_workers=0; the worker pool is
        # created below with this module's own initializer instead.
        super(RandomTransformDataLoader, self).__init__(
            dataset=dataset, batch_size=batch_size, shuffle=shuffle, sampler=sampler,
            last_batch=last_batch, batch_sampler=batch_sampler, batchify_fn=batchify_fn,
            num_workers=0, pin_memory=pin_memory)
        self._transform_fns = transform_fns
        assert len(self._transform_fns) > 0
        self._interval = max(int(interval), 1)

        # override
        self._pin_device_id = pin_device_id
        self._num_workers = num_workers if num_workers >= 0 else 0
        self._worker_pool = None
        self._prefetch = max(0, int(prefetch) if prefetch is not None else 2 * self._num_workers)
        if self._num_workers > 0:
            self._worker_pool = multiprocessing.Pool(
                self._num_workers, initializer=_worker_initializer, initargs=[self._dataset])
        if batchify_fn is None:
            if num_workers > 0:
                self._batchify_fn = default_mp_batchify_fn
            else:
                self._batchify_fn = default_batchify_fn
        else:
            self._batchify_fn = batchify_fn

    def __iter__(self):
        """Return an iterator over batches.

        With ``num_workers == 0`` batches are produced in the calling process
        by a generator; otherwise a `_RandomTransformMultiWorkerIter` bound to
        the worker pool is returned.
        """
        if self._num_workers == 0:
            def same_process_iter():
                t = np.random.choice(self._transform_fns)
                for ib, batch in enumerate(self._batch_sampler):
                    if ib % self._interval == 0:
                        t = np.random.choice(self._transform_fns)
                    # Wrap the dataset once per batch rather than once per sample.
                    t_dataset = self._dataset.transform(t)
                    yield self._batchify_fn([t_dataset[idx] for idx in batch])
            return same_process_iter()

        return _RandomTransformMultiWorkerIter(
            self._transform_fns, self._interval, self._worker_pool, self._batchify_fn,
            self._batch_sampler, pin_memory=self._pin_memory, pin_device_id=self._pin_device_id,
            worker_fn=_worker_fn, prefetch=self._prefetch)

    def __del__(self):
        """Terminate the worker pool, if one was created."""
        # __del__ also runs when __init__ raised before `_worker_pool` was
        # assigned, so the attribute may be missing altogether.
        pool = getattr(self, '_worker_pool', None)
        if pool:
            # manually terminate due to a bug that pool is not automatically terminated
            assert isinstance(pool, multiprocessing.pool.Pool)
            pool.terminate()