Source code for minos.networks.brokers.messages

from __future__ import (
    annotations,
)

from contextvars import (
    ContextVar,
)
from enum import (
    Enum,
    IntEnum,
)
from typing import (
    Any,
    Final,
    Optional,
)
from uuid import (
    UUID,
    uuid4,
)

from minos.common import (
    DeclarativeModel,
)

REQUEST_REPLY_TOPIC_CONTEXT_VAR: Final[ContextVar[Optional[str]]] = ContextVar("reply_topic", default=None)
REQUEST_HEADERS_CONTEXT_VAR: Final[ContextVar[Optional[dict[str, str]]]] = ContextVar("headers", default=None)


[docs]class BrokerMessage(DeclarativeModel): """Broker Message class.""" topic: str data: Any identifier: UUID reply_topic: Optional[str] user: Optional[UUID] status: BrokerMessageStatus strategy: BrokerMessageStrategy headers: dict[str, str]
[docs] def __init__( self, topic: str, data: Any, *, identifier: Optional[UUID] = None, status: Optional[BrokerMessageStatus] = None, strategy: Optional[BrokerMessageStrategy] = None, headers: Optional[dict[str, str]] = None, **kwargs ): if identifier is None: identifier = uuid4() if status is None: status = BrokerMessageStatus.SUCCESS if strategy is None: strategy = BrokerMessageStrategy.UNICAST if headers is None: headers = dict() super().__init__( topic=topic, data=data, identifier=identifier, status=status, strategy=strategy, headers=headers, **kwargs )
@property def ok(self) -> bool: """Check if the reply is okay or not. :return: ``True`` if the reply is okay or ``False`` otherwise. """ return self.status == BrokerMessageStatus.SUCCESS
[docs]class BrokerMessageStatus(IntEnum): """Broker Message Status class.""" SUCCESS = 200 ERROR = 400 SYSTEM_ERROR = 500
[docs]class BrokerMessageStrategy(str, Enum): """Broker Message Strategy class""" UNICAST = "unicast" MULTICAST = "multicast"