fix(storage): Add SQLite connection pool and config #367
Merged
Conversation
Contributor
Reviewer's Guide
Refactors SQLiteStorage to use an asyncio-based connection pool instead of a single shared connection, and introduces configuration for pool size and related settings.
Sequence diagram for SQLite connection pool initialization and usage
sequenceDiagram
actor App
participant Settings
participant SQLiteStorage
participant asyncio_Queue as Pool
participant aiosqlite
participant SQLiteDB
App->>SQLiteStorage: __init__(db_path)
SQLiteStorage->>Settings: read storage_path
SQLiteStorage->>Settings: read sqlite_pool_size
SQLiteStorage->>SQLiteStorage: set db_path and pool_size
SQLiteStorage->>Pool: create Queue(maxsize=pool_size)
App->>SQLiteStorage: initialize()
alt already initialized
SQLiteStorage-->>App: return
else not initialized
loop pool_size times
SQLiteStorage->>aiosqlite: connect(db_path)
aiosqlite-->>SQLiteStorage: SQLiteDB
SQLiteStorage->>SQLiteDB: execute(PRAGMA journal_mode=WAL)
SQLiteStorage->>SQLiteDB: execute(PRAGMA synchronous=NORMAL)
SQLiteStorage->>Settings: read sqlite_mmap_size
alt mmap_size > 0
SQLiteStorage->>SQLiteDB: execute(PRAGMA mmap_size=...)
end
alt first connection
SQLiteStorage->>SQLiteDB: executescript(schema.sql)
SQLiteStorage->>SQLiteDB: commit()
end
SQLiteStorage->>Pool: put(SQLiteDB)
end
SQLiteStorage->>SQLiteStorage: set _initialized = True
SQLiteStorage-->>App: return
end
App->>SQLiteStorage: _get_connection()
SQLiteStorage->>Pool: get()
Pool-->>SQLiteStorage: db
SQLiteStorage-->>App: yield db
App->>SQLiteDB: perform queries
App-->>SQLiteStorage: exit context
SQLiteStorage->>Pool: put(db)
App->>SQLiteStorage: close()
loop while pool not empty
SQLiteStorage->>Pool: get_nowait()
Pool-->>SQLiteStorage: db
SQLiteStorage->>SQLiteDB: close()
end
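In code, the flow above might look roughly like the sketch below. This is an illustration rather than the PR's actual implementation: the `settings` stand-in, `SCHEMA_SQL`, the default values, and the happy-path `_get_connection()` are assumptions, and error handling and metrics are omitted.
```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace

import aiosqlite

# Stand-in for the project's Settings object; values are illustrative only.
settings = SimpleNamespace(storage_path="data/app.db", sqlite_pool_size=5, sqlite_mmap_size=0)
SCHEMA_SQL = "CREATE TABLE IF NOT EXISTS items (id INTEGER PRIMARY KEY);"  # stand-in for schema.sql


class SQLiteStorage:
    def __init__(self, db_path: str | None = None) -> None:
        self.db_path = db_path or settings.storage_path
        self._pool_size = settings.sqlite_pool_size
        self._pool: asyncio.Queue = asyncio.Queue(maxsize=self._pool_size)
        self._initialized = False

    async def _create_connection(self) -> aiosqlite.Connection:
        db = await aiosqlite.connect(self.db_path)
        # WAL + NORMAL sync let several pooled readers coexist with a single writer.
        await db.execute("PRAGMA journal_mode=WAL")
        await db.execute("PRAGMA synchronous=NORMAL")
        if settings.sqlite_mmap_size > 0:
            await db.execute(f"PRAGMA mmap_size={settings.sqlite_mmap_size}")
        return db

    async def initialize(self) -> None:
        if self._initialized:
            return
        for i in range(self._pool_size):
            db = await self._create_connection()
            if i == 0:
                # The schema is applied once, on the first connection only.
                await db.executescript(SCHEMA_SQL)
                await db.commit()
            await self._pool.put(db)
        self._initialized = True

    @asynccontextmanager
    async def _get_connection(self):
        # Borrow a connection from the pool and return it when the caller is done.
        db = await self._pool.get()
        try:
            yield db
        finally:
            await self._pool.put(db)
```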
Class diagram for updated SQLiteStorage and Settings configuration
classDiagram
class SQLiteStorage {
- str db_path
- bool _initialized
- asyncio_Queue _pool
- int _pool_size
+ __init__(db_path)
+ initialize() async
+ close() async
+ _get_connection() async
+ _create_connection() async
+ _reset_singleton() class
+ _compress(data)
}
class Settings {
+ str storage_path
+ int sqlite_pool_size
+ int sqlite_mmap_size
}
SQLiteStorage --> Settings : uses
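The `Settings` fields in the class diagram could be declared along these lines, assuming a pydantic-settings `BaseSettings` class; the PR does not state which settings mechanism is used, and the defaults here are placeholders.
```python
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Placeholder defaults; real values would come from env vars or config files.
    storage_path: str = "data/app.db"
    sqlite_pool_size: int = 5
    sqlite_mmap_size: int = 0  # bytes; 0 skips the mmap_size PRAGMA


settings = Settings()
```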
Contributor
Hey - I've found 2 issues and left some high-level feedback:
- The `close()` method only drains connections currently in the queue and will silently skip any connections checked out at the time of shutdown, which can leak connections; consider tracking active connections or marking the pool as closed and waiting for all connections to be returned before closing.
- `close()` leaves `_initialized` as `True`, so subsequent callers can still enter `_get_connection()` and block on a queue that is being drained/closed; consider setting `_initialized = False` (and possibly guarding `_get_connection()` against use after shutdown).
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `close()` method only drains connections currently in the queue and will silently skip any connections checked out at the time of shutdown, which can leak connections; consider tracking active connections or marking the pool as closed and waiting for all connections to be returned before closing.
- `close()` leaves `_initialized` as `True`, so subsequent callers can still enter `_get_connection()` and block on a queue that is being drained/closed; consider setting `_initialized = False` (and possibly guarding `_get_connection()` against use after shutdown).
## Individual Comments
### Comment 1
<location> `app/adapters/storage/sqlite_storage.py:98-102` </location>
<code_context>
+        finally:
+            await self._pool.put(db)
     async def close(self) -> None:
-        """Close the shared connection if it exists"""
-        if self._shared_connection is not None:
-            await self._shared_connection.close()
-            self._shared_connection = None
+        """Drain the pool and close all connections."""
+        while not self._pool.empty():
+            db = self._pool.get_nowait()
+            await db.close()
     async def initialize(self) -> None:
</code_context>
<issue_to_address>
**issue (bug_risk):** `close()` leaves `_initialized` as True and can put closed connections back into circulation.
Because `close()` only drains the queue and leaves `_initialized` as `True`, the pool object remains "active" but has no usable connections. Any later `_get_connection()` will block forever on `self._pool.get()` (nothing is ever put back), and if `close()` runs while a connection is checked out, that connection will be closed and then later returned to the pool by the `finally: await self._pool.put(db)` block. That allows a closed connection to be reused. To avoid this, (a) reset `self._initialized = False` in `close()`, and (b) prevent checked-out connections from being returned after the pool is closed (e.g., track a `closed` flag and skip re-queuing, or detect closed/broken connections and close without putting them back).
</issue_to_address>
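A minimal sketch of that suggestion follows; the `_closed` flag, the guard in `_get_connection()`, and the exact structure are illustrative and not taken from the PR.
```python
from contextlib import asynccontextmanager


class SQLiteStorage:
    # ...__init__ would also set self._closed = False; pool setup as in the PR...

    async def close(self) -> None:
        """Drain the pool, close idle connections, and mark the storage as shut down."""
        self._closed = True         # checked-out connections will not be re-queued
        self._initialized = False   # later callers must run initialize() again
        while not self._pool.empty():
            db = self._pool.get_nowait()
            await db.close()

    @asynccontextmanager
    async def _get_connection(self):
        if self._closed or not self._initialized:
            msg = "SQLite connection not initialized. Call initialize() first."
            raise RuntimeError(msg)
        db = await self._pool.get()
        try:
            yield db
        finally:
            if self._closed:
                # close() ran while this connection was checked out: close it
                # instead of returning it to a drained pool.
                await db.close()
            else:
                await self._pool.put(db)
```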
### Comment 2
<location> `app/adapters/storage/sqlite_storage.py:87-96` </location>
<code_context>
+        if self._pool.empty() and not self._initialized:
             msg = "SQLite connection not initialized. Call initialize() first."
             raise RuntimeError(msg)
+        db = await self._pool.get()
         try:
-            yield self._shared_connection
+            yield db
         except Exception as e:
             if settings.prometheus_enabled:
                 error_type = type(e).__name__
                 sqlite_connection_errors_total.labels(error_type=error_type).inc()
             raise
+        finally:
+            await self._pool.put(db)
     async def close(self) -> None:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider handling permanently broken connections instead of returning them to the pool.
Right now any exception during use causes the connection to be returned to the pool. For errors indicating a broken DB connection (e.g. `OperationalError`), that connection will keep circulating and failing. Consider detecting these failure types, closing the bad connection instead of returning it, and optionally creating/enqueuing a new one to keep the pool at capacity.
Suggested implementation:
```python
db = await self._pool.get()
broken_connection = False
try:
    yield db
except Exception as e:
    if settings.prometheus_enabled:
        error_type = type(e).__name__
        sqlite_connection_errors_total.labels(error_type=error_type).inc()
    # Detect errors that likely indicate a permanently broken connection.
    # We treat SQLite/aiosqlite OperationalError as fatal for this connection.
    broken_error_types = ()
    try:
        import sqlite3
        broken_error_types = (sqlite3.OperationalError,)
    except Exception:
        # Fall back silently if sqlite3 is not available for some reason.
        pass
    try:
        import aiosqlite  # type: ignore[import-not-found]
        broken_error_types = broken_error_types + (aiosqlite.OperationalError,)
    except Exception:
        # aiosqlite may not be used or available; ignore if import fails.
        pass
    if broken_error_types and isinstance(e, broken_error_types):
        broken_connection = True
    raise
finally:
    if broken_connection:
        # Do not return a broken connection to the pool.
        try:
            await db.close()
        except Exception:
            # Swallow close errors; the connection is already broken.
            pass
    else:
        await self._pool.put(db)
```
1. If your project already has a centralized way to detect broken DB connections (e.g. a helper function or a predefined tuple of exception types), replace the inline `broken_error_types` logic with that shared mechanism to keep behavior consistent.
2. If you have a custom SQLite wrapper instead of using `aiosqlite` directly, adjust the imported `OperationalError` types accordingly.
3. If you want to keep the pool at a fixed capacity, you can extend the `broken_connection` branch in the `finally` block to create a replacement connection and `put` it into the pool after closing the broken one, using whatever factory/initializer you already use in `initialize()`. A sketch of that replenishment step follows below.
</issue_to_address>
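For note 3 above, a small hypothetical helper (not part of the PR or the suggestion) could close the broken connection and top the pool back up, assuming `_create_connection()` applies the same PRAGMAs used in `initialize()`.
```python
class SQLiteStorage:
    # ...existing pool attributes and _create_connection() as in the PR...

    async def _replace_broken(self, db) -> None:
        """Close a broken connection and, best effort, restore pool capacity."""
        try:
            await db.close()
        except Exception:
            # The connection is already broken; ignore errors while closing it.
            pass
        try:
            await self._pool.put(await self._create_connection())
        except Exception:
            # If reconnecting fails, leave the pool one connection short for now;
            # a later health check or re-initialization can restore capacity.
            pass
```
The `broken_connection` branch of the suggested `finally` block would then call `await self._replace_broken(db)` instead of only closing the connection.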



Summary by Sourcery
Introduce a configurable SQLite connection pool for the storage adapter to support parallel reads and centralize connection configuration.