Source code for tooluniverse.database_setup.hf.sync_hf

"""
Hugging Face sync utilities for SQLite + FAISS datastore artifacts.

Artifacts
---------
- <collection>.db     : SQLite content store (docs, FTS5 mirror, metadata)
- <collection>.faiss  : FAISS index (IndexFlatIP), sibling to the DB under the user cache dir (<user_cache_dir>/embeddings)

Public API
----------
db_path_for_collection(collection) -> Path
    Resolve the on-disk SQLite path for a collection.

upload(collection, repo=None, private=True, commit_message="Update", tool_json=None)
    Create/ensure an HF dataset repo and upload <collection>.db/.faiss, plus optional tool JSON file(s).

download(repo, collection, overwrite=False, include_tools=False)
    Download *.db/*.faiss from an HF dataset repo snapshot (and optionally any *.json tool files) and restore
    them under the user cache dir (<user_cache_dir>/embeddings) as <collection>.db/.faiss.

Notes
-----
- Requires HF_TOKEN (env or HF cache) for private repos or authenticated uploads.
- Upload streams large files; download uses tooluniverse.utils.download_from_hf.
- Existing local files are preserved unless overwrite=True.
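
Example
-------
Illustrative usage sketch (an assumption-laden example, not a prescription: it
assumes HF_TOKEN is available and that a local collection named "demo" already
exists under <user_cache_dir>/embeddings)::

    from tooluniverse.database_setup.hf.sync_hf import upload, download

    # Push demo.db / demo.faiss to <your_username>/demo on the Hub
    upload("demo", private=True)

    # Restore them elsewhere (existing files are kept unless overwrite=True)
    download(repo="<your_username>/demo", collection="demo", overwrite=False)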
"""

import os
import shutil
from pathlib import Path
from dotenv import load_dotenv
from huggingface_hub import HfApi, whoami, get_token
from tooluniverse.utils import download_from_hf
from tooluniverse.utils import get_user_cache_dir  # ensure imported for DATA_DIR setup

# Always load .env if present
load_dotenv()

DATA_DIR = Path(
    os.environ.get("TU_DATA_DIR", os.path.join(get_user_cache_dir(), "embeddings"))
)
DATA_DIR.mkdir(parents=True, exist_ok=True)

# ---------------------------
# Helpers
# ---------------------------


def db_path_for_collection(collection: str) -> Path:
    """Return the absolute path for the user cache dir (<user_cache_dir>/embeddings/<collection>.db)."""
    return DATA_DIR / f"{collection}.db"


def get_hf_api():
    """Return an authenticated (HfApi, token) tuple."""
    token = os.getenv("HF_TOKEN") or get_token()
    if not token:
        raise RuntimeError("HF_TOKEN not set in environment or .env file")
    return HfApi(token=token), token


# ---------------------------
# Upload
# ---------------------------


def upload(
    collection: str,
    repo: str | None = None,
    private: bool = True,
    commit_message: str = "Update",
    tool_json: list[str] | None = None,
):
    """Upload a collection's DB and FAISS index (and optional tool JSON file(s)) to the user's own HF account."""
    api, token = get_hf_api()
    username = whoami(token=token)["name"]

    # Default to the user's own namespace if no repo is provided
    if repo is None:
        repo = f"{username}/{collection}"
        print(f"No repo specified, using default: {repo}")

    api.create_repo(
        repo_id=repo, repo_type="dataset", private=private, exist_ok=True, token=token
    )

    # Upload SQLite DB
    db_path = db_path_for_collection(collection)
    if not db_path.exists():
        raise FileNotFoundError(f"Database not found: {db_path}")
    api.upload_file(
        path_or_fileobj=str(db_path),
        path_in_repo=f"{collection}.db",
        repo_id=repo,
        repo_type="dataset",
        commit_message=commit_message,
        token=token,
    )

    # Upload FAISS index, if one exists alongside the DB
    faiss_path = DATA_DIR / f"{collection}.faiss"
    if faiss_path.exists():
        api.upload_file(
            path_or_fileobj=str(faiss_path),
            path_in_repo=f"{collection}.faiss",
            repo_id=repo,
            repo_type="dataset",
            commit_message=commit_message,
            token=token,
        )
    else:
        print(f"No FAISS index found for {collection}")

    # Upload tool JSON(s), if provided
    if tool_json:
        for p in tool_json:
            src = Path(p).expanduser().resolve()
            if not src.exists() or not src.is_file():
                raise FileNotFoundError(f"--tool-json not found or not a file: {src}")
            api.upload_file(
                path_or_fileobj=str(src),
                path_in_repo=src.name,  # place at repo root by basename
                repo_id=repo,
                repo_type="dataset",
                commit_message=commit_message,
                token=token,
            )

    print(f"Uploaded {collection} to HF repo {repo}")


# ---------------------------
# Download (via utils.download_from_hf)
# ---------------------------


def _download_one(
    repo: str, filename: str, local_target: Path, overwrite: bool = False
):
    """
    Helper to fetch one file (DB or FAISS) using tooluniverse.utils.download_from_hf.
    """
    token = os.getenv("HF_TOKEN") or get_token() or ""
    cfg = {
        "hf_dataset_path": {
            "repo_id": repo,
            "path_in_repo": filename,
            "save_to_local_dir": str(DATA_DIR),
            "token": token,
        }
    }
    res = download_from_hf(cfg)
    if not res.get("success"):
        raise RuntimeError(f"Failed to download {filename}: {res.get('error')}")

    downloaded_path = Path(res["local_path"])
    if downloaded_path.resolve() == local_target.resolve():
        return local_target  # already in the correct location
    if local_target.exists() and not overwrite:
        print(f" {local_target.name} already exists. Skipping (use --overwrite).")
        return local_target
    shutil.copyfile(downloaded_path, local_target)
    return local_target


def download(
    repo: str, collection: str, overwrite: bool = False, include_tools: bool = False
):
    """Download <collection>.db and <collection>.faiss (and optionally any .json tool files) using the unified helper."""
    dest_db = db_path_for_collection(collection)
    dest_faiss = DATA_DIR / f"{collection}.faiss"

    print(f" Downloading from {repo} into {DATA_DIR}...")

    # -------------------------------------------------
    # (1) Optionally fetch tool JSONs (*.json)
    # -------------------------------------------------
    if include_tools:
        token = os.getenv("HF_TOKEN") or get_token()
        api = HfApi(token=token) if token else HfApi()
        try:
            # List all files in the dataset repo
            files = api.list_repo_files(repo_id=repo, repo_type="dataset")
            for filename in files:
                if filename.endswith(".json"):
                    target_path = DATA_DIR / filename
                    target_path.parent.mkdir(parents=True, exist_ok=True)
                    try:
                        _download_one(repo, filename, target_path, overwrite)
                        print(f" Restored tool file: {filename}")
                    except Exception as e:
                        print(f" Skipped {filename}: {e}")
        except Exception as e:
            print(f" Failed to list or download tool JSONs: {e}")

    # -------------------------------------------------
    # (2) Download the DB
    # -------------------------------------------------
    try:
        db_path = _download_one(repo, f"{collection}.db", dest_db, overwrite)
        print(f" Restored {db_path.name} from {repo}")
    except Exception as e:
        print(f" Failed to download DB: {e}")
        return

    # -------------------------------------------------
    # (3) Download the FAISS index
    # -------------------------------------------------
    try:
        faiss_path = _download_one(repo, f"{collection}.faiss", dest_faiss, overwrite)
        print(f" Restored {faiss_path.name} from {repo}")
    except Exception as e:
        print(f" No FAISS index found or failed to download: {e}")

    print(f"Download complete for {collection} from {repo}")


# ---------------------------
# Entrypoint
# ---------------------------

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Sync datastore collections with Hugging Face Hub"
    )
    subparsers = parser.add_subparsers(dest="command")

    # Upload
    up = subparsers.add_parser("upload", help="Upload a collection to HF Hub")
    up.add_argument(
        "--collection", required=True, help="Collection name (e.g., euhealth, demo)"
    )
    up.add_argument(
        "--repo",
        required=False,
        help="HF dataset repo ID (default: <your_username>/<collection> based on HF_TOKEN)",
    )
    up.add_argument(
        "--private",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Make repo private (default: True). Use --no-private to make it public.",
    )
    up.add_argument(
        "--commit_message", default="Update", help="Commit message for upload"
    )
    up.add_argument(
        "--tool-json",
        nargs="*",
        default=None,
        help="Path(s) to Tool JSON file(s) to upload with the datastore.",
    )

    # Download
    down = subparsers.add_parser("download", help="Download a collection from HF Hub")
    down.add_argument("--repo", required=True, help="HF dataset repo ID")
    down.add_argument(
        "--collection",
        required=True,
        help="Local collection name (e.g., euhealth, demo)",
    )
    down.add_argument(
        "--overwrite", action="store_true", help="Overwrite existing local DB/FAISS"
    )
    down.add_argument(
        "--include-tools",
        action="store_true",
        help="Also download any *.json tool files",
    )

    args = parser.parse_args()

    if args.command == "upload":
        upload(
            collection=args.collection,
            repo=args.repo,
            private=args.private,
            commit_message=args.commit_message,
            tool_json=args.tool_json,
        )
    elif args.command == "download":
        download(
            repo=args.repo,
            collection=args.collection,
            overwrite=args.overwrite,
            include_tools=args.include_tools,
        )
    else:
        parser.print_help()
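
# Illustrative CLI invocations (a sketch, not part of the module's API; assumes
# the module path shown above and that HF_TOKEN is set; "my_tools.json" is a
# hypothetical tool file name):
#
#   python -m tooluniverse.database_setup.hf.sync_hf upload \
#       --collection demo --no-private --tool-json my_tools.json
#
#   python -m tooluniverse.database_setup.hf.sync_hf download \
#       --repo <your_username>/demo --collection demo --overwrite --include-tools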