Source code for polygraph.motifs

import itertools

import numpy as np
import pandas as pd
from sklearn.decomposition import NMF
from statsmodels.stats.multitest import fdrcorrection

from polygraph.stats import groupwise_fishers, groupwise_mann_whitney


def scan(seqs, meme_file, group_col="Group", pthresh=1e-3, rc=True):
    """
    Scan DNA sequences using motifs from a MEME file.

    Args:
        seqs (pd.DataFrame): Dataframe containing DNA sequences
        meme_file (str): Path to MEME file
        group_col (str): Column containing group IDs
        pthresh (float): p-value cutoff for binding sites
        rc (bool): Whether to scan the sequence reverse complement as well

    Returns:
        pd.DataFrame containing columns 'MotifID', 'SeqID', 'start', 'end',
        'strand', along with the group column from `seqs`.
    """
    from collections import defaultdict

    from pymemesuite.common import Sequence
    from pymemesuite.fimo import FIMO

    from polygraph.input import read_meme_file

    # Load motifs
    motifs, bg = read_meme_file(meme_file)

    # Format sequences
    sequences = [
        Sequence(seq, name=id.encode())
        for seq, id in zip(seqs["Sequence"].tolist(), seqs.index.tolist())
    ]

    # Setup FIMO
    fimo = FIMO(both_strands=rc, threshold=pthresh)

    # Empty dictionary for output
    out = defaultdict(list)

    # Scan each motif against all sequences
    for motif in motifs:
        match = fimo.score_motif(motif, sequences, bg).matched_elements
        for m in match:
            out["MotifID"].append(motif.name.decode())
            out["SeqID"].append(m.source.accession.decode())
            if m.strand == "+":
                out["start"].append(m.start)
                out["end"].append(m.stop)
            else:
                # Reverse-strand hits are stored with start and end swapped,
                # so the coordinates reflect the motif's orientation.
                out["end"].append(m.start)
                out["start"].append(m.stop)
            out["strand"].append(m.strand)

    return pd.DataFrame(out).merge(
        seqs[[group_col]], left_on="SeqID", right_index=True
    )
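
# Example usage (a sketch, not part of the library: assumes `seqs` is a
# dataframe indexed by sequence ID with 'Sequence' and 'Group' columns, and
# that 'motifs.meme' is a hypothetical MEME-format motif file):
#
#   sites = scan(seqs, "motifs.meme", pthresh=1e-3, rc=True)
#   # One row per FIMO hit: MotifID, SeqID, start, end, strand, Group
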
def motif_frequencies(sites, normalize=False, seqs=None):
    """
    Count the frequency of occurrence of motifs in a list of sequences.

    Args:
        sites (pd.DataFrame): Output of the `scan` function
        normalize (bool): Whether to normalize the resulting count matrix
            to correct for sequence length
        seqs (pd.DataFrame): Pandas dataframe containing DNA sequences.
            Required to align the count matrix rows to all sequences, and
            for normalization.

    Returns:
        cts (pd.DataFrame): Count matrix with rows = sequences and columns = motifs
    """
    cts = sites[["MotifID", "SeqID"]].value_counts().reset_index(name="count")
    cts = cts.pivot_table(index="SeqID", columns="MotifID", values="count")

    # Right-merge on the sequence dataframe so that sequences with no motif
    # hits appear as rows of zeros.
    cts = cts.merge(
        seqs[[]], left_index=True, right_index=True, how="right"
    ).fillna(0)

    if normalize:
        assert seqs is not None, "seqs must be provided for normalization"
        seq_lens = seqs["Sequence"].apply(len)
        cts = cts.divide(seq_lens.tolist(), axis=0)

    return cts
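
# Example usage (a sketch; `sites` comes from `scan` above and `seqs` must be
# the same dataframe that was scanned):
#
#   cts = motif_frequencies(sites, normalize=True, seqs=seqs)
#   # cts is sequences x motifs; normalizing divides each row by sequence
#   # length, since longer sequences accumulate more hits by chance.
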
def nmf(counts, seqs, reference_group, group_col="Group", n_components=10):
    """
    Perform NMF on a motif count matrix.

    Args:
        counts (pd.DataFrame): Motif count matrix where rows are sequences
            and columns are motifs.
        seqs (pd.DataFrame): Pandas dataframe containing DNA sequences.
        reference_group (str): ID of the group to use as reference
        group_col (str): Name of the column in `seqs` containing group IDs
        n_components (int): Number of components or factors to extract using NMF

    Returns:
        W (pd.DataFrame): Pandas dataframe of size sequences x factors,
            containing the contribution of each factor to each sequence.
        H (pd.DataFrame): Pandas dataframe of size factors x motifs,
            containing the contribution of each motif to each factor.
        res (pd.DataFrame): Pandas dataframe containing the FDR-corrected
            significance testing results for factor contribution between groups.
    """
    # Run NMF
    model = NMF(n_components=n_components, init="random", random_state=0)

    # Obtain W and H matrices
    W = pd.DataFrame(model.fit_transform(counts.values))  # sequences x factors
    H = pd.DataFrame(model.components_)  # factors x motifs

    # Format W and H matrices
    factors = [f"factor_{i}" for i in range(n_components)]
    W.index = counts.index
    W.columns = factors
    H.index = factors
    H.columns = counts.columns

    # Add group IDs to W
    W[group_col] = seqs[group_col].tolist()

    # Significance testing between groups
    res = pd.DataFrame()
    for col in W.columns[:-1]:
        # For each factor, test whether its abundance differs between groups
        factor_res = groupwise_mann_whitney(
            W, val_col=col, reference_group=reference_group, group_col=group_col
        )
        factor_res["factor"] = col
        res = pd.concat([res, factor_res])

    # FDR correction
    res["padj"] = fdrcorrection(res.pval)[1]
    return W, H, res
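
# Example usage (a sketch; a group named "reference" in seqs['Group'] is an
# assumption for illustration):
#
#   W, H, res = nmf(cts, seqs, reference_group="reference", n_components=10)
#   # W: per-sequence factor loadings (plus the group column)
#   # H: per-factor motif weights
#   # res: Mann-Whitney tests of factor loadings vs. the reference group
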
def get_motif_pairs(sites):
    """
    List the pairs of motifs present in each sequence.

    Args:
        sites (pd.DataFrame): Pandas dataframe containing FIMO output.

    Returns:
        pairs (pd.DataFrame): Dataframe containing all motif pairs in each
            sequence with their orientation and distance.
    """
    # Get the midpoint of each motif site
    sites["mid"] = sites["start"] + ((sites["end"] - sites["start"]) / 2)

    # List the motif, strand and position for each site
    df = sites[["SeqID"]].copy()
    df["data"] = sites[["MotifID", "strand", "mid"]].apply(
        lambda row: row.tolist(), axis=1
    )

    # Get all pairs of sites present in each sequence
    pairs = pd.DataFrame(
        df.groupby("SeqID")["data"].apply(
            lambda x: list(itertools.combinations(x, 2))
        ),
        columns=["data"],
    )
    pairs = pairs.explode("data")

    # Get the orientation and distance for each pair of motifs
    pairs = pd.DataFrame(
        pairs.data.apply(lambda x: list(zip(*x))).tolist(),
        index=pairs.index,
        columns=["MotifID", "strand", "pos"],
    )
    pairs["orientation"] = pairs.strand.apply(
        lambda x: "same" if len(set(x)) == 1 else "opposite"
    )
    pairs["distance"] = pairs.pos.apply(lambda x: np.abs(x[1] - x[0]))
    return pairs[["MotifID", "orientation", "distance"]]
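
# Example usage (a sketch):
#
#   pairs = get_motif_pairs(sites)
#   # One row per motif pair per sequence, indexed by SeqID, giving the pair
#   # of motif IDs, their mutual orientation ('same'/'opposite'), and the
#   # distance between their midpoints.
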
def _filter_motif_pairs(
    motif_pairs,
    seqs,
    reference_group=None,
    group_col="Group",
    min_prop_cutoff=0,
    max_prop_cutoff=0,
    ref_prop_cutoff=0,
):
    # Count the occurrence of motif pairs in each group
    cts = motif_pairs[["MotifID", group_col]].reset_index().drop_duplicates()
    cts = cts[["MotifID", group_col]].value_counts().reset_index(name="count")
    cts["group_total"] = seqs[group_col].value_counts()[cts[group_col]].tolist()
    cts["group_prop"] = cts["count"] / cts.group_total
    cts = cts.pivot_table(
        index="MotifID", columns=group_col, values="group_prop"
    ).fillna(0)

    # Filter rare pairs by their proportion in each group
    sel_pairs = set(
        cts.index[(cts.max(1) >= max_prop_cutoff) & (cts.min(1) >= min_prop_cutoff)]
    )
    if ref_prop_cutoff > 0:
        assert reference_group is not None
        sel_pairs = sel_pairs.intersection(
            cts.index[cts[reference_group] >= ref_prop_cutoff]
        )

    print(f"Selected {len(sel_pairs)} pairs based on in-group proportion")
    motif_pairs = motif_pairs[motif_pairs.MotifID.isin(sel_pairs)]
    return motif_pairs.copy()
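
# Illustration of the cutoffs (hypothetical numbers): for a pair occurring in
# 8% of group A sequences and 1% of group B sequences, the group proportions
# are (0.08, 0.01). max_prop_cutoff=0.05 keeps the pair (max >= 0.05), while
# min_prop_cutoff=0.05 drops it (min < 0.05). ref_prop_cutoff applies the same
# threshold to the reference group's proportion only.
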
def motif_pair_differential_abundance(
    motif_pairs,
    seqs,
    reference_group,
    group_col="Group",
    max_prop_cutoff=0,
    min_prop_cutoff=0,
    ref_prop_cutoff=0,
):
    """
    Compare the rate of occurrence of pairwise combinations of motifs
    between groups.

    Args:
        motif_pairs (pd.DataFrame): Pandas dataframe containing the output
            of `get_motif_pairs`.
        seqs (pd.DataFrame): Pandas dataframe containing sequences
        reference_group (str): ID of group to use as reference
        group_col (str): Name of column in `seqs` containing group IDs
        max_prop_cutoff (float): Limit to combinations present in at least
            this proportion of sequences in at least one group.
        min_prop_cutoff (float): Limit to combinations present in at least
            this proportion of sequences in all groups.
        ref_prop_cutoff (float): Limit to combinations present in at least
            this proportion of sequences in the reference group.

    Returns:
        res (pd.DataFrame): Pandas dataframe containing FDR-corrected
            significance testing results for the occurrence of pairwise
            combinations between groups
    """
    res = pd.DataFrame()
    motif_pairs = motif_pairs.merge(
        seqs[[group_col]], left_index=True, right_index=True
    )

    # Optionally drop rare pairs before testing
    if (max_prop_cutoff > 0) or (min_prop_cutoff > 0) or (ref_prop_cutoff > 0):
        motif_pairs = _filter_motif_pairs(
            motif_pairs,
            seqs,
            reference_group=reference_group,
            group_col=group_col,
            max_prop_cutoff=max_prop_cutoff,
            min_prop_cutoff=min_prop_cutoff,
            ref_prop_cutoff=ref_prop_cutoff,
        )

    df = seqs[[group_col]].copy()
    for pair in motif_pairs.MotifID.unique():
        # Mark the sequences that contain this motif pair
        seqs_with_pair = motif_pairs[motif_pairs.MotifID == pair].index
        df["has_pair"] = df.index.isin(seqs_with_pair)
        curr_res = groupwise_fishers(
            df,
            reference_group=reference_group,
            val_col="has_pair",
            reference_val=None,
            group_col=group_col,
        ).reset_index()
        curr_res["MotifID"] = [pair] * len(curr_res)
        res = pd.concat([res, curr_res])

    # FDR correction
    res["padj"] = fdrcorrection(res.pval)[1]
    return res.reset_index(drop=True)
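
# Example usage (a sketch; the group name and cutoff value are illustrative):
#
#   abundance_res = motif_pair_differential_abundance(
#       pairs, seqs, reference_group="reference", max_prop_cutoff=0.05
#   )
#   # Fisher's exact test per pair: does the fraction of sequences containing
#   # the pair differ between each group and the reference?
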
def motif_pair_differential_orientation(
    motif_pairs,
    seqs,
    reference_group,
    group_col="Group",
    max_prop_cutoff=0,
    min_prop_cutoff=0,
    ref_prop_cutoff=0,
):
    """
    Compare the mutual orientation of all motif pairs between groups.

    Args:
        motif_pairs (pd.DataFrame): Pandas dataframe containing the output
            of `get_motif_pairs`.
        seqs (pd.DataFrame): Pandas dataframe containing sequences
        reference_group (str): ID of group to use as reference
        group_col (str): Name of column in `seqs` containing group IDs
        max_prop_cutoff (float): Limit to combinations present in at least
            this proportion of sequences in at least one group.
        min_prop_cutoff (float): Limit to combinations present in at least
            this proportion of sequences in all groups.
        ref_prop_cutoff (float): Limit to combinations present in at least
            this proportion of sequences in the reference group.

    Returns:
        res (pd.DataFrame): Pandas dataframe containing FDR-corrected
            significance testing results for the mutual orientation of
            pairwise combinations between groups
    """
    res = pd.DataFrame()
    motif_pairs = motif_pairs.merge(
        seqs[[group_col]], left_index=True, right_index=True
    )

    # Optionally drop rare pairs before testing
    if (max_prop_cutoff > 0) or (min_prop_cutoff > 0) or (ref_prop_cutoff > 0):
        motif_pairs = _filter_motif_pairs(
            motif_pairs,
            seqs,
            reference_group=reference_group,
            group_col=group_col,
            max_prop_cutoff=max_prop_cutoff,
            min_prop_cutoff=min_prop_cutoff,
            ref_prop_cutoff=ref_prop_cutoff,
        )

    for pair in motif_pairs.MotifID.unique():
        curr_res = groupwise_fishers(
            motif_pairs[motif_pairs.MotifID == pair],
            reference_group=reference_group,
            val_col="orientation",
            reference_val="same",
            group_col=group_col,
        ).reset_index()
        curr_res["MotifID"] = [pair] * len(curr_res)
        res = pd.concat([res, curr_res])

    # FDR correction
    res["padj"] = fdrcorrection(res.pval)[1]
    return res.reset_index(drop=True)
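
# Example usage (a sketch):
#
#   orientation_res = motif_pair_differential_orientation(
#       pairs, seqs, reference_group="reference", min_prop_cutoff=0.05
#   )
#   # Fisher's exact test per pair on the fraction of 'same'-strand
#   # occurrences in each group vs. the reference.
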
def motif_pair_differential_distance(
    motif_pairs,
    seqs,
    reference_group,
    group_col="Group",
    max_prop_cutoff=0,
    min_prop_cutoff=0,
    ref_prop_cutoff=0,
):
    """
    Compare the distance between all motif pairs across groups.

    Args:
        motif_pairs (pd.DataFrame): Pandas dataframe containing the output
            of `get_motif_pairs`.
        seqs (pd.DataFrame): Pandas dataframe containing sequences
        reference_group (str): ID of group to use as reference
        group_col (str): Name of column in `seqs` containing group IDs
        max_prop_cutoff (float): Limit to combinations present in at least
            this proportion of sequences in at least one group.
        min_prop_cutoff (float): Limit to combinations present in at least
            this proportion of sequences in all groups.
        ref_prop_cutoff (float): Limit to combinations present in at least
            this proportion of sequences in the reference group.

    Returns:
        res (pd.DataFrame): Pandas dataframe containing FDR-corrected
            significance testing results for the distance between paired
            motifs, between groups
    """
    res = pd.DataFrame()
    motif_pairs = motif_pairs.merge(
        seqs[[group_col]], left_index=True, right_index=True
    )

    # Optionally drop rare pairs before testing
    if (max_prop_cutoff > 0) or (min_prop_cutoff > 0) or (ref_prop_cutoff > 0):
        motif_pairs = _filter_motif_pairs(
            motif_pairs,
            seqs,
            reference_group=reference_group,
            group_col=group_col,
            max_prop_cutoff=max_prop_cutoff,
            min_prop_cutoff=min_prop_cutoff,
            ref_prop_cutoff=ref_prop_cutoff,
        )

    for pair in motif_pairs.MotifID.unique():
        curr_res = groupwise_mann_whitney(
            motif_pairs[motif_pairs.MotifID == pair],
            reference_group=reference_group,
            val_col="distance",
            group_col=group_col,
        ).reset_index()
        curr_res["MotifID"] = [pair] * len(curr_res)
        res = pd.concat([res, curr_res])

    # FDR correction
    res["padj"] = fdrcorrection(res.pval)[1]
    return res.reset_index(drop=True)
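
# Example usage (a sketch):
#
#   distance_res = motif_pair_differential_distance(
#       pairs, seqs, reference_group="reference", min_prop_cutoff=0.05
#   )
#   # Mann-Whitney U test per pair on the distribution of pair distances
#   # in each group vs. the reference.
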
def score_sites(sites, seqs, scores):
    """
    Calculate the average score of each motif site given base-level
    importance scores.

    Args:
        sites (pd.DataFrame): Dataframe containing site positions
        seqs (pd.DataFrame): Dataframe containing sequences
        scores (np.array): Numpy array of shape (sequences x length)

    Returns:
        sites (pd.DataFrame): 'sites' dataframe with an additional column 'score'
    """
    # Average the importance scores over each site. Reverse-strand sites are
    # stored with start and end swapped, so slice from end to start.
    sites["score"] = sites.apply(
        lambda row: scores[seqs.index == row.SeqID, row.start : row.end].mean()
        if row.strand == "+"
        else scores[seqs.index == row.SeqID, row.end : row.start].mean(),
        axis=1,
    )
    return sites
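
# Example usage (a sketch; `scores` would typically come from an attribution
# method such as ISM or gradient-based importance, aligned to `seqs`, and
# `seq_len` is a hypothetical sequence length):
#
#   scores = np.random.rand(len(seqs), seq_len)  # placeholder importance scores
#   sites = score_sites(sites, seqs, scores)
#   # Adds a 'score' column: the mean per-base importance over each site.
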