from __future__ import (
annotations,
)
from typing import (
AsyncIterator,
Optional,
)
from uuid import (
UUID,
)
from psycopg2 import (
IntegrityError,
)
from psycopg2.sql import (
SQL,
Composable,
Literal,
Placeholder,
)
from minos.common import (
NULL_UUID,
MinosConfig,
PostgreSqlMinosDatabase,
)
from ...exceptions import (
EventRepositoryConflictException,
)
from ..entries import (
EventEntry,
)
from .abc import (
EventRepository,
)
[docs]class PostgreSqlEventRepository(PostgreSqlMinosDatabase, EventRepository):
"""PostgreSQL-based implementation of the event repository class in ``Minos``."""
@classmethod
def _from_config(cls, *args, config: MinosConfig, **kwargs) -> Optional[EventRepository]:
return cls(*args, **config.repository._asdict(), **kwargs)
async def _setup(self):
"""Setup miscellaneous repository thing.
In the PostgreSQL case, configures the needed table to be used to store the data.
:return: This method does not return anything.
"""
await self.submit_query('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";', lock="uuid-ossp")
await self.submit_query(_CREATE_ACTION_ENUM_QUERY, lock="aggregate_event")
await self.submit_query(_CREATE_TABLE_QUERY, lock="aggregate_event")
async def _submit(self, entry: EventEntry, **kwargs) -> EventEntry:
lock = None
if entry.aggregate_uuid != NULL_UUID:
lock = entry.aggregate_uuid.int & (1 << 32) - 1
query, params = await self._build_query(entry)
try:
response = await self.submit_query_and_fetchone(query, params, lock=lock)
except IntegrityError:
raise EventRepositoryConflictException(
f"{entry!r} could not be submitted due to a key (uuid, version, transaction) collision",
await self.offset,
)
entry.id, entry.aggregate_uuid, entry.version, entry.created_at = response
return entry
async def _build_query(self, entry: EventEntry) -> tuple[Composable, dict[str, UUID]]:
if entry.transaction_uuid != NULL_UUID:
transaction = await self._transaction_repository.get(uuid=entry.transaction_uuid)
transaction_uuids = await transaction.uuids
else:
transaction_uuids = (NULL_UUID,)
from_query_parts = list()
parameters = dict()
for index, transaction_uuid in enumerate(transaction_uuids, start=1):
name = f"transaction_uuid_{index}"
parameters[name] = transaction_uuid
from_query_parts.append(
_SELECT_TRANSACTION_CHUNK.format(index=Literal(index), transaction_uuid=Placeholder(name))
)
from_query = SQL(" UNION ALL ").join(from_query_parts)
query = _INSERT_VALUES_QUERY.format(from_parts=from_query)
return query, parameters | entry.as_raw()
async def _select(self, **kwargs) -> AsyncIterator[EventEntry]:
query = self._build_select_query(**kwargs)
async for row in self.submit_query_and_iter(query, kwargs, **kwargs):
yield EventEntry(*row)
# noinspection PyUnusedLocal
@staticmethod
def _build_select_query(
aggregate_uuid: Optional[UUID] = None,
aggregate_name: Optional[str] = None,
version: Optional[int] = None,
version_lt: Optional[int] = None,
version_gt: Optional[int] = None,
version_le: Optional[int] = None,
version_ge: Optional[int] = None,
id: Optional[int] = None,
id_lt: Optional[int] = None,
id_gt: Optional[int] = None,
id_le: Optional[int] = None,
id_ge: Optional[int] = None,
transaction_uuid: Optional[UUID] = None,
transaction_uuid_ne: Optional[UUID] = None,
transaction_uuid_in: Optional[tuple[UUID, ...]] = None,
**kwargs,
) -> str:
conditions = list()
if aggregate_uuid is not None:
conditions.append("aggregate_uuid = %(aggregate_uuid)s")
if aggregate_name is not None:
conditions.append("aggregate_name = %(aggregate_name)s")
if version is not None:
conditions.append("version = %(version)s")
if version_lt is not None:
conditions.append("version < %(version_lt)s")
if version_gt is not None:
conditions.append("version > %(version_gt)s")
if version_le is not None:
conditions.append("version <= %(version_le)s")
if version_ge is not None:
conditions.append("version >= %(version_ge)s")
if id is not None:
conditions.append("id = %(id)s")
if id_lt is not None:
conditions.append("id < %(id_lt)s")
if id_gt is not None:
conditions.append("id > %(id_gt)s")
if id_le is not None:
conditions.append("id <= %(id_le)s")
if id_ge is not None:
conditions.append("id >= %(id_ge)s")
if transaction_uuid is not None:
conditions.append("transaction_uuid = %(transaction_uuid)s")
if transaction_uuid_ne is not None:
conditions.append("transaction_uuid <> %(transaction_uuid_ne)s")
if transaction_uuid_in is not None:
conditions.append("transaction_uuid IN %(transaction_uuid_in)s")
if not conditions:
return f"{_SELECT_ALL_ENTRIES_QUERY} ORDER BY id;"
return f"{_SELECT_ALL_ENTRIES_QUERY} WHERE {' AND '.join(conditions)} ORDER BY id;"
@property
async def _offset(self) -> int:
return (await self.submit_query_and_fetchone(_SELECT_MAX_ID_QUERY))[0] or 0
_CREATE_ACTION_ENUM_QUERY = """
DO
$$
BEGIN
IF NOT EXISTS(SELECT *
FROM pg_type typ
INNER JOIN pg_namespace nsp
ON nsp.oid = typ.typnamespace
WHERE nsp.nspname = current_schema()
AND typ.typname = 'action_type') THEN
CREATE TYPE action_type AS ENUM ('create', 'update', 'delete');
END IF;
END;
$$
LANGUAGE plpgsql;
""".strip()
_CREATE_TABLE_QUERY = """
CREATE TABLE IF NOT EXISTS aggregate_event (
id BIGSERIAL PRIMARY KEY,
action ACTION_TYPE NOT NULL,
aggregate_uuid UUID NOT NULL,
aggregate_name TEXT NOT NULL,
version INT NOT NULL,
data BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
transaction_uuid UUID NOT NULL DEFAULT uuid_nil(),
UNIQUE (aggregate_uuid, version, transaction_uuid)
);
""".strip()
_INSERT_VALUES_QUERY = SQL(
"""
INSERT INTO aggregate_event (id, action, aggregate_uuid, aggregate_name, version, data, created_at, transaction_uuid)
VALUES (
default,
%(action)s,
CASE %(aggregate_uuid)s WHEN uuid_nil() THEN uuid_generate_v4() ELSE %(aggregate_uuid)s END,
%(aggregate_name)s,
(
SELECT (CASE WHEN %(version)s IS NULL THEN 1 + COALESCE(MAX(t2.version), 0) ELSE %(version)s END)
FROM (
SELECT DISTINCT ON (t1.aggregate_uuid) t1.version
FROM ( {from_parts} ) AS t1
ORDER BY t1.aggregate_uuid, t1.transaction_index DESC
) AS t2
),
%(data)s,
(CASE WHEN %(created_at)s IS NULL THEN NOW() ELSE %(created_at)s END),
%(transaction_uuid)s
)
RETURNING id, aggregate_uuid, version, created_at;
"""
)
_SELECT_TRANSACTION_CHUNK = SQL(
"""
SELECT {index} AS transaction_index, aggregate_uuid, MAX(version) AS version
FROM aggregate_event
WHERE aggregate_uuid = %(aggregate_uuid)s AND transaction_uuid = {transaction_uuid}
GROUP BY aggregate_uuid
"""
)
_SELECT_ALL_ENTRIES_QUERY = """
SELECT aggregate_uuid, aggregate_name, version, data, id, action, created_at, transaction_uuid
FROM aggregate_event
""".strip()
_SELECT_MAX_ID_QUERY = "SELECT MAX(id) FROM aggregate_event;".strip()