Source code for singlet.dataset

# vim: fdm=indent
# author:     Fabio Zanini
# date:       14/08/17
# content:    Dataset that combines feature counts with metadata.
# Modules
import numpy as np


# Classes / functions
[docs]class Dataset(): '''Collection of cells, with feature counts and metadata''' def __init__(self, samplesheet, counts_table): '''Collection of cells, with feature counts and metadata Args: samplesheet (string): Name of the samplesheet (to load from a \ config file) or instance of SampleSheet counts_table (string): Name of the counts table (to load from a \ config file) or instance of CountsTable NOTE: All samples in the counts_table must also be in the \ samplesheet, but the latter can have additional samples. If \ that is the case, the samplesheet is sliced down to the \ samples present in the counts_table. ''' from ..samplesheet import SampleSheet from ..counts_table import CountsTable from .correlations import Correlation from .plot import Plot from .dimensionality import DimensionalityReduction from .cluster import Cluster if not isinstance(samplesheet, SampleSheet): samplesheet = SampleSheet.from_sheetname(samplesheet) self._samplesheet = samplesheet if not isinstance(counts_table, CountsTable): counts_table = CountsTable.from_tablename(counts_table) self._counts = counts_table assert(self._counts.columns.isin(self._samplesheet.index).all()) self._samplesheet = self._samplesheet.loc[self._counts.columns] # Plugins self.correlation = Correlation(self) self.plot = Plot(self) self.dimensionality = DimensionalityReduction(self) self.cluster = Cluster(self) def __str__(self): return '{:} with {:} samples and {:} features'.format( self.__class__.__name__, self.n_samples, self.n_features) def __repr__(self): return '{:}("{:}", "{:}")'.format( self.__class__.__name__, self._samplesheet.sheetname, self._counts.name) def __eq__(self, other): if type(other) is not type(self): return False # FIXME: fillna(0) is sloppy but not so bad ss = (self._samplesheet.fillna(0) == other._samplesheet.fillna(0)).values.all() ct = (self._counts == other._counts).values.all() return ss and ct def __ne__(self, other): return not self == other def __add__(self, other): '''Merge two Datasets. For samples with the same names, counts will be added and metadata of \ one of the Datasets used. For new samples, the new counts and \ metadata will be used. NOTE: metadata and gene names must be aligned for this operation to \ succeed. If one of the two Datasets has more metadata or \ features than the other, they cannot be added. ''' selfcopy = self.copy() selfcopy += other return selfcopy def __iadd__(self, other): '''Merge two Datasets. For samples with the same names, counts will be added and metadata of \ one of the Datasets used. For new samples, the new counts and \ metadata will be used. NOTE: metadata and gene names must be aligned for this operation to \ succeed. If one of the two Datasets has more metadata or \ features than the other, they cannot be added. ''' if set(self.metadatanames) != set(other.metadatanames): raise IndexError('The Datasets have different metadata') if set(self.featurenames) != set(other.featurenames): raise IndexError('The Datasets have different features') snames = self.samplenames for samplename, meta in other.samplesheet.iterrows(): if samplename not in snames: self.samplesheet.loc[samplename] = meta self.counts.loc[:, samplename] = other.counts.loc[:, samplename] else: self.counts.loc[:, samplename] += other.counts.loc[:, samplename]
[docs] def split(self, phenotypes, copy=True): '''Split Dataset based on one or more categorical phenotypes Args: phenotypes (string or list of strings): one or more phenotypes to \ use for the split. Unique values of combinations of these \ determine the split Datasets. Returns: dict of Datasets: the keys are either unique values of the \ phenotype chosen or, if more than one, tuples of unique \ combinations. ''' from itertools import product if isinstance(phenotypes, str): phenotypes = [phenotypes] phenos_uniques = [tuple(set(self.samplesheet.loc[:, p])) for p in phenotypes] dss = {} for comb in product(*phenos_uniques): ind = np.ones(self.n_samples, bool) for p, val in zip(phenotypes, comb): ind &= self.samplesheet.loc[:, p] == val if ind.sum(): samplesheet = self.samplesheet.loc[ind] counts = self.counts.loc[:, ind] if copy: samplesheet = samplesheet.copy() counts = counts.copy() if len(phenotypes) == 1: label = comb[0] else: label = comb dss[label] = self.__class__( samplesheet=samplesheet, counts_table=counts, ) return dss
@property def n_samples(self): '''Number of samples''' return self._samplesheet.shape[0] @property def n_features(self): '''Number of features''' return self._counts.shape[0] @property def samplenames(self): '''pandas.Index of sample names''' return self._samplesheet.index.copy() @property def featurenames(self): '''pandas.Index of feature names''' return self._counts.index.copy() @property def metadatanames(self): '''pandas.Index of metadata column names''' return self._samplesheet.columns.copy() @property def samplesheet(self): '''Matrix of metadata. Rows are samples, columns are metadata (e.g. phenotypes). ''' return self._samplesheet @samplesheet.setter def samplesheet(self, value): self._counts = self._counts.loc[:, value.index] self._samplesheet = value @property def counts(self): '''Matrix of gene expression counts. Rows are features, columns are samples. ''' return self._counts @counts.setter def counts(self, value): self._samplesheet = self._samplesheet.loc[value.columns] self._counts = value
[docs] def copy(self): '''Copy of the Dataset including a new SampleSheet and CountsTable''' return self.__class__( self._samplesheet.copy(), self._counts.copy())
[docs] def query_samples_by_counts(self, expression, inplace=False): '''Select samples based on gene expression. Args: expression (string): An expression compatible with pandas.DataFrame.query. inplace (bool): Whether to change the Dataset in place or return a new one. Returns: If `inplace` is True, None. Else, a Dataset. ''' counts = self._counts.copy() drop = [] if ('total' in expression) and ('total' not in counts.index): counts.loc['total'] = counts.sum(axis=0) drop.append('total') if ('mapped' in expression) and ('mapped' not in counts.index): counts.loc['mapped'] = counts.exclude_features(spikeins=True, other=True).sum(axis=0) drop.append('mapped') counts_table = counts.T.query(expression, inplace=False).T if drop: counts_table.drop(drop, axis=0, inplace=True) if inplace: self.counts = counts_table else: samplesheet = self._samplesheet.loc[counts_table.columns].copy() return self.__class__( samplesheet=samplesheet, counts_table=counts_table)
[docs] def query_features(self, expression, inplace=False): '''Select features based on their expression. Args: expression (string): An expression compatible with pandas.DataFrame.query. inplace (bool): Whether to change the Dataset in place or return a new one. Returns: If `inplace` is True, None. Else, a Dataset. ''' if inplace: self._counts.query(expression, inplace=True) else: counts_table = self._counts.query(expression, inplace=False) samplesheet = self._samplesheet.copy() return self.__class__( samplesheet=samplesheet, counts_table=counts_table)