Source code for minos.aggregate.snapshots.pg.readers

from __future__ import (
    annotations,
)

import logging
from typing import (
    TYPE_CHECKING,
    AsyncIterator,
    Optional,
)
from uuid import (
    UUID,
)

from minos.common import (
    NULL_UUID,
)

from ...exceptions import (
    AggregateNotFoundException,
)
from ...queries import (
    _Condition,
    _EqualCondition,
    _Ordering,
)
from ...transactions import (
    TransactionEntry,
)
from ..entries import (
    SnapshotEntry,
)
from .abc import (
    PostgreSqlSnapshotSetup,
)
from .queries import (
    PostgreSqlSnapshotQueryBuilder,
)

if TYPE_CHECKING:
    from ...models import (
        Aggregate,
    )

logger = logging.getLogger(__name__)


[docs]class PostgreSqlSnapshotReader(PostgreSqlSnapshotSetup): """PostgreSQL Snapshot class. The snapshot provides a direct accessor to the aggregate instances stored as events by the event repository class. """
[docs] async def get(self, aggregate_name: str, uuid: UUID, **kwargs) -> Aggregate: """Get an aggregate instance from its identifier. :param aggregate_name: Class name of the ``Aggregate``. :param uuid: Identifier of the ``Aggregate``. :param kwargs: Additional named arguments. :return: The ``Aggregate`` instance. """ snapshot_entry = await self.get_entry(aggregate_name, uuid, **kwargs) aggregate = snapshot_entry.build_aggregate(**kwargs) return aggregate
# noinspection PyUnusedLocal
[docs] async def get_entry(self, aggregate_name: str, uuid: UUID, **kwargs) -> SnapshotEntry: """Get a ``SnapshotEntry`` from its identifier. :param aggregate_name: Class name of the ``Aggregate``. :param uuid: Identifier of the ``Aggregate``. :param kwargs: Additional named arguments. :return: The ``SnapshotEntry`` instance. """ try: return await self.find_entries( aggregate_name, _EqualCondition("uuid", uuid), **kwargs | {"exclude_deleted": False}, ).__anext__() except StopAsyncIteration: raise AggregateNotFoundException(f"Some aggregates could not be found: {uuid!s}")
[docs] async def find(self, *args, **kwargs) -> AsyncIterator[Aggregate]: """Find a collection of ``Aggregate`` instances based on a ``Condition``. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: An asynchronous iterator that containing the ``Aggregate`` instances. """ async for snapshot_entry in self.find_entries(*args, **kwargs): yield snapshot_entry.build_aggregate(**kwargs)
[docs] async def find_entries( self, aggregate_name: str, condition: _Condition, ordering: Optional[_Ordering] = None, limit: Optional[int] = None, streaming_mode: bool = False, transaction: Optional[TransactionEntry] = None, exclude_deleted: bool = True, **kwargs, ) -> AsyncIterator[SnapshotEntry]: """Find a collection of ``SnapshotEntry`` instances based on a ``Condition``. :param aggregate_name: Class name of the ``Aggregate``. :param condition: The condition that must be satisfied by the ``Aggregate`` instances. :param ordering: Optional argument to return the instance with specific ordering strategy. The default behaviour is to retrieve them without any order pattern. :param limit: Optional argument to return only a subset of instances. The default behaviour is to return all the instances that meet the given condition. :param streaming_mode: If ``True`` return the values in streaming directly from the database (keep an open database connection), otherwise preloads the full set of values on memory and then retrieves them. :param transaction: The transaction within the operation is performed. If not any value is provided, then the transaction is extracted from the context var. If not any transaction is being scoped then the query is performed to the global snapshot. :param exclude_deleted: If ``True``, deleted ``Aggregate`` entries are included, otherwise deleted ``Aggregate`` entries are filtered. :param kwargs: Additional named arguments. :return: An asynchronous iterator that containing the ``Aggregate`` instances. """ if transaction is None: transaction_uuids = (NULL_UUID,) else: transaction_uuids = await transaction.uuids qb = PostgreSqlSnapshotQueryBuilder( aggregate_name, condition, ordering, limit, transaction_uuids, exclude_deleted, ) query, parameters = qb.build() async with self.cursor() as cursor: # noinspection PyTypeChecker await cursor.execute(query, parameters) if streaming_mode: async for row in cursor: # noinspection PyArgumentList yield SnapshotEntry(*row) return else: rows = await cursor.fetchall() for row in rows: yield SnapshotEntry(*row)