Modeling the Domain: Aggregates

Introduction

In opposite to monolithic systems who use relational database model to represent and store the business logic, microservice-based systems requires applying slightly different techniques. One of the main reasons to start thinking different is based on the splitting of a single but big information schema into multiple smaller ones, as each microservice has its own dedicated database.

The minos proposal is based on the Domain Drive Design (DDD) ideas, supported by Entities, Value-Objects and an Aggregate (or Root Entity) that represents the main concept of the microservice. These set of concepts allows us to model information at microservice level, but at some cases it’s needed to relate information from one microservice with information from another one. The way to do that in minos is over Aggregate References. To understand these concepts in more detail, the Data Model section provides a more detailed explanation about them.

Defining the Exam aggregate…

As it is advanced at the beginning of the QuickStart guide, the study case will be to define an exam microservice. This one will be able to store all the needed information related with a common exam composed of questions with selectable answers. For the sake of simplicity, all of them must be multiple answer questions, but it’s a good practising exercise to continue working on these case and add another functionalities.

In this case, the Exam class will be the root-entity or Aggregate of the microservice, so that most of the operations sent to it will be related with the Exam. In minos, the way to do that is to inherit from the minos.common.Aggregate class, that is similar to Python‘s dataclasses in the sense that is able to build the class fields based on the typing. The currently supported types are all the simple Python‘s types (int, float, str, …) but also another advanced types are supported, like list, dict, datetime, etc. See the full documentation to obtain a detailed description.

With this simple functionality, it’s already possible to start building the Exam structure, or at least defining the fields based on simpler types, as it can be seen here:

from datetime import (
    timedelta,
)

from minos.common import (
    Aggregate,
)


class Exam(Aggregate):
    name: str
    duration: timedelta
    # subject: ...
    # questions: ...

Then, an exam instance could be created as follows:

Exam("Mid-term", timedelta(hours=1, minutes=30))

One important thing to notice is that the Aggregate class also provides another additional fields related both with the identification and versioning that must not be provided by the developer (they are computed internally). The concrete fields are the following:

  • uuid: UUID Unique identifier of the instance.

  • version: int Version of the instance, computed as an auto-incremental integer starting from 1 during the creation process.

  • created_at: datetime Creation timestamp of the instance.

  • updated_at: datetime Timestamp of the last update of the instance.

But the real potential of the Aggregate class starts being visible when storage operations are performed:

Storage Operations

Before starting to describe the available CRUD operations provided by the Aggregate class, it’s important to notice one important part of the minos framework and how those operations are implemented. The nature of the framework is highly inspired by Event Sourcing thoughts, so that the aggregate is not simply stored on a collection that is being updated progressively without taking history into account, but the aggregate is stored as a sequence of incremental modifications known as AggregateDiff entries that acts as Domain Events.

This set of events are stored on a Repository who is defined by the minos.common.MinosRepository interface, but also are exposed to another microservices over a Broker, who is defined by the minos.common.MinosBroker interface. The event based strategy has an important caveat, that is how to access the full aggregate as most business logic operations will probably require working with a full Exam instance (not only with the sequence of AggregateDiff instances). As the reconstruction each time a full instance is needed is a really inefficient operation, minos holds on a Snapshot who is defined by the minos.common.MinosSnapshot interface and provided direct access to the reconstructed instances. To know more about how Event Sourcing is implemented in minos, the [TODO: link to architecture] sections provides a detailed description.

The configuration file service.injections section of the configuration file allows to setup both the reposiory, event_broker and snapshot concrete classes:

# config.yml

service:
  injections:
    event_broker: minos.networks.EventBroker
    repository: minos.common.PostgreSqlRepository
    snapshot: minos.common.PostgreSqlSnapshot
repository:
  database: exam_db
  user: minos
  password: min0s
  host: localhost
  port: 5432
snapshot:
  database: exam_db
  user: minos
  password: min0s
  host: localhost
  port: 5432
...

After being introduced how aggregate persistence is implemented in minos, the next sections provide a reference guide about how to use the storage operations step by step. One important thing to notice is that all of them are implemented using awaitables, so it’s needed to know the asyncio basis to get the most of them.

Create

To create new aggregate instances, the best choice is to use the create() class method, which is similar to creating an instance directly calling the class constructor, but also stores a creation event into the Repository, so that the persistence is guaranteed. Then, the Broker will publish the update event on the {$AGGREGATE_NAME}Created topic. In addition to that, the retrieved instance has already set the auto-generated fields (uuid, version, etc.).

