cellxgene_census.experimental.ml.pytorch.ExperimentDataPipe

class cellxgene_census.experimental.ml.pytorch.ExperimentDataPipe(experiment: Experiment, measurement_name: str = 'raw', X_name: str = 'X', obs_query: AxisQuery | None = None, var_query: AxisQuery | None = None, obs_column_names: Sequence[str] = (), batch_size: int = 1, shuffle: bool = False, seed: int | None = None, return_sparse_X: bool = False, soma_chunk_size: int | None = None, use_eager_fetch: bool = True)

An iterable-style PyTorch DataPipe that reads obs and X data from a SOMA Experiment, based upon the specified queries along the obs and var axes. Provides an iterator over these data when the object is passed to Python’s built-in iter function:

>>> for batch in iter(ExperimentDataPipe(...)):
        X_batch, y_batch = batch
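
For context, a fuller construction might look like the sketch below. It assumes the cellxgene_census, tiledbsoma and torch packages are installed and that Census is reachable over the network. The value filter, the "cell_type" column, the "RNA"/"raw" measurement and layer names, and the batch size are illustrative choices rather than recommendations, and iterate_census_batches is a hypothetical helper name. The third-party imports are deferred into the function body so that the definition itself can be loaded without those packages.

```python
def iterate_census_batches(max_batches=2):
    """Sketch: stream a few (X, obs) batches from Census via ExperimentDataPipe."""
    # Deferred imports: nothing is loaded until the function is actually called.
    import cellxgene_census
    import cellxgene_census.experimental.ml as census_ml
    import tiledbsoma as soma

    shapes = []
    with cellxgene_census.open_soma() as census:
        experiment = census["census_data"]["homo_sapiens"]
        datapipe = census_ml.ExperimentDataPipe(
            experiment,
            # Assumption: Census keeps raw counts in measurement "RNA", layer "raw".
            measurement_name="RNA",
            X_name="raw",
            # Illustrative filter; narrow the obs axis to keep the read small.
            obs_query=soma.AxisQuery(
                value_filter="tissue_general == 'tongue' and is_primary_data == True"
            ),
            obs_column_names=["cell_type"],
            batch_size=128,
            shuffle=True,
        )
        # Each iteration yields one (X, obs) pair of Tensors.
        for i, (X_batch, obs_batch) in enumerate(datapipe):
            shapes.append((tuple(X_batch.shape), tuple(obs_batch.shape)))
            if i + 1 >= max_batches:
                break
    return shapes
```

Calling iterate_census_batches() returns the shapes of the first few batches, which is a quick way to check that the queries select what was intended before starting a training run.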

The batch_size parameter controls the number of rows of obs and X data that are returned in each iteration. If the batch_size is 1, then each Tensor will have rank 1:

>>> (tensor([0., 0., 0., 0., 0., 1., 0., 0., 0.]),  # X data
     tensor([2415,    0,    0], dtype=torch.int64)) # obs data, encoded

For larger batch_size values, the returned Tensors will have rank 2:

>>> DataLoader(..., batch_size=3, ...):
    (tensor([[0., 0., 0., 0., 0., 1., 0., 0., 0.],     # X batch
             [0., 0., 0., 0., 0., 0., 0., 0., 0.],
             [0., 0., 0., 0., 0., 0., 0., 0., 0.]]),
     tensor([[2415,    0,    0],                       # obs batch
             [2416,    0,    4],
             [2417,    0,    3]], dtype=torch.int64))

The return_sparse_X parameter controls whether the X data is returned as a dense or sparse Tensor. If the model supports use of sparse Tensors, this will reduce memory usage.
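
As a rough illustration of the possible saving, the following back-of-envelope sketch compares a dense batch with a sparse one. It assumes 32-bit values, a COO layout with two 64-bit indices per non-zero entry, 5% non-zero entries, and 60,000 var columns; none of these figures are guaranteed by ExperimentDataPipe, and the helper names are hypothetical.

```python
def dense_bytes(n_rows, n_cols, value_bytes=4):
    """Memory for a dense batch: every cell stores one value."""
    return n_rows * n_cols * value_bytes


def sparse_coo_bytes(n_rows, n_cols, density, value_bytes=4, index_bytes=8):
    """Memory for a COO batch: each non-zero stores (row, col) indices plus a value."""
    nnz = round(n_rows * n_cols * density)
    return nnz * (2 * index_bytes + value_bytes)


batch_rows, n_genes = 128, 60_000  # assumed batch size and var count
dense = dense_bytes(batch_rows, n_genes)
sparse = sparse_coo_bytes(batch_rows, n_genes, density=0.05)
print(f"dense:  {dense / 1e6:.1f} MB")
print(f"sparse: {sparse / 1e6:.1f} MB")
```

Under these assumptions the sparse batch needs about a quarter of the dense footprint; the advantage shrinks as density rises, because each stored non-zero carries index overhead.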

The obs_column_names parameter determines the data columns that are returned in the obs Tensor. The first element is always the soma_joinid of the obs DataFrame (or, equivalently, the soma_dim_0 of the X matrix). The remaining elements are the obs columns specified by obs_column_names, with string-typed columns encoded as integer values. If needed, these values can be decoded by obtaining the encoder for a given obs column name and calling its inverse_transform method:

>>> exp_data_pipe.obs_encoders["<obs_attr_name>"].inverse_transform(encoded_values)
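
The encode/decode round trip can be sketched without any dependencies. TinyLabelEncoder below is a hypothetical stand-in that mimics only the transform and inverse_transform behavior of sklearn.preprocessing.LabelEncoder (sorted unique labels mapped to consecutive integers); the cell type labels are made up for the example.

```python
class TinyLabelEncoder:
    """Hypothetical stand-in for sklearn's LabelEncoder (not the real class)."""

    def __init__(self, labels):
        # Like LabelEncoder.fit: classes are the sorted unique labels.
        self.classes_ = sorted(set(labels))
        self._index = {label: code for code, label in enumerate(self.classes_)}

    def transform(self, labels):
        """Map each label to its integer code."""
        return [self._index[label] for label in labels]

    def inverse_transform(self, codes):
        """Map integer codes back to the original labels."""
        return [self.classes_[code] for code in codes]


cell_types = ["T cell", "B cell", "neuron", "B cell"]  # made-up obs values
obs_encoders = {"cell_type": TinyLabelEncoder(cell_types)}

encoded = obs_encoders["cell_type"].transform(cell_types)
decoded = obs_encoders["cell_type"].inverse_transform(encoded)
print(encoded)
print(decoded)
```

With a real pipe, column 0 of the obs Tensor holds the soma_joinid, so the codes for the first requested obs column would be found in column 1.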

Lifecycle

experimental

__init__(experiment: Experiment, measurement_name: str = 'raw', X_name: str = 'X', obs_query: AxisQuery | None = None, var_query: AxisQuery | None = None, obs_column_names: Sequence[str] = (), batch_size: int = 1, shuffle: bool = False, seed: int | None = None, return_sparse_X: bool = False, soma_chunk_size: int | None = None, use_eager_fetch: bool = True) -> None

Construct a new ExperimentDataPipe.

Parameters:
  • experiment – The SOMA Experiment from which to read data.

  • measurement_name – The name of the SOMA Measurement to read. Defaults to “raw”.

  • X_name – The name of the X layer to read. Defaults to “X”.

  • obs_query – The query used to filter along the obs axis. If not specified, all obs and X data will be returned, which can be very large.

  • var_query – The query used to filter along the var axis. If not specified, all var columns (genes/features) will be returned.

  • obs_column_names – The names of the obs columns to return. The soma_joinid index “column” does not need to be specified and will always be returned. If not specified, only the soma_joinid will be returned.

  • batch_size – The number of rows of obs and X data to return in each iteration. Defaults to 1. A value of 1 will result in Tensors of rank 1 being returned (a single row); larger values will result in Tensors of rank 2 (multiple rows).

  • shuffle – Whether to shuffle the obs and X data being returned. Defaults to False (no shuffling). For performance reasons, shuffling is performed in two steps: 1) a global shuffling, where contiguous rows are grouped into chunks and the order of the chunks is randomized, and then 2) a local shuffling, where the rows within each chunk are shuffled. Since ExperimentDataPipe must retrieve data in chunks (to keep memory requirements to a fixed size), global shuffling ensures that a given row in the shuffled result can originate from any position in the non-shuffled result ordering. If shuffling only occurred within each chunk (i.e. “local” shuffling), the first chunk’s rows would always be returned first, the second chunk’s rows would always be returned second, and so on. The chunk size is determined by the soma_chunk_size parameter. Note that rows within a chunk will maintain proximity, even after shuffling, so some experimentation may be required to ensure the shuffling is sufficient for the model training process. To this end, the soma_chunk_size can be treated as a hyperparameter that can be tuned.

  • seed – The random seed used for shuffling. Defaults to None (no seed). This must be specified when using DistributedDataParallel to ensure data partitions are disjoint across worker processes.

  • return_sparse_X – Controls whether the X data is returned as a dense or sparse Tensor. As X data is very sparse, setting this to True will reduce memory usage, if the model supports use of sparse Tensors. Defaults to False, since sparse Tensors are still experimental in PyTorch.

  • soma_chunk_size – The number of obs/X rows to retrieve when reading data from SOMA. This impacts two aspects of ExperimentDataPipe behavior: 1) The maximum memory utilization, with larger values providing better read performance, but also requiring more memory; 2) The granularity of the global shuffling step (see shuffle parameter for details). If not specified, the value is set to utilize ~1 GiB of RAM per SOMA chunk read, assuming X data sparsity of 95%; the resulting number of rows per chunk therefore depends on the number of var columns (genes/features) being read.

  • use_eager_fetch – Fetch the next SOMA chunk of obs and X data immediately after a previously fetched SOMA chunk is made available for processing via the iterator. This allows network (or filesystem) requests to be made in parallel with client-side processing of the SOMA data, potentially improving overall performance at the cost of doubling memory utilization. Defaults to True.
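
The two-step shuffle described for the shuffle parameter can be sketched on plain row ids. This is a minimal illustration of the idea under stated assumptions, not the library's implementation: chunked_shuffle is a hypothetical name, and Python's random module stands in for whatever random source the pipe uses.

```python
import random


def chunked_shuffle(row_ids, chunk_size, seed=None):
    """Yield row ids after a global (chunk-order) and a local (within-chunk) shuffle."""
    rng = random.Random(seed)
    # Group contiguous rows into chunks of at most chunk_size rows.
    chunks = [
        list(row_ids[i:i + chunk_size])
        for i in range(0, len(row_ids), chunk_size)
    ]
    rng.shuffle(chunks)  # step 1: randomize the order of the chunks
    for chunk in chunks:
        rng.shuffle(chunk)  # step 2: randomize the rows inside each chunk
        yield from chunk


rows = list(range(12))
order = list(chunked_shuffle(rows, chunk_size=4, seed=0))
print(order)
```

Every group of four consecutive output rows still comes from one original chunk, which is the "proximity" caveat noted above; a smaller chunk_size mixes rows more finely at the cost of more, smaller reads.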

Returns:

The constructed ExperimentDataPipe.
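
To make the default soma_chunk_size rule more concrete, the sketch below estimates how many rows fit in a ~1 GiB budget at 95% sparsity. The 16 bytes per non-zero entry (value plus index) is an assumption made for this example, and estimate_chunk_rows is a hypothetical helper; the library's actual calculation may use different constants.

```python
def estimate_chunk_rows(n_var, memory_budget_bytes=1024**3,
                        density=0.05, bytes_per_nonzero=16):
    """Estimate rows per chunk so that one chunk fits in the memory budget."""
    # Expected bytes for one row: non-zero count times assumed bytes per non-zero.
    bytes_per_row = n_var * density * bytes_per_nonzero
    return max(1, int(memory_budget_bytes / bytes_per_row))


for n_var in (2_000, 20_000, 60_000):
    print(f"{n_var:>6} var columns -> ~{estimate_chunk_rows(n_var):,} rows per chunk")
```

The estimate falls as more var columns are requested, which is why a narrow var_query allows larger chunks (and coarser global shuffling) for the same memory budget.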

Lifecycle

experimental
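
The eager-fetch behavior described for use_eager_fetch can be pictured as a one-chunk-ahead prefetch. The following sketch uses a single background thread for this; eager_chunks and fake_fetch are hypothetical names, and the real pipe may overlap I/O and processing differently.

```python
from concurrent.futures import ThreadPoolExecutor


def eager_chunks(fetch_chunk, n_chunks):
    """Yield chunks in order while the next chunk is fetched in the background."""
    if n_chunks <= 0:
        return
    with ThreadPoolExecutor(max_workers=1) as pool:
        pending = pool.submit(fetch_chunk, 0)
        for i in range(n_chunks):
            chunk = pending.result()  # wait for the chunk requested earlier
            if i + 1 < n_chunks:
                # Start fetching the next chunk before handing this one out.
                pending = pool.submit(fetch_chunk, i + 1)
            yield chunk


def fake_fetch(i):
    """Hypothetical stand-in for a SOMA read returning three rows per chunk."""
    return list(range(i * 3, i * 3 + 3))


chunks = list(eager_chunks(fake_fetch, 3))
print(chunks)
```

At most two chunks exist at once (the one being consumed and the one in flight), which corresponds to the doubled memory utilization mentioned above.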

Methods

__init__(experiment[, measurement_name, ...])

Construct a new ExperimentDataPipe.

register_datapipe_as_function(function_name, ...)

register_function(function_name, function)

reset()

Reset the IterDataPipe to the initial state.

set_getstate_hook(hook_fn)

set_reduce_ex_hook(hook_fn)

stats()

Get data loading stats for this ExperimentDataPipe.

Attributes

functions

getstate_hook

obs_encoders

Returns a dictionary of sklearn.preprocessing.LabelEncoder objects, keyed on obs column names, which were used to encode the obs column values.

reduce_ex_hook

repr_hook

shape

Get the shape of the data that will be returned by this ExperimentDataPipe.

str_hook