Exposing Information: Query Service

Introduction

After being described how to define the aggregate in the Modeling the Domain: Aggregates section and being exposed some operations that change its state in the Exposing Operations: Command Service section, so the next step is to describe how to expose queries. As said in previous sections, the minos framework implements the CQRS pattern, in which the Query Model is decoupled from the Command Model, allowing to orient each one to its specific purpose and together giving the most of themselves.

So, the QueryService can be seen as an independent component with respect to the rest of the microservice, allowing to externalize it without big effort. So, it should use a dedicated database, different from the one that stores the AggregateDiff events, the Aggregate snapshots and so on. Then, at this point the main question is how to populate the QueryService database without using another components of the microservice nor accessing the main database. Here, the answer is through event subscription, that provides a decoupled way to stay informed about any change produced over the Aggregates defined both on the target microservice and in external ones.

The QueryService workflow can be seen as a set of writing operations that updates the database, that are launched when specific AggregateDiff events are published, and a set of read operations that queries the database, that are launched when another microservices or external clients perform query operations.

One important thing to notice at this point is the consistency degree of the QueryService respect to the Aggregate state and their stored events: As the way to update the query database is based on event subscription, which generates a certain amount of delay between the publication of the event and the reception, a strong consistency cannot be guaranteed. Alternatively, eventual consistency is the offered guarantee, that means that the QueryService is consistent respect to the received AggregateDiff events.

Setting up the environment!

After being described the basic concepts about how the QueryService works, the next step is start setting up the one for the exam microservice. The first part is to add the configuration parameters into the config.yml file (note that the ones related with interfaces are shared with the CommandService):

# config.yml
service:
  injections:
    query_repository: src.queries.ExamQueryRepository
rest:
  host: 0.0.0.0
  port: 8082
broker:
  host: localhost
  port: 9092
  queue:
    database: exam_db
    user: minos
    password: min0s
    host: localhost
    port: 5432
    records: 1000
    retry: 2
queries:
  service: src.queries.ExamQueryService
...

After being updated the configuration file, the next step is to create the structure of the query service, which will be defined into src/queries.py. To keep the code organized, the chosen pattern in this case is to split all the logic into two parts: One related with input and output parsing, performed by the ExamQueryService, and a second one related with database manipulation, performed by the ExamQueryRepository.

The ExamQueryService inherits the minos.cqrs.QueryService that is a must condition to implement a query service, and the ExamQueryRepository inherits from the minos.common.MinosSetup class, who provides some interesting utility methods, like the setup, and destroy, really useful for creating and destroying database connections together with the repository instance. Another important detail is that the ExamQueryRepository will be injected as a dependency injection (as it is defined in the configuration file).

So, the src/queries.py file skeleton will be similar to:

from minos.common import (
    MinosSetup,
)
from minos.cqrs import (
    QueryService,
)


class ExamQueryRepository(MinosSetup):
    ...


class ExamQueryService(QueryService):
    ...

Adding initializations logic…

After being defined the ExamQueryService and ExamQueryRepository classes, the next step is to start writing the initialization code. The ExamQueryService is the easier one, as it only needs to store the repository instance given from the dependency injection (note that the dependency_injector package is being used here, so it must be added as a dependency: e.g. poetry add dependency-injector):

from dependency_injector.wiring import (
    Provide,
    inject,
)


