# Source code for tooluniverse.epa_envirofacts_tool
"""
EPA Envirofacts tools for ToolUniverse — US environmental facility data.
The U.S. EPA Envirofacts REST service exposes regulated-facility data, including
the Toxics Release Inventory (TRI) and the Facility Registry Service (FRS). These
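tools query the two validated facility tables by state (and optional city); a tool
config may instead declare a single-column lookup (e.g. TRI reporting forms by
tri_facility_id).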
API: https://data.epa.gov/efservice/{table}/{col}/{val}.../rows/{a}:{b}/JSON
(public, no authentication, US Gov public domain)
"""
from typing import Any, Dict
from urllib.parse import quote
import requests
from .base_tool import BaseTool
from .tool_registry import register_tool
# Base URL of the EPA Envirofacts REST service; every query path built in this
# module is appended to it as /{table}/{col}/{val}.../rows/{a}:{b}/JSON.
EFSERVICE = "https://data.epa.gov/efservice"
class _EnvirofactsBase(BaseTool):
    """Common machinery for tools that query one EPA Envirofacts table.

    Class attributes select what is queried:

    * ``table`` -- Envirofacts table name placed in the URL path.
    * ``state_col`` -- column matched against the ``state`` argument.
    * ``city_col`` -- column matched against the optional ``city`` argument.

    The tool config's ``fields`` mapping may supply ``timeout`` (seconds,
    default 30) and, for single-column lookup mode, ``lookup_column`` plus the
    optional ``lookup_param`` and ``table`` overrides.
    """

    table = ""
    state_col = ""
    city_col = "city_name"

    def __init__(self, tool_config: Dict[str, Any]):
        super().__init__(tool_config)
        # "fields" may be missing or explicitly null in the config.
        self.fields = tool_config.get("fields", {}) or {}
        self.timeout = self.fields.get("timeout", 30)

    def run(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch to lookup mode or the state/city facility search.

        A config opts into single-column lookup mode (e.g. querying the
        tri_reporting_form table by tri_facility_id) by declaring a
        ``lookup_column`` in its ``fields``. Otherwise the original
        state/city facility-search behavior is used.

        Returns:
            A dict with ``status`` of ``"success"`` (plus ``data`` and
            ``metadata``) or ``"error"`` (plus an ``error`` message).
        """
        if self.fields.get("lookup_column"):
            return self._run_lookup(arguments)
        return self._run_state_search(arguments)

    @staticmethod
    def _text_arg(arguments: Dict[str, Any], name: str) -> str:
        """Return argument *name* as stripped text; missing/falsy becomes ''.

        Non-string values (e.g. a numeric ID) are converted with ``str`` so
        that ``.strip()`` cannot raise ``AttributeError``.
        """
        return str(arguments.get(name) or "").strip()

    @staticmethod
    def _parse_limit(arguments: Dict[str, Any]) -> int:
        """Return the ``limit`` argument clamped to 1..100 (default 10)."""
        try:
            requested = int(arguments.get("limit") or 10)
        except (TypeError, ValueError, OverflowError):
            # Unparseable input (including float infinity) uses the default.
            return 10
        return max(1, min(requested, 100))

    @staticmethod
    def _path_segment(value: str) -> str:
        """Percent-encode *value* for use as exactly one URL path segment.

        ``safe=""`` also encodes ``/``; with ``quote``'s default a value
        containing a slash would add extra segments to the query path.
        """
        return quote(value, safe="")

    def _build_result(
        self, rows: Any, table: str, query: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Wrap fetched *rows* in the success envelope.

        Only dict rows are summarized; ``total_results`` is the number of
        records actually returned in ``data`` so the two always agree.
        """
        records = [self._summarize(row) for row in rows if isinstance(row, dict)]
        return {
            "status": "success",
            "data": records,
            "metadata": {
                "total_results": len(records),
                "table": table,
                "query": query,
                "source": "EPA Envirofacts",
            },
        }

    def _run_lookup(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Query one table by a single configured column.

        The table comes from ``fields["table"]`` (falling back to the class
        ``table``), the column from ``fields["lookup_column"]`` and the
        argument name from ``fields["lookup_param"]`` (defaulting to the
        column name).
        """
        table = self.fields.get("table") or self.table
        column = self.fields["lookup_column"]
        param = self.fields.get("lookup_param", column)

        value = self._text_arg(arguments, param)
        if not value:
            return {
                "status": "error",
                "error": f"'{param}' is required",
            }

        limit = self._parse_limit(arguments)
        path = (
            f"{EFSERVICE}/{table}/{column}/{self._path_segment(value)}"
            f"/rows/0:{limit - 1}/JSON"
        )
        rows = self._fetch(path)
        if isinstance(rows, dict) and rows.get("status") == "error":
            return rows
        return self._build_result(rows, table, {param: value})

    def _run_state_search(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Search the class ``table`` by state and, optionally, city.

        ``state`` is required and upper-cased; ``city`` is optional and is
        upper-cased in the URL while reported as given in the metadata.
        """
        state = self._text_arg(arguments, "state").upper()
        if not state:
            return {
                "status": "error",
                "error": "'state' (2-letter code, e.g. 'CA') is required",
            }
        city = self._text_arg(arguments, "city")
        limit = self._parse_limit(arguments)

        path = f"{EFSERVICE}/{self.table}/{self.state_col}/{self._path_segment(state)}"
        if city:
            path += f"/{self.city_col}/{self._path_segment(city.upper())}"
        path += f"/rows/0:{limit - 1}/JSON"

        rows = self._fetch(path)
        if isinstance(rows, dict) and rows.get("status") == "error":
            return rows
        return self._build_result(
            rows, self.table, {"state": state, "city": city or None}
        )

    def _fetch(self, path: str) -> Any:
        """GET *path* and return the decoded JSON rows.

        Returns:
            The decoded JSON list on success, an empty list when the payload
            is valid JSON but not a list, or an error dict
            (``{"status": "error", "error": ...}``) on timeout, request
            failure or an undecodable body.
        """
        try:
            resp = requests.get(
                path, headers={"Accept": "application/json"}, timeout=self.timeout
            )
            resp.raise_for_status()
        except requests.exceptions.Timeout:
            return {
                "status": "error",
                "error": f"EPA Envirofacts request timed out after {self.timeout}s",
            }
        except requests.exceptions.RequestException as e:
            return {"status": "error", "error": f"EPA Envirofacts request failed: {e}"}

        # Decode separately from the request: the JSON decode error raised by
        # recent versions of requests is itself a RequestException, so in a
        # shared try block it would be reported as a request failure and the
        # non-JSON message below would never be produced.
        try:
            rows = resp.json()
        except ValueError:
            return {
                "status": "error",
                "error": "EPA Envirofacts returned a non-JSON response",
            }

        if not isinstance(rows, list):
            return []
        return rows

    @staticmethod
    def _summarize(
        r: Dict[str, Any],
    ) -> Dict[str, Any]:  # pragma: no cover - overridden
        """Return the row unchanged; subclasses override to pick fields."""
        return r
[docs]
@register_tool("EPATRIFacilitiesTool")
class EPATRIFacilitiesTool(_EnvirofactsBase):
    """Toxics Release Inventory (TRI) data from EPA Envirofacts.

    By default the tool searches TRI facilities by state/city. When a config
    sets ``fields.row_kind = "reporting_form"`` (together with
    ``fields.lookup_column = "tri_facility_id"``), rows are instead summarized
    as the per-facility, per-chemical, per-year TRI reporting forms of one
    facility.
    """

    table = "tri_facility"
    state_col = "state_abbr"

    def _summarize(self, r: Dict[str, Any]) -> Dict[str, Any]:
        """Summarize one row using the summarizer chosen by ``row_kind``."""
        wants_forms = self.fields.get("row_kind") == "reporting_form"
        summarizer = (
            self._summarize_reporting_form if wants_forms else self._summarize_facility
        )
        return summarizer(r)

    @staticmethod
    def _summarize_facility(r: Dict[str, Any]) -> Dict[str, Any]:
        """Project a tri_facility row onto the facility summary fields."""
        # (output key, source column) pairs, in output order.
        field_map = (
            ("tri_facility_id", "tri_facility_id"),
            ("facility_name", "facility_name"),
            ("street_address", "street_address"),
            ("city", "city_name"),
            ("county", "county_name"),
            ("state", "state_abbr"),
            ("zip_code", "zip_code"),
            ("epa_region", "region"),
            ("closed", "fac_closed_ind"),
        )
        return {out_key: r.get(column) for out_key, column in field_map}

    @staticmethod
    def _summarize_reporting_form(r: Dict[str, Any]) -> Dict[str, Any]:
        """Project a reporting-form row onto the reporting-form summary fields."""
        # Fall back to the generic chemical name when the CAS name is
        # missing or empty.
        chemical = r.get("cas_chem_name") or r.get("generic_chem_name")
        summary: Dict[str, Any] = {
            "tri_facility_id": r.get("tri_facility_id"),
            "tri_chem_id": r.get("tri_chem_id"),
            "chemical_name": chemical,
        }
        # Remaining fields are straight column copies, added in output order.
        remaining = (
            ("reporting_year", "reporting_year"),
            ("form_type", "form_type_ind"),
            ("max_amount_code", "max_amount_of_chem"),
            ("one_time_release_qty", "one_time_release_qty"),
            ("production_ratio", "production_ratio"),
            ("federal_facility", "federal_fac_ind"),
            ("trade_secret", "trade_secret_ind"),
            ("doc_ctrl_num", "doc_ctrl_num"),
        )
        for out_key, column in remaining:
            summary[out_key] = r.get(column)
        return summary
[docs]
@register_tool("EPAFRSFacilitiesTool")
class EPAFRSFacilitiesTool(_EnvirofactsBase):
    """Find EPA Facility Registry Service (FRS) facility sites by state/city."""

    table = "frs_facility_site"
    state_col = "state_code"
    city_col = "city_name"

    @staticmethod
    def _summarize(r: Dict[str, Any]) -> Dict[str, Any]:
        """Project an frs_facility_site row onto the facility summary fields."""
        # Each of these uses the first column and falls back to the second
        # when the first is missing or empty.
        name = r.get("std_name") or r.get("std_base_name")
        address = r.get("std_full_address") or r.get("location_address")
        city = r.get("std_city_name") or r.get("city_name")

        summary: Dict[str, Any] = {}
        summary["facility_name"] = name
        summary["address"] = address
        summary["city"] = city
        summary["state"] = r.get("state_name")
        summary["registry_id"] = r.get("parent_registry_id")
        summary["federal_facility"] = r.get("federal_facility_code")
        summary["tribal_land"] = r.get("tribal_land_name")
        return summary