from __future__ import (
annotations,
)
from abc import (
ABC,
abstractmethod,
)
from asyncio import (
gather,
)
from contextlib import (
suppress,
)
from typing import (
TYPE_CHECKING,
AsyncIterator,
Awaitable,
Optional,
Union,
)
from uuid import (
UUID,
)
from dependency_injector.wiring import (
Provide,
inject,
)
from minos.common import (
NULL_UUID,
Lock,
MinosPool,
MinosSetup,
NotProvidedException,
)
from minos.networks import (
BrokerMessageStrategy,
BrokerPublisher,
)
from ...contextvars import (
IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR,
)
from ...exceptions import (
EventRepositoryConflictException,
EventRepositoryException,
)
from ...transactions import (
TRANSACTION_CONTEXT_VAR,
TransactionEntry,
TransactionRepository,
TransactionStatus,
)
from ..entries import (
EventEntry,
)
if TYPE_CHECKING:
from ...models import (
AggregateDiff,
)
[docs]class EventRepository(ABC, MinosSetup):
"""Base event repository class in ``minos``."""
[docs] @inject
def __init__(
self,
broker_publisher: BrokerPublisher = Provide["broker_publisher"],
transaction_repository: TransactionRepository = Provide["transaction_repository"],
lock_pool: MinosPool[Lock] = Provide["lock_pool"],
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
if broker_publisher is None or isinstance(broker_publisher, Provide):
raise NotProvidedException("A broker instance is required.")
if transaction_repository is None or isinstance(transaction_repository, Provide):
raise NotProvidedException("A transaction repository instance is required.")
if lock_pool is None or isinstance(lock_pool, Provide):
raise NotProvidedException("A lock pool instance is required.")
self._broker_publisher = broker_publisher
self._transaction_repository = transaction_repository
self._lock_pool = lock_pool
[docs] def transaction(self, **kwargs) -> TransactionEntry:
"""Build a transaction instance related to the repository.
:param kwargs: Additional named arguments.
:return: A new ``TransactionEntry`` instance.
"""
return TransactionEntry(event_repository=self, transaction_repository=self._transaction_repository, **kwargs)
[docs] async def create(self, entry: Union[AggregateDiff, EventEntry]) -> EventEntry:
"""Store new creation entry into the repository.
:param entry: Entry to be stored.
:return: The repository entry containing the stored information.
"""
from ...models import (
Action,
)
entry.action = Action.CREATE
return await self.submit(entry)
[docs] async def update(self, entry: Union[AggregateDiff, EventEntry]) -> EventEntry:
"""Store new update entry into the repository.
:param entry: Entry to be stored.
:return: The repository entry containing the stored information.
"""
from ...models import (
Action,
)
entry.action = Action.UPDATE
return await self.submit(entry)
[docs] async def delete(self, entry: Union[AggregateDiff, EventEntry]) -> EventEntry:
"""Store new deletion entry into the repository.
:param entry: Entry to be stored.
:return: The repository entry containing the stored information.
"""
from ...models import (
Action,
)
entry.action = Action.DELETE
return await self.submit(entry)
[docs] async def submit(self, entry: Union[AggregateDiff, EventEntry], **kwargs) -> EventEntry:
"""Store new entry into the repository.
:param entry: The entry to be stored.
:param kwargs: Additional named arguments.
:return: The repository entry containing the stored information.
"""
from ...models import (
Action,
AggregateDiff,
)
token = IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR.set(True)
try:
transaction = TRANSACTION_CONTEXT_VAR.get()
if isinstance(entry, AggregateDiff):
entry = EventEntry.from_aggregate_diff(entry, transaction=transaction)
if not isinstance(entry.action, Action):
raise EventRepositoryException("The 'EventEntry.action' attribute must be an 'Action' instance.")
async with self.write_lock():
if not await self.validate(entry, **kwargs):
raise EventRepositoryConflictException(f"{entry!r} could not be committed!", await self.offset)
entry = await self._submit(entry, **kwargs)
if entry.transaction_uuid == NULL_UUID:
await self._send_events(entry.aggregate_diff)
finally:
IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR.reset(token)
return entry
# noinspection PyUnusedLocal
[docs] async def validate(self, entry: EventEntry, transaction_uuid_ne: Optional[UUID] = None, **kwargs) -> bool:
"""Check if it is able to submit the given entry.
:param entry: The entry to be validated.
:param transaction_uuid_ne: Optional transaction identifier to skip it from the validation.
:param kwargs: Additional named arguments.
:return: ``True`` if the entry can be submitted or ``False`` otherwise.
"""
iterable = self._transaction_repository.select(
destination_uuid=entry.transaction_uuid,
uuid_ne=transaction_uuid_ne,
status_in=(TransactionStatus.RESERVING, TransactionStatus.RESERVED, TransactionStatus.COMMITTING,),
)
transaction_uuids = {e.uuid async for e in iterable}
if len(transaction_uuids):
with suppress(StopAsyncIteration):
iterable = self.select(
aggregate_uuid=entry.aggregate_uuid, transaction_uuid_in=tuple(transaction_uuids), **kwargs
)
await iterable.__anext__() # Will raise a `StopAsyncIteration` exception if not any item.
return False
return True
@abstractmethod
async def _submit(self, entry: EventEntry, **kwargs) -> EventEntry:
raise NotImplementedError
async def _send_events(self, aggregate_diff: AggregateDiff):
from ...models import (
Action,
)
suffix_mapper = {
Action.CREATE: "Created",
Action.UPDATE: "Updated",
Action.DELETE: "Deleted",
}
topic = f"{aggregate_diff.simplified_name}{suffix_mapper[aggregate_diff.action]}"
futures = [self._broker_publisher.send(aggregate_diff, topic, strategy=BrokerMessageStrategy.MULTICAST)]
if aggregate_diff.action == Action.UPDATE:
from ...models import (
IncrementalFieldDiff,
)
for decomposed_aggregate_diff in aggregate_diff.decompose():
diff = next(iter(decomposed_aggregate_diff.fields_diff.flatten_values()))
composed_topic = f"{topic}.{diff.name}"
if isinstance(diff, IncrementalFieldDiff):
composed_topic += f".{diff.action.value}"
futures.append(
self._broker_publisher.send(
decomposed_aggregate_diff, composed_topic, strategy=BrokerMessageStrategy.MULTICAST
)
)
await gather(*futures)
# noinspection PyShadowingBuiltins
[docs] async def select(
self,
aggregate_uuid: Optional[UUID] = None,
aggregate_name: Optional[str] = None,
version: Optional[int] = None,
version_lt: Optional[int] = None,
version_gt: Optional[int] = None,
version_le: Optional[int] = None,
version_ge: Optional[int] = None,
id: Optional[int] = None,
id_lt: Optional[int] = None,
id_gt: Optional[int] = None,
id_le: Optional[int] = None,
id_ge: Optional[int] = None,
transaction_uuid: Optional[UUID] = None,
transaction_uuid_ne: Optional[UUID] = None,
transaction_uuid_in: Optional[tuple[UUID, ...]] = None,
**kwargs,
) -> AsyncIterator[EventEntry]:
"""Perform a selection query of entries stored in to the repository.
:param aggregate_uuid: Aggregate identifier.
:param aggregate_name: Aggregate name.
:param version: Aggregate version.
:param version_lt: Aggregate version lower than the given value.
:param version_gt: Aggregate version greater than the given value.
:param version_le: Aggregate version lower or equal to the given value.
:param version_ge: Aggregate version greater or equal to the given value.
:param id: Entry identifier.
:param id_lt: Entry identifier lower than the given value.
:param id_gt: Entry identifier greater than the given value.
:param id_le: Entry identifier lower or equal to the given value.
:param id_ge: Entry identifier greater or equal to the given value.
:param transaction_uuid: Transaction identifier.
:param transaction_uuid_ne: Transaction identifier distinct of the given value.
:param transaction_uuid_in: Destination Transaction identifier equal to one of the given values.
:return: A list of entries.
"""
generator = self._select(
aggregate_uuid=aggregate_uuid,
aggregate_name=aggregate_name,
version=version,
version_lt=version_lt,
version_gt=version_gt,
version_le=version_le,
version_ge=version_ge,
id=id,
id_lt=id_lt,
id_gt=id_gt,
id_le=id_le,
id_ge=id_ge,
transaction_uuid=transaction_uuid,
transaction_uuid_ne=transaction_uuid_ne,
transaction_uuid_in=transaction_uuid_in,
**kwargs,
)
# noinspection PyTypeChecker
async for entry in generator:
yield entry
@abstractmethod
async def _select(self, *args, **kwargs) -> AsyncIterator[EventEntry]:
"""Perform a selection query of entries stored in to the repository."""
@property
def offset(self) -> Awaitable[int]:
"""Get the current repository offset.
:return: An awaitable containing an integer value.
"""
return self._offset
@property
@abstractmethod
async def _offset(self) -> int:
raise NotImplementedError
[docs] def write_lock(self) -> Lock:
"""Get a write lock.
:return: An asynchronous context manager.
"""
return self._lock_pool.acquire("aggregate_event_write_lock")