# -*- coding: utf-8 -*-
import json
import logging
import os
import xml.etree.ElementTree as ET
from collections import Counter
from collections.abc import Iterable
from typing import Dict, List, Optional, Tuple, Union
import requests
from gseapy.utils import DEFAULT_CACHE_PATH, unique
[docs]def gsea_cls_parser(cls: str) -> Tuple[str]:
"""Extract class(phenotype) name from .cls file.
:param cls: the a class list instance or .cls file which is identical to GSEA input .
:return: phenotype name and a list of class vector.
"""
if not isinstance(cls, str) and isinstance(cls, Iterable):
classes = list(cls)
sample_name = unique(classes)
elif isinstance(cls, str):
with open(cls) as c:
file = c.readlines()
classes = file[2].strip().split()
sample_name = file[1].strip("#").strip().split()
tmp = set(sample_name) & set(classes)
if len(tmp) < 2: # classes and sample_name are different
s1 = classes[0]
for i, c in enumerate(classes):
if c == s1:
classes[i] = sample_name[0]
else:
classes[i] = sample_name[1]
else:
raise Exception("Error parsing sample name!")
if len(sample_name) != 2:
raise Exception("Input groups have to be 2!")
for c, v in Counter(classes).items():
if v < 3:
raise Exception(f"Number of {c}: {v}, it must be >= 3!")
return sample_name[0], sample_name[1], classes
[docs]def gsea_edb_parser(results_path: str) -> Dict[str, List[str]]:
"""Parse results.edb file stored under **edb** file folder.
:param results_path: the path of results.edb file.
:return:
a dict contains { enrichment_term: [es, nes, pval, fdr, fwer, hit_ind]}
"""
xtree = ET.parse(results_path)
xroot = xtree.getroot()
res = {}
# dict_keys(['RANKED_LIST', 'GENESET', 'FWER', 'ES_PROFILE',
# 'HIT_INDICES', 'ES', 'NES', 'TEMPLATE', 'RND_ES', 'RANK_SCORE_AT_ES',
# 'NP', 'RANK_AT_ES', 'FDR'])
for node in xroot.findall("DTG"):
enrich_term = node.attrib.get("GENESET").split("#")[1]
es_profile = node.attrib.get("ES_PROFILE").split(" ")
# esnull = term.get('RND_ES').split(" ")
hit_ind = node.attrib.get("HIT_INDICES").split(" ")
es_profile = [float(i) for i in es_profile]
hit_ind = [int(i) for i in hit_ind]
# esnull = [float(i) for i in esnull ]
es = float(node.attrib.get("ES"))
nes = float(node.attrib.get("NES"))
pval = float(node.attrib.get("NP"))
fdr = float(node.attrib.get("FDR"))
fwer = float(node.attrib.get("FWER"))
logging.debug("Enriched Gene set is: " + enrich_term)
res[enrich_term] = [es, nes, pval, fdr, fwer, hit_ind]
return res
[docs]def read_gmt(path: str) -> Dict[str, List[str]]:
"""Read GMT file
:param str path: the path to a gmt file.
:return: a dict object
"""
if path.lower().endswith("gmt"):
return get_library(name=path, min_size=0, max_size=100000, gene_list=None)
else:
raise ValueError("Please input a gmt file")
return
[docs]def get_library(
name: str,
organism: str = "Human",
min_size: int = 0,
max_size: int = 2000,
gene_list: Optional[List[str]] = None,
) -> Dict[str, List[str]]:
"""Parse gene_sets.gmt(gene set database) file or download from enrichr server.
:param str name: the gene_sets.gmt file or an enrichr library name.
checkout full enrichr library name here: https://maayanlab.cloud/Enrichr/#libraries
:param str organism: choose one from { 'Human', 'Mouse', 'Yeast', 'Fly', 'Fish', 'Worm' }.
This arugment has not effect if input is a `.gmt` file.
:param min_size: Minimum allowed number of genes for each gene set. Default: 0.
:param max_size: Maximum allowed number of genes for each gene set. Default: 2000.
:param gene_list: if input a gene list, min and max overlapped genes between gene set and gene_list are kept.
:return dict: Return a filtered gene set database dictionary.
Note: **DO NOT** filter gene sets, when use :func:`replot`. Because ``GSEA`` Desktop have already
done this for you.
"""
genesets_dict = {}
if name.lower().endswith(".gmt"):
logging.info("User Defined gene sets is given.......continue..........")
with open(name) as genesets:
for line in genesets:
entries = line.strip().split("\t")
key = entries[0]
genesets_dict[key] = entries[2:]
else:
# get gene sets from enrichr libary
names = get_library_name(organism=organism)
if name in names:
logging.info("Downloading and generating Enrichr library gene sets...")
genesets_dict = download_library(name, organism=organism)
else:
raise ValueError(
"Sorry. The input: %s could be be found given organism: %s"
% (name, organism)
)
# filtering gene_sets
total = len(genesets_dict)
keys = list(genesets_dict.keys())
if gene_list is None:
for k in keys:
if min_size <= len(genesets_dict[k]) <= max_size:
continue
del genesets_dict[k]
else:
# given a gene_list, filter gene sets by gene_overlap numbers
gene_dict = {g: i for i, g in enumerate(gene_list)}
for subset in keys:
subset_list = set(genesets_dict[subset]) # remove duplicates
# drop genes not found in the gene_dict
gene_overlap = [g for g in subset_list if g in gene_dict]
tag_len = len(gene_overlap)
if (min_size <= tag_len <= max_size) and tag_len < len(gene_list):
# tag_len should < gene_list
genesets_dict[subset] = gene_overlap
continue
del genesets_dict[subset]
filsets_num = total - len(genesets_dict)
if filsets_num > 0:
logging.info(
"%04d gene_sets have been filtered out when max_size=%s and min_size=%s"
% (filsets_num, max_size, min_size)
)
if filsets_num == len(genesets_dict):
raise Exception(
"No gene sets passed throught filtering condition!!!, try new paramters again!\n"
+ "Note: Gene names for gseapy is case sensitive."
)
return genesets_dict
[docs]def get_library_name(organism: str = "Human") -> List[str]:
"""return enrichr active enrichr library name.
see also: https://maayanlab.cloud/modEnrichr/
:param str organism: Select one from { 'Human', 'Mouse', 'Yeast', 'Fly', 'Fish', 'Worm' }
:return: a list of enrichr libraries from selected database
"""
default = [
"human",
"mouse",
"hs",
"mm",
"homo sapiens",
"mus musculus",
"h. sapiens",
"m. musculus",
]
_organisms = {
"Fly": ["fly", "d. melanogaster", "drosophila melanogaster"],
"Yeast": ["yeast", "s. cerevisiae", "saccharomyces cerevisiae"],
"Worm": ["worm", "c. elegans", "caenorhabditis elegans", "nematode"],
"Fish": ["fish", "d. rerio", "danio rerio", "zebrafish"],
}
ENRICHR_URL = "http://maayanlab.cloud"
database = ""
if organism.lower() in default:
database = "Enrichr"
else:
for k, v in _organisms.items():
if organism.lower() in v:
database = k + "Enrichr"
break
if not database.endswith("Enrichr"):
raise LookupError(
"""No supported database. Please input one of these:
('Human', 'Mouse', 'Yeast', 'Fly', 'Fish', 'Worm') """
)
# make a get request to get the gmt names and meta data from Enrichr
# old code
# response = requests.get('http://amp.pharm.mssm.edu/Enrichr/geneSetLibrary?mode=meta')
# gmt_data = response.json()
# # generate list of lib names
# libs = []
# # get library names
# for inst_gmt in gmt_data['libraries']:
# # only include active gmts
# if inst_gmt['isActive'] == True:
# libs.append(inst_gmt['libraryName'])
lib_url = "%s/%s/datasetStatistics" % (ENRICHR_URL, database)
response = requests.get(lib_url, verify=True)
if not response.ok:
raise Exception("Error getting the Enrichr libraries")
libs_json = json.loads(response.text)
libs = [lib["libraryName"] for lib in libs_json["statistics"]]
return sorted(libs)
[docs]def download_library(name: str, organism: str = "human") -> Dict[str, List[str]]:
"""download enrichr libraries.
:param str name: the enrichr library name. see `gseapy.get_library_name()`.
:param str organism: Select one from { 'Human', 'Mouse', 'Yeast', 'Fly', 'Fish', 'Worm' }
:return dict: gene_sets of the enrichr library from selected organism
"""
default = [
"human",
"mouse",
"hs",
"mm",
"homo sapiens",
"mus musculus",
"h. sapiens",
"m. musculus",
]
_organisms = {
"Fly": ["fly", "d. melanogaster", "drosophila melanogaster"],
"Yeast": ["yeast", "s. cerevisiae", "saccharomyces cerevisiae"],
"Worm": ["worm", "c. elegans", "caenorhabditis elegans", "nematode"],
"Fish": ["fish", "d. rerio", "danio rerio", "zebrafish"],
}
ENRICHR_URL = "http://maayanlab.cloud"
database = ""
if organism.lower() in default:
database = "Enrichr"
else:
for k, v in _organisms.items():
if organism.lower() in v:
database = k + "Enrichr"
break
if not database.endswith("Enrichr"):
raise LookupError(
"""No supported database. Please input one of these:
('Human', 'Mouse', 'Yeast', 'Fly', 'Fish', 'Worm') """
)
tmpname = "%s.%s.gmt" % (database, name)
tempath = os.path.join(DEFAULT_CACHE_PATH, tmpname)
if os.path.isfile(tempath):
logging.info("Library is already downloaded in: %s, use local file" % tempath)
genesets_dict = {}
with open(tempath) as genesets:
for line in genesets:
entries = line.strip().split("\t")
key = entries[0]
genesets_dict[key] = entries[2:]
return genesets_dict
# queery string
ENRICHR_URL = ENRICHR_URL + "/%s/geneSetLibrary" % database
query_string = "?mode=text&libraryName=%s"
# get
response = requests.get(
ENRICHR_URL + query_string % name, timeout=None, stream=True
)
if not response.ok:
raise Exception(
"Error fetching gene set library, input name is correct for the organism you've set?."
)
# reformat to dict
genesets_dict = {}
# outname = os.path.join(DEFAULT_CACHE_PATH, "enrichr.%s.gmt" % libname)
for line in response.iter_lines(chunk_size=1024, decode_unicode="utf-8"):
line = line.strip().split("\t")
k = line[0]
v = map(lambda x: x.split(",")[0], line[2:])
v = list(filter(lambda x: True if len(x) else False, v))
genesets_dict[k] = v
return genesets_dict