minos.networks.brokers.handlers.consumers module
- class minos.networks.brokers.handlers.consumers.BrokerConsumer(topics=None, broker=None, client=None, group_id='default', **kwargs)
Bases: minos.networks.brokers.handlers.abc.BrokerHandlerSetup
Broker Consumer class.
- async add_topic(topic)
Add a topic to the consumer’s subscribed topics.
- Parameters
topic (str) – Name of the topic to be added.
- Return type
None
- Returns
This method does not return anything.
- property already_destroyed: bool
Already destroyed getter.
- Return type
bool
- Returns
True if the instance has already been destroyed, False otherwise.
- property already_setup: bool
Already set up getter.
- Return type
bool
- Returns
True if the instance has already been set up, False otherwise.
- property client: aiokafka.consumer.consumer.AIOKafkaConsumer
Get the Kafka consumer client.
- Return type
aiokafka.consumer.consumer.AIOKafkaConsumer
- Returns
An AIOKafkaConsumer instance.
- cursor(*args, **kwargs)
Get a new cursor.
- Parameters
args – Additional positional arguments.
kwargs – Additional named arguments.
- Return type
typing.AsyncContextManager[aiopg.connection.Cursor]
- Returns
A Cursor wrapped into an asynchronous context manager.
- async destroy()
Destroy miscellaneous repository things.
- Return type
None
- Returns
This method does not return anything.
- async dispatch()
Perform a dispatching step.
- Return type
typing.NoReturn
- Returns
This method does not return anything.
- async enqueue(topic, partition, binary)
Insert a row into the queue table.
Retrieves the number of affected rows and the row ID.
- Args:
topic: Kafka topic. Example: “TicketAdded”
partition: Kafka partition number.
binary: Broker message in bytes.
- Returns:
Queue ID.
Example: 12
- Raises:
Exception: An error occurred inserting record.
- Return type
int
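The enqueue step described above (insert one row into a queue table, then return its ID) can be sketched roughly as follows. This is only an illustration: it uses the standard library's `sqlite3` in place of PostgreSQL, and the table name `consumer_queue`, its columns, and both helper functions are assumptions made for the example, not the library's actual schema or code.

```python
import sqlite3


def create_queue_table(connection: sqlite3.Connection) -> None:
    # Hypothetical schema: the real table layout is not documented here.
    connection.execute(
        "CREATE TABLE IF NOT EXISTS consumer_queue ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "topic TEXT NOT NULL, "
        "partition_id INTEGER NOT NULL, "
        "data BLOB NOT NULL)"
    )


def enqueue(connection: sqlite3.Connection, topic: str, partition: int, binary: bytes) -> int:
    """Insert one message into the queue table and return the new row ID."""
    cursor = connection.execute(
        "INSERT INTO consumer_queue (topic, partition_id, data) VALUES (?, ?, ?)",
        (topic, partition, binary),
    )
    connection.commit()
    return cursor.lastrowid


# Usage: an in-memory database stands in for the real queue storage.
connection = sqlite3.connect(":memory:")
create_queue_table(connection)
first_id = enqueue(connection, "TicketAdded", 0, b"payload")
second_id = enqueue(connection, "TicketAdded", 0, b"another payload")
```

Returning the generated row ID lets the caller refer to the stored message later without querying the table again.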
- classmethod from_config(config=None, **kwargs)
Build a new instance from config.
- Parameters
config (typing.Union[minos.common.configuration.config.MinosConfig, pathlib.Path, None]) – Config instance. If None is provided, default config is chosen.
kwargs – Additional named arguments.
- Return type
~S
- Returns
An instance of the called class.
- async handle_message(consumer)
Message consumer.
It consumes the messages and sends them for processing.
- Args:
consumer: Kafka consumer instance (at the moment, only Kafka consumers are supported).
- Return type
None
- async handle_single_message(message)
Handle Kafka messages.
Add the message to the event_queue table.
- Args:
message: Kafka message.
- Raises:
Exception: An error occurred inserting record.
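The relationship between `handle_message`, `handle_single_message`, and `enqueue` might look roughly like the sketch below. It is not the library's code: `FakeRecord`, `FakeConsumer`, and `SketchHandler` are hypothetical stand-ins, the queue table is replaced by an in-memory list, and the assumption that each record exposes `topic`, `partition`, and `value` attributes is based on how Kafka consumer records are commonly shaped.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeRecord:
    # Stand-in for a Kafka record; only the fields used below are modelled.
    topic: str
    partition: int
    value: bytes


class FakeConsumer:
    """Hypothetical async-iterable consumer that yields a fixed list of records."""

    def __init__(self, records):
        self._records = list(records)

    def __aiter__(self):
        return self._iterate()

    async def _iterate(self):
        for record in self._records:
            await asyncio.sleep(0)  # Give control back to the event loop.
            yield record


class SketchHandler:
    def __init__(self):
        self.queue = []  # In-memory replacement for the queue table.

    async def enqueue(self, topic, partition, binary):
        self.queue.append((topic, partition, binary))
        return len(self.queue)  # Position in the list plays the role of the row ID.

    async def handle_single_message(self, message):
        # Store one message; processing happens later, in a separate step.
        return await self.enqueue(message.topic, message.partition, message.value)

    async def handle_message(self, consumer):
        # Consume every available message and hand each one over for storage.
        async for message in consumer:
            await self.handle_single_message(message)


async def main():
    handler = SketchHandler()
    consumer = FakeConsumer([
        FakeRecord("TicketAdded", 0, b"first"),
        FakeRecord("TicketDeleted", 1, b"second"),
    ])
    await handler.handle_message(consumer)
    return handler.queue


queued = asyncio.run(main())
```

In this sketch, consuming is separated from processing: the handler only persists each incoming message, so a slow processing step would not block consumption.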
- locked_cursor(key, *args, **kwargs)
Get a new locked cursor.
- Parameters
key (collections.abc.Hashable) – The key to be used for locking.
args – Additional positional arguments.
kwargs – Additional named arguments.
- Return type
typing.AsyncContextManager[aiopg.connection.Cursor]
- Returns
A Cursor wrapped into an asynchronous context manager.
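To illustrate the general idea of a resource guarded by a hashable key and exposed as an asynchronous context manager, here is a minimal sketch. How the real method locks is not stated in this reference; the sketch substitutes in-process `asyncio.Lock` objects, and `KeyedLocks` and `worker` are hypothetical names introduced only for the example.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import Hashable


class KeyedLocks:
    """Hypothetical helper: one asyncio.Lock per hashable key."""

    def __init__(self):
        self._locks = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def locked(self, key: Hashable):
        # Callers using the same key are serialized; different keys do not block each other.
        async with self._locks[key]:
            yield key


async def worker(locks, key, name, log):
    async with locks.locked(key):
        log.append(f"{name} start")
        await asyncio.sleep(0.01)  # Simulated work while holding the lock.
        log.append(f"{name} end")


async def main():
    locks = KeyedLocks()
    log = []
    await asyncio.gather(
        worker(locks, "orders", "a", log),
        worker(locks, "orders", "b", log),
    )
    return log


log = asyncio.run(main())
```

Because both workers use the same key, the second one only starts after the first one has released the lock.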
- property pool: minos.common.database.pools.PostgreSqlPool
Get the connections pool.
- Return type
minos.common.database.pools.PostgreSqlPool
- Returns
A Pool object.
- async remove_topic(topic)
Remove a topic from the consumer’s subscribed topics.
- Parameters
topic (str) – Name of the topic to be removed.
- Return type
None
- Returns
This method does not return anything.
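The way `add_topic`, `remove_topic`, and the `topics` property fit together can be sketched with a small stand-in class. `TopicRegistry` and its `_resubscribe` placeholder are hypothetical; the sketch only tracks topic names in a set and does not talk to any broker.

```python
import asyncio


class TopicRegistry:
    """Hypothetical stand-in that only tracks the subscribed topic names."""

    def __init__(self, topics=None):
        self._topics = set(topics or ())

    @property
    def topics(self) -> set:
        return self._topics

    async def add_topic(self, topic: str) -> None:
        self._topics.add(topic)
        await self._resubscribe()

    async def remove_topic(self, topic: str) -> None:
        self._topics.discard(topic)
        await self._resubscribe()

    async def _resubscribe(self) -> None:
        # Placeholder: a real consumer would update its client subscription here.
        await asyncio.sleep(0)


async def main():
    registry = TopicRegistry({"TicketAdded"})
    await registry.add_topic("TicketDeleted")
    await registry.add_topic("TicketAdded")  # Adding twice has no extra effect.
    await registry.remove_topic("TicketAdded")
    return registry.topics


topics = asyncio.run(main())
```

Keeping the topics in a set makes repeated additions harmless and matches the `set[str]` type of the `topics` property.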
- async setup()
Setup miscellaneous repository things.
- Return type
None
- Returns
This method does not return anything.
- async submit_query(operation, parameters=None, *, timeout=None, lock=None, **kwargs)
Submit a SQL query.
- Parameters
operation (typing.Any) – Query to be executed.
parameters (typing.Optional[typing.Any]) – Parameters to be projected into the query.
timeout (typing.Optional[float]) – An optional timeout.
lock (typing.Optional[typing.Any]) – Optional key to perform the query with locking. If not set, the query is performed without any lock.
kwargs – Additional named arguments.
- Return type
None
- Returns
This method does not return anything.
- async submit_query_and_fetchone(*args, **kwargs)
Submit a SQL query and get the first response.
- Parameters
args – Additional positional arguments.
kwargs – Additional named arguments.
- Return type
tuple
- Returns
The first row of the response, as a tuple.
- async submit_query_and_iter(operation, parameters=None, *, timeout=None, lock=None, streaming_mode=False, **kwargs)
Submit a SQL query and return an asynchronous iterator.
- Parameters
operation (typing.Any) – Query to be executed.
parameters (typing.Optional[typing.Any]) – Parameters to be projected into the query.
timeout (typing.Optional[float]) – An optional timeout.
lock (typing.Optional[int]) – Optional key to perform the query with locking. If not set, the query is performed without any lock.
streaming_mode (bool) – If True, the data fetching is performed in streaming mode, that is, iterating over the cursor and yielding one row at a time (this requires an open connection). Otherwise, all the data is fetched and kept in memory before being yielded.
kwargs – Additional named arguments.
- Return type
typing.AsyncIterator[tuple]
- Returns
An asynchronous iterator over the resulting rows.
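The difference between the two fetching modes can be sketched with an asynchronous generator. This is an illustration under stated assumptions, not the library's implementation: `query_and_iter` and `collect` are hypothetical helpers, and the standard library's synchronous `sqlite3` stands in for the PostgreSQL connection.

```python
import asyncio
import sqlite3
from typing import AsyncIterator


async def query_and_iter(connection, operation, parameters=(), *, streaming_mode=False) -> AsyncIterator[tuple]:
    cursor = connection.execute(operation, parameters)
    if streaming_mode:
        # Yield rows one at a time while the cursor (and connection) stay open.
        for row in cursor:
            yield row
        return
    # Otherwise load every row into memory first, then yield from the list.
    rows = cursor.fetchall()
    cursor.close()
    for row in rows:
        yield row


async def collect(streaming_mode):
    connection = sqlite3.connect(":memory:")
    connection.execute("CREATE TABLE numbers (value INTEGER)")
    connection.executemany("INSERT INTO numbers VALUES (?)", [(1,), (2,), (3,)])
    query = "SELECT value FROM numbers ORDER BY value"
    rows = [row async for row in query_and_iter(connection, query, streaming_mode=streaming_mode)]
    connection.close()
    return rows


buffered = asyncio.run(collect(False))
streamed = asyncio.run(collect(True))
```

Both modes produce the same rows. Streaming keeps memory use low for large results but holds the connection for as long as the caller keeps iterating, whereas buffering releases the cursor early at the cost of holding all rows in memory.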
- property topics: set[str]
Topics getter.
- Return type
set[str]
- Returns
A set of string values.