Source code for tooluniverse.biostudies_tool
"""
BioStudies Database Tool
BioStudies is a comprehensive repository for biological study data at EMBL-EBI.
It hosts data from various collections including ArrayExpress, and supports
diverse data types from genomics to imaging and clinical trials.
This tool provides access to the BioStudies API for searching and retrieving
biological study information.
"""
import requests
from typing import Any, Dict, Optional
from .base_tool import BaseTool
from .tool_registry import register_tool
try:
from markitdown import MarkItDown
MARKITDOWN_AVAILABLE = True
except ImportError:
MARKITDOWN_AVAILABLE = False
[docs]
@register_tool("BioStudiesRESTTool")
class BioStudiesRESTTool(BaseTool):
"""
BioStudies REST API tool.
BioStudies is a general-purpose repository for biological studies at EMBL-EBI.
It provides access to diverse study types including genomics, transcriptomics,
proteomics, imaging, and more.
"""
[docs]
def __init__(self, tool_config: Dict):
    """Prepare the HTTP session and the optional HTML-to-Markdown converter.

    Args:
        tool_config: Tool configuration, forwarded unchanged to the base
            class initializer.
    """
    super().__init__(tool_config)

    # Root of the BioStudies REST API; every endpoint URL is built from it.
    self.base_url = "https://www.ebi.ac.uk/biostudies/api/v1"

    # A single session so the same headers are sent with every request.
    default_headers = {"Accept": "application/json", "User-Agent": "ToolUniverse/1.0"}
    self.session = requests.Session()
    self.session.headers.update(default_headers)

    # Passed as ``timeout=`` to each request issued through the session.
    self.timeout = 30

    # The converter exists only when the optional markitdown import succeeded.
    self.md_converter = MarkItDown() if MARKITDOWN_AVAILABLE else None