class ExamQueryService(QueryService):

    @inject
    def __init__(self, repository: ExamQueryRepository = Provide["query_repository"], *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.repository = repository

    ...

The case of the ExamQueryRepository requires a bit more attention as it’s needed to setup the database connections and also provide the initialization logic composed of table creations, etc. Here a sqlite3 database is chosen to store a simple exams table composed by three columns: uuid, duration and subject.

An interesting detail here is the use of the _setup and _destroy methods to create and clean the database connection, and also create the database table. These methods are called by the minos.common.DependencyInjector before and after performing the injection itself.

import sqlite3


class ExamQueryRepository(MinosSetup):

    def __init__(self, database: str = "exam.sqlite", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.database = database
        self.connection = None

    async def _setup(self) -> None:
        self.connection = sqlite3.connect(self.database)
        cursor = self.connection.cursor()
        cursor.execute("CREATE TABLE IF NOT EXISTS exams (uuid TEXT PRIMARY KEY, duration INT, subject TEXT)")
        cursor.close()

    async def _destroy(self) -> None:
        if self.connection is not None:
            self.connection.close()
            self.connection = None

    ...

Important: For the sake of simplicity, here a sqlite database is being used, but in real environments another database systems are highly recommended.

After being added the logic to create and destroy the ExamQueryService and ExamQueryRepository, everything is ready to start handling AggregateDiff events and exposing queries to be used by another microservices and external clients.

Handling events and updating the database…

The way to handle events by the QueryService is mostly equal to the way to handle them by the CommandService, as it was described in Exposing Operations: Command Service section. The main difference is that in the CommandService the expected behaviour is to perform an action that change some Aggregate instances (and generate a set of AggregateDiff events) but here, in the QueryService, the expected behaviour is to update the query database.

The events to be handled in this case will be following: ExamCreated, ExamUpdated.duration, ExamUpdated.subject, ExamDeleted. Note that the update events has an ending qualifier that indicates to what field changes the event should refer. This is an interesting feature provided by the minos framework that is very useful at some cases, especially when the event to be subscribed is related to an external aggregate and only a specific part of it is needed.

So, the event handling methods will be similar to:

class ExamQueryService(QueryService):
    ...

    @enroute.broker.event("ExamCreated")
    async def exam_created(self, request: Request) -> None:
        diff = await request.content(resolve_references=False)

        try:
            uuid, duration, subject = diff.uuid, diff.get_one("duration"), diff.get_one("subject")
        except Exception:
            raise ResponseException("Some fields could not be extracted.")

        self.repository.add(uuid, duration, subject)

    @enroute.broker.event("ExamUpdated.duration")
    async def exam_duration_updated(self, request: Request) -> None:
        diff = await request.content()

        try:
            uuid, duration = diff.uuid, diff.get_one("duration")
        except Exception as exc:
            raise ResponseException(f"Some fields could not be extracted: {exc}")

        self.repository.update_duration(uuid, duration)

    @enroute.broker.event("ExamUpdated.subject")
    async def exam_subject_updated(self, request: Request) -> None:
        diff = await request.content(resolve_references=False)

        try:
            uuid, subject = diff.uuid, diff.get_one("subject")
        except Exception as exc:
            raise ResponseException(f"Some fields could not be extracted: {exc}")

        self.repository.update_subject(uuid, subject)

    @enroute.broker.event("ExamDeleted")
    async def exam_deleted(self, request: Request) -> None:
        diff = await request.content()

        try:
            uuid = diff.uuid
        except Exception as exc:
            raise ResponseException(f"Some fields could not be extracted: {exc}")

        self.repository.delete(uuid)

[TODO: explain how references resolving works.]

After being subscribed to some events, let’s add the repository methods to store the information contained into them on the database. One interesting detail here is that the repository has the responsibility to parse the given parameters into a format that can be stored on the specific database system. This is a good strategy that simplifies the process to migrate to another database system that may support more advanced types.

class ExamQueryRepository(MinosSetup):

    ...

    def add(self, uuid: UUID, duration: timedelta, subject: UUID)-> None:
        params = (str(uuid), duration // timedelta(microseconds=1), str(subject))
        cursor = self.connection.cursor()
        cursor.execute("INSERT INTO exams (uuid, duration, subject) VALUES (?, ?, ?)", params)
        cursor.close()
        self.connection.commit()

    def update_duration(self, uuid: UUID, duration: timedelta)-> None:
        params = (duration // timedelta(microseconds=1), str(uuid))
        cursor = self.connection.cursor()
        cursor.execute("UPDATE exams SET duration = ? WHERE uuid = ?", params)
        self.connection.commit()

    def update_subject(self, uuid: UUID, subject: UUID)-> None:
        params = (str(subject), str(uuid))
        cursor = self.connection.cursor()
        cursor.execute("UPDATE exams SET subject = ? WHERE uuid = ?", params)
        cursor.close()
        self.connection.commit()

    def delete(self, uuid: UUID) -> None:
        params = (str(uuid), )
        cursor = self.connection.cursor()
        cursor.execute("DELETE FROM table WHERE uuid = ?", params)
        cursor.close()
        self.connection.commit()

So, after being defined the event handling in the ExamQueryService and the logic to store the events’ information into the database in the ExamQueryRepository, the final step is to start writing the queries to be exposed externally.

Defining queries and reading the database…

The last and more interesting part of the Query Service definition is the query building process as it’s the tangible result of the implementation from an external perspective. In summary, is what another microservices an external clients will use. One important thing to notice is that the query database should be oriented as much as possible on being highly efficient on these operations. It’s true that faster event handling is better for the microservice performance, but that won’t as much impact on the user experience as the query handling. For this reason, when tradeoffs raise while building Query Services, the priority should be oriented into faster query handling over event handling.

For the sake of simplicity, the ExamQueryService will include two query examples: the first one will retrieve a list of exam identifiers sorted by the max duration and limited by a given count, the second query will retrieve the list of subject identifiers sorted by the exams count and limited by a given count.

As in the case of the event handling methods, the responsibility of the query handling method is to parse the given Request, prepare the parameters to the ExamQueryRepository and build the returned Response. So the code should be similar to:

class ExamQueryService(QueryService):
    ...

    @enroute.rest.query("/exams/q/exams-with-more-duration", "GET")
    @enroute.broker.query("GetExamsWithMoreDuration")
    async def exams_with_more_duration(self, request: Request) -> Response:
        content = await request.content()

        try:
            limit = content["limit"]
        except KeyError:
            limit = 100

        result = self.repository.get_longest_exams(limit)
        return Response(result)

    @enroute.rest.query("/exams/q/subjects-with-more-exams", "GET")
    @enroute.broker.query("GetSubjectsWithMoreExams")
    async def subjects_with_more_exams(self, request: Request) -> Response:
        content = await request.content()

        try:
            limit = content["limit"]
        except KeyError:
            limit = 100

        result = self.repository.get_subjects_with_more_exams(limit)
        return Response(result)

    ...

Before start defining the ExamQueryRepository methods, it’s important to give a brief explanation about an interesting minos feature, that is the ModelType class. This type class can be seen as a namedtuple with typing and serialization superpowers among other interesting features. In this case, it’s used to define the structure of the response objects naming them as ExamDurationDTO and SubjectExamsDTO (appending the trailing DTO suffix is a common naming convention for these cases and states for Data Transfer Object):

ExamDurationDTO = ModelType.build("ExamDurationDTO", {"exam": UUID, "duration": timedelta})
SubjectExamsDTO = ModelType.build("SubjectExamsDTO", {"subject": UUID, "exams": int})

After being introduced the ModelType class, the final step is to define the ExamQueryRepository methods to be used by the query handling ones. Here, both the get_longest_exams and get_subjects_with_more_exams performs a database query and return the parsed values as lists of instances using the corresponding ModelTypes. The methods should be similar to:

class ExamQueryRepository(MinosSetup):

    ...

    def get_longest_exams(self, limit: int) -> list[ExamDurationDTO]:
        params = (limit, )
        cursor = self.connection.cursor()
        cursor.execute("SELECT uuid, duration FROM exams ORDER BY duration DESC LIMIT ?", params)
        rows = cursor.fetchall()
        cursor.close()

        return [ExamDurationDTO(*row) for row in rows]

    def get_subjects_with_more_exams(self, limit: int) -> list[SubjectExamsDTO]:
        params = (limit, )
        cursor = self.connection.cursor()
        cursor.execute(
            "SELECT subject, COUNT(uuid) AS exams FROM exams GROUP BY subject ORDER BY exams DESC LIMIT ?", params
        )
        rows = cursor.fetchall()
        cursor.close()
        return [SubjectExamsDTO(*row) for row in rows]

    ...

Summary

After being described step by step the main features of the minos.cqrs.QueryService class and the need to have an auxiliary Query Repository class to keep a consistent separation between the request parsing and the database manipulation, here is a full snapshot of the resulting src/queries.py file:

"""src/queries.py"""

import sqlite3
from datetime import (
    timedelta
)
from uuid import (
    UUID,
)
from dependency_injector.wiring import (
    inject,
    Provide,
)
from minos.common import (
    ModelType,
    MinosSetup,
)
from minos.cqrs import (
    QueryService,
)
from minos.networks import (
    Request,
    Response,
    ResponseException,
    enroute,
)


ExamDurationDTO = ModelType.build("ExamDurationDTO", {"exam": UUID, "duration": timedelta})
SubjectExamsDTO = ModelType.build("SubjectExamsDTO", {"subject": UUID, "exams": int})


class ExamQueryRepository(MinosSetup):

    def __init__(self, database: str = "exam.sqlite", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.database = database
        self.connection = None

    async def _setup(self) -> None:
        self.connection = sqlite3.connect(self.database)
        cursor = self.connection.cursor()
        cursor.execute("CREATE TABLE IF NOT EXISTS exams (uuid TEXT PRIMARY KEY, duration INT, subject TEXT)")
        cursor.close()

    async def _destroy(self) -> None:
        if self.connection is not None:
            self.connection.close()
            self.connection = None

    def get_longest_exams(self, limit: int) -> list[ExamDurationDTO]:
        params = (limit, )
        cursor = self.connection.cursor()
        cursor.execute("SELECT uuid, duration FROM exams ORDER BY duration DESC LIMIT ?", params)
        rows = cursor.fetchall()
        cursor.close()

        return [ExamDurationDTO(*row) for row in rows]

    def get_subjects_with_more_exams(self, limit: int) -> list[SubjectExamsDTO]:
        params = (limit, )
        cursor = self.connection.cursor()
        cursor.execute(
            "SELECT subject, COUNT(uuid) AS exams FROM exams GROUP BY subject ORDER BY exams DESC LIMIT ?", params
        )
        rows = cursor.fetchall()
        cursor.close()
        return [SubjectExamsDTO(*row) for row in rows]

    def add(self, uuid: UUID, duration: timedelta, subject: UUID)-> None:
        params = (str(uuid), duration // timedelta(microseconds=1), str(subject))
        cursor = self.connection.cursor()
        cursor.execute("INSERT INTO exams (uuid, duration, subject) VALUES (?, ?, ?)", params)
        cursor.close()
        self.connection.commit()

    def update_duration(self, uuid: UUID, duration: timedelta)-> None:
        params = (duration // timedelta(microseconds=1), str(uuid))
        cursor = self.connection.cursor()
        cursor.execute("UPDATE exams SET duration = ? WHERE uuid = ?", params)
        self.connection.commit()

    def update_subject(self, uuid: UUID, subject: UUID)-> None:
        params = (str(subject), str(uuid))
        cursor = self.connection.cursor()
        cursor.execute("UPDATE exams SET subject = ? WHERE uuid = ?", params)
        cursor.close()
        self.connection.commit()

    def delete(self, uuid: UUID) -> None:
        params = (str(uuid), )
        cursor = self.connection.cursor()
        cursor.execute("DELETE FROM table WHERE uuid = ?", params)
        cursor.close()
        self.connection.commit()


class ExamQueryService(QueryService):

    @inject
    def __init__(self, repository: ExamQueryRepository = Provide["query_repository"], *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.repository = repository

    @enroute.rest.query("/exams/q/exams-with-more-duration", "GET")
    @enroute.broker.query("GetExamsWithMoreDuration")
    async def exams_with_more_duration(self, request: Request) -> Response:
        content = await request.content()

        try:
            limit = content["limit"]
        except KeyError:
            limit = 100

        result = self.repository.get_longest_exams(limit)
        return Response(result)

    @enroute.rest.query("/exams/q/subjects-with-more-exams", "GET")
    @enroute.broker.query("GetSubjectsWithMoreExams")
    async def subjects_with_more_exams(self, request: Request) -> Response:
        content = await request.content()

        try:
            limit = content["limit"]
        except KeyError:
            limit = 100

        result = self.repository.get_subjects_with_more_exams(limit)
        return Response(result)

    @enroute.broker.event("ExamCreated")
    async def exam_created(self, request: Request) -> None:
        diff = await request.content(resolve_references=False)

        try:
            uuid, duration, subject = diff.uuid, diff.get_one("duration"), diff.get_one("subject")
        except Exception:
            raise ResponseException("Some fields could not be extracted.")

        self.repository.add(uuid, duration, subject)

    @enroute.broker.event("ExamUpdated.duration")
    async def exam_duration_updated(self, request: Request) -> None:
        diff = await request.content()

        try:
            uuid, duration = diff.uuid, diff.get_one("duration")
        except Exception as exc:
            raise ResponseException(f"Some fields could not be extracted: {exc}")

        self.repository.update_duration(uuid, duration)

    @enroute.broker.event("ExamUpdated.subject")
    async def exam_subject_updated(self, request: Request) -> None:
        diff = await request.content(resolve_references=False)

        try:
            uuid, subject = diff.uuid, diff.get_one("subject")
        except Exception as exc:
            raise ResponseException(f"Some fields could not be extracted: {exc}")

        self.repository.update_subject(uuid, subject)

    @enroute.broker.event("ExamDeleted")
    async def exam_deleted(self, request: Request) -> None:
        diff = await request.content()

        try:
            uuid = diff.uuid
        except Exception as exc:
            raise ResponseException(f"Some fields could not be extracted: {exc}")

        self.repository.delete(uuid)