# Source code for tooluniverse.ctd_tool

"""CTD (Comparative Toxicogenomics Database) tool — backed by the RENCI Automat
mirror of CTD's knowledge graph.

CTD's native batchQuery.go now requires an altcha proof-of-work CAPTCHA, so
direct programmatic access via that endpoint is blocked. This tool uses the
NIH-NCATS-Translator-funded mirror at https://automat.renci.org/ctd/ instead.
That mirror is **chemical-centric** (June-2024 snapshot, 26k nodes / 166k
edges, biolink-categorized predicates) — it covers chemical↔gene and
chemical↔disease cleanly, but does NOT contain CTD's gene-disease inferred
edges. Gene→disease queries return an honest error pointing at OpenTargets.
"""

import requests
from typing import Any, Dict, Optional

from .base_tool import BaseTool
from .tool_registry import register_tool

# Root URL of the RENCI Automat deployment that serves the CTD graph.
RENCI_BASE = "https://automat.renci.org/ctd"

# Headers sent with requests to the mirror.
RENCI_HEADERS = {
    "Accept": "application/json",
    "User-Agent": "ToolUniverse CTDTool",
}

# Version of the RENCI snapshot; echoed in every response's metadata.
DATA_AS_OF = "2024-06"

# Biolink category labels used by the mirror, named once to avoid repetition.
_SMALL_MOLECULE = "biolink:SmallMolecule"
_GENE = "biolink:Gene"
_DISEASE = "biolink:Disease"

# Translates the tool configs' (input_type, report_type) pair into the RENCI
# (source_category, target_category) pair. Gene→disease maps to None because
# RENCI's CTD ingestion is chemical-centric and holds no gene-disease edges.
_TYPE_MAP = {
    ("chem", "genes_curated"): (_SMALL_MOLECULE, _GENE),
    ("chem", "diseases_curated"): (_SMALL_MOLECULE, _DISEASE),
    ("gene", "diseases_curated"): None,
    ("gene", "chems_curated"): (_GENE, _SMALL_MOLECULE),
    ("disease", "chems_curated"): (_DISEASE, _SMALL_MOLECULE),
}


