Source code for minos.networks.discovery.clients.abc

import logging
from abc import (
    ABC,
    abstractmethod,
)
from asyncio import (
    sleep,
)

import aiohttp

from ...exceptions import (
    MinosDiscoveryConnectorException,
)

logger = logging.getLogger(__name__)


class DiscoveryClient(ABC):
    """Minos Discovery Client class.

    Abstract base class: concrete clients implement ``subscribe`` and ``unsubscribe``. The
    ``_rest_subscribe`` and ``_rest_unsubscribe`` helpers perform a single HTTP request
    (``POST`` and ``DELETE`` respectively) and, on failure, retry through the public
    ``subscribe`` / ``unsubscribe`` methods.
    """

    def __init__(self, host: str, port: int):
        """Store the host and port used to build ``route``.

        :param host: The host part of the route.
        :param port: The port part of the route.
        """
        self.host = host
        self.port = port

    @property
    def route(self) -> str:
        """Get the full http route to the repository.

        :return: An ``str`` value.
        """
        # noinspection HttpUrlsUsage
        return f"http://{self.host}:{self.port}"

    @abstractmethod
    async def subscribe(
        self,
        host: str,
        port: int,
        name: str,
        endpoints: list[dict[str, str]],
        retry_tries: int = 3,
        retry_delay: float = 5,
    ) -> None:
        """Perform a subscription query.

        :param host: The ip of the microservice to be subscribed.
        :param port: The port of the microservice to be subscribed.
        :param name: The name of the microservice to be subscribed.
        :param endpoints: List of endpoints exposed by the microservice.
        :param retry_tries: Number of attempts before raising a failure exception.
        :param retry_delay: Seconds to wait between attempts.
        :return: This method does not return anything.
        """

    async def _rest_subscribe(
        self,
        endpoint: str,
        service_metadata: dict[str, str],
        host: str,
        port: int,
        name: str,
        endpoints: list[dict[str, str]],
        retry_tries: int = 3,
        retry_delay: float = 5,
    ) -> None:
        """Send one ``POST`` subscription request and retry through ``subscribe`` on failure.

        :param endpoint: The url that receives the ``POST`` request.
        :param service_metadata: The payload sent as the json body of the request.
        :param host: Forwarded to ``subscribe`` when the request is retried.
        :param port: Forwarded to ``subscribe`` when the request is retried.
        :param name: Forwarded to ``subscribe`` when the request is retried.
        :param endpoints: Forwarded to ``subscribe`` when the request is retried.
        :param retry_tries: Number of attempts left, including the current one.
        :param retry_delay: Seconds to wait before the next attempt.
        :return: This method does not return anything.
        :raises MinosDiscoveryConnectorException: If the request fails and no attempts are left.
        """
        logger.debug(f"Subscribing into {endpoint!r}...")

        # Kept outside the ``except`` clause (which unbinds ``exc`` on exit) so that the
        # final error can be chained to the failure that actually caused it.
        failure = None
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(endpoint, json=service_metadata) as response:
                    success = response.ok
        except Exception as exc:
            logger.warning(f"An exception was raised while trying to subscribe: {exc!r}")
            success = False
            failure = exc

        if success:
            return

        if retry_tries > 1:
            await sleep(retry_delay)
            # Retry through ``subscribe`` (not this helper) so that the concrete
            # implementation drives the next attempt.
            return await self.subscribe(host, port, name, endpoints, retry_tries - 1, retry_delay)

        # ``failure`` is ``None`` when the server answered with a non-ok status.
        raise MinosDiscoveryConnectorException("There was a problem while trying to subscribe.") from failure

    @abstractmethod
    async def unsubscribe(self, name: str, retry_tries: int = 3, retry_delay: float = 5) -> None:
        """Perform an unsubscribe query.

        :param name: The name of the microservice to be unsubscribed.
        :param retry_tries: Number of attempts before raising a failure exception.
        :param retry_delay: Seconds to wait between attempts.
        :return: This method does not return anything.
        """

    async def _rest_unsubscribe(self, endpoint: str, name: str, retry_tries: int = 3, retry_delay: float = 5) -> None:
        """Send one ``DELETE`` unsubscription request and retry through ``unsubscribe`` on failure.

        :param endpoint: The url that receives the ``DELETE`` request.
        :param name: Forwarded to ``unsubscribe`` when the request is retried.
        :param retry_tries: Number of attempts left, including the current one.
        :param retry_delay: Seconds to wait before the next attempt.
        :return: This method does not return anything.
        :raises MinosDiscoveryConnectorException: If the request fails and no attempts are left.
        """
        logger.debug(f"Unsubscribing into {endpoint!r}...")

        # Kept outside the ``except`` clause (which unbinds ``exc`` on exit) so that the
        # final error can be chained to the failure that actually caused it.
        failure = None
        try:
            async with aiohttp.ClientSession() as session:
                async with session.delete(endpoint) as response:
                    success = response.ok
        except Exception as exc:
            logger.warning(f"An exception was raised while trying to unsubscribe: {exc!r}")
            success = False
            failure = exc

        if success:
            return

        if retry_tries > 1:
            await sleep(retry_delay)
            # Retry through ``unsubscribe`` (not this helper) so that the concrete
            # implementation drives the next attempt.
            return await self.unsubscribe(name, retry_tries - 1, retry_delay)

        # ``failure`` is ``None`` when the server answered with a non-ok status.
        raise MinosDiscoveryConnectorException("There was a problem while trying to unsubscribe.") from failure