Source code for minos.aggregate.models.refs.models

from __future__ import (
    annotations,
)

from typing import (
    Any,
    Generic,
    Optional,
    TypeVar,
    Union,
    get_args,
)
from uuid import (
    UUID,
    SafeUUID,
)

from dependency_injector.wiring import (
    Provide,
    inject,
)

from minos.common import (
    DataDecoder,
    DataEncoder,
    DeclarativeModel,
    Model,
    ModelType,
    SchemaDecoder,
    SchemaEncoder,
)
from minos.networks import (
    DynamicBroker,
    DynamicBrokerPool,
)

from ...contextvars import (
    IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR,
)
from ..entities import (
    Entity,
)

MT = TypeVar("MT", bound=Model)


[docs]class AggregateRef(Entity): """Aggregate Ref class.""" version: int
[docs] def __init__(self, uuid: UUID, *args, **kwargs): super().__init__(uuid=uuid, *args, **kwargs)
[docs]class ModelRef(DeclarativeModel, UUID, Generic[MT]): """Model Reference.""" data: Union[MT, UUID]
[docs] @inject def __init__( self, data: Union[MT, UUID], *args, broker_pool: DynamicBrokerPool = Provide["broker_pool"], **kwargs, ): if not isinstance(data, UUID) and not hasattr(data, "uuid"): raise ValueError(f"data must be an {UUID!r} instance or have 'uuid' as one of its fields") DeclarativeModel.__init__(self, data, *args, **kwargs) self._broker_pool = broker_pool
def __getattr__(self, item: str) -> Any: try: return super().__getattr__(item) except AttributeError as exc: if item != "data": return getattr(self.data, item) raise exc @property def int(self) -> int: """Get the UUID as a 128-bit integer. :return: An integer value. """ return self.uuid.int @property def is_safe(self) -> SafeUUID: """Get an enum indicating whether the UUID has been generated in a way that is safe. :return: A ``SafeUUID`` value. """ return self.uuid.is_safe # noinspection PyMethodParameters
[docs] @classmethod def encode_schema(cls, encoder: SchemaEncoder, target: Any, **kwargs) -> Any: """Encode schema with the given encoder. :param encoder: The encoder instance. :param target: An optional pre-encoded schema. :return: The encoded schema of the instance. """ schema = encoder.build(target.type_hints["data"], **kwargs) return [(sub | {"logicalType": cls.classname}) for sub in schema]
[docs] @classmethod def decode_schema(cls, decoder: SchemaDecoder, target: Any, **kwargs) -> ModelType: """Decode schema with the given encoder. :param decoder: The decoder instance. :param target: The schema to be decoded. :return: The decoded schema as a type. """ decoded = decoder.build(target, **kwargs) if not isinstance(decoded, ModelType): raise ValueError(f"The decoded type is not valid: {decoded}") return ModelType.from_model(cls[decoded])
[docs] @staticmethod def encode_data(encoder: DataEncoder, target: Any, **kwargs) -> Any: """Encode data with the given encoder. :param encoder: The encoder instance. :param target: An optional pre-encoded data. :return: The encoded data of the instance. """ target = target["data"] if IS_REPOSITORY_SERIALIZATION_CONTEXT_VAR.get() and isinstance(target, dict): target = target["uuid"] return encoder.build(target, **kwargs)
[docs] @classmethod def decode_data(cls, decoder: DataDecoder, target: Any, type_: ModelType, **kwargs) -> ModelRef: """Decode data with the given decoder. :param decoder: The decoder instance. :param target: The data to be decoded. :param type_: The data type. :return: A decoded instance. """ decoded = decoder.build(target, type_.type_hints["data"], **kwargs) return cls(decoded, additional_type_hints=type_.type_hints)
def __eq__(self, other): return super().__eq__(other) or self.uuid == other or self.data == other def __hash__(self): return hash(self.uuid) @property def uuid(self) -> UUID: """Get the UUID that identifies the ``Model``. :return: """ if not self.resolved: return self.data return self.data.uuid @property def data_cls(self) -> Optional[type]: """Get data class if available. :return: A model type. """ args = get_args(self.type_hints["data"]) if args[0] != MT: return args[0] return None # noinspection PyUnusedLocal
[docs] async def resolve(self, force: bool = False, **kwargs) -> None: """Resolve the instance. :param force: If ``True``, the resolution will be performed also if it is not necessary. :param kwargs: Additional named arguments. :return: This method does not return anything. """ if not force and self.resolved: return name = self.data_cls.__name__ async with self._broker_pool.acquire() as broker: await broker.send(data={"uuid": self.uuid}, topic=f"Get{name}") self.data = await self._get_response(broker)
@staticmethod async def _get_response(handler: DynamicBroker, **kwargs) -> MT: handler_entry = await handler.get_one(**kwargs) response = handler_entry.data return response.data @property def resolved(self) -> bool: """Check if the instance is already resolved. :return: ``True`` if resolved or ``False`` otherwise. """ return not isinstance(self.data, UUID)