Source code for minos.common.database.abc

from collections.abc import (
    Hashable,
)
from typing import (
    Any,
    AsyncContextManager,
    AsyncIterator,
    Optional,
)

from aiomisc.pool import (
    ContextManager,
)
from aiopg import (
    Cursor,
)
from dependency_injector.wiring import (
    Provide,
    inject,
)

from ..setup import (
    MinosSetup,
)
from .locks import (
    PostgreSqlLock,
)
from .pools import (
    PostgreSqlPool,
)


[docs]class PostgreSqlMinosDatabase(MinosSetup): """PostgreSql Minos Database base class."""
[docs] def __init__(self, host: str, port: int, database: str, user: str, password: str, *args, **kwargs): super().__init__(*args, **kwargs) self.host = host self.port = port self.database = database self.user = user self.password = password self._pool = None self._owned_pool = False
async def _destroy(self) -> None: if self._owned_pool: await self._pool.destroy() self._pool = None self._owned_pool = False
[docs] async def submit_query_and_fetchone(self, *args, **kwargs) -> tuple: """Submit a SQL query and gets the first response. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: This method does not return anything. """ return await self.submit_query_and_iter(*args, **kwargs).__anext__()
# noinspection PyUnusedLocal
[docs] async def submit_query_and_iter( self, operation: Any, parameters: Any = None, *, timeout: Optional[float] = None, lock: Optional[int] = None, streaming_mode: bool = False, **kwargs, ) -> AsyncIterator[tuple]: """Submit a SQL query and return an asynchronous iterator. :param operation: Query to be executed. :param parameters: Parameters to be projected into the query. :param timeout: An optional timeout. :param lock: Optional key to perform the query with locking. If not set, the query is performed without any lock. :param streaming_mode: If ``True`` the data fetching is performed in streaming mode, that is iterating over the cursor and yielding once a time (requires an opening connection to do that). Otherwise, all the data is fetched and keep in memory before yielding it. :param kwargs: Additional named arguments. :return: This method does not return anything. """ if lock is None: context_manager = self.cursor() else: context_manager = self.locked_cursor(lock) async with context_manager as cursor: await cursor.execute(operation=operation, parameters=parameters, timeout=timeout) if streaming_mode: async for row in cursor: yield row return rows = await cursor.fetchall() for row in rows: yield row
# noinspection PyUnusedLocal
[docs] async def submit_query( self, operation: Any, parameters: Any = None, *, timeout: Optional[float] = None, lock: Any = None, **kwargs, ) -> None: """Submit a SQL query. :param operation: Query to be executed. :param parameters: Parameters to be projected into the query. :param timeout: An optional timeout. :param lock: Optional key to perform the query with locking. If not set, the query is performed without any lock. :param kwargs: Additional named arguments. :return: This method does not return anything. """ if lock is None: context_manager = self.cursor() else: context_manager = self.locked_cursor(lock) async with context_manager as cursor: await cursor.execute(operation=operation, parameters=parameters, timeout=timeout)
[docs] def locked_cursor(self, key: Hashable, *args, **kwargs) -> AsyncContextManager[Cursor]: """Get a new locked cursor. :param key: The key to be used for locking. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: A Cursor wrapped into an asynchronous context manager. """ lock = PostgreSqlLock(self.pool.acquire(), key, *args, **kwargs) async def _fn_enter(): await lock.__aenter__() return lock.cursor async def _fn_exit(_): await lock.__aexit__(None, None, None) return ContextManager(_fn_enter, _fn_exit)
[docs] def cursor(self, *args, **kwargs) -> AsyncContextManager[Cursor]: """Get a new cursor. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: A Cursor wrapped into an asynchronous context manager. """ acquired = self.pool.acquire() async def _fn_enter(): connection = await acquired.__aenter__() cursor = await connection.cursor(*args, **kwargs).__aenter__() return cursor async def _fn_exit(cursor: Cursor): if not cursor.closed: cursor.close() await acquired.__aexit__(None, None, None) return ContextManager(_fn_enter, _fn_exit)
@property def pool(self) -> PostgreSqlPool: """Get the connections pool. :return: A ``Pool`` object. """ if self._pool is None: self._pool, self._owned_pool = self._build_pool() return self._pool @inject def _build_pool(self, pool: PostgreSqlPool = Provide["postgresql_pool"]) -> tuple[PostgreSqlPool, bool]: if not isinstance(pool, Provide): return pool, False pool = PostgreSqlPool( host=self.host, port=self.port, database=self.database, user=self.user, password=self.password, ) return pool, True