How to properly set up a background PostgreSQL LISTEN / NOTIFY listener with aiopg in the aiohttp framework?

I am using aiohttp for websockets and PostgreSQL LISTEN / NOTIFY, through the aiopg driver, as a transport queue between servers.
Part of my server setup:

import asyncio

from aiohttp.web import Application


def init(loop):
    ...
    app = Application(loop=loop)
    app.on_startup.append(connect_db)
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    return app


def create_app(loop):
    loop = asyncio.get_event_loop()
    app = init(loop)
    return app

      

Here are my signal handlers:

import asyncio

import aiopg

dsn = 'dbname=test user=root password=test host=127.0.0.1'

async def connect_db(app):
    # One shared connection, stored on the application object
    app['db'] = await aiopg.connect(dsn)
    return app

async def listen_events_from_db(app):
    cursor = await app['db'].cursor()
    # Subscribe this connection to the "test" channel
    await cursor.execute("LISTEN test")
    try:
        while True:
            # Wait for the next NOTIFY delivered on this connection
            msg = await app['db'].notifies.get()
            # Here will be a coroutine which will send msg to connected websockets
            print('msg: ', msg.payload)
    except asyncio.CancelledError:
        pass
    finally:
        app['db'].close()

async def start_background_tasks(app):
    app['psql_listener'] = app.loop.create_task(listen_events_from_db(app))


async def cleanup_background_tasks(app):
    app['psql_listener'].cancel()
    await app['psql_listener']

      

As per the docs, I wrap the background task in try / finally to close the connection, and cancel the task in on_cleanup. But in this case, the server fails to start with this exception:

...
raise CancelledError
concurrent.futures._base.CancelledError  
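
A note on the cleanup handler above, offered as a sketch rather than a diagnosis: in plain asyncio, an except asyncio.CancelledError clause inside a coroutine only takes effect once the coroutine is actually running inside that try block. If the task is cancelled before it has run at all, the coroutine body never executes and awaiting the task raises CancelledError in the caller. The example below uses only the standard library; worker, cancel_before_start and cancel_after_start are hypothetical names, and whether this is what happens in the setup above is not established here.

```python
import asyncio


async def worker(log):
    try:
        log.append("started")
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # Only reached if the coroutine was already running inside the try block.
        log.append("cancelled inside")


async def cancel_before_start():
    log = []
    task = asyncio.ensure_future(worker(log))
    task.cancel()  # the task has not had a chance to run yet
    try:
        await task
    except asyncio.CancelledError:
        # The worker body never ran, so nothing inside it could catch this.
        log.append("cancelled outside")
    return log


async def cancel_after_start():
    log = []
    task = asyncio.ensure_future(worker(log))
    await asyncio.sleep(0)  # let the task run up to its first await
    task.cancel()
    await task  # returns normally: the worker swallowed the cancellation
    log.append("awaited cleanly")
    return log


if __name__ == "__main__":
    print(asyncio.run(cancel_before_start()))
    print(asyncio.run(cancel_after_start()))
```

In the first case the cancellation surfaces at the await in the caller; in the second it is absorbed inside the worker. A cleanup handler that must tolerate both cases can guard its own await as well.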

      

If I remove the on_shutdown signal and catch RuntimeError instead, then everything works fine, except for one warning after shutdown: the task listen_events_from_db() was destroyed while it was still pending. I think this is because of the while loop.
Here's the modified task:

async def listen_events_from_db(app):
    cursor = await app['db'].cursor()
    await cursor.execute("LISTEN test")
    try:
        while True:
            msg = await app['db'].notifies.get()
            print('msg: ', msg.payload)
    except RuntimeError:
        pass
    finally:
        app['db'].close()

      

I am new to asyncio and the aiohttp framework and feel like I am doing this wrong. Please point me in the right direction.
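
For reference, the start / cancel / await lifecycle described in this question can be sketched without aiohttp or aiopg. This is a minimal illustration under stated assumptions, not a confirmed fix: an asyncio.Queue stands in for app['db'].notifies, a plain dict stands in for the application object, and listen_events, start_listener and stop_listener are hypothetical names. The listener does not swallow CancelledError; it only uses finally for cleanup, and the stop handler suppresses the cancellation when it awaits the task.

```python
import asyncio
import contextlib


async def listen_events(notifies, received, state):
    try:
        while True:
            msg = await notifies.get()
            received.append(msg)  # real code would forward msg to websockets
    finally:
        # Runs when the task is cancelled while waiting on the queue;
        # real code would close the database connection here.
        state["closed"] = True


async def start_listener(app):
    # Counterpart of an on_startup handler: schedule the background task.
    app["listener"] = asyncio.ensure_future(
        listen_events(app["notifies"], app["received"], app["state"])
    )


async def stop_listener(app):
    # Counterpart of an on_cleanup handler: cancel, then wait for the task
    # to finish, ignoring the CancelledError that awaiting it raises.
    app["listener"].cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await app["listener"]


async def main():
    app = {
        "notifies": asyncio.Queue(),  # stand-in for app['db'].notifies
        "received": [],
        "state": {"closed": False},
    }
    await start_listener(app)
    await app["notifies"].put("hello")
    await app["notifies"].put("world")
    await asyncio.sleep(0.01)  # give the listener time to drain the queue
    await stop_listener(app)
    return app["received"], app["state"]["closed"], app["listener"].cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because stop_listener waits for the task to finish before returning, no task is left pending when the event loop closes, which is the situation the "still pending" warning describes.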
