Change streams

SecantusDB implements MongoDB-compatible change streams against a single-node, durable oplog. pymongo’s Collection.watch(), Database.watch(), and MongoClient.watch() all work end-to-end — clients see insert / update / delete events, get back resume tokens, and can resume by token, startAfter token, or startAtOperationTime.

Watch a collection

from pymongo import MongoClient
from secantus import SecantusDBServer

with SecantusDBServer(port=0) as srv:
    client = MongoClient(srv.uri, directConnection=True)
    db = client["app"]
    db.create_collection("orders")

    cs = db["orders"].watch(max_await_time_ms=2000)
    db["orders"].insert_one({"_id": 1, "total": 9.99})
    event = next(cs)
    assert event["operationType"] == "insert"
    assert event["fullDocument"] == {"_id": 1, "total": 9.99}
    cs.close()
    client.close()

db.watch(...) watches every collection in the database; client.watch(...) (against the admin DB) watches the whole server. Subsequent pipeline stages ($match, $project, etc.) after $changeStream filter the events stream as they would on real mongod.

Resume tokens

Every event carries an opaque _id resume token. Pass it back to resume the stream:

event = next(cs)
token = event["_id"]
cs.close()

# Later, possibly in a new process — but only if the SecantusDB server is
# on-disk and survives. With `:memory:` storage, tokens die with the process.
cs2 = db["orders"].watch(resume_after=token)

Tokens carry the oplog seq, cluster timestamp, namespace, and documentKey._id in their internal payload. The format is not wire-compatible with real mongod’s keystring tokens, so a token issued by SecantusDB cannot be presented to a real mongod and vice versa — but pymongo round-trips them faithfully.

startAtOperationTime is honoured too: pass the Timestamp from a previous hello reply’s lastWrite.opTime.ts, or any cluster time returned in an aggregate reply’s operationTime.

Full document on update

By default, update events carry an updateDescription (faithful diff) but no full document. Ask for the post-image:

cs = db["orders"].watch(full_document="updateLookup")
db["orders"].update_one({"_id": 1}, {"$set": {"total": 19.99}})
event = next(cs)
assert event["fullDocument"] == {"_id": 1, "total": 19.99}

updateLookup re-fetches the current document at projection time. If the doc has since been deleted, fullDocument is None for updateLookup / whenAvailable; required raises ChangeStreamFatalError (code 280).

Pre-images via fullDocumentBeforeChange

Enable pre-image storage on the collection at creation time (or via collMod):

db.create_collection(
    "audit",
    changeStreamPreAndPostImages={"enabled": True},
)
db["audit"].insert_one({"_id": 1, "n": 1})

cs = db["audit"].watch(full_document_before_change="whenAvailable")
db["audit"].update_one({"_id": 1}, {"$set": {"n": 2}})
db["audit"].delete_one({"_id": 1})

update_ev = next(cs)
delete_ev = next(cs)
assert update_ev["fullDocumentBeforeChange"]["n"] == 1
assert delete_ev["fullDocumentBeforeChange"]["n"] == 2

When pre-images are not enabled on the collection, the field is omitted (whenAvailable) or the lookup raises ChangeStreamFatalError (required).

Retention and durability

The oplog is a real WiredTiger table. With on-disk storage (SecantusDBServer(storage_path="/path/to/wt-home")), oplog rows survive process restart and resume tokens minted before a restart can be presented after.

Retention defaults: 1 hour by wall-clock and 100 000 entries by count. Whichever cap kicks in first prunes the oldest entries plus their paired pre-images. Tune via the Storage constructor:

from secantus.storage import Storage
from secantus.server import SecantusDBServer

# A retention-hostile config for tests that exercise resume failures.
srv = SecantusDBServer(port=0, storage_path="/tmp/wt-home")
srv.storage.oplog_retention_seconds = 5.0
srv.storage.oplog_max_entries = 1000

Pruning is opportunistic (every 1000 emits) and exposed publicly as Storage.prune_oplog(now=...). There is no background sweeper — same pattern as TTL indexes.

A resume token whose seq has been pruned surfaces ChangeStreamHistoryLost (code 286), wrapped by pymongo as OperationFailure so the standard “resume failure” handling on the client side works as written.

Querying the oplog directly

For the same reason real mongod does, SecantusDB exposes the oplog as a queryable collection at local.oplog.rs. It’s a read-only synthetic view over the WiredTiger oplog table — find / count / listCollections route to the persisted oplog rows; writes (insert / update / delete / drop / createIndexes / create) are refused with code 13 (Unauthorized), matching mongod.

This is the right surface for debugging a change-stream pipeline, auditing what just got written, or building dashboards on top of the operation log. The entries are the mongod oplog shape — ts, op ("i" / "u" / "d" / "c" / "n"), ns, ui (collection UUID), o (the operation payload), o2 (the update predicate for op:"u"), wall (datetime).

from pymongo import MongoClient

client = MongoClient("mongodb://127.0.0.1:27017/")
oplog = client["local"]["oplog.rs"]

# What just happened, newest first
for entry in oplog.find().sort("ts", -1).limit(10):
    print(entry["ts"], entry["op"], entry["ns"])

# All updates against a specific collection
for entry in oplog.find({"op": "u", "ns": "appdb.users"}):
    print(entry["o2"]["_id"], entry["o"]["diff"])

# Oplog window
oldest = oplog.find().sort("ts", 1).limit(1)[0]["ts"]
newest = oplog.find().sort("ts", -1).limit(1)[0]["ts"]
print(f"oplog covers {oldest}{newest}")

The collection appears in client.list_database_names() (under local) and client.local.list_collection_names() as long as the oplog is enabled. Capped-collection metadata (size, max) reads back through listCollections.options so admin tools that probe the shape see what they expect.

enable_oplog=False (passed to SecantusDBServer / Storage) hides the entire surface — no local database, no oplog.rs collection, no oplog rows. The default is True since change streams depend on it.

Invalidation

A change stream ends with a final invalidate event when the watched scope is destroyed:

  • Collection-level watch: dropping or renaming the watched collection.

  • DB-level watch: dropDatabase on the watched DB.

The invalidate event is the last delivery; the next getMore returns cursor id 0, signalling end-of-stream to the client.

Limitations

See tasks/backlog.md “Change-stream limitations” for the canonical list. Highlights:

  • No transaction state on events (txnNumber / lsid never present).

  • No splitLargeChangeStreamEvents.

  • No noop heartbeats — resume tokens advance only on real ops.

  • DDL createIndexes / dropIndexes events not surfaced.

  • updateDescription.truncatedArrays is emitted only for strict head-prefix array shrinkage; other reshapes produce a wholesale updatedFields entry.

  • Resume-token internals are not wire-compatible with real mongod keystring tokens (round-trip through pymongo is fine).