@register_tool("CTDTool")
class CTDTool(BaseTool):
    """Query CTD curated relationships via the RENCI Automat mirror.

    The tool config's ``fields`` mapping supplies the default ``input_type``
    and ``report_type`` plus the request ``timeout`` (seconds). A call may
    override the two types through its arguments. ``run`` reports failures as
    ``{"status": "error", ...}`` dicts instead of raising.
    """

    # Argument names accepted as the search term, in priority order.
    _TERM_KEYS = (
        "input_terms",
        "query",
        "gene_symbol",
        "chemical_name",
        "disease_name",
    )

    def __init__(self, tool_config: Dict[str, Any]):
        """Read defaults from ``tool_config["fields"]``.

        Args:
            tool_config: Tool configuration; ``fields`` may define
                ``input_type`` (default ``"chem"``), ``report_type`` (default
                ``"genes_curated"``) and ``timeout`` (default 30).
        """
        super().__init__(tool_config)
        fields = tool_config.get("fields") or {}
        self.input_type = fields.get("input_type", "chem")
        self.report_type = fields.get("report_type", "genes_curated")
        # An explicit null timeout in the config falls back to the default
        # instead of failing construction with int(None).
        timeout = fields.get("timeout")
        self.timeout = 30 if timeout is None else int(timeout)

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the query and convert any failure into an error dict.

        Args:
            arguments: Call arguments; see ``_query`` for the accepted keys.

        Returns:
            The ``_query`` result, or an error dict carrying backend metadata
            when the request times out, cannot connect, returns an HTTP error
            status, or fails unexpectedly.
        """
        try:
            return self._query(arguments)
        except requests.exceptions.Timeout:
            return self._backend_error(
                f"RENCI CTD mirror timed out after {self.timeout}s"
            )
        except requests.exceptions.ConnectionError:
            return self._backend_error(
                "Failed to connect to the RENCI CTD mirror. Check network."
            )
        except requests.exceptions.HTTPError as e:
            # Guard against a missing response so the handler cannot raise.
            status = getattr(e.response, "status_code", None)
            if status is None:
                status = "unknown"
            return self._backend_error(f"RENCI CTD mirror HTTP error: {status}")
        except Exception as e:  # noqa: BLE001
            return self._backend_error(
                f"Unexpected error querying RENCI CTD mirror: {e}"
            )

    # -- internals -----------------------------------------------------------

    @staticmethod
    def _backend_error(message: str) -> Dict[str, Any]:
        """Build an error result that carries only backend-level metadata."""
        return {
            "status": "error",
            "error": message,
            "metadata": {"backend": "RENCI Automat CTD", "data_as_of": DATA_AS_OF},
        }

    @classmethod
    def _extract_terms(cls, arguments: Dict[str, Any]) -> str:
        """Return the first non-blank search term among the accepted keys.

        Values are coerced with ``str`` so that a non-string term (for
        example an integer identifier) does not fail on ``.strip()``.
        Returns ``""`` when no key holds a usable value.
        """
        for key in cls._TERM_KEYS:
            value = arguments.get(key)
            if not value:
                continue
            text = str(value).strip()
            if text:
                return text
        return ""

    def _query(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve the search term and fetch its one-hop edges.

        Args:
            arguments: May contain the search term under ``input_terms``,
                ``query``, ``gene_symbol``, ``chemical_name`` or
                ``disease_name``, plus optional ``input_type`` /
                ``report_type`` overrides of the configured defaults.

        Returns:
            ``{"status": "success", "data": [...], "metadata": {...}}`` on
            success, otherwise an error dict with ``metadata``.

        Raises:
            requests.exceptions.RequestException: On transport or HTTP
                failures; ``run`` converts these into error dicts.
        """
        arguments = arguments or {}
        input_terms = self._extract_terms(arguments)
        input_type = arguments.get("input_type", self.input_type)
        report_type = arguments.get("report_type", self.report_type)
        metadata = {
            "backend": "RENCI Automat CTD",
            "data_as_of": DATA_AS_OF,
            "input_terms": input_terms,
            "input_type": input_type,
            "report_type": report_type,
        }

        if not input_terms:
            return {
                "status": "error",
                "error": "input_terms parameter is required",
                "metadata": metadata,
            }

        if (input_type, report_type) == ("gene", "diseases_curated"):
            return {
                "status": "error",
                "error": (
                    "Gene→disease relationships are not available in the RENCI CTD "
                    "mirror (chemical-centric snapshot, no gene-disease edges)."
                ),
                "suggestion": (
                    "Use OpenTargets_get_associated_diseases (live, more sources) "
                    "or DGIdb_search_interactions for gene-disease associations."
                ),
                "metadata": metadata,
            }

        mapping = _TYPE_MAP.get((input_type, report_type))
        if not mapping:
            return {
                "status": "error",
                "error": (
                    f"Unsupported query: input_type={input_type!r}, "
                    f"report_type={report_type!r}."
                ),
                "metadata": metadata,
            }
        source_cat, target_cat = mapping

        canonical = self._resolve_curie(input_terms)
        if not canonical:
            return {
                "status": "error",
                "error": (
                    f"'{input_terms}' was not found in the RENCI CTD mirror "
                    f"(searched by id, equivalent_identifiers, and name)."
                ),
                "metadata": metadata,
            }

        metadata["canonical_curie"] = canonical
        metadata["source_category"] = source_cat
        metadata["target_category"] = target_cat

        url = f"{RENCI_BASE}/{source_cat}/{target_cat}/{canonical}"
        resp = requests.get(url, headers=RENCI_HEADERS, timeout=self.timeout)
        resp.raise_for_status()
        raw_edges = resp.json()
        if not isinstance(raw_edges, list):
            return {
                "status": "error",
                "error": "RENCI CTD mirror returned unexpected (non-list) payload",
                "metadata": metadata,
            }

        # Malformed edges format to None and are dropped from the result.
        formatted = [
            edge for edge in map(self._format_edge, raw_edges) if edge
        ]
        metadata["total_results"] = len(formatted)
        return {"status": "success", "data": formatted, "metadata": metadata}

    def _resolve_curie(self, term: str) -> Optional[str]:
        """Resolve an input (name or any CURIE) to the graph's canonical id.

        Uses RENCI's /cypher endpoint to search by `id`,
        `equivalent_identifiers`, or case-insensitive `name`.

        Args:
            term: Name or CURIE to look up.

        Returns:
            The canonical `id`, or None when nothing matched or the response
            did not have the expected shape.

        Raises:
            requests.exceptions.RequestException: On transport or HTTP
                failures.
        """
        # The term is spliced into a double-quoted Cypher literal, so drop the
        # two characters that could terminate or escape that literal.
        safe = term.replace('"', "").replace("\\", "")
        if not safe:
            # Nothing searchable is left; skip the network round-trip.
            return None
        query = (
            'MATCH (n) WHERE n.id = "' + safe + '" OR "' + safe + '" IN '
            'n.equivalent_identifiers OR toLower(n.name) = toLower("' + safe + '") '
            "RETURN n.id AS id LIMIT 1"
        )
        resp = requests.post(
            f"{RENCI_BASE}/cypher",
            # Send the same Accept / User-Agent headers as the edge request.
            headers={**RENCI_HEADERS, "Content-Type": "application/json"},
            json={"query": query},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        try:
            return data["results"][0]["data"][0]["row"][0]
        except (KeyError, IndexError, TypeError):
            return None

    @staticmethod
    def _format_edge(edge: Any) -> Optional[Dict[str, Any]]:
        """RENCI returns each edge as [source_node, edge_props, target_node].

        Args:
            edge: One element of the edge list returned by the mirror.

        Returns:
            A flat dict of source/target ids and names plus selected edge
            properties, or None when ``edge`` is not a sequence of at least
            three items. Parts that are not dicts yield None values.
        """
        if not isinstance(edge, (list, tuple)) or len(edge) < 3:
            return None
        src, props, tgt = edge[0], edge[1], edge[2]
        s = src if isinstance(src, dict) else {}
        t = tgt if isinstance(tgt, dict) else {}
        p = props if isinstance(props, dict) else {}
        return {
            "source_id": s.get("id"),
            "source_name": s.get("name"),
            "target_id": t.get("id"),
            "target_name": t.get("name"),
            "predicate": p.get("predicate"),
            "qualified_predicate": p.get("qualified_predicate"),
            "object_direction_qualifier": p.get("object_direction_qualifier"),
            "knowledge_level": p.get("knowledge_level"),
            "agent_type": p.get("agent_type"),
            "primary_knowledge_source": p.get("primary_knowledge_source"),
        }