For example, creating an Exam aggregate can be done with:

exam = await Exam.create("Mid-term", timedelta(hours=1))

The exam instance will be like:

Exam(
    uuid=..., # generated uuid
    version=1,
    created_at=..., # generated datetime
    updated_at=..., # generated datetime
    name="Mid-term",
    duration=timedelta(hours=1),
)

And the corresponding creation event will be like:

AggregateDiff(
    action=Action.CREATE,
    name="src.aggregates.Exam",
    uuid=..., # generated aggregate uuid
    version=1,
    created_at=..., # generated aggregate datetime
    fields_diff=FieldDiffContainer(
        [
            FieldDiff(
                name="name",
                type_=str,
                value="Mid-term"
            ),
            FieldDiff(
                name="duration",
                type_=timedelta,
                value=timedelta(hours=1)
            ),
        ]
    )
)

Update

The update operation modifies the value of some fields that are composing the aggregate. The way to do that is through the update() method, which get the set of values to be updated as named arguments, in which the given name matches with the corresponding field name. Internally, the method computes the fields difference between the previous and new fields and then stores an update event into the Repository, so that the persistence is guaranteed. Then, the Broker will publish the update event on the {$AGGREGATE_NAME}Updated topic. In this case, also the version and updated_at fields are updated according to the new changes.

For example, updating the duration field from the previously created Exam aggregate can be done with:

await exam.update(duration=exam.duration + timedelta(minutes=30))

The exam instance will be like:

Exam(
    uuid=...,
    version=2, # updated
    created_at=...,
    updated_at=..., # updated
    name="Mid-term",
    duration=timedelta(hours=1, minutes=30), # updated
)

And the corresponding update event will be like:

AggregateDiff(
    action=Action.UPDATE,
    name="src.aggregates.Exam",
    uuid=...,
    version=2,
    created_at=..., # generated datetime
    fields_diff=FieldDiffContainer(
        [
            FieldDiff(
                name="duration",
                type_=timedelta,
                value=timedelta(hours=1, minutes=30)
            ),
        ]
    )
)

Additionally, the Aggregate class provides a save() method that automatically creates or updates the instance depending on if it’s a new one or an already exising. Here is an example:

exam = Exam("Mid-term", timedelta(hours=1))
await exam.save()

exam.duration += timedelta(minutes=30)
await exam.save()

Delete

After being explained who to create and update instances, the remaining operation is the deletion one. In the minos framework it’s implemented with a delete method, that internally stores a deletion event in to the Repository so that the persistence is guaranteed. Then, the Broker will publish the update event on the {$AGGREGATE_NAME}Deleted topic.

For example, deleting an instance can be done with:

await exam.delete()

In this case, does not make any sense to continue working with the exam instance anymore, but the corresponding delete event will be like:

AggregateDiff(
    action=Action.DELETE,
    name="src.aggregates.Exam",
    uuid=...,
    version=3,
    created_at=..., # generated datetime
    fields_diff=FieldDiffContainer.empty()
)

One important thing to notice is that the create, update and delete operations are writing operations, so all of them generates some kind of event to be stored internally and notified to others so that these operations requires to use the Repository and Broker components. However, the following operations (get and find) are reading operations, so the execution of them does not generate any events. Also, as it will be explained later, these operations are related with Aggregate instances, so it’s needed to use the Snapshot component, whose purpose is to provide a simple and efficient way to access them.

Get

The way to obtain an instance based on its identifier is calling the get class method, which returns a single one, failing if it does not exist or is already deleted. In this case, the Snapshot guarantees a strong consistency respect to the Repository of events.

original = await Exam.create(...)

identifier = original.uuid

recovered = await Exam.get(identifier)

assert original == recovered

As the get class method only retrieves one instance at a time, a good option to retrieve multiple instances concurrently, together with the validation checks (existence and not already deletion) is to use asyncio‘s gather function:

from asyncio import (
    gather,
)

uuids: list[UUID] = ...

exams = await gather(*(Exam.get(uuid) for uuid in uuids))

If the validation checks are not needed, or can be performed directly at application level, a better option is to use the find class method.

Find

Previously described operations have one important thing in common, that is all of them need to know the exact aggregate instance to work with, in other words, all of them needed to know the exact identifier of the instance. Is true that in many cases, this is enough to resolve many use cases, but there are some situations in which a more advanced search is needed. A common example is when it’s needed to apply operations to a set of instances characterised by special conditions and so on.

