Source code for minos.networks.brokers.dynamic.pools

from __future__ import (
    annotations,
)

import logging
from contextvars import (
    Token,
)
from typing import (
    AsyncContextManager,
    Optional,
)
from uuid import (
    uuid4,
)

from dependency_injector.wiring import (
    Provide,
    inject,
)
from kafka import (
    KafkaAdminClient,
)
from kafka.admin import (
    NewTopic,
)

from minos.common import (
    MinosConfig,
    MinosPool,
    NotProvidedException,
)

from ..handlers import (
    BrokerConsumer,
)
from ..messages import (
    REQUEST_REPLY_TOPIC_CONTEXT_VAR,
)
from ..publishers import (
    BrokerPublisher,
)
from .brokers import (
    DynamicBroker,
)

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class DynamicBrokerPool(MinosPool):
    """Dynamic Broker Pool class.

    Pool of ``DynamicBroker`` instances. Every instance is bound to its own reply topic, which is created and
    subscribed to when the instance is built, and unsubscribed from and deleted when the instance is destroyed.
    """

    def __init__(
        self,
        config: MinosConfig,
        client: KafkaAdminClient,
        consumer: BrokerConsumer,
        publisher: BrokerPublisher,
        maxsize: int = 5,
        recycle: Optional[int] = 3600,
        *args,
        **kwargs,
    ):
        """Initialize the pool.

        :param config: The configuration handed to every ``DynamicBroker`` built by the pool.
        :param client: The admin client used to create and delete the reply topics.
        :param consumer: The consumer on which reply topics are added and removed.
        :param publisher: The publisher handed to every ``DynamicBroker`` built by the pool.
        :param maxsize: Forwarded to the parent pool constructor.
        :param recycle: Forwarded to the parent pool constructor.
        :param args: Additional positional arguments forwarded to the parent pool constructor.
        :param kwargs: Additional named arguments forwarded to the parent pool constructor.
        """
        super().__init__(*args, maxsize=maxsize, recycle=recycle, **kwargs)
        self.config = config
        self.client = client
        self.consumer = consumer
        self.publisher = publisher

    @classmethod
    def _from_config(cls, config: MinosConfig, **kwargs) -> DynamicBrokerPool:
        """Build a pool from the given configuration.

        A new ``KafkaAdminClient`` is always created from ``config.broker``; the consumer and the publisher are
        taken from ``kwargs`` when given, or from the injected dependencies otherwise.

        :param config: The configuration providing the broker host and port.
        :param kwargs: Additional named arguments forwarded to the constructor.
        :return: A new ``DynamicBrokerPool`` instance.
        :raises NotProvidedException: If the consumer or the publisher cannot be resolved.
        """
        client = KafkaAdminClient(bootstrap_servers=f"{config.broker.host}:{config.broker.port}")
        kwargs["client"] = client
        try:
            kwargs["consumer"] = cls._get_consumer(**kwargs)
            kwargs["publisher"] = cls._get_publisher(**kwargs)
            return cls(config, **kwargs)
        except Exception:
            # No pool took ownership of the client, so close it here instead of leaking it.
            client.close()
            raise

    async def _destroy(self) -> None:
        """Destroy the pool and close the admin client."""
        try:
            await super()._destroy()
        finally:
            # Close the client even if the parent teardown fails.
            self.client.close()

    # noinspection PyUnusedLocal
    @staticmethod
    @inject
    def _get_consumer(
        consumer: Optional[BrokerConsumer] = None,
        broker_consumer: BrokerConsumer = Provide["broker_consumer"],
        **kwargs,
    ) -> BrokerConsumer:
        """Resolve the consumer, preferring the explicit ``consumer`` over the injected ``broker_consumer``.

        :param consumer: An explicitly given consumer, or ``None`` to fall back to the injected one.
        :param broker_consumer: The injected consumer.
        :param kwargs: Ignored; accepted so that callers can forward their whole keyword set.
        :return: The resolved consumer.
        :raises NotProvidedException: If neither source provides a consumer.
        """
        if consumer is None:
            consumer = broker_consumer
        # An unresolved ``Provide`` marker means that the injection did not take place.
        if consumer is None or isinstance(consumer, Provide):
            raise NotProvidedException(f"A {BrokerConsumer!r} object must be provided.")
        return consumer

    # noinspection PyUnusedLocal
    @staticmethod
    @inject
    def _get_publisher(
        publisher: Optional[BrokerPublisher] = None,
        broker_publisher: BrokerPublisher = Provide["broker_publisher"],
        **kwargs,
    ) -> BrokerPublisher:
        """Resolve the publisher, preferring the explicit ``publisher`` over the injected ``broker_publisher``.

        :param publisher: An explicitly given publisher, or ``None`` to fall back to the injected one.
        :param broker_publisher: The injected publisher.
        :param kwargs: Ignored; accepted so that callers can forward their whole keyword set.
        :return: The resolved publisher.
        :raises NotProvidedException: If neither source provides a publisher.
        """
        if publisher is None:
            publisher = broker_publisher
        # An unresolved ``Provide`` marker means that the injection did not take place.
        if publisher is None or isinstance(publisher, Provide):
            raise NotProvidedException(f"A {BrokerPublisher!r} object must be provided.")
        return publisher

    async def _create_instance(self) -> DynamicBroker:
        """Create a broker bound to a freshly created reply topic.

        :return: A ``DynamicBroker`` instance on which ``setup`` has already been awaited.
        """
        # The topic name is a random UUID with the dashes stripped.
        topic = str(uuid4()).replace("-", "")
        await self._create_reply_topic(topic)
        await self._subscribe_reply_topic(topic)
        instance = DynamicBroker.from_config(self.config, topic=topic, publisher=self.publisher)
        await instance.setup()
        return instance

    async def _destroy_instance(self, instance: DynamicBroker):
        """Destroy the given broker, then unsubscribe from and delete its reply topic.

        :param instance: The broker to be destroyed.
        """
        await instance.destroy()
        await self._unsubscribe_reply_topic(instance.topic)
        await self._delete_reply_topic(instance.topic)

    async def _create_reply_topic(self, topic: str) -> None:
        """Create the reply topic with one partition and a replication factor of one.

        :param topic: The name of the topic to be created.
        """
        logger.info(f"Creating {topic!r} topic...")
        # NOTE: the admin client call is not awaited.
        self.client.create_topics([NewTopic(name=topic, num_partitions=1, replication_factor=1)])

    async def _delete_reply_topic(self, topic: str) -> None:
        """Delete the reply topic.

        :param topic: The name of the topic to be deleted.
        """
        logger.info(f"Deleting {topic!r} topic...")
        # NOTE: the admin client call is not awaited.
        self.client.delete_topics([topic])

    async def _subscribe_reply_topic(self, topic: str) -> None:
        """Add the reply topic to the consumer.

        :param topic: The name of the topic to be added.
        """
        await self.consumer.add_topic(topic)

    async def _unsubscribe_reply_topic(self, topic: str) -> None:
        """Remove the reply topic from the consumer.

        :param topic: The name of the topic to be removed.
        """
        await self.consumer.remove_topic(topic)

    def acquire(self, *args, **kwargs) -> AsyncContextManager:
        """Acquire a new instance wrapped on an asynchronous context manager.

        While the context is active, ``REQUEST_REPLY_TOPIC_CONTEXT_VAR`` holds the topic of the acquired instance.

        :param args: Accepted for interface compatibility; not forwarded to the parent ``acquire``.
        :param kwargs: Accepted for interface compatibility; not forwarded to the parent ``acquire``.
        :return: An asynchronous context manager.
        """
        return _ReplyTopicContextManager(super().acquire())


