SapixDB / Docs
Early Access
Phase 4 · Connector Hub

Connector Hub

The Connector Hub (Zone 7) bridges external systems into SapixDB and fans strand writes out to the rest of your stack. Connectors are first-class named entities, stored in the GraphIndex so they survive restarts. Outbound connectors fire automatically after every successful strand write.

REST Bridge
Inbound · rest_bridge

Any external system POSTs JSON to a named endpoint; the payload is written to the strand immediately.

PostgreSQL CDC
Inbound · postgres_cdc
Config only

Logical replication slot config — stored now, background consumer ships in Phase 5.

Kafka
Inbound · kafka
Config only

Kafka topic + consumer group config — stored now, consumer ships in Phase 5.

Webhook
Outbound · webhook

HTTP POST to your URL after every strand write. Authenticated with a Bearer token.

S3
Outbound · s3

PUT {hash}.json to an S3-compatible endpoint (MinIO, R2) after every strand write.

REST Bridge Inbound · Live

Any external system can push records directly to SapixDB by POSTing JSON to the connector's named push endpoint. No SDK required on the sender side. The payload is tagged with _connector_id, encoded as MessagePack, and appended to the strand. Outbound connectors are notified immediately after.

1 — Create the bridge
POST /v1/connectors
{
  "name":   "IoT Bridge",
  "kind":   "rest_bridge",
  "config": { "flags": 0 }
}
// → { "id": "abc123", "name": "IoT Bridge", "kind": "rest_bridge",
//     "direction": "inbound", "enabled": true, "event_count": 0, ... }
2 — Push from any external system
POST /v1/connectors/inbound/rest/abc123/push
Content-Type: application/json

{ "sensor": "temp-7", "value": 22.4, "unit": "celsius" }
// → { "record_hash": "3f8a2c…", "connector_id": "abc123" }

// Optional: set strand flags
POST /v1/connectors/inbound/rest/abc123/push?flags=2
X-Record-Flags: 2
// flags=2 → TOMBSTONE
_connector_id is auto-injected
Every payload pushed through a REST Bridge gets a _connector_id field added before the strand write. This lets you filter records by source in SaQL queries without any changes to the sending system.
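
A hedged sketch of that tagging step (the helper name is illustrative, not part of any SDK):

```python
def tag_payload(payload: dict, connector_id: str) -> dict:
    """Mirror the bridge's injection step: copy the payload and record
    which connector it arrived through, before the strand write."""
    tagged = dict(payload)               # leave the caller's dict untouched
    tagged["_connector_id"] = connector_id
    return tagged

record = tag_payload({"sensor": "temp-7", "value": 22.4}, "abc123")
# record carries the original fields plus "_connector_id": "abc123"
```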

PostgreSQL CDC Inbound · Config only

Store your logical replication slot configuration now. The background consumer process that reads from PostgreSQL WAL and writes change events to the strand ships in Phase 5.

POST /v1/connectors
{
  "name": "Orders CDC",
  "kind": "postgres_cdc",
  "config": {
    "connection_string": "postgres://user:pass@host:5432/mydb",
    "slot":              "sapix_slot",
    "publication":       "sapix_pub",
    "table_filter":      "public.*"
  }
}

To set up the PostgreSQL side before the worker arrives:

PostgreSQL setup (run once)
-- Create the publication
CREATE PUBLICATION sapix_pub FOR ALL TABLES;

-- Create the replication slot
SELECT pg_create_logical_replication_slot('sapix_slot', 'pgoutput');
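
Since the consumer does not ship until Phase 5, it can be worth sanity-checking the stored config up front. A minimal sketch (hypothetical helper, assuming all four fields from the example above are required):

```python
# Fields shown in the postgres_cdc example; whether every one is strictly
# required is an assumption made here for illustration.
REQUIRED_CDC_KEYS = {"connection_string", "slot", "publication", "table_filter"}

def missing_cdc_keys(config: dict) -> list:
    """Return the (sorted) config keys still missing; empty means complete."""
    return sorted(REQUIRED_CDC_KEYS - config.keys())
```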

Kafka Inbound · Config only

Store your Kafka topic and consumer group configuration now. The consumer that reads messages and writes them as strand records ships in Phase 5.

POST /v1/connectors
{
  "name": "Events stream",
  "kind": "kafka",
  "config": {
    "brokers":           ["broker1:9092", "broker2:9092"],
    "topic":             "sapixdb-events",
    "group_id":          "sapix-consumer",
    "auto_offset_reset": "latest"
  }
}

Webhook Outbound · Live

After every successful POST /v1/records write, SapixDB POSTs a JSON event to your webhook URL. Multiple Webhook connectors fire concurrently, fire-and-forget: failures are logged but not retried in this release.

POST /v1/connectors
{
  "name":   "Slack alerts",
  "kind":   "webhook",
  "config": {
    "url":    "https://hooks.slack.com/services/T00…",
    "secret": "optional-bearer-token"
  }
}
event body delivered to your endpoint
POST https://hooks.slack.com/services/T00…
Authorization: Bearer optional-bearer-token
Content-Type: application/json

{
  "event":        "write",
  "agent_id":     "payments-agent",
  "record_hash":  "3f8a2c…",
  "payload":      { "amount": 142.50, "currency": "USD" },
  "timestamp_ms": 1747003200000
}
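
On the receiving side, the only check this release calls for is the Bearer token (when a secret is configured). A framework-free sketch of the receiver logic, with hypothetical names:

```python
import json

def handle_webhook(headers: dict, body: bytes, expected_secret: str):
    """Verify the Bearer token, then decode the delivered event.
    Returns (status_code, event) where event is None on rejection."""
    if headers.get("Authorization") != f"Bearer {expected_secret}":
        return 401, None                 # token missing or wrong
    return 200, json.loads(body)
```

Plug this into whatever HTTP server you already run; the delivery body is the JSON event shown above.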

S3 Outbound · Live

After every strand write, SapixDB PUTs a JSON document to {endpoint}/{bucket}/{prefix}/{record_hash}.json. Compatible with MinIO, Cloudflare R2 (public buckets with a Bearer token), and any S3-compatible API that accepts Authorization: Bearer. AWS SigV4 signing is planned for Phase 5.

POST /v1/connectors — Cloudflare R2
{
  "name": "R2 archive",
  "kind": "s3",
  "config": {
    "endpoint":    "https://account.r2.cloudflarestorage.com",
    "bucket":      "sapixdb-archive",
    "prefix":      "prod/records",
    "auth_header": "Bearer <R2-token>"
  }
}
// Writes land at:
// PUT https://account.r2.cloudflarestorage.com/sapixdb-archive/prod/records/{hash}.json
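
The destination key can be computed ahead of time, e.g. when verifying archived objects. A small sketch (hypothetical helper, assuming the URL pattern above):

```python
def s3_object_url(endpoint: str, bucket: str, prefix: str, record_hash: str) -> str:
    """Build the object URL a strand write lands at:
    {endpoint}/{bucket}/{prefix}/{record_hash}.json"""
    return f"{endpoint.rstrip('/')}/{bucket}/{prefix.strip('/')}/{record_hash}.json"

url = s3_object_url(
    "https://account.r2.cloudflarestorage.com",
    "sapixdb-archive",
    "prod/records",
    "3f8a2c",
)
# → https://account.r2.cloudflarestorage.com/sapixdb-archive/prod/records/3f8a2c.json
```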

Management API

List all connectors

GET /v1/connectors
{
  "connectors": [
    {
      "id":           "abc123",
      "name":         "IoT Bridge",
      "kind":         "rest_bridge",
      "direction":    "inbound",
      "enabled":      true,
      "created_at_ms": 1747000000000,
      "config":       { "flags": 0 },
      "last_event_ms": 1747003200000,
      "event_count":  142
    }
  ],
  "inbound_count":  2,
  "outbound_count": 1
}

Update (enable / disable / reconfigure)

PATCH /v1/connectors/:id
// Disable without deleting
{ "enabled": false }

// Change target URL of a webhook
{ "config": { "url": "https://new-endpoint.example.com/hook", "secret": "tok" } }

// Both at once
{ "name": "Renamed", "enabled": true, "config": { ... } }

// → 200 ConnectorConfig  |  404 if not found
DELETE /v1/connectors/:id
// → { "deleted": true, "id": "abc123" }
// Outbound connectors stop firing immediately after deletion.

Python

installation
pip install sapixdb-connectors
REST Bridge + Webhook + S3 in one script
import asyncio
from sapixdb_connectors import ConnectorClient

async def main():
    async with ConnectorClient("http://localhost:7475") as hub:
        # --- Inbound: REST Bridge -----------------------------------------
        bridge = await hub.add_rest_bridge("IoT Bridge")
        print(f"Bridge id: {bridge.id}")

        # Push a record through it (simulates an external system)
        result = await hub.rest_push(
            bridge.id,
            {"sensor": "temp-7", "celsius": 22.4, "location": "rack-3"},
        )
        print(f"Written: {result.record_hash}")

        # --- Outbound: Webhook --------------------------------------------
        wh = await hub.add_webhook(
            "Ops alerts",
            url="https://hooks.slack.com/services/T00…",
            secret="xoxb-my-token",
        )
        print(f"Webhook id: {wh.id}")

        # --- Outbound: S3 ------------------------------------------------
        s3 = await hub.add_s3(
            "R2 archive",
            endpoint="https://account.r2.cloudflarestorage.com",
            bucket="sapixdb-archive",
            prefix="prod/records",
            auth_header="Bearer <token>",
        )

        # --- Inspect -------------------------------------------------------
        resp = await hub.list_connectors()
        print(f"Inbound: {resp.inbound_count}  Outbound: {resp.outbound_count}")
        for c in resp.connectors:
            status = "✓" if c.enabled else "✗"
            print(f"  {status} [{c.direction:<8}] {c.name:<20} events={c.event_count}")

        # --- Disable / delete ----------------------------------------------
        await hub.disable(wh.id)
        await hub.delete_connector(s3.id)

asyncio.run(main())

ConnectorClient API

list_connectors()
  Return all connectors with inbound/outbound counts.
create_connector(name, kind, config)
  Create any connector type; the direction is inferred from kind.
get_connector(id)
  Get a connector by id. Raises if the id is not found (404).
update_connector(id, *, name, enabled, config)
  Patch any combination of name, enabled, and config.
delete_connector(id)
  Delete a connector. Outbound connectors stop firing immediately.
rest_push(id, body, *, flags)
  Push a JSON payload through a REST Bridge connector.
add_rest_bridge(name, *, flags)
  Convenience: create a REST Bridge inbound connector.
add_webhook(name, url, *, secret)
  Convenience: create a Webhook outbound connector.
add_s3(name, endpoint, bucket, *, prefix, auth_header)
  Convenience: create an S3 outbound connector.
add_postgres_cdc(name, connection_string, slot, publication, table_filter)
  Convenience: store PostgreSQL CDC config.
add_kafka(name, brokers, topic, *, group_id, auto_offset_reset)
  Convenience: store Kafka consumer config.
enable(id) / disable(id)
  Toggle a connector on or off without deleting it.

SapixClient convenience methods: if you are already using sapixdb-agent, SapixClient exposes raw-dict wrappers — connector_list(), connector_create(), connector_get(), connector_update(), connector_delete(), and connector_rest_push(). Use sapixdb-connectors for typed Pydantic models and the named convenience factories.

Known Limitations

PostgreSQL CDC and Kafka are config-only
The background worker processes that read from these sources are planned for Phase 5. Create the connectors now to reserve the configuration.
Outbound failures are not retried
If a webhook or S3 PUT fails, the failure is logged as a warning and skipped. A retry queue with exponential backoff is planned.
S3 uses Bearer token auth only
AWS SigV4 signing is not yet implemented. Use MinIO, Cloudflare R2, or pre-signed URLs for AWS S3 in production.
No rate limiting on REST Bridge
The push endpoint accepts any throughput. Add a reverse proxy (nginx, Caddy) with rate limiting for high-throughput production deployments.
Every write already has a cryptographic audit trail.

Connectors fan data out. The strand keeps the immutable receipt. Add compliance packages to add HIPAA access logging or SOX dual-admin sign-off on top.