Source code for gseapy.biomart

import logging
import os
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import StringIO
from typing import Dict, List, Optional

try:
    # The cElementTree alias was removed from the standard library in Python 3.9.
    from xml.etree import cElementTree as ET
except ImportError:
    from xml.etree import ElementTree as ET

import pandas as pd
import requests

from gseapy.utils import log_init, mkdirs, retry


class Biomart:
    """Query the BioMart REST service.

    Class attributes
    ----------------
    CHUNK_SIZE : int
        Number of filter values sent per request when a query has to be
        split into batches.
    MAX_WORKERS : Optional[int]
        Number of threads used for batched requests. ``None`` picks a value
        automatically from the size of the batched filter.
    """

    CHUNK_SIZE = 300  # Default chunk size for batching queries
    MAX_WORKERS: Optional[int] = None  # Default to auto; set an int to override

    def __init__(self, host: str = "www.ensembl.org", verbose: bool = False):
        """Simple API to BioMart services.

        :param host: preferred BioMart host; fallback mirrors are tried when
            it does not answer.
        :param verbose: log at INFO level when True, WARNING otherwise.

        How to query validated dataset, attributes, filters.

        Example::

            >>> from gseapy import Biomart
            >>> bm = Biomart(verbose=False, host="ensembl.org")
            >>> ## view validated marts
            >>> marts = bm.get_marts()
            >>> ## view validated dataset
            >>> datasets = bm.get_datasets(mart='ENSEMBL_MART_ENSEMBL')
            >>> ## view validated attributes
            >>> attrs = bm.get_attributes(dataset='hsapiens_gene_ensembl')
            >>> ## view validated filters
            >>> filters = bm.get_filters(dataset='hsapiens_gene_ensembl')
            >>> ## query results
            >>> queries = ['ENSG00000125285','ENSG00000182968'] # a python list
            >>> results = bm.query(dataset='hsapiens_gene_ensembl',
                                   attributes=['entrezgene_id', 'go_id'],
                                   filters={'ensembl_gene_id': queries}
                                  )
        """
        self._id = str(id(self))
        self._logger = log_init(
            name="Biomart" + self._id,
            log_level=logging.INFO if verbose else logging.WARNING,
            filename=None,
        )
        # Set defaults BEFORE probing the hosts: _set_host overwrites them on
        # success, and the instance stays usable (no AttributeError on
        # self.host, no discarded registry cache) when probing fails.
        self.host = host
        self._marts = None
        self._set_host(host)

        params = {
            "host": self.host,
            "version": "1.0",
            "virtualSchemaName": "default",
            "formatter": "TSV",
            "header": 0,
            "uniqueRows": 1,
            "configVersion": "0.6",
            "completionStamp": 0,
        }
        self.header = (
            """https://%(host)s/biomart/martservice?query="""
            """<?xml version="%(version)s" encoding="UTF-8"?>"""
            """<!DOCTYPE Query>"""
            """<Query virtualSchemaName="%(virtualSchemaName)s" formatter="%(formatter)s" """
            """header="%(header)s" uniqueRows="%(uniqueRows)s" count="" """
            """datasetConfigVersion="%(configVersion)s" completionStamp="%(completionStamp)s">"""
        ) % params
        self.footer = "</Dataset></Query>"
        # Initialises attributes_xml, filters_xml and dataset_xml.
        self.reset()

    def __del__(self):
        """Close and detach every handler of this instance's logger."""
        # __init__ may have failed before the logger existed.
        logger = getattr(self, "_logger", None)
        if logger is None:
            return
        for handler in logger.handlers[:]:
            handler.close()  # close file
            logger.removeHandler(handler)

    def _set_host(self, host: str):
        """Select the first reachable BioMart host.

        Tries ``host`` first, then the built-in mirrors. On success stores
        the host in ``self.host`` and the parsed registry in ``self._marts``.
        When no host answers, a warning is logged and both attributes are
        left untouched.
        """
        # dict.fromkeys drops duplicates while keeping the preference order.
        hosts = list(dict.fromkeys([host, "useast.ensembl.org", "asia.ensembl.org"]))
        for position, candidate in enumerate(hosts):
            url = "https://{}/biomart/martservice?type=registry".format(candidate)
            try:
                resp = requests.get(url)
            except requests.exceptions.RequestException as err:
                # A connection failure must not abort the mirror fallback.
                self._logger.warning("request to host {} failed: {}".format(candidate, err))
                resp = None
            # An unavailable service answers with an HTML page rather than
            # the expected "\n<MartRegistry>\n" document.
            if resp is not None and resp.ok and resp.text.startswith("\n<MartRegistry>\n"):
                self.host = candidate
                self._marts = self._get_mart(resp.text)
                return
            if position + 1 < len(hosts):
                self._logger.warning(
                    "host {} is not reachable, try {} ".format(candidate, hosts[position + 1])
                )
            else:
                self._logger.warning("host {} is not reachable".format(candidate))
        self._logger.warning("hosts is not reachable. Please try again later.")

    @staticmethod
    def _join_filter_value(value) -> str:
        """Render a filter value as the comma-separated string BioMart expects."""
        if isinstance(value, str):
            return value  # keep string as is
        if isinstance(value, Iterable):
            return ",".join(str(item) for item in value)
        return str(value)

    @staticmethod
    def _filter_tag(name: str, value) -> str:
        """Build one ``<Filter>`` element.

        Filters whose name starts with "with" use the ``excluded`` attribute;
        all others use ``value``.
        """
        text = Biomart._join_filter_value(value)
        if name.lower().startswith("with"):
            return '<Filter name="%s" excluded="%s"/>' % (name, text)
        return '<Filter name="%s" value="%s"/>' % (name, text)

    def add_filter(self, name: str, value: Iterable[str]):
        """Append a filter to the pending query.

        :param name: filter name
        :param value: a string, or an iterable of values joined with commas
        """
        self.filters_xml.append(self._filter_tag(name, value))

    def add_attribute(self, attribute: str):
        """Append an attribute (output column) to the pending query."""
        self.attributes_xml.append('<Attribute name="%s"/>' % attribute)

    def add_dataset(self, dataset: str):
        """Set the dataset of the pending query."""
        self.dataset_xml = '<Dataset name="%s" interface="default" >' % dataset

    def reset(self):
        """Clear the dataset, filters and attributes of the pending query."""
        self.attributes_xml = []
        self.filters_xml = []
        self.dataset_xml = ""

    def _xml_header_body(self) -> str:
        """Return ``self.header`` without its ``https://...?query=`` URL prefix."""
        _, separator, tail = self.header.partition("query=")
        # Fallback: if the header has no URL prefix, use it unchanged.
        return tail if separator else self.header

    def get_xml(self):
        """Return the full GET URL (URL prefix plus XML query) of the pending query."""
        return (
            self.header
            + self.dataset_xml
            + "".join(self.filters_xml)
            + "".join(self.attributes_xml)
            + self.footer
        )

    def get_xml_body(self):
        """Return only the XML body without the URL prefix.

        This is suitable for POST requests where the XML is sent in the body
        as the 'query' form field to the biomart endpoint.
        """
        return (
            self._xml_header_body()
            + self.dataset_xml
            + "".join(self.filters_xml)
            + "".join(self.attributes_xml)
            + self.footer
        )

    def _build_xml_strings(
        self,
        dataset: str,
        attributes: List[str],
        filters: Dict[str, Iterable[str]],
    ) -> Dict[str, str]:
        """Build XML strings without mutating instance state.

        Returns a dict with keys:
        - body: XML suitable for POST body (no URL prefix)
        - url: Full GET URL with query parameter
        """
        dataset_xml = f'<Dataset name="{dataset}" interface="default" >'
        filter_tags = [self._filter_tag(name, value) for name, value in filters.items()]
        attribute_tags = [f'<Attribute name="{a}"/>' for a in attributes]
        body = (
            self._xml_header_body()
            + dataset_xml
            + "".join(filter_tags)
            + "".join(attribute_tags)
            + self.footer
        )
        url = f"https://{self.host}/biomart/martservice?query=" + body
        return {"body": body, "url": url}

    def _get_mart(self, text: str):
        """Parse the registry XML and return a dataframe of supported marts.

        Parameters
        ----------
        text : str
            registry XML text

        Returns
        -------
        marts : pd.DataFrame
            columns:
            - Mart: the ``name`` attribute of each registry entry
            - Version: the ``database`` attribute of each registry entry

        Raises
        ------
        ValueError
            if the registry entries lack one of the required attributes.
        """
        marts = pd.DataFrame([element.attrib for element in ET.XML(text)])
        required_columns = ["database", "displayName", "name"]
        missing = [col for col in required_columns if col not in marts.columns]
        if missing:
            raise ValueError(f"BioMart registry XML missing columns: {missing}. Schema may have changed.")
        marts = marts.loc[:, required_columns]
        marts.columns = ["Version", "DisplayName", "Mart"]
        return marts.loc[:, ["Mart", "Version"]]

    def get_marts(self):
        """Get available marts and their names.

        Returns the cached dataframe when available; otherwise downloads the
        registry. When the download does not yield a registry document, the
        raw response text is returned instead.
        """
        if self._marts is not None:
            return self._marts
        url = "https://{host}/biomart/martservice?type=registry&requestid=gseapy{i}".format(
            host=self.host, i=self._id
        )
        resp = requests.get(url)
        if resp.ok and resp.text.startswith("\n<MartRegistry>\n"):
            self._marts = self._get_mart(resp.text)
            return self._marts
        return resp.text

    def get_datasets(self, mart: str = "ENSEMBL_MART_ENSEMBL"):
        """Get available datasets from mart you've selected.

        Returns a dataframe with columns ``Dataset`` and ``Description``, or
        the raw response text when the service reports a problem.

        :raises ValueError: if ``mart`` is not listed in the registry.
        """
        marts = self.get_marts()
        if not isinstance(marts, pd.DataFrame):
            # get_marts returns the raw error text when the registry could
            # not be fetched; pass it on instead of indexing into a string.
            return marts
        if mart not in marts["Mart"].values:
            raise ValueError("Provided mart name (%s) is not valid. see 'names' attribute" % mart)
        url = "https://{host}/biomart/martservice?type=datasets&mart={mart}".format(host=self.host, mart=mart)
        resp = requests.get(url)
        if not resp.ok or resp.text.startswith("Problem"):
            return resp.text
        records = [record.split("\t") for record in resp.text.split("\n") if len(record) > 1]
        datasets = pd.DataFrame(records).iloc[:, 1:3]
        datasets.columns = ["Dataset", "Description"]
        return datasets

    def get_attributes(self, dataset: str = "hsapiens_gene_ensembl"):
        """Get available attributes from dataset you've selected.

        Returns a dataframe with columns ``Attribute``, ``Description`` and
        ``Additional``. The raw response text is returned when the request
        fails or the service answers with a "Query ERROR" message; the
        unnamed dataframe is returned when fewer than three columns came back.
        """
        url = "https://{host}/biomart/martservice?type=attributes&dataset={dataset}".format(
            host=self.host, dataset=dataset
        )
        resp = requests.get(url)
        if not resp.ok or str(resp.text).startswith("Query ERROR"):
            return resp.text
        rows = [text.split("\t") for text in resp.text.strip().split("\n")]
        attributes = pd.DataFrame(rows)
        if attributes.shape[1] < 3:
            self._logger.warning(
                f"Attribute response has {attributes.shape[1]} columns, expected at least 3. "
                "Returning raw DataFrame."
            )
            return attributes
        attributes = attributes.iloc[:, :3]
        attributes.columns = ["Attribute", "Description", "Additional"]
        return attributes

    def get_filters(self, dataset: str = "hsapiens_gene_ensembl"):
        """Get available filters from dataset you've selected.

        Returns a dataframe with columns ``Filter``, ``Description``,
        ``Additional`` and ``InputType``. The raw response text is returned
        when the request fails or the service answers with a "Query ERROR"
        message; the unnamed dataframe is returned when the response has
        fewer columns than expected.
        """
        url = "https://{host}/biomart/martservice?type=filters&dataset={dataset}".format(
            host=self.host, dataset=dataset
        )
        resp = requests.get(url)
        if not resp.ok or str(resp.text).startswith("Query ERROR"):
            return resp.text
        rows = [text.split("\t") for text in resp.text.strip().split("\n")]
        df_filters = pd.DataFrame(rows)
        # Check if there are enough columns before selecting
        expected_indices = [0, 1, 3, 5]
        if df_filters.shape[1] < max(expected_indices) + 1:
            self._logger.warning(
                f"Filter response has {df_filters.shape[1]} columns, expected at least {max(expected_indices) + 1}. Returning raw DataFrame."
            )
            return df_filters
        df_filters = df_filters.iloc[:, expected_indices]
        df_filters.columns = ["Filter", "Description", "Additional", "InputType"]
        return df_filters

    def query(
        self,
        dataset: str = "hsapiens_gene_ensembl",
        attributes: Optional[List[str]] = None,
        filters: Optional[Dict[str, Iterable[str]]] = None,
        filename: Optional[str] = None,
    ):
        """Mapping ids using BioMart.

        :param dataset: str, default: 'hsapiens_gene_ensembl'
        :param attributes: str (comma separated), list, tuple. When empty, a
            default set of gene id attributes is used.
        :param filters: dict, {'filter name': list(filter value)}
        :param filename: optional path; the result is written there as TSV.
        :return: a dataframe contains all attributes you selected, or None
            when the query failed (the error text is printed).
        :raises ValueError: if ``filters`` is not a dict.

        Example:

            >>> queries = {'ensembl_gene_id': ['ENSG00000125285','ENSG00000182968'] } # need to be a python dict
            >>> results = bm.query(dataset='hsapiens_gene_ensembl',
                                   attributes=['ensembl_gene_id', 'external_gene_name', 'entrezgene_id', 'go_id'],
                                   filters=queries)
        """
        if not attributes:
            attributes = [
                "ensembl_gene_id",
                "external_gene_name",
                "entrezgene_id",
                "go_id",
            ]
        if isinstance(attributes, str):
            attributes = attributes.split(",")
        if filters is None:
            filters = {}
        if not isinstance(filters, dict):
            raise ValueError("filters only accept a dict object")

        # filename is handled below so the saved file has the final dtypes.
        df = self.query_simple(dataset=dataset, filters=filters, attributes=attributes, filename=None)
        if df is None:
            return
        if isinstance(df, str):
            print(df)
            return

        if "entrezgene_id" in df.columns:
            # Nullable integer keeps ids integral even when some are missing.
            df["entrezgene_id"] = df["entrezgene_id"].astype(pd.Int32Dtype())

        self.results = df
        # save file to cache path.
        if filename is not None:
            outdir = os.path.dirname(filename)
            if outdir:  # a bare file name has no directory to create
                mkdirs(outdir)
            df.to_csv(filename, sep="\t", index=False)
        return df

    def _parse_response(self, text: str, attributes: List[str]):
        """Turn a TSV response into a dataframe.

        Returns None (after logging) for a "Query ERROR" response and an empty
        dataframe for an empty response.
        """
        if str(text).startswith("Query ERROR"):
            self._logger.error(text)
            return None
        if not str(text).strip():
            # Nothing to parse; pd.read_table would raise EmptyDataError here.
            return pd.DataFrame(columns=list(attributes))
        df_local = pd.read_table(StringIO(text), header=None)
        # Validate column count before assigning names
        if len(df_local.columns) == len(attributes):
            df_local.columns = attributes
        else:
            self._logger.warning(
                f"Response column count ({len(df_local.columns)}) does not match attributes ({len(attributes)})."
            )
        return df_local

    def _request(self, session, method: str, target: str, xml_body: Optional[str], attributes: List[str]):
        """Send one request and parse it.

        :param method: "post" (``xml_body`` sent as the 'query' form field to
            ``target``) or "get" (``target`` is the complete query URL).
        :return: ``(dataframe, None)`` on success, ``(None, response_text)``
            when the service answered without a usable table, and
            ``(None, None)`` when the request itself raised.
        """
        try:
            if method == "post":
                resp = session.post(target, data={"query": xml_body})
            else:
                resp = session.get(target)
        except Exception as err:  # best effort: the caller falls back or batches
            self._logger.warning(f"{method.upper()} request failed: {err}")
            return None, None
        if resp.ok:
            df_local = self._parse_response(resp.text, attributes)
            if df_local is not None:
                return df_local, None
        return None, resp.text

    def _store_results(self, df, filename: Optional[str]):
        """Optionally write ``df`` as TSV, remember it in ``self.results`` and return it."""
        if filename is not None:
            df.to_csv(filename, sep="\t", index=False)
        self.results = df
        return df

    def query_simple(
        self,
        dataset: str = "hsapiens_gene_ensembl",
        attributes: Optional[List[str]] = None,
        filters: Optional[Dict[str, Iterable[str]]] = None,
        filename: Optional[str] = None,
    ):
        """
        This function is a simple version of BioMart REST API.
        same parameter to query().

        However, you could get cross page of mapping. such as Mouse 2 human gene names

        **Note**: it will take a couple of minutes to get the results.
        A xml template for querying biomart. (see https://gist.github.com/keithshep/7776579)

        Strategy:

        1. POST the whole query (avoids "414 URL too long").
        2. Fall back to GET when the POST raised or reported a 414.
        3. Otherwise split the largest list-like filter into chunks of
           ``CHUNK_SIZE`` values and fetch them one by one (or with
           ``MAX_WORKERS`` threads).

        :return: a dataframe on success, otherwise an error string.

        Example::

            >>> from gseapy import Biomart
            >>> bm = Biomart()
            >>> results = bm.query_simple(dataset='mmusculus_gene_ensembl',
                                          attributes=['ensembl_gene_id',
                                                      'external_gene_name',
                                                      'hsapiens_homolog_associated_gene_name'])
        """
        if attributes is None:
            attributes = []
        if filters is None:
            filters = {}

        # Rebuild the instance-level query state (read back via get_xml()).
        self.reset()
        self.add_dataset(dataset)
        self._logger.debug("Add attributes")
        for attribute in attributes:
            self.add_attribute(attribute)
        self._logger.debug("Add filters")
        for name, value in filters.items():
            self.add_filter(name, value)
        self._logger.debug("Build xml")
        # Build XML for both GET (URL) and POST (body)
        self._xml = self.get_xml()
        xml_body = self.get_xml_body()
        endpoint = f"https://{self.host}/biomart/martservice"
        session = retry(num=5)

        # First attempt: POST once with the whole query.
        df, err_text = self._request(session, "post", endpoint, xml_body, attributes)
        if df is None and (err_text is None or "414" in str(err_text)):
            # The POST raised (err_text is None) or the response mentions 414:
            # give GET one chance before batching.
            df, err_text = self._request(session, "get", self._xml, None, attributes)
        if df is not None:
            return self._store_results(df, filename)

        # Batch on the largest list-like filter value.
        list_like = [
            (key, list(value))
            for key, value in filters.items()
            if isinstance(value, (list, tuple, set, pd.Series))
        ]
        if not list_like:
            # No list-like filters to batch; return the last error text
            return err_text if err_text is not None else "BioMart request failed."
        key_to_batch, values_to_batch = max(list_like, key=lambda kv: len(kv[1]))
        if not values_to_batch:
            return err_text if err_text is not None else "No values to query."

        # Allow runtime override via instance/class attribute; a step below 1
        # would make range() fail.
        chunk_size = max(1, int(getattr(self, "CHUNK_SIZE", 300)))
        # Build the chunk requests up front without touching instance state,
        # so worker threads only read immutable strings.
        chunks = []
        for number, start in enumerate(range(0, len(values_to_batch), chunk_size), start=1):
            chunk_filters = {**filters, key_to_batch: values_to_batch[start : start + chunk_size]}
            xmls = self._build_xml_strings(dataset, attributes, chunk_filters)
            chunks.append((number, xmls["body"], xmls["url"]))

        def _fetch_chunk(idx: int, body_xml: str, url_xml: str):
            df_chunk, chunk_err = self._request(session, "post", endpoint, body_xml, attributes)
            if df_chunk is None:
                df_chunk, chunk_err = self._request(session, "get", url_xml, None, attributes)
            if df_chunk is None:
                self._logger.warning(f"BioMart chunk {idx} failed: {chunk_err}")
            return df_chunk

        # Keep concurrency bounded to be nice to the BioMart service.
        max_workers = getattr(self, "MAX_WORKERS", None)
        if max_workers is None:
            # Moderate parallelism for very large inputs; otherwise sequential
            max_workers = 1 if len(values_to_batch) < 2000 else 8

        if max_workers > 1 and len(chunks) > 1:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [executor.submit(_fetch_chunk, *chunk) for chunk in chunks]
                # Read results in submission order so row order is deterministic.
                fetched = [future.result() for future in futures]
        else:
            fetched = [_fetch_chunk(*chunk) for chunk in chunks]

        dfs = [df_chunk for df_chunk in fetched if df_chunk is not None]
        if not dfs:
            return err_text if err_text is not None else "BioMart request failed in all chunks."

        df_all = pd.concat(dfs, ignore_index=True).drop_duplicates()
        return self._store_results(df_all, filename)