class _ReplyTopicContextManager:
    """Asynchronous context manager that publishes the acquired broker's topic on a context variable.

    On enter, the wrapped context manager is entered and ``REQUEST_REPLY_TOPIC_CONTEXT_VAR`` is set to the topic
    of the obtained broker. On exit, the variable is reset to its previous value and the wrapped context manager
    is exited.
    """

    # Token returned by ``ContextVar.set`` on enter; ``None`` until the context has been entered.
    _token: Optional[Token]

    def __init__(self, wrapper: AsyncContextManager[DynamicBroker]):
        """Initialize the context manager.

        :param wrapper: The asynchronous context manager that yields the ``DynamicBroker`` instance.
        """
        self.wrapper = wrapper
        self._token = None

    async def __aenter__(self) -> DynamicBroker:
        """Enter the wrapped context and expose the broker's topic on the context variable.

        :return: The broker yielded by the wrapped context manager.
        """
        handler = await self.wrapper.__aenter__()
        self._token = REQUEST_REPLY_TOPIC_CONTEXT_VAR.set(handler.topic)
        return handler

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Restore the context variable and exit the wrapped context.

        :param exc_type: The type of the exception raised inside the context, if any.
        :param exc_val: The exception raised inside the context, if any.
        :param exc_tb: The traceback of the exception raised inside the context, if any.
        """
        try:
            REQUEST_REPLY_TOPIC_CONTEXT_VAR.reset(self._token)
        finally:
            # Always exit the wrapped context, even if the reset fails, so that the broker is released.
            await self.wrapper.__aexit__(exc_type, exc_val, exc_tb)