Source code for minos.common.configuration.config

from __future__ import (
    annotations,
)

import abc
import os
from collections import (
    namedtuple,
)
from pathlib import (
    Path,
)
from typing import (
    Any,
    Union,
)

import yaml

from ..exceptions import (
    MinosConfigException,
)

BROKER = namedtuple("Broker", "host port queue")
QUEUE = namedtuple("Queue", "database user password host port records retry")
SERVICE = namedtuple("Service", "name aggregate injections services")
STORAGE = namedtuple("Storage", "path")

SAGA = namedtuple("Saga", "storage")
REST = namedtuple("Rest", "host port")
REPOSITORY = namedtuple("Repository", "database user password host port")
SNAPSHOT = namedtuple("Snapshot", "database user password host port")
DISCOVERY = namedtuple("Discovery", "client host port")

_ENVIRONMENT_MAPPER = {
    "service.name": "MINOS_SERVICE_NAME",
    "rest.host": "MINOS_REST_HOST",
    "rest.port": "MINOS_REST_PORT",
    "broker.host": "MINOS_BROKER_HOST",
    "broker.port": "MINOS_BROKER_PORT",
    "broker.queue.host": "MINOS_BROKER_QUEUE_HOST",
    "broker.queue.port": "MINOS_BROKER_QUEUE_PORT",
    "broker.queue.database": "MINOS_BROKER_QUEUE_DATABASE",
    "broker.queue.user": "MINOS_BROKER_QUEUE_USER",
    "broker.queue.password": "MINOS_BROKER_QUEUE_PASSWORD",
    "commands.service": "MINOS_COMMANDS_SERVICE",
    "queries.service": "MINOS_QUERIES_SERVICE",
    "repository.host": "MINOS_REPOSITORY_HOST",
    "repository.port": "MINOS_REPOSITORY_PORT",
    "repository.database": "MINOS_REPOSITORY_DATABASE",
    "repository.user": "MINOS_REPOSITORY_USER",
    "repository.password": "MINOS_REPOSITORY_PASSWORD",
    "snapshot.host": "MINOS_SNAPSHOT_HOST",
    "snapshot.port": "MINOS_SNAPSHOT_PORT",
    "snapshot.database": "MINOS_SNAPSHOT_DATABASE",
    "snapshot.user": "MINOS_SNAPSHOT_USER",
    "snapshot.password": "MINOS_SNAPSHOT_PASSWORD",
    "discovery.client": "MINOS_DISCOVERY_CLIENT",
    "discovery.host": "MINOS_DISCOVERY_HOST",
    "discovery.port": "MINOS_DISCOVERY_PORT",
}

_PARAMETERIZED_MAPPER = {
    "service.name": "service_name",
    "rest.host": "rest_host",
    "rest.port": "rest_port",
    "broker.host": "broker_host",
    "broker.port": "broker_port",
    "broker.queue.host": "broker_queue_host",
    "broker.queue.port": "broker_queue_port",
    "broker.queue.database": "broker_queue_database",
    "broker.queue.user": "broker_queue_user",
    "broker.queue.password": "broker_queue_password",
    "commands.service": "commands_service",
    "queries.service": "queries_service",
    "saga.broker": "saga_broker",
    "saga.port": "saga_port",
    "repository.host": "repository_host",
    "repository.port": "repository_port",
    "repository.database": "repository_database",
    "repository.user": "repository_user",
    "repository.password": "repository_password",
    "snapshot.host": "snapshot_host",
    "snapshot.port": "snapshot_port",
    "snapshot.database": "snapshot_database",
    "snapshot.user": "snapshot_user",
    "snapshot.password": "snapshot_password",
    "discovery.client": "minos_discovery_client",
    "discovery.host": "minos_discovery_host",
    "discovery.port": "minos_discovery_port",
}


