49 lines
1.6 KiB
Python
49 lines
1.6 KiB
Python
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import (
|
|
async_sessionmaker,
|
|
AsyncEngine,
|
|
AsyncSession,
|
|
create_async_engine,
|
|
)
|
|
|
|
from .base_class import Base
|
|
|
|
|
|
class Database:
|
|
def __init__(self) -> None:
|
|
self._engine: AsyncEngine | None = None
|
|
self._session_maker: async_sessionmaker[AsyncSession] | None = None
|
|
self._session: AsyncSession | None = None
|
|
|
|
@property
|
|
def session(self) -> AsyncSession | None:
|
|
if self._session is None and (session_maker := self._session_maker) is not None:
|
|
self._session = session_maker()
|
|
return self._session
|
|
|
|
async def connect(self, db_path: str, clear_static_data: bool = False) -> bool:
|
|
|
|
# TODO: Preserve UserLastStopSearchResults table from drop.
|
|
self._engine = create_async_engine(db_path)
|
|
if self._engine is not None:
|
|
self._session_maker = async_sessionmaker(
|
|
self._engine, expire_on_commit=False, class_=AsyncSession
|
|
)
|
|
if (session := self.session) is not None:
|
|
await session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm;"))
|
|
|
|
async with self._engine.begin() as conn:
|
|
if clear_static_data:
|
|
await conn.run_sync(Base.metadata.drop_all)
|
|
await conn.run_sync(Base.metadata.create_all)
|
|
|
|
return True
|
|
|
|
async def disconnect(self) -> None:
|
|
if self._session is not None:
|
|
await self._session.close()
|
|
self._session = None
|
|
|
|
if self._engine is not None:
|
|
await self._engine.dispose()
|