Source code for minos.saga.executions.storage

from __future__ import (
    annotations,
)

from typing import (
    Type,
    Union,
)
from uuid import (
    UUID,
)

from minos.common import (
    MinosConfig,
    MinosJsonBinaryProtocol,
    MinosStorage,
    MinosStorageLmdb,
)

from ..exceptions import (
    SagaExecutionNotFoundException,
)
from .saga import (
    SagaExecution,
)


[docs]class SagaExecutionStorage: """Saga Execution Storage class."""
[docs] def __init__( self, storage_cls: Type[MinosStorage] = MinosStorageLmdb, protocol=MinosJsonBinaryProtocol, db_name: str = "LocalState", **kwargs, ): self.db_name = db_name self._storage = storage_cls.build(protocol=protocol, **kwargs)
[docs] @classmethod def from_config(cls, config: MinosConfig, **kwargs) -> SagaExecutionStorage: """Build an instance from config. :param config: Config instance. :param kwargs: Additional named arguments. :return: A new ``SagaExecutionStorage`` instance. """ # noinspection PyProtectedMember return cls(**(config.saga.storage._asdict() | kwargs))
[docs] def store(self, execution: SagaExecution) -> None: """Store an execution. :param execution: Execution to be stored. :return: This method does not return anything. """ key = str(execution.uuid) value = execution.raw self._storage.update(table=self.db_name, key=key, value=value)
[docs] def load(self, key: Union[str, UUID]) -> SagaExecution: """Load the saga execution stored on the given key. :param key: The key to identify the execution. :return: A ``SagaExecution`` instance. """ key = str(key) value = self._storage.get(table=self.db_name, key=key) if value is None: raise SagaExecutionNotFoundException(f"The execution identified by {key} was not found.") execution = SagaExecution.from_raw(value) return execution
[docs] def delete(self, key: Union[SagaExecution, str, UUID]) -> None: """Delete the reference of the given key. :param key: Execution key to be deleted. :return: This method does not return anything. """ if isinstance(key, SagaExecution): key = key.uuid key = str(key) self._storage.delete(table=self.db_name, key=key)