[docs]class MinosConfigAbstract(abc.ABC): """Minos abstract config class.""" __slots__ = "_services", "_path"
[docs] def __init__(self, path: Union[Path, str]): if isinstance(path, str): path = Path(path) self._services = {} self._path = path self._load(path)
@abc.abstractmethod def _load(self, path: Path) -> None: raise NotImplementedError @abc.abstractmethod def _get(self, key: str, **kwargs: Any): raise NotImplementedError
[docs]class MinosConfig(MinosConfigAbstract): """ A Minos configuration provides information on the connection points available at that service. It consists of the following parts: - Service meta-information (such as name, or version). - REST Service endpoints available. - Repository database connection for event sourcing. - Snapshot database connection. - Events it publishes/consumes from de given Kafka service. - Commands it reacts to from other microservices. - Sagas it takes part on. """ __slots__ = ("_data", "_with_environment", "_parameterized")
[docs] def __init__(self, path: Path | str, with_environment: bool = True, **kwargs): super().__init__(path) self._with_environment = with_environment self._parameterized = kwargs
def _load(self, path: Path) -> None: if not path.exists(): raise MinosConfigException(f"Check if this path: {path} is correct") with path.open() as file: self._data = yaml.load(file, Loader=yaml.FullLoader) def _get(self, key: str, **kwargs: Any) -> Any: if key in _PARAMETERIZED_MAPPER and _PARAMETERIZED_MAPPER[key] in self._parameterized: return self._parameterized[_PARAMETERIZED_MAPPER[key]] if self._with_environment and key in _ENVIRONMENT_MAPPER and _ENVIRONMENT_MAPPER[key] in os.environ: return os.environ[_ENVIRONMENT_MAPPER[key]] def _fn(k: str, data: dict[str, Any]) -> Any: current, _, following = k.partition(".") part = data[current] if not following: return part return _fn(following, part) try: return _fn(key, self._data) except Exception: raise MinosConfigException(f"{key!r} field is not defined on the configuration!") @property def service(self) -> SERVICE: """Get the service config. :return: A ``SERVICE`` NamedTuple instance. """ return SERVICE( name=self._get("service.name"), aggregate=self._get("service.aggregate"), injections=self._service_injections, services=self._service_services, ) @property def _service_injections(self) -> dict[str, str]: try: return self._get("service.injections") except MinosConfigException: return dict() @property def _service_services(self) -> list[str]: try: return self._get("service.services") except MinosConfigException: return list() @property def rest(self) -> REST: """Get the rest config. :return: A ``REST`` NamedTuple instance. """ return REST(host=self._get("rest.host"), port=int(self._get("rest.port"))) @property def broker(self) -> BROKER: """Get the events config. :return: A ``EVENTS`` NamedTuple instance. """ queue = self._broker_queue return BROKER(host=self._get("broker.host"), port=self._get("broker.port"), queue=queue) @property def _broker_queue(self) -> QUEUE: return QUEUE( database=self._get("broker.queue.database"), user=self._get("broker.queue.user"), password=self._get("broker.queue.password"), host=self._get("broker.queue.host"), port=int(self._get("broker.queue.port")), records=int(self._get("broker.queue.records")), retry=int(self._get("broker.queue.retry")), ) @property def services(self) -> list[str]: """Get the commands config. :return: A list containing the service class names as string values.. """ try: return self._get("services") except MinosConfigException: return list() @property def middleware(self) -> list[str]: """Get the commands config. :return: A list containing the service class names as string values.. """ try: return self._get("middleware") except MinosConfigException: return list() @property def saga(self) -> SAGA: """Get the sagas config. :return: A ``SAGAS`` NamedTuple instance. """ storage = self._saga_storage return SAGA(storage=storage) @property def _saga_storage(self) -> STORAGE: raw = self._get("saga.storage.path") path = Path(raw) if raw.startswith("/") else self._path.parent / raw queue = STORAGE(path=path) return queue @property def repository(self) -> REPOSITORY: """Get the repository config. :return: A ``REPOSITORY`` NamedTuple instance. """ return REPOSITORY( database=self._get("repository.database"), user=self._get("repository.user"), password=self._get("repository.password"), host=self._get("repository.host"), port=int(self._get("repository.port")), ) @property def snapshot(self) -> SNAPSHOT: """Get the snapshot config. :return: A ``SNAPSHOT`` NamedTuple instance. """ return SNAPSHOT( database=self._get("snapshot.database"), user=self._get("snapshot.user"), password=self._get("snapshot.password"), host=self._get("snapshot.host"), port=int(self._get("snapshot.port")), ) @property def discovery(self) -> DISCOVERY: """Get the sagas config. :return: A ``DISCOVERY`` NamedTuple instance. """ client = self._get("discovery.client") host = self._get("discovery.host") port = self._get("discovery.port") return DISCOVERY(client=client, host=host, port=port)