From c3830fb0b5b98a6d3fde2a2245f9468fbf5ac850 Mon Sep 17 00:00:00 2001 From: Jascha Date: Fri, 15 May 2026 11:11:43 -0700 Subject: [PATCH 1/2] Add PgVectorAdapter for pgvector-backed Postgres MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit VectorPin can now pin records in a pgvector-equipped Postgres table. This is the highest-leverage adapter to add: pgvector is the de-facto choice for teams that already operate Postgres and want to bolt embedding search onto an existing OLTP database, and a vector row is structurally indistinguishable from any other row to surrounding RBAC, backup, replication, and CDC machinery — meaning VectorPin's signed provenance is the only out-of-band integrity check available. src/vectorpin/adapters/pgvector.py (new) - PgVectorAdapter with the same shape as QdrantAdapter / LanceDBAdapter: iter_records, get, attach_pin, plus a classmethod .connect(dsn, ...). - iter_records uses a plain client cursor + fetchmany(batch_size) to bound memory without requiring an explicit transaction (autocommit mode forbids server-side DECLARE CURSOR). - TLS guard mirroring QdrantAdapter._enforce_tls: rejects non-loopback postgres DSNs without sslmode=require (or stronger), unless VECTORPIN_ALLOW_INSECURE_HTTP=1 is set. Postgres credentials live inside the DSN, so plaintext to a remote host leaks them. - Identifier validation on table_name / id_column / vector_column / pin_column: ^[A-Za-z_][A-Za-z0-9_]*$. Postgres has no parameterized form for identifiers; this is the only line of defense against '--table foo; DROP ...' shaped inputs. - Pin column accepts JSONB (decoded to dict, parsed via Pin.from_dict) or TEXT (str, parsed via Pin.from_json). Both routes go through the strict v2 schema validation already on main. src/vectorpin/adapters/__init__.py - Registers PgVectorAdapter in the lazy-import map and __all__. src/vectorpin/cli.py - New audit-pgvector subcommand mirroring audit-lancedb/audit-chroma shape: --dsn, --table, --public-key, --key-id, --id-column (default id), --vector-column (default embedding), --pin-column (default vectorpin), --batch-size. pyproject.toml - New optional extra: pgvector = ['psycopg[binary]>=3.1', 'pgvector>=0.3']. - Added to the 'all' extra. tests/test_adapter_pgvector.py (new, 22 tests) - 14 offline (no DB): TLS guard accepts loopback / sslmode=require, rejects remote plaintext, env-var escape hatch, keyword-form DSN pass-through; identifier validator accepts/rejects parametrized hostile inputs. - 8 live integration: iter_records, attach_pin + get roundtrip, full sign-attach-verify roundtrip under the v2 Verifier, KeyError on unknown id (get + attach_pin), loopback DSN doesn't trip TLS, bad table/column names rejected at connect. - Integration tests auto-discover the compose service via VECTORPIN_TEST_PGVECTOR_URL > PGVECTOR_URL > the compose-default DSN, and skip cleanly when no instance is reachable. All 22 pass against pgvector/pgvector:pg16 from VectorSmuggle's test_vector_dbs_docker/. Full repo suite: 148 pass, 1 skip (Pinecone needs cloud creds). ruff clean. --- pyproject.toml | 3 + src/vectorpin/adapters/__init__.py | 3 + src/vectorpin/adapters/pgvector.py | 261 +++++++++++++++++++++++++++ src/vectorpin/cli.py | 48 +++++ tests/test_adapter_pgvector.py | 273 +++++++++++++++++++++++++++++ 5 files changed, 588 insertions(+) create mode 100644 src/vectorpin/adapters/pgvector.py create mode 100644 tests/test_adapter_pgvector.py diff --git a/pyproject.toml b/pyproject.toml index 85412eb..a7f15f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ chroma = ["chromadb>=0.5"] pinecone = ["pinecone>=5.0"] # Note: package was renamed from `pinecone-client` to `pinecone` in 2024. qdrant = ["qdrant-client>=1.7"] +pgvector = ["psycopg[binary]>=3.1", "pgvector>=0.3"] faiss = ["faiss-cpu>=1.8"] detectors = ["scikit-learn>=1.3"] all = [ @@ -44,6 +45,8 @@ all = [ "chromadb>=0.5", "pinecone>=5.0", "qdrant-client>=1.7", + "psycopg[binary]>=3.1", + "pgvector>=0.3", "faiss-cpu>=1.8", "scikit-learn>=1.3", ] diff --git a/src/vectorpin/adapters/__init__.py b/src/vectorpin/adapters/__init__.py index 7cb22fb..6e42be1 100644 --- a/src/vectorpin/adapters/__init__.py +++ b/src/vectorpin/adapters/__init__.py @@ -28,6 +28,7 @@ if TYPE_CHECKING: from vectorpin.adapters.chroma import ChromaAdapter from vectorpin.adapters.lancedb import LanceDBAdapter + from vectorpin.adapters.pgvector import PgVectorAdapter from vectorpin.adapters.pinecone import PineconeAdapter from vectorpin.adapters.qdrant import QdrantAdapter @@ -36,6 +37,7 @@ "BaseAdapter", "ChromaAdapter", "LanceDBAdapter", + "PgVectorAdapter", "PineconeAdapter", "PinnedRecord", "QdrantAdapter", @@ -44,6 +46,7 @@ _LAZY_ADAPTERS = { "ChromaAdapter": ("vectorpin.adapters.chroma", "ChromaAdapter"), "LanceDBAdapter": ("vectorpin.adapters.lancedb", "LanceDBAdapter"), + "PgVectorAdapter": ("vectorpin.adapters.pgvector", "PgVectorAdapter"), "PineconeAdapter": ("vectorpin.adapters.pinecone", "PineconeAdapter"), "QdrantAdapter": ("vectorpin.adapters.qdrant", "QdrantAdapter"), } diff --git a/src/vectorpin/adapters/pgvector.py b/src/vectorpin/adapters/pgvector.py new file mode 100644 index 0000000..3d13407 --- /dev/null +++ b/src/vectorpin/adapters/pgvector.py @@ -0,0 +1,261 @@ +# Copyright 2025-2026 Jascha Wanger / Tarnover, LLC +# SPDX-License-Identifier: Apache-2.0 +"""pgvector adapter. + +pgvector is the de-facto vector store for teams that already operate +PostgreSQL and want to bolt embedding search onto an existing OLTP +database rather than stand up a dedicated vector service. From a +provenance perspective this is the most adversarial deployment shape: +a vector row is structurally indistinguishable from any other row, so +RBAC, backup, replication, and CDC pipelines all treat a poisoned +embedding as ordinary data. VectorPin's role in this environment is to +make the integrity property explicit — a verifier (the audit loop) can +walk the table out-of-band and surface any vector that doesn't match +its signed source/model binding. + +The on-disk shape this adapter expects is a single table with at least: + + - an identifier column (default: ``id``, TEXT-typed), + - a ``pgvector.vector`` column (default: ``embedding``), + - a JSONB pin column (default: ``vectorpin``). + +The pin column holds the canonical Pin JSON string (matching what +:meth:`vectorpin.attestation.Pin.to_json` emits) or NULL. Storing the +pin as JSONB rather than TEXT means downstream operators can index or +query into pin fields (``WHERE vectorpin->>'kid' = 'prod-2026-05'``) +without changing the adapter contract. + +Install with: ``pip install 'vectorpin[pgvector]'`` +""" + +from __future__ import annotations + +import json +import os +import re +from collections.abc import Iterator +from typing import TYPE_CHECKING, Any +from urllib.parse import urlparse + +import numpy as np + +from vectorpin.adapters.base import PIN_METADATA_KEY, BaseAdapter, PinnedRecord +from vectorpin.attestation import Pin + +if TYPE_CHECKING: + import psycopg + +_DEFAULT_TABLE = "embeddings" +_DEFAULT_ID_COLUMN = "id" +_DEFAULT_VECTOR_COLUMN = "embedding" +_DEFAULT_PIN_COLUMN = PIN_METADATA_KEY + +_IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") +_LOOPBACK_HOSTS = frozenset({"localhost", "127.0.0.1", "::1"}) + + +def _validate_identifier(name: str, *, field: str) -> str: + """Reject anything that doesn't look like a bare SQL identifier. + + pgvector / postgres has no parameterized form for table or column + names, so adapters that interpolate them MUST validate against a + strict allowlist. Matches the LanceDB adapter's contract. + """ + if not _IDENT_RE.match(name): + raise ValueError( + f"invalid {field}: {name!r} (must match {_IDENT_RE.pattern})" + ) + return name + + +def _is_loopback(host: str | None) -> bool: + if not host: + return False + h = host.strip("[]").lower() + if h in _LOOPBACK_HOSTS: + return True + return h.startswith("127.") + + +def _enforce_tls(dsn: str) -> None: + """Refuse plaintext postgres connections to non-loopback hosts. + + Postgres credentials (``user:password@host``) typically live inside + the DSN string itself, so the same threat model as the Qdrant + adapter applies: a plaintext connection to a remote host leaks the + credential. Postgres TLS is controlled via the ``sslmode`` query + parameter; this check considers ``sslmode in {require, verify-ca, + verify-full}`` as TLS-enabled. ``sslmode`` absent or set to + ``disable``/``allow``/``prefer`` is treated as plaintext. + + Set ``VECTORPIN_ALLOW_INSECURE_HTTP=1`` (env-scoped escape hatch, + same as the Qdrant adapter) to bypass for trusted in-cluster overlay + deployments. + """ + parsed = urlparse(dsn) + if parsed.scheme not in {"postgresql", "postgres"}: + # Non-URL DSNs (e.g. keyword=value form) — leave the decision + # to libpq; nothing we can safely parse here. + return + if _is_loopback(parsed.hostname): + return + # Look for sslmode in the query string. + query = parsed.query or "" + sslmode = None + for pair in query.split("&"): + if pair.startswith("sslmode="): + sslmode = pair.split("=", 1)[1].lower() + if sslmode in {"require", "verify-ca", "verify-full"}: + return + if os.environ.get("VECTORPIN_ALLOW_INSECURE_HTTP") == "1": + return + raise ValueError( + "pgvector DSN to a non-loopback host without sslmode=require " + "refused (set VECTORPIN_ALLOW_INSECURE_HTTP=1 if you know what " + "you're doing, or append ?sslmode=require to the DSN)" + ) + + +class PgVectorAdapter(BaseAdapter): + """Wraps a pgvector-equipped Postgres table for VectorPin reads and writes. + + The adapter does not create the table — it only reads and updates. + Operators are expected to have provisioned the table with their own + schema; the only constraints VectorPin imposes are (a) the pin + column is JSONB or TEXT, (b) the vector column is a pgvector + ``vector(N)``, (c) the id column is comparable with ``=`` against a + Python string. + """ + + def __init__( + self, + conn: psycopg.Connection, + table_name: str, + *, + id_column: str = _DEFAULT_ID_COLUMN, + vector_column: str = _DEFAULT_VECTOR_COLUMN, + pin_column: str = _DEFAULT_PIN_COLUMN, + ) -> None: + self._conn = conn + self._table = _validate_identifier(table_name, field="table_name") + self._id = _validate_identifier(id_column, field="id_column") + self._vec = _validate_identifier(vector_column, field="vector_column") + self._pin = _validate_identifier(pin_column, field="pin_column") + # Register the pgvector type adapter on the connection if it + # isn't already registered. Safe to call repeatedly. + try: + from pgvector.psycopg import register_vector + register_vector(self._conn) + except ImportError as e: + raise ImportError( + "pgvector not installed. Run: pip install 'vectorpin[pgvector]'" + ) from e + + @classmethod + def connect( + cls, + dsn: str, + table_name: str, + *, + id_column: str = _DEFAULT_ID_COLUMN, + vector_column: str = _DEFAULT_VECTOR_COLUMN, + pin_column: str = _DEFAULT_PIN_COLUMN, + ) -> PgVectorAdapter: + """Open a Postgres connection and wrap a pgvector table. + + The DSN must use ``sslmode=require`` (or stronger) for any + non-loopback host. See :func:`_enforce_tls`. + """ + _enforce_tls(dsn) + try: + import psycopg + except ImportError as e: + raise ImportError( + "psycopg not installed. Run: pip install 'vectorpin[pgvector]'" + ) from e + conn = psycopg.connect(dsn, autocommit=True) + return cls( + conn, + table_name, + id_column=id_column, + vector_column=vector_column, + pin_column=pin_column, + ) + + def iter_records(self, *, batch_size: int = 256) -> Iterator[PinnedRecord]: + # Client-side cursor + fetchmany bounds the working set without + # requiring an explicit transaction. (Postgres server-side named + # cursors need a transaction; in autocommit mode psycopg refuses + # to DECLARE CURSOR — we'd have to drop autocommit just for the + # walk, which is more state to manage than fetchmany.) + sql = ( + f'SELECT "{self._id}", "{self._vec}", "{self._pin}" ' + f'FROM "{self._table}" ' + f'ORDER BY "{self._id}"' + ) + chunk = max(1, batch_size) + with self._conn.cursor() as cur: + cur.execute(sql) + while True: + rows = cur.fetchmany(chunk) + if not rows: + return + for row in rows: + yield self._row_to_record(row) + + def get(self, record_id: str) -> PinnedRecord: + sql = ( + f'SELECT "{self._id}", "{self._vec}", "{self._pin}" ' + f'FROM "{self._table}" WHERE "{self._id}" = %s' + ) + with self._conn.cursor() as cur: + cur.execute(sql, (record_id,)) + row = cur.fetchone() + if row is None: + raise KeyError(record_id) + return self._row_to_record(row) + + def attach_pin(self, record_id: str, pin: Pin) -> None: + # Store the pin as JSON. psycopg can cast a Python dict directly + # to JSONB via ``Jsonb``, but going through a plain ``::jsonb`` + # cast on the placeholder keeps the adapter agnostic about + # whether the pin column is JSONB or TEXT. + sql = ( + f'UPDATE "{self._table}" SET "{self._pin}" = %s::jsonb ' + f'WHERE "{self._id}" = %s' + ) + with self._conn.cursor() as cur: + cur.execute(sql, (pin.to_json(), record_id)) + if cur.rowcount == 0: + raise KeyError(record_id) + + # ---- internals ---- + + def _row_to_record(self, row: tuple[Any, Any, Any]) -> PinnedRecord: + rid, embedding, pin_payload = row + if embedding is None: + raise ValueError( + f"record {rid!r} has no vector in column {self._vec!r}" + ) + vector = np.asarray(embedding, dtype=np.float32) + if vector.ndim != 1: + raise ValueError( + f"vector for {rid!r} returned non-1D shape {vector.shape}" + ) + pin: Pin | None = None + if pin_payload is not None: + # JSONB columns come back as already-decoded Python objects + # (dict). TEXT columns come back as str. Handle both. + if isinstance(pin_payload, str): + pin = Pin.from_json(pin_payload) + elif isinstance(pin_payload, dict): + pin = Pin.from_dict(pin_payload) + else: + # Unknown shape — surface as JSON for the strict parser. + pin = Pin.from_json(json.dumps(pin_payload)) + return PinnedRecord( + id=str(rid), + vector=vector, + pin=pin, + metadata={}, + ) diff --git a/src/vectorpin/cli.py b/src/vectorpin/cli.py index 59f799a..6ca1e45 100644 --- a/src/vectorpin/cli.py +++ b/src/vectorpin/cli.py @@ -9,6 +9,7 @@ audit-lancedb Walk a LanceDB table and report on every record's pin. audit-chroma Walk a Chroma collection and report on every record's pin. audit-qdrant Walk a Qdrant collection and report on every record's pin. + audit-pgvector Walk a pgvector-equipped Postgres table and audit every pin. Run `vectorpin --help` for the canonical usage. """ @@ -216,6 +217,27 @@ def _cmd_audit_lancedb(args: argparse.Namespace) -> int: ) +def _cmd_audit_pgvector(args: argparse.Namespace) -> int: + from vectorpin.adapters.pgvector import PgVectorAdapter + + public_bytes = Path(args.public_key).read_bytes() + verifier = Verifier({args.key_id: public_bytes}) + adapter = PgVectorAdapter.connect( + args.dsn, + args.table, + id_column=args.id_column, + vector_column=args.vector_column, + pin_column=args.pin_column, + ) + return _audit_loop( + adapter.iter_records(batch_size=args.batch_size), + verifier, + source_column=None, + label_field="table", + label_value=args.table, + ) + + def _cmd_audit_chroma(args: argparse.Namespace) -> int: from vectorpin.adapters.chroma import ChromaAdapter @@ -322,6 +344,32 @@ def build_parser() -> argparse.ArgumentParser: p_audit_c.add_argument("--batch-size", type=int, default=256) p_audit_c.set_defaults(func=_cmd_audit_chroma) + p_audit_p = sub.add_parser( + "audit-pgvector", + help="audit every pin in a pgvector-equipped Postgres table", + ) + p_audit_p.add_argument( + "--dsn", + required=True, + help=( + "postgres DSN, e.g. postgresql://user:pass@host:5432/db?sslmode=require. " + "Non-loopback hosts require sslmode=require (or stronger) unless " + "VECTORPIN_ALLOW_INSECURE_HTTP=1 is set." + ), + ) + p_audit_p.add_argument("--table", required=True) + p_audit_p.add_argument("--public-key", required=True) + p_audit_p.add_argument("--key-id", required=True) + p_audit_p.add_argument("--id-column", default="id") + p_audit_p.add_argument("--vector-column", default="embedding") + p_audit_p.add_argument( + "--pin-column", + default="vectorpin", + help="JSONB or TEXT column holding the pin payload (default: vectorpin)", + ) + p_audit_p.add_argument("--batch-size", type=int, default=256) + p_audit_p.set_defaults(func=_cmd_audit_pgvector) + return parser diff --git a/tests/test_adapter_pgvector.py b/tests/test_adapter_pgvector.py new file mode 100644 index 0000000..571ba44 --- /dev/null +++ b/tests/test_adapter_pgvector.py @@ -0,0 +1,273 @@ +# Copyright 2025-2026 Jascha Wanger / Tarnover, LLC +# SPDX-License-Identifier: Apache-2.0 +"""PgVectorAdapter roundtrip tests. + +Two layers run here: + +1. Offline unit tests (run whenever ``psycopg`` is importable) — they + exercise the TLS guard, identifier validation, and constructor + plumbing without touching a database. These guard the security- + sensitive surface the audit found (parser/SQL-injection / leaked- + credential / mistyped identifier paths). + +2. Integration tests (run when ``PGVECTOR_URL`` points at a reachable + pgvector-equipped Postgres) — they walk a real table end-to-end: + create a schema, write two rows, attach pins, audit, verify under + :class:`Verifier`. Skipped silently otherwise. The VectorSmuggle + compose file in ``test_vector_dbs_docker/`` exposes a suitable + instance: ``postgresql://postgres:mypassword@localhost:5432/vectordb``. +""" + +from __future__ import annotations + +import os +import uuid + +import numpy as np +import pytest + +psycopg = pytest.importorskip("psycopg") +pgvector = pytest.importorskip("pgvector") +from pgvector.psycopg import register_vector + +from vectorpin import Signer, Verifier +from vectorpin.adapters import PIN_METADATA_KEY, PgVectorAdapter +from vectorpin.adapters.pgvector import ( + _enforce_tls, + _validate_identifier, +) + +# ---- offline tests (no database needed) ------------------------------------ + + +def test_validate_identifier_accepts_normal_names(): + assert _validate_identifier("embeddings", field="x") == "embeddings" + assert _validate_identifier("Embedding_Column_2", field="x") == "Embedding_Column_2" + assert _validate_identifier("_underscored", field="x") == "_underscored" + + +@pytest.mark.parametrize( + "bad", + [ + "1starts_with_digit", + "has space", + 'has"quote', + "has;semicolon", + "drop table foo --", + "", + "newline\nin\nname", + "tab\tname", + ], +) +def test_validate_identifier_rejects_hostile_names(bad): + with pytest.raises(ValueError, match="invalid"): + _validate_identifier(bad, field="x") + + +def test_enforce_tls_allows_loopback_plaintext(): + # Loopback hosts are exempt from the TLS requirement. + _enforce_tls("postgresql://u:p@localhost:5432/db") + _enforce_tls("postgresql://u:p@127.0.0.1:5432/db") + _enforce_tls("postgresql://u:p@[::1]:5432/db") + _enforce_tls("postgresql://u:p@127.0.0.42:5432/db") + + +def test_enforce_tls_allows_sslmode_require(): + _enforce_tls("postgresql://u:p@db.example.com:5432/x?sslmode=require") + _enforce_tls("postgresql://u:p@db.example.com:5432/x?sslmode=verify-ca") + _enforce_tls("postgresql://u:p@db.example.com:5432/x?sslmode=verify-full") + + +def test_enforce_tls_rejects_remote_plaintext(): + with pytest.raises(ValueError, match="sslmode=require"): + _enforce_tls("postgresql://u:p@db.example.com:5432/x") + with pytest.raises(ValueError, match="sslmode=require"): + _enforce_tls("postgresql://u:p@db.example.com:5432/x?sslmode=prefer") + with pytest.raises(ValueError, match="sslmode=require"): + _enforce_tls("postgres://u:p@db.example.com:5432/x?sslmode=disable") + + +def test_enforce_tls_env_escape_hatch(monkeypatch): + monkeypatch.setenv("VECTORPIN_ALLOW_INSECURE_HTTP", "1") + _enforce_tls("postgresql://u:p@db.example.com:5432/x") + + +def test_enforce_tls_skips_keyword_dsn(): + """Keyword=value DSNs (``host=db port=5432 user=u``) are not URL- + parseable; the function leaves them to libpq rather than guessing. + """ + _enforce_tls("host=db.example.com user=u port=5432 dbname=x") + + +# ---- integration tests (require a reachable pgvector instance) ------------- + +_DEFAULT_DSN = "postgresql://postgres:mypassword@localhost:5432/vectordb" + + +def _pgvector_dsn() -> str | None: + """Pick the DSN to use for integration tests. + + Order of precedence: + 1. ``VECTORPIN_TEST_PGVECTOR_URL`` env var (explicit opt-in). + 2. ``PGVECTOR_URL`` env var (shared with VectorSmuggle backend). + 3. The compose-file default (``postgres:mypassword@localhost``) + if the connection succeeds. + + Returns ``None`` if no instance is reachable, which causes the + integration tests to skip rather than fail. + """ + candidates = [ + os.environ.get("VECTORPIN_TEST_PGVECTOR_URL"), + os.environ.get("PGVECTOR_URL"), + _DEFAULT_DSN, + ] + for dsn in candidates: + if not dsn: + continue + try: + with psycopg.connect(dsn, connect_timeout=2) as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1") + cur.fetchone() + return dsn + except Exception: + continue + return None + + +@pytest.fixture(scope="module") +def pgvector_dsn(): + dsn = _pgvector_dsn() + if dsn is None: + pytest.skip( + "no reachable pgvector instance " + "(set VECTORPIN_TEST_PGVECTOR_URL or start the compose service)" + ) + return dsn + + +@pytest.fixture +def pgvector_table(pgvector_dsn): + """Create a per-test table with two rows and an empty pin column.""" + table = f"vectorpin_test_{uuid.uuid4().hex[:10]}" + with psycopg.connect(pgvector_dsn, autocommit=True) as conn: + with conn.cursor() as cur: + cur.execute("CREATE EXTENSION IF NOT EXISTS vector") + register_vector(conn) + with conn.cursor() as cur: + cur.execute( + f""" + CREATE TABLE "{table}" ( + id TEXT PRIMARY KEY, + embedding vector(16) NOT NULL, + {PIN_METADATA_KEY} JSONB + ) + """ + ) + cur.execute( + f'INSERT INTO "{table}" (id, embedding) VALUES (%s, %s)', + ("a", [0.1] * 16), + ) + cur.execute( + f'INSERT INTO "{table}" (id, embedding) VALUES (%s, %s)', + ("b", [0.2] * 16), + ) + yield (pgvector_dsn, table) + with conn.cursor() as cur: + cur.execute(f'DROP TABLE IF EXISTS "{table}"') + + +def test_iter_records_returns_unpinned(pgvector_table): + dsn, table = pgvector_table + adapter = PgVectorAdapter.connect(dsn, table) + records = list(adapter.iter_records()) + assert {r.id for r in records} == {"a", "b"} + assert all(r.pin is None for r in records) + for r in records: + assert r.vector.shape == (16,) + + +def test_attach_pin_and_get(pgvector_table): + dsn, table = pgvector_table + adapter = PgVectorAdapter.connect(dsn, table) + signer = Signer.generate(key_id="test-key") + + rec = adapter.get("a") + assert rec.pin is None + + pin = signer.pin(source="alpha", model="bench-model", vector=rec.vector) + adapter.attach_pin("a", pin) + + refreshed = adapter.get("a") + assert refreshed.pin is not None + assert refreshed.pin.kid == "test-key" + assert refreshed.pin.header.model == "bench-model" + + +def test_full_roundtrip_verifies(pgvector_table): + dsn, table = pgvector_table + adapter = PgVectorAdapter.connect(dsn, table) + signer = Signer.generate(key_id="test-key") + verifier = Verifier(public_keys={signer.key_id: signer.public_key_bytes()}) + + # Sign every record (using id as the source for the test). + for record in adapter.iter_records(): + pin = signer.pin( + source=record.id, + model="bench-model", + vector=record.vector, + ) + adapter.attach_pin(record.id, pin) + + # Re-read and verify under strict v2 rules. + for record in adapter.iter_records(): + assert record.pin is not None + result = verifier.verify( + record.pin, + source=record.id, + vector=record.vector, + ) + assert result.ok, result + + +def test_get_raises_keyerror_for_unknown_id(pgvector_table): + dsn, table = pgvector_table + adapter = PgVectorAdapter.connect(dsn, table) + with pytest.raises(KeyError): + adapter.get("does-not-exist") + + +def test_attach_pin_raises_keyerror_for_unknown_id(pgvector_table): + dsn, table = pgvector_table + adapter = PgVectorAdapter.connect(dsn, table) + signer = Signer.generate(key_id="test-key") + pin = signer.pin( + source="x", model="m", vector=np.full(16, 0.1, dtype=np.float32) + ) + with pytest.raises(KeyError): + adapter.attach_pin("not-there", pin) + + +def test_loopback_dsn_does_not_trigger_tls_guard(pgvector_table): + """Sanity: the integration connect path doesn't tripped the TLS + guard against the loopback DSN the fixture uses.""" + dsn, table = pgvector_table + # No env var, no sslmode — should still work because it's loopback. + adapter = PgVectorAdapter.connect(dsn, table) + _ = list(adapter.iter_records()) + + +def test_invalid_table_name_rejected(pgvector_table): + dsn, _table = pgvector_table + with pytest.raises(ValueError, match="invalid table_name"): + PgVectorAdapter.connect(dsn, 'bad"name') + + +def test_invalid_column_name_rejected(pgvector_table): + dsn, table = pgvector_table + with pytest.raises(ValueError, match="invalid id_column"): + PgVectorAdapter.connect(dsn, table, id_column="drop; --") + with pytest.raises(ValueError, match="invalid vector_column"): + PgVectorAdapter.connect(dsn, table, vector_column="x; SELECT") + with pytest.raises(ValueError, match="invalid pin_column"): + PgVectorAdapter.connect(dsn, table, pin_column="not\nok") From 3a6907159d7034e30d764e7859fc39a86dffa74c Mon Sep 17 00:00:00 2001 From: Jascha Date: Fri, 15 May 2026 11:11:57 -0700 Subject: [PATCH 2/2] Add scripts/pinecone_live_e2e.py for manual Pinecone verification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing tests/test_adapter_pinecone.py::test_pinecone_live_roundtrip requires a pre-populated index and a known record id to fetch — fine for repeat-CI use but unfriendly for a first-time check. This script is self-contained: it creates a fresh serverless index, seeds one record, runs the full sign-attach-verify roundtrip via PineconeAdapter, checks tamper rejection, and deletes the index on exit via try/finally so a failure cannot leak resources in the operator's account. Verified PASS against live Pinecone Serverless (AWS us-east-1, free- tier-eligible). Cost per run: well under one cent. Usage: export PINECONE_API_KEY=pcsk_xxx python scripts/pinecone_live_e2e.py Optional knobs documented in the module docstring: PINECONE_INDEX_NAME, PINECONE_NAMESPACE, PINECONE_CLOUD, PINECONE_REGION, PINECONE_READY_TIMEOUT. Tamper-rejection assertion uses VerifyError.SOURCE_MISMATCH (enum comparison) rather than the .value string form. --- scripts/pinecone_live_e2e.py | 179 +++++++++++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100755 scripts/pinecone_live_e2e.py diff --git a/scripts/pinecone_live_e2e.py b/scripts/pinecone_live_e2e.py new file mode 100755 index 0000000..3fd296f --- /dev/null +++ b/scripts/pinecone_live_e2e.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python +# Copyright 2025-2026 Jascha Wanger / Tarnover, LLC +# SPDX-License-Identifier: Apache-2.0 +"""End-to-end verification of PineconeAdapter against Pinecone Cloud. + +This is a manual integration check, not a CI test. It creates a fresh +serverless index, seeds one record, runs the full +attach-pin / re-fetch / verify roundtrip via :class:`PineconeAdapter`, +exercises a tamper-rejection path, and deletes the index on exit +(success *or* failure, via ``try / finally``). + +Use it when: + + - You want to confirm the adapter still works against the live + Pinecone API after a client-library upgrade. + - You want a no-fixtures-required smoke test against a real account. + - You want a worked example of the create-seed-verify-cleanup + pattern for opt-in cloud integration scripts. + +Usage +----- + +:: + + export PINECONE_API_KEY=pcsk_xxx + python scripts/pinecone_live_e2e.py + +Optional env vars (all have safe defaults): + + PINECONE_INDEX_NAME name to create (default: vectorpin-e2e-) + PINECONE_NAMESPACE namespace for the seed record (default: vectorpin-test) + PINECONE_CLOUD serverless cloud (default: aws) + PINECONE_REGION serverless region (default: us-east-1) + PINECONE_READY_TIMEOUT seconds to wait for index ready (default: 120) + +Cost note +--------- + +On Pinecone Serverless the cost of one create + one upsert + a handful +of fetches against a 16-dim record is well under one cent. The index is +deleted on exit; nothing persists in the account after the script +returns. +""" + +from __future__ import annotations + +import os +import sys +import time +import uuid + +import numpy as np +from pinecone import Pinecone, ServerlessSpec + +from vectorpin import Signer, Verifier +from vectorpin.adapters import PineconeAdapter + + +def main() -> int: + api_key = os.environ.get("PINECONE_API_KEY") + if not api_key: + print("PINECONE_API_KEY not set", file=sys.stderr) + return 2 + + index_name = os.environ.get( + "PINECONE_INDEX_NAME", f"vectorpin-e2e-{uuid.uuid4().hex[:10]}" + ) + namespace = os.environ.get("PINECONE_NAMESPACE", "vectorpin-test") + cloud = os.environ.get("PINECONE_CLOUD", "aws") + region = os.environ.get("PINECONE_REGION", "us-east-1") + ready_timeout = int(os.environ.get("PINECONE_READY_TIMEOUT", "120")) + + dim = 16 + record_id = "test-record-1" + + pc = Pinecone(api_key=api_key) + + print( + f"[1/6] creating serverless index {index_name!r} " + f"({dim}-dim, cosine, {cloud} {region})" + ) + pc.create_index( + name=index_name, + dimension=dim, + metric="cosine", + spec=ServerlessSpec(cloud=cloud, region=region), + ) + + try: + print(f"[2/6] waiting for index ready (up to {ready_timeout}s)...") + start = time.time() + while True: + desc = pc.describe_index(index_name) + if desc.status.get("ready"): + print(f" ready after {time.time() - start:.1f}s") + break + if time.time() - start > ready_timeout: + raise TimeoutError( + f"index did not become ready within {ready_timeout}s" + ) + time.sleep(2) + + # Seed one record. + print(f"[3/6] seeding record {record_id!r}") + rng = np.random.default_rng(42) + vec = rng.normal(0, 1, dim).astype(np.float32) + vec /= np.linalg.norm(vec) + index = pc.Index(name=index_name) + index.upsert( + vectors=[(record_id, vec.tolist(), {"source": "live-roundtrip"})], + namespace=namespace, + ) + # Pinecone serverless is eventually consistent on upsert; brief + # pause before fetching. + time.sleep(2) + + # Adapter-driven fetch. + print(f"[4/6] adapter: fetch {record_id!r}") + adapter = PineconeAdapter.connect(api_key, index_name, namespace=namespace) + rec = adapter.get(record_id) + assert rec.id == record_id, f"id mismatch: {rec.id}" + assert rec.vector.shape == (dim,), f"shape mismatch: {rec.vector.shape}" + assert rec.pin is None, "fresh record should have no pin" + print(f" OK, fetched {dim}-dim vector") + + print("[5/6] sign + attach + re-fetch + verify (+ tamper rejection)") + signer = Signer.generate(key_id="vectorpin-pinecone-e2e") + verifier = Verifier( + public_keys={signer.key_id: signer.public_key_bytes()} + ) + pin = signer.pin( + source="live-roundtrip", model="bench-model", vector=rec.vector + ) + adapter.attach_pin(record_id, pin) + time.sleep(2) # eventual consistency on metadata update + refreshed = adapter.get(record_id) + assert refreshed.pin is not None, "pin missing after attach_pin" + assert refreshed.pin.kid == "vectorpin-pinecone-e2e", refreshed.pin.kid + assert refreshed.pin.header.model == "bench-model" + print( + f" pin attached, kid={refreshed.pin.kid!r}, " + f"v={refreshed.pin.header.v}" + ) + + result = verifier.verify( + refreshed.pin, source="live-roundtrip", vector=refreshed.vector + ) + assert result, f"verify failed: {result.error}" + print(" verify: OK") + + # Tamper check: modify the source string and confirm verify rejects. + from vectorpin import VerifyError + + bad = verifier.verify( + refreshed.pin, source="tampered", vector=refreshed.vector + ) + assert not bad, "verify should have rejected tampered source" + assert ( + bad.error is VerifyError.SOURCE_MISMATCH + ), f"expected SOURCE_MISMATCH, got {bad.error}" + print(f" tamper rejection: {bad.error.name} (correct)") + + print("[6/6] end-to-end PASS") + return 0 + + finally: + print(f"\n[cleanup] deleting index {index_name!r}") + try: + pc.delete_index(index_name) + print("[cleanup] done") + except Exception as e: + print( + f"[cleanup] WARN: failed to delete index: {e}", + file=sys.stderr, + ) + + +if __name__ == "__main__": + sys.exit(main())