Source code for tooluniverse.ndex_tool

# ndex_tool.py
"""
NDEx (Network Data Exchange) API tool for ToolUniverse.

NDEx is a public repository and sharing platform for biological network models.
It hosts thousands of curated biological networks (protein-protein interaction,
signaling, metabolic, gene regulatory) from published studies.

API: https://public.ndexbio.org/v2/
No authentication required for public networks.
"""

from typing import Dict, Any
from urllib.parse import quote

import requests

from .base_tool import BaseTool
from .tool_registry import register_tool

# Root of the public NDEx v2 REST API; endpoint paths used by NDExTool
# (e.g. "/search/network", "/network/{uuid}") are appended to this prefix.
NDEX_BASE_URL = "https://public.ndexbio.org/v2"


@register_tool("NDExTool")
class NDExTool(BaseTool):
    """
    Tool for querying the NDEx biological network repository.

    NDEx provides access to thousands of published biological networks
    including protein-protein interaction (PPI), signaling pathways,
    gene regulatory networks, and metabolic networks. Networks are
    contributed by research groups and databases like NCI-PID, SIGNOR,
    and individual labs.

    Supports: network search, network summary, network content retrieval.

    No authentication required for public networks.

    The endpoint served by an instance is fixed at construction time via
    ``tool_config["fields"]["endpoint"]`` (``"search"``, ``"get_summary"``
    or ``"get_network"``). Every public call returns a plain dict: either
    ``{"data": ..., "metadata": ...}`` on success or ``{"error": "..."}``.
    """

    # Label reported in the "metadata.source" field of every successful result.
    _SOURCE_NAME = "NDEx (Network Data Exchange)"

    # Search paging: default page size and the cap applied to caller input.
    _DEFAULT_SEARCH_SIZE = 10
    _MAX_SEARCH_SIZE = 100

    # Truncation limits (in characters) that keep responses compact.
    _SEARCH_DESCRIPTION_CHARS = 300
    _SUMMARY_DESCRIPTION_CHARS = 500
    _PROPERTY_VALUE_CHARS = 300
    _NETWORK_ATTR_CHARS = 200

    # Maximum number of nodes / edges returned by the get_network endpoint.
    _MAX_NODES = 200
    _MAX_EDGES = 500

    def __init__(self, tool_config: Dict[str, Any]):
        """Store the request timeout and the endpoint selected by the config.

        Args:
            tool_config: Tool configuration. ``timeout`` (seconds, default 30)
                and ``fields.endpoint`` (default ``"search"``) are read here.
        """
        super().__init__(tool_config)
        self.timeout = tool_config.get("timeout", 30)
        # `or {}` also covers an explicit ``"fields": None`` in the config.
        fields = tool_config.get("fields") or {}
        self.endpoint = fields.get("endpoint", "search")

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the NDEx API call.

        Args:
            arguments: Endpoint-specific arguments (``query``/``size``/``start``
                for search, ``uuid`` for the network endpoints).

        Returns:
            The endpoint result, or ``{"error": ...}`` if the request failed.
            Exceptions are converted to error dicts rather than propagated.
        """
        try:
            return self._query(arguments)
        except requests.exceptions.Timeout:
            return {"error": f"NDEx API timed out after {self.timeout}s"}
        except requests.exceptions.ConnectionError:
            return {"error": "Failed to connect to NDEx API"}
        except requests.exceptions.HTTPError as e:
            # e.response may be unset if the exception was raised without one.
            response = getattr(e, "response", None)
            status = response.status_code if response is not None else "unknown"
            return {"error": f"NDEx API HTTP error: {status}"}
        except Exception as e:
            return {"error": f"Unexpected error querying NDEx: {str(e)}"}

    def _query(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Route to appropriate NDEx endpoint.

        Returns an error dict when ``self.endpoint`` is not a known endpoint.
        """
        handlers = {
            "search": self._search_networks,
            "get_summary": self._get_network_summary,
            "get_network": self._get_network,
        }
        handler = handlers.get(self.endpoint)
        if handler is None:
            return {"error": f"Unknown endpoint: {self.endpoint}"}
        return handler(arguments)

    @staticmethod
    def _clean_uuid(arguments: Dict[str, Any]) -> str:
        """Return the ``uuid`` argument as a stripped string ("" if missing)."""
        return str(arguments.get("uuid") or "").strip()

    def _search_networks(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Search NDEx for biological networks by keyword.

        Args:
            arguments: ``query`` (required search string), ``size`` (page
                size, default 10, clamped to 1..100) and ``start`` (offset
                passed through to the API, default 0, never negative).

        Returns:
            ``{"data": [network summaries], "metadata": {...}}`` or an error
            dict when ``query`` is missing or paging values are not integers.
        """
        query = arguments.get("query", "")
        if not query:
            return {"error": "query parameter is required"}

        # Validate paging up front so bad input yields a clear message instead
        # of a TypeError surfacing as an "Unexpected error".
        try:
            size = int(arguments.get("size") or self._DEFAULT_SEARCH_SIZE)
            start = int(arguments.get("start") or 0)
        except (TypeError, ValueError):
            return {"error": "size and start must be integers"}
        size = max(1, min(size, self._MAX_SEARCH_SIZE))
        start = max(0, start)

        url = f"{NDEX_BASE_URL}/search/network"
        params = {"start": start, "size": size}
        payload = {"searchString": query}

        response = requests.post(
            url, params=params, json=payload, timeout=self.timeout
        )
        response.raise_for_status()
        data = response.json()
        if not isinstance(data, dict):
            data = {}

        # `or []` guards against a JSON null in place of the list.
        networks = [
            {
                "uuid": n.get("externalId"),
                "name": n.get("name"),
                "description": (n.get("description") or "")[
                    : self._SEARCH_DESCRIPTION_CHARS
                ],
                "owner": n.get("owner"),
                "node_count": n.get("nodeCount"),
                "edge_count": n.get("edgeCount"),
                "visibility": n.get("visibility"),
                "doi": n.get("doi"),
                "version": n.get("version"),
            }
            for n in (data.get("networks") or [])
            if isinstance(n, dict)
        ]

        return {
            "data": networks,
            "metadata": {
                "source": self._SOURCE_NAME,
                "total_results": data.get("numFound", len(networks)),
                "query": query,
                "start": start,
            },
        }

    def _get_network_summary(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Get summary information for a specific network by UUID.

        Args:
            arguments: Must contain ``uuid``, the network identifier.

        Returns:
            ``{"data": {summary fields}, "metadata": {...}}`` or an error dict
            when ``uuid`` is missing.
        """
        network_id = self._clean_uuid(arguments)
        if not network_id:
            return {"error": "uuid parameter is required"}

        # Percent-encode the id so caller input cannot alter the URL path.
        url = f"{NDEX_BASE_URL}/network/{quote(network_id, safe='')}/summary"
        response = requests.get(url, timeout=self.timeout)
        response.raise_for_status()
        data = response.json()
        if not isinstance(data, dict):
            return {"error": "Unexpected response format from NDEx"}

        # Flatten the property list into {predicate: truncated string value}.
        properties = {
            p.get("predicateString", ""): str(p.get("value", ""))[
                : self._PROPERTY_VALUE_CHARS
            ]
            for p in (data.get("properties") or [])
            if isinstance(p, dict)
        }

        return {
            "data": {
                "uuid": data.get("externalId"),
                "name": data.get("name"),
                "description": (data.get("description") or "")[
                    : self._SUMMARY_DESCRIPTION_CHARS
                ],
                "owner": data.get("owner"),
                "node_count": data.get("nodeCount"),
                "edge_count": data.get("edgeCount"),
                "visibility": data.get("visibility"),
                "doi": data.get("doi"),
                "version": data.get("version"),
                "creation_time": data.get("creationTime"),
                "modification_time": data.get("modificationTime"),
                "properties": properties,
            },
            "metadata": {
                "source": self._SOURCE_NAME,
            },
        }

    def _get_network(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Get network content (nodes and edges) in CX format.

        The response is treated as a list of aspect dicts; the ``nodes``,
        ``edges`` and ``networkAttributes`` aspects are extracted. At most 200
        nodes and 500 edges are returned, with the untruncated totals and
        truncation flags reported alongside.

        Args:
            arguments: Must contain ``uuid``, the network identifier.

        Returns:
            ``{"data": {...}, "metadata": {...}}`` or an error dict when
            ``uuid`` is missing or the payload is not a list of aspects.
        """
        network_id = self._clean_uuid(arguments)
        if not network_id:
            return {"error": "uuid parameter is required"}

        # Percent-encode the id so caller input cannot alter the URL path.
        url = f"{NDEX_BASE_URL}/network/{quote(network_id, safe='')}"
        response = requests.get(url, timeout=self.timeout)
        response.raise_for_status()
        data = response.json()
        if not isinstance(data, list):
            # Iterating a dict here would walk its keys and misfire below.
            return {"error": "Unexpected response format from NDEx"}

        nodes = []
        edges = []
        network_attrs = {}

        for aspect in data:
            if not isinstance(aspect, dict):
                continue
            # An aspect is handled as exactly one kind, checked in this order.
            if "nodes" in aspect:
                nodes.extend(
                    {
                        "id": node.get("@id"),
                        "name": node.get("n"),
                        "represents": node.get("r"),
                    }
                    for node in (aspect["nodes"] or [])
                )
            elif "edges" in aspect:
                edges.extend(
                    {
                        "id": edge.get("@id"),
                        "source": edge.get("s"),
                        "target": edge.get("t"),
                        "interaction": edge.get("i"),
                    }
                    for edge in (aspect["edges"] or [])
                )
            elif "networkAttributes" in aspect:
                for attr in aspect["networkAttributes"] or []:
                    network_attrs[attr.get("n", "")] = str(attr.get("v", ""))[
                        : self._NETWORK_ATTR_CHARS
                    ]

        return {
            "data": {
                "network_name": network_attrs.get("name", ""),
                "nodes": nodes[: self._MAX_NODES],
                "edges": edges[: self._MAX_EDGES],
                "total_nodes": len(nodes),
                "total_edges": len(edges),
                "truncated_nodes": len(nodes) > self._MAX_NODES,
                "truncated_edges": len(edges) > self._MAX_EDGES,
            },
            "metadata": {
                "source": self._SOURCE_NAME,
                "uuid": network_id,
                "format": "CX",
            },
        }