tooluniverse.epmc_annotations_tool 源代码

"""
Europe PMC Annotations API tool for ToolUniverse.

The Europe PMC Annotations API provides text-mined annotations from biomedical
literature, including gene/protein mentions, diseases, organisms, chemicals,
and Gene Ontology terms automatically extracted from full-text articles.

API: https://www.ebi.ac.uk/europepmc/annotations_api/
No authentication required.

Documentation: https://europepmc.org/AnnotationsApi
"""

import requests
from typing import Any

from .base_rest_tool import BaseRESTTool
from .tool_registry import register_tool

ANNOTATIONS_BASE = "https://www.ebi.ac.uk/europepmc/annotations_api"


[文档] @register_tool("EPMCAnnotationsTool") class EPMCAnnotationsTool(BaseRESTTool): """ Tool for retrieving text-mined annotations from Europe PMC articles. Provides: - Get genes/proteins mentioned in a paper - Get diseases mentioned in a paper - Get chemicals/drugs mentioned in a paper - Get organisms mentioned in a paper - Get GO terms mentioned in a paper Uses PubMed IDs (PMID) or PMC IDs. No authentication required. """
[文档] def __init__(self, tool_config: dict): super().__init__(tool_config) self.timeout = 30 self.operation = tool_config.get("fields", {}).get( "operation", "get_annotations" )
[文档] def run(self, arguments: dict) -> dict: """Execute the Annotations API call.""" try: return self._query(arguments) except requests.exceptions.Timeout: return { "status": "error", "error": f"EPMC Annotations request timed out after {self.timeout}s", } except requests.exceptions.ConnectionError: return { "status": "error", "error": "Failed to connect to Europe PMC Annotations API.", } except Exception as e: return { "status": "error", "error": f"EPMC Annotations error: {str(e)}", }
[文档] def _query(self, arguments: dict) -> dict: """Route to the appropriate operation.""" op = self.operation if op == "get_annotations": return self._get_annotations(arguments) elif op == "get_genes": return self._get_entity_type(arguments, "Gene_Proteins") elif op == "get_diseases": return self._get_entity_type(arguments, "Diseases") elif op == "get_chemicals": return self._get_entity_type(arguments, "Chemicals") elif op == "get_organisms": return self._get_entity_type(arguments, "Organisms") return {"status": "error", "error": f"Unknown operation: {op}"}
[文档] def _get_annotations(self, arguments: dict) -> dict: """Get all text-mined annotations from an article.""" pmid = str(arguments.get("pmid", "")).strip() pmcid = str(arguments.get("pmcid", "")).strip() annotation_type = arguments.get("annotation_type", "").strip() if not pmid and not pmcid: return { "status": "error", "error": "Either pmid (e.g., '33332779') or pmcid (e.g., 'PMC7781101') is required.", } # Build article ID if pmid: article_id = f"MED:{pmid}" else: article_id = f"PMC:{pmcid.replace('PMC', '')}" params: dict[str, str] = { "articleIds": article_id, "format": "JSON", } if annotation_type: params["type"] = annotation_type resp = requests.get( f"{ANNOTATIONS_BASE}/annotationsByArticleIds", params=params, timeout=self.timeout, ) resp.raise_for_status() data = resp.json() if not data: return { "status": "success", "data": { "article_id": article_id, "annotations": [], "annotation_counts": {}, }, "metadata": { "source": "Europe PMC Annotations", }, } # Parse annotations from first result article = data[0] if isinstance(data, list) else data raw_annotations = article.get("annotations", []) # Group by type and deduplicate by_type: dict[str, dict[str, dict]] = {} for ann in raw_annotations: ann_type = ann.get("type", "Unknown") exact = ann.get("exact", "") if ann_type not in by_type: by_type[ann_type] = {} if exact and exact not in by_type[ann_type]: tags = ann.get("tags", []) uri = tags[0].get("uri", "") if tags else "" tag_name = tags[0].get("name", "") if tags else "" by_type[ann_type][exact] = { "text": exact, "tag_name": tag_name, "uri": uri, "count": 1, } elif exact in by_type.get(ann_type, {}): by_type[ann_type][exact]["count"] += 1 # Build structured output annotation_groups = {} annotation_counts = {} for ann_type, entities in by_type.items(): sorted_entities = sorted(entities.values(), key=lambda x: -x["count"]) annotation_groups[ann_type] = sorted_entities[:30] annotation_counts[ann_type] = len(entities) return { "status": "success", "data": { "article_id": article_id, "pmcid": article.get("pmcid"), "annotations": annotation_groups, "annotation_counts": annotation_counts, }, "metadata": { "source": "Europe PMC Annotations", "description": ( "Text-mined annotations from biomedical literature. " "Types include Gene_Proteins, Diseases, Chemicals, Organisms, " "Gene_Ontology, EFO. count = number of mentions in the article." ), }, }
[文档] def _get_entity_type(self, arguments: dict, entity_type: str) -> dict: """Get annotations of a specific type from an article.""" # Inject annotation_type and delegate to get_annotations arguments = dict(arguments) arguments["annotation_type"] = entity_type result = self._get_annotations(arguments) # Filter to just the requested type if result.get("status") == "success": all_annotations = result["data"].get("annotations", {}) type_annotations = all_annotations.get(entity_type, []) result["data"]["annotations"] = type_annotations result["data"]["total_entities"] = len(type_annotations) result["data"]["entity_type"] = entity_type return result