Source code for tooluniverse.markitdown_tool
"""
MarkItDown Tool for ToolUniverse
Simple implementation following Microsoft's official MCP pattern.
Supports http:, https:, file:, data: URIs.
"""
import os
import subprocess
import sys
import urllib.parse
import urllib.request
import tempfile
from typing import Dict, Any
from .base_tool import BaseTool
from .tool_registry import register_tool
[docs]
@register_tool("MarkItDownTool")
class MarkItDownTool(BaseTool):
"""MarkItDown tool for converting files to Markdown."""
[docs]
def __init__(self, tool_config):
super().__init__(tool_config)
self.tool_name = tool_config.get("name", "MarkItDownTool")
[docs]
def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute MarkItDown tool."""
try:
return self._convert_to_markdown(arguments)
except Exception as e:
return {"error": str(e)}
def _convert_to_markdown(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Convert a resource described by URI to Markdown using markitdown CLI."""
uri = arguments.get("uri")
output_path = arguments.get("output_path")
enable_plugins = arguments.get("enable_plugins", False)
if not uri:
return {"error": "URI is required"}
try:
# Parse URI
parsed_uri = urllib.parse.urlparse(uri)
scheme = parsed_uri.scheme.lower()
# Handle different URI schemes
if scheme in ["http", "https"]:
# Download from URL
temp_file = self._download_from_url(uri)
if not temp_file:
return {"error": f"Failed to download from URL: {uri}"}
input_path = temp_file
cleanup_temp = True
elif scheme == "file":
# Local file
file_path = urllib.request.url2pathname(parsed_uri.path)
if not os.path.exists(file_path):
return {"error": f"File not found: {file_path}"}
input_path = file_path
cleanup_temp = False
elif scheme == "data":
# Data URI
temp_file = self._handle_data_uri(uri)
if not temp_file:
return {"error": f"Failed to process data URI: {uri}"}
input_path = temp_file
cleanup_temp = True
else:
return {
"error": f"Unsupported URI scheme: {scheme}. Supported schemes: http, https, file, data"
}
# Build markitdown command
cmd = [sys.executable, "-m", "markitdown", input_path]
if enable_plugins:
cmd.append("--use-plugins")
if output_path:
cmd.extend(["-o", output_path])
# Execute command
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
error_msg = f"MarkItDown failed: {result.stderr}"
if cleanup_temp and os.path.exists(input_path):
os.unlink(input_path)
return {"error": error_msg}
# Get markdown content
if output_path and os.path.exists(output_path):
with open(output_path, "r", encoding="utf-8") as f:
markdown_content = f.read()
else:
markdown_content = result.stdout
# Clean up temporary file if needed
if cleanup_temp and os.path.exists(input_path):
os.unlink(input_path)
# Prepare response
response = {
"markdown_content": markdown_content,
"file_info": {
"original_uri": uri,
"uri_scheme": scheme,
"output_file": output_path if output_path else None,
},
}
# If no output_path specified, also return the content as a string for convenience
if not output_path:
response["content"] = markdown_content
return response
except Exception as e:
return {"error": f"URI processing failed: {str(e)}"}
def _download_from_url(self, url: str) -> str:
"""Download content from URL to temporary file."""
try:
with urllib.request.urlopen(url, timeout=30) as response:
content = response.read()
# Create temporary file
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(content)
return temp_file.name
except Exception:
return None
def _handle_data_uri(self, data_uri: str) -> str:
"""Handle data URI and save to temporary file."""
try:
# Parse data URI: data:[<mediatype>][;base64],<data>
if "," not in data_uri:
return None
header, data = data_uri.split(",", 1)
# Check if base64 encoded
if ";base64" in header:
import base64
content = base64.b64decode(data)
else:
content = data.encode("utf-8")
# Create temporary file
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(content)
return temp_file.name
except Exception:
return None