Source code for minos.networks.discovery.connectors

from __future__ import (
    annotations,
)

import logging
from inspect import (
    isclass,
)
from itertools import (
    chain,
)
from operator import (
    itemgetter,
)
from typing import (
    Any,
    Type,
)

from minos.common import (
    MinosConfig,
    MinosImportException,
    MinosSetup,
    import_module,
)

from ..decorators import (
    EnrouteAnalyzer,
)
from ..exceptions import (
    MinosInvalidDiscoveryClient,
)
from ..utils import (
    get_host_ip,
)
from .clients import (
    DiscoveryClient,
)

logger = logging.getLogger(__name__)


[docs]class DiscoveryConnector(MinosSetup): """Discovery Connector class."""
[docs] def __init__(self, client, name: str, host: str, port: int, endpoints: list[dict[str, Any]], *args, **kwargs): super().__init__(*args, **kwargs) self.client = client self.name = name self.host = host self.port = port self.endpoints = endpoints
@classmethod def _from_config(cls, *args, config: MinosConfig, **kwargs) -> DiscoveryConnector: client_cls = cls._client_cls_from_config(config) client = client_cls(host=config.discovery.host, port=config.discovery.port) port = config.rest.port name = config.service.name host = get_host_ip() endpoints = cls._endpoints_from_config(config) return cls(client, name, host, port, endpoints, *args, **kwargs) @staticmethod def _client_cls_from_config(config: MinosConfig) -> Type[DiscoveryClient]: try: # noinspection PyTypeChecker client_cls: type = import_module(config.discovery.client) except MinosImportException: raise MinosInvalidDiscoveryClient(f"{config.discovery.client} could not be imported.") if not isclass(client_cls) or not issubclass(client_cls, DiscoveryClient): raise MinosInvalidDiscoveryClient(f"{config.discovery.client} not supported.") return client_cls @staticmethod def _endpoints_from_config(config: MinosConfig) -> list[dict[str, Any]]: endpoints = list() for name in config.services: decorators = EnrouteAnalyzer(name, config).get_rest_command_query() endpoints += [ {"url": decorator.url, "method": decorator.method} for decorator in set(chain(*decorators.values())) ] endpoints.sort(key=itemgetter("url", "method")) return endpoints async def _setup(self) -> None: await self.subscribe()
[docs] async def subscribe(self) -> None: """Send a subscribe operation to the discovery. :return: This method does not return anything. """ logger.info("Performing discovery subscription...") await self.client.subscribe(self.host, self.port, self.name, self.endpoints)
async def _destroy(self) -> None: await self.unsubscribe()
[docs] async def unsubscribe(self) -> None: """Send an unsubscribe operation to the discovery. :return: This method does not return anything. """ logger.info("Performing discovery unsubscription...") await self.client.unsubscribe(self.name)