# Source code for tooluniverse.uspto_tool
import requests
import json
import re
import os
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .base_tool import BaseTool
from .tool_registry import register_tool
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv(usecwd=True))
USPTO_API_KEY = os.environ.get("USPTO_API_KEY")
@register_tool("USPTOOpenDataPortalTool")
class USPTOOpenDataPortalTool(BaseTool):
    """
    A tool for interacting with the USPTO Open Data Portal API to search for and retrieve patent information.

    The run method dynamically constructs API requests based on the provided tool configuration:
    ``api_endpoint`` supplies the (optionally templated) path, ``default_query_params`` and the
    per-parameter ``default`` values fill in missing arguments, and ``return_fields`` (when set)
    restricts which fields of each returned record are kept.
    """

    # Tool configuration name that gets the text-search input handling in ``run``.
    _TEXT_QUERY_TOOL_NAME = "get_patent_overview_by_text_query"

    # Short field names accepted in ``sort`` / ``rangeFilters`` and the full
    # paths they are rewritten to before the request is sent.
    _FIELD_MAPPINGS = {
        "filingDate": "applicationMetaData.filingDate",
        "grantDate": "applicationMetaData.grantDate",
    }

    def __init__(
        self,
        tool_config,
        api_key=USPTO_API_KEY,
        base_url="https://api.uspto.gov/api/v1",
    ):
        """
        Initializes the USPTOOpenDataPortalTool.

        Args:
            tool_config: The configuration for the specific tool being run.
            api_key: Your USPTO Open Data Portal API key.
            base_url: The base URL for the USPTO API.

        Raises:
            ValueError: If no API key is available or it is still the
                ``"YOUR_API_KEY"`` placeholder.
        """
        super().__init__(tool_config)
        self.base_url = base_url
        if api_key == "YOUR_API_KEY" or not api_key:
            raise ValueError(
                "You must set a USPTO API key via the USPTO_API_KEY environment variable."
            )
        self.headers = {"X-API-KEY": api_key, "Accept": "application/json"}
        self.session = requests.Session()
        retry_strategy = Retry(
            total=5,
            status_forcelist=[429, 500, 502, 503, 504],
            # Exponential backoff scaled by 5 seconds; the exact sleep schedule
            # is defined by urllib3's Retry implementation.
            backoff_factor=5,
            # Hand the final response back instead of raising inside urllib3,
            # so run() can report the HTTP status and body itself.
            raise_on_status=False,
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        # base_url is overridable, so apply the retry policy to both schemes.
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

    def get_by_path(self, d, keys):
        """
        Safely navigate nested dicts by a list of keys.

        Args:
            d: The object to start from (normally a dict).
            keys: An iterable of keys to follow, outermost first.

        Returns:
            The value found at the end of the path, or None as soon as a
            non-dict value (including None) is reached before the path ends.
        """
        for k in keys:
            if not isinstance(d, dict):
                return None
            d = d.get(k)
        return d

    def assign_by_path(self, d, path, value):
        """
        Create nested dicts for a dot-path and set the final key to value.

        Args:
            d: The dict to write into (modified in place).
            path: A dot-separated path such as ``"a.b.c"``.
            value: The value stored under the last path component.
        """
        keys = path.split(".")
        for k in keys[:-1]:
            d = d.setdefault(k, {})
        d[keys[-1]] = value

    def prune_item(self, item, return_fields):
        """
        Reduce one record to the requested fields.

        Two field notations are supported:

        * ``"a.b"`` copies the value found at that dot-path.
        * ``"a.b/prop"`` treats ``a.b`` as a list of dicts and keeps only
          ``prop`` (and any other props requested for the same list) from each
          element.

        Args:
            item: The record to prune.
            return_fields: An iterable of field specifications as described above.

        Returns:
            A new dict holding the requested fields, plus a ``"missing_fields"``
            key listing every requested field that was absent (or None) in item.
        """
        out = {}
        missing_fields = []

        # 1) First, handle all the list-of-objects fields (the "/" ones),
        #    grouping them by their list-path prefix.
        list_groups = {}
        for field in return_fields:
            if "/" in field:
                list_path, prop = field.split("/", 1)
                list_groups.setdefault(list_path, []).append(prop)

        for list_path, props in list_groups.items():
            raw_list = self.get_by_path(item, list_path.split("."))
            if not isinstance(raw_list, list):
                missing_fields.extend(f"{list_path}/{prop}" for prop in props)
                continue

            pruned_list = []
            found_props = set()
            for el in raw_list:
                if not isinstance(el, dict):
                    continue
                pruned_el = {prop: el[prop] for prop in props if prop in el}
                found_props.update(pruned_el)
                if pruned_el:
                    pruned_list.append(pruned_el)

            # A property is only reported missing when no element carried it.
            missing_fields.extend(
                f"{list_path}/{prop}"
                for prop in dict.fromkeys(props)
                if prop not in found_props
            )
            if pruned_list:
                self.assign_by_path(out, list_path, pruned_list)

        # 2) Then handle all the scalar or nested-dict fields (the "." ones without "/").
        for field in return_fields:
            if "/" in field:
                continue  # already done
            raw_value = self.get_by_path(item, field.split("."))
            if raw_value is None:
                missing_fields.append(field)
                continue
            self.assign_by_path(out, field, raw_value)

        out["missing_fields"] = missing_fields
        return out

    def _param_default(self, name):
        """Return the configured default for parameter ``name``, or None if there is none."""
        return self.get_by_path(
            self.tool_config, ["parameter", "properties", name, "default"]
        )

    def _expand_field_names(self, query_params):
        """Rewrite bare short field names in ``sort``/``rangeFilters`` to their full paths."""
        for param in ("sort", "rangeFilters"):
            value = query_params.get(param)
            if not isinstance(value, str):
                continue
            for short_name, full_name in self._FIELD_MAPPINGS.items():
                # Only touch bare names: a name already preceded by "." or a
                # word character (e.g. an already-qualified path) is left alone,
                # so the rewrite can never double-prefix.
                pattern = rf"(?<![\w.]){re.escape(short_name)}(?!\w)"
                value = re.sub(pattern, full_name, value)
            query_params[param] = value

    def run(self, arguments):
        """
        Runs the specified tool by constructing and executing an API call based on the tool's configuration.

        Args:
            arguments: A dictionary of arguments for the tool, matching the parameters in the tool definition.
                Arguments named like a ``{placeholder}`` in the endpoint are substituted into the path;
                all others are sent as query-string parameters.

        Returns:
            The decoded JSON body of the API response. When the tool configuration defines
            ``return_fields``, each entry of ``patentFileWrapperDataBag`` is pruned to those fields.
            On failure, a dictionary with an ``"error"`` key (and usually ``"details"``) is returned
            instead of raising.
        """
        endpoint = self.tool_config.get("api_endpoint")
        if not endpoint:
            return {"error": "API endpoint not found in tool configuration."}

        arguments = arguments or {}
        path_params = set(re.findall(r"\{(\w+)\}", endpoint))
        query_params = {}

        # Substitute path parameters and build query string parameters.
        # The query dict is built fresh rather than filtered in place:
        # deleting keys while iterating a dict raises RuntimeError.
        for key, value in arguments.items():
            if key in path_params:
                if value is not None:
                    endpoint = endpoint.replace(f"{{{key}}}", str(value))
                continue
            if value is None:
                # Fall back to the parameter's configured default, if any.
                value = self._param_default(key)
            if value is not None:
                query_params[key] = value

        # Refuse to call the API with an unfilled "{placeholder}" in the path.
        unresolved = re.findall(r"\{(\w+)\}", endpoint)
        if unresolved:
            return {
                "error": "Missing required path parameter(s): "
                + ", ".join(sorted(set(unresolved)))
            }

        # default parameters if not provided
        default_params = self.tool_config.get("default_query_params") or {}
        for key, value in default_params.items():
            if query_params.get(key) is None:
                query_params[key] = value

        # Special handling for the inputs to this tool
        if self.tool_config.get("name") == self._TEXT_QUERY_TOOL_NAME:
            if "query" not in query_params:
                return {"error": "Missing required parameter 'query'."}
            query = query_params.pop("query")
            # exact_match is a local flag, never an API parameter: always
            # remove it, and treat an absent flag as False.
            exact_match = query_params.pop("exact_match", False)
            query_params["q"] = f'"{query}"' if exact_match else query

        # NOTE(review): applied for every tool; it only rewrites bare short
        # names, so already-qualified values pass through unchanged.
        self._expand_field_names(query_params)

        # The timeout for downloads can be longer
        timeout = 120 if "download" in (self.tool_config.get("name") or "") else 30
        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"

        try:
            response = self.session.get(
                url,
                headers=self.headers,
                params=query_params,
                timeout=timeout,
            )
            response.raise_for_status()
            result = response.json()
        except requests.exceptions.HTTPError as http_err:
            # Attempt to return the structured error from the API response body
            error_response = http_err.response
            try:
                error_details = error_response.json()
            except ValueError:
                # Every JSON decode error requests can raise is a ValueError.
                error_details = error_response.text
            return {
                "error": f"HTTP Error: {error_response.status_code}",
                "details": error_details,
            }
        except requests.exceptions.RequestException as e:
            return {"error": "API request failed", "details": str(e)}
        except ValueError as e:
            # Successful status but a non-JSON body, on requests versions whose
            # decode error is not a RequestException.
            return {"error": "API response was not valid JSON", "details": str(e)}

        return_fields = self.tool_config.get("return_fields")
        if return_fields and isinstance(result, dict):
            # Filter the JSON response to only include specified fields
            patents = result.get("patentFileWrapperDataBag") or []
            result["patentFileWrapperDataBag"] = [
                self.prune_item(patent, return_fields) for patent in patents
            ]
        return result