Saga Pattern¶
Context¶
Minos framework apply Database per Service Pattern. Each service has its own database. Some business transactions,
however, span multiple service so we need a mechanism to implement transactions that span services.
For example, let’s imagine that we have an e-commerce store where customers have a credit limit.
The application must ensure that a new order will not exceed the customer’s credit limit. Since Orders
and Customers
are in different databases owned by different services the application cannot simply use a local ACID transaction.
Problem¶
How do we implement transactions that affect several services?
In our case, how do we make Order and Customer communicate?
Solution¶
Implement each business transaction that spans multiple services is a saga. A saga is a sequence of local transactions. Each local transaction updates the database and publishes a message or event to trigger the next local transaction in the saga. If a local transaction fails because it violates a business rule then the saga executes a series of compensating transactions that undo the changes that were made by the preceding local transactions.
In the following example we are going to see how the saga is for the case of adding a Product
to the Cart
, we want
this product to be reserved for us.
Definitions¶
It is important to differentiate the components involved, which are broadly speaking:
SAGA
It is the definition of the different operations involved in the transaction in an orderly and hierarchical manner. For example, to create an order we must reserve the products (microservice Product), generate the ticket or order summary (microservice Ticket) and make the payment (microservice Payment). So the saga ensures that operations are executed in a set manner, i.e. first the product is reserved, then the ticket is generated and finally the payment is created.
SAGA Manager
On the other hand, the saga manager is in charge of coordinating the execution of the SAGA’s.
How does it work?¶
The communication between the microservices is done using Command
and waiting for the corresponding CommandReply
.
CommandBroker
is used to send the Command and ahere are 2 ways to receive the answer:
CommandReplyHandler
if SAGA was paused on disk orDynamicHandler
if was paused on memmory.
You can see more details about the exchange of messages in the Broker section.
SAGA structure¶
The saga consists of steps and are those that define the operations to be performed in an orderly way to communicate with other
services. These steps normally contain the call to the corresponding service (.invoke_participant("CreateOrder")
) and
optionally a compensation (.with_compensation("DeleteOrder", delete_order_callback)
) in case an error occurs or the
operation cannot be performed by the business logic. It also optionally has a function in charge of
receiving the response from the invoked service (which is used in case the response has to be processed).
For a SAGA instance to be valid, a step
, invoke_participant
and commit
are required. So the minimum structure of
a SAGA is:
Saga()
.step()
.invoke_participant("CreateOrder")
.commit()
step()
¶
As mentioned above, the step()
is the minimum unit for a saga to work.
The step is used to define an operation and separate it from the rest, so a SAGA is usually a series of steps. Example:
CREATE_ORDER = (
Saga("CreateOrder")
.step()
.invoke_participant("CreateTicket", _create_ticket)
.on_reply("ticket", _process_ticket_entries)
.step()
.invoke_participant("PurchaseProducts", _purchase_products)
.with_compensation("PurchaseProducts", _revert_purchase_products)
.step()
.invoke_participant("CreatePayment", _payment)
.on_reply("payment", _get_payment)
.commit(_create_commit_callback)
)
The step in turn contains the following operations:
invoke_participant(COMMAND, Callback function[Optional])
It is responsible for invoking a
Command
normally located in another microservice.The callback function is optional and is used to prepare data or information to be sent with the
Command
.In the following example you can see how to build the query where you would send
cart_uuid
together with theCreateTicket
command.Example:
TicketQuery = ModelType.build("TicketQuery", {"cart_uuid": UUID}) def _create_ticket(context: SagaContext) -> Model: cart_uuid = context["cart_uuid"] return TicketQuery(cart_uuid) CREATE_ORDER = ( Saga("CreateOrder") .step() .invoke_participant("CreateTicket", _create_ticket) .commit() ) # The execution of the SAGA is omitted in this example ...
Another simple example without passing data:
CREATE_ORDER = ( Saga("CreateOrder") .step() .invoke_participant("CreateTicket") .commit() ) # The execution of the SAGA is omitted in this example ...
on_reply(Name, Callback function)
- Optionalon reply is optional and is used if necessary to handle the result of the
invoke_participant
. Important - The callback function of on_reply does not have access to the SagaContext. This is intentional since on_reply is only used to process the data. Onreturn
it is automatically stored in SagaContext. Example:def _get_payment(value: Aggregate) -> UUID: return value.uuid CREATE_ORDER = ( Saga("CreateOrder") .step() .invoke_participant("CreatePayment", _payment) .on_reply("payment", _get_payment) .commit(_commit) ) # The execution of the SAGA is omitted in this example ...
with_compensation(COMMAND, Callback function[Optional])
- OptionalIn the event of an error in any of the operations, the compensation of the corresponding step and those of the previous steps, if any, will be executed.
The callback function is optional and is used to prepare data or information to be sent with the
Command
.(If you need to send data you can construct the query in the same way as in the invoke_perticipant callback above.)
commit(Callback function[Optional])
¶
The commit is primarily used to indicate the end of the SAGA definition and optionally to execute a callback function once all the steps have been executed correctly.
The commit can be empty. What I would do is simply store the result.
def _get_payment(value: Aggregate) -> UUID: return value.uuid CREATE_ORDER = ( Saga("CreateOrder") .step() .invoke_participant("CreatePayment", _payment) .on_reply("payment", _get_payment) .commit() ) # The execution of the SAGA is omitted in this example ...
Or callback function can be called. It is normally used to perform operations with Aggregates. For example: Create the order, update it …
def _commit(context: SagaContext) -> SagaContext: # Some logic, normally Aggregate operations ... return SagaContext(order=order) CREATE_ORDER = ( Saga("CreateOrder") .step() .invoke_participant("CreatePayment", _payment) .on_reply("payment", _get_payment) .commit(_commit) ) # The execution of the SAGA is omitted in this example ...
SAGA Manager¶
Saga manager is in charge of coordinating the execution of the saga or resuming the execution from disk or memory.
Previously we have seen how to define a SAGA, now we are going to see how to execute it.
Saga manager accepts the following parameters:
SAGA Definition: Required
Example:
CREATE_ORDER = ( Saga() .step() .invoke_participant("CreateOrder") .commit() )
pause_on_disk
: Optional (defaultFalse
)If it is paused on disk (
pause_on_disk=True
) it means that it is asynchronous, else if it is not paused on disk (pause_on_disk=False
) it means that it is synchronous.raise_on_error
: Optional (defaultTrue
) Indicates what behaviour the SAGA should take in case of an error, being able to set it toFalse
so that in case of error no exception is thrown and other operations can be performed after the SAGA.return_execution
: Optional (defaultTrue
)If
return_execution=True
, the process that launched the SAGA would be waiting until the execution is finished, being able to evaluate the result of this execution. Ifpause_on_disk=False
, the process that launched the SAGA receives the UUID of the SAGA execution and does not wait for it to finish.context
Optional Normally when launching a SAGA, you need to pass some parameters to be able to work with them or once the SAGA is running to store the result of the step and this is where SagaContext is involved.Parameters that you want to send to SAGA in order to be able to operate with them. Example:
context=SagaContext( customer=12345, cart=83274892374, ... ),
Within the SAGA you can access them in the following way:
def example_function(context: SagaContext) -> SagaContext: customer = context["customer_uuid"]
Now we can see an example on how to run a SAGA
with SAGA Manager
:
CREATE_ORDER = (
Saga("CreateOrder")
.step()
.invoke_participant("CreateTicket")
.step()
.invoke_participant("CreatePayment")
.commit()
)
...
class OrderCommandService(CommandService):
@enroute.rest.command("/orders", "POST")
@enroute.broker.command("CreateOrder")
async def create_order(self, request: Request) -> Response:
saga = await self.saga_manager.run(
CREATE_ORDER,
context=SagaContext(
cart_uuid=cart_uuid,
customer_uuid=customer_uuid,
payment_detail=payment_detail,
shipment_detail=shipment_detail,
),
)
Examples¶
Get all users
(please dont use similar in production, you know ;) )
GET_USER = ( Saga() .step() .invoke_participant("GetUsers") .commit() )
We run SAGA with SagaManager as follows:
... saga = await self.saga_manager.run(GET_USER) # get results: users = saga.context["users"]) ...
In invoke_participant SAGA will send the Command
GetUsers
. It will be received by the microservice that has that command defined. Logically it should be the microservice User.Get specific user
UserQuery = ModelType.build("UserQuery", {"user_id": int}) def _user(context: SagaContext) -> Model: user_id = context["user_id"] return UserQuery(user_id) GET_USER = ( Saga() .step() .invoke_participant("GetUsers", _user) .commit() )
We run SAGA with SagaManager as follows:
... saga = await self.saga_manager.run( GET_USER, context=SagaContext( user_id=1234, ), ) # get results: user = saga.context["user"]) ...
In invoke_participant SAGA will send the Command
GetUser
passing the query containing user_id. It will be received by the microservice that has that command defined. Logically it should be the microservice User.