Source code for minos.saga.executions.commit

import logging
from asyncio import (
    gather,
)
from uuid import (
    UUID,
)

from cached_property import (
    cached_property,
)
from dependency_injector.wiring import (
    Provide,
    inject,
)

from minos.networks import (
    BrokerPublisher,
    DynamicBroker,
    DynamicBrokerPool,
)

from .steps import (
    ConditionalSagaStepExecution,
    SagaStepExecution,
)

# Module-level logger, named after this module, used by the committer to report progress.
logger = logging.getLogger(__name__)


class TransactionCommitter:
    """Transaction Committer class.

    Drives a reserve-then-commit flow over every ``(transaction uuid, service name)`` pair collected from the
    executed saga steps: a ``Reserve...Transaction`` message is sent per pair and the replies are checked; if all of
    them are ok a ``Commit...Transaction`` message is sent per pair, otherwise a ``Reject...Transaction`` message is
    sent per pair and a ``ValueError`` is raised.
    """

    # noinspection PyUnusedLocal
    @inject
    def __init__(
        self,
        execution_uuid: UUID,
        executed_steps: list[SagaStepExecution],
        broker_pool: DynamicBrokerPool = Provide["broker_pool"],
        broker_publisher: BrokerPublisher = Provide["broker_publisher"],
        **kwargs,
    ):
        """Initialize the committer.

        :param execution_uuid: The identifier of the saga execution, used as the transaction identifier for the
            top-level steps.
        :param executed_steps: The steps that have been executed by the saga execution.
        :param broker_pool: The pool from which a broker is acquired to send the reserve requests and read the replies.
        :param broker_publisher: The publisher used to send the commit and reject messages.
        :param kwargs: Additional named arguments (accepted but not used).
        """
        self.executed_steps = executed_steps
        self.execution_uuid = execution_uuid
        self.broker_pool = broker_pool
        self.broker_publisher = broker_publisher

    # noinspection PyUnusedCommit,PyMethodOverriding
    async def commit(self, **kwargs) -> None:
        """Commit the transaction.

        :param kwargs: Additional named arguments.
        :return: This method does not return anything.
        :raises ValueError: If at least one of the reserve replies is not ok. The transactions are rejected before
            raising.
        """
        logger.info("committing...")

        if not await self._reserve():
            await self.reject()
            raise ValueError("Some transactions could not be committed.")

        await self._commit()

    async def _reserve(self) -> bool:
        """Send one reserve message per transaction and check the replies.

        :return: ``True`` if every received reply is ok, ``False`` otherwise.
        """
        # The same acquired broker is used to send the requests and to read the replies.
        async with self.broker_pool.acquire() as broker:
            futures = (
                broker.send(data=uuid, topic=self._topic("Reserve", service_name))
                for (uuid, service_name) in self.transactions
            )
            await gather(*futures)

            return await self._get_response(broker, len(self.transactions))

    async def _commit(self) -> None:
        """Send one commit message per transaction through the publisher (no reply is awaited)."""
        futures = (
            self.broker_publisher.send(data=uuid, topic=self._topic("Commit", service_name))
            for (uuid, service_name) in self.transactions
        )
        await gather(*futures)

        logger.info("Successfully committed!")

    async def reject(self) -> None:
        """Reject the transaction.

        :return: This method does not return anything.
        """
        futures = (
            self.broker_publisher.send(data=uuid, topic=self._topic("Reject", service_name))
            for (uuid, service_name) in self.transactions
        )
        await gather(*futures)

        logger.info("Successfully rejected!")

    @staticmethod
    def _topic(action: str, service_name: str) -> str:
        """Build the topic name for a transaction message.

        :param action: The prefix of the topic (``"Reserve"``, ``"Commit"`` or ``"Reject"``).
        :param service_name: The name of the target microservice; it is title-cased inside the topic.
        :return: The topic name, e.g. ``"ReserveOrderTransaction"`` for ``("Reserve", "order")``.
        """
        return f"{action}{service_name.title()}Transaction"

    @staticmethod
    async def _get_response(handler: DynamicBroker, count: int, **kwargs) -> bool:
        """Read ``count`` entries from the handler and check that all of them are ok.

        :param handler: The broker from which the entries are read.
        :param count: The number of entries to be read.
        :param kwargs: Additional named arguments forwarded to ``handler.get_many``.
        :return: ``True`` if ``entry.data.ok`` is truthy for every entry, ``False`` otherwise.
        """
        entries = await handler.get_many(count, **kwargs)
        return all(entry.data.ok for entry in entries)

    @cached_property
    def transactions(self) -> list[tuple[UUID, str]]:
        """Get the list of transactions used during the saga execution.

        :return: A sorted list of unique tuples in which the first value is the identifier of the transaction and
            the second one is the name of the microservice in which the saga was executed.
        """
        pairs: set[tuple[UUID, str]] = set()

        def _collect(uuid: UUID, steps: list[SagaStepExecution]) -> None:
            for step in steps:
                if isinstance(step, ConditionalSagaStepExecution):
                    # A conditional step contributes the transactions of its inner execution (if any), which are
                    # identified by the inner execution's own uuid.
                    inner = step.inner
                    if inner is not None:
                        _collect(inner.uuid, inner.executed_steps)
                    continue

                pairs.update((uuid, service_name) for service_name in step.related_services)

        _collect(self.execution_uuid, self.executed_steps)

        # The pairs are unique, so sorting the set gives a single deterministic order.
        return sorted(pairs)