diff --git a/.env.dist b/.env.dist
index 910552aa..34e7166c 100644
--- a/.env.dist
+++ b/.env.dist
@@ -37,6 +37,7 @@ STORAGE_PATH=/code/data/overfast.db
 # Default: 0 (disabled) - Recommended for production: 268435456 (256MB) to 1073741824 (1GB)
 # Set according to your database size and available RAM
 SQLITE_MMAP_SIZE=0
+SQLITE_POOL_SIZE=10
 
 UNKNOWN_PLAYER_BASE_RETRY_AFTER=600
 UNKNOWN_PLAYER_RETRY_MULTIPLIER=3
diff --git a/app/adapters/storage/sqlite_storage.py b/app/adapters/storage/sqlite_storage.py
index d6866fd0..f1d14eaf 100644
--- a/app/adapters/storage/sqlite_storage.py
+++ b/app/adapters/storage/sqlite_storage.py
@@ -1,9 +1,11 @@
 """SQLite storage adapter with zstd compression"""
 
+import asyncio
 import json
+import sqlite3
 import time
 from compression import zstd
-from contextlib import asynccontextmanager
+from contextlib import asynccontextmanager, suppress
 from pathlib import Path
 from typing import TYPE_CHECKING
 
@@ -36,7 +38,7 @@ class SQLiteStorage(metaclass=Singleton):
 
     All data is compressed with zstd (Python 3.14+ built-in) before storage.
     Uses Singleton pattern to ensure single instance across application.
-    Uses a single persistent connection to avoid per-operation PRAGMA overhead.
+    Uses a connection pool to allow parallel reads under WAL mode.
     """
 
     def __init__(self, db_path: str | None = None):
@@ -51,7 +53,15 @@ def __init__(self, db_path: str | None = None):
         """
         self.db_path = db_path or settings.storage_path
         self._initialized = False
-        self._shared_connection: aiosqlite.Connection | None = None
+        self._closed = False
+        self._init_lock = asyncio.Lock()
+
+        # In-memory DBs are per-connection in SQLite — pool size forced to 1
+        pool_size = 1 if self.db_path == MEMORY_DB else settings.sqlite_pool_size
+        self._pool: asyncio.Queue[aiosqlite.Connection] = asyncio.Queue(
+            maxsize=pool_size
+        )
+        self._pool_size = pool_size
 
     @classmethod
     def _reset_singleton(cls):
@@ -60,61 +70,146 @@ def _reset_singleton(cls):
         if cls in instances:
             del instances[cls]
 
+    async def _create_connection(self) -> aiosqlite.Connection:
+        """Open a new connection and configure it with the required PRAGMAs."""
+        db = await aiosqlite.connect(self.db_path)
+
+        try:
+            await db.execute("PRAGMA journal_mode=WAL")
+            await db.execute("PRAGMA synchronous=NORMAL")
+
+            if settings.sqlite_mmap_size > 0:
+                await db.execute(f"PRAGMA mmap_size={settings.sqlite_mmap_size}")
+        except BaseException:
+            # Never leak a half-configured connection (and its worker thread)
+            with suppress(Exception):
+                await db.close()
+            raise
+
+        return db
+
+    async def _acquire_connection(self) -> aiosqlite.Connection:
+        """Acquire a live connection from the pool, healing tombstone None slots."""
+        db = await self._pool.get()
+
+        while db is None:
+            logger.warning("Skipping dead pool slot, attempting to replace")
+            try:
+                db = await self._create_connection()
+            except (OSError, sqlite3.Error):
+                await self._pool.put(None)
+                db = await self._pool.get()
+
+        return db
+
+    async def _release_connection(
+        self, db: aiosqlite.Connection, *, broken: bool
+    ) -> None:
+        """Return a connection to the pool, replacing it if broken or closing if shut down."""
+        if self._closed:
+            with suppress(Exception):
+                await db.close()
+            return
+
+        if broken:
+            with suppress(Exception):
+                await db.close()
+
+            try:
+                replacement = await self._create_connection()
+                await self._pool.put(replacement)
+            except (OSError, sqlite3.Error):
+                # Replacement failed — tombstone keeps the slot so the queue
+                # never blocks permanently; _acquire_connection will retry.
+                logger.warning("Failed to create replacement SQLite connection")
+                await self._pool.put(None)  # type: ignore[arg-type]
+        else:
+            await self._pool.put(db)
+
     @asynccontextmanager
     async def _get_connection(self) -> AsyncIterator[aiosqlite.Connection]:
         """
-        Get the persistent shared database connection.
-        PRAGMAs are set once during initialize(), not on every operation.
+        Acquire a connection from the pool, yield it, then return it.
+        Callers block if all connections are in use (natural backpressure).
+        Broken connections are replaced to keep the pool at capacity.
         """
