from __future__ import (
    annotations,
)
import logging
from typing import (
    Any,
    Optional,
)
from uuid import (
    UUID,
)
from psycopg2.sql import (
    SQL,
)
from minos.common import (
    MinosConfig,
)
from ..messages import (
    BrokerMessage,
    BrokerMessageStatus,
    BrokerMessageStrategy,
)
from .abc import (
    BrokerPublisherSetup,
)
logger = logging.getLogger(__name__)
[docs]class BrokerPublisher(BrokerPublisherSetup):
    """Broker Publisher class."""
    @classmethod
    def _from_config(cls, *args, config: MinosConfig, **kwargs) -> BrokerPublisher:
        # noinspection PyProtectedMember
        return cls(*args, **config.broker.queue._asdict(), **kwargs)
    # noinspection PyMethodOverriding
[docs]    async def send(
        self,
        data: Any,
        topic: str,
        *,
        identifier: Optional[UUID] = None,
        reply_topic: Optional[str] = None,
        user: Optional[UUID] = None,
        status: BrokerMessageStatus = BrokerMessageStatus.SUCCESS,
        strategy: BrokerMessageStrategy = BrokerMessageStrategy.UNICAST,
        headers: Optional[dict[str, str]] = None,
        **kwargs,
    ) -> UUID:
        """Send a ``BrokerMessage``.
        :param data: The data to be send.
        :param topic: Topic in which the message will be published.
        :param identifier: The identifier of the message.
        :param reply_topic: An optional topic name to wait for a response.
        :param user: The user identifier that send the message.
        :param status: The status code of the message.
        :param strategy: The publishing strategy.
        :param headers: A mapping of string values identified by a string key.
        :param kwargs: Additional named arguments.
        :return: The ``UUID`` identifier of the message.
        """
        message = BrokerMessage(
            topic=topic,
            data=data,
            identifier=identifier,
            status=status,
            reply_topic=reply_topic,
            user=user,
            strategy=strategy,
            headers=headers,
        )
        logger.info(f"Publishing '{message!s}'...")
        await self.enqueue(message.topic, message.strategy, message.avro_bytes)
        return message.identifier 
[docs]    async def enqueue(self, topic: str, strategy: BrokerMessageStrategy, raw: bytes) -> int:
        """Send a sequence of bytes to the given topic.
        :param topic: Topic in which the bytes will be send.
        :param strategy: The publishing strategy.
        :param raw: Bytes sequence to be send.
        :return: The identifier of the message in the queue.
        """
        params = (topic, raw, strategy)
        raw = await self.submit_query_and_fetchone(_INSERT_ENTRY_QUERY, params)
        await self.submit_query(_NOTIFY_QUERY)
        return raw[0]  
_INSERT_ENTRY_QUERY = SQL("INSERT INTO producer_queue (topic, data, strategy) VALUES (%s, %s, %s) RETURNING id")
_NOTIFY_QUERY = SQL("NOTIFY producer_queue")