# Source code for tooluniverse.pubmed_tool

import os
import requests
import time
import threading
from typing import Any, Dict, Optional
from .base_tool import BaseTool
from .base_rest_tool import BaseRESTTool
from .http_utils import request_with_retry
from .tool_registry import register_tool


@register_tool("PubMedRESTTool")
class PubMedRESTTool(BaseRESTTool):
    """Generic REST tool for PubMed E-utilities (efetch, elink).

    Implements rate limiting per NCBI guidelines:
    - Without API key: 3 requests/second
    - With API key: 10 requests/second

    API key is read from environment variable NCBI_API_KEY.
    Get your free key at: https://www.ncbi.nlm.nih.gov/account/
    """

    # Class-level rate limiting state, shared across ALL instances so the
    # NCBI per-client limit is respected even when several tools are active.
    _last_request_time = 0.0
    _rate_limit_lock = threading.Lock()
[docs] def __init__(self, tool_config): super().__init__(tool_config) # Get API key from environment as fallback self.default_api_key = os.environ.get("NCBI_API_KEY", "")
[docs] def _get_param_mapping(self) -> Dict[str, str]: """Map PubMed E-utilities parameter names.""" return { "limit": "retmax", # limit -> retmax for E-utilities }
[docs] def _enforce_rate_limit(self, has_api_key: bool) -> None: """Enforce NCBI E-utilities rate limits. Args: has_api_key: Whether an API key is provided """ # Rate limits per NCBI guidelines # https://www.ncbi.nlm.nih.gov/books/NBK25497/#chapter2.Usage_Guidelines_and_Requiremen # Using conservative intervals to avoid rate limit errors: # - Without API key: 3 req/sec -> 0.4s interval (more conservative than 0.33s) # - With API key: 10 req/sec -> 0.15s interval (more conservative than 0.1s) min_interval = 0.15 if has_api_key else 0.4 with self._rate_limit_lock: current_time = time.time() time_since_last = current_time - PubMedRESTTool._last_request_time if time_since_last < min_interval: sleep_time = min_interval - time_since_last time.sleep(sleep_time) PubMedRESTTool._last_request_time = time.time()
[docs] def _build_params(self, args: Dict[str, Any]) -> Dict[str, Any]: """Build E-utilities parameters with special handling.""" params = {} # Start with default params from config (db, dbfrom, cmd, linkname, retmode, rettype) for key in ["db", "dbfrom", "cmd", "linkname", "retmode", "rettype"]: if key in self.tool_config["fields"]: params[key] = self.tool_config["fields"][key] # Handle PMID as 'id' parameter (for efetch, elink) if "pmid" in args: params["id"] = args["pmid"] # Handle query as 'term' parameter (for esearch) if "query" in args: params["term"] = args["query"] # Add API key from environment variable if self.default_api_key: params["api_key"] = self.default_api_key # Handle limit if "limit" in args and args["limit"]: params["retmax"] = args["limit"] # Set retmode to json for elink and esearch (easier parsing) endpoint = self.tool_config["fields"]["endpoint"] if "retmode" not in params and ("elink" in endpoint or "esearch" in endpoint): params["retmode"] = "json" return params
[docs] def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """ PubMed E-utilities need special handling for direct endpoint URLs. Enforces NCBI rate limits to prevent API errors. """ url = None try: # Enforce rate limiting before making request has_api_key = bool(self.default_api_key) self._enforce_rate_limit(has_api_key) endpoint = self.tool_config["fields"]["endpoint"] params = self._build_params(arguments) response = request_with_retry( self.session, "GET", endpoint, params=params, timeout=self.timeout, max_attempts=3, ) if response.status_code != 200: return { "status": "error", "error": "PubMed API error", "url": response.url, "status_code": response.status_code, "detail": (response.text or "")[:500], } # Try JSON first (elink, esearch) try: data = response.json() # Check for API errors in response if "ERROR" in data: error_msg = data.get("ERROR", "Unknown API error") return { "status": "error", "data": f"NCBI API error: {error_msg[:200]}", "url": response.url, } # For esearch responses, extract ID list if "esearchresult" in data: esearch_result = data.get("esearchresult", {}) id_list = esearch_result.get("idlist", []) return { "status": "success", "data": id_list, "count": len(id_list), "total_count": int(esearch_result.get("count", 0)), "url": response.url, } # For elink responses with LinkOut URLs (llinks command) if "linksets" in data: linksets = data.get("linksets", []) # Check for empty linksets with errors if not linksets or (linksets and len(linksets) == 0): return { "status": "success", "data": [], "count": 0, "url": response.url, } if linksets and len(linksets) > 0: linkset = linksets[0] # Extract linked IDs if "linksetdbs" in linkset: linksetdbs = linkset.get("linksetdbs", []) if linksetdbs and len(linksetdbs) > 0: links = linksetdbs[0].get("links", []) return { "status": "success", "data": links, "count": len(links), "url": response.url, } # Extract LinkOut URLs (idurllist) elif "idurllist" in linkset: return { "status": "success", "data": 
linkset.get("idurllist", {}), "url": response.url, } # For elink responses with LinkOut URLs (llinks returns direct idurllist) if "idurllist" in data: return { "status": "success", "data": data.get("idurllist", []), "url": response.url, } return { "status": "success", "data": data, "url": response.url, } except Exception: # For XML responses (efetch), return as text return { "status": "success", "data": response.text, "url": response.url, } except Exception as e: return { "status": "error", "error": f"PubMed API error: {str(e)}", "url": url, }