Source code for minos.aggregate.snapshots.pg.api

from __future__ import (
    annotations,
)

from typing import (
    TYPE_CHECKING,
    AsyncIterator,
    Awaitable,
)

from minos.common import (
    MinosConfig,
)

from ..abc import (
    SnapshotRepository,
)
from .readers import (
    PostgreSqlSnapshotReader,
)
from .writers import (
    PostgreSqlSnapshotWriter,
)

if TYPE_CHECKING:
    from ...models import (
        Aggregate,
    )


[docs]class PostgreSqlSnapshotRepository(SnapshotRepository): """PostgreSQL Snapshot class. The snapshot provides a direct accessor to the aggregate instances stored as events by the event repository class. """ reader: PostgreSqlSnapshotReader writer: PostgreSqlSnapshotWriter
[docs] def __init__(self, *args, reader: PostgreSqlSnapshotReader, writer: PostgreSqlSnapshotWriter, **kwargs): super().__init__(*args, **kwargs) self.reader = reader self.writer = writer
@classmethod def _from_config(cls, config: MinosConfig, **kwargs) -> PostgreSqlSnapshotRepository: if "reader" not in kwargs: kwargs["reader"] = PostgreSqlSnapshotReader.from_config(config, **kwargs) if "writer" not in kwargs: kwargs["writer"] = PostgreSqlSnapshotWriter.from_config(config, **kwargs) return cls(**config.snapshot._asdict(), **kwargs) async def _setup(self) -> None: await self.writer.setup() await self.reader.setup() await super()._setup() async def _destroy(self) -> None: await super()._destroy() await self.reader.destroy() await self.writer.destroy() def _get(self, *args, **kwargs) -> Awaitable[Aggregate]: return self.reader.get(*args, **kwargs) def _find(self, *args, **kwargs,) -> AsyncIterator[Aggregate]: return self.reader.find(*args, **kwargs) def _synchronize(self, *args, **kwargs) -> Awaitable[None]: return self.writer.dispatch(**kwargs)