Source code for minos.networks.brokers.handlers.consumers

from __future__ import (

import logging
from contextlib import (
from itertools import (
from typing import (

from aiokafka import (
from kafka.errors import (
from psycopg2.sql import (

from minos.common import (

from ...decorators import (
from .abc import (

logger = logging.getLogger(__name__)

[docs]class BrokerConsumer(BrokerHandlerSetup): """Broker Consumer class.""" __slots__ = "_topics", "_broker", "_client"
[docs] def __init__( self, topics: set[str] = None, broker: Optional[BROKER] = None, client: Optional[AIOKafkaConsumer] = None, group_id: Optional[str] = "default", **kwargs, ): super().__init__(**kwargs) if topics is None: topics = set() self._topics = set(topics) self._broker = broker self._client = client self._group_id = group_id
@classmethod def _from_config(cls, config: MinosConfig, **kwargs) -> BrokerConsumer: topics = set() for name in decorators = EnrouteAnalyzer(name, config).get_broker_command_query_event() topics |= {decorator.topic for decorator in chain(*decorators.values())} # noinspection PyProtectedMember return cls( topics=topics,,, **, **kwargs ) async def _setup(self) -> None: await super()._setup() await self.client.start() async def _destroy(self) -> None: try: await self.client.stop() except KafkaError: # pragma: no cover pass await super()._destroy() @property def topics(self) -> set[str]: """Topics getter. :return: A list of string values. """ return self._topics
[docs] async def add_topic(self, topic: str) -> None: """Add a topic to the consumer's subscribed topics. :param topic: Name of the topic to be added. :return: This method does not return anything. """ self._topics.add(topic) self.client.subscribe(topics=list(self._topics))
[docs] async def remove_topic(self, topic: str) -> None: """Remove a topic from the consumer's subscribed topics. :param topic: Name of the topic to be removed. :return: This method does not return anything. """ self._topics.remove(topic) if len(self._topics): self.client.subscribe(topics=list(self._topics)) else: self.client.unsubscribe()
@property def client(self) -> AIOKafkaConsumer: """Get the kafka consumer client. :return: An ``AIOKafkaConsumer`` instance. """ if self._client is None: # pragma: no cover self._client = AIOKafkaConsumer( *self._topics, bootstrap_servers=f"{}:{self._broker.port}", group_id=self._group_id, enable_auto_commit=False, auto_offset_reset="earliest", ) return self._client
[docs] async def dispatch(self) -> NoReturn: """Perform a dispatching step. :return: This method does not return anything. """ await self.handle_message(self.client)
[docs] async def handle_message(self, consumer: Any) -> None: """Message consumer. It consumes the messages and sends them for processing. Args: consumer: Kafka Consumer instance (at the moment only Kafka consumer is supported). """ async for message in consumer: await self.handle_single_message(message) with suppress(IllegalStateError): await consumer.commit()
[docs] async def handle_single_message(self, message): """Handle Kafka messages. Add the message to the event_queue table. Args: message: Kafka message. Raises: Exception: An error occurred inserting record. """ logger.debug(f"Consuming message with {message.topic!s} topic...") return await self.enqueue(message.topic, message.partition, message.value)
[docs] async def enqueue(self, topic: str, partition: int, binary: bytes) -> int: """Insert row into queue table. Retrieves number of affected rows and row ID. Args: topic: Kafka topic. Example: "TicketAdded" partition: Kafka partition number. binary: Broker Message in bytes. Returns: Queue ID. Example: 12 Raises: Exception: An error occurred inserting record. """ row = await self.submit_query_and_fetchone(_INSERT_QUERY, (topic, partition, binary)) await self.submit_query(_NOTIFY_QUERY.format(Identifier(topic))) return row[0]
_INSERT_QUERY = SQL("INSERT INTO consumer_queue (topic, partition, data) VALUES (%s, %s, %s) RETURNING id") _NOTIFY_QUERY = SQL("NOTIFY {}")