Source code for tooluniverse.core_tool

#!/usr/bin/env python3
"""
CORE API Tool for searching open access academic papers.

CORE is the world's largest collection of open access research papers.
This tool provides access to over 200 million open access papers from
repositories and journals worldwide.
"""

import requests
import os
import re
import tempfile
import io
from urllib.parse import urljoin
from typing import Dict, List, Any, Optional
from .base_tool import BaseTool
from .tool_registry import register_tool
from .http_utils import request_with_retry

try:
    from markitdown import MarkItDown

    MARKITDOWN_AVAILABLE = True
except Exception:
    MARKITDOWN_AVAILABLE = False
    MarkItDown = None

try:
    import fitz  # PyMuPDF

    FITZ_AVAILABLE = True
except Exception:
    FITZ_AVAILABLE = False
    fitz = None

try:
    from pypdf import PdfReader

    PYPDF_AVAILABLE = True
except Exception:
    PYPDF_AVAILABLE = False
    PdfReader = None
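# Availability flags: CorePDFSnippetsTool consults these at runtime to pick the
# fastest installed extraction backend and to fail fast with an install hint
# when a requested backend is missing.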


[docs] @register_tool("CoreTool") class CoreTool(BaseTool): """Tool for searching CORE open access academic papers."""

    def __init__(self, tool_config=None):
        super().__init__(tool_config)
        self.base_url = "https://api.core.ac.uk/v3"
        self.session = requests.Session()
        self.session.headers.update(
            {"User-Agent": "ToolUniverse/1.0", "Accept": "application/json"}
        )

    def _extract_authors(self, authors: List[Dict]) -> List[str]:
        """Extract author names from CORE API response."""
        if not authors:
            return []
        author_names = []
        for author in authors:
            name = author.get("name", "")
            if name:
                author_names.append(name)
        return author_names

    def _extract_year(self, published_date: str) -> str:
        """Extract year from published date."""
        if not published_date:
            return "Unknown"
        try:
            # CORE API returns dates in ISO format
            return published_date[:4]
        except Exception:
            return "Unknown"
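
    # e.g. _extract_year("2021-05-07") -> "2021"; a missing date -> "Unknown".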

    def run(self, tool_arguments) -> List[Dict[str, Any]]:
        """
        Execute the CORE search.

        Args:
            tool_arguments: Dictionary containing search parameters

        Returns:
            List of paper dictionaries
        """
        query = (
            tool_arguments.get("query")
            or tool_arguments.get("search")
            or tool_arguments.get("q")
            or ""
        )
        if not query:
            return [{"error": "Query parameter is required"}]
        limit = tool_arguments.get(
            "limit",
            tool_arguments.get("page_size", tool_arguments.get("max_results", 10)),
        )
        year_from = tool_arguments.get("year_from")
        year_to = tool_arguments.get("year_to")
        language = tool_arguments.get("language")
        return self._search(
            query=query,
            limit=limit,
            year_from=year_from,
            year_to=year_to,
            language=language,
        )
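
# Illustrative call shape for CoreTool.run (a sketch, not an official example).
# The argument keys come from run() above; the query value and limit are
# hypothetical, and the result format depends on CoreTool._search, which is not
# shown in this excerpt.
#
#   papers = CoreTool().run({"query": "open access metadata", "limit": 5})
#   # -> list of paper dictionaries, or [{"error": ...}] when the query is missing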
[docs] @register_tool("CorePDFSnippetsTool") class CorePDFSnippetsTool(BaseTool): """ Fetch an open-access PDF (commonly returned by CORE) and return bounded text snippets around user-provided terms. Extraction backends (fastest first when extractor="auto"): - PyMuPDF (fitz) - pypdf - markitdown """

    def __init__(self, tool_config):
        super().__init__(tool_config)
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Accept": "application/pdf, */*;q=0.8",
                "User-Agent": "ToolUniverse/1.0 (CORE pdf client; contact: support@tooluniverse.ai)",
            }
        )
        self.md_converter = MarkItDown() if MARKITDOWN_AVAILABLE else None

    def run(self, arguments) -> Dict[str, Any]:
        pdf_url = arguments.get("pdf_url") or arguments.get("url")
        terms = arguments.get("terms")
        extractor = str(arguments.get("extractor") or "auto").strip().lower()

        if not isinstance(terms, list) or not [
            t for t in terms if isinstance(t, str) and t.strip()
        ]:
            return {
                "status": "error",
                "error": "`terms` must be a non-empty list of strings.",
                "retryable": False,
            }
        if not isinstance(pdf_url, str) or not pdf_url.strip():
            return {
                "status": "error",
                "error": "Provide `pdf_url` (preferred) or `url` (alias) pointing to a PDF.",
                "retryable": False,
            }
        pdf_url = pdf_url.strip()

        if extractor not in {"auto", "fitz", "pypdf", "markitdown"}:
            return {
                "status": "error",
                "error": "`extractor` must be one of: auto, fitz, pypdf, markitdown.",
                "retryable": False,
            }

        # Fail fast on missing extractors to avoid unnecessary PDF downloads.
        if extractor == "markitdown" and not MARKITDOWN_AVAILABLE:
            return {
                "status": "error",
                "error": "markitdown library not available. Install with: pip install 'markitdown[all]'",
                "retryable": False,
            }
        if extractor == "fitz" and not FITZ_AVAILABLE:
            return {
                "status": "error",
                "error": "PyMuPDF (fitz) not available. Install with: pip install pymupdf",
                "retryable": False,
            }
        if extractor == "pypdf" and not PYPDF_AVAILABLE:
            return {
                "status": "error",
                "error": "pypdf not available. Install with: pip install pypdf",
                "retryable": False,
            }
        if extractor == "auto" and not (
            FITZ_AVAILABLE or PYPDF_AVAILABLE or MARKITDOWN_AVAILABLE
        ):
            return {
                "status": "error",
                "error": "No PDF text extractor available (need pymupdf, pypdf, or markitdown).",
                "retryable": False,
            }

        try:
            window_chars = int(arguments.get("window_chars", 220))
        except (TypeError, ValueError):
            window_chars = 220
        window_chars = max(20, min(window_chars, 2000))

        try:
            max_snippets_per_term = int(arguments.get("max_snippets_per_term", 3))
        except (TypeError, ValueError):
            max_snippets_per_term = 3
        max_snippets_per_term = max(1, min(max_snippets_per_term, 10))

        try:
            max_total_chars = int(arguments.get("max_total_chars", 8000))
        except (TypeError, ValueError):
            max_total_chars = 8000
        max_total_chars = max(1000, min(max_total_chars, 50000))

        try:
            download_timeout = int(arguments.get("timeout", 20))
        except (TypeError, ValueError):
            download_timeout = 20
        download_timeout = max(5, min(download_timeout, 55))

        try:
            max_pdf_bytes = int(arguments.get("max_pdf_bytes", 20_000_000))
        except (TypeError, ValueError):
            max_pdf_bytes = 20_000_000
        max_pdf_bytes = max(1_000_000, min(max_pdf_bytes, 100_000_000))

        try:
            max_pages = int(arguments.get("max_pages", 12))
        except (TypeError, ValueError):
            max_pages = 12
        max_pages = max(1, min(max_pages, 200))

        try:
            max_text_chars = int(arguments.get("max_text_chars", 400_000))
        except (TypeError, ValueError):
            max_text_chars = 400_000
        max_text_chars = max(50_000, min(max_text_chars, 2_000_000))

        retrieval_trace: list[dict[str, Any]] = []
        final_url = pdf_url
        extractor_used: Optional[str] = None
        pages_scanned: Optional[int] = None

        def _looks_like_pdf(content_type: Optional[str], body: Optional[bytes]) -> bool:
            ct = (content_type or "").lower()
            if "application/pdf" in ct or ct.startswith("application/x-pdf"):
                return True
            if isinstance(body, (bytes, bytearray)) and body[:4] == b"%PDF":
                return True
            return False

        def _extract_pdf_url_from_html(base: str, html: str) -> Optional[str]:
            if not isinstance(html, str) or ".pdf" not in html.lower():
                return None
            # Prefer explicit href/src ending with .pdf
            m = re.search(r'(?is)(?:href|src)=["\']([^"\']+\.pdf[^"\']*)["\']', html)
            if not m:
                return None
            return urljoin(base, m.group(1))

        try:
            # Some CORE PDF URLs redirect to filesXX.core.ac.uk and can be unstable.
            # If a CORE numeric id is present, try a few variants.
            candidates = [pdf_url]
            m = re.search(r"/download/(?:pdf/)?(\d+)\.pdf", pdf_url)
            if m:
                pid = m.group(1)
                candidates.extend(
                    [
                        f"https://core.ac.uk/download/{pid}.pdf",
                        f"https://core.ac.uk/download/pdf/{pid}.pdf",
                    ]
                )

            resp = None
            pdf_bytes: Optional[bytes] = None
            seen: set[str] = set()
            i = 0
            while i < len(candidates):
                u = candidates[i]
                i += 1
                if u in seen:
                    continue
                seen.add(u)

                # HEAD to check size quickly when possible.
                try:
                    head = request_with_retry(
                        self.session,
                        "HEAD",
                        u,
                        timeout=min(10, download_timeout),
                        max_attempts=2,
                    )
                    retrieval_trace.append(
                        {
                            "attempt": "head_pdf",
                            "url": getattr(head, "url", None) or u,
                            "status_code": getattr(head, "status_code", None),
                            "content_type": (getattr(head, "headers", {}) or {}).get(
                                "content-type"
                            ),
                            "content_length": (getattr(head, "headers", {}) or {}).get(
                                "content-length"
                            ),
                            "note": None,
                        }
                    )
                    if head.status_code in (200, 206):
                        cl = (head.headers or {}).get("content-length")
                        if cl and str(cl).isdigit() and int(cl) > max_pdf_bytes:
                            return {
                                "status": "error",
                                "error": "PDF too large to download within tool limits.",
                                "pdf_url": pdf_url,
                                "candidate_url": u,
                                "content_length": int(cl),
                                "max_pdf_bytes": max_pdf_bytes,
                                "retryable": False,
                                "retrieval_trace": retrieval_trace,
                            }
                except Exception:
                    pass

                resp = request_with_retry(
                    self.session, "GET", u, timeout=download_timeout, max_attempts=2
                )
                final_url = getattr(resp, "url", None) or u
                content_type = (getattr(resp, "headers", {}) or {}).get("content-type")
                retrieval_trace.append(
                    {
                        "attempt": "download_pdf",
                        "url": final_url,
                        "status_code": getattr(resp, "status_code", None),
                        "content_type": content_type,
                        "note": None,
                    }
                )
                if resp.status_code == 200:
                    # Some "PDF" URLs (e.g., PMC /pdf/ endpoints) return an HTML page
                    # that contains the real .pdf link. Detect and follow once.
                    if not _looks_like_pdf(content_type, resp.content):
                        try:
                            html_text = (
                                resp.text
                                if hasattr(resp, "text")
                                else resp.content.decode("utf-8", "ignore")
                            )
                        except Exception:
                            html_text = ""
                        next_pdf = _extract_pdf_url_from_html(final_url, html_text)
                        if next_pdf:
                            candidates.append(next_pdf)
                            retrieval_trace.append(
                                {
                                    "attempt": "html_to_pdf_link",
                                    "url": next_pdf,
                                    "status_code": None,
                                    "content_type": "application/pdf",
                                    "note": "Followed PDF link extracted from HTML response.",
                                }
                            )
                        continue
                    pdf_bytes = resp.content
                    if pdf_bytes is not None and len(pdf_bytes) > max_pdf_bytes:
                        return {
                            "status": "error",
                            "error": "Downloaded PDF exceeds tool size cap.",
                            "pdf_url": pdf_url,
                            "candidate_url": u,
                            "downloaded_bytes": len(pdf_bytes),
                            "max_pdf_bytes": max_pdf_bytes,
                            "retryable": False,
                            "retrieval_trace": retrieval_trace,
                        }
                    break

            if resp is None or resp.status_code != 200 or pdf_bytes is None:
                sc = getattr(resp, "status_code", None)
                return {
                    "status": "error",
                    "error": f"PDF download failed (HTTP {sc})",
                    "pdf_url": pdf_url,
                    "status_code": sc,
                    "retryable": sc in (408, 429, 500, 502, 503, 504),
                    "retrieval_trace": retrieval_trace,
                }

            text = ""
            if extractor in {"auto", "fitz"} and FITZ_AVAILABLE:
                extractor_used = "fitz"
                doc = fitz.open(stream=pdf_bytes, filetype="pdf")
                try:
                    parts = []
                    n = min(len(doc), max_pages)
                    for i in range(n):
                        parts.append(doc.load_page(i).get_text("text"))
                    pages_scanned = n
                    text = "\n".join(parts)
                finally:
                    try:
                        doc.close()
                    except Exception:
                        pass
            elif extractor in {"auto", "pypdf"} and PYPDF_AVAILABLE:
                extractor_used = "pypdf"
                reader = PdfReader(io.BytesIO(pdf_bytes))
                parts = []
                n = min(len(reader.pages), max_pages)
                for i in range(n):
                    try:
                        parts.append(reader.pages[i].extract_text() or "")
                    except Exception:
                        parts.append("")
                pages_scanned = n
                text = "\n".join(parts)
            else:
                if extractor == "fitz" and not FITZ_AVAILABLE:
                    return {
                        "status": "error",
                        "error": "PyMuPDF (fitz) not available. Install with: pip install pymupdf",
                        "retryable": False,
                        "retrieval_trace": retrieval_trace,
                    }
                if extractor == "pypdf" and not PYPDF_AVAILABLE:
                    return {
                        "status": "error",
                        "error": "pypdf not available. Install with: pip install pypdf",
                        "retryable": False,
                        "retrieval_trace": retrieval_trace,
                    }
                if extractor == "markitdown" and not MARKITDOWN_AVAILABLE:
                    return {
                        "status": "error",
                        "error": "markitdown library not available. Install with: pip install 'markitdown[all]'",
                        "retryable": False,
                        "retrieval_trace": retrieval_trace,
                    }
                if not MARKITDOWN_AVAILABLE:
                    return {
                        "status": "error",
                        "error": "No PDF text extractor available (need pymupdf, pypdf, or markitdown).",
                        "retryable": False,
                        "retrieval_trace": retrieval_trace,
                    }
                extractor_used = "markitdown"
                with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
                    tmp.write(pdf_bytes)
                    tmp_path = tmp.name
                try:
                    result = self.md_converter.convert(tmp_path)
                    text = (
                        result.text_content
                        if hasattr(result, "text_content")
                        else str(result)
                    )
                finally:
                    try:
                        os.unlink(tmp_path)
                    except Exception:
                        pass
        except Exception as e:
            retrieval_trace.append(
                {
                    "attempt": "download_pdf_exception",
                    "url": final_url,
                    "status_code": None,
                    "content_type": None,
                    "note": str(e),
                }
            )
            return {
                "status": "error",
                "error": f"PDF download/processing failed: {str(e)}",
                "pdf_url": pdf_url,
                "retryable": True,
                "retrieval_trace": retrieval_trace,
            }

        snippets: list[dict[str, str]] = []
        total_chars = 0
        text_scanned = (text or "")[:max_text_chars]
        low = text_scanned.lower()
        for raw_term in terms:
            if not isinstance(raw_term, str):
                continue
            term = raw_term.strip()
            if not term:
                continue
            needle = term.lower()
            found = 0
            for m in re.finditer(re.escape(needle), low):
                if found >= max_snippets_per_term:
                    break
                start = max(0, m.start() - window_chars)
                end = min(len(text_scanned), m.end() + window_chars)
                snippet = text_scanned[start:end].strip()
                if total_chars + len(snippet) > max_total_chars:
                    break
                snippets.append({"term": term, "snippet": snippet})
                total_chars += len(snippet)
                found += 1

        return {
            "status": "success",
            "pdf_url": final_url,
            "retrieval_trace": retrieval_trace,
            "snippets": snippets,
            "snippets_count": len(snippets),
            "truncated": total_chars >= max_total_chars,
            "extractor_used": extractor_used,
            "pages_scanned": pages_scanned,
            "text_chars_scanned": len(text_scanned),
        }
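
# Illustrative call shape for CorePDFSnippetsTool.run (a sketch; the tool_config
# dict passed to the constructor is hypothetical and must satisfy whatever
# BaseTool expects). Only `pdf_url` (or its alias `url`) and `terms` are
# required; the remaining keys are the bounded tuning parameters parsed above.
#
#   tool = CorePDFSnippetsTool({"name": "CorePDFSnippetsTool"})
#   result = tool.run(
#       {
#           "pdf_url": "https://core.ac.uk/download/123456789.pdf",  # hypothetical id
#           "terms": ["randomized controlled trial", "sample size"],
#           "extractor": "auto",
#           "window_chars": 220,
#           "max_snippets_per_term": 3,
#       }
#   )
#   # result["status"] is "success" or "error"; on success, result["snippets"]
#   # holds {"term", "snippet"} dicts and result["retrieval_trace"] records the
#   # HEAD/GET attempts made while locating the PDF.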