Exposing Information: Query Service¶
Introduction¶
After being described how to define the aggregate in the Modeling the Domain: Aggregates section and being exposed some operations that change its state in the Exposing Operations: Command Service section, so the next step is to describe how to expose queries. As said in previous sections, the minos
framework implements the CQRS pattern, in which the Query Model is decoupled from the Command Model, allowing to orient each one to its specific purpose and together giving the most of themselves.
So, the QueryService
can be seen as an independent component with respect to the rest of the microservice, allowing to externalize it without big effort. So, it should use a dedicated database, different from the one that stores the AggregateDiff
events, the Aggregate
snapshots and so on. Then, at this point the main question is how to populate the QueryService
database without using another components of the microservice nor accessing the main database. Here, the answer is through event subscription, that provides a decoupled way to stay informed about any change produced over the Aggregate
s defined both on the target microservice and in external ones.
The QueryService
workflow can be seen as a set of writing operations that updates the database, that are launched when specific AggregateDiff
events are published, and a set of read operations that queries the database, that are launched when another microservices or external clients perform query operations.
One important thing to notice at this point is the consistency degree of the QueryService
respect to the Aggregate
state and their stored events: As the way to update the query database is based on event subscription, which generates a certain amount of delay between the publication of the event and the reception, a strong consistency cannot be guaranteed. Alternatively, eventual consistency is the offered guarantee, that means that the QueryService
is consistent respect to the received AggregateDiff
events.
Setting up the environment!¶
After being described the basic concepts about how the QueryService
works, the next step is start setting up the one for the exam
microservice. The first part is to add the configuration parameters into the config.yml
file (note that the ones related with interfaces are shared with the CommandService
):
# config.yml
service:
injections:
query_repository: src.queries.ExamQueryRepository
rest:
host: 0.0.0.0
port: 8082
broker:
host: localhost
port: 9092
queue:
database: exam_db
user: minos
password: min0s
host: localhost
port: 5432
records: 1000
retry: 2
queries:
service: src.queries.ExamQueryService
...
After being updated the configuration file, the next step is to create the structure of the query service, which will be defined into src/queries.py
. To keep the code organized, the chosen pattern in this case is to split all the logic into two parts: One related with input and output parsing, performed by the ExamQueryService
, and a second one related with database manipulation, performed by the ExamQueryRepository
.
The ExamQueryService
inherits the minos.cqrs.QueryService
that is a must condition to implement a query service, and the ExamQueryRepository
inherits from the minos.common.MinosSetup
class, who provides some interesting utility methods, like the setup
, and destroy
, really useful for creating and destroying database connections together with the repository instance. Another important detail is that the ExamQueryRepository
will be injected as a dependency injection (as it is defined in the configuration file).
So, the src/queries.py
file skeleton will be similar to:
from minos.common import (
MinosSetup,
)
from minos.cqrs import (
QueryService,
)
class ExamQueryRepository(MinosSetup):
...
class ExamQueryService(QueryService):
...
Adding initializations logic…¶
After being defined the ExamQueryService
and ExamQueryRepository
classes, the next step is to start writing the initialization code. The ExamQueryService
is the easier one, as it only needs to store the repository instance given from the dependency injection (note that the dependency_injector
package is being used here, so it must be added as a dependency: e.g. poetry add dependency-injector
):
from dependency_injector.wiring import (
Provide,
inject,
)
class ExamQueryService(QueryService):
@inject
def __init__(self, repository: ExamQueryRepository = Provide["query_repository"], *args, **kwargs):
super().__init__(*args, **kwargs)
self.repository = repository
...
The case of the ExamQueryRepository
requires a bit more attention as it’s needed to setup the database connections and also provide the initialization logic composed of table creations, etc. Here a sqlite3
database is chosen to store a simple exams
table composed by three columns: uuid
, duration
and subject
.
An interesting detail here is the use of the _setup
and _destroy
methods to create and clean the database connection, and also create the database table. These methods are called by the minos.common.DependencyInjector
before and after performing the injection itself.
import sqlite3
class ExamQueryRepository(MinosSetup):
def __init__(self, database: str = "exam.sqlite", *args, **kwargs):
super().__init__(*args, **kwargs)
self.database = database
self.connection = None
async def _setup(self) -> None:
self.connection = sqlite3.connect(self.database)
cursor = self.connection.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS exams (uuid TEXT PRIMARY KEY, duration INT, subject TEXT)")
cursor.close()
async def _destroy(self) -> None:
if self.connection is not None:
self.connection.close()
self.connection = None
...
Important: For the sake of simplicity, here a sqlite
database is being used, but in real environments another database systems are highly recommended.
After being added the logic to create and destroy the ExamQueryService
and ExamQueryRepository
, everything is ready to start handling AggregateDiff
events and exposing queries to be used by another microservices and external clients.
Handling events and updating the database…¶
The way to handle events by the QueryService
is mostly equal to the way to handle them by the CommandService
, as it was described in Exposing Operations: Command Service section. The main difference is that in the CommandService
the expected behaviour is to perform an action that change some Aggregate
instances (and generate a set of AggregateDiff
events) but here, in the QueryService
, the expected behaviour is to update the query database.
The events to be handled in this case will be following: ExamCreated
, ExamUpdated.duration
, ExamUpdated.subject
, ExamDeleted
. Note that the update events has an ending qualifier that indicates to what field changes the event should refer. This is an interesting feature provided by the minos
framework that is very useful at some cases, especially when the event to be subscribed is related to an external aggregate and only a specific part of it is needed.
So, the event handling methods will be similar to:
class ExamQueryService(QueryService):
...
@enroute.broker.event("ExamCreated")
async def exam_created(self, request: Request) -> None:
diff = await request.content(resolve_references=False)
try:
uuid, duration, subject = diff.uuid, diff.get_one("duration"), diff.get_one("subject")
except Exception:
raise ResponseException("Some fields could not be extracted.")
self.repository.add(uuid, duration, subject)
@enroute.broker.event("ExamUpdated.duration")
async def exam_duration_updated(self, request: Request) -> None:
diff = await request.content()
try:
uuid, duration = diff.uuid, diff.get_one("duration")
except Exception as exc:
raise ResponseException(f"Some fields could not be extracted: {exc}")
self.repository.update_duration(uuid, duration)
@enroute.broker.event("ExamUpdated.subject")
async def exam_subject_updated(self, request: Request) -> None:
diff = await request.content(resolve_references=False)
try:
uuid, subject = diff.uuid, diff.get_one("subject")
except Exception as exc:
raise ResponseException(f"Some fields could not be extracted: {exc}")
self.repository.update_subject(uuid, subject)
@enroute.broker.event("ExamDeleted")
async def exam_deleted(self, request: Request) -> None:
diff = await request.content()
try:
uuid = diff.uuid
except Exception as exc:
raise ResponseException(f"Some fields could not be extracted: {exc}")
self.repository.delete(uuid)
[TODO: explain how references resolving works.]
After being subscribed to some events, let’s add the repository methods to store the information contained into them on the database. One interesting detail here is that the repository has the responsibility to parse the given parameters into a format that can be stored on the specific database system. This is a good strategy that simplifies the process to migrate to another database system that may support more advanced types.
class ExamQueryRepository(MinosSetup):
...
def add(self, uuid: UUID, duration: timedelta, subject: UUID)-> None:
params = (str(uuid), duration // timedelta(microseconds=1), str(subject))
cursor = self.connection.cursor()
cursor.execute("INSERT INTO exams (uuid, duration, subject) VALUES (?, ?, ?)", params)
cursor.close()
self.connection.commit()
def update_duration(self, uuid: UUID, duration: timedelta)-> None:
params = (duration // timedelta(microseconds=1), str(uuid))
cursor = self.connection.cursor()
cursor.execute("UPDATE exams SET duration = ? WHERE uuid = ?", params)
self.connection.commit()
def update_subject(self, uuid: UUID, subject: UUID)-> None:
params = (str(subject), str(uuid))
cursor = self.connection.cursor()
cursor.execute("UPDATE exams SET subject = ? WHERE uuid = ?", params)
cursor.close()
self.connection.commit()
def delete(self, uuid: UUID) -> None:
params = (str(uuid), )
cursor = self.connection.cursor()
cursor.execute("DELETE FROM table WHERE uuid = ?", params)
cursor.close()
self.connection.commit()
So, after being defined the event handling in the ExamQueryService
and the logic to store the events’ information into the database in the ExamQueryRepository
, the final step is to start writing the queries to be exposed externally.
Defining queries and reading the database…¶
The last and more interesting part of the Query Service definition is the query building process as it’s the tangible result of the implementation from an external perspective. In summary, is what another microservices an external clients will use. One important thing to notice is that the query database should be oriented as much as possible on being highly efficient on these operations. It’s true that faster event handling is better for the microservice performance, but that won’t as much impact on the user experience as the query handling. For this reason, when tradeoffs raise while building Query Services, the priority should be oriented into faster query handling over event handling.
For the sake of simplicity, the ExamQueryService
will include two query examples: the first one will retrieve a list of exam identifiers sorted by the max duration and limited by a given count, the second query will retrieve the list of subject identifiers sorted by the exams count and limited by a given count.
As in the case of the event handling methods, the responsibility of the query handling method is to parse the given Request
, prepare the parameters to the ExamQueryRepository
and build the returned Response
. So the code should be similar to:
class ExamQueryService(QueryService):
...
@enroute.rest.query("/exams/q/exams-with-more-duration", "GET")
@enroute.broker.query("GetExamsWithMoreDuration")
async def exams_with_more_duration(self, request: Request) -> Response:
content = await request.content()
try:
limit = content["limit"]
except KeyError:
limit = 100
result = self.repository.get_longest_exams(limit)
return Response(result)
@enroute.rest.query("/exams/q/subjects-with-more-exams", "GET")
@enroute.broker.query("GetSubjectsWithMoreExams")
async def subjects_with_more_exams(self, request: Request) -> Response:
content = await request.content()
try:
limit = content["limit"]
except KeyError:
limit = 100
result = self.repository.get_subjects_with_more_exams(limit)
return Response(result)
...
Before start defining the ExamQueryRepository
methods, it’s important to give a brief explanation about an interesting minos
feature, that is the ModelType
class. This type
class can be seen as a namedtuple
with typing and serialization superpowers among other interesting features. In this case, it’s used to define the structure of the response objects naming them as ExamDurationDTO
and SubjectExamsDTO
(appending the trailing DTO
suffix is a common naming convention for these cases and states for Data Transfer Object):
ExamDurationDTO = ModelType.build("ExamDurationDTO", {"exam": UUID, "duration": timedelta})
SubjectExamsDTO = ModelType.build("SubjectExamsDTO", {"subject": UUID, "exams": int})
After being introduced the ModelType
class, the final step is to define the ExamQueryRepository
methods to be used by the query handling ones. Here, both the get_longest_exams
and get_subjects_with_more_exams
performs a database query and return the parsed values as lists of instances using the corresponding ModelType
s. The methods should be similar to:
class ExamQueryRepository(MinosSetup):
...
def get_longest_exams(self, limit: int) -> list[ExamDurationDTO]:
params = (limit, )
cursor = self.connection.cursor()
cursor.execute("SELECT uuid, duration FROM exams ORDER BY duration DESC LIMIT ?", params)
rows = cursor.fetchall()
cursor.close()
return [ExamDurationDTO(*row) for row in rows]
def get_subjects_with_more_exams(self, limit: int) -> list[SubjectExamsDTO]:
params = (limit, )
cursor = self.connection.cursor()
cursor.execute(
"SELECT subject, COUNT(uuid) AS exams FROM exams GROUP BY subject ORDER BY exams DESC LIMIT ?", params
)
rows = cursor.fetchall()
cursor.close()
return [SubjectExamsDTO(*row) for row in rows]
...
Summary¶
After being described step by step the main features of the minos.cqrs.QueryService
class and the need to have an auxiliary Query Repository class to keep a consistent separation between the request parsing and the database manipulation, here is a full snapshot of the resulting src/queries.py
file:
"""src/queries.py"""
import sqlite3
from datetime import (
timedelta
)
from uuid import (
UUID,
)
from dependency_injector.wiring import (
inject,
Provide,
)
from minos.common import (
ModelType,
MinosSetup,
)
from minos.cqrs import (
QueryService,
)
from minos.networks import (
Request,
Response,
ResponseException,
enroute,
)
ExamDurationDTO = ModelType.build("ExamDurationDTO", {"exam": UUID, "duration": timedelta})
SubjectExamsDTO = ModelType.build("SubjectExamsDTO", {"subject": UUID, "exams": int})
class ExamQueryRepository(MinosSetup):
def __init__(self, database: str = "exam.sqlite", *args, **kwargs):
super().__init__(*args, **kwargs)
self.database = database
self.connection = None
async def _setup(self) -> None:
self.connection = sqlite3.connect(self.database)
cursor = self.connection.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS exams (uuid TEXT PRIMARY KEY, duration INT, subject TEXT)")
cursor.close()
async def _destroy(self) -> None:
if self.connection is not None:
self.connection.close()
self.connection = None
def get_longest_exams(self, limit: int) -> list[ExamDurationDTO]:
params = (limit, )
cursor = self.connection.cursor()
cursor.execute("SELECT uuid, duration FROM exams ORDER BY duration DESC LIMIT ?", params)
rows = cursor.fetchall()
cursor.close()
return [ExamDurationDTO(*row) for row in rows]
def get_subjects_with_more_exams(self, limit: int) -> list[SubjectExamsDTO]:
params = (limit, )
cursor = self.connection.cursor()
cursor.execute(
"SELECT subject, COUNT(uuid) AS exams FROM exams GROUP BY subject ORDER BY exams DESC LIMIT ?", params
)
rows = cursor.fetchall()
cursor.close()
return [SubjectExamsDTO(*row) for row in rows]
def add(self, uuid: UUID, duration: timedelta, subject: UUID)-> None:
params = (str(uuid), duration // timedelta(microseconds=1), str(subject))
cursor = self.connection.cursor()
cursor.execute("INSERT INTO exams (uuid, duration, subject) VALUES (?, ?, ?)", params)
cursor.close()
self.connection.commit()
def update_duration(self, uuid: UUID, duration: timedelta)-> None:
params = (duration // timedelta(microseconds=1), str(uuid))
cursor = self.connection.cursor()
cursor.execute("UPDATE exams SET duration = ? WHERE uuid = ?", params)
self.connection.commit()
def update_subject(self, uuid: UUID, subject: UUID)-> None:
params = (str(subject), str(uuid))
cursor = self.connection.cursor()
cursor.execute("UPDATE exams SET subject = ? WHERE uuid = ?", params)
cursor.close()
self.connection.commit()
def delete(self, uuid: UUID) -> None:
params = (str(uuid), )
cursor = self.connection.cursor()
cursor.execute("DELETE FROM table WHERE uuid = ?", params)
cursor.close()
self.connection.commit()
class ExamQueryService(QueryService):
@inject
def __init__(self, repository: ExamQueryRepository = Provide["query_repository"], *args, **kwargs):
super().__init__(*args, **kwargs)
self.repository = repository
@enroute.rest.query("/exams/q/exams-with-more-duration", "GET")
@enroute.broker.query("GetExamsWithMoreDuration")
async def exams_with_more_duration(self, request: Request) -> Response:
content = await request.content()
try:
limit = content["limit"]
except KeyError:
limit = 100
result = self.repository.get_longest_exams(limit)
return Response(result)
@enroute.rest.query("/exams/q/subjects-with-more-exams", "GET")
@enroute.broker.query("GetSubjectsWithMoreExams")
async def subjects_with_more_exams(self, request: Request) -> Response:
content = await request.content()
try:
limit = content["limit"]
except KeyError:
limit = 100
result = self.repository.get_subjects_with_more_exams(limit)
return Response(result)
@enroute.broker.event("ExamCreated")
async def exam_created(self, request: Request) -> None:
diff = await request.content(resolve_references=False)
try:
uuid, duration, subject = diff.uuid, diff.get_one("duration"), diff.get_one("subject")
except Exception:
raise ResponseException("Some fields could not be extracted.")
self.repository.add(uuid, duration, subject)
@enroute.broker.event("ExamUpdated.duration")
async def exam_duration_updated(self, request: Request) -> None:
diff = await request.content()
try:
uuid, duration = diff.uuid, diff.get_one("duration")
except Exception as exc:
raise ResponseException(f"Some fields could not be extracted: {exc}")
self.repository.update_duration(uuid, duration)
@enroute.broker.event("ExamUpdated.subject")
async def exam_subject_updated(self, request: Request) -> None:
diff = await request.content(resolve_references=False)
try:
uuid, subject = diff.uuid, diff.get_one("subject")
except Exception as exc:
raise ResponseException(f"Some fields could not be extracted: {exc}")
self.repository.update_subject(uuid, subject)
@enroute.broker.event("ExamDeleted")
async def exam_deleted(self, request: Request) -> None:
diff = await request.content()
try:
uuid = diff.uuid
except Exception as exc:
raise ResponseException(f"Some fields could not be extracted: {exc}")
self.repository.delete(uuid)