[docs]
def _build_url(self, args: Dict[str, Any]) -> str:
"""Build URL from arguments"""
tool_name = self.tool_config.get("name", "")
if tool_name == "biostudies_search":
return f"{self.base_url}/search"
elif tool_name == "biostudies_get_study":
accession = args.get("accession", "")
if accession:
return f"{self.base_url}/studies/{accession}"
elif tool_name == "biostudies_get_study_files":
accession = args.get("accession", "")
if accession:
# Note: files endpoint doesn't exist, we get files from study details
return f"{self.base_url}/studies/{accession}"
elif tool_name == "biostudies_search_by_collection":
# Collection goes in URL path per API docs: /api/v1/{collection}/search
collection = args.get("collection", "")
if collection:
return f"{self.base_url}/{collection}/search"
return f"{self.base_url}/search"
return f"{self.base_url}/search"
[docs]
def _build_params(self, args: Dict[str, Any]) -> Dict[str, Any]:
"""Build query parameters for BioStudies API"""
params = {}
tool_name = self.tool_config.get("name", "")
if tool_name in ["biostudies_search", "biostudies_search_by_collection"]:
# Build search query
if "query" in args:
params["query"] = args["query"]
else:
params["query"] = "*" # Default to all
# Note: for biostudies_search_by_collection, collection is in URL path, not params
# For biostudies_search, collection can still be used as a filter
if tool_name == "biostudies_search" and "collection" in args:
params["collection"] = args["collection"]
# Pagination
page_size = args.get("pageSize", args.get("limit", 10))
params["pageSize"] = min(page_size, 100)
page = args.get("page", 1)
params["page"] = page
# Sorting
if "sortBy" in args:
params["sortBy"] = args["sortBy"]
if "sortOrder" in args:
params["sortOrder"] = args["sortOrder"]
return params
[docs]
def _convert_html_to_markdown(self, html_content: str, url: str) -> str:
"""Convert HTML content to Markdown using markitdown"""
if not self.md_converter:
return html_content
try:
# markitdown expects file-like or string
result = self.md_converter.convert_stream(html_content)
return (
result.text_content if hasattr(result, "text_content") else str(result)
)
except Exception as e:
return f"[Could not convert HTML to Markdown: {str(e)}]\n\n{html_content[:500]}..."
[docs]
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Execute the BioStudies API call for the configured tool.

    Args:
        arguments: Tool arguments; which keys are read depends on the tool
            name in ``self.tool_config["name"]``.

    Returns:
        A dict with a ``status`` of ``"success"``, ``"warning"`` or
        ``"error"``:

        * JSON responses are reshaped per tool: search tools return the hits
          plus paging/sorting fields and a ``count``; ``biostudies_get_study``
          returns the JSON document as-is; ``biostudies_get_study_files``
          returns the extracted file list and a ``count``.
        * HTML responses are converted to Markdown when a converter is
          configured, otherwise returned raw with ``status="warning"``.
        * Any other content type is returned raw with ``status="warning"``.
        * Missing required arguments, request failures and unexpected
          exceptions yield ``status="error"`` with an ``error`` message;
          this method does not raise for them.
    """
    arguments = arguments or {}
    tool_name = self.tool_config.get("name", "")
    # Initialised up front so the error handlers can always report it.
    url = None

    # The study tools need an accession. Without this check the URL builder
    # falls back to the search endpoint and the search output would be
    # reported as a successful study lookup.
    if tool_name in (
        "biostudies_get_study",
        "biostudies_get_study_files",
    ) and not arguments.get("accession"):
        return {
            "status": "error",
            "error": f"Missing required parameter 'accession' for {tool_name}",
            "url": None,
        }

    try:
        url = self._build_url(arguments)
        params = self._build_params(arguments)
        response = self.session.get(url, params=params, timeout=self.timeout)
        response.raise_for_status()

        # Dispatch on the content type reported by the server.
        content_type = response.headers.get("content-type", "")
        content_kind = content_type.lower()

        if "json" in content_kind:
            data = response.json()
            if tool_name in (
                "biostudies_search",
                "biostudies_search_by_collection",
            ):
                # A null "hits" value is treated as an empty result list.
                hits = data.get("hits") or []
                return {
                    "status": "success",
                    "data": {
                        "hits": hits,
                        "totalHits": data.get("totalHits", 0),
                        "page": data.get("page", 1),
                        "pageSize": data.get("pageSize", len(hits)),
                        "sortBy": data.get("sortBy"),
                        "sortOrder": data.get("sortOrder"),
                    },
                    "count": len(hits),
                    "url": response.url,
                }
            if tool_name == "biostudies_get_study_files":
                # The file list is read from the study document itself.
                files = self._extract_files(data)
                return {
                    "status": "success",
                    "data": files,
                    "count": len(files),
                    "url": response.url,
                }
            # biostudies_get_study and any other tool: return the JSON as-is.
            return {
                "status": "success",
                "data": data,
                "url": response.url,
            }

        if "html" in content_kind:
            html_content = response.text
            if self.md_converter:
                markdown_content = self._convert_html_to_markdown(
                    html_content, response.url
                )
                return {
                    "status": "success",
                    "data": {
                        "format": "markdown",
                        "content": markdown_content,
                        "original_format": "html",
                        "note": "HTML response converted to Markdown using markitdown",
                    },
                    "url": response.url,
                }
            return {
                "status": "warning",
                "data": {
                    "format": "html",
                    "content": html_content,
                    "note": "HTML response returned (markitdown not available for conversion)",
                },
                "url": response.url,
            }

        # Unknown content type: hand the raw body back with a warning.
        return {
            "status": "warning",
            "data": {
                "format": content_type,
                "content": response.text,
                "note": f"Unexpected content type: {content_type}",
            },
            "url": response.url,
        }
    except requests.exceptions.RequestException as e:
        return {
            "status": "error",
            "error": f"BioStudies API error: {str(e)}",
            "url": url,
        }
    except Exception as e:
        # Boundary handler: callers get an error dict, never an exception.
        return {
            "status": "error",
            "error": f"Unexpected error: {str(e)}",
            "url": url,
        }
[docs]
def _extract_files(self, data: Any) -> list:
"""Extract file list from BioStudies response"""
files = []
if isinstance(data, dict):
# If data has a section, extract files from it
if "section" in data:
files = self._extract_files_from_section(data["section"])
# If data has direct files array
elif "files" in data and isinstance(data["files"], list):
for file_obj in data["files"]:
if isinstance(file_obj, dict):
files.append(
{
"path": file_obj.get("path", file_obj.get("name", "")),
"size": file_obj.get("size", 0),
"type": file_obj.get("type", ""),
"attributes": file_obj.get("attributes", []),
}
)
elif isinstance(data, list):
# If data is directly a list of files
for file_obj in data:
if isinstance(file_obj, dict):
files.append(
{
"path": file_obj.get("path", file_obj.get("name", "")),
"size": file_obj.get("size", 0),
"type": file_obj.get("type", ""),
}
)
return files
[docs]
def _extract_files_from_section(self, section: Dict[str, Any]) -> list:
"""Extract files from a BioStudies section (recursive)"""
files = []
# Add files from current section
if "files" in section and isinstance(section["files"], list):
for file_obj in section["files"]:
if isinstance(file_obj, dict):
files.append(
{
"path": file_obj.get("path", file_obj.get("name", "")),
"size": file_obj.get("size", 0),
"type": file_obj.get("type", ""),
"attributes": file_obj.get("attributes", []),
}
)
# Recursively extract from subsections
if "subsections" in section and isinstance(section["subsections"], list):
for subsection_group in section["subsections"]:
if isinstance(subsection_group, list):
for subsection in subsection_group:
if isinstance(subsection, dict):
files.extend(self._extract_files_from_section(subsection))
elif isinstance(subsection_group, dict):
files.extend(self._extract_files_from_section(subsection_group))
return files