minos.networks.brokers.handlers.handlers module

class minos.networks.brokers.handlers.handlers.BrokerHandler(records, handlers, retry, publisher, consumer_concurrency=15, **kwargs)[source]

Bases: minos.networks.brokers.handlers.abc.BrokerHandlerSetup

Broker Handler class.

__init__(records, handlers, retry, publisher, consumer_concurrency=15, **kwargs)[source]

property already_destroyed: bool

Already destroyed getter.

Return type

bool

Returns

A boolean value.

property already_setup: bool

Already Setup getter.

Return type

bool

Returns

A boolean value.

property consumers: list[_asyncio.Task]

Get the consumers.

Return type

list[_asyncio.Task]

Returns

A list of Task instances.

cursor(*args, **kwargs)

Get a new cursor.

Parameters
  • args – Additional positional arguments.

  • kwargs – Additional named arguments.

Return type

typing.AsyncContextManager[aiopg.connection.Cursor]

Returns

A Cursor wrapped into an asynchronous context manager.

async destroy()

Destroy miscellaneous repository things.

Return type

None

Returns

This method does not return anything.

async dispatch(cursor=None, background_mode=False)[source]

Dispatch a batch of HandlerEntry instances from the database’s queue.

Parameters
  • cursor (typing.Optional[aiopg.connection.Cursor]) – The cursor to interact with the database. If None is provided a new one is acquired.

  • background_mode (bool) – If True, the entries are dispatched in the background and the call returns without waiting. Otherwise, the call waits until every entry is processed.

Return type

None

Returns

This method does not return anything.
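
The difference between waiting for every entry and dispatching in the background can be sketched with a plain asyncio.Queue. This is only an illustration under stated assumptions, not the library's implementation: dispatch_sketch, consumer, the wait flag, and the in-memory queue are hypothetical stand-ins for the database-backed queue and its consumers.

```python
import asyncio


async def consumer(queue: asyncio.Queue, processed: list) -> None:
    """Hypothetical consumer: pull entries from the queue and 'process' them."""
    while True:
        entry = await queue.get()
        try:
            await asyncio.sleep(0)  # stand-in for the real handling work
            processed.append(entry)
        finally:
            queue.task_done()


async def dispatch_sketch(queue: asyncio.Queue, entries: list, wait: bool) -> None:
    """Enqueue entries; optionally block until all of them are processed."""
    for entry in entries:
        queue.put_nowait(entry)
    if wait:
        await queue.join()  # returns once task_done() was called per entry


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    processed: list = []
    tasks = [asyncio.create_task(consumer(queue, processed)) for _ in range(3)]

    await dispatch_sketch(queue, ["a", "b"], wait=True)
    after_waiting = len(processed)  # both entries already handled

    await dispatch_sketch(queue, ["c", "d"], wait=False)
    after_background = len(processed)  # consumers have not run yet

    await queue.join()  # let the background work finish
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return after_waiting, after_background, len(processed)


result = asyncio.run(demo())
```

In the waiting case the caller resumes only after both entries are processed; in the background case it resumes immediately and the consumers finish the work later.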

async dispatch_forever(max_wait=60.0)[source]

Dispatch the items in the consuming queue forever.

Parameters

max_wait (typing.Optional[float]) – Maximum number of seconds to wait for notifications. If None, the wait has no time limit.

Return type

typing.NoReturn

Returns

This method does not return.
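
A minimal sketch of how a max_wait timeout can bound the wait for a notification, using asyncio.Event as a stand-in for the real notification channel. The names wait_for_notification and dispatch_loop_sketch are hypothetical, the loop is bounded so that the example terminates, and dispatching again after a timeout is an assumption rather than documented behavior.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def wait_for_notification(event: asyncio.Event, max_wait: Optional[float]) -> bool:
    """Return True if notified, False if max_wait seconds elapsed first."""
    try:
        # timeout=None makes wait_for block until the event is set.
        await asyncio.wait_for(event.wait(), timeout=max_wait)
    except asyncio.TimeoutError:
        return False
    event.clear()
    return True


async def dispatch_loop_sketch(
    event: asyncio.Event,
    max_wait: Optional[float],
    dispatch: Callable[[], Awaitable[None]],
    iterations: int,
) -> list:
    """Bounded stand-in for an endless loop: dispatch after each wake-up."""
    notified_flags = []
    for _ in range(iterations):  # an endless version would use `while True`
        notified_flags.append(await wait_for_notification(event, max_wait))
        await dispatch()
    return notified_flags


async def demo() -> tuple:
    event = asyncio.Event()
    calls = []

    async def dispatch() -> None:
        calls.append("dispatch")

    event.set()  # simulate one pending notification
    flags = await dispatch_loop_sketch(event, 0.01, dispatch, iterations=2)
    return flags, calls


flags, calls = asyncio.run(demo())
```

The first iteration wakes up because of the notification; the second wakes up because the timeout expired.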

async dispatch_one(entry)[source]

Dispatch one row.

Parameters

entry (minos.networks.brokers.handlers.entries.BrokerHandlerEntry) – Entry to be dispatched.

Return type

None

Returns

This method does not return anything.

classmethod from_config(config=None, **kwargs)

Build a new instance from config.

Parameters
  • config (typing.Union[minos.common.configuration.config.MinosConfig, pathlib.Path, None]) – Config instance. If None is provided, default config is chosen.

  • kwargs – Additional named arguments.

Return type

~S

Returns

An instance of the called class.

get_action(topic)[source]

Get handling function to be called.

Gets the instance of the class and method to call.

Parameters

topic – Kafka topic. Example: “TicketAdded”.

Raises

MinosNetworkException – If the topic has no controller/action configured, for example: “topic TicketAdded have no controller/action configured, please review the configuration file.”

Return type

typing.Optional[typing.Callable]
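
The lookup described above can be sketched as a dictionary access that raises when the topic is unknown. This is an illustration only: get_action_sketch and MissingActionError are hypothetical names (the latter standing in for MinosNetworkException), and the plain dict is an assumed representation of the configured handlers.

```python
from typing import Callable, Optional


class MissingActionError(Exception):
    """Hypothetical stand-in for MinosNetworkException."""


def get_action_sketch(handlers: dict, topic: str) -> Optional[Callable]:
    """Return the handler registered for a topic, or raise if none is configured."""
    if topic not in handlers:
        raise MissingActionError(
            f"topic {topic} has no controller/action configured"
        )
    # A registered topic may still map to None (no callable attached).
    return handlers[topic]


def on_ticket_added(request):
    return f"handled {request}"


handlers = {"TicketAdded": on_ticket_added, "TicketIgnored": None}

found = get_action_sketch(handlers, "TicketAdded")
ignored = get_action_sketch(handlers, "TicketIgnored")

try:
    get_action_sketch(handlers, "TicketDeleted")
    raised = False
except MissingActionError:
    raised = True
```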

static get_callback(fn)[source]

Get the handler function to be used by the Broker Handler.

Parameters

fn (typing.Callable[[BrokerRequest], Union[BrokerRequest, None, Awaitable[Optional[BrokerRequest]]]]) – The action function.

Return type

typing.Callable[[BrokerMessage], Awaitable[tuple[Any, BrokerMessageStatus, dict[str, str]]]]

Returns

A wrapper function around the given one that is compatible with the Broker Handler API.
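
One way to picture such a wrapper: it accepts either a plain function or a coroutine function and always returns a coroutine that yields a (data, status, headers) tuple. The sketch below is hypothetical throughout: Status stands in for BrokerMessageStatus with made-up members, messages are plain strings, and the error handling is an assumption, not the library's behavior.

```python
import asyncio
from enum import Enum
from inspect import isawaitable
from typing import Any, Awaitable, Callable


class Status(Enum):
    """Hypothetical stand-in for BrokerMessageStatus."""

    SUCCESS = "success"
    ERROR = "error"


def get_callback_sketch(fn: Callable[[Any], Any]) -> Callable[[Any], Awaitable[tuple]]:
    """Wrap a sync or async action into a coroutine returning a 3-tuple."""

    async def wrapper(message: Any) -> tuple:
        try:
            response = fn(message)
            if isawaitable(response):  # async actions return an awaitable
                response = await response
            return response, Status.SUCCESS, {}
        except Exception as exc:
            # Assumed policy: report the failure instead of propagating it.
            return repr(exc), Status.ERROR, {}

    return wrapper


def sync_action(message: str) -> str:
    return message.upper()


async def async_action(message: str) -> str:
    return message[::-1]


def failing_action(message: str) -> str:
    raise ValueError("boom")


results = [
    asyncio.run(get_callback_sketch(fn)("ping"))
    for fn in (sync_action, async_action, failing_action)
]
```

Normalizing both kinds of action behind one awaitable signature lets the caller treat every handler the same way.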

property handlers: dict[str, typing.Optional[typing.Callable]]

Handlers getter.

Return type

dict[str, typing.Optional[typing.Callable]]

Returns

A dictionary in which the keys are topics and the values are the handlers.

locked_cursor(key, *args, **kwargs)

Get a new locked cursor.

Parameters
  • key (collections.abc.Hashable) – The key to be used for locking.

  • args – Additional positional arguments.

  • kwargs – Additional named arguments.

Return type

typing.AsyncContextManager[aiopg.connection.Cursor]

Returns

A Cursor wrapped into an asynchronous context manager.
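
The idea of a key-scoped lock wrapped in an asynchronous context manager can be illustrated without a database. The sketch below swaps in an in-process asyncio.Lock per key in place of whatever database-level locking the real method uses; locked_cursor_sketch and the string "cursor" it yields are hypothetical.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import AsyncIterator, Hashable


@asynccontextmanager
async def locked_cursor_sketch(locks: dict, key: Hashable) -> AsyncIterator[str]:
    """Yield a fake cursor while holding the lock associated with key."""
    async with locks[key]:
        yield f"cursor:{key}"


async def worker(locks: dict, events: list, name: str, key: Hashable) -> None:
    async with locked_cursor_sketch(locks, key) as cursor:
        events.append((name, "enter", cursor))
        await asyncio.sleep(0.01)  # simulated work while holding the lock
        events.append((name, "exit", cursor))


async def demo() -> list:
    locks: dict = defaultdict(asyncio.Lock)  # one lock per key, created lazily
    events: list = []
    await asyncio.gather(
        worker(locks, events, "first", "orders"),
        worker(locks, events, "second", "orders"),
    )
    return events


events = asyncio.run(demo())
```

Because both workers use the same key, the second one enters only after the first has left the block.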

property pool: minos.common.database.pools.PostgreSqlPool

Get the connections pool.

Return type

minos.common.database.pools.PostgreSqlPool

Returns

A Pool object.

property publisher: minos.networks.brokers.publishers.publishers.BrokerPublisher

Get the publisher instance.

Return type

minos.networks.brokers.publishers.publishers.BrokerPublisher

Returns

A BrokerPublisher instance.

async setup()

Set up miscellaneous repository things.

Return type

None

Returns

This method does not return anything.

async submit_query(operation, parameters=None, *, timeout=None, lock=None, **kwargs)

Submit a SQL query.

Parameters
  • operation (typing.Any) – Query to be executed.

  • parameters (typing.Optional[typing.Any]) – Parameters to be projected into the query.

  • timeout (typing.Optional[float]) – An optional timeout.

  • lock (typing.Optional[typing.Any]) – Optional key to perform the query with locking. If not set, the query is performed without any lock.

  • kwargs – Additional named arguments.

Return type

None

Returns

This method does not return anything.

async submit_query_and_fetchone(*args, **kwargs)

Submit a SQL query and get the first response.

Parameters
  • args – Additional positional arguments.

  • kwargs – Additional named arguments.

Return type

tuple

Returns

The first row of the query result, as a tuple.

async submit_query_and_iter(operation, parameters=None, *, timeout=None, lock=None, streaming_mode=False, **kwargs)

Submit a SQL query and return an asynchronous iterator.

Parameters
  • operation (typing.Any) – Query to be executed.

  • parameters (typing.Optional[typing.Any]) – Parameters to be projected into the query.

  • timeout (typing.Optional[float]) – An optional timeout.

  • lock (typing.Optional[int]) – Optional key to perform the query with locking. If not set, the query is performed without any lock.

  • streaming_mode (bool) – If True, the data is fetched in streaming mode, that is, by iterating over the cursor and yielding one row at a time (this requires the connection to stay open). Otherwise, all the data is fetched and kept in memory before being yielded.

  • kwargs – Additional named arguments.

Return type

typing.AsyncIterator[tuple]

Returns

An asynchronous iterator over the resulting rows, each one a tuple.
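
The trade-off behind streaming_mode can be sketched with an in-memory cursor. FakeCursor and iter_rows are hypothetical and do not reflect the aiopg API; the closed flag merely marks the point at which a real connection could be released.

```python
import asyncio
from typing import AsyncIterator


class FakeCursor:
    """Hypothetical in-memory cursor used only for this illustration."""

    def __init__(self, rows: list) -> None:
        self._rows = list(rows)
        self.closed = False

    async def fetchall(self) -> list:
        return list(self._rows)

    def __aiter__(self) -> AsyncIterator[tuple]:
        return self._iterate()

    async def _iterate(self) -> AsyncIterator[tuple]:
        for row in self._rows:
            yield row


async def iter_rows(cursor: FakeCursor, streaming_mode: bool = False) -> AsyncIterator[tuple]:
    if streaming_mode:
        # Rows are pulled lazily, so the cursor must stay open while iterating.
        async for row in cursor:
            yield row
        cursor.closed = True
        return
    # Everything is loaded first, so the cursor can be released before yielding.
    rows = await cursor.fetchall()
    cursor.closed = True
    for row in rows:
        yield row


async def closed_after_first_row(streaming_mode: bool) -> bool:
    cursor = FakeCursor([(1,), (2,)])
    rows = iter_rows(cursor, streaming_mode)
    await rows.__anext__()  # consume only the first row
    state = cursor.closed
    await rows.aclose()
    return state


eager_closed = asyncio.run(closed_after_first_row(streaming_mode=False))
streaming_closed = asyncio.run(closed_after_first_row(streaming_mode=True))
```

Streaming keeps memory use low at the cost of holding the connection; the eager mode does the opposite.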

property topics: KeysView[str]

Get an iterable containing the topic names.

Return type

typing.KeysView[str]

Returns

An Iterable of str.
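
A KeysView is a live view over a dictionary's keys rather than a copy. The short sketch below assumes, for illustration only, that the topic names are the keys of a handlers-like dictionary; the dictionary contents are made up.

```python
from typing import KeysView

# Hypothetical topic-to-handler mapping.
handlers = {"TicketAdded": print, "TicketDeleted": None}

topics: KeysView[str] = handlers.keys()
before = sorted(topics)

# The view reflects later changes to the underlying dictionary.
handlers["TicketUpdated"] = print
after = sorted(topics)
```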