from __future__ import (
annotations,
)
from pathlib import (
Path,
)
from typing import (
Any,
Optional,
Type,
Union,
)
import lmdb
from ..protocol import (
MinosAvroDatabaseProtocol,
MinosBinaryProtocol,
)
from .abc import (
MinosStorage,
)
[docs]class MinosStorageLmdb(MinosStorage):
"""Minos Storage LMDB class"""
__slots__ = "_env", "_protocol", "_tables"
# noinspection PyUnusedLocal
[docs] def __init__(
self, env: lmdb.Environment, protocol: Type[MinosBinaryProtocol] = MinosAvroDatabaseProtocol, **kwargs
):
self._env: lmdb.Environment = env
self._protocol = protocol
self._tables = {}
[docs] def add(self, table: str, key: str, value: Any) -> None:
"""Store a value.
:param table: Table in which the data is stored.
:param key: Key that identifies the data.
:param value: Data to be stored.
:return: This method does not return anything.
"""
db_instance = self._get_table(table)
with self._env.begin(write=True) as txn:
value_bytes: bytes = self._protocol.encode(value)
txn.put(key.encode(), value_bytes, db=db_instance)
[docs] def get(self, table: str, key: str) -> Optional[Any]:
"""Get the stored value..
:param table: Table in which the data is stored.
:param key: Key that identifies the data.
:return: The stored value or ``None`` if it's empty.
"""
db_instance = self._get_table(table)
with self._env.begin(db=db_instance) as txn:
value_binary = txn.get(key.encode())
if value_binary is not None:
# decode the returned value
return self._protocol.decode(value_binary)
return None
[docs] def delete(self, table: str, key: str) -> None:
"""Delete the stored value.
:param table: Table in which the data is stored.
:param key: Key that identifies the data.
:return: This method does not return anything.
"""
db_instance = self._get_table(table)
with self._env.begin(write=True, db=db_instance) as txn:
txn.delete(key.encode())
[docs] def update(self, table: str, key: str, value: Any) -> None:
"""Update the stored value.
:param table: Table in which the data is stored.
:param key: Key that identifies the data.
:param value: Data to be stored.
:return: This method does not return anything.
"""
db_instance = self._get_table(table)
with self._env.begin(write=True, db=db_instance) as txn:
value_bytes: bytes = self._protocol.encode(value)
txn.put(key.encode(), value_bytes, db=db_instance, overwrite=True)
def _get_table(self, table: str):
if table in self._tables:
return self._tables[table]
else:
# create a new table
self._tables[table] = self._env.open_db(table.encode())
return self._tables[table]
[docs] @classmethod
def build(cls, path: Union[str, Path], max_db: int = 10, map_size: int = int(1e9), **kwargs) -> MinosStorageLmdb:
"""Build a new instance.
:param path: Path in which the database is stored.
:param max_db: Maximum number of available databases.
:param map_size: Maximum number of entries to be stored on the database. Default set to 1GB
:param kwargs: Additional named arguments.
:return: A ``MinosStorageLmdb`` instance.
"""
env: lmdb.Environment = lmdb.open(str(path), max_dbs=max_db, map_size=map_size)
return cls(env, **kwargs)