Connector Hub
The Connector Hub (Zone 7) bridges external systems into SapixDB and fans strand writes out to the rest of your stack. Connectors are first-class named entities — stored in the GraphIndex, surviving restarts. Outbound connectors fire automatically after every successful strand write.
rest_bridge — Any external system POSTs JSON to a named endpoint; written to the strand immediately.
postgres_cdc — Logical replication slot config; stored now, background consumer ships in Phase 5.
kafka — Kafka topic + consumer group config; stored now, consumer ships in Phase 5.
webhook — HTTP POST to your URL after every strand write. Authenticated with a Bearer token.
s3 — PUT {hash}.json to an S3-compatible endpoint (MinIO, R2) after every strand write.
REST Bridge Inbound · Live
Any external system can push records directly to SapixDB by POSTing JSON to the connector's named push endpoint. No SDK required on the sender side. The payload is tagged with _connector_id, encoded as MessagePack, and appended to the strand. Outbound connectors are notified immediately after.
POST /v1/connectors
{
"name": "IoT Bridge",
"kind": "rest_bridge",
"config": { "flags": 0 }
}
// → { "id": "abc123", "name": "IoT Bridge", "kind": "rest_bridge",
//     "direction": "inbound", "enabled": true, "event_count": 0, ... }

POST /v1/connectors/inbound/rest/abc123/push
Content-Type: application/json
{ "sensor": "temp-7", "value": 22.4, "unit": "celsius" }
// → { "record_hash": "3f8a2c…", "connector_id": "abc123" }
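For senders that aren't raw HTTP clients, the same push can be scripted in a few lines. A minimal sketch using only the standard library, assuming the base URL `http://localhost:7475` from the SDK example later in this page and the `abc123` connector id from the create response above:

```python
import json
import urllib.request

BASE = "http://localhost:7475"  # assumed SapixDB base URL (see the SDK example below)

def build_push_request(connector_id, record, flags=None):
    """Build the POST for the REST bridge push endpoint documented above."""
    url = f"{BASE}/v1/connectors/inbound/rest/{connector_id}/push"
    if flags is not None:
        url += f"?flags={flags}"  # optional strand flags, as in the ?flags=2 example
    return urllib.request.Request(
        url,
        data=json.dumps(record).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Usage (against a live SapixDB):
#   req = build_push_request("abc123", {"sensor": "temp-7", "value": 22.4})
#   with urllib.request.urlopen(req) as resp:
#       print(json.load(resp))   # {"record_hash": ..., "connector_id": ...}
```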
// Optional: set strand flags
POST /v1/connectors/inbound/rest/abc123/push?flags=2
X-Record-Flags: 2
// flags=2 → TOMBSTONE
// The _connector_id field is added before the strand write, so you can filter
// records by source in SaQL queries without any changes to the sending system.

PostgreSQL CDC Inbound · Config only
Store your logical replication slot configuration now. The background consumer process that reads from PostgreSQL WAL and writes change events to the strand ships in Phase 5.
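The config below can be registered today through the same `POST /v1/connectors` endpoint shown in the REST Bridge section. A sketch (base URL is an assumption; field names match the create call documented above):

```python
import json
import urllib.request

def connector_payload(name, kind, config):
    """Body shape for POST /v1/connectors, as documented above."""
    return {"name": name, "kind": kind, "config": config}

def register(base, payload):
    """POST the payload and return the ConnectorConfig dict."""
    req = urllib.request.Request(
        f"{base}/v1/connectors",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)  # includes "id", "enabled", "event_count", ...

# Usage:
#   cfg = {"connection_string": "postgres://user:pass@host:5432/mydb",
#          "slot": "sapix_slot", "publication": "sapix_pub", "table_filter": "public.*"}
#   conn = register("http://localhost:7475",
#                   connector_payload("Orders CDC", "postgres_cdc", cfg))
```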
{
"name": "Orders CDC",
"kind": "postgres_cdc",
"config": {
"connection_string": "postgres://user:pass@host:5432/mydb",
"slot": "sapix_slot",
"publication": "sapix_pub",
"table_filter": "public.*"
}
}

To set up the PostgreSQL side before the worker arrives:
-- Create the publication
CREATE PUBLICATION sapix_pub FOR ALL TABLES;
-- Create the replication slot
SELECT pg_create_logical_replication_slot('sapix_slot', 'pgoutput');

Kafka Inbound · Config only
Store your Kafka topic and consumer group configuration now. The consumer that reads messages and writes them as strand records ships in Phase 5.
{
"name": "Events stream",
"kind": "kafka",
"config": {
"brokers": ["broker1:9092", "broker2:9092"],
"topic": "sapixdb-events",
"group_id": "sapix-consumer",
"auto_offset_reset": "latest"
}
}

Webhook Outbound · Live
After every successful POST /v1/records write, SapixDB POSTs a JSON event to your webhook URL. Multiple webhook connectors all fire concurrently (fire-and-forget: failures are logged but not retried in this release).
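On the receiving side, a minimal handler for the event payload shown below can be built with the standard library alone. A sketch, assuming the shared Bearer secret from the config; the port and path are our choices, not SapixDB's:

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

SECRET = "optional-bearer-token"  # must match the connector's "secret" config

def verify_auth(header_value):
    """Check the Authorization header SapixDB sends with each event."""
    return header_value == f"Bearer {SECRET}"

class HookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        if not verify_auth(self.headers.get("Authorization")):
            self.send_response(401)
            self.end_headers()
            return
        length = int(self.headers.get("Content-Length", 0))
        event = json.loads(self.rfile.read(length))
        # event has "event", "agent_id", "record_hash", "payload", "timestamp_ms"
        print(f"write {event['record_hash']} from {event['agent_id']}")
        self.send_response(200)
        self.end_headers()

# To run: HTTPServer(("", 8080), HookHandler).serve_forever()
```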
{
"name": "Slack alerts",
"kind": "webhook",
"config": {
"url": "https://hooks.slack.com/services/T00…",
"secret": "optional-bearer-token"
}
}

POST https://hooks.slack.com/services/T00…
Authorization: Bearer optional-bearer-token
Content-Type: application/json
{
"event": "write",
"agent_id": "payments-agent",
"record_hash": "3f8a2c…",
"payload": { "amount": 142.50, "currency": "USD" },
"timestamp_ms": 1747003200000
}

S3 Outbound · Live
After every strand write, SapixDB PUTs a JSON document to {endpoint}/{bucket}/{prefix}/{record_hash}.json. Compatible with MinIO, Cloudflare R2 (public buckets with a Bearer token), and any S3-compatible API that accepts Authorization: Bearer. AWS SigV4 signing is planned for Phase 5.
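The object path and headers follow directly from the pattern above. A small illustrative sketch (SapixDB performs this PUT for you; this only shows where a record lands and which headers accompany it):

```python
def object_url(endpoint, bucket, prefix, record_hash):
    """{endpoint}/{bucket}/{prefix}/{record_hash}.json, per the pattern above."""
    return f"{endpoint.rstrip('/')}/{bucket}/{prefix.strip('/')}/{record_hash}.json"

def put_headers(auth_header):
    # Plain Authorization header only; AWS SigV4 signing is planned for Phase 5.
    return {"Authorization": auth_header, "Content-Type": "application/json"}

print(object_url("https://account.r2.cloudflarestorage.com",
                 "sapixdb-archive", "prod/records", "3f8a2c"))
# → https://account.r2.cloudflarestorage.com/sapixdb-archive/prod/records/3f8a2c.json
```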
{
"name": "R2 archive",
"kind": "s3",
"config": {
"endpoint": "https://account.r2.cloudflarestorage.com",
"bucket": "sapixdb-archive",
"prefix": "prod/records",
"auth_header": "Bearer <R2-token>"
}
}
// Writes land at:
// PUT https://account.r2.cloudflarestorage.com/sapixdb-archive/prod/records/{hash}.json

Management API
List all connectors
{
"connectors": [
{
"id": "abc123",
"name": "IoT Bridge",
"kind": "rest_bridge",
"direction": "inbound",
"enabled": true,
"created_at_ms": 1747000000000,
"config": { "flags": 0 },
"last_event_ms": 1747003200000,
"event_count": 142
}
],
"inbound_count": 2,
"outbound_count": 1
}

Update (enable / disable / reconfigure)
// Disable without deleting
{ "enabled": false }
// Change target URL of a webhook
{ "config": { "url": "https://new-endpoint.example.com/hook", "secret": "tok" } }
// Both at once
{ "name": "Renamed", "enabled": true, "config": { ... } }
// → 200 ConnectorConfig | 404 if not found

Delete
// → { "deleted": true, "id": "abc123" }
// Outbound connectors stop firing immediately after deletion.

Python
pip install sapixdb-connectors
import asyncio

from sapixdb_connectors import ConnectorClient


async def main():
    async with ConnectorClient("http://localhost:7475") as hub:
        # --- Inbound: REST Bridge -----------------------------------------
        bridge = await hub.add_rest_bridge("IoT Bridge")
        print(f"Bridge id: {bridge.id}")

        # Push a record through it (simulates an external system)
        result = await hub.rest_push(
            bridge.id,
            {"sensor": "temp-7", "celsius": 22.4, "location": "rack-3"},
        )
        print(f"Written: {result.record_hash}")

        # --- Outbound: Webhook --------------------------------------------
        wh = await hub.add_webhook(
            "Ops alerts",
            url="https://hooks.slack.com/services/T00…",
            secret="xoxb-my-token",
        )
        print(f"Webhook id: {wh.id}")

        # --- Outbound: S3 -------------------------------------------------
        s3 = await hub.add_s3(
            "R2 archive",
            endpoint="https://account.r2.cloudflarestorage.com",
            bucket="sapixdb-archive",
            prefix="prod/records",
            auth_header="Bearer <token>",
        )

        # --- Inspect ------------------------------------------------------
        resp = await hub.list_connectors()
        print(f"Inbound: {resp.inbound_count}  Outbound: {resp.outbound_count}")
        for c in resp.connectors:
            status = "✓" if c.enabled else "✗"
            print(f"  {status} [{c.direction:<8}] {c.name:<20} events={c.event_count}")

        # --- Disable / delete ---------------------------------------------
        await hub.disable(wh.id)
        await hub.delete_connector(s3.id)

asyncio.run(main())

ConnectorClient API
In sapixdb-agent, SapixClient exposes raw-dict wrappers — connector_list(), connector_create(), connector_get(), connector_update(), connector_delete(), and connector_rest_push(). Use sapixdb-connectors for typed Pydantic models and the named convenience factories.