Source code for singlet.dataset.cluster

# vim: fdm=indent
# author:     Fabio Zanini
# date:       16/08/17
# content:    Dataset functions to cluster samples, features, and phenotypes.
# Modules
import numpy as np
import pandas as pd

from .plugins import Plugin


# Classes / functions
[docs]class Cluster(Plugin): '''Cluster samples, features, and phenotypes'''
[docs] def kmeans( self, n_clusters, axis, phenotypes=(), random_state=0): '''K-Means clustering. Args: n_clusters (int): The number of clusters you want. axis (string): It must be 'samples' or 'features'. The Dataset.counts matrix is used and either samples or features are clustered. phenotypes (iterable of strings): Phenotypes to add to the features for joint clustering. log_features (bool): Whether to add pseudocounts and take a log of the feature counts before calculating distances. random_state (int): Set to the same int for deterministic results. Returns: pd.Series with the labels of the clusters. ''' from sklearn.cluster import KMeans data = self.dataset.counts if phenotypes is not None: data = data.copy() for pheno in phenotypes: data.loc[pheno] = self.dataset.samplesheet.loc[:, pheno] if axis == 'samples': data = data.T elif axis == 'features': pass else: raise ValueError('axis must be "samples" or "features"') model = (KMeans(n_clusters=n_clusters, random_state=random_state) .fit(data.values)) labels = pd.Series(model.labels_, index=data.index, dtype='category') return labels
[docs] def dbscan( self, axis, phenotypes=(), **kwargs): '''Density-Based Spatial Clustering of Applications with Noise. Args: axis (string): It must be 'samples' or 'features'. The Dataset.counts matrix is used and either samples or features are clustered. phenotypes (iterable of strings): Phenotypes to add to the features for joint clustering. log_features (bool): Whether to add pseudocounts and take a log of the feature counts before calculating distances. **kwargs: arguments passed to sklearn.cluster.DBSCAN. Returns: pd.Series with the labels of the clusters. ''' from sklearn.cluster import DBSCAN data = self.dataset.counts if phenotypes is not None: data = data.copy() for pheno in phenotypes: data.loc[pheno] = self.dataset.samplesheet.loc[:, pheno] if axis == 'samples': data = data.T elif axis == 'features': pass else: raise ValueError('axis must be "samples" or "features"') model = DBSCAN(**kwargs).fit(data.values) labels = pd.Series(model.labels_, index=data.index, dtype='category') return labels
# NOTE: caching this one is tricky because it has non-kwargs AND it would # need a double cache, one for cells and one for features/phenotypes
[docs] def hierarchical( self, axis, phenotypes=(), metric='correlation', method='average', log_features=False, optimal_ordering=False): '''Hierarchical clustering. Args: axis (string): It must be 'samples' or 'features'. The Dataset.counts matrix is used and either samples or features are clustered. phenotypes (iterable of strings): Phenotypes to add to the features for joint clustering. metric (string or matrix): Metric to calculate the distance matrix. If it is a matrix already, use it as distance (squared). Else it should be a string accepted by scipy.spatial.distance.pdist. method (string): Clustering method. Must be a string accepted by scipy.cluster.hierarchy.linkage. log_features (bool): Whether to add pseudocounts and take a log of the feature counts before calculating distances. optimal_ordering (bool): Whether to resort the linkage so that nearest neighbours have shortest distance. This may take longer than the clustering itself. Returns: dict with the linkage, distance matrix, and ordering. ''' from scipy.spatial.distance import pdist, squareform from scipy.cluster.hierarchy import linkage, leaves_list, optimal_leaf_ordering data = self.dataset.counts if log_features: data = np.log10(self.dataset.counts.pseudocount + data) if phenotypes is not None: data = data.copy() for pheno in phenotypes: data.loc[pheno] = self.dataset.samplesheet.loc[:, pheno] if axis == 'samples': data = data.T elif axis == 'features': pass else: raise ValueError('axis must be "samples" or "features"') if isinstance(metric, str): Y = pdist(data.values, metric=metric) else: Y = np.asarray(metric) assert len(Y.shape) == 2 assert Y.shape[0] == Y.shape[1] assert Y.shape[0] == data.shape[0] Y = squareform(Y) # Some metrics (e.g. correlation) give nan whenever the matrix has no # variation, default this to zero distance (e.g. two features that are # both total dropouts. Y = np.nan_to_num(Y) Z = linkage(Y, method=method) if optimal_ordering: Z = optimal_leaf_ordering(Z, Y) ids = data.index[leaves_list(Z)] return { 'distance': Y, 'linkage': Z, 'leaves': ids, }
[docs] def affinity_propagation( self, axis, phenotypes=(), metric='correlation', log_features=False, ): '''Affinity/label/message propagation. Args: axis (string): It must be 'samples' or 'features'. The Dataset.counts matrix is used and either samples or features are clustered. phenotypes (iterable of strings): Phenotypes to add to the features for joint clustering. metric (string or matrix): Metric to calculate the distance matrix. If it is a matrix already, use it as distance (squared). Else it should be a string accepted by scipy.spatial.distance.pdist. log_features (bool): Whether to add pseudocounts and take a log of the feature counts before calculating distances. Returns: dict with the linkage, distance matrix, and ordering. ''' from scipy.spatial.distance import pdist, squareform from sklearn.cluster import AffinityPropagation data = self.dataset.counts if log_features: data = np.log10(self.dataset.counts.pseudocount + data) if phenotypes is not None: data = data.copy() for pheno in phenotypes: data.loc[pheno] = self.dataset.samplesheet.loc[:, pheno] if axis == 'samples': data = data.T elif axis == 'features': pass else: raise ValueError('axis must be "samples" or "features"') if isinstance(metric, str): Y = squareform(pdist(data.values, metric=metric)) else: Y = np.asarray(metric) assert len(Y.shape) == 2 assert Y.shape[0] == Y.shape[1] assert Y.shape[0] == data.shape[0] # Affinity is the opposite of distance Y = -Y model = AffinityPropagation( affinity='precomputed', ) model.fit(Y) labels = pd.Series(model.labels_, index=data.index, dtype='category') return labels