Source code for minos.networks.brokers.handlers.abc

from zlib import (
    crc32,
)

from psycopg2.sql import (
    SQL,
)

from minos.common import (
    PostgreSqlMinosDatabase,
)


class BrokerHandlerSetup(PostgreSqlMinosDatabase):
    """Minos Broker Setup Class.

    Creates the ``consumer_queue`` table (if it does not exist yet) as part
    of its setup step.
    """

    def __init__(self, *args, **kwargs):
        """Forward every argument unchanged to the parent initializer."""
        super().__init__(*args, **kwargs)

    async def _setup(self) -> None:
        """Run the setup step: ensure the ``consumer_queue`` table exists."""
        await self._create_event_queue_table()

    async def _create_event_queue_table(self) -> None:
        """Submit the ``CREATE TABLE IF NOT EXISTS consumer_queue`` statement.

        The statement is idempotent on the SQL side thanks to
        ``IF NOT EXISTS``.
        """
        create_table_query = SQL(
            "CREATE TABLE IF NOT EXISTS consumer_queue ("
            '"id" BIGSERIAL NOT NULL PRIMARY KEY, '
            '"topic" VARCHAR(255) NOT NULL, '
            '"partition" INTEGER,'
            '"data" BYTEA NOT NULL, '
            '"retry" INTEGER NOT NULL DEFAULT 0,'
            '"processing" BOOL NOT NULL DEFAULT FALSE, '
            '"created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(), '
            '"updated_at" TIMESTAMPTZ NOT NULL DEFAULT NOW())'
        )

        # The built-in ``hash()`` of a ``str`` is salted per interpreter
        # process (unless PYTHONHASHSEED is pinned), so it yields a different
        # key in every process. CRC-32 is deterministic, giving every process
        # the same non-negative 32-bit integer for this table name.
        # NOTE(review): ``lock`` is presumably used by ``submit_query`` as a
        # database-level lock key shared between processes - confirm against
        # ``PostgreSqlMinosDatabase.submit_query``.
        lock_key = crc32(b"consumer_queue")

        await self.submit_query(create_table_query, lock=lock_key)