Source code for minos.common.database.pools

import logging
from asyncio import (
    sleep,
)
from collections.abc import (
    Hashable,
)
from typing import (
    Optional,
)

import aiopg
from aiomisc.pool import (
    ContextManager,
)
from aiopg import (
    Connection,
)
from psycopg2 import (
    OperationalError,
)

from ..pools import (
    MinosPool,
)
from .locks import (
    PostgreSqlLock,
)

logger = logging.getLogger(__name__)


[docs]class PostgreSqlPool(MinosPool[ContextManager]): """Postgres Pool class."""
[docs] def __init__(self, host: str, port: int, database: str, user: str, password: str, *args, **kwargs): super().__init__(*args, **kwargs) self.host = host self.port = port self.database = database self.user = user self.password = password
@classmethod def _from_config(cls, *args, config, **kwargs): return cls(*args, **config.repository._asdict(), **kwargs) async def _create_instance(self) -> Optional[Connection]: try: connection = await aiopg.connect( host=self.host, port=self.port, dbname=self.database, user=self.user, password=self.password ) except OperationalError as exc: logger.warning(f"There was an {exc!r} while trying to get a database connection.") await sleep(1) return None logger.info(f"Created {self.database!r} database connection identified by {id(connection)}!") return connection async def _destroy_instance(self, instance: Connection): if not instance.closed: await instance.close() logger.info(f"Destroyed {self.database!r} database connection identified by {id(instance)}!") async def _check_instance(self, instance: Optional[Connection]) -> bool: if instance is None: return False try: # This operation connects to the database and raises an exception if something goes wrong. instance.isolation_level except OperationalError: return False return not instance.closed
[docs]class PostgreSqlLockPool(PostgreSqlPool): """Postgres Locking Pool class."""
[docs] def acquire(self, key: Hashable, *args, **kwargs) -> PostgreSqlLock: """Acquire a new lock. :param key: The key to be used for locking. :return: A ``PostgreSqlLock`` instance. """ return PostgreSqlLock(super().acquire(), key, *args, **kwargs)