# Source code for secantus.cli

from __future__ import annotations

import argparse
import datetime
import logging
import signal
import sys
from pathlib import Path
from types import FrameType

from secantus.config import (
    ConfigError,
    SecantusConfig,
    apply_overrides,
    load_config,
)
from secantus.server import SecantusDBServer

log = logging.getLogger(__name__)


def build_parser() -> argparse.ArgumentParser:
    """Build the argparse parser for the ``secantusdb`` daemon.

    Every value-bearing flag defaults to ``None``. ``None`` is the sentinel
    main() uses to decide "user passed this" vs "user did not pass this,
    fall back to TOML / SecantusConfig default." Without it, every argparse
    default would silently override the TOML file. The ``store_true`` flags
    follow the same rule: they parse to ``True`` when given and ``None``
    (not ``False``) when omitted.

    Returns:
        A fully configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Run a SecantusDB standalone single-node MongoDB server "
            "speaking the pymongo wire protocol. Flags override values "
            "in secantusdb.toml; secantusdb.toml overrides built-in "
            "defaults."
        ),
    )

    # --- configuration file and basic daemon settings ----------------------
    parser.add_argument(
        "--config",
        type=Path,
        default=None,
        metavar="PATH",
        help=(
            "Path to a secantusdb.toml configuration file. When omitted, "
            "the launcher auto-discovers ./secantusdb.toml, "
            "~/.secantus/secantusdb.toml, /etc/secantus/secantusdb.toml "
            "(first hit wins). Passing this flag disables auto-discovery."
        ),
    )
    parser.add_argument("--host", default=None)
    parser.add_argument("--port", type=int, default=None)
    parser.add_argument(
        "--storage-path",
        default=None,
        metavar="PATH",
        help=(
            "WiredTiger home directory (default: './secantus-data'). Created "
            "if missing; reopened intact across restarts. Pass ':memory:' "
            "for an ephemeral temp dir cleaned up on shutdown (test mode)."
        ),
    )
    parser.add_argument(
        "--log-level",
        default=None,
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
    )
    parser.add_argument(
        "--auth",
        action="store_true",
        default=None,
        help=(
            "Require SCRAM-SHA-256 authentication for non-handshake commands. "
            "Provision users by connecting once without auth and running "
            "createUser, then restart with --auth. Off by default."
        ),
    )
    parser.add_argument(
        "--standalone",
        action="store_true",
        default=None,
        help=(
            "Drop the single-node replica-set advertisement from the "
            "``hello`` reply, so drivers see SecantusDB as a STANDALONE "
            "topology. Default is to advertise as a single-node "
            "``secantus`` replica-set primary so pymongo's change-stream "
            "machinery accepts the topology. Test gauges that need the "
            "driver's single-node code path (e.g. mongo-java-driver's "
            "``ClusterFixture.getSecondary()`` would otherwise loop "
            "forever waiting for a SECONDARY) opt into this."
        ),
    )
    parser.add_argument(
        "--noop-heartbeat-seconds",
        type=float,
        default=None,
        metavar="SECONDS",
        help=(
            "Emit a periodic ``{op: 'n'}`` oplog heartbeat every N seconds "
            "so quiet change-stream cursors keep their resume token inside "
            "the oplog retention window (mongod's default is 10s). 0 = "
            "disabled (the default; embedded test users typically don't "
            "want the extra writes)."
        ),
    )

    # --- storage engine tuning ---------------------------------------------
    parser.add_argument(
        "--cache-size",
        default=None,
        metavar="SIZE",
        help=(
            "WiredTiger cache size — unit-suffixed string like '256M', "
            "'1G', '8G'. Defaults to '1G'. Bigger cache = more working "
            "set in RAM = better hit rates; size to fit the hot doc "
            "subset of the dataset."
        ),
    )
    parser.add_argument(
        "--session-max",
        type=int,
        default=None,
        metavar="N",
        help=(
            "WiredTiger session_max — concurrent WT session cap. Each "
            "client connection + change-stream tailer takes one. "
            "Default 1000."
        ),
    )
    parser.add_argument(
        "--sync-on-commit",
        action="store_true",
        default=None,
        help=(
            "Fsync the WT log on every transaction commit. Closes the "
            "writeConcern j:true durability gap (true power-loss "
            "between commits no longer loses data) at a significant "
            "throughput cost (1-2 orders of magnitude on small-doc "
            "insert workloads). Off by default; matches mongod's "
            "default w:1, j:false."
        ),
    )

    # --- oplog retention and archiving -------------------------------------
    parser.add_argument(
        "--oplog-retention-seconds",
        type=float,
        default=None,
        metavar="SECONDS",
        help=(
            "Oplog wall-clock retention. Entries older than this are "
            "pruned opportunistically. Default 3600 (1 hour). Tune up "
            "for resume tokens that need to survive long idle "
            "stretches; tune down for a tighter disk budget."
        ),
    )
    parser.add_argument(
        "--oplog-max-entries",
        type=int,
        default=None,
        metavar="N",
        help=(
            "Oplog count cap. Whichever bound (this or retention) "
            "hits first prunes the oldest entries. Default 100000."
        ),
    )
    parser.add_argument(
        "--oplog-archive-dir",
        default=None,
        metavar="DIR",
        help=(
            "Enable PITR v2: archive oplog rows to durable segments in DIR before "
            "pruning drops them. Pair with periodic base snapshots "
            "(secantusAdmin.archiveBaseSnapshot) so recovery can reach a time "
            "before the live oplog floor. Off by default."
        ),
    )

    # --- TLS / mTLS --------------------------------------------------------
    parser.add_argument(
        "--tls-cert-file",
        default=None,
        metavar="PATH",
        help=(
            "PEM-format server certificate chain. When this and "
            "--tls-key-file are both set, accept()ed sockets are TLS-"
            "wrapped before the wire protocol starts; clients connect "
            "with 'mongodb://host:port/?tls=true&tlsCAFile=<ca>'. "
            "Without TLS the daemon stays plaintext (matches the "
            "existing behaviour)."
        ),
    )
    parser.add_argument(
        "--tls-key-file",
        default=None,
        metavar="PATH",
        help="PEM-format private key matching --tls-cert-file.",
    )
    parser.add_argument(
        "--tls-ca-file",
        default=None,
        metavar="PATH",
        help=(
            "PEM-format CA bundle. When set (alongside --tls-cert-file / "
            "--tls-key-file), the daemon asks connecting clients for an "
            "X.509 cert during the TLS handshake and verifies it against "
            "this CA. Required when --tls-require-client-cert is on."
        ),
    )
    parser.add_argument(
        "--tls-require-client-cert",
        action="store_true",
        default=None,
        help=(
            "Reject clients that don't present a valid X.509 cert "
            "(mTLS). Off by default — verifies a cert if one is offered "
            "but accepts clients without one (CERT_OPTIONAL). Requires "
            "--tls-ca-file. mTLS cert-as-username auth (MONGODB-X509) "
            "is a separate follow-on; this flag is the transport-layer "
            "gate only."
        ),
    )
    return parser
def _overrides_from_args(args: argparse.Namespace) -> dict[str, object]:
    """Extract only the flags the user actually passed (non-None) so they
    can be layered on top of the TOML file's values.

    Args:
        args: Namespace produced by the daemon parser; every listed
            attribute must exist on it.

    Returns:
        Mapping of config field name to the CLI value, containing only the
        entries whose parsed value is not ``None``.
    """
    # argparse attribute name -> config field name. Currently an identity
    # mapping, kept explicit so the two namespaces can diverge.
    arg_to_field: dict[str, str] = {
        "host": "host",
        "port": "port",
        "storage_path": "storage_path",
        "log_level": "log_level",
        "auth": "auth",
        "standalone": "standalone",
        "noop_heartbeat_seconds": "noop_heartbeat_seconds",
        "cache_size": "cache_size",
        "session_max": "session_max",
        "sync_on_commit": "sync_on_commit",
        "oplog_retention_seconds": "oplog_retention_seconds",
        "oplog_max_entries": "oplog_max_entries",
        "oplog_archive_dir": "oplog_archive_dir",
        "tls_cert_file": "tls_cert_file",
        "tls_key_file": "tls_key_file",
        "tls_ca_file": "tls_ca_file",
        "tls_require_client_cert": "tls_require_client_cert",
    }
    overrides: dict[str, object] = {}
    for arg_name, field_name in arg_to_field.items():
        value = getattr(args, arg_name)
        # ``is not None`` rather than truthiness: 0 / 0.0 / "" are real
        # user-supplied values and must still override the file.
        if value is not None:
            overrides[field_name] = value
    return overrides


def _build_restore_parser() -> argparse.ArgumentParser:
    """Build the argparse parser for the ``secantusdb restore`` subcommand."""
    parser = argparse.ArgumentParser(
        prog="secantusdb restore",
        description=(
            "Point-in-time recovery: rebuild a fresh data directory as the "
            "database was at a target time, by replaying a backup's oplog "
            "forward. The source must be a stopped server's data directory "
            "or a backup .tar.gz (a live data directory can't be opened — "
            "WiredTiger holds a single-writer lock). Start a new server on "
            "--target-dir afterwards."
        ),
    )
    parser.add_argument(
        "--source",
        required=True,
        metavar="PATH",
        help=(
            "Backup .tar.gz archive, a stopped server's data directory, or a "
            "PITR v2 archive directory (base snapshots + oplog segments — lets "
            "recovery reach a time before the live oplog floor)."
        ),
    )
    parser.add_argument(
        "--target-dir",
        required=True,
        metavar="PATH",
        help="Fresh directory to rebuild into (must not be a live server's path).",
    )
    # The recovery point is given either as wall-clock time or as a cluster
    # timestamp, never both; omitting both is allowed by the parser.
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        "--to-time",
        metavar="ISO8601",
        help=(
            "Recover to the last write at or before this wall-clock time, e.g. "
            "'2026-06-17T14:30:00Z'. Naive times are treated as UTC."
        ),
    )
    group.add_argument(
        "--to-timestamp",
        metavar="SECS[,ORD]",
        help="Recover to this cluster timestamp (seconds, optional ordinal).",
    )
    parser.add_argument(
        "--preserve-oplog",
        action="store_true",
        help=(
            "Carry the replayed oplog onto the restored directory so a change "
            "stream there can resume from a token minted before the restore "
            "point. Default: start a fresh oplog timeline (like mongorestore)."
        ),
    )
    return parser


def _parse_iso_utc(value: str) -> datetime.datetime:
    """Parse an ISO 8601 timestamp into a timezone-aware datetime.

    A trailing ``Z`` (or lowercase ``z``) is rewritten to ``+00:00`` before
    parsing, since ``datetime.fromisoformat`` does not accept the designator
    on every supported Python version. Naive inputs are treated as UTC;
    inputs that carry an explicit offset keep that offset.

    Raises:
        ValueError: if *value* is not a parseable ISO 8601 string.
    """
    text = value[:-1] + "+00:00" if value.endswith(("Z", "z")) else value
    dt = datetime.datetime.fromisoformat(text)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    return dt


def _restore_main(argv: list[str]) -> int:
    """Run the ``secantusdb restore`` subcommand.

    Args:
        argv: Arguments following the ``restore`` token.

    Returns:
        ``0`` on success; ``2`` when an argument cannot be parsed or the
        restore itself fails with ``ValueError``, ``RuntimeError`` or
        ``OSError`` (the message is printed to stderr).
    """
    args = _build_restore_parser().parse_args(argv)

    to_ts = None
    to_wall = None
    if args.to_timestamp is not None:
        secs, _, ordinal = args.to_timestamp.partition(",")
        try:
            secs_value = int(secs)
            ordinal_value = int(ordinal) if ordinal else 0
            # Imported only once the numbers parse, so a malformed flag is
            # reported without needing the BSON package loaded.
            from bson import Timestamp

            to_ts = Timestamp(secs_value, ordinal_value)
        except ValueError:
            print("secantusdb restore: --to-timestamp must be SECS[,ORD]", file=sys.stderr)
            return 2
    if args.to_time is not None:
        try:
            to_wall = _parse_iso_utc(args.to_time)
        except ValueError:
            print(f"secantusdb restore: cannot parse --to-time {args.to_time!r}", file=sys.stderr)
            return 2

    # Deferred until the arguments are known to be valid.
    from secantus import oplog_replay
    from secantus import pitr_archive

    try:
        if pitr_archive.is_archive_dir(args.source):
            # PITR v2: a directory of base snapshots + oplog segments.
            stats = pitr_archive.restore_from_archive_dir(
                args.source,
                args.target_dir,
                to_ts=to_ts,
                to_wall=to_wall,
                carry_oplog=args.preserve_oplog,
            )
        else:
            # Compressed backup archive vs. a plain data directory.
            fn = (
                oplog_replay.restore_archive_to_timestamp
                if args.source.endswith((".tar.gz", ".tgz"))
                else oplog_replay.restore_to_timestamp
            )
            stats = fn(
                args.source,
                args.target_dir,
                to_ts=to_ts,
                to_wall=to_wall,
                carry_oplog=args.preserve_oplog,
            )
    except (ValueError, RuntimeError, OSError) as exc:
        # OSError covers filesystem failures (e.g. a missing --source) so
        # they produce the same one-line error instead of a traceback.
        print(f"secantusdb restore: {exc}", file=sys.stderr)
        return 2

    print(
        f"Restored {stats['opsApplied']} operations "
        f"(through oplog seq {stats['lastSeq']}) into {stats['targetDir']}.\n"
        f"Start a server on it: secantusdb --storage-path {stats['targetDir']}"
    )
    return 0
def main(argv: list[str] | None = None) -> int:
    """Entry point for the ``secantusdb`` command.

    Dispatches to the ``restore`` subcommand when it is the first argument;
    otherwise loads the configuration, layers the CLI flags on top, and
    runs the server until it stops.

    Args:
        argv: Command-line arguments without the program name. ``None``
            means ``sys.argv[1:]``.

    Returns:
        Process exit code: ``0`` once ``server.wait()`` returns, ``2`` when
        the configuration raises ``ConfigError``, or whatever the restore
        subcommand returns. On SIGINT/SIGTERM the installed handler stops
        the server and exits the process with status 0.
    """
    raw = sys.argv[1:] if argv is None else argv
    if raw and raw[0] == "restore":
        # ``secantusdb restore ...`` is the only subcommand; everything else
        # is the daemon (bare ``secantusdb --flags`` keeps working unchanged).
        return _restore_main(raw[1:])

    args = build_parser().parse_args(raw)
    try:
        base, source = load_config(args.config)
        cfg: SecantusConfig = apply_overrides(base, _overrides_from_args(args))
    except ConfigError as exc:
        print(f"secantusdb: {exc}", file=sys.stderr)
        return 2

    # Logging is configured only after the config is resolved, because the
    # level itself comes from it.
    logging.basicConfig(
        level=cfg.log_level,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    if source is not None:
        log.info("loaded config from %s", source)

    server = SecantusDBServer(
        host=cfg.host,
        port=cfg.port,
        storage_path=cfg.storage_path,
        require_auth=cfg.auth,
        noop_heartbeat_seconds=cfg.noop_heartbeat_seconds,
        # --standalone suppresses the replica-set name entirely.
        replica_set_name=None if cfg.standalone else "secantus",
        oplog_retention_seconds=cfg.oplog_retention_seconds,
        oplog_max_entries=cfg.oplog_max_entries,
        oplog_archive_dir=cfg.oplog_archive_dir,
        cache_size=cfg.cache_size,
        session_max=cfg.session_max,
        sync_on_commit=cfg.sync_on_commit,
        tls_cert_file=cfg.tls_cert_file,
        tls_key_file=cfg.tls_key_file,
        tls_ca_file=cfg.tls_ca_file,
        tls_require_client_cert=cfg.tls_require_client_cert,
    )

    def handle_signal(signum: int, frame: FrameType | None) -> None:
        # Stop the server, then leave the process with a success status.
        server.stop()
        sys.exit(0)

    # Handlers are installed before start() so a signal arriving during
    # startup still goes through server.stop().
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    server.start()
    server.wait()
    return 0