# Source code for tooluniverse.database_setup.pipeline

"""
High-level helpers for building and querying collections.

Exposes
-------
build_collection(db_path, collection, docs, embed_provider, embed_model, overwrite=False)
    Create or extend a collection, insert documents with de-dup, embed texts, and persist a FAISS index.
search(db_path, collection, query, method="hybrid", top_k=10, alpha=0.5, embed_provider=None, embed_model=None)
    Keyword/embedding/hybrid search over an existing collection.

Notes
-----
- Input docs are (doc_key, text, metadata, [text_hash]).
- If a collection records embedding_model="precomputed", you must provide an embed
  provider/model at query time for embedding/hybrid searches.
"""

from __future__ import annotations
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
import sqlite3

from .sqlite_store import SQLiteStore
from .vector_store import VectorStore
from .embedder import Embedder
from .provider_resolver import resolve_provider, resolve_model
from .embed_utils import get_model_dim


def _l2norm(x: np.ndarray) -> np.ndarray:
    """Row-wise L2 normalization with an epsilon guard."""
    return x / (np.linalg.norm(x, axis=1, keepdims=True) + 1e-12)
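
# Example (sanity check; epsilon is negligible here):
#   _l2norm(np.array([[3.0, 4.0]])) -> array([[0.6, 0.8]])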


def _get_collection_meta(
    conn: sqlite3.Connection, name: str
) -> Tuple[Optional[str], Optional[int]]:
    """Return (embedding_model, embedding_dimensions) for a collection or (None, None)."""
    cur = conn.execute(
        "SELECT embedding_model, embedding_dimensions FROM collections WHERE name=? LIMIT 1",
        (name,),
    )
    row = cur.fetchone()
    return (row[0], row[1]) if row else (None, None)
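
# Example (illustrative values):
#   _get_collection_meta(conn, "demo_docs") -> ("text-embedding-3-small", 1536)
#   _get_collection_meta(conn, "missing")   -> (None, None)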


def build_collection(
    db_path: str,
    collection: str,
    docs: List[tuple[str, str, Dict[str, Any], Optional[str]]],
    embed_provider: str,
    embed_model: str,
    overwrite: bool = False,
) -> None:
    """Create/extend a collection, embed docs, and populate FAISS.

    Inserts/merges documents (dedupe by (collection, doc_key) and by
    (collection, text_hash) when present), computes embeddings with the
    requested provider/model, L2-normalizes them, and appends to
    <collection>.faiss via VectorStore.

    Idempotency
    -----------
    Re-running is safe: existing (doc_key) are ignored; content duplicates
    (text_hash) are skipped.

    Side effects
    ------------
    - Records the true embedding model and dimension in the `collections` table.
    """
    print(f" Detecting embedding dimension for {embed_provider}:{embed_model} ...")
    try:
        embed_dim = get_model_dim(embed_provider, embed_model)
        print(f" Detected embedding dimension: {embed_dim}")
    except Exception as e:
        raise RuntimeError(f"Failed to detect embedding dimension: {e}")

    # os.makedirs(os.path.dirname(os.path.expanduser(db_path)), exist_ok=True)
    store = SQLiteStore(db_path)

    # Upsert collection metadata (safe to call repeatedly)
    store.upsert_collection(
        collection,
        description=f"Datastore for {collection}",
        embedding_model=embed_model,
        embedding_dimensions=embed_dim,
    )

    # Insert/merge docs (dedupe by (collection, doc_key); optional text_hash
    # dedupe if index exists)
    store.insert_docs(collection, docs)

    # Fetch back to embed
    rows = store.fetch_docs(collection, limit=100000)
    if not rows:
        return
    texts = [r["text"] for r in rows]
    doc_ids = [r["id"] for r in rows]

    emb = Embedder(provider=embed_provider, model=embed_model)
    vecs = emb.embed(texts).astype("float32")
    vecs = _l2norm(vecs)
    print(f"Embedded {len(vecs)} docs with shape {vecs.shape}")

    vs = VectorStore(db_path)
    # Optionally reset existing FAISS index if overwrite=True
    vs.load_index(collection, dim=embed_dim, reset=overwrite)
    vs.add_embeddings(collection, doc_ids, vecs, dim=embed_dim)
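
# Example usage (illustrative; the path, provider, and model names are
# placeholders, not package defaults):
#
#     build_collection(
#         "tools.db",
#         "demo_docs",
#         docs=[
#             ("doc-1", "Aspirin inhibits platelet aggregation.",
#              {"source": "demo"}, None),
#         ],
#         embed_provider="openai",
#         embed_model="text-embedding-3-small",
#     )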
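
# NOTE: the body of search() is truncated in this listing. What follows is a
# minimal illustrative sketch of the contract described in the module
# docstring, not the actual implementation. Assumptions are marked inline:
# the helpers SQLiteStore.keyword_search and VectorStore.search, the hit
# shape {"id": ..., "score": ...}, the signatures of resolve_provider /
# resolve_model, and the direction of the alpha weighting
# (alpha * embedding + (1 - alpha) * keyword) are all assumed.
def search(
    db_path: str,
    collection: str,
    query: str,
    method: str = "hybrid",
    top_k: int = 10,
    alpha: float = 0.5,
    embed_provider: Optional[str] = None,
    embed_model: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Keyword/embedding/hybrid search over an existing collection (sketch)."""
    store = SQLiteStore(db_path)

    if method == "keyword":
        # Pure keyword search needs no embedder (hypothetical helper).
        return store.keyword_search(collection, query, top_k=top_k)

    # Fall back to the model recorded at build time unless the caller
    # overrides it; "precomputed" collections require an explicit override.
    model_name, _ = _get_collection_meta(sqlite3.connect(db_path), collection)
    if model_name == "precomputed" and not (embed_provider and embed_model):
        raise ValueError(
            "Collection stores precomputed embeddings; pass embed_provider "
            "and embed_model for embedding/hybrid search."
        )
    provider = resolve_provider(embed_provider)        # assumed signature
    model = resolve_model(embed_model or model_name)   # assumed signature

    qvec = _l2norm(
        Embedder(provider=provider, model=model).embed([query]).astype("float32")
    )
    vs = VectorStore(db_path)
    emb_hits = vs.search(collection, qvec, top_k=top_k)  # hypothetical helper
    if method == "embedding":
        return emb_hits

    # Hybrid: blend the two score lists by document id.
    kw_hits = store.keyword_search(collection, query, top_k=top_k)  # hypothetical
    blended: Dict[Any, float] = {}
    for hit in emb_hits:
        blended[hit["id"]] = alpha * hit["score"]
    for hit in kw_hits:
        blended[hit["id"]] = blended.get(hit["id"], 0.0) + (1.0 - alpha) * hit["score"]
    ranked = sorted(blended.items(), key=lambda kv: kv[1], reverse=True)[:top_k]
    return [{"id": doc_id, "score": score} for doc_id, score in ranked]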