Source code for secantus.cursors
from __future__ import annotations
import itertools
import threading
import time
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
DEFAULT_IDLE_TTL_SECONDS = 600.0 # matches MongoDB's 10-minute cursor TTL.
[docs]
class CursorNotFound(Exception):
def __init__(self, cursor_id: int) -> None:
super().__init__(f"cursor id {cursor_id} not found")
self.cursor_id = cursor_id
@dataclass
class _Entry:
cursor_id: int
namespace: str
remaining: list[dict[str, Any]]
last_access: float
[docs]
class CursorRegistry:
def __init__(
self,
idle_ttl_seconds: float = DEFAULT_IDLE_TTL_SECONDS,
time_func: Callable[[], float] | None = None,
) -> None:
self._cursors: dict[int, _Entry] = {}
self._lock = threading.Lock()
self._next_id = itertools.count(1)
self.idle_ttl_seconds = idle_ttl_seconds
self._time = time_func or time.monotonic
def _prune_locked(self) -> None:
if self.idle_ttl_seconds <= 0:
return
cutoff = self._time() - self.idle_ttl_seconds
expired = [cid for cid, e in self._cursors.items() if e.last_access < cutoff]
for cid in expired:
del self._cursors[cid]
def register(self, namespace: str, remaining: list[dict[str, Any]]) -> int:
with self._lock:
self._prune_locked()
cursor_id = next(self._next_id)
self._cursors[cursor_id] = _Entry(cursor_id, namespace, list(remaining), self._time())
return cursor_id
def next_batch(self, cursor_id: int, batch_size: int) -> tuple[list[dict[str, Any]], bool]:
with self._lock:
self._prune_locked()
entry = self._cursors.get(cursor_id)
if entry is None:
raise CursorNotFound(cursor_id)
if batch_size <= 0:
batch_size = len(entry.remaining)
batch = entry.remaining[:batch_size]
entry.remaining = entry.remaining[batch_size:]
exhausted = not entry.remaining
if exhausted:
del self._cursors[cursor_id]
else:
entry.last_access = self._time()
return batch, exhausted
def kill(self, cursor_ids: list[int]) -> tuple[list[int], list[int]]:
killed: list[int] = []
not_found: list[int] = []
with self._lock:
self._prune_locked()
for cid in cursor_ids:
if self._cursors.pop(cid, None) is not None:
killed.append(cid)
else:
not_found.append(cid)
return killed, not_found
def __len__(self) -> int:
with self._lock:
self._prune_locked()
return len(self._cursors)