The way to perform this kind of queries is with the find class method, which not only filters instances according to a given Condition, but can also return them with some specific Ordering and limit the maximum number of instances. For more details about how to write complex queries is highly recommended reading the minos.common.queries reference documentation. Another thing to know about the find class method is that it returns the obtained instances over an AsyncIterator and supports a streaming mode directly from the database if the streaming_mode flag is set to True.

Here is an example of a relatively complex find operation, that will return a ranking of "Mid-term" exams with more duration created during the last week:

from datetime import (
    date,
    timedelta,
)
from minos.common import (
    Condition,
    Ordering,
)


condition = Condition.AND(
    Condition.GREATED_EQUAL("created_at", date.today() - timedelta(days=7)),
    Condition.EQUAL("name", "Mid-term")
)

ordering = Ordering.DESC("duration")

async for exam in Exam.find(condition, ordering, limit=10):
    print(exam)

Field Parsing

There are some specific cases in which the fields should be transformed according to specific rules before setting them into the aggregate instances. To do that, minos is able to automatically recognizes the methods that matches the parse_${FIELD_NAME}(value: Any) -> Any and triggers them before setting the corresponding values.

For example, a parsing method can be used to capitalize the Exam‘s name:

class Exam(Aggregate):
    ...

    def parse_name(self, value: Any) -> Any:
        if not isinstance(value, str):
            return value
        return value.title()

Field Validation

Similar to field parsing, minos provides also the capabilities to implement custom validation methods if type checking is not enough. Concretely, the framework automatically recognizes the methods that matches the validate_${FIELD_NAME}(value: Any) -> bool and triggers them before setting the corresponding values. If the validation result is evaluated to False, then a validation exception is raised and the value is not set.

For example, a validation function can be used to require that Exam‘s name have at least three characters.

class Exam(Aggregate):
    ...

    def validate_name(self, value: Any) -> bool:
        return isinstance(value, str) and len(value) >= 6

Defining the Subject reference…

After being described the basics about aggregates and how to perform operations with them, the next natural step to be able to create complex data models is to learn how to create relations with another aggregates. One important thing to notice here is that whereas multiple Aggregate classes can coexist within the same microservice, the recommended strategy is to keep only one Aggregate per microservice. Then, assuming that it’s needed to create a relation from one to another, minos provides the concept of Aggregate References, which allow to define the schema of the external Aggregate, so that the business logic within that microservice will be able to support on its fields.

For example, if it’s needed to relate the Exam aggregate with a supposed Subject aggregate defined on another microservice then using the AggregateRef will be the solution. One of the reasons why we need to have a reference with Subject could be to use the title field for some kind of query defined in the QueryService (which is described on Exposing Information: Query Service).

from minos.common import (
    AggregateRef,
)


class Subject(AggregateRef):
    title: str

After being defined the AggregateRef, the next step is to create the relation with the Aggregate one. The way to do that is using the ModelRef type hint. The reason why it exists is because Aggregate References should be stored as references on the database, but in some cases, it’s needed to be resolved to use some of their fields. The ModelRef is mostly defined as:

ModelRef[T] = Union[T, UUID] # in which T is an AggregateRef type

Then, the Exam aggregate turns into:

from minos.common import (
    ModelRef,
)


class Exam(Aggregate):
    name: str
    duration: timedelta
    subject: ModelRef[Subject]
    # questions: ...

Defining the Question entity…

One of the most important parts of an exam is the set of questions. Here, the Question will be modeled as an Entity descendant. The Entity classes are characterised for being parts of the Aggregate one, with the extra capability to be uniquely identified based on an internal identifier. To know more about the Entity class, it’s recommended to read the [TODO: link to architecture].

In this case, for sake or simplicity, the Question entity will only have two attributes: a title, which will contain the question to be answered, and a set of answers to be picked. But initially, answers will be ignored:

from minos.common import (
    Entity,
)

class Question(Entity):
    title: str
    # answers: ...

After being defined the Question entity, the next step is to integrate it into the Exam aggregate. In this case, it’s needed to have a special feature, which is multiplicity.

A possible solution could be to use a list or something similar so that questions: list[Question] resolves the problem, but this approach has a caveat, that is the event publication. Using the list class, the questions field is treated as a standard field and the generated events will publish the full list of questions also when only one of them has a small change. In some cases this could be the needed behaviour, but another one can be used.

