Source code for tooluniverse.geo_tool

"""
GEO Database REST API Tool

This tool provides access to gene expression data from the GEO database.
GEO is a public repository that archives and freely distributes microarray,
next-generation sequencing, and other forms of high-throughput functional
genomics data.
"""

import re
import requests
from typing import Dict, Any, List
from .ncbi_eutils_tool import NCBIEUtilsTool
from .tool_registry import register_tool

# Root URL of the NCBI GEO file tree, served over HTTPS from the FTP host.
# Supplementary-file URLs are built beneath it as
# {GEO_FTP_BASE}/{series|samples}/{bucket}/{accession}/suppl.
GEO_FTP_BASE = "https://ftp.ncbi.nlm.nih.gov/geo"


@register_tool("GEORESTTool")
class GEORESTTool(NCBIEUtilsTool):
    """
    GEO Database REST API tool with rate limiting.

    Generic wrapper for GEO API endpoints defined in expression_tools.json.
    Depending on the configured ``mode`` it either issues an E-utilities
    request through the parent class, or lists the supplementary files of a
    GEO Series/Sample from the GEO FTP-over-HTTPS tree.
    """

    # Default and maximum number of records requested from esearch.
    _DEFAULT_RETMAX = 50
    _MAX_RETMAX = 500

    # A Series (GSE) or Sample (GSM) accession: prefix followed by digits only.
    # Used to reject values that would otherwise be spliced into a URL path.
    _SUPPL_ACCESSION_RE = re.compile(r"GS[EM]\d+")

    def __init__(self, tool_config):
        """Read endpoint, required parameters and mode from ``tool_config``."""
        super().__init__(tool_config)
        fields = tool_config.get("fields", {})
        parameter = tool_config.get("parameter", {})
        self.endpoint_template: str = fields.get("endpoint", "/esearch.fcgi")
        self.required: List[str] = parameter.get("required", [])
        self.output_format: str = fields.get("return_format", "JSON")
        # Optional discriminator: when "supplementary_files", run() lists the
        # downloadable supplementary/raw files from the GEO FTP tree instead of
        # calling E-utilities.
        self.mode: str = fields.get("mode", "")

    def _build_url(self, arguments: Dict[str, Any]) -> str | Dict[str, Any]:
        """Build URL for GEO API request.

        ``arguments`` is accepted for interface compatibility but not used;
        the URL is ``self.base_url`` followed by the endpoint template.
        """
        return self.base_url + self.endpoint_template

    @classmethod
    def _clamp_limit(cls, limit: Any) -> int:
        """Coerce a caller-supplied ``limit`` to a usable ``retmax`` value.

        Values that cannot be converted to an integer, or that are negative,
        fall back to the default; larger values are capped at the maximum.
        """
        try:
            requested = int(limit)
        except (TypeError, ValueError):
            return cls._DEFAULT_RETMAX
        if requested < 0:
            return cls._DEFAULT_RETMAX
        return min(requested, cls._MAX_RETMAX)

    def _build_params(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Build parameters for GEO API request.

        Recognised argument keys: ``query``, ``organism``, ``study_type``,
        ``platform``, ``date_range`` ("start:end"), ``limit`` and ``sort``.
        All present filters are joined with ``AND`` into the ``term``.
        """
        params: Dict[str, Any] = {
            "db": "gds",
            "retmode": "json",
            "retmax": self._DEFAULT_RETMAX,
        }

        # Build search query
        query_parts: List[str] = []

        if "query" in arguments:
            query_parts.append(arguments["query"])

        if "organism" in arguments:
            organism = arguments["organism"]
            # The two common organisms are emitted with canonical casing and
            # without quotes; anything else is passed through quoted.
            canonical = {
                "homo sapiens": "Homo sapiens[organism]",
                "mus musculus": "Mus musculus[organism]",
            }
            query_parts.append(
                canonical.get(str(organism).lower(), f'"{organism}"[organism]')
            )

        if "study_type" in arguments:
            query_parts.append(f'"{arguments["study_type"]}"[study_type]')

        if "platform" in arguments:
            query_parts.append(f'"{arguments["platform"]}"[platform]')

        if "date_range" in arguments:
            bounds = [
                part.strip() for part in str(arguments["date_range"]).split(":")
            ]
            # Only a well-formed "start:end" pair is usable. Anything else
            # (no colon, extra colons, an empty side) is skipped rather than
            # raising or emitting an empty [PDAT] term.
            if len(bounds) == 2 and all(bounds):
                start_year, end_year = bounds
                query_parts.append(f'"{start_year}"[PDAT] : "{end_year}"[PDAT]')

        if query_parts:
            params["term"] = " AND ".join(query_parts)

        if "limit" in arguments:
            params["retmax"] = self._clamp_limit(arguments["limit"])

        if "sort" in arguments:
            # NOTE(review): "date" (like any unrecognised value) is sent as
            # "relevance"; only "title" is forwarded as-is. Confirm whether a
            # date sort key was intended here.
            params["sort"] = "title" if arguments["sort"] == "title" else "relevance"

        return params

    def _detect_database(self, dataset_id: str) -> str:
        """
        Return the appropriate NCBI GEO database name.

        For NCBI E-utilities, GEO records (GDS, GSE, GSM, GPL) are all accessed
        through the single `gds` database. The accession prefix (GDS/GSE/GSM)
        is used in the search term, not as the database name.
        """
        return "gds"

    def _accession_to_uid(self, dataset_id: str, db: str) -> Dict[str, Any]:
        """Convert accession number (e.g. GSE/GDS/GSM) to numeric UID using esearch.

        Returns the raw result of the esearch request; callers extract the UID
        from it.
        """
        search_params = {
            "db": db,
            # Use ACCN field which is the documented field for accessions in GDS
            # See: https://www.ncbi.nlm.nih.gov/books/NBK25499/#chapter4.ESearch
            "term": f"{dataset_id}[ACCN]",
            "retmode": "json",
            "retmax": 1,
        }
        return self._make_request("/esearch.fcgi", search_params)

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the tool with given arguments.

        Returns an error dict when a required parameter is missing; otherwise
        the supplementary-file listing (in ``supplementary_files`` mode) or
        the result of the E-utilities request.
        """
        # Validate required parameters
        for param in self.required:
            if param not in arguments:
                return {
                    "status": "error",
                    "error": f"Missing required parameter: {param}",
                }

        if self.mode == "supplementary_files":
            return self._list_supplementary_files(arguments)

        # Set endpoint for the base class
        self.endpoint = self.endpoint_template
        params = self._build_params(arguments)

        # Use the parent class's _make_request with rate limiting
        return self._make_request(self.endpoint, params)

    @staticmethod
    def _geo_bucket(accession: str) -> str:
        """Derive the GEO FTP bucket directory for a GSE/GSM accession.

        The bucket replaces the last three digits of the numeric part with
        'nnn'; accessions with three or fewer digits live in the bare bucket.
        e.g. GSE42657 -> GSE42nnn, GSE1000 -> GSE1nnn, GSE100 -> GSEnnn,
        GSM1045442 -> GSM1045nnn.
        """
        prefix, num = accession[:3], accession[3:]
        head = num[:-3] if len(num) > 3 else ""
        return f"{prefix}{head}nnn"

    @staticmethod
    def _suppl_success(
        accession: str, suppl_url: str, files: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Wrap parsed file records in the tool's success payload."""
        return {
            "status": "success",
            "data": {
                "accession": accession,
                "suppl_url": suppl_url,
                "files": files,
                "file_count": len(files),
            },
            "metadata": {
                "source": "NCBI GEO FTP",
                "query": accession,
                "endpoint": "supplementary_files",
            },
        }

    def _list_supplementary_files(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """List downloadable supplementary/raw files for a GEO Series/Sample.

        Series (GSE*) expose a structured filelist.txt TSV (Archive/File, Name,
        Time, Size, Type). Samples (GSM*) have no filelist.txt, so the suppl/
        directory HTML listing is parsed instead.

        Always returns a dict: the success payload, or ``{"status": "error",
        ...}`` for invalid input and for any network/HTTP failure.
        """
        accession = str(arguments.get("accession", "")).strip().upper()
        if not accession:
            return {
                "status": "error",
                "error": "Missing required parameter: accession",
            }

        if accession.startswith("GSE"):
            kind, subdir = "series", "series"
        elif accession.startswith("GSM"):
            kind, subdir = "sample", "samples"
        else:
            return {
                "status": "error",
                "error": "accession must be a GEO Series (GSE...) or Sample (GSM...)",
            }

        # The accession becomes part of the request path, so refuse anything
        # that is not strictly prefix + digits (e.g. "GSE", "GSE12/../x").
        if not self._SUPPL_ACCESSION_RE.fullmatch(accession):
            return {
                "status": "error",
                "error": (
                    f"Invalid GEO accession '{accession}': expected GSE or GSM "
                    "followed by digits"
                ),
            }

        bucket = self._geo_bucket(accession)
        base = f"{GEO_FTP_BASE}/{subdir}/{bucket}/{accession}/suppl"

        try:
            if kind == "series":
                return self._parse_filelist(accession, base)
            return self._parse_suppl_dir(accession, base)
        except requests.exceptions.Timeout:
            return {
                "status": "error",
                "error": f"GEO FTP request timed out after {self.timeout} seconds",
            }
        except requests.exceptions.ConnectionError:
            return {
                "status": "error",
                "error": "Failed to connect to GEO FTP server",
            }
        except Exception as e:
            # Tool boundary: callers expect an error dict, never an exception.
            return {
                "status": "error",
                "error": f"Failed to list supplementary files: {str(e)}",
            }

    def _parse_filelist(self, accession: str, base: str) -> Dict[str, Any]:
        """Parse a Series filelist.txt TSV into structured file records.

        Blank lines, ``#`` comment/header lines and rows with fewer than five
        tab-separated columns are skipped. ``size`` is returned as an int when
        it parses as one, otherwise as the raw string. HTTP errors other than
        404 propagate via ``raise_for_status`` to the caller.
        """
        url = f"{base}/filelist.txt"
        resp = requests.get(url, timeout=self.timeout)
        if resp.status_code == 404:
            return {
                "status": "error",
                "error": f"No supplementary filelist found for {accession} (HTTP 404)",
                "url": url,
            }
        resp.raise_for_status()

        files: List[Dict[str, Any]] = []
        for line in resp.text.splitlines():
            if not line or line.startswith("#"):
                continue
            parts = line.split("\t")
            if len(parts) < 5:
                continue
            kind, name, time_str, size, ftype = parts[:5]
            try:
                size_val: Any = int(size)
            except (ValueError, TypeError):
                size_val = size
            files.append(
                {
                    "kind": kind,
                    "name": name,
                    "modified": time_str,
                    "size": size_val,
                    "type": ftype,
                    "download_url": f"{base}/{name}",
                }
            )

        return self._suppl_success(accession, base + "/", files)

    def _parse_suppl_dir(self, accession: str, base: str) -> Dict[str, Any]:
        """Parse a Sample suppl/ directory HTML listing into file records.

        Every distinct relative ``href`` that does not point at a directory is
        reported as a file. HTTP errors other than 404 propagate via
        ``raise_for_status`` to the caller.
        """
        url = base + "/"
        resp = requests.get(url, timeout=self.timeout)
        if resp.status_code == 404:
            return {
                "status": "error",
                "error": f"No supplementary directory found for {accession} (HTTP 404)",
                "url": url,
            }
        resp.raise_for_status()

        files: List[Dict[str, Any]] = []
        seen = set()
        for href in re.findall(r'href="([^"]+)"', resp.text):
            # Skip absolute paths, external links and query-string links.
            if href.startswith(("/", "http", "?")):
                continue
            # Skip directories, which includes the "../" parent link.
            if href.endswith("/"):
                continue
            if href in seen:
                continue
            seen.add(href)
            files.append(
                {
                    "kind": "File",
                    "name": href,
                    "download_url": f"{base}/{href}",
                }
            )

        return self._suppl_success(accession, url, files)
@register_tool("GEOSearchDatasets")
class GEOSearchDatasets(GEORESTTool):
    """Search GEO datasets by various criteria."""

    def __init__(self, tool_config):
        """Initialise the tool and pin the endpoint to esearch."""
        super().__init__(tool_config)
        self.endpoint_template = "/esearch.fcgi"

    def _build_params(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Build parameters for GEO dataset search.

        Recognised argument keys: ``query`` (free text), the quoted field
        filters ``organism``, ``study_type`` and ``platform``, and ``limit``.
        All present filters are joined with ``AND`` into the ``term``.
        """
        params: Dict[str, Any] = {"db": "gds", "retmode": "json", "retmax": 50}

        # Build search query
        query_parts: List[str] = []
        if "query" in arguments:
            query_parts.append(arguments["query"])

        # Each of these becomes a quoted term tagged with its own field name.
        for field in ("organism", "study_type", "platform"):
            if field in arguments:
                query_parts.append(f'"{arguments[field]}"[{field}]')

        if query_parts:
            params["term"] = " AND ".join(query_parts)

        if "limit" in arguments:
            # Accept numeric strings; an unparseable or negative limit keeps
            # the default retmax instead of raising or being sent as-is.
            try:
                requested = int(arguments["limit"])
            except (TypeError, ValueError):
                requested = -1
            if requested >= 0:
                params["retmax"] = min(requested, 500)

        return params
@register_tool("GEOGetDatasetInfo")
class GEOGetDatasetInfo(GEORESTTool):
    """Get detailed information about a specific GEO dataset."""

    def __init__(self, tool_config):
        """Initialise the tool and pin the endpoint to esummary."""
        super().__init__(tool_config)
        self.endpoint_template = "/esummary.fcgi"

    @staticmethod
    def _clean_dataset_id(arguments: Dict[str, Any]) -> str:
        """Return ``dataset_id`` as a stripped string ('' when absent/falsy).

        Numeric UIDs may arrive as ints; normalising to ``str`` keeps the
        ``isdigit`` check below from raising.
        """
        raw = arguments.get("dataset_id", "")
        if not raw:
            return ""
        return str(raw).strip()

    def _build_params(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Build parameters for GEO dataset info retrieval.

        Returns an error dict when ``dataset_id`` is missing. Otherwise the
        id is passed through unchanged; converting an accession to a UID is
        done in ``run``.
        """
        dataset_id = self._clean_dataset_id(arguments)
        if not dataset_id:
            return {"status": "error", "error": "dataset_id is required"}

        db = self._detect_database(dataset_id)
        return {"db": db, "id": dataset_id, "retmode": "json"}

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the tool with given arguments.

        A purely numeric ``dataset_id`` is treated as a UID and sent straight
        to esummary. Anything else is treated as an accession and first
        resolved to a UID with esearch. Returns an error dict when a required
        parameter is missing, the lookup fails, or no UID is found.
        """
        # Validate required parameters
        for param in self.required:
            if param not in arguments:
                return {
                    "status": "error",
                    "error": f"Missing required parameter: {param}",
                }

        dataset_id = self._clean_dataset_id(arguments)
        if not dataset_id:
            return {"status": "error", "error": "dataset_id is required"}

        db = self._detect_database(dataset_id)

        if dataset_id.isdigit():
            # Already a numeric UID.
            uid = dataset_id
        else:
            # For accession numbers, first convert to UID using esearch
            search_result = self._accession_to_uid(dataset_id, db)
            if search_result.get("status") != "success":
                return search_result

            search_data = search_result.get("data", {})
            idlist = search_data.get("esearchresult", {}).get("idlist", [])
            if not idlist:
                return {
                    "status": "error",
                    "error": f"No UID found for accession {dataset_id} in database {db}",
                    "data": search_data,
                }
            # Use the first UID from the search results
            uid = idlist[0]

        # Now use esummary with the UID
        self.endpoint = self.endpoint_template
        return self._make_request(
            self.endpoint, {"db": db, "id": uid, "retmode": "json"}
        )
@register_tool("GEOGetSampleInfo")
class GEOGetSampleInfo(GEORESTTool):
    """Get sample information for a GEO dataset."""

    def __init__(self, tool_config):
        """Initialise the tool and pin the endpoint to esummary."""
        super().__init__(tool_config)
        self.endpoint_template = "/esummary.fcgi"

    @staticmethod
    def _clean_dataset_id(arguments: Dict[str, Any]) -> str:
        """Return ``dataset_id`` as a stripped string ('' when absent/falsy).

        Numeric UIDs may arrive as ints; normalising to ``str`` keeps the
        ``isdigit`` check below from raising.
        """
        raw = arguments.get("dataset_id", "")
        if not raw:
            return ""
        return str(raw).strip()

    def _build_params(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Build parameters for GEO sample info retrieval.

        Returns an error dict when ``dataset_id`` is missing. Otherwise the
        id is passed through unchanged; converting an accession to a UID is
        done in ``run``.
        """
        dataset_id = self._clean_dataset_id(arguments)
        if not dataset_id:
            return {"status": "error", "error": "dataset_id is required"}

        db = self._detect_database(dataset_id)
        return {"db": db, "id": dataset_id, "retmode": "json"}

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the tool with given arguments.

        A purely numeric ``dataset_id`` is treated as a UID and sent straight
        to esummary. Anything else is treated as an accession and first
        resolved to a UID with esearch. Returns an error dict when a required
        parameter is missing, the lookup fails, or no UID is found.
        """
        # Validate required parameters
        for param in self.required:
            if param not in arguments:
                return {
                    "status": "error",
                    "error": f"Missing required parameter: {param}",
                }

        dataset_id = self._clean_dataset_id(arguments)
        if not dataset_id:
            return {"status": "error", "error": "dataset_id is required"}

        db = self._detect_database(dataset_id)

        if dataset_id.isdigit():
            # Already a numeric UID.
            uid = dataset_id
        else:
            # For accession numbers, first convert to UID using esearch
            search_result = self._accession_to_uid(dataset_id, db)
            if search_result.get("status") != "success":
                return search_result

            search_data = search_result.get("data", {})
            idlist = search_data.get("esearchresult", {}).get("idlist", [])
            if not idlist:
                return {
                    "status": "error",
                    "error": f"No UID found for accession {dataset_id} in database {db}",
                    "data": search_data,
                }
            # Use the first UID from the search results
            uid = idlist[0]

        # Now use esummary with the UID
        self.endpoint = self.endpoint_template
        return self._make_request(
            self.endpoint, {"db": db, "id": uid, "retmode": "json"}
        )