"""
PyROMA: Python implementation of Representation Of Module Activity for single-cell RNA-seq data.
This module implements the ROMA algorithm for quantifying gene set activity
in single-cell transcriptomics data using principal component analysis.
"""
import numpy as np
import pandas as pd
import scanpy as sc
import time
import os
import warnings
import pickle
from scipy import stats, sparse
from scipy.stats import wilcoxon, fisher_exact
from statsmodels.stats.multitest import multipletests
from scipy.stats import false_discovery_control as benj_hoch
from sklearn.decomposition import TruncatedSVD, PCA, IncrementalPCA
from sklearn.model_selection import LeaveOneOut
from joblib import Parallel, delayed
from functools import lru_cache
from typing import Dict, List, Optional, Tuple, Union, Any
from types import SimpleNamespace
import multiprocessing
from .sparse_methods import *
# Suppress warnings
warnings.filterwarnings("ignore")
# Color codes for terminal output
[docs]
class color:
BOLD = '\033[1m'
GREEN = '\033[92m'
PURPLE = '\033[95m'
DARKCYAN = '\033[36m'
END = '\033[0m'
[docs]
class GeneSetResult:
"""
Container for gene set analysis results.
Attributes
----------
subset : sc.AnnData
Subset of data for this gene set
subsetlist : np.ndarray
Array of gene names in the gene set
outliers : List[int]
Indices of outlier genes
nullgenesetsize : int
Size of null gene set for comparison
svd : Union[TruncatedSVD, PCA, IncrementalPCA]
Fitted decomposition object
X : np.ndarray
Processed data matrix
raw_X_subset : np.ndarray
Raw data subset for PC sign correction
projections_1 : np.ndarray
Projections onto first principal component
projections_2 : np.ndarray
Projections onto second principal component
nulll1 : np.ndarray
Null distribution of L1 values
null_median_exp : np.ndarray
Null distribution of median expression
null_projections : np.ndarray
Null projections for both components
q_value : float
Adjusted p-value for L1 statistic
med_exp_q_value : float
Adjusted p-value for median expression
non_adj_p : float
Raw p-value for L1 statistic
non_adj_med_exp_p : float
Raw p-value for median expression
test_l1 : float
L1 test statistic
test_median_exp : float
Median expression test statistic
"""
def __init__(self,
subset: sc.AnnData,
subsetlist: np.ndarray,
outliers: List[int],
nullgenesetsize: int,
svd: Union[TruncatedSVD, PCA, IncrementalPCA]) -> None:
"""Initialize GeneSetResult with basic attributes."""
self.subset = subset
self.subsetlist = subsetlist
self.outliers = outliers
self.nullgenesetsize = nullgenesetsize
self.svd = svd
self.X: Optional[np.ndarray] = None
self.raw_X_subset: Optional[np.ndarray] = None
self.projections_1: Optional[np.ndarray] = None
self.projections_2: Optional[np.ndarray] = None
self.nulll1: Optional[np.ndarray] = None
self.null_median_exp: Optional[np.ndarray] = None
self.null_projections: Optional[np.ndarray] = None
self.q_value: Optional[float] = None
self.med_exp_q_value: Optional[float] = None
self.non_adj_p: Optional[float] = None
self.non_adj_med_exp_p: Optional[float] = None
self.test_l1: Optional[float] = None
self.test_median_exp: Optional[float] = None
self.custom_name: str = ""
def __repr__(self) -> str:
return self.custom_name
def __str__(self) -> str:
return self.custom_name
[docs]
class ROMA:
"""
ROMA (Representation Of Module Activity) implementation for single-cell RNA-seq data.
This class implements the ROMA algorithm for quantifying gene set activity
in single-cell transcriptomics data using principal component analysis.
Attributes
----------
adata : Optional[sc.AnnData]
Annotated data matrix
gmt : Optional[str]
Path to GMT file containing gene sets
genesets : Dict[str, np.ndarray]
Dictionary mapping gene set names to gene arrays
results : Dict[str, GeneSetResult]
Results for each analyzed gene set
null_distributions : Dict[int, Tuple[np.ndarray, np.ndarray]]
Cached null distributions by gene set size
approx_int : int
Granularity of null gene set size approximation
min_n_genes : int
Minimum number of genes required in a gene set
q_L1_threshold : float
Q-value threshold for L1 significance
q_Med_Exp_threshold : float
Q-value threshold for median expression significance
"""
def __init__(self) -> None:
"""Initialize ROMA analyzer with default parameters."""
self.adata: Optional[sc.AnnData] = None
self.gmt: Optional[str] = None
self.genesets: Dict[str, np.ndarray] = {}
self.idx: List[str] = []
self.approx_int: int = 20
self.min_n_genes: int = 10
self.nullgenesetsize: Optional[int] = None
self.subset: Optional[sc.AnnData] = None
self.subsetlist: Optional[np.ndarray] = None
self.outliers: List[int] = []
self.loocv_scores: Dict[str, float] = {}
self.global_gene_counts: Dict[str, int] = {}
self.global_outlier_counts: Dict[str, int] = {}
self.svd: Optional[Union[TruncatedSVD, PCA, IncrementalPCA]] = None
self.X: Optional[np.ndarray] = None
self.raw_X_subset: Optional[np.ndarray] = None
self.nulll1: List[float] = []
self.test_l1: Optional[float] = None
self.p_value: Optional[float] = None
self.test_median_exp: Optional[float] = None
self.med_exp_p_value: Optional[float] = None
self.projections_1: Optional[np.ndarray] = None
self.projections_2: Optional[np.ndarray] = None
self.results: Dict[str, GeneSetResult] = {}
self.null_distributions: Dict[int, Tuple[np.ndarray, np.ndarray]] = {}
self.unprocessed_genesets: List[str] = []
# Multiprocessing manager
self.parallel_results = None
self._manager = None
# Display name
self.custom_name: str = f"{color.BOLD}{color.GREEN}scROMA{color.END}"
# Thresholds
self.q_L1_threshold: float = 0.05
self.q_Med_Exp_threshold: float = 0.05
# PC sign correction parameters
self.gene_weights: Dict[str, np.ndarray] = {}
self.pc_sign_mode: str = 'PreferActivation'
self.pc_sign_thr: float = 0.90
self.def_wei: float = 1.0
self.cor_method: str = 'pearson'
self.correct_pc_sign: int = 1
self.gene_signs: Dict[str, Dict[str, int]] = {}
self.extreme_percent: float = 0.1
# Plotting module
from .plotting import plotting as pl
self.pl = pl()
def __repr__(self) -> str:
return self.custom_name
def __str__(self) -> str:
return self.custom_name
def _get_manager(self):
"""Initialization of multiprocessing manager."""
if self._manager is None:
self._manager = multiprocessing.Manager()
self.parallel_results = self._manager.dict()
return self.parallel_results
[docs]
def read_gmt_to_dict(self, gmt_path: str) -> Dict[str, np.ndarray]:
"""
Read GMT file and convert to dictionary of gene sets.
Parameters
----------
gmt_path : str
Path to GMT format file
Returns
-------
genesets : Dict[str, np.ndarray]
Dictionary mapping gene set names to gene arrays
Raises
------
FileNotFoundError
If GMT file doesn't exist
ValueError
If GMT file format is invalid
"""
if not os.path.exists(gmt_path):
raise FileNotFoundError(f"GMT file not found: {gmt_path}")
genesets = {}
try:
with open(gmt_path, 'r') as file:
for line_num, line in enumerate(file, 1):
parts = line.rstrip('\n').split('\t')
if len(parts) < 3:
raise ValueError(f"Invalid GMT format at line {line_num}: "
f"expected at least 3 tab-separated fields")
name = parts[0]
# Skip description field (parts[1])
genes = [g for g in parts[2:] if g]
if name in genesets:
warnings.warn(f"Duplicate gene set name '{name}' found, "
f"overwriting previous definition")
genesets[name] = np.array(genes)
except Exception as e:
raise ValueError(f"Error reading GMT file: {e}")
self.genesets = genesets
self.gmt = gmt_path
return genesets
[docs]
def indexing(self, adata: sc.AnnData) -> None:
"""
Create index of unique gene names from AnnData object.
Parameters
----------
adata : sc.AnnData
Annotated data matrix
"""
self.idx = list(set(adata.var.index.tolist()))
[docs]
def subsetting(self, adata: sc.AnnData, geneset: np.ndarray, verbose: int = 0) -> Tuple[sc.AnnData, np.ndarray]:
"""
Create subset of data for specific gene set.
Parameters
----------
adata : sc.AnnData
Full annotated data matrix
geneset : np.ndarray
Array of gene names to subset
verbose : int, default=0
Verbosity level
Returns
-------
subset : sc.AnnData
Subset of data containing only genes in gene set
subsetlist : np.ndarray
Array of gene names that were found in the data
"""
if verbose:
print(' '.join(geneset))
if not self.idx:
raise ValueError('No adata index detected. Run indexing() first.')
# Efficient intersection using numpy
subsetlist = geneset[np.isin(geneset, self.idx)]
subset = adata[:, subsetlist]
self.subset = subset
self.subsetlist = subsetlist
return subset, subsetlist
[docs]
def double_mean_center_matrix(self, matrix: np.ndarray) -> np.ndarray:
"""
Double-center a matrix by removing row and column means.
Parameters
----------
matrix : np.ndarray
Input matrix to center
Returns
-------
centered_matrix : np.ndarray
Double-centered matrix
"""
overall_mean = np.mean(matrix)
row_means = np.mean(matrix, axis=1, keepdims=True)
col_means = np.mean(matrix, axis=0, keepdims=True)
centered_matrix = matrix - row_means - col_means + overall_mean
return centered_matrix
@lru_cache(maxsize=128)
def _get_null_distribution(self, size: int) -> Tuple[np.ndarray, np.ndarray]:
"""
Get or compute null distribution for given size with caching.
Parameters
----------
size : int
Size of gene set for null distribution
Returns
-------
null_l1 : np.ndarray
Null distribution of L1 values
null_median : np.ndarray
Null distribution of median expression values
"""
if size in self.null_distributions:
return self.null_distributions[size]
# If not cached, will be computed elsewhere
return None, None
[docs]
def loocv(self, subset: sc.AnnData, verbose: int = 0, for_randomset: bool = False) -> List[int]:
"""
Perform leave-one-out cross-validation for outlier detection.
Uses vectorized operations to efficiently compute L1 scores
with each gene removed.
Parameters
----------
subset : sc.AnnData
Subset of data for current gene set
verbose : int, default=0
Verbosity level
for_randomset : bool, default=False
Whether this is for random set computation
Returns
-------
outliers : List[int]
Indices of detected outlier genes
"""
#X = self._prepare_data(subset.X.T)
X = self.subset.X.T
n_features, n_samples = X.shape
X = np.asarray(X)
if n_samples < 2:
if verbose:
print(f"Cannot perform LOOCV with {n_samples} samples.")
return []
# Vectorized computation of L1 scores
l1scores = np.zeros(n_features)
svd = TruncatedSVD(n_components=1, algorithm='randomized', n_oversamples=2, random_state=42)
# Use efficient leave-one-out
loo = LeaveOneOut()
for i, (train_index, _) in enumerate(loo.split(X)):
svd.fit(X[train_index])
l1scores[i] = svd.explained_variance_ratio_[0]
# Store scores for each gene
loocv_scores = {gene: score for gene, score in zip(subset.var_names, l1scores)}
self.loocv_scores = loocv_scores
# Detect outliers using z-scores
if len(l1scores) > 1:
mean_score = np.mean(l1scores)
std_score = np.std(l1scores)
if std_score > 0:
z_scores = np.abs((l1scores - mean_score) / std_score)
outliers = np.where(z_scores > 3.0)[0].tolist()
else:
outliers = []
else:
outliers = []
if verbose:
print(f"Number of samples: {n_samples}, Number of features: {n_features}")
print(f"Number of outliers detected: {len(outliers)}")
return outliers
[docs]
def fisher_outlier_filter(self,
gene_outlier_counts: Dict[str, int],
gene_pathway_counts: Dict[str, int],
outlier_fisher_thr: float = 0.05,
min_gene_sets: int = 3) -> Dict[str, bool]:
"""
Determine outlier genes using Fisher's exact test.
Tests whether each gene's proportion of outlier calls is significantly
higher than the global average.
Parameters
----------
gene_outlier_counts : Dict[str, int]
Number of gene sets where each gene was flagged as outlier
gene_pathway_counts : Dict[str, int]
Total number of gene sets containing each gene
outlier_fisher_thr : float, default=0.05
P-value threshold for Fisher's test
min_gene_sets : int, default=3
Minimum gene sets required for outlier consideration
Returns
-------
gene_outlier_flags : Dict[str, bool]
Whether each gene is considered an outlier
"""
total_outliers = sum(gene_outlier_counts.values())
total_pathways = sum(gene_pathway_counts.values())
gene_outlier_flags = {}
for gene, pathways_count in gene_pathway_counts.items():
# Skip genes in too few gene sets
if pathways_count < min_gene_sets:
gene_outlier_flags[gene] = False
continue
gene_outlier = gene_outlier_counts.get(gene, 0)
# Build contingency table
table = [
[total_outliers - gene_outlier, gene_outlier],
[total_pathways - pathways_count, pathways_count]
]
# Perform Fisher's exact test
_, p_value = fisher_exact(table, alternative='greater')
gene_outlier_flags[gene] = (p_value < outlier_fisher_thr)
return gene_outlier_flags
[docs]
def limit_outliers_per_geneset(self,
gene_set: List[str],
gene_flags: Dict[str, bool],
loocv_scores: Dict[str, float],
max_outlier_prop: float = 0.5) -> Dict[str, bool]:
"""
Limit the proportion of genes flagged as outliers in a gene set.
Parameters
----------
gene_set : List[str]
Gene names in the gene set
gene_flags : Dict[str, bool]
Current outlier flags for genes
loocv_scores : Dict[str, float]
LOOCV scores for prioritizing outliers
max_outlier_prop : float, default=0.5
Maximum proportion of outliers allowed
Returns
-------
gene_flags : Dict[str, bool]
Updated outlier flags
"""
flagged_genes = [g for g in gene_set if gene_flags.get(g, False)]
allowed_num = int(max_outlier_prop * len(gene_set))
if len(flagged_genes) > allowed_num:
# Sort by LOOCV score extremity
flagged_sorted = sorted(flagged_genes,
key=lambda g: abs(loocv_scores.get(g, 0)),
reverse=True)
keep_flagged = set(flagged_sorted[:allowed_num])
# Update flags
for g in flagged_genes:
gene_flags[g] = (g in keep_flagged)
return gene_flags
def _prepare_data(self, X: Union[np.ndarray, sparse.spmatrix]) -> np.ndarray:
"""
Prepare data matrix for computation, handling sparse matrices efficiently.
If the data resembles bulk transcriptomics, i.e. big proportion of non-zero values,
then the data will used as dense matrix.
Parameters
----------
X : Union[np.ndarray, sparse.spmatrix]
Input data matrix
Returns
-------
X_prepared : np.ndarray
Prepared array (dense or sparse as appropriate)
"""
# TODO: if the matrix is dense, and single cell -> to sparse
if sparse.issparse(X):
# TODO: consider the size of the dense matrix relative to available memory,
# convert if sparse is more efficient
# Only convert to dense if necessary
density = X.nnz / (X.shape[0] * X.shape[1])
if density > 0.5: # More than 50% non-zero
return X.toarray()
else:
# Keep sparse for efficiency in downstream operations
# Note: sklearn TruncatedSVD handles sparse matrices
return X
return np.asarray(X)
[docs]
def robustTruncatedSVD(self,
adata: sc.AnnData,
subsetlist: np.ndarray,
outliers: List[int],
for_randomset: bool = False,
algorithm: str = 'randomized') -> Tuple[TruncatedSVD, np.ndarray]:
# Filter outliers to only include valid indices for current gene_subset
outliers_array = np.array(outliers)
valid_outliers = outliers_array[outliers_array < len(subsetlist)]
# Create boolean mask for genes (excluding outliers)
mask = np.ones(len(subsetlist), dtype=bool)
if len(valid_outliers) > 0:
mask[valid_outliers] = False
# Get the subset of genes excluding outliers
robust_gene_subset = subsetlist[mask]
if len(robust_gene_subset) < 2:
# Return zeros if too few genes remain
return None, None
# Extract data for robust gene subset
X = adata[:, robust_gene_subset].X#.copy()
if hasattr(X, 'toarray'):
X = X.toarray()
X = X.T # Transpose to have genes as rows, samples as columns
# Center the data
#X = X - X.mean(axis=1, keepdims=True)
# Perform SVD
try:
if algorithm == 'arpack' and min(X.shape) > 2:
svd_ = TruncatedSVD(n_components=2, algorithm='arpack')
else:
svd_ = TruncatedSVD(n_components=2, algorithm='randomized')
svd_.fit(X)
except Exception as e:
print(f"SVD failed: {e}")
return None, None
if not for_randomset:
self.svd = svd_
self.X = X
return svd_, X
[docs]
def robustPCA(self,
adata: sc.AnnData,
subsetlist: np.ndarray,
outliers: List[int],
for_randomset: bool = False,
algorithm: str = 'auto') -> Tuple[PCA, np.ndarray]:
"""
Perform robust PCA excluding outlier genes.
Parameters
----------
adata : sc.AnnData
Annotated data matrix
subsetlist : np.ndarray
Gene names to include
outliers : List[int]
Indices of outlier genes to exclude
for_randomset : bool, default=False
Whether this is for random set computation
algorithm : str, default='auto'
PCA algorithm
Returns
-------
pca : PCA
Fitted PCA object
X : np.ndarray
Processed data matrix
"""
mask = np.ones(len(subsetlist), dtype=bool)
mask[outliers] = False
subset_genes = subsetlist[mask]
subset_adata = adata[:, subset_genes]
X = self._prepare_data(subset_adata.X.T)
pca = PCA(n_components=2, svd_solver=algorithm, random_state=42)
pca.fit(X)
if not for_randomset:
self.svd = pca
self.X = X
return pca, X
[docs]
def robustIncrementalPCA(self,
adata: sc.AnnData,
subsetlist: np.ndarray,
outliers: List[int],
for_randomset: bool = False,
partial_fit: bool = False,
batch_size: int = 1000) -> Tuple[IncrementalPCA, np.ndarray]:
"""
Perform robust incremental PCA excluding outlier genes.
Parameters
----------
adata : sc.AnnData
Annotated data matrix
subsetlist : np.ndarray
Gene names to include
outliers : List[int]
Indices of outlier genes to exclude
for_randomset : bool, default=False
Whether this is for random set computation
partial_fit : bool, default=False
Whether to use partial_fit
batch_size : int, default=1000
Batch size for incremental PCA
Returns
-------
ipca : IncrementalPCA
Fitted IncrementalPCA object
X : np.ndarray
Processed data matrix
"""
mask = np.ones(len(subsetlist), dtype=bool)
mask[outliers] = False
subset_genes = subsetlist[mask]
subset_adata = adata[:, subset_genes]
X = self._prepare_data(subset_adata.X.T)
ipca = IncrementalPCA(n_components=2, batch_size=batch_size)
if partial_fit:
ipca.partial_fit(X)
else:
ipca.fit(X)
if not for_randomset:
self.svd = ipca
self.X = X
return ipca, X
[docs]
def fix_pc_sign(self,
GeneScore: np.ndarray,
SampleScore: np.ndarray,
Wei: Optional[np.ndarray] = None,
Mode: str = 'PreferActivation',
DefWei: float = 1.0,
Thr: Optional[float] = None,
Grouping: Optional[Dict[str, str]] = None,
ExpMat: Optional[np.ndarray] = None,
CorMethod: str = "pearson",
gene_set_name: Optional[str] = None) -> int:
"""
Determine the correct orientation of principal component.
Parameters
----------
GeneScore : np.ndarray
Gene loadings on PC
SampleScore : np.ndarray
Sample projections on PC
Wei : Optional[np.ndarray], default=None
Gene weights
Mode : str, default='PreferActivation'
Method for PC orientation
DefWei : float, default=1.0
Default weight for missing values
Thr : Optional[float], default=None
Threshold for various methods
Grouping : Optional[Dict[str, str]], default=None
Sample grouping information
ExpMat : Optional[np.ndarray], default=None
Expression matrix
CorMethod : str, default='pearson'
Correlation method
gene_set_name : Optional[str], default=None
Name of current gene set
Returns
-------
sign : int
+1 or -1 indicating PC orientation
"""
from scipy.stats import pearsonr, spearmanr, kendalltau
# Helper functions
def apply_threshold_to_genescore(gscore: np.ndarray, thr: float) -> np.ndarray:
q_val = np.quantile(np.abs(gscore), thr)
return np.abs(gscore) >= q_val
def correlation_function(method: str):
if method == 'pearson':
return pearsonr
elif method == 'spearman':
return spearmanr
elif method == 'kendall':
return kendalltau
else:
raise ValueError(f"Invalid CorMethod: {method}")
def cor_test(x: np.ndarray, y: np.ndarray, method: str, thr: Optional[float]):
func = correlation_function(method)
corr, pval = func(x, y)
if thr is None:
return (np.nan, corr)
else:
return (pval, corr)
# Ensure Wei is numpy array
if Wei is not None:
Wei = np.asarray(Wei, dtype=float)
else:
Wei = np.full_like(GeneScore, np.nan)
# Handle different modes
if Mode == 'none':
return 1
elif Mode == 'PreferActivation':
ToUse = np.full(len(GeneScore), True, dtype=bool)
if Thr is not None:
ToUse = apply_threshold_to_genescore(GeneScore, Thr)
return -1 if np.sum(GeneScore[ToUse]) < 0 else 1
elif Mode == 'UseAllWeights':
Wei = np.where(np.isnan(Wei), DefWei, Wei)
Mode = 'UseKnownWeights'
if Mode == 'UseKnownWeights':
Wei = np.where(np.isnan(Wei), 0, Wei)
ToUse = np.full(len(GeneScore), True)
if Thr is not None:
ToUse = apply_threshold_to_genescore(GeneScore, Thr)
mask = (~np.isnan(Wei)) & ToUse
if np.sum(mask) < 1:
return 1
return -1 if np.sum(Wei[mask] * GeneScore[mask]) < 0 else 1
elif Mode == 'UseMeanExpressionAllWeights':
Wei = np.where(np.isnan(Wei), DefWei, Wei)
Mode = 'UseMeanExpressionKnownWeights'
if Mode == 'UseMeanExpressionKnownWeights':
if np.sum(~np.isnan(Wei)) < 1 or ExpMat is None:
return 1
ToUse = np.full(len(GeneScore), True)
if Thr is not None:
q_thr_high = np.quantile(GeneScore, Thr)
q_thr_low = np.quantile(GeneScore, 1 - Thr)
ToUse = (GeneScore >= max(q_thr_high, 0)) | (GeneScore <= min(0, q_thr_low))
nbUsed = np.sum(ToUse)
if nbUsed < 2:
if nbUsed == 1:
median_per_gene = np.median(ExpMat, axis=1)
centered_median = median_per_gene - np.mean(median_per_gene)
val = np.sum(GeneScore[ToUse] * Wei[ToUse] * centered_median[ToUse])
return -1 if val <= 0 else 1
else:
return 1
subset_mat = ExpMat[ToUse, :]
row_medians = np.median(subset_mat, axis=1)
centered_medians = row_medians - np.mean(row_medians)
val = np.sum(GeneScore[ToUse] * Wei[ToUse] * centered_medians)
return -1 if val <= 0 else 1
elif Mode == 'UseExtremeWeights':
if ExpMat is None:
return 1
if Thr is None:
ToUse = np.full(len(SampleScore), True, dtype=bool)
else:
cutoff = np.quantile(np.abs(SampleScore), 1 - Thr)
ToUse = (np.abs(SampleScore) >= cutoff)
gene_means = np.mean(ExpMat, axis=1)
sum_val = np.sum(SampleScore[ToUse] * gene_means[ToUse])
return -1 if sum_val <= 0 else 1
# Default
return 1
[docs]
def orient_pc1(self,
pc1: np.ndarray,
X: np.ndarray,
raw_X_subset: np.ndarray,
gene_set_name: Optional[str] = None) -> int:
"""
Orient first principal component according to configured method.
Parameters
----------
pc1 : np.ndarray
First principal component
X : np.ndarray
Processed data matrix
raw_X_subset : np.ndarray
Raw data subset for orientation
gene_set_name : Optional[str], default=None
Name of current gene set
Returns
-------
correct_sign : int
+1 or -1 for PC orientation
"""
sample_score = pc1
gene_score = X @ pc1
# Get gene weights if available
wei = self.gene_weights.get(gene_set_name, None)
# Ensure raw_X_subset is dense array
if sparse.issparse(raw_X_subset):
raw_X_subset = raw_X_subset.toarray()
correct_sign = self.fix_pc_sign(
GeneScore=gene_score,
SampleScore=sample_score,
Wei=wei,
DefWei=self.def_wei,
Mode=self.pc_sign_mode,
Thr=self.pc_sign_thr,
Grouping=None,
ExpMat=raw_X_subset,
CorMethod=self.cor_method,
gene_set_name=gene_set_name
)
return correct_sign
[docs]
def process_iteration(self,
sequence: np.ndarray,
idx: np.ndarray,
iteration: int,
incremental: bool,
partial_fit: bool,
algorithm: str) -> Tuple[float, float, np.ndarray, np.ndarray]:
"""
Process single iteration for null distribution computation.
Parameters
----------
sequence : np.ndarray
Sequence of indices to sample from
idx : np.ndarray
Gene indices
iteration : int
Current iteration number
incremental : bool
Whether to use incremental PCA
partial_fit : bool
Whether to use partial fit
algorithm : str
SVD algorithm
Returns
-------
l1 : float
L1 value for this iteration
median_exp : float
Median expression for this iteration
null_projections_1 : np.ndarray
Projections on PC1
null_projections_2 : np.ndarray
Projections on PC2
"""
# Set random seed for reproducibility within parallel processes
np.random.seed(iteration)
subset = np.random.choice(sequence, self.nullgenesetsize, replace=False)
gene_subset = np.array([x for i, x in enumerate(idx) if i in subset])
outliers = self.loocv(self.adata[:, gene_subset], for_randomset=True)
if incremental:
svd_, X = self.robustIncrementalPCA(self.adata, gene_subset, outliers,
for_randomset=True, partial_fit=partial_fit)
else:
svd_, X = self.robustTruncatedSVD(self.adata, gene_subset, outliers,
for_randomset=True, algorithm=algorithm)
l1 = svd_.explained_variance_ratio_[0]
# Get raw subset for median computation
subsetlist = [x for i, x in enumerate(gene_subset) if i not in outliers]
raw_X_subset = self.adata.raw[:, subsetlist].X.T
median_exp, null_projections_1, null_projections_2 = self.compute_median_exp(
svd_, X, raw_X_subset
)
return l1, median_exp, null_projections_1, null_projections_2
[docs]
def randomset_parallel(self,
subsetlist: np.ndarray,
outliers: List[int],
verbose: int = 0,
prefer_type: str = 'processes',
incremental: bool = False,
iters: int = 100,
partial_fit: bool = False,
algorithm: str = 'randomized') -> None:
"""
Compute null distributions using parallel processing.
Parameters
----------
subsetlist : np.ndarray
Array of gene names in current set
outliers : List[int]
Indices of outlier genes
verbose : int, default=0
Verbosity level
prefer_type : str, default='processes'
Joblib backend preference
incremental : bool, default=False
Whether to use incremental PCA
iters : int, default=100
Number of iterations
partial_fit : bool, default=False
Whether to use partial fit
algorithm : str, default='randomized'
SVD algorithm
"""
start = time.time()
candidate_nullgeneset_size = self.nullgenesetsize
# Check cache
if candidate_nullgeneset_size in self.null_distributions:
self.nulll1, self.null_median_exp = self.null_distributions[candidate_nullgeneset_size]
if verbose:
print('Using existing null distribution')
return
# Setup for parallel processing
sequence = np.arange(self.adata.shape[1])
idx = self.adata.var.index.to_numpy()
# Determine optimal number of jobs
n_jobs = min(multiprocessing.cpu_count(), iters)
# Run parallel iterations with batch processing for large iteration counts
if iters > 1000:
# Process in batches to avoid memory issues
batch_size = 100
n_batches = iters // batch_size
all_results = []
for batch in range(n_batches):
batch_start = batch * batch_size
batch_end = min((batch + 1) * batch_size, iters)
batch_results = Parallel(n_jobs=n_jobs, prefer=prefer_type)(
delayed(self.process_iteration)(
sequence, idx, iteration, incremental, partial_fit, algorithm
) for iteration in range(batch_start, batch_end)
)
all_results.extend(batch_results)
# Clear memory periodically
if batch % 10 == 0:
import gc
gc.collect()
results = all_results
else:
# Standard parallel processing for smaller iteration counts
results = Parallel(n_jobs=n_jobs, prefer=prefer_type)(
delayed(self.process_iteration)(
sequence, idx, iteration, incremental, partial_fit, algorithm
) for iteration in range(iters)
)
# Unpack and convert results
nulll1, null_median_exp, null_proj1, null_proj2 = zip(*results)
nulll1_array = np.array(nulll1)
null_median_exp_array = np.array(null_median_exp)
# Store in cache
self.null_distributions[candidate_nullgeneset_size] = [
nulll1_array.reshape(-1, 1), # Ensure 2D array
null_median_exp_array
]
self.nulll1 = nulll1_array.reshape(-1, 1)
self.null_median_exp = null_median_exp_array
if verbose:
elapsed = time.time() - start
minutes, seconds = divmod(elapsed, 60)
print(f"Running time: {int(minutes):02}:{seconds:05.2f}")
[docs]
def get_raw_p_values(self, gene_set_name: Optional[str] = None) -> None:
"""
Calculate raw p-values from null distributions.
Parameters
----------
gene_set_name : Optional[str], default=None
Name of current gene set
"""
# When parallel=False, compute a simple p-value
if not hasattr(self, 'nulll1') or self.nulll1 is None or len(self.nulll1) == 0:
# No null distribution available, use default values
test_l1 = self.svd.explained_variance_ratio_[0]
self.test_l1 = test_l1
self.p_value = 0.5 # Default p-value when no null distribution
# Compute median expression
test_median_exp, projections_1, projections_2 = self.compute_median_exp(
self.svd, self.X, self.raw_X_subset, gene_set_name
)
self.test_median_exp = test_median_exp
self.med_exp_p_value = 0.5 # Default p-value
self.projections_1 = projections_1
self.projections_2 = projections_2
return
# Normal case with null distribution
null_distribution = np.array(self.nulll1)
if null_distribution.ndim == 2:
null_distribution = null_distribution[:, 0]
null_median_distribution = np.array(self.null_median_exp)
# L1 statistics
test_l1 = self.svd.explained_variance_ratio_[0]
p_value = np.mean(null_distribution >= test_l1)
self.test_l1 = test_l1
self.p_value = p_value
# Median expression statistics
test_median_exp, projections_1, projections_2 = self.compute_median_exp(
self.svd, self.X, self.raw_X_subset, gene_set_name
)
# Empirical p-value
med_exp_p_value = (np.sum(np.abs(null_median_distribution) >= np.abs(test_median_exp)) + 1) / (len(null_median_distribution) + 1)
self.test_median_exp = test_median_exp
self.med_exp_p_value = med_exp_p_value
self.projections_1 = projections_1
self.projections_2 = projections_2
# Clear temporary data
self.nulll1 = None
self.null_median_exp = None
self.raw_X_subset = None
[docs]
def assess_significance(self, results: Dict[str, GeneSetResult]) -> Dict[str, GeneSetResult]:
"""
Compute adjusted p-values using Benjamini-Hochberg procedure.
Parameters
----------
results : Dict[str, GeneSetResult]
Results for each gene set
Returns
-------
results : Dict[str, GeneSetResult]
Results with adjusted p-values
"""
n_tests = len(results)
ps = np.zeros(n_tests)
med_ps = np.zeros(n_tests)
# Collect p-values
for i, (_, gene_set_result) in enumerate(results.items()):
ps[i] = gene_set_result.non_adj_p
med_ps[i] = gene_set_result.non_adj_med_exp_p
# Apply Benjamini-Hochberg correction
qs = benj_hoch(ps)
med_exp_qs = benj_hoch(med_ps)
# Update results
for i, (_, gene_set_result) in enumerate(results.items()):
gene_set_result.q_value = qs[i]
gene_set_result.med_exp_q_value = med_exp_qs[i]
return results
[docs]
def approx_size(self, flag: bool) -> None:
"""
Determine appropriate null gene set size with approximation.
Parameters
----------
flag : bool
Whether this is the first gene set
"""
candidate_size = sum(1 for i in range(len(self.subsetlist)) if i not in self.outliers)
if flag:
self.nullgenesetsize = candidate_size
else:
# Check if we can reuse existing null distribution
for cached_size in self.null_distributions:
if abs(cached_size - candidate_size) <= self.approx_int:
self.nullgenesetsize = cached_size
return
self.nullgenesetsize = candidate_size
[docs]
def select_and_sort_gene_sets(self, selected_geneset_names: List[str]) -> List[str]:
"""
Sort gene sets by size for efficient processing.
Parameters
----------
selected_geneset_names : List[str]
Names of gene sets to process
Returns
-------
sorted_names : List[str]
Gene set names sorted by size
"""
selected_gene_sets = {
name: genes for name, genes in self.genesets.items()
if name in selected_geneset_names
}
sorted_gene_sets = sorted(selected_gene_sets.items(), key=lambda x: len(x[1]))
return [name for name, _ in sorted_gene_sets]
[docs]
def p_values_in_frame(self, assessed_results: Dict[str, GeneSetResult]) -> pd.DataFrame:
"""
Create DataFrame with all p-values and statistics.
Parameters
----------
assessed_results : Dict[str, GeneSetResult]
Results with computed p-values
Returns
-------
df : pd.DataFrame
DataFrame with all statistics
"""
data = {
'L1': {},
'ppv L1': {},
'Median Exp': {},
'ppv Med Exp': {},
'q L1': {},
'q Med Exp': {}
}
for name, result in assessed_results.items():
data['L1'][name] = result.test_l1
data['ppv L1'][name] = result.non_adj_p
data['Median Exp'][name] = result.test_median_exp
data['ppv Med Exp'][name] = result.non_adj_med_exp_p
data['q L1'][name] = result.q_value
data['q Med Exp'][name] = result.med_exp_q_value
return pd.DataFrame(data)
[docs]
def compute(self,
selected_gene_sets: Union[List[str], str] = 'all',
parallel: bool = True,
incremental: bool = False,
iters: int = 100,
partial_fit: bool = False,
algorithm: str = 'randomized',
loocv_on: bool = True,
double_mean_centering: bool = False,
outlier_fisher_thr: float = 0.05,
min_gene_sets: int = 3,
max_outlier_prop: float = 0.5,
verbose: int = 0) -> None:
"""
Compute ROMA module activity scores for selected gene sets.
Parameters
----------
selected_gene_sets : Union[List[str], str], default='all'
List of gene set names to analyze, or 'all' for all gene sets
parallel : bool, default=False
Whether to use parallel processing for null distribution computation
incremental : bool, default=False
Whether to use incremental PCA instead of truncated SVD
iters : int, default=100
Number of iterations for null distribution generation
partial_fit : bool, default=False
Whether to use partial_fit for incremental PCA
algorithm : str, default='randomized'
Algorithm for SVD computation ('randomized' or 'arpack')
loocv_on : bool, default=True
Whether to perform leave-one-out cross-validation for outlier detection
double_mean_centering : bool, default=False
Whether to center data across both samples and genes
outlier_fisher_thr : float, default=0.05
P-value threshold for Fisher's exact test in outlier detection
min_gene_sets : int, default=3
Minimum number of gene sets a gene must appear in to be considered for outlier removal
max_outlier_prop : float, default=0.5
Maximum proportion of genes that can be removed as outliers per gene set
verbose : int, default=0
Verbosity level
Raises
------
ValueError
If no gene sets are loaded or if data is not properly initialized
"""
if self.adata is None:
raise ValueError("No data loaded. Set adata attribute first.")
# Load gene sets from gmt
self.read_gmt_to_dict(self.gmt)
if not self.genesets:
raise ValueError("No gene sets loaded. Run read_gmt_to_dict() first.")
results = {}
# Store raw data efficiently
if not hasattr(self.adata, 'raw') or self.adata.raw is None:
self.adata.raw = self.adata
# Prepare data with minimal copying
X = self.adata.X.T if not self.adata.is_view else self.adata.X.T#.copy()
# Convert sparse to dense if needed
#X = self._prepare_data(X)
#if sparse.issparse(X):
# X = X.toarray()
if not isinstance(X, np.ndarray):
X = X.toarray()
# centering the sparse matrix would give a dense one
# Center data
if double_mean_centering:
X_centered = self.double_mean_center_matrix(X)
else:
#if sparse.issparse(X):
# replicates the centering operation below for sparse matrices
#X_centered = sparse_centering_operator(X)
#else:
# Center over samples (genes have 0 mean)
X_centered = X - np.mean(X, axis=1, keepdims=True)
X_centered = X_centered - np.mean(X_centered, axis=0, keepdims=True)
self.adata.X = X_centered.T
# Index genes
self.indexing(self.adata)
# Handle gene set selection
if selected_gene_sets == 'all':
selected_gene_sets = list(self.genesets.keys())
# Sort gene sets by size for efficiency
sorted_gene_sets = self.select_and_sort_gene_sets(selected_gene_sets)
# Process gene sets
flag = True
for gene_set_name in sorted_gene_sets:
print(f'Processing gene set: {color.BOLD}{color.DARKCYAN}{gene_set_name}{color.END}', end=' | ')
# Subset data
self.subsetting(self.adata, self.genesets[gene_set_name])
print(f'len of subsetlist: {color.BOLD}{len(self.subsetlist)}{color.END}', end=' ')
if len(self.subsetlist) < self.min_n_genes:
self.unprocessed_genesets.append(gene_set_name)
print(" | smaller than min n genes for geneset |")
continue
else:
print()
# Outlier detection
if loocv_on:
self.loocv(self.subset, verbose=verbose)
if len(self.outliers) > 0:
print(f"Outliers: {self.outliers}, Gene: {self.subsetlist[self.outliers[0]]}")
# Update global counts for Fisher test
for gene in self.subsetlist:
self.global_gene_counts[gene] = self.global_gene_counts.get(gene, 0) + 1
for outlier_idx in self.outliers:
gene_name = self.subsetlist[outlier_idx]
self.global_outlier_counts[gene_name] = self.global_outlier_counts.get(gene_name, 0) + 1
# Apply Fisher test
gene_flags = self.fisher_outlier_filter(
self.global_outlier_counts,
self.global_gene_counts,
outlier_fisher_thr=outlier_fisher_thr,
min_gene_sets=min_gene_sets
)
# Limit outliers per gene set
gene_flags = self.limit_outliers_per_geneset(
self.subsetlist,
gene_flags,
self.loocv_scores,
max_outlier_prop=max_outlier_prop
)
# Update outliers list
self.outliers = [i for i, gene in enumerate(self.subsetlist) if gene_flags.get(gene, False)]
# Determine null gene set size
self.approx_size(flag)
flag = False
# Perform decomposition
if incremental:
self.robustIncrementalPCA(self.adata, self.subsetlist, self.outliers)
else:
self.robustTruncatedSVD(self.adata, self.subsetlist, self.outliers, algorithm=algorithm)
# Get raw data for PC sign correction
subsetlist_no_out = [x for i, x in enumerate(self.subsetlist) if i not in self.outliers]
self.raw_X_subset = self.adata.raw[:, subsetlist_no_out].X.T#.copy()
# Compute null distributions if parallel
if parallel:
self.randomset_parallel(
self.subsetlist,
self.outliers,
verbose=verbose,
prefer_type='processes',
incremental=incremental,
iters=iters,
partial_fit=partial_fit,
algorithm=algorithm
)
# Calculate p-values
self.get_raw_p_values(gene_set_name)
# Store results
gene_set_result = GeneSetResult(
self.subset,
self.subsetlist,
self.outliers,
self.nullgenesetsize,
self.svd
)
gene_set_result.custom_name = f"GeneSetResult {gene_set_name}"
gene_set_result.test_l1 = self.test_l1
gene_set_result.non_adj_p = self.p_value
gene_set_result.non_adj_med_exp_p = self.med_exp_p_value
gene_set_result.test_median_exp = self.test_median_exp
gene_set_result.projections_1 = self.projections_1
gene_set_result.projections_2 = self.projections_2
results[gene_set_name] = gene_set_result
# Assess significance with multiple testing correction
assessed_results = self.assess_significance(results)
# Store results
self.adata.uns['ROMA'] = assessed_results
self.adata.uns['ROMA_stats'] = self.p_values_in_frame(assessed_results)
self.select_active_modules(self.q_L1_threshold, self.q_Med_Exp_threshold)
# Update status
self.custom_name = f"{color.BOLD}{color.GREEN}scROMA{color.END}: module activities are computed"
print(f"{color.BOLD}{color.PURPLE}Finished{color.END}", end=': ')
# Update plotting module
self.pl.adata = self.adata
[docs]
def compute_batch(self,
selected_gene_sets: List[str],
batch_size: int = 50,
**kwargs) -> None:
"""
Process gene sets in batches for memory efficiency.
Parameters
----------
selected_gene_sets : List[str]
Gene sets to analyze
batch_size : int, default=50
Number of gene sets per batch
**kwargs
Additional arguments passed to compute()
"""
n_batches = (len(selected_gene_sets) + batch_size - 1) // batch_size
for i in range(n_batches):
start = i * batch_size
end = min((i + 1) * batch_size, len(selected_gene_sets))
batch = selected_gene_sets[start:end]
print(f"Processing batch {i+1}/{n_batches} ({len(batch)} gene sets)")
self.compute(batch, **kwargs)
# Clear memory
import gc
gc.collect()
[docs]
def select_active_modules(self,
q_L1_threshold: float = 0.05,
q_Med_Exp_threshold: float = 0.05) -> None:
"""
Select significantly active modules based on q-value thresholds.
Parameters
----------
q_L1_threshold : float, default=0.05
Q-value threshold for L1 significance
q_Med_Exp_threshold : float, default=0.05
Q-value threshold for median expression significance
"""
df = self.adata.uns['ROMA_stats']
active_modules = df[(df['q L1'] <= q_L1_threshold) | (df['q Med Exp'] <= q_Med_Exp_threshold)]
self.adata.uns['ROMA_active_modules'] = active_modules
[docs]
def save_active_modules_results(self,
path: str,
only_active: bool = True,
save_adata: bool = True) -> None:
"""
Save ROMA results to disk.
Parameters
----------
path : str
Output directory path
only_active : bool, default=True
Whether to save only active modules
save_adata : bool, default=True
Whether to save the AnnData object
"""
os.makedirs(path, exist_ok=True)
active_modules = self.adata.uns['ROMA_active_modules'].index
if only_active:
selected_dict = {k: v for k, v in self.adata.uns['ROMA'].items() if k in active_modules}
else:
selected_dict = self.adata.uns['ROMA']
# Define attributes to save
attributes = {
"subsetlist": "numpy.ndarray",
"outliers": "list",
"projections_1": "numpy.ndarray",
"projections_2": "numpy.ndarray",
"svd.components_": "numpy.ndarray"
}
# Save each gene set result
for key, gene_set_result in selected_dict.items():
key_dir = os.path.join(path, key)
os.makedirs(key_dir, exist_ok=True)
for attr, attr_type in attributes.items():
if "." in attr:
parts = attr.split(".")
attr_value = getattr(gene_set_result, parts[0], None)
if attr_value is not None:
attr_value = getattr(attr_value, parts[1], None)
else:
attr_value = getattr(gene_set_result, attr, None)
if attr_value is not None:
file_path = os.path.join(key_dir, attr)
if attr_type == "numpy.ndarray":
np.save(file_path, attr_value)
elif attr_type == "list":
with open(file_path, "wb") as f:
pickle.dump(attr_value, f)
# Save DataFrames
self.adata.uns['ROMA_stats'].to_csv(os.path.join(path, "ROMA_stats.csv"))
self.adata.uns['ROMA_active_modules'].to_csv(os.path.join(path, "ROMA_active_modules.csv"))
# Save AnnData if requested
if save_adata:
del self.adata.uns['ROMA'] # Remove large dictionary before saving
self.adata.write(os.path.join(path, "roma_adata.h5ad"))
[docs]
def load_active_modules_results(self, path: str) -> None:
"""
Load previously saved ROMA results.
Parameters
----------
path : str
Path to saved results directory
Raises
------
FileNotFoundError
If results directory doesn't exist
"""
if not os.path.exists(path):
raise FileNotFoundError(f"Results directory not found: {path}")
loaded_dict = {}
# Load each gene set result
for key in os.listdir(path):
key_dir = os.path.join(path, key)
if os.path.isdir(key_dir):
gene_set = GeneSetResult(None, None, None, None, None)
gene_set.custom_name = key
for file in os.listdir(key_dir):
file_path = os.path.join(key_dir, file)
attr_name = file.replace(".npy", "")
if attr_name == "subsetlist":
gene_set.subsetlist = np.load(file_path, allow_pickle=True)
elif attr_name == "outliers":
with open(file_path, "rb") as f:
gene_set.outliers = pickle.load(f)
elif attr_name == "projections_1":
gene_set.projections_1 = np.load(file_path, allow_pickle=True)
elif attr_name == "projections_2":
gene_set.projections_2 = np.load(file_path, allow_pickle=True)
elif attr_name == "svd.components_":
components = np.load(file_path, allow_pickle=True)
gene_set.svd = SimpleNamespace(components_=components)
loaded_dict[key] = gene_set
# Load AnnData if not already loaded
if self.adata is None:
adata_path = os.path.join(path, "roma_adata.h5ad")
if os.path.exists(adata_path):
self.adata = sc.read_h5ad(adata_path)
else:
raise FileNotFoundError(f"AnnData file not found: {adata_path}")
# Load DataFrames
self.adata.uns['ROMA'] = loaded_dict
self.adata.uns['ROMA_stats'] = pd.read_csv(os.path.join(path, "ROMA_stats.csv"), index_col=0)
self.adata.uns['ROMA_active_modules'] = pd.read_csv(os.path.join(path, "ROMA_active_modules.csv"), index_col=0)
# Update plotting module
self.pl.adata = self.adata
[docs]
def get_module_scores(self, module_name: str) -> Optional[np.ndarray]:
"""
Get activity scores for a specific module across all samples.
Parameters
----------
module_name : str
Name of the gene set/module
Returns
-------
scores : Optional[np.ndarray]
Activity scores across samples, or None if module not found
"""
if 'ROMA' not in self.adata.uns:
raise ValueError("No ROMA results found. Run compute() first.")
if module_name not in self.adata.uns['ROMA']:
warnings.warn(f"Module '{module_name}' not found in results.")
return None
result = self.adata.uns['ROMA'][module_name]
# Compute sample scores from SVD
if hasattr(result, 'svd') and result.svd is not None:
# Sample scores are the projection of samples onto PC1
pc1 = result.svd.components_[0]
sample_scores = self.adata.X @ result.projections_1
return sample_scores
return None
[docs]
def get_top_genes(self,
module_name: str,
n_genes: int = 10,
absolute: bool = True) -> Optional[pd.DataFrame]:
"""
Get top contributing genes for a module.
Parameters
----------
module_name : str
Name of the gene set/module
n_genes : int, default=10
Number of top genes to return
absolute : bool, default=True
Whether to sort by absolute weight values
Returns
-------
top_genes : Optional[pd.DataFrame]
DataFrame with gene names and weights
"""
if 'ROMA' not in self.adata.uns:
raise ValueError("No ROMA results found. Run compute() first.")
if module_name not in self.adata.uns['ROMA']:
warnings.warn(f"Module '{module_name}' not found in results.")
return None
result = self.adata.uns['ROMA'][module_name]
if result.projections_1 is None or result.subsetlist is None:
return None
# Create DataFrame with genes and their weights
weights = result.projections_1 * self.correct_pc_sign
df = pd.DataFrame({
'gene': result.subsetlist,
'weight': weights,
'abs_weight': np.abs(weights)
})
# Sort and get top genes
if absolute:
df = df.nlargest(n_genes, 'abs_weight')
else:
df = df.nlargest(n_genes, 'weight')
return df[['gene', 'weight']]
[docs]
def export_results(self,
output_file: str,
format: str = 'csv',
include_projections: bool = False) -> None:
"""
Export ROMA results to various formats.
Parameters
----------
output_file : str
Output file path
format : str, default='csv'
Output format ('csv', 'excel', 'h5')
include_projections : bool, default=False
Whether to include gene projections
"""
if 'ROMA_stats' not in self.adata.uns:
raise ValueError("No ROMA results found. Run compute() first.")
stats_df = self.adata.uns['ROMA_stats']
if format == 'csv':
stats_df.to_csv(output_file)
elif format == 'excel':
with pd.ExcelWriter(output_file) as writer:
stats_df.to_excel(writer, sheet_name='Module_Statistics')
if 'ROMA_active_modules' in self.adata.uns:
self.adata.uns['ROMA_active_modules'].to_excel(
writer, sheet_name='Active_Modules'
)
if include_projections:
for module_name, result in self.adata.uns['ROMA'].items():
if result.projections_1 is not None:
proj_df = pd.DataFrame({
'gene': result.subsetlist,
'PC1': result.projections_1,
'PC2': result.projections_2
})
# Excel sheet names have length limit
sheet_name = f"Proj_{module_name[:25]}"
proj_df.to_excel(writer, sheet_name=sheet_name, index=False)
elif format == 'h5':
stats_df.to_hdf(output_file, key='module_stats', mode='w')
if 'ROMA_active_modules' in self.adata.uns:
self.adata.uns['ROMA_active_modules'].to_hdf(
output_file, key='active_modules', mode='a'
)
else:
raise ValueError(f"Unsupported format: {format}")
[docs]
def compare_conditions(self,
condition_key: str,
module_name: str,
test: str = 'wilcoxon') -> Dict[str, Any]:
"""
Compare module activity between conditions.
Parameters
----------
condition_key : str
Key in adata.obs containing condition labels
module_name : str
Name of module to compare
test : str, default='wilcoxon'
Statistical test to use ('wilcoxon', 't-test', 'mann-whitney')
Returns
-------
results : Dict[str, Any]
Test results including p-value and effect size
"""
from scipy.stats import mannwhitneyu, ttest_ind
scores = self.get_module_scores(module_name)
if scores is None:
raise ValueError(f"Module '{module_name}' not found.")
if condition_key not in self.adata.obs:
raise ValueError(f"Condition key '{condition_key}' not found in adata.obs.")
conditions = self.adata.obs[condition_key].unique()
if len(conditions) != 2:
raise ValueError(f"Expected 2 conditions, found {len(conditions)}.")
# Split scores by condition
mask1 = self.adata.obs[condition_key] == conditions[0]
mask2 = self.adata.obs[condition_key] == conditions[1]
scores1 = scores[mask1]
scores2 = scores[mask2]
# Perform test
if test == 'wilcoxon':
stat, pval = mannwhitneyu(scores1, scores2, alternative='two-sided')
elif test == 't-test':
stat, pval = ttest_ind(scores1, scores2)
elif test == 'mann-whitney':
stat, pval = mannwhitneyu(scores1, scores2)
else:
raise ValueError(f"Unknown test: {test}")
# Calculate effect size (Cohen's d)
mean_diff = np.mean(scores1) - np.mean(scores2)
pooled_std = np.sqrt((np.var(scores1) + np.var(scores2)) / 2)
cohens_d = mean_diff / pooled_std if pooled_std > 0 else 0
return {
'module': module_name,
'condition1': conditions[0],
'condition2': conditions[1],
'n1': len(scores1),
'n2': len(scores2),
'mean1': np.mean(scores1),
'mean2': np.mean(scores2),
'median1': np.median(scores1),
'median2': np.median(scores2),
'statistic': stat,
'pvalue': pval,
'cohens_d': cohens_d,
'test': test
}
[docs]
def validate_results(self) -> Dict[str, List[str]]:
"""
Validate ROMA results for potential issues.
Returns
-------
issues : Dict[str, List[str]]
Dictionary of potential issues found
"""
issues = {
'missing_genes': [],
'small_modules': [],
'high_outliers': [],
'low_variance': [],
'convergence': []
}
if 'ROMA' not in self.adata.uns:
issues['missing_genes'].append("No ROMA results found.")
return issues
for module_name, result in self.adata.uns['ROMA'].items():
# Check for missing genes
original_size = len(self.genesets.get(module_name, []))
found_size = len(result.subsetlist) if result.subsetlist is not None else 0
missing_pct = (original_size - found_size) / original_size * 100 if original_size > 0 else 0
if missing_pct > 50:
issues['missing_genes'].append(
f"{module_name}: {missing_pct:.1f}% genes missing"
)
# Check module size
if found_size < self.min_n_genes:
issues['small_modules'].append(
f"{module_name}: only {found_size} genes found"
)
# Check outlier proportion
if result.outliers is not None and found_size > 0:
outlier_pct = len(result.outliers) / found_size * 100
if outlier_pct > 30:
issues['high_outliers'].append(
f"{module_name}: {outlier_pct:.1f}% outliers"
)
# Check variance explained
if result.test_l1 is not None and result.test_l1 < 0.1:
issues['low_variance'].append(
f"{module_name}: only {result.test_l1:.3f} variance explained"
)
return issues
[docs]
def summary(self) -> None:
"""Print summary of ROMA analysis results."""
if 'ROMA_stats' not in self.adata.uns:
print("No ROMA results found. Run compute() first.")
return
stats = self.adata.uns['ROMA_stats']
active = self.adata.uns.get('ROMA_active_modules', pd.DataFrame())
print(f"\n{color.BOLD}=== ROMA Analysis Summary ==={color.END}\n")
print(f"Total gene sets analyzed: {len(stats)}")
print(f"Active modules (overdispersed): {len(active[active['q L1'] <= self.q_L1_threshold])}")
print(f"Active modules (shifted): {len(active[active['q Med Exp'] <= self.q_Med_Exp_threshold])}")
if len(self.unprocessed_genesets) > 0:
print(f"\nUnprocessed gene sets (too small): {len(self.unprocessed_genesets)}")
if len(self.unprocessed_genesets) <= 10:
print(f" {', '.join(self.unprocessed_genesets)}")
# Top modules by L1
print(f"\n{color.BOLD}Top 5 overdispersed modules:{color.END}")
top_l1 = stats.nsmallest(5, 'q L1')[['L1', 'q L1']]
for idx, row in top_l1.iterrows():
print(f" {idx}: L1={row['L1']:.3f}, q={row['q L1']:.3e}")
# Top modules by median expression
print(f"\n{color.BOLD}Top 5 shifted modules:{color.END}")
top_med = stats.nsmallest(5, 'q Med Exp')[['Median Exp', 'q Med Exp']]
for idx, row in top_med.iterrows():
print(f" {idx}: Med={row['Median Exp']:.3f}, q={row['q Med Exp']:.3e}")
# Validation summary
issues = self.validate_results()
total_issues = sum(len(v) for v in issues.values())
if total_issues > 0:
print(f"\n{color.BOLD}Potential issues found: {total_issues}{color.END}")
for issue_type, issue_list in issues.items():
if issue_list:
print(f" {issue_type}: {len(issue_list)} modules")
def __del__(self):
"""Cleanup when object is destroyed."""
# Clean up any temporary files or large objects
if hasattr(self, 'parallel_results') and self.parallel_results is not None:
try:
self.parallel_results.clear()
except:
pass
self.parallel_results.clear()
if hasattr(self, 'null_distributions'):
try:
self.null_distributions.clear()
except:
pass