SapixDB / Docs
Home
Python SDK · v0.1

Python SDK

Sync and async Python client for SapixDB. Works with FastAPI, Django, Flask, and plain scripts. Python 3.9+.

Installation

pip
pip install sapixdb
uv
uv add sapixdb
poetry
poetry add sapixdb

Quick Start — Sync

Python
from datetime import datetime, timedelta, timezone

from sapixdb import SapixClient

db = SapixClient(url="http://localhost:7475", agent="my-app")

# Check connection
print(db.ping())  # True

# Write a record
record = db.collection("products").write({
    "name": "Classic T-Shirt",
    "price": 29.99,
    "stock": 100,
    "category": "apparel",  # lets the filter example below match this record
})
print(record.id)    # "nuc_abc123"
print(record.hash)  # "sha3:e7f2a1..."  — cryptographic proof

# Read latest records
products = db.collection("products").latest()

# Filter
shirts = db.collection("products").find({"category": "apparel"})

# Time travel: scope the read to a point in time (here, 24 hours ago in UTC)
yesterday = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
snapshot = db.collection("orders").as_of(yesterday).latest()

Async Client

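AsyncSapixClient mirrors the SapixClient API: each request method returns an awaitable, and the streaming methods (subscribe_agent(), subscribe_global()) are async generators consumed with async for. Use it with FastAPI, asyncio scripts, or any async framework.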

Python — asyncio
import asyncio
from sapixdb import AsyncSapixClient

async def main():
    """Show AsyncSapixClient used directly and as an async context manager."""
    # NOTE(review): this first client is never closed in the example —
    # presumably the `async with` form below is the one that cleans up; confirm.
    client = AsyncSapixClient(url="http://localhost:7475", agent="my-app")

    new_product = {"name": "T-Shirt", "price": 29.99}
    record = await client.collection("products").write(new_product)
    products = await client.collection("products").latest()

    # Async context manager
    async with AsyncSapixClient(url="http://localhost:7475", agent="my-app") as scoped:
        apparel_filter = {"category": "apparel"}
        shirts = await scoped.collection("products").find(apparel_filter)

asyncio.run(main())

FastAPI Integration

Python — main.py
from typing import Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sapixdb import AsyncSapixClient, SapixNotFoundError

app = FastAPI()
db  = AsyncSapixClient(url="http://localhost:7475", agent="store")


class ProductIn(BaseModel):
    """Request body accepted by ``POST /products``."""

    name: str
    price: float
    stock: int = 0  # defaults to 0 when omitted
    # Optional[...] instead of ``str | None``: the ``|`` union in an annotation
    # evaluated at runtime needs Python 3.10+, while the SDK supports 3.9+.
    category: Optional[str] = None


@app.post("/products", status_code=201)
async def create_product(body: ProductIn):
    """Write the posted product as a new record and return its id, hash and timestamp."""
    payload = body.model_dump()
    written = await db.collection("products").write(payload)
    return {
        "id": written.id,
        "hash": written.hash,
        "timestamp": written.timestamp,
    }


@app.get("/products")
async def list_products(category: Optional[str] = None):
    """Return the latest version of every product, optionally filtered by category.

    An omitted or empty ``category`` query parameter disables the filter.
    """
    # Optional[...] rather than ``str | None``: the signature annotation is
    # evaluated when the function is defined, and ``|`` on types fails on
    # Python 3.9, the SDK's minimum supported version.
    filter_ = {"category": category} if category else None
    items = await db.collection("products").latest(filter=filter_)
    # Flatten each record into one dict: its ID plus the stored fields.
    return [{"id": r.id, **r.data} for r in items]


@app.get("/products/{record_id}")
async def get_product(record_id: str):
    """Return one product by record ID, or respond with 404 if it is missing.

    Raises:
        HTTPException: status 404 when the lookup raises SapixNotFoundError.
    """
    try:
        # Keep the try body to the single call that can raise SapixNotFoundError.
        r = await db.collection("products").get(record_id)
    except SapixNotFoundError as exc:
        # Chain explicitly so the original lookup failure stays attached as the cause.
        raise HTTPException(status_code=404, detail="Product not found") from exc
    return {"id": r.id, **r.data}

Collection API

.write(data) → WriteResult
Append a new record. Returns WriteResult with id, hash, prev_hash, timestamp. Nothing is ever overwritten — every write is permanent.
.write_batch(records) → list[WriteResult]
Write multiple records. Async version runs them concurrently with asyncio.gather.
.get(record_id) → NucleotideRecord
Fetch by nucleotide ID. Raises SapixNotFoundError if missing.
.latest(*, filter?, limit?) → list[NucleotideRecord]
Current (most recent) version of every record in the collection.
.history(*, filter?, limit?) → list[NucleotideRecord]
Full append-only history — every version ever written.
.find(filter, *, limit?) → list[NucleotideRecord]
Filter records (latest version only).
.find_one(filter) → NucleotideRecord | None
First match or None. Never raises on empty result.
.as_of(timestamp) → CollectionQuery
Scope reads to a point in time. Returns a query object with .latest(), .find(), .find_one(), .all().

Time Travel

Python
from datetime import datetime, timedelta, timezone

# Timestamp for 30 minutes ago, as an ISO-8601 string in UTC
now_utc = datetime.now(timezone.utc)
ts = (now_utc - timedelta(minutes=30)).isoformat()

# Scope reads on "orders" to that instant, then filter within the snapshot
orders_then = db.collection("orders").as_of(ts)
snapshot = orders_then.find({"customer_id": "cust_001"})
# Returns orders exactly as they existed 30 minutes ago

Graph Relationships

db.graph.relate(src, dst, edge_type, weight=1.0) → None
Create a typed directed edge. Cleanest way to express relationships.
db.graph.traverse(from_id, *, depth=1, direction='outbound') → TraverseResult
Walk the graph. Returns TraverseResult with .nodes and .edges. Direction: "outbound" | "inbound" | "both".
db.graph.neighbors(node_id, direction='outbound') → list[NucleotideRecord]
Direct neighbours — depth=1 shortcut.
Python
# Link order → customer and order → product (directed, typed edges)
db.graph.relate(order.id, customer.id, "placed_by")
db.graph.relate(order.id, product.id, "contains")

# Walk edges that point *at* the customer, up to 2 hops away.
# NOTE(review): both edges above start at the order, so an inbound-only walk
# presumably reaches the order but not the product; direction="both" would be
# needed to collect everything connected to the customer — confirm.
result = db.graph.traverse(customer.id, depth=2, direction="inbound")
print(result.nodes)  # list of NucleotideRecord
print(result.edges)  # list of GraphEdge

Agent Ingest

Python — log every AI decision
# Log AI agent decisions permanently and immutably
decision = {
    "model":      "gpt-4o",
    "action":     "approve_loan",
    "confidence": 0.94,
    "applicant":  "cust_001",
    "reasoning":  "Credit score 780, DTI 28%",
}
db.ingest("ai_decisions", decision)
# Every decision is cryptographically signed — you can always prove
# what the AI decided, when, and why.

Error Handling

Python
from sapixdb import SapixError, SapixNetworkError, SapixNotFoundError

# Handlers are tried top to bottom, so the specific errors are listed before
# the general SapixError.
# NOTE(review): presumably SapixNotFoundError and SapixNetworkError subclass
# SapixError — confirm; if so, moving SapixError first would make the other
# two handlers unreachable.
try:
    record = db.collection("orders").get("nuc_missing")

except SapixNotFoundError as e:
    print(f"Not found: {e.record_id}")         # record ID that was missing

except SapixNetworkError:
    print("Cannot reach SapixDB — is it running?")

except SapixError as e:
    print(f"Error {e.status}: {e}")            # HTTP status + message

Full Example: Online Store

Python — store.py
from sapixdb import SapixClient

db = SapixClient(url="http://localhost:7475", agent="store")

# 1. Add product
shirt = db.collection("products").write({
    "sku": "SHIRT-001", "name": "Classic T-Shirt",
    "price": 29.99, "stock": 200, "category": "apparel",
})

# 2. Register customer
customer = db.collection("customers").write({
    "name": "Alice Johnson", "email": "alice@example.com",
})

# 3. Place order
order = db.collection("orders").write({
    "customer_id": customer.id,
    "items": [{"product_id": shirt.id, "qty": 2, "unit_price": 29.99}],
    "total": 59.98,
    "status": "placed",
})

# 4. Link in graph
db.graph.relate(order.id, customer.id, "placed_by")
db.graph.relate(order.id, shirt.id, "contains")

# 5. Ship (append — "placed" version is preserved forever)
db.collection("orders").write({
    "customer_id": customer.id,
    "status":      "shipped",
    "tracking":    "UPS-1Z999AA10123456784",
})

# 6. Audit: what was the status when it was placed?
original = db.collection("orders").as_of(order.timestamp).find_one(
    {"customer_id": customer.id}
)
# find_one() returns None when nothing matches, so fail with a clear message
# rather than an AttributeError on `original.data`.
if original is None:
    raise LookupError("no order found for this customer at order.timestamp")
print(original.data["status"])  # "placed" — not "shipped"

Realtime Subscriptions (SSE)

Subscribe to live writes on any agent using Server-Sent Events. subscribe_agent() and subscribe_global() are Python generators — iterate them with a for loop (sync) or async for (async). The connection stays open until you break out of the loop or the server closes it.

.subscribe_agent(agent_id, *, since?, filter_field?) → Iterator[StreamEvent]
Blocking generator that yields StreamEvent objects for every write to the named agent. since is an HLC timestamp for backfill; filter_field limits events to those whose JSON payload contains that field name.
.subscribe_global(*, agents?, filter_field?) → Iterator[StreamEvent]
Blocking generator for all agents. agents restricts to a list of agent IDs.
Python — sync per-agent stream
from sapixdb import SapixClient, StreamEvent

db = SapixClient(url="http://localhost:7475", agent="my-app")

# Blocking loop — runs until the server closes the stream
for event in db.subscribe_agent("orders"):
    print(f"New record: {event.record_id}  payload={event.payload}")
    payload = event.payload
    if not payload:
        continue  # nothing to inspect, e.g. payload is None for raw writes
    if payload.get("status") == "shipped":
        send_shipping_notification(payload)
Python — sync with HLC backfill + field filter
last_seen_hlc = 1748304000000   # replay writes after this HLC timestamp first

# Open the stream once, then consume it; only events whose payload has
# a risk_score field are delivered.
risk_events = db.subscribe_agent(
    "transactions",
    since=last_seen_hlc,
    filter_field="risk_score",
)
for event in risk_events:
    score = event.payload["risk_score"]
    if score >= 0.8:
        flag_for_review(event)
Python — async (FastAPI / asyncio)
from sapixdb import AsyncSapixClient

async def stream_handler():
    """Pass every event from the "orders" agent stream to process_event()."""
    client = AsyncSapixClient(url="http://localhost:7475", agent="my-app")
    async with client as db:
        orders_stream = db.subscribe_agent("orders")
        async for event in orders_stream:
            await process_event(event)

# Global stream, two agents only
async def global_stream():
    """Print agent ID, record ID and timestamp for writes to "orders" or "payments"."""
    watched_agents = ["orders", "payments"]
    client = AsyncSapixClient(url="http://localhost:7475", agent="my-app")
    async with client as db:
        async for event in db.subscribe_global(agents=watched_agents):
            print(event.agent_id, event.record_id, event.timestamp_ms)
Python — StreamEvent fields
from sapixdb import StreamEvent

# Illustrative field listing, not runnable as-is: `event` is only annotated
# here, never assigned. Real events come from the subscribe_* generators.
event: StreamEvent
event.agent_id      # str  — which agent was written to
event.event_type    # str  — always "record.written"
event.record_id     # str  — unique record ID
event.content_hash  # str  — hex content hash (cryptographic proof)
event.timestamp_ms  # int  — write time as Unix milliseconds
event.payload       # Any  — decoded JSON payload (None for raw writes)
Also available: JavaScript / TypeScript and Go SDKs

npm install sapixdb (JS/TS) · go get github.com/sapixdb/sapixdb-go (Go) — same realtime API, every language.