minos.networks.brokers.handlers.consumers module

class minos.networks.brokers.handlers.consumers.BrokerConsumer(topics=None, broker=None, client=None, group_id='default', **kwargs)[source]

Bases: minos.networks.brokers.handlers.abc.BrokerHandlerSetup

Broker Consumer class.

__init__(topics=None, broker=None, client=None, group_id='default', **kwargs)[source]
async add_topic(topic)[source]

Add a topic to the consumer’s subscribed topics.

Parameters

topic (str) – Name of the topic to be added.

Return type

None

Returns

This method does not return anything.

property already_destroyed: bool

Already Destroy getter.

Return type

bool

Returns

A boolean value.

property already_setup: bool

Already Setup getter.

Return type

bool

Returns

A boolean value.

property client: aiokafka.consumer.consumer.AIOKafkaConsumer

Get the kafka consumer client.

Return type

aiokafka.consumer.consumer.AIOKafkaConsumer

Returns

An AIOKafkaConsumer instance.

cursor(*args, **kwargs)

Get a new cursor.

Parameters
  • args – Additional positional arguments.

  • kwargs – Additional named arguments.

Return type

typing.AsyncContextManager[aiopg.connection.Cursor]

Returns

A Cursor wrapped into an asynchronous context manager.

async destroy()

Destroy miscellaneous repository things.

Return type

None

Returns

This method does not return anything.

async dispatch()[source]

Perform a dispatching step.

Return type

typing.NoReturn

Returns

This method does not return anything.

async enqueue(topic, partition, binary)[source]

Insert row into queue table.

Retrieves number of affected rows and row ID.

Args:

topic: Kafka topic. Example: “TicketAdded” partition: Kafka partition number. binary: Broker Message in bytes.

Returns:

Queue ID.

Example: 12

Raises:

Exception: An error occurred inserting record.

Return type

int

classmethod from_config(config=None, **kwargs)

Build a new instance from config.

Parameters
  • config (typing.Union[minos.common.configuration.config.MinosConfig, pathlib.Path, None]) – Config instance. If None is provided, default config is chosen.

  • kwargs – Additional named arguments.

Return type

~S

Returns

A instance of the called class.

async handle_message(consumer)[source]

Message consumer.

It consumes the messages and sends them for processing.

Args:

consumer: Kafka Consumer instance (at the moment only Kafka consumer is supported).

Return type

None

async handle_single_message(message)[source]

Handle Kafka messages.

Add the message to the event_queue table.

Args:

message: Kafka message.

Raises:

Exception: An error occurred inserting record.

locked_cursor(key, *args, **kwargs)

Get a new locked cursor.

Parameters
  • key (collections.abc.Hashable) – The key to be used for locking.

  • args – Additional positional arguments.

  • kwargs – Additional named arguments.

Return type

typing.AsyncContextManager[aiopg.connection.Cursor]

Returns

A Cursor wrapped into an asynchronous context manager.

property pool: minos.common.database.pools.PostgreSqlPool

Get the connections pool.

Return type

minos.common.database.pools.PostgreSqlPool

Returns

A Pool object.

async remove_topic(topic)[source]

Remove a topic from the consumer’s subscribed topics.

Parameters

topic (str) – Name of the topic to be removed.

Return type

None

Returns

This method does not return anything.

async setup()

Setup miscellaneous repository things.

Return type

None

Returns

This method does not return anything.

async submit_query(operation, parameters=None, *, timeout=None, lock=None, **kwargs)

Submit a SQL query.

Parameters
  • operation (typing.Any) – Query to be executed.

  • parameters (typing.Optional[typing.Any]) – Parameters to be projected into the query.

  • timeout (typing.Optional[float]) – An optional timeout.

  • lock (typing.Optional[typing.Any]) – Optional key to perform the query with locking. If not set, the query is performed without any lock.

  • kwargs – Additional named arguments.

Return type

None

Returns

This method does not return anything.

async submit_query_and_fetchone(*args, **kwargs)

Submit a SQL query and gets the first response.

Parameters
  • args – Additional positional arguments.

  • kwargs – Additional named arguments.

Return type

tuple

Returns

This method does not return anything.

async submit_query_and_iter(operation, parameters=None, *, timeout=None, lock=None, streaming_mode=False, **kwargs)

Submit a SQL query and return an asynchronous iterator.

Parameters
  • operation (typing.Any) – Query to be executed.

  • parameters (typing.Optional[typing.Any]) – Parameters to be projected into the query.

  • timeout (typing.Optional[float]) – An optional timeout.

  • lock (typing.Optional[int]) – Optional key to perform the query with locking. If not set, the query is performed without any lock.

  • streaming_mode (bool) – If True the data fetching is performed in streaming mode, that is iterating over the cursor and yielding once a time (requires an opening connection to do that). Otherwise, all the data is fetched and keep in memory before yielding it.

  • kwargs – Additional named arguments.

Return type

typing.AsyncIterator[tuple]

Returns

This method does not return anything.

property topics: set[str]

Topics getter.

Return type

set[str]

Returns

A list of string values.