Source code for minos.networks.rest.handlers

from __future__ import (
    annotations,
)

import logging
from functools import (
    cached_property,
    wraps,
)
from inspect import (
    isawaitable,
)
from typing import (
    Awaitable,
    Callable,
    Optional,
    Union,
)

from aiohttp import (
    web,
)

from minos.common import (
    MinosConfig,
    MinosSetup,
)

from ..decorators import (
    EnrouteBuilder,
)
from ..requests import (
    REQUEST_USER_CONTEXT_VAR,
    Response,
    ResponseException,
)
from .requests import (
    RestRequest,
    RestResponse,
)

logger = logging.getLogger(__name__)


[docs]class RestHandler(MinosSetup): """Rest Handler class."""
[docs] def __init__(self, host: str, port: int, endpoints: dict[(str, str), Callable], **kwargs): super().__init__(**kwargs) self._host = host self._port = port self._endpoints = endpoints
@property def endpoints(self) -> dict[(str, str), Callable]: """Endpoints getter. :return: A dictionary value. """ return self._endpoints @classmethod def _from_config(cls, *args, config: MinosConfig, **kwargs) -> RestHandler: host = config.rest.host port = config.rest.port endpoints = cls._endpoints_from_config(config) return cls(host=host, port=port, endpoints=endpoints, **kwargs) @staticmethod def _endpoints_from_config(config: MinosConfig, **kwargs) -> dict[(str, str), Callable]: builder = EnrouteBuilder(*config.services, middleware=config.middleware) decorators = builder.get_rest_command_query(config=config, **kwargs) endpoints = {(decorator.url, decorator.method): fn for decorator, fn in decorators.items()} return endpoints @property def host(self) -> str: """Get the rest host. :return: A ``str`` object. """ return self._host @property def port(self) -> int: """Get the rest port. :return: An integer value. """ return self._port
[docs] def get_app(self) -> web.Application: """Return rest application instance. :return: A `web.Application` instance. """ return self._app
@cached_property def _app(self) -> web.Application: app = web.Application() self._mount_routes(app) return app def _mount_routes(self, app: web.Application): """Load routes from config file.""" for (url, method), fn in self._endpoints.items(): self._mount_one_route(method, url, fn, app) # Load default routes self._mount_system_health(app) def _mount_one_route(self, method: str, url: str, action: Callable, app: web.Application) -> None: handler = self.get_callback(action) app.router.add_route(method, url, handler)
[docs] @staticmethod def get_callback( fn: Callable[[RestRequest], Union[Optional[RestResponse], Awaitable[Optional[RestResponse]]]] ) -> Callable[[web.Request], Awaitable[web.Response]]: """Get the handler function to be used by the ``aiohttp`` Controller. :param fn: The action function. :return: A wrapper function around the given one that is compatible with the ``aiohttp`` Controller. """ @wraps(fn) async def _wrapper(request: web.Request) -> web.Response: logger.info(f"Dispatching '{request!s}' from '{request.remote!s}'...") request = RestRequest(request) token = REQUEST_USER_CONTEXT_VAR.set(request.user) try: response = fn(request) if isawaitable(response): response = await response if isinstance(response, Response): response = await response.raw_content() if response is None: return web.json_response() return web.json_response(response) except ResponseException as exc: logger.warning(f"Raised an application exception: {exc!s}") raise web.HTTPBadRequest(text=str(exc)) except Exception as exc: logger.exception(f"Raised a system exception: {exc!r}") raise web.HTTPInternalServerError() finally: REQUEST_USER_CONTEXT_VAR.reset(token) return _wrapper
def _mount_system_health(self, app: web.Application): """Mount System Health Route.""" app.router.add_get("/system/health", self._system_health_handler) @staticmethod async def _system_health_handler(request: web.Request) -> web.Response: """System Health Route Handler. :return: A `web.json_response` response. """ logger.info(f"Dispatching '{request!s}' from '{request.remote!s}'...") return web.json_response({"host": request.host})