Indexing
Contrail’s core job: mirror atproto records into your DB and expose them via XRPC. You describe what to index with a config object; everything else is automatic.
Collection shape
A realistic two-collection example: events and RSVPs. RSVPs point at events via subject.uri; events expose per-status RSVP counts.
```ts
collections: {
  event: {
    collection: "community.lexicon.calendar.event", // full NSID
    queryable: {
      mode: {}, // ?mode=online
      startsAt: { type: "range" }, // ?startsAtMin=...&startsAtMax=...
    },
    searchable: ["name", "description"], // FTS5 / tsvector
    relations: {
      rsvps: {
        collection: "rsvp", // short name of the child collection
        groupBy: "status", // field on the child record
        groups: {
          going: "community.lexicon.calendar.rsvp#going",
          interested: "community.lexicon.calendar.rsvp#interested",
        },
      },
    },
  },
  rsvp: {
    collection: "community.lexicon.calendar.rsvp",
    queryable: { status: {} },
    references: {
      event: { collection: "event", field: "subject.uri" }, // RSVP's field → event's URI
    },
  },
}
```
- `queryable` — string equality or range, exposed as query params.
- `searchable` — full-text search (FTS5 on D1, tsvector on Postgres). Not available on `node:sqlite`.
- `relations` — many-to-one with materialized counts. The `event` collection gains `rsvpsCount`, `rsvpsGoingCount`, and `rsvpsInterestedCount` columns — filter (`?rsvpsGoingCountMin=10`) and sort (`?sort=rsvpsGoingCount`) on them. Hydrate inline with `?hydrateRsvps=5`.
- `references` — forward lookups from child → parent. `?hydrateEvent=true` on an RSVP query embeds the referenced event record.
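Put together, a filtered query is just params on the generated endpoint's URL. A sketch using `URLSearchParams`; the host and XRPC method name below are placeholders, since the real path depends on your `namespace` config:

```typescript
// Hypothetical endpoint: the actual XRPC path is derived from your
// `namespace` config, so treat this base URL as a placeholder.
const base = "https://appview.example.com/xrpc/com.example.event.list";

const params = new URLSearchParams({
  mode: "online",                      // queryable: string equality
  startsAtMin: "2025-06-01T00:00:00Z", // queryable: range lower bound
  rsvpsGoingCountMin: "10",            // filter on a materialized relation count
  sort: "rsvpsGoingCount",             // sort on the same materialized column
  hydrateRsvps: "5",                   // embed up to 5 RSVP records inline
});

console.log(`${base}?${params}`);
```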
Backfill (historical data)
Run once at setup to pull every record that exists today.
```ts
await contrail.backfillAll({ concurrency: 100 }); // discover + backfill, logs progress
```
Under the hood this is two steps you can call separately if you want finer control:
```ts
await contrail.discover(); // walk relays, register DIDs
await contrail.backfill({ concurrency: 100 }); // fetch history for registered DIDs
```
`backfill()` picks up where it left off across runs — safe to re-run.
Workers CLI
For Cloudflare Workers deploys, `@atmo-dev/contrail` ships a `contrail` bin that handles the `wrangler.getPlatformProxy` dance — no script file, no `package.json` alias needed:
```sh
pnpm contrail backfill          # local D1 (wrangler dev's bindings)
pnpm contrail backfill --remote # production D1
```
Auto-detects configs at `contrail.config.ts`, `src/contrail.config.ts`, `src/lib/contrail.config.ts`, or `app/contrail.config.ts` (first match wins). Override with `--config <path>`. Other flags: `--binding <name>` (default `DB`), `--concurrency <n>` (default 100).
If you’d rather embed backfill inside your own script, @atmo-dev/contrail/workers exports the same logic as a function:
```ts
import { backfillAll } from "@atmo-dev/contrail/workers";
import { config } from "../src/contrail.config";

await backfillAll({ config, remote: process.argv.includes("--remote") });
```
For Node/Postgres deploys, skip both — you already have a `db` in hand; just `await contrail.backfillAll({}, db)` directly.
Ingestion (ongoing new records)
After the initial backfillAll(), keep the index fresh with new records as they’re published. Pick the mode that matches your runtime.
Cron-driven (Cloudflare Workers)
Workers can’t hold long-lived connections, so run one catch-up cycle per cron fire:
```ts
// wrangler.jsonc: "triggers": { "crons": ["*/1 * * * *"] }
async scheduled(_ev, env, ctx) {
  ctx.waitUntil(contrail.ingest({}, env.DB));
}
```
`ingest()` connects to Jetstream, streams events since the saved cursor, and stops when caught up. Running every minute is fine — the next fire resumes where this one left off. Each cycle is bounded, so it can't blow past the Worker time limit.
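The bounded-cycle shape can be sketched generically: drain everything after the saved cursor, advance the cursor, return. All names here are illustrative, not Contrail's internals:

```typescript
// Generic catch-up cycle: process events newer than the cursor, then stop.
// A real implementation would stream from Jetstream and persist the cursor.
function runCycle(
  events: number[], // event timestamps in the stream
  cursor: number,   // last timestamp processed by a previous cycle
): { processed: number[]; cursor: number } {
  const pending = events.filter((t) => t > cursor);
  const newCursor = pending.length ? Math.max(...pending) : cursor;
  return { processed: pending, cursor: newCursor };
}

// First fire drains everything; the next fire resumes from the saved cursor.
const first = runCycle([10, 20, 30], 0);             // processes all three
const second = runCycle([10, 20, 30], first.cursor); // nothing new, no-op
console.log(first, second);
```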
Local dev: wrangler's cron scheduler only fires in deployed production, so use `pnpm contrail dev` instead — it runs `wrangler dev --test-scheduled`, fires `/__scheduled` on your configured cron interval, and prompts you to run backfill or refresh if the local DB looks stale on start.
Persistent (node / any long-lived server)
If your runtime can keep a socket open, skip the cron entirely:
```ts
const ac = new AbortController();
await contrail.runPersistent({
  batchSize: 50, // flush every N events (default: 50)
  flushIntervalMs: 5000, // or every N ms, whichever comes first
  signal: ac.signal,
});
// ac.abort() flushes the current batch and saves the cursor before returning
```
One process, one socket, auto-reconnect on drops. Lower latency than cron mode (events land within seconds instead of up-to-a-minute), but needs a runtime that can run indefinitely.
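The `signal` option pairs naturally with process signals for graceful shutdown. A self-contained sketch of that pattern, with a stand-in loop instead of `runPersistent` (which is elided here):

```typescript
// Stand-in for a long-running consumer: loops until the signal fires, then
// returns, mimicking the flush-then-return behavior described above.
async function runUntilAborted(signal: AbortSignal): Promise<number> {
  let batches = 0;
  while (!signal.aborted) {
    await new Promise((r) => setTimeout(r, 10)); // one "batch" of work
    batches++;
  }
  return batches; // a real consumer would flush and save its cursor here
}

const ac = new AbortController();
process.once("SIGINT", () => ac.abort()); // Ctrl-C triggers a clean stop
setTimeout(() => ac.abort(), 60);         // demo only: stop after ~60ms
const batches = await runUntilAborted(ac.signal);
console.log(`processed ${batches} batches before shutdown`);
```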
Immediate (notify())
Use this when your own app writes to a user’s PDS and needs the change indexed now — waiting for the next cron / Jetstream flush is too slow:
```ts
await contrail.notify(uri); // one record
await contrail.notify([u1, u2, u3]); // batch, up to 25
```
Fetches directly from the user’s PDS and indexes synchronously. When Jetstream later delivers the same event, the duplicate is detected by CID and skipped.
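Since the batch form caps out at 25 URIs, larger sets need chunking first. A small helper for that — the helper itself is illustrative, not part of Contrail's API:

```typescript
// Split a URI list into batches no larger than notify()'s 25-URI cap.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

const uris = Array.from(
  { length: 60 },
  (_, i) => `at://did:plc:example/community.lexicon.calendar.rsvp/${i}`,
);
const batches = chunk(uris, 25); // lengths: 25, 25, 10
console.log(batches.map((b) => b.length));
// then: for (const batch of batches) await contrail.notify(batch);
```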
Which one do I use?
| | `backfillAll` | `ingest` | `runPersistent` | `notify` |
|---|---|---|---|---|
| when | once, at setup | every cron fire | start once, runs forever | per-write, on demand |
| runtime | local script | Cloudflare Workers | Node / long-lived server | anywhere |
| scope | all historical records | events since last cursor | events since last cursor, live | specific URIs |
| latency | — | ~minute | ~seconds | immediate |
Typical combos:
- Workers app: `backfillAll()` once + `ingest()` on cron + optional `notify()` for self-writes
- Node server: `backfillAll()` once + `runPersistent()` forever + optional `notify()` for self-writes
Refresh (catch-up after outages / dev idle)
When Jetstream drops events — you went offline for a few days in dev, there was an outage, or you just want reassurance nothing was lost — refresh walks every known DID’s PDS and reconciles against your DB:
```sh
pnpm contrail refresh                    # totals only
pnpm contrail refresh --by-collection    # totals + per-collection breakdown
pnpm contrail refresh --ignore-window 30 # grace seconds (default: 60)
```
Each record is classified as:
- missing — PDS has it, DB doesn’t. Inserted.
- stale update — DB has it with a different CID, and the DB row is older than the ignore window. Upserted.
- in sync — same CID, or DB row is within the ignore window (jetstream probably just hadn’t caught up yet).
The ignore window is there so a refresh run seconds after a normal jetstream cycle doesn’t double-count records that are about to sync anyway. Records inside the window are still written if they differ; they just don’t show up in the stats.
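The decision rule above can be sketched as a small function; the names and row shape here are illustrative, not Contrail's actual internals:

```typescript
// A DB row for a record, or undefined if the record was never indexed.
type Row = { cid: string; indexedAt: number } | undefined;

// Classify one PDS record against the local index, per the rules above.
function classify(
  pdsCid: string,
  dbRow: Row,
  now: number,
  ignoreWindowMs: number,
): "missing" | "stale update" | "in sync" {
  if (!dbRow) return "missing";
  if (dbRow.cid === pdsCid) return "in sync";
  // Different CID: rows inside the window still get written, but are
  // reported as in sync since Jetstream likely just hasn't caught up.
  return now - dbRow.indexedAt > ignoreWindowMs ? "stale update" : "in sync";
}

console.log(classify("bafyA", undefined, 1_000, 60_000));                        // missing
console.log(classify("bafyA", { cid: "bafyA", indexedAt: 0 }, 1_000, 60_000));   // in sync
console.log(classify("bafyB", { cid: "bafyA", indexedAt: 0 }, 100_000, 60_000)); // stale update
```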
Report shape (--by-collection):
```
by collection:
  community.lexicon.calendar.event
    3 missing, 1 stale updates, 842 in sync
  community.lexicon.calendar.rsvp
    12 missing, 0 stale updates, 4108 in sync
total:
  15 missing, 1 stale updates, 4950 in sync
  234 users scanned, 1 failed in 85.3s
  (ignore window: 60s)
```
Safe to run repeatedly — each pass converges toward zero missing / stale. Programmatic equivalent: `contrail.refresh({ ignoreWindowMs, concurrency })` returns the same structure.
Refresh is not a replacement for ingest/runPersistent — it walks every user’s full history, which is expensive. Use it after outages or during dev idle, not as a continuous freshness mechanism.
Reading the indexed data — filters, sorts, hydration, search, pagination — has its own doc: Querying.
Adapters
| Adapter | Use when | FTS |
|---|---|---|
| Cloudflare D1 | Workers | ✅ |
| `@atmo-dev/contrail/sqlite` | Node 22+ local dev | ❌ |
| `@atmo-dev/contrail/postgres` | Node server | ✅ |
```ts
import { createPostgresDatabase } from "@atmo-dev/contrail/postgres";

const db = createPostgresDatabase(pool);
```
Top-level config
| Key | Default | Notes |
|---|---|---|
| `namespace` | — | Reverse-domain prefix for XRPC paths |
| `profiles` | `["app.bsky.actor.profile"]` | Profile NSIDs, auto-hydrated via `?profiles=true` |
| `jetstreams` | Bluesky | Jetstream URLs |
| `relays` | Bluesky | Relay URLs for discovery |
| `notify` | off | `true` opens `notifyOfUpdate`; a string requires Bearer |
| `spaces` | — | See Spaces |
| `community` | — | See Communities |
| `realtime` | — | See Sync |
| `labels` | — | See Labels |