The EntitySet class is the best way to store Entities in most cases, as it provides incremental storing capabilities that provides a big speedup when a big amount of them is stored. In terms of usage, the EntitySet inherits from the collections.abc.MutableSet base class, so it can be used as a standard Python’s set, so that includes the common add, remove, __contains__, etc. methods, but are specially adapted for Entity instances.

In any case, the real advantage to use the EntitySet are the incremental storing capabilities, that is, instead of storing the full entity set after each Aggregate.update call, only the creations, updates or deletions are stored (and also published over the event system).

For example, if the EntitySet is chosen for the questions field, the Exam aggregate turns into:

from minos.common import (
    EntitySet,
)


class Exam(Aggregate):
    name: str
    duration: timedelta
    subject: ModelRef[Subject]
    questions: EntitySet[Question]

Inserting a Question, is as simple as inserting it into any set:

exam.questions.add(Question("What is 1 + 1?"))

The generated event when await exam.save() is called in this case is similar to:

AggregateDiff(
    action=Action.UPDATE,
    name="src.aggregates.Exam",
    uuid=...,
    version=3,
    created_at=..., # generated datetime
    fields_diff=FieldDiffContainer(
        [
            IncrementalFieldDiff(
                name="questions",
                type_=Question,
                value=Question("What is 1 + 1?"),
                action=Action.CREATE
            ),
        ]
    )
)

Defining the Answer value object…

The last part before the Exam aggregate is ready is to define the available answers for the questions. In this case, the Answer class will be defined as a ValueObject descendant. Value Object classes are characterised by both qualities: They are immutable and the way to identify them is through their field values. To know more about this concept it’s recommended to read the [TODO: link to architecture].

In this case, the Answer class will be composed of two simple attributes, the text containing the answer itself and a correct boolean flag. This strategy will allow to extend the implementation to multiple-answer questions without so much effort.

from minos.common import (
    ValueObject,
)


class Answer(ValueObject):
    text: str
    correct: bool

Similarly to the Entity case, to store a collection of Value Object instances, it’s possible to use the list, but the best choice for most cases is the ValueObjectSet class, that also heirs from the collections.abc.MutableSet base class. The main difference between ValueObjectSet and EntitySet is that in this case the hashing is performed from the field values of each ValueObject instance, instead of simply using the Entity‘s unique identifier for obvious reasons.

Then, if the ValueObjectSet is chosen as the collection of answers, the Question class will turns on:

from minos.common import (
    Entity,
    ValueObjectSet,
)


class Question(Entity):
    title: str
    answers: ValueObject[Answer]

Now, the way to create Answer instances is as follows:

question_identifier: UUID = ...
question = exam.questions.get(question_identifier)

question.answers.add(Answer("2", True))
question.answers.add(Answer("5", False))

await exam.save()

One important thing to not in this case is how AggregateDiff works in cases like this one. Here, a ValueObjectSet (with incremental capabilities) is used inside Entity instances stored inside an EntitySet, that is one of the fields of the Aggregate class. So, this is a special case because the incremental capabilities only work at first field level respect to the Aggregate.

In this case, the generated AggregateDiff will be like:

AggregateDiff(
    action=Action.UPDATE,
    name="src.aggregates.Exam",
    uuid=...,
    version=3,
    created_at=..., # generated datetime
    fields_diff=FieldDiffContainer(
        [
            IncrementalFieldDiff(
                name="questions",
                type_=Question,
                value=Question("What is 1 + 1?", ValueObjectSet([Answer("2", True), Answer("5", False)])),
                action=Action.UPDATE
            ),
        ]
    )
)

Summary

After being described step by step the main features of the Aggregate class, and also the commonly used classes that relate with, here is a full snapshot of the resulting src/aggregates.py file:

"""src/aggregates.py"""

from __future__ import (
    annotations,
)
from datetime import (
    timedelta,
)
from typing import (
    Any,
)

from minos.common import (
    Aggregate,
    AggregateRef,
    Entity,
    EntitySet,
    ModelRef,
    ValueObject,
    ValueObjectSet,
)


class Exam(Aggregate):
    name: str
    duration: timedelta
    subject: ModelRef[Subject]
    questions: EntitySet[Question]

    def parse_name(self, value: Any) -> Any:
        if not isinstance(value, str):
            return value
        return value.title()

    def validate_name(self, value: Any) -> bool:
        return isinstance(value, str) and len(value) >= 6


class Subject(AggregateRef):
    title: str


class Question(Entity):
    title: str
    answers: ValueObjectSet[Answer]


class Answer(ValueObject):
    text: str
    correct: bool