# vim: fdm=indent
# author: Fabio Zanini
# date: 09/08/17
# content: Table of gene counts
# Modules
import numpy as np
import pandas as pd
# Classes / functions
[docs]class CountsTable(pd.DataFrame):
'''Table of gene expression counts
- Rows are features, e.g. genes.
- Columns are samples.
'''
_metadata = [
'name',
'_spikeins', '_otherfeatures',
'_normalized', 'pseudocount',
]
pseudocount = 0.1
@property
def _constructor(self):
return CountsTable
@classmethod
[docs] def from_tablename(cls, tablename):
'''Instantiate a CountsTable from its name in the config file.
Args:
tablename (string): name of the counts table in the config file.
Returns:
CountsTable: the counts table.
'''
from .config import config
from .io import parse_counts_table
self = cls(parse_counts_table(tablename))
self.name = tablename
self._spikeins = config['io']['count_tables'][tablename]['spikeins']
self._otherfeatures = config['io']['count_tables'][tablename]['other']
self._normalized = config['io']['count_tables'][tablename]['normalized']
return self
[docs] def exclude_features(self, spikeins=True, other=True, inplace=False,
errors='raise'):
'''Get a slice that excludes secondary features.
Args:
spikeins (bool): Whether to exclude spike-ins
other (bool): Whether to exclude other features, e.g. unmapped reads
inplace (bool): Whether to drop those features in place.
errors (string): Whether to raise an exception if the features \
to be excluded are already not present. Must be 'ignore' \
or 'raise'.
Returns:
CountsTable: a slice of self without those features.
'''
drop = []
if spikeins:
drop.extend(self._spikeins)
if other:
drop.extend(self._otherfeatures)
return self.drop(drop, axis=0, inplace=inplace, errors=errors)
[docs] def get_spikeins(self):
'''Get spike-in features
Returns:
CountsTable: a slice of self with only spike-ins.
'''
return self.loc[self._spikeins]
[docs] def get_other_features(self):
'''Get other features
Returns:
CountsTable: a slice of self with only other features (e.g. unmapped).
'''
return self.loc[self._otherfeatures]
[docs] def log(self, base=10, inplace=False):
'''Take the pseudocounted log of the counts.
Args:
base (float): Base of the log transform
inplace (bool): Whether to do the operation in place or return \
a new CountsTable
Returns:
If inplace is False, a transformed CountsTable.
'''
clog = np.log10(self.pseudocount + self) / np.log10(base)
if inplace:
self[:] = clog.values
else:
return clog
[docs] def unlog(self, base=10, inplace=False):
'''Reverse the pseudocounted log of the counts.
Args:
base (float): Base of the log transform
inplace (bool): Whether to do the operation in place or return \
a new CountsTable
Returns:
If inplace is False, a transformed CountsTable.
'''
cunlog = base**self - self.pseudocount
if inplace:
self[:] = cunlog.values
else:
return cunlog
[docs] def center(self, axis='samples', inplace=False):
'''Center the counts table (subtract mean).
Args:
axis (string): The axis to average over, has to be 'samples' \
or 'features'.
inplace (bool): Whether to do the operation in place or return \
a new CountsTable
Returns:
If inplace is False, a transformed CountsTable.
'''
if inplace:
out = self
else:
out = self.copy()
if axis == 'samples':
out.loc[:] = (self.values.T - self.values.mean(axis=1)).T
elif axis == 'features':
out.loc[:] = self.values - self.values.mean(axis=0)
else:
raise ValueError('Axis not found')
if not inplace:
return out
[docs] def z_score(self, axis='samples', inplace=False, add_to_den=0):
'''Calculate the z scores of the counts table.
In other words, subtract the mean and divide by the standard \
deviation.
Args:
axis (string): The axis to average over, has to be 'samples' \
or 'features'.
inplace (bool): Whether to do the operation in place or return \
a new CountsTable
add_to_den (float): Whether to add a (small) value to the \
denominator to avoid NaNs. 1e-5 or so should be fine.
Returns:
If inplace is False, a transformed CountsTable.
'''
if inplace:
out = self
else:
out = self.copy()
if axis == 'samples':
out.loc[:] = ((self.values.T - self.values.mean(axis=1)) / (add_to_den + self.values.std(axis=1))).T
elif axis == 'features':
out.loc[:] = (self.values - self.values.mean(axis=0)) / (add_to_den + self.values.std(axis=0))
else:
raise ValueError('Axis not found')
if not inplace:
return out
[docs] def standard_scale(self, axis='samples', inplace=False, add_to_den=0):
'''Subtract minimum and divide by (maximum - minimum).
Args:
axis (string): The axis to average over, has to be 'samples' \
or 'features'.
inplace (bool): Whether to do the operation in place or return \
a new CountsTable
add_to_den (float): Whether to add a (small) value to the \
denominator to avoid NaNs. 1e-5 or so should be fine.
Returns:
If inplace is False, a transformed CountsTable.
'''
if inplace:
out = self
else:
out = self.copy()
if axis == 'samples':
mi = self.values.min(axis=1)
ma = self.values.max(axis=1)
out.loc[:] = ((self.values.T - mi) / (add_to_den + ma - mi)).T
elif axis == 'features':
mi = self.values.min(axis=0)
ma = self.values.max(axis=0)
out.loc[:] = (self.values - mi) / (add_to_den + ma - mi)
else:
raise ValueError('Axis not found')
if not inplace:
return out
[docs] def normalize(self, method='counts_per_million', include_spikeins=False, inplace=False, **kwargs):
'''Normalize counts and return new CountsTable.
Args:
method (string or function): The method to use for normalization. \
One of 'counts_per_million', \
'counts_per_thousand_spikeins', \
'counts_per_thousand_features'. If this argument is a \
function, it must take the CountsTable as input and \
return the normalized one as output.
include_spikeins (bool): Whether to include spike-ins in the \
normalization and result.
inplace (bool): Whether to modify the CountsTable in place or \
return a new one.
Returns:
If `inplace` is False, a new, normalized CountsTable.
'''
if self._normalized:
raise ValueError('CountsTable is already normalized')
if method == 'counts_per_million':
counts = self.exclude_features(spikeins=(not include_spikeins), other=True)
counts_norm = 1e6 * counts / counts.sum(axis=0)
elif method == 'counts_per_thousand_spikeins':
counts = self.exclude_features(spikeins=(not include_spikeins), other=True)
norm = self.get_spikeins().sum(axis=0)
counts_norm = 1e3 * counts / norm
elif method == 'counts_per_thousand_features':
if 'features' not in kwargs:
raise ValueError('Set features=<list of normalization features>')
counts = self.exclude_features(spikeins=(not include_spikeins), other=True)
norm = self.loc[kwargs['features']].sum(axis=0)
counts_norm = 1e3 * counts / norm
elif callable(method):
counts_norm = method(self)
method = 'custom'
else:
raise ValueError('Method not understood')
if inplace:
# The new CountsTable lacks other features and maybe spikeins
drop = np.setdiff1d(self.index, counts_norm.index)
self.drop(drop, inplace=True)
self.loc[:, :] = counts_norm
self._normalized = method
else:
counts_norm._normalized = method
return counts_norm
[docs] def get_statistics(self, metrics=('mean', 'cv')):
'''Get statistics of the counts.
Args:
metrics (sequence of strings): any of 'mean', 'var', 'std', 'cv', \
'fano', 'min', 'max'.
Returns:
pandas.DataFrame with features as rows and metrics as columns.
'''
st = {}
if 'mean' in metrics or 'cv' in metrics or 'fano' in metrics:
st['mean'] = self.mean(axis=1)
if ('std' in metrics or 'cv' in metrics or 'fano' in metrics or
'var' in metrics):
st['std'] = self.std(axis=1)
if 'var' in metrics:
st['var'] = st['std'] ** 2
if 'cv' in metrics:
st['cv'] = st['std'] / np.maximum(st['mean'], 1e-10)
if 'fano' in metrics:
st['fano'] = st['std'] ** 2 / np.maximum(st['mean'], 1e-10)
if 'min' in metrics:
st['min'] = self.min(axis=1)
if 'max' in metrics:
st['max'] = self.max(axis=1)
df = pd.concat([st[m] for m in metrics], axis=1)
df.columns = pd.Index(list(metrics), name='metrics')
return df
[docs] def bin(self, bins=5, result='index', inplace=False):
'''Bin feature counts.
Args:
bins (int, array, or list of arrays): If an int, number \
equal-width bins between pseudocounts and the max of \
the counts matrix. If an array of indices of the same \
length as the number of feature, use a different number \
of equal-width bins for each feature. If an array of any \
other length, use these bin edges (including rightmost \
edge) for all features. If a list of arrays, it has to be \
as long as the number of features, and every array in the \
list determines the bin edges (including rightmost edge) \
for that feature, in order.
result (string): Has to be one of 'index' (default), 'left', \
'center', 'right'. 'index' assign to the feature the \
index (starting at 0) of that bin, 'left' assign the left \
bin edge, 'center' the bin center, 'right' the right \
edge. If result is 'index', out-of-bounds values will be \
assigned the value -1, which means Not A Number in ths \
context.
inplace (bool): Whether to perform the operation in place.
Returns:
If inplace is False, a CountsTable with the binned counts.
'''
if result == 'index':
out_dtype = int
else:
out_dtype = float
out = np.zeros_like(self.values, dtype=out_dtype)
nf = self.shape[0]
if np.isscalar(bins):
bins = np.linspace(self.pseudocount, self.values.max(), bins + 1)
bins = np.repeat(bins, nf).reshape((len(bins), nf)).T
elif len(bins) == nf:
bins_new = []
for (key, c), nbin in zip(self.iterrows(), bins):
bins_new.append(np.linspace(self.pseudocount, c.max(), nbin + 1))
bins = bins_new
for i, (bini, (fea, count)) in enumerate(zip(bins, self.iterrows())):
if result == 'index':
labels = False
elif result == 'left':
labels = bini[:-1]
elif result == 'right':
labels = bini[1:]
elif result == 'center':
labels = 0.5 * (bini[1:] + bini[:-1])
else:
raise ValueError('result parameter not understood')
cbin = pd.cut(
np.maximum(self.pseudocount, count), bini,
labels=labels,
right=True,
include_lowest=True)
if result == 'index':
cbin[np.isnan(cbin)] = -1
out[i] = cbin.values
if inplace:
self.loc[:] = out
else:
counts = self.copy()
counts.loc[:] = out
return out