-        if self._shared_connection is None:
+        if self._closed:
+            msg = "SQLite storage is closed."
+            raise RuntimeError(msg)
+        if not self._initialized:
             msg = "SQLite connection not initialized. Call initialize() first."
             raise RuntimeError(msg)
+        db = await self._acquire_connection()
+        broken = False
         try:
-            yield self._shared_connection
+            yield db
         except Exception as e:
             if settings.prometheus_enabled:
                 error_type = type(e).__name__
                 sqlite_connection_errors_total.labels(error_type=error_type).inc()
+            if isinstance(e, sqlite3.OperationalError):
+                broken = True
             raise
+        finally:
+            await self._release_connection(db, broken=broken)
 
     async def close(self) -> None:
-        """Close the shared connection if it exists"""
-        if self._shared_connection is not None:
-            await self._shared_connection.close()
-            self._shared_connection = None
+        """
+        Mark the pool as closed and drain all idle connections.
+        Checked-out connections will be closed when returned via _get_connection's finally block.
+        """
+        self._closed = True
+        self._initialized = False
+        while not self._pool.empty():
+            db = self._pool.get_nowait()
+            with suppress(Exception):
+                if db is not None:
+                    await db.close()
 
     async def initialize(self) -> None:
-        """Open the persistent connection, configure PRAGMAs, and apply schema."""
-        if self._initialized:
-            return
-
-        # Ensure directory exists (skip for in-memory database)
-        if self.db_path != MEMORY_DB:
-            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
-
-        # Open the single persistent connection
-        db = await aiosqlite.connect(self.db_path)
-
-        # Enable WAL mode (persists in DB file; safe to set every startup)
-        await db.execute("PRAGMA journal_mode=WAL")
-
-        # NORMAL synchronous mode: survives app crash, acceptable risk for cache data
-        await db.execute("PRAGMA synchronous=NORMAL")
-
-        # Memory-mapped I/O (optional, connection-level)
-        if settings.sqlite_mmap_size > 0:
-            await db.execute(f"PRAGMA mmap_size={settings.sqlite_mmap_size}")
-
-        self._shared_connection = db
-
-        # Load and apply schema
-        schema_path = Path(__file__).parent / "schema.sql"
-        schema_sql = schema_path.read_text()
-        await db.executescript(schema_sql)
-        await db.commit()
-
-        self._initialized = True
-        logger.info(f"SQLite storage initialized at {self.db_path}")
+        """
+        Open the connection pool, configure PRAGMAs, and apply schema.
+
+        Connections are only added to the pool once all of them are ready.
+        On failure, the ones already opened are closed and the error is
+        re-raised, so a retried initialize() starts from an empty pool.
+        """
+        async with self._init_lock:
+            if self._initialized:
+                return
+
+            # Ensure directory exists (skip for in-memory database)
+            if self.db_path != MEMORY_DB:
+                Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
+
+            # Stage connections locally: enqueuing them one by one would leave
+            # the bounded queue partly filled after a failure, and the next
+            # initialize() would then block forever in put().
+            connections: list[aiosqlite.Connection] = []
+            try:
+                for i in range(self._pool_size):
+                    db = await self._create_connection()
+                    connections.append(db)
+                    if i == 0:
+                        # Schema only needs to be applied once
+                        schema_path = Path(__file__).parent / "schema.sql"
+                        await db.executescript(schema_path.read_text())
+                        await db.commit()
+            except BaseException:
+                for db in connections:
+                    with suppress(Exception):
+                        await db.close()
+                raise
+
+            for db in connections:
+                self._pool.put_nowait(db)
+
+            self._initialized = True
+            self._closed = False
+            logger.info(
+                f"SQLite storage initialized at {self.db_path} "
+                f"(pool_size={self._pool_size})"
+            )
 
     def _compress(self, data: str) -> bytes:
         """Compress string data using zstd (module-level function for performance)"""
diff --git a/app/config.py b/app/config.py
index 65d711f0..9f88d994 100644
--- a/app/config.py
+++ b/app/config.py
@@ -59,6 +59,11 @@ class Settings(BaseSettings):
     # Use ":memory:" for in-memory database (testing/ephemeral deployments)
     storage_path: str = "data/overfast.db"
 
+    # SQLite connection pool size — each connection runs in its own thread,
+    # enabling parallel reads under WAL mode. Writes still serialize at the
+    # SQLite level. Use 1 for :memory: (shared only within one connection).
+    sqlite_pool_size: int = 5
+
     # SQLite memory-mapped I/O size in bytes (optional performance tuning)
     sqlite_mmap_size: int = 0
 
diff --git a/uv.lock b/uv.lock
index 3f32e579..52a5de16 100644
--- a/uv.lock
+++ b/uv.lock
@@ -604,7 +604,7 @@ wheels = [
 
 [[package]]
 name = "overfast-api"
-version = "3.41.1"
+version = "3.41.3"
 source = { virtual = "." }
 dependencies = [
     { name = "aiosqlite" },