Source code for minos.networks.discovery.clients.minos

import logging

from .abc import (
    DiscoveryClient,
)

logger = logging.getLogger(__name__)


[docs]class MinosDiscoveryClient(DiscoveryClient): """Minos Discovery Client class."""
[docs] async def subscribe( self, host: str, port: int, name: str, endpoints: list[dict[str, str]], retry_tries: int = 3, retry_delay: float = 5, ) -> None: """Perform a subscription query. :param host: The ip of the microservice to be subscribed. :param port: The port of the microservice to be subscribed. :param name: The name of the microservice to be subscribed. :param endpoints: List of endpoints exposed by the microservice. :param retry_tries: Number of attempts before raising a failure exception. :param retry_delay: Seconds to wait between attempts. :return: This method does not return anything. """ endpoint = f"{self.route}/microservices/{name}" service_metadata = { "address": host, "port": port, "endpoints": [[endpoint["method"], endpoint["url"]] for endpoint in endpoints], } await self._rest_subscribe(endpoint, service_metadata, host, port, name, endpoints, retry_tries, retry_delay)
[docs] async def unsubscribe(self, name: str, retry_tries: int = 3, retry_delay: float = 5) -> None: """Perform an unsubscribe query. :param name: The name of the microservice to be unsubscribed. :param retry_tries: Number of attempts before raising a failure exception. :param retry_delay: Seconds to wait between attempts. :return: This method does not return anything. """ endpoint = f"{self.route}/microservices/{name}" await self._rest_unsubscribe(endpoint, name, retry_tries, retry_delay)