Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.dist
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ STORAGE_PATH=/code/data/overfast.db
# Default: 0 (disabled) - Recommended for production: 268435456 (256MB) to 1073741824 (1GB)
# Set according to your database size and available RAM
SQLITE_MMAP_SIZE=0
SQLITE_POOL_SIZE=10

UNKNOWN_PLAYER_BASE_RETRY_AFTER=600
UNKNOWN_PLAYER_RETRY_MULTIPLIER=3
Expand Down
154 changes: 112 additions & 42 deletions app/adapters/storage/sqlite_storage.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""SQLite storage adapter with zstd compression"""

import asyncio
import json
import sqlite3
import time
from compression import zstd
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager, suppress
from pathlib import Path
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -36,7 +38,7 @@ class SQLiteStorage(metaclass=Singleton):
All data is compressed with zstd (Python 3.14+ built-in) before storage.

Uses Singleton pattern to ensure single instance across application.
Uses a single persistent connection to avoid per-operation PRAGMA overhead.
Uses a connection pool to allow parallel reads under WAL mode.
"""

def __init__(self, db_path: str | None = None):
Expand All @@ -51,7 +53,15 @@ def __init__(self, db_path: str | None = None):
"""
self.db_path = db_path or settings.storage_path
self._initialized = False
self._shared_connection: aiosqlite.Connection | None = None
self._closed = False
self._init_lock = asyncio.Lock()

# In-memory DBs are per-connection in SQLite — pool size forced to 1
pool_size = 1 if self.db_path == MEMORY_DB else settings.sqlite_pool_size
self._pool: asyncio.Queue[aiosqlite.Connection] = asyncio.Queue(
maxsize=pool_size
)
self._pool_size = pool_size

@classmethod
def _reset_singleton(cls):
Expand All @@ -60,61 +70,121 @@ def _reset_singleton(cls):
if cls in instances:
del instances[cls]

async def _create_connection(self) -> aiosqlite.Connection:
"""Open a new connection and configure it with the required PRAGMAs."""
db = await aiosqlite.connect(self.db_path)

await db.execute("PRAGMA journal_mode=WAL")
await db.execute("PRAGMA synchronous=NORMAL")

if settings.sqlite_mmap_size > 0:
await db.execute(f"PRAGMA mmap_size={settings.sqlite_mmap_size}")

return db

async def _acquire_connection(self) -> aiosqlite.Connection:
"""Acquire a live connection from the pool, healing tombstone None slots."""
db = await self._pool.get()

while db is None:
logger.warning("Skipping dead pool slot, attempting to replace")
try:
db = await self._create_connection()
except (OSError, sqlite3.Error):
await self._pool.put(None)
db = await self._pool.get()

return db

async def _release_connection(
self, db: aiosqlite.Connection, *, broken: bool
) -> None:
"""Return a connection to the pool, replacing it if broken or closing if shut down."""
if self._closed:
with suppress(Exception):
await db.close()
return

if broken:
with suppress(Exception):
await db.close()

try:
replacement = await self._create_connection()
await self._pool.put(replacement)
except (OSError, sqlite3.Error):
# Replacement failed — tombstone keeps the slot so the queue
# never blocks permanently; _acquire_connection will retry.
logger.warning("Failed to create replacement SQLite connection")
await self._pool.put(None) # type: ignore[arg-type]
else:
await self._pool.put(db)

@asynccontextmanager
async def _get_connection(self) -> AsyncIterator[aiosqlite.Connection]:
"""
Get the persistent shared database connection.
PRAGMAs are set once during initialize(), not on every operation.
Acquire a connection from the pool, yield it, then return it.
Callers block if all connections are in use (natural backpressure).
Broken connections are replaced to keep the pool at capacity.
"""
if self._shared_connection is None:
if self._closed:
msg = "SQLite storage is closed."
raise RuntimeError(msg)
if not self._initialized:
msg = "SQLite connection not initialized. Call initialize() first."
raise RuntimeError(msg)
db = await self._acquire_connection()
broken = False
try:
yield self._shared_connection
yield db
except Exception as e:
if settings.prometheus_enabled:
error_type = type(e).__name__
sqlite_connection_errors_total.labels(error_type=error_type).inc()
if isinstance(e, sqlite3.OperationalError):
broken = True
raise
finally:
await self._release_connection(db, broken=broken)

async def close(self) -> None:
"""Close the shared connection if it exists"""
if self._shared_connection is not None:
await self._shared_connection.close()
self._shared_connection = None
"""
Mark the pool as closed and drain all idle connections.
Checked-out connections will be closed when returned via _get_connection's finally block.
"""
self._closed = True
self._initialized = False
while not self._pool.empty():
db = self._pool.get_nowait()
with suppress(Exception):
if db is not None:
await db.close()

async def initialize(self) -> None:
"""Open the persistent connection, configure PRAGMAs, and apply schema."""
if self._initialized:
return

# Ensure directory exists (skip for in-memory database)
if self.db_path != MEMORY_DB:
Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

# Open the single persistent connection
db = await aiosqlite.connect(self.db_path)

# Enable WAL mode (persists in DB file; safe to set every startup)
await db.execute("PRAGMA journal_mode=WAL")

# NORMAL synchronous mode: survives app crash, acceptable risk for cache data
await db.execute("PRAGMA synchronous=NORMAL")

# Memory-mapped I/O (optional, connection-level)
if settings.sqlite_mmap_size > 0:
await db.execute(f"PRAGMA mmap_size={settings.sqlite_mmap_size}")

self._shared_connection = db

# Load and apply schema
schema_path = Path(__file__).parent / "schema.sql"
schema_sql = schema_path.read_text()
await db.executescript(schema_sql)
await db.commit()

self._initialized = True
logger.info(f"SQLite storage initialized at {self.db_path}")
"""Open the connection pool, configure PRAGMAs, and apply schema."""
async with self._init_lock:
if self._initialized:
return

# Ensure directory exists (skip for in-memory database)
if self.db_path != MEMORY_DB:
Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

# Create all pool connections; apply schema on the first one
for i in range(self._pool_size):
db = await self._create_connection()
if i == 0:
schema_path = Path(__file__).parent / "schema.sql"
await db.executescript(schema_path.read_text())
await db.commit()
await self._pool.put(db)

self._initialized = True
self._closed = False
logger.info(
f"SQLite storage initialized at {self.db_path} "
f"(pool_size={self._pool_size})"
)

def _compress(self, data: str) -> bytes:
"""Compress string data using zstd (module-level function for performance)"""
Expand Down
5 changes: 5 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ class Settings(BaseSettings):
# Use ":memory:" for in-memory database (testing/ephemeral deployments)
storage_path: str = "data/overfast.db"

# SQLite connection pool size — each connection runs in its own thread,
# enabling parallel reads under WAL mode. Writes still serialize at the
# SQLite level. Use 1 for :memory: (shared only within one connection).
sqlite_pool_size: int = 5

# SQLite memory-mapped I/O size in bytes (optional performance tuning)
sqlite_mmap_size: int = 0

Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.