Source code for tooluniverse.ncbi_sra_tool

"""NCBI SRA (Sequence Read Archive) Tool for NGS/RNA-seq data access."""

import xml.etree.ElementTree as ET
from typing import Any, Dict
import requests
from .ncbi_eutils_tool import NCBIEUtilsTool
from .tool_registry import register_tool


@register_tool("NCBISRATool")
class NCBISRATool(NCBIEUtilsTool):
    """NCBI SRA Tool using E-utilities for sequencing run metadata and downloads.

    Every operation receives one ``arguments`` dict and returns a dict whose
    ``status`` key is either ``"success"`` or ``"error"``. The operation to run
    is selected by ``arguments["operation"]`` and dispatched through
    ``_OPERATIONS``.

    NOTE(review): ``_make_request``, ``get_schema_const_operation`` and
    ``timeout`` are not defined in this class; they are presumably provided by
    ``NCBIEUtilsTool`` -- confirm against the base class.
    """

    # Public operation name -> name of the method that implements it.
    _OPERATIONS = {
        "search": "_search_sra_runs",
        "get_run_info": "_get_run_info",
        "get_download_urls": "_get_download_urls",
        "locate_run_files": "_locate_run_files",
        "link_to_biosample": "_link_to_biosample",
    }

    # SRA Data Locator (SDL) v2 — returns verified cloud object locations
    # (S3/GCP), authoritative byte size, md5 checksum, region, and
    # modification date for an SRA run accession.
    _SDL_URL = "https://locate.ncbi.nlm.nih.gov/sdl/2/retrieve"

    # Structured search argument -> field tag appended as ``value[Field]``.
    _SEARCH_FIELDS = {
        "study": "Study",
        "organism": "Organism",
        "strategy": "Strategy",
        "platform": "Platform",
        "source": "Source",
    }

    # Accession prefixes accepted by the download/locate operations, and the
    # per-accession error reported when an entry does not match any of them.
    _RUN_ACCESSION_PREFIXES = ("SRR", "ERR", "DRR")
    _INVALID_ACCESSION_MSG = (
        "Invalid accession format. Must start with SRR, ERR, or DRR"
    )

    # Base of the FTP URLs built by ``_get_download_urls``.
    _LEGACY_FTP_BASE = (
        "ftp://ftp-trace.ncbi.nlm.nih.gov/sra/sra-instant/reads/ByRun/sra"
    )

    def __init__(self, tool_config):
        """Initialise the base tool and target the ``sra`` database."""
        super().__init__(tool_config)
        self.db = "sra"

    @staticmethod
    def _normalize_accessions(arguments: Dict[str, Any]) -> list:
        """Extract accessions from ``arguments``, always returning a list.

        Args:
            arguments: Tool arguments; the ``accessions`` entry may be absent,
                ``None``, a single string, a list/tuple/set, or another scalar
                (for example an integer UID).

        Returns:
            A list of accessions. Missing, ``None`` and blank-string values
            yield an empty list so callers report the parameter as missing.
        """
        accessions = arguments.get("accessions")
        if accessions is None:
            return []
        if isinstance(accessions, str):
            return [accessions] if accessions.strip() else []
        if isinstance(accessions, (list, tuple, set, frozenset)):
            return list(accessions)
        # Any other scalar is treated as a single accession rather than being
        # iterated (which would raise for e.g. an int).
        return [accessions]

    @classmethod
    def _is_run_accession(cls, accession: Any) -> bool:
        """Return True when ``accession`` is a string with an accepted prefix."""
        return isinstance(accession, str) and accession.startswith(
            cls._RUN_ACCESSION_PREFIXES
        )

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the SRA tool with given arguments.

        Args:
            arguments: Must identify an operation via ``operation`` unless the
                tool schema supplies one; remaining keys are passed to the
                operation handler unchanged.

        Returns:
            The handler's result dict, or an error dict when the operation is
            missing or not listed in ``_OPERATIONS``.
        """
        operation = arguments.get("operation") or self.get_schema_const_operation()
        if not operation:
            return {"status": "error", "error": "Missing required parameter: operation"}

        method_name = self._OPERATIONS.get(operation)
        if not method_name:
            return {"status": "error", "error": f"Unknown operation: {operation}"}

        return getattr(self, method_name)(arguments)

    def _build_search_term(self, arguments: Dict[str, Any]) -> str:
        """Build NCBI SRA search term from arguments.

        Each truthy structured filter becomes ``value[Field]``; a truthy
        ``query`` is appended as free text. All parts are parenthesised and
        joined with ``AND``. Returns ``""`` when no criteria are present.
        """
        terms = [
            f"{arguments[key]}[{field}]"
            for key, field in self._SEARCH_FIELDS.items()
            if arguments.get(key)
        ]
        # Feature-28B-15 fix: always include free-text query alongside structured filters
        if arguments.get("query"):
            terms.append(arguments["query"])
        # Joining an empty list already yields "".
        return " AND ".join(f"({term})" for term in terms)

    def _search_sra_runs(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Search SRA database for runs using esearch.

        Args:
            arguments: Search filters (see ``_SEARCH_FIELDS`` and ``query``)
                plus optional ``limit`` (default 20) and ``sort`` (default
                ``"relevance"``).

        Returns:
            On success, a dict with the matching UIDs, the total hit count,
            the number returned and the search term used; otherwise an error
            dict.
        """
        try:
            search_term = self._build_search_term(arguments)
            if not search_term:
                return {
                    "status": "error",
                    "error": "No search criteria provided. Use study, organism, strategy, platform, source, or query.",
                }

            params = {
                "db": self.db,
                "term": search_term,
                "retmode": "json",
                "retmax": arguments.get("limit", 20),
                "sort": arguments.get("sort", "relevance"),
                "usehistory": "y",  # Store results on server for large queries
            }

            result = self._make_request("/esearch.fcgi", params)
            if result["status"] == "error":
                return result

            data = result.get("data", {})
            if not isinstance(data, dict):
                return {
                    "status": "error",
                    "error": "Unexpected response format from NCBI",
                }

            esearch_result = data.get("esearchresult", {})
            uids = esearch_result.get("idlist", [])
            count = int(esearch_result.get("count", 0))
            return {
                "status": "success",
                "data": {
                    "uids": uids,
                    "count": count,
                    "returned": len(uids),
                    "search_term": search_term,
                },
                "total_count": count,
                "url": result.get("url"),
            }
        except Exception as e:
            # Tool boundary: report any failure as an error result.
            return {"status": "error", "error": f"Search failed: {str(e)}"}

    def _get_run_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Get metadata for SRA run accessions via efetch XML.

        Args:
            arguments: Must contain ``accessions`` (a single value or a
                collection); entries are stringified and comma-joined into the
                efetch ``id`` parameter.

        Returns:
            On success, a dict with the parsed run records and their count.
            On an XML parse failure, an error dict that also carries up to the
            first 1000 characters of the raw response under ``raw_data``.
        """
        try:
            accessions = self._normalize_accessions(arguments)
            if not accessions:
                return {
                    "status": "error",
                    "error": "Missing required parameter: accessions",
                }

            params = {
                "db": self.db,
                "id": ",".join(str(acc) for acc in accessions),
                "rettype": "full",
                "retmode": "xml",
            }

            result = self._make_request("/efetch.fcgi", params)
            if result["status"] == "error":
                return result

            data = result.get("data", "")
            if not isinstance(data, str):
                return {
                    "status": "error",
                    "error": "Unexpected response format from NCBI",
                }

            try:
                run_info = self._parse_sra_xml(data)
            except Exception as e:
                return {
                    "status": "error",
                    "error": f"Failed to parse XML response: {str(e)}",
                    # Slicing already caps the length; no explicit check needed.
                    "raw_data": data[:1000],
                }
            return {
                "status": "success",
                "data": run_info,
                "count": len(run_info),
                "url": result.get("url"),
            }
        except Exception as e:
            # Tool boundary: report any failure as an error result.
            return {"status": "error", "error": f"Get run info failed: {str(e)}"}

    def _parse_sra_xml(self, xml_data: str) -> list:
        """Parse SRA XML metadata to extract run information.

        Args:
            xml_data: XML document text containing ``EXPERIMENT_PACKAGE``
                elements.

        Returns:
            One dict per ``EXPERIMENT_PACKAGE`` element found.

        Raises:
            Exception: If the XML is not well-formed; the underlying
                ``ParseError`` is chained as the cause.
        """
        try:
            root = ET.fromstring(xml_data)
        except ET.ParseError as e:
            raise Exception(f"XML parsing error: {str(e)}") from e

        return [
            self._parse_experiment_package(exp_pkg)
            for exp_pkg in root.findall(".//EXPERIMENT_PACKAGE")
        ]

    @staticmethod
    def _parse_experiment_package(exp_pkg) -> Dict[str, Any]:
        """Flatten one ``EXPERIMENT_PACKAGE`` element into a metadata dict.

        Keys are only present when the corresponding element exists. Only the
        first ``RUN``, ``EXPERIMENT``, ``STUDY`` and ``SAMPLE`` element in the
        package is read.
        """
        run_info: Dict[str, Any] = {}

        run = exp_pkg.find(".//RUN")
        if run is not None:
            run_info["run_accession"] = run.get("accession", "")
            run_info["total_spots"] = run.get("total_spots", "")
            run_info["total_bases"] = run.get("total_bases", "")
            run_info["published"] = run.get("published", "")

        experiment = exp_pkg.find(".//EXPERIMENT")
        if experiment is not None:
            run_info["experiment_accession"] = experiment.get("accession", "")

            platform = experiment.find(".//PLATFORM")
            if platform is not None:
                # The child element's tag is reported as the platform name;
                # if several children exist the last one wins.
                for child in platform:
                    run_info["platform"] = child.tag
                    instrument = child.find(".//INSTRUMENT_MODEL")
                    if instrument is not None:
                        run_info["instrument"] = instrument.text

            library = experiment.find(".//LIBRARY_DESCRIPTOR")
            if library is not None:
                for key, tag in (
                    ("library_strategy", "LIBRARY_STRATEGY"),
                    ("library_source", "LIBRARY_SOURCE"),
                    ("library_selection", "LIBRARY_SELECTION"),
                ):
                    node = library.find(tag)
                    if node is not None:
                        run_info[key] = node.text

                lib_layout = library.find("LIBRARY_LAYOUT")
                if lib_layout is not None:
                    run_info["library_layout"] = (
                        "PAIRED" if lib_layout.find("PAIRED") is not None else "SINGLE"
                    )

        study = exp_pkg.find(".//STUDY")
        if study is not None:
            run_info["study_accession"] = study.get("accession", "")
            study_title = study.find(".//STUDY_TITLE")
            if study_title is not None:
                run_info["study_title"] = study_title.text

        sample = exp_pkg.find(".//SAMPLE")
        if sample is not None:
            run_info["sample_accession"] = sample.get("accession", "")
            organism = sample.find(".//SCIENTIFIC_NAME")
            if organism is not None:
                run_info["organism"] = organism.text

        return run_info

    def _get_download_urls(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Get FTP, S3, and NCBI download URLs for SRA run accessions.

        The URLs are constructed from the accession text only; no request is
        made to check that they resolve.

        Args:
            arguments: Must contain ``accessions`` (a single value or a
                collection).

        Returns:
            A success dict whose ``data`` holds one record per input
            accession: either the constructed URLs or a per-accession
            ``error`` for entries that are not SRR/ERR/DRR strings.
        """
        try:
            accessions = self._normalize_accessions(arguments)
            if not accessions:
                return {
                    "status": "error",
                    "error": "Missing required parameter: accessions",
                }

            download_urls = []
            for accession in accessions:
                # Non-string entries are rejected here too, so one bad item
                # cannot abort the whole batch.
                if not self._is_run_accession(accession):
                    download_urls.append(
                        {
                            "accession": accession,
                            "error": self._INVALID_ACCESSION_MSG,
                        }
                    )
                    continue

                prefix = accession[:3]
                acc_num = accession[3:]
                subdir = acc_num[:6] if len(acc_num) >= 6 else acc_num.zfill(6)
                download_urls.append(
                    {
                        "accession": accession,
                        "ftp_url": f"{self._LEGACY_FTP_BASE}/{prefix}/{prefix}{subdir}/{accession}/{accession}.sra",
                        "s3_url": f"s3://sra-pub-run-odp/sra/{accession}/{accession}",
                        "ncbi_url": f"https://trace.ncbi.nlm.nih.gov/Traces/sra/?run={accession}",
                        "note": "Use SRA Toolkit (fastq-dump or fasterq-dump) to convert SRA to FASTQ format",
                    }
                )

            return {
                "status": "success",
                "data": download_urls,
                "count": len(download_urls),
            }
        except Exception as e:
            # Tool boundary: report any failure as an error result.
            return {"status": "error", "error": f"Get download URLs failed: {str(e)}"}

    def _locate_run_files(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve current cloud download locations + size/md5 via the SDL API.

        The legacy ftp-trace ByRun path is dead (HTTP 404). The SRA Data
        Locator (SDL) v2 service returns the verified object: real https
        S3/GCP link, exact byte size, md5 checksum, region, and modification
        date.

        Args:
            arguments: Must contain ``accessions`` (a single value or a
                collection).

        Returns:
            A success dict whose ``data`` holds one record per input
            accession; failures for an individual accession are reported in
            that record's ``error`` key rather than failing the whole call.
        """
        try:
            accessions = self._normalize_accessions(arguments)
            if not accessions:
                return {
                    "status": "error",
                    "error": "Missing required parameter: accessions",
                }

            results = []
            for accession in accessions:
                if not self._is_run_accession(accession):
                    results.append(
                        {
                            "accession": accession,
                            "error": self._INVALID_ACCESSION_MSG,
                        }
                    )
                    continue
                results.append(self._fetch_sdl_record(accession))

            return {
                "status": "success",
                "data": results,
                "count": len(results),
            }
        except Exception as e:
            # Tool boundary: report any failure as an error result.
            return {"status": "error", "error": f"Locate run files failed: {str(e)}"}

    def _fetch_sdl_record(self, accession: str) -> Dict[str, Any]:
        """Query SDL for one accession and return its parsed record or error."""
        try:
            resp = requests.get(
                self._SDL_URL,
                params={"acc": accession},
                timeout=self.timeout,
            )
            resp.raise_for_status()
        except requests.exceptions.RequestException as exc:
            return {
                "accession": accession,
                "error": f"SDL request failed: {str(exc)}",
            }

        # Decoded in its own ``try``: recent ``requests`` releases raise a JSON
        # error that is both a RequestException and a ValueError, so a shared
        # ``try`` would report malformed JSON as a request failure.
        try:
            payload = resp.json()
        except ValueError as exc:
            return {
                "accession": accession,
                "error": f"SDL returned non-JSON response: {str(exc)}",
            }

        return self._parse_sdl_result(accession, payload)

    @staticmethod
    def _parse_sdl_result(accession: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Parse one SDL v2 response into a structured per-run file record.

        Args:
            accession: The run accession the payload was requested for.
            payload: Decoded JSON body of the SDL response.

        Returns:
            A record with ``files`` and ``file_count`` on success, or a record
            with an ``error`` key when the payload has no usable result or the
            first result bundle carries a non-200 status.
        """
        if not isinstance(payload, dict):
            return {
                "accession": accession,
                "error": "Unexpected SDL response format",
            }

        bundles = payload.get("result", []) or []
        if not bundles:
            return {
                "accession": accession,
                "error": "No result returned by SDL for this accession",
            }

        # Only the first result bundle is inspected.
        bundle = bundles[0] if isinstance(bundles, list) else None
        if not isinstance(bundle, dict):
            return {
                "accession": accession,
                "error": "Unexpected SDL response format",
            }

        bundle_status = bundle.get("status")
        # A missing status is treated as success; 200 may be int or str.
        if bundle_status not in (200, "200", None):
            return {
                "accession": accession,
                "error": bundle.get("msg", f"SDL status {bundle_status}"),
                "status_code": bundle_status,
            }

        files = []
        for entry in bundle.get("files", []) or []:
            locations = [
                {
                    "service": loc.get("service"),
                    "region": loc.get("region"),
                    "link": loc.get("link"),
                }
                for loc in (entry.get("locations") or [])
            ]
            files.append(
                {
                    "name": entry.get("name"),
                    "type": entry.get("type"),
                    "size": entry.get("size"),
                    "md5": entry.get("md5"),
                    "modification_date": entry.get("modificationDate"),
                    "locations": locations,
                }
            )

        return {
            "accession": accession,
            "files": files,
            "file_count": len(files),
        }