Source code for minos.networks.brokers.handlers.handlers

from __future__ import (
    annotations,
)

import logging
from asyncio import (
    CancelledError,
    PriorityQueue,
    Task,
    TimeoutError,
    create_task,
    gather,
    wait_for,
)
from functools import (
    wraps,
)
from inspect import (
    isawaitable,
)
from typing import (
    Any,
    Awaitable,
    Callable,
    KeysView,
    NoReturn,
    Optional,
    Union,
)

from aiopg import (
    Cursor,
)
from cached_property import (
    cached_property,
)
from dependency_injector.wiring import (
    Provide,
    inject,
)
from psycopg2.sql import (
    SQL,
    Identifier,
)

from minos.common import (
    MinosConfig,
    NotProvidedException,
)

from ...decorators import (
    EnrouteBuilder,
)
from ...exceptions import (
    MinosActionNotFoundException,
)
from ...requests import (
    REQUEST_USER_CONTEXT_VAR,
    Response,
    ResponseException,
)
from ...utils import (
    consume_queue,
)
from ..messages import (
    REQUEST_HEADERS_CONTEXT_VAR,
    BrokerMessage,
    BrokerMessageStatus,
)
from ..publishers import (
    BrokerPublisher,
)
from .abc import (
    BrokerHandlerSetup,
)
from .entries import (
    BrokerHandlerEntry,
)
from .requests import (
    BrokerRequest,
    BrokerResponse,
)

logger = logging.getLogger(__name__)


