Source code for singlet.dataset.fit

# vim: fdm=indent
# author:     Fabio Zanini
# date:       16/08/17
# content:    Dataset functions to plot gene expression and phenotypes
# Modules
import numpy as np
import pandas as pd
import xarray as xr


# Classes / functions
[docs]class Fit(): '''Fit gene expression and phenotype in single cells''' def __init__(self, dataset): '''Fit gene expression and phenotype in single cells Args: dataset (Dataset): the dataset to analyze. ''' self.dataset = dataset
[docs] def fit_single( self, xs, ys, model, method='least-squares', handle_nans='ignore', **kwargs): '''Fit feature expression or phenotypes against other. Args: xs (list or string): Features and/or phenotypes to use as abscissa (independent variable). The string 'total' means all features including spikeins and other, 'mapped' means all features excluding spikeins and other, 'spikeins' means only spikeins, and 'other' means only 'other' features. ys (list or string): Features and/or phenotypes to use as ordinate (dependent variable). The string 'total' means all features including spikeins and other, 'mapped' means all features excluding spikeins and other, 'spikeins' means only spikeins, and 'other' means only 'other' features. model (string or function): The model to use for fitting. If a string, it must be one of 'linear', 'threshold-linear', 'logistic'. If a function, it must accept an array as first argument (the x) and the parameters as additional arguments (like scipy.optimize.curve_fit). method (string or function): The minimization algorithm. For now, only 'least-squares' is accepted. In this case, the goodness of fit is the sum of the squared residues. handle_nans (string): How to deal with Not a Numbers, typically in the phenotypes. Must be either 'ignore' (default), in which case only the non-NaN samples will be used for fitting, or 'raise', in which case NaNs will stop the fit. **kwargs: Passed to the fit function. For nonlinear least-squares, this is scipy.optimize.curve_fit. Linear least-squares is analytical so it ignores **kwargs. Returns: A 3-dimensional xarray with the xs, ys as first two axes. The third axis, called 'results', contains the parameters and an assessment of the fit quality. If method is least-squres, it is the sum of squared residuals. NOTE: This function fits every combination of x and y independently, interactions are not considered. ''' datad = {'x': {'names': xs, 'data': []}, 'y': {'names': ys, 'data': []}} counts = self.dataset.counts sheet = self.dataset.samplesheet.T for key, dic in datad.items(): args = dic['names'] if args == 'total': counts_k = counts pheno_k = [] elif args == 'mapped': counts_k = counts.exclude_features( spikeins=True, other=True, errors='ignore') pheno_k = [] elif args == 'spikeins': counts_k = counts.get_spikeins() pheno_k = [] elif args == 'other': counts_k = counts.get_other_features() pheno_k = [] else: counts_k = counts.loc[counts.index.isin(args)] pheno_k = sheet.loc[sheet.index.isin(args)] if len(counts_k): dic['data'].append(counts_k) if len(pheno_k): dic['data'].append(pheno_k) if len(dic['data']) == 0: raise ValueError('Data for {:} are empty!'.format(key)) elif len(dic['data']) == 1: dic['data'] = dic['data'][0] else: dic['data'] = pd.concat(dic['data'], axis=0) # Prepare output structure if model == 'linear': n_pars = 2 coords_par = ['slope', 'intercept'] elif model == 'threshold-linear': n_pars = 3 coords_par = ['baseline', 'intercept', 'slope'] elif model == 'logistic': n_pars = 4 coords_par = ['initial', 'final', 'mid-point', 'slope'] elif isinstance(model, str): raise ValueError('model format not accepted') else: from inspect import signature sig = signature(model) n_pars = len(sig.parameters) - 1 coords_par = [p.name for p in sig.parameters[1:]] if method == 'least-squares': gof_name = 'sum_squared_residuals' else: raise ValueError('Only least-squares is implemented') n_x = datad['x']['data'].shape[0] n_y = datad['y']['data'].shape[0] coords_x = datad['x']['data'].index.tolist() coords_y = datad['y']['data'].index.tolist() res = xr.DataArray( np.zeros((n_x, n_y, n_pars + 1)), dims=['x', 'y', 'parameters'], coords={'x': coords_x, 'y': coords_y, 'parameters': coords_par + [gof_name]}) # Set up fitting environment if method == 'least-squares': from scipy.optimize import curve_fit from scipy.stats import linregress if model == 'linear': def fun(x, s, i): return i + s * x elif model == 'threshold-linear': def fun(x, b, i, s): y = i + s * x t = (b - i) / s y[x <= t] = b return y elif model == 'logistic': def fun(x, i, f, mp, s): return i + (f - i) / (1 + np.exp(-s * (x - mp))) else: fun = model # Fit for key_x, data_x in datad['x']['data'].iterrows(): for key_y, data_y in datad['y']['data'].iterrows(): x = data_x.values.astype(float) y = data_y.values.astype(float) # Mask NaNs isnan = np.zeros_like(x, bool) isnan |= np.isnan(x) isnan |= np.isnan(y) if isnan.sum() != 0: if handle_nans == 'raise': raise ValueError('NaNs found in data.') else: x = x[~isnan] y = y[~isnan] if model == 'linear': popt = linregress(x, y)[:2] else: popt, pcov = curve_fit( fun, xdata=x, ydata=y, **kwargs) gof = ((y - fun(x, *popt))**2).sum() res_i = np.append(popt, gof) res.loc[{'x': key_x, 'y': key_y}] = res_i else: raise ValueError('Only least-squares is implemented') return res