# tooluniverse.ols_tool — source listing (extraction artifact header)
"""OLS API tool for ToolUniverse.
This module exposes the Ontology Lookup Service (OLS) endpoints that were
previously available through the dedicated MCP server. The MCP tooling has been
adapted into a synchronous local tool that fits the ToolUniverse runtime.
"""
from __future__ import annotations
import urllib.parse
from typing import Any, Dict, List, Optional
import requests
from pydantic import BaseModel, Field, HttpUrl, ValidationError
from .base_tool import BaseTool
from .tool_registry import register_tool
# Default root of the EMBL-EBI OLS4 REST API; overridable per tool instance
# via tool_config["base_url"] (see OLSTool.__init__).
OLS_BASE_URL = "https://www.ebi.ac.uk/ols4"
REQUEST_TIMEOUT = 30.0  # 30 second timeout to prevent hanging on slow API responses
def url_encode_iri(iri: str) -> str:
    """Percent-encode *iri* twice, as the OLS API requires for IRI path segments."""
    once = urllib.parse.quote(iri, safe="")
    return urllib.parse.quote(once, safe="")
def _expand_short_term_id(term_id: str) -> str:
"""Convert short ontology IDs (e.g. GO:0006338) to full OBO IRIs.
Handles standard OBO ontologies (GO, HP, MONDO, CHEBI, etc.).
EFO and other non-OBO ontologies keep their original form.
"""
if not term_id or term_id.startswith("http"):
return term_id
if ":" in term_id:
prefix, local = term_id.split(":", 1)
return f"http://purl.obolibrary.org/obo/{prefix}_{local}"
return term_id
def _infer_ontology_from_term_id(term_id: str) -> str:
"""Infer OLS ontology identifier from a CURIE prefix (e.g. 'HP:0001234' → 'hp')."""
if term_id and ":" in term_id and not term_id.startswith("http"):
return term_id.split(":", 1)[0].lower()
return ""
class OntologyInfo(BaseModel):
    """Description of a single ontology entry in OLS."""

    # Accept both the camelCase API aliases and the snake_case field names,
    # matching the convention TermInfo in this module already uses.
    model_config = {"populate_by_name": True}

    id: str = Field(
        ..., description="Unique identifier for the ontology", alias="ontologyId"
    )
    title: str = Field(..., description="Name of the ontology")
    version: Optional[str] = Field(None, description="Version of the ontology")
    description: Optional[str] = Field(None, description="Description of the ontology")
    domain: Optional[str] = Field(None, description="Domain of the ontology")
    homepage: Optional[HttpUrl] = Field(None, description="URL for the ontology")
    preferred_prefix: Optional[str] = Field(
        None, description="Preferred prefix for the ontology", alias="preferredPrefix"
    )
    # NOTE(review): the sibling count field below carries a camelCase alias;
    # OLS presumably emits "numberOfTerms" here as well, so the alias was added
    # for parity — confirm against a live /api/v2/ontologies response.
    number_of_terms: Optional[int] = Field(
        None, description="Number of terms in the ontology", alias="numberOfTerms"
    )
    number_of_classes: Optional[int] = Field(
        None, description="Number of classes in the ontology", alias="numberOfClasses"
    )
    repository: Optional[HttpUrl] = Field(
        None, description="Repository URL for the ontology"
    )
class PagedResponse(BaseModel):
    """Common pagination envelope shared by paged OLS responses."""

    total_elements: int = Field(
        default=0, alias="totalElements", description="Total number of items"
    )
    page: int = Field(default=0, description="Current page number")
    size: int = Field(
        default=20, alias="numElements", description="Number of items in current page"
    )
    total_pages: int = Field(
        default=0, alias="totalPages", description="Total number of pages"
    )
class OntologySearchResponse(PagedResponse):
    """Page of ontologies returned by the ontology search endpoint."""

    ontologies: List[OntologyInfo] = Field(
        ...,
        description="List of ontologies matching the search criteria",
    )
class TermInfo(BaseModel):
    """Minimal representation of a single ontology term as returned by OLS."""

    # Allow population by either the snake_case field name or the alias.
    model_config = {"populate_by_name": True}

    iri: HttpUrl = Field(..., description="IRI of the term")
    ontology_name: str = Field(
        ...,
        alias="ontologyName",
        description="Name of the ontology containing the term",
    )
    short_form: str = Field(
        ..., alias="shortForm", description="Short form identifier for the term"
    )
    label: str = Field(..., description="Human-readable label for the term")
    obo_id: Optional[str] = Field(
        None, alias="oboId", description="OBOLibrary ID for the term"
    )
    is_obsolete: Optional[bool] = Field(
        False, alias="isObsolete", description="Indicates if the term is obsolete"
    )
class TermSearchResponse(PagedResponse):
    """Page of terms returned by a term search."""

    num_found: int = Field(
        0, alias="numFound", description="Total number of terms found"
    )
    terms: List[TermInfo] = Field(
        ...,
        description="List of terms matching the search criteria",
    )
class DetailedTermInfo(TermInfo):
    """Term representation enriched with definition text and synonyms."""

    description: Optional[List[str]] = Field(None, description="Definition of the term")
    synonyms: Optional[List[str]] = Field(
        None, description="List of synonyms for the term"
    )
@register_tool("OLSTool")
class OLSTool(BaseTool):
    """Interact with the EMBL-EBI Ontology Lookup Service (OLS) REST API.

    The public entry point is :meth:`run`, which dispatches on the
    ``operation`` argument to one of the ``_handle_*`` methods listed in
    ``_OPERATIONS``. Every operation returns a dict carrying a ``status`` key.
    """

    # Maps the public `operation` argument to the name of the bound handler.
    _OPERATIONS = {
        "search_terms": "_handle_search_terms",
        "get_ontology_info": "_handle_get_ontology_info",
        "search_ontologies": "_handle_search_ontologies",
        "get_term_info": "_handle_get_term_info",
        "get_term_children": "_handle_get_term_children",
        "get_term_ancestors": "_handle_get_term_ancestors",
        "find_similar_terms": "_handle_find_similar_terms",
    }

    def __init__(self, tool_config):
        """Configure base URL, timeout, and a reusable HTTP session.

        Args:
            tool_config: Mapping with optional ``base_url`` and ``timeout``
                overrides; module-level constants provide the defaults.
        """
        super().__init__(tool_config)
        self.base_url = tool_config.get("base_url", OLS_BASE_URL).rstrip("/")
        self.timeout = tool_config.get("timeout", REQUEST_TIMEOUT)
        # One shared Session so consecutive requests reuse connections.
        self.session = requests.Session()

    def __del__(self):
        # Best-effort cleanup: swallow any error, since interpreter shutdown
        # may already have torn down parts of the `requests` machinery.
        try:
            self.session.close()
        except Exception:
            pass

    def run(self, arguments=None, **_: Any):
        """Dispatch the requested OLS operation.

        Network failures and response-validation failures are converted into
        ``{"status": "error", ...}`` payloads rather than raised, so callers
        always receive a dict.
        """
        arguments = arguments or {}
        operation = arguments.get("operation")
        # Auto-fill operation from tool config const if not provided by user
        if not operation:
            operation = self.get_schema_const_operation()
        if not operation:
            return {
                "status": "error",
                "error": "`operation` argument is required.",
                "available_operations": sorted(self._OPERATIONS.keys()),
            }
        handler_name = self._OPERATIONS.get(operation)
        if not handler_name:
            return {
                "status": "error",
                "error": f"Unsupported operation '{operation}'.",
                "available_operations": sorted(self._OPERATIONS.keys()),
            }
        handler = getattr(self, handler_name)
        try:
            result = handler(arguments)
            # Handlers may return plain payloads; tag them as successes.
            if isinstance(result, dict) and "status" not in result:
                return {"status": "success", **result}
            return result
        except requests.RequestException as exc:
            return {"status": "error", "error": str(exc)}
        except ValidationError as exc:
            return {
                "status": "error",
                "error": "Failed to validate OLS response.",
                "details": exc.errors(),
            }

    def _handle_search_terms(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Free-text term search via the OLS ``/api/search`` endpoint."""
        query = arguments.get("query")
        if not query:
            return {
                "status": "error",
                "error": "`query` parameter is required for `search_terms`.",
            }
        # `rows`, `limit` and `size` are accepted as synonyms. NOTE(review):
        # because of the `or` chain an explicit 0 falls through to 10.
        rows = int(
            arguments.get("rows")
            or arguments.get("limit")
            or arguments.get("size")
            or 10
        )
        ontology = arguments.get("ontology")
        exact_match = bool(arguments.get("exact_match", False))
        include_obsolete = bool(arguments.get("include_obsolete", False))
        params = {
            "q": query,
            "rows": rows,
            "start": 0,
            "exact": exact_match,
            "obsoletes": include_obsolete,
        }
        if ontology:
            params["ontology"] = ontology
        data = self._get_json("/api/search", params=params)
        # OLS /api/search returns a Solr-style envelope: {"response": {"docs": [...], "numFound": N}, ...}
        # Extract docs and numFound directly to avoid returning noisy facet_counts.
        solr_response = data.get("response") if isinstance(data, dict) else None
        if isinstance(solr_response, dict) and "docs" in solr_response:
            docs = solr_response.get("docs", [])
            num_found = solr_response.get("numFound", len(docs))
            term_models = [self._build_term_model(item) for item in docs[:rows]]
            term_models = [m for m in term_models if m is not None]
            formatted: Dict[str, Any] = {
                "terms": [
                    m.model_dump(by_alias=True, mode="json") for m in term_models
                ],
                "total_items": num_found,
                "showing": len(term_models),
            }
        else:
            # Fall back to the generic collection normalizer for other shapes.
            formatted = self._format_term_collection(data, rows)
        formatted["query"] = query
        formatted["filters"] = {
            "ontology": ontology,
            "exact_match": exact_match,
            "include_obsolete": include_obsolete,
        }
        return formatted

    def _handle_get_ontology_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Fetch metadata for a single ontology via ``/api/v2/ontologies/{id}``."""
        # Feature-120A-003: accept 'ontology' alias for consistency with other OLS tools
        ontology_id = arguments.get("ontology_id") or arguments.get("ontology")
        if not ontology_id:
            return {
                "status": "error",
                "error": "`ontology_id` (or `ontology`) is required. E.g. 'mondo', 'hp', 'go'.",
            }
        data = self._get_json(f"/api/v2/ontologies/{ontology_id}")
        ontology = OntologyInfo.model_validate(data)
        # Convert HttpUrl objects to strings for JSON compatibility
        result = ontology.model_dump(by_alias=True, mode="json")
        return result

    def _handle_search_ontologies(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """List/search ontologies with pagination via ``/api/v2/ontologies``."""
        search = arguments.get("search")
        page = int(arguments.get("page", 0))
        size = int(arguments.get("size", 20))
        params: Dict[str, Any] = {"page": page, "size": size}
        if search:
            params["search"] = search
        data = self._get_json("/api/v2/ontologies", params=params)
        # Feature-120A-001: OLS v4 returns ontologies in top-level 'elements', not '_embedded'
        ontologies = data.get(
            "elements", data.get("_embedded", {}).get("ontologies", [])
        )
        validated: List[Dict[str, Any]] = []
        for item in ontologies:
            try:
                validated.append(
                    OntologyInfo.model_validate(item).model_dump(
                        by_alias=True, mode="json"
                    )
                )
            except ValidationError:
                # Skip entries that do not match the expected schema; the raw
                # list is returned below if nothing validates.
                continue
        return {
            "status": "success",
            "results": validated or ontologies,
            "pagination": {
                "page": page,
                "size": size,
                "total_pages": data.get("totalPages", 0),
                "total_items": data.get("totalElements", len(ontologies)),
            },
            "search": search,
        }

    def _handle_get_term_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve a term by CURIE/IRI and return its detailed representation."""
        # Feature-111A-003: term_iri alias for id (consistent with sibling OLS tools)
        identifier = (
            arguments.get("id") or arguments.get("term_id") or arguments.get("term_iri")
        )
        if not identifier:
            return {
                "status": "error",
                "error": "`id` parameter is required for `get_term_info`. Use HP:0001903 style IDs.",
            }
        # Use ontology-specific endpoint when a CURIE prefix is known (e.g. GO:, HP:)
        # to avoid getting a term from an importing ontology (e.g. bcgo) instead of canonical source.
        ontology = arguments.get("ontology") or _infer_ontology_from_term_id(identifier)
        terms = None
        if ontology:
            data = self._get_json(
                f"/api/ontologies/{ontology}/terms", params={"obo_id": identifier}
            )
            embedded = data.get("_embedded", {})
            terms = embedded.get("terms") if isinstance(embedded, dict) else None
        if not terms:
            # Fall back to the cross-ontology terms endpoint.
            data = self._get_json("/api/terms", params={"id": identifier})
            embedded = data.get("_embedded", {})
            terms = embedded.get("terms") if isinstance(embedded, dict) else None
        if not terms:
            return {
                "status": "error",
                "error": f"Term with ID '{identifier}' was not found in OLS.",
            }
        # Normalize the term data before validation
        term_data = terms[0]
        if "ontologyId" in term_data and "ontologyName" not in term_data:
            term_data["ontologyName"] = term_data["ontologyId"]
        term = DetailedTermInfo.model_validate(term_data)
        # Convert HttpUrl objects to strings for JSON compatibility
        return term.model_dump(by_alias=True, mode="json")

    def _handle_get_term_children(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """List the direct children of a term (shared logic in _fetch_term_relation)."""
        return self._fetch_term_relation(arguments, "children", "get_term_children")

    def _handle_get_term_ancestors(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """List the ancestors of a term (shared logic in _fetch_term_relation)."""
        return self._fetch_term_relation(arguments, "ancestors", "get_term_ancestors")

    def _fetch_term_relation(
        self, arguments: Dict[str, Any], relation: str, operation: str
    ) -> Dict[str, Any]:
        """Shared implementation for the children/ancestors hierarchy endpoints.

        Args:
            arguments: Raw tool arguments (term_iri/term_id, ontology, paging).
            relation: API path segment, ``"children"`` or ``"ancestors"``.
            operation: Public operation name, interpolated into error messages.
        """
        raw_term_id = arguments.get("term_iri") or arguments.get("term_id", "")
        term_iri = _expand_short_term_id(raw_term_id)
        ontology = arguments.get("ontology") or _infer_ontology_from_term_id(
            raw_term_id
        )
        if not term_iri or not ontology:
            return {
                "status": "error",
                "error": (
                    f"`term_iri` (or `term_id`) and `ontology` are required for "
                    f"`{operation}`. Tip: if you pass `term_id` like 'HP:0001234', "
                    "the ontology is inferred automatically."
                ),
            }
        include_obsolete = bool(arguments.get("include_obsolete", False))
        size = int(arguments.get("size", 20))
        encoded = url_encode_iri(term_iri)
        params = {
            "page": 0,
            "size": size,
            "includeObsoleteEntities": include_obsolete,
        }
        data = self._get_json(
            f"/api/v2/ontologies/{ontology}/classes/{encoded}/{relation}",
            params=params,
        )
        formatted = self._format_term_collection(data, size)
        formatted["term_iri"] = term_iri
        formatted["ontology"] = ontology
        formatted["filters"] = {"include_obsolete": include_obsolete}
        return formatted

    def _handle_find_similar_terms(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Find terms with similar labels, approximated via text search."""
        # Feature-120A-002: /llm_similar does not exist in OLS v4; use text search instead.
        # Resolve the term to get its label, then search within the ontology.
        term_iri = _expand_short_term_id(
            arguments.get("term_iri") or arguments.get("term_id", "")
        )
        ontology = arguments.get("ontology")
        if not term_iri or not ontology:
            return {
                "status": "error",
                "error": "`term_iri` (or `term_id`) and `ontology` are required for `find_similar_terms`.",
            }
        size = int(arguments.get("size", 10))
        # Step 1: get the term's label to use as search query
        label = ""
        try:
            encoded = url_encode_iri(term_iri)
            term_data = self._get_json(
                f"/api/v2/ontologies/{ontology}/classes/{encoded}"
            )
            label = term_data.get("label", "")
        except Exception:
            # Deliberate best-effort: a failed lookup falls through to the
            # explicit error return below.
            pass
        if not label:
            return {
                "status": "error",
                "error": f"Could not retrieve label for term '{term_iri}' in ontology '{ontology}'. Verify the term ID is correct.",
            }
        # Step 2: search within the ontology for terms with similar labels
        # (fetch one extra row so the query term itself can be filtered out).
        params = {"q": label, "ontology": ontology, "type": "class", "rows": size + 1}
        data = self._get_json("/api/search", params=params)
        docs = data.get("response", {}).get("docs", [])
        # Exclude the query term itself
        similar = [d for d in docs if d.get("iri") != term_iri][:size]
        return {
            "status": "success",
            "term_iri": term_iri,
            "source_label": label,
            "ontology": ontology,
            "similar_terms": similar,
            "total": len(similar),
            "note": "Results via text search (OLS v4 semantic similarity endpoint unavailable).",
        }

    def _get_json(
        self, path: str, params: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Make a GET request to the OLS API and return the JSON response.

        Args:
            path: API endpoint path (joined onto ``self.base_url``).
            params: Optional query parameters.

        Returns:
            JSON response as a dictionary.

        Raises:
            requests.RequestException: On network errors, timeouts, HTTP
                errors, or a 503 from the OLS service (re-raised with a
                descriptive message).
        """
        url = f"{self.base_url}{path}"
        try:
            response = self.session.get(url, params=params, timeout=self.timeout)
            # Translate 503 into an actionable message before raise_for_status
            # would turn it into a generic HTTPError.
            if response.status_code == 503:
                raise requests.RequestException(
                    "EBI OLS4 service is temporarily unavailable (HTTP 503). "
                    "Try again later. Alternatives: search HPO phenotypes via "
                    "Orphanet_get_phenotypes, or look up disease terms via "
                    "Orphanet_search_by_name or EuropePMC."
                )
            response.raise_for_status()
            return response.json()
        except requests.Timeout as e:
            raise requests.RequestException(
                f"OLS API request timed out after {self.timeout}s: {url}"
            ) from e
        except requests.RequestException as e:
            raise requests.RequestException(
                f"OLS API request failed for {url}: {str(e)}"
            ) from e

    def _format_term_collection(
        self, data: Dict[str, Any], size: int
    ) -> Dict[str, Any]:
        """Normalize an OLS term collection (v1 ``_embedded`` or v2 ``elements``).

        Returns the payload unchanged when no recognizable element list is
        present; otherwise a dict with ``terms``, ``total_items``, ``showing``
        and, when the response carries a page object, ``pagination``.
        """
        elements: Optional[List[Dict[str, Any]]] = None
        if isinstance(data, dict):
            if "elements" in data and isinstance(data["elements"], list):
                elements = data["elements"]
            else:
                embedded = data.get("_embedded")
                if isinstance(embedded, dict):
                    # OLS responses can use different embedded keys depending on endpoint/version.
                    # Keep this list conservative but inclusive for OLS4 v2 term hierarchy endpoints.
                    for key in ("terms", "children", "ancestors", "classes"):
                        if key in embedded and isinstance(embedded[key], list):
                            elements = embedded[key]
                            break
                    if elements is None:
                        candidates = [
                            value
                            for value in embedded.values()
                            if isinstance(value, list)
                        ]
                        if candidates:
                            elements = candidates[0]
        if not elements:
            return data if isinstance(data, dict) else {"items": data}
        limited = elements[:size]
        term_models = [self._build_term_model(item) for item in limited]
        term_models = [model for model in term_models if model is not None]
        # Bugfix: the previous `or`-chain
        #   data.get("totalElements") or data.get("page", {}).get("totalElements") or len(elements)
        # raised AttributeError when `totalElements` was 0/absent and `page`
        # was a non-dict (OLS4 v2 returns it as an int), and it also reported
        # a genuine total of 0 as len(elements).
        total = data.get("totalElements")
        if total is None:
            page_obj = data.get("page")
            if isinstance(page_obj, dict):
                total = page_obj.get("totalElements")
        if total is None:
            total = len(elements)
        result: Dict[str, Any] = {
            "terms": [
                model.model_dump(by_alias=True, mode="json") for model in term_models
            ],
            "total_items": total,
            "showing": len(term_models),
        }
        page_info = data.get("page") if isinstance(data, dict) else None
        if isinstance(page_info, dict):
            result["pagination"] = {
                "page": page_info.get("number", 0),
                "size": page_info.get("size", len(limited)),
                "total_pages": page_info.get("totalPages", 0),
                "total_items": page_info.get("totalElements", total),
            }
        return result

    @staticmethod
    def _build_term_model(item: Dict[str, Any]) -> Optional[TermInfo]:
        """Build a TermInfo from a raw OLS doc, tolerating schema variants.

        Returns None when the item has no IRI or fails validation.
        """
        # OLS4 v2 endpoints may represent the identifier as `iri`, `@id`, or `id`.
        iri = item.get("iri") or item.get("@id") or item.get("id")
        # OLS4 v2 often returns `label` as a list (e.g. ["lymphocyte"]).
        label = item.get("label")
        if isinstance(label, list):
            label = next(
                (val for val in label if isinstance(val, str) and val.strip()), ""
            )
        elif not isinstance(label, str):
            label = ""
        # Prefer CURIE if present (more human-friendly), otherwise fall back to shortForm.
        short_form = (
            item.get("curie") or item.get("shortForm") or item.get("short_form") or ""
        )
        payload = {
            "iri": iri,
            "ontology_name": item.get("ontologyName")
            or item.get("ontology_name")
            or item.get("ontologyId")
            or "",
            "short_form": short_form,
            "label": label,
            "oboId": item.get("oboId")
            or item.get("obo_id")
            or item.get("curie")
            or short_form
            or None,
            "isObsolete": item.get("isObsolete") or item.get("is_obsolete", False),
        }
        if not payload["iri"]:
            return None
        try:
            return TermInfo.model_validate(payload)
        except ValidationError:
            return None
# Explicit public API of this module.
__all__ = [
    "OLSTool",
    "OntologyInfo",
    "OntologySearchResponse",
    "TermInfo",
    "TermSearchResponse",
    "DetailedTermInfo",
    "url_encode_iri",
]