[docs]class BrokerHandler(BrokerHandlerSetup): """Broker Handler class.""" __slots__ = "_handlers", "_records", "_retry", "_queue", "_consumers", "_consumer_concurrency"
[docs] def __init__( self, records: int, handlers: dict[str, Optional[Callable]], retry: int, publisher: BrokerPublisher, consumer_concurrency: int = 15, **kwargs: Any, ): super().__init__(**kwargs) self._handlers = handlers self._records = records self._retry = retry self._queue = PriorityQueue(maxsize=self._records) self._consumers: list[Task] = list() self._consumer_concurrency = consumer_concurrency self._publisher = publisher
@classmethod def _from_config(cls, config: MinosConfig, **kwargs) -> BrokerHandler: kwargs["handlers"] = cls._get_handlers(config, **kwargs) kwargs["publisher"] = cls._get_publisher(**kwargs) # noinspection PyProtectedMember return cls(**config.broker.queue._asdict(), **kwargs) @staticmethod def _get_handlers( config: MinosConfig, handlers: dict[str, Optional[Callable]] = None, **kwargs ) -> dict[str, Callable[[BrokerRequest], Awaitable[Optional[BrokerResponse]]]]: if handlers is None: builder = EnrouteBuilder(*config.services, middleware=config.middleware) decorators = builder.get_broker_command_query_event(config=config, **kwargs) handlers = {decorator.topic: fn for decorator, fn in decorators.items()} return handlers # noinspection PyUnusedLocal @staticmethod @inject def _get_publisher( publisher: Optional[BrokerPublisher] = None, broker_publisher: BrokerPublisher = Provide["broker_publisher"], **kwargs, ) -> BrokerPublisher: if publisher is None: publisher = broker_publisher if publisher is None or isinstance(publisher, Provide): raise NotProvidedException(f"A {BrokerPublisher!r} object must be provided.") return publisher async def _setup(self) -> None: await super()._setup() await self._create_consumers() async def _destroy(self) -> None: await self._destroy_consumers() await super()._destroy() async def _create_consumers(self): while len(self._consumers) < self._consumer_concurrency: self._consumers.append(create_task(self._consume())) async def _destroy_consumers(self): for consumer in self._consumers: consumer.cancel() await gather(*self._consumers, return_exceptions=True) self._consumers = list() while not self._queue.empty(): entry = self._queue.get_nowait() await self.submit_query(self._queries["update_not_processed"], (entry.id,)) async def _consume(self) -> None: while True: await self._consume_one() async def _consume_one(self) -> None: entry = await self._queue.get() try: await self._dispatch_one(entry) finally: self._queue.task_done() @property def publisher(self) -> BrokerPublisher: """Get the publisher instance. :return: A ``BrokerPublisher`` instance. """ return self._publisher @property def consumers(self) -> list[Task]: """Get the consumers. :return: A list of ``Task`` instances. """ return self._consumers @property def handlers(self) -> dict[str, Optional[Callable]]: """Handlers getter. :return: A dictionary in which the keys are topics and the values are the handler. """ return self._handlers @property def topics(self) -> KeysView[str]: """Get an iterable containing the topic names. :return: An ``Iterable`` of ``str``. """ return self.handlers.keys()
[docs] async def dispatch_forever(self, max_wait: Optional[float] = 60.0) -> NoReturn: """Dispatch the items in the consuming queue forever. :param max_wait: Maximum seconds to wait for notifications. If ``None`` the wait is performed until infinity. :return: This method does not return anything. """ async with self.cursor() as cursor: await self._listen_entries(cursor) try: while True: await self._wait_for_entries(cursor, max_wait) await self.dispatch(cursor, background_mode=True) finally: await self._unlisten_entries(cursor)
async def _listen_entries(self, cursor: Cursor): for topic in self.topics: # noinspection PyTypeChecker await cursor.execute(_LISTEN_QUERY.format(Identifier(topic))) async def _unlisten_entries(self, cursor: Cursor) -> None: for topic in self.topics: # noinspection PyTypeChecker await cursor.execute(_UNLISTEN_QUERY.format(Identifier(topic))) async def _wait_for_entries(self, cursor: Cursor, max_wait: Optional[float]) -> None: if await self._get_count(cursor): return while True: try: return await wait_for(consume_queue(cursor.connection.notifies, self._records), max_wait) except TimeoutError: if await self._get_count(cursor): return async def _get_count(self, cursor) -> int: if not len(self.topics): return 0 await cursor.execute(_COUNT_NOT_PROCESSED_QUERY, (self._retry, tuple(self.topics))) count = (await cursor.fetchone())[0] return count
[docs] async def dispatch(self, cursor: Optional[Cursor] = None, background_mode: bool = False) -> None: """Dispatch a batch of ``HandlerEntry`` instances from the database's queue. :param cursor: The cursor to interact with the database. If ``None`` is provided a new one is acquired. :param background_mode: If ``True`` the entries dispatching waits until every entry is processed. Otherwise, the dispatching is performed on background. :return: This method does not return anything. """ is_external_cursor = cursor is not None if not is_external_cursor: cursor = await self.cursor().__aenter__() async with cursor.begin(): await cursor.execute( self._queries["select_not_processed"], (self._retry, tuple(self.topics), self._records) ) result = await cursor.fetchall() if len(result): entries = self._build_entries(result) await cursor.execute(self._queries["mark_processing"], (tuple(e.id for e in entries),)) for entry in entries: await self._queue.put(entry) if not is_external_cursor: await cursor.__aexit__(None, None, None) if not background_mode: await self._queue.join()
def _build_entries(self, rows: list[tuple]) -> list[BrokerHandlerEntry]: kwargs = {"callback_lookup": self.get_action} return [BrokerHandlerEntry(*row, **kwargs) for row in rows] async def _dispatch_one(self, entry: BrokerHandlerEntry) -> None: logger.debug(f"Dispatching '{entry!r}'...") try: await self.dispatch_one(entry) except (CancelledError, Exception) as exc: logger.warning(f"Raised an exception while dispatching {entry!r}: {exc!r}") entry.exception = exc if isinstance(exc, CancelledError): raise exc finally: query_id = "delete_processed" if entry.success else "update_not_processed" await self.submit_query(self._queries[query_id], (entry.id,))
[docs] async def dispatch_one(self, entry: BrokerHandlerEntry) -> None: """Dispatch one row. :param entry: Entry to be dispatched. :return: This method does not return anything. """ logger.info(f"Dispatching '{entry!s}'...") fn = self.get_callback(entry.callback) message = entry.data data, status, headers = await fn(message) if message.reply_topic is not None: await self.publisher.send( data, topic=message.reply_topic, identifier=message.identifier, status=status, user=message.user, headers=headers, )
[docs] @staticmethod def get_callback( fn: Callable[[BrokerRequest], Union[Optional[BrokerRequest], Awaitable[Optional[BrokerRequest]]]] ) -> Callable[[BrokerMessage], Awaitable[tuple[Any, BrokerMessageStatus, dict[str, str]]]]: """Get the handler function to be used by the Broker Handler. :param fn: The action function. :return: A wrapper function around the given one that is compatible with the Broker Handler API. """ @wraps(fn) async def _wrapper(raw: BrokerMessage) -> tuple[Any, BrokerMessageStatus, dict[str, str]]: request = BrokerRequest(raw) user_token = REQUEST_USER_CONTEXT_VAR.set(request.user) headers_token = REQUEST_HEADERS_CONTEXT_VAR.set(raw.headers) try: response = fn(request) if isawaitable(response): response = await response if isinstance(response, Response): response = await response.content() return response, BrokerMessageStatus.SUCCESS, REQUEST_HEADERS_CONTEXT_VAR.get() except ResponseException as exc: logger.warning(f"Raised an application exception: {exc!s}") return repr(exc), BrokerMessageStatus.ERROR, REQUEST_HEADERS_CONTEXT_VAR.get() except Exception as exc: logger.exception(f"Raised a system exception: {exc!r}") return repr(exc), BrokerMessageStatus.SYSTEM_ERROR, REQUEST_HEADERS_CONTEXT_VAR.get() finally: REQUEST_USER_CONTEXT_VAR.reset(user_token) REQUEST_HEADERS_CONTEXT_VAR.reset(headers_token) return _wrapper
[docs] def get_action(self, topic: str) -> Optional[Callable]: """Get handling function to be called. Gets the instance of the class and method to call. Args: topic: Kafka topic. Example: "TicketAdded" Raises: MinosNetworkException: topic TicketAdded have no controller/action configured, please review th configuration file. """ if topic not in self._handlers: raise MinosActionNotFoundException( f"topic {topic} have no controller/action configured, " f"please review th configuration file" ) handler = self._handlers[topic] logger.debug(f"Loaded {handler!r} action!") return handler
@cached_property def _queries(self) -> dict[str, str]: # noinspection PyTypeChecker return { "count_not_processed": _COUNT_NOT_PROCESSED_QUERY, "select_not_processed": _SELECT_NOT_PROCESSED_QUERY, "mark_processing": _MARK_PROCESSING_QUERY, "delete_processed": _DELETE_PROCESSED_QUERY, "update_not_processed": _UPDATE_NOT_PROCESSED_QUERY, }
# noinspection SqlDerivedTableAlias _COUNT_NOT_PROCESSED_QUERY = SQL( "SELECT COUNT(*) " "FROM (SELECT id FROM consumer_queue WHERE NOT processing AND retry < %s AND topic IN %s FOR UPDATE SKIP LOCKED) s" ) _SELECT_NOT_PROCESSED_QUERY = SQL( "SELECT id, topic, partition, data, retry, created_at, updated_at " "FROM consumer_queue " "WHERE NOT processing AND retry < %s AND topic IN %s " "ORDER BY created_at " "LIMIT %s " "FOR UPDATE SKIP LOCKED" ) _MARK_PROCESSING_QUERY = SQL("UPDATE consumer_queue SET processing = TRUE WHERE id IN %s") _DELETE_PROCESSED_QUERY = SQL("DELETE FROM consumer_queue WHERE id = %s") _UPDATE_NOT_PROCESSED_QUERY = SQL( "UPDATE consumer_queue SET processing = FALSE, retry = retry + 1, updated_at = NOW() WHERE id = %s" ) _LISTEN_QUERY = SQL("LISTEN {}") _UNLISTEN_QUERY = SQL("UNLISTEN {}")