Using AsyncIO with aiopg#

Hiku has several executors. Previous examples used hiku.executors.sync.SyncExecutor; they can also be run with hiku.executors.threads.ThreadsExecutor, which executes queries concurrently using concurrent.futures.ThreadPoolExecutor, without any change in the graph definition.

However, to be able to load data using the asyncio library, all data loading functions should be coroutines. We will translate one of the previous examples to show how to use the asyncio and aiopg libraries.

Prerequisites#

Note

Source code of this example can be found on GitHub.

from sqlalchemy import MetaData, Table, Column
from sqlalchemy import Integer, String, ForeignKey, select
from sqlalchemy.sql.ddl import CreateTable

# Single MetaData registry that both tables below are attached to.
metadata = MetaData()

# Characters: integer primary key plus two string attributes.
character_table = Table(
    'character',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('species', String),
)

# Actors: every row must reference a character (non-nullable foreign key
# to character.id); several actors may reference the same character.
actor_table = Table(
    'actor',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('character_id', ForeignKey('character.id'), nullable=False),
)
async with aiopg.sa.create_engine(db_dsn, loop=loop) as db_engine:
    async with db_engine.acquire() as conn:
        await conn.execute(CreateTable(character_table))
        await conn.execute(CreateTable(actor_table))

        await conn.execute(character_table.insert().values([
            dict(id=1, name='James T. Kirk', species='Human'),
            dict(id=2, name='Spock', species='Vulcan/Human'),
            dict(id=3, name='Leonard McCoy', species='Human'),
        ]))
        await conn.execute(actor_table.insert().values([
            dict(id=1, character_id=1, name='William Shatner'),
            dict(id=2, character_id=2, name='Leonard Nimoy'),
            dict(id=3, character_id=3, name='DeForest Kelley'),
            dict(id=4, character_id=1, name='Chris Pine'),
            dict(id=5, character_id=2, name='Zachary Quinto'),
            dict(id=6, character_id=3, name='Karl Urban'),
        ]))

Graph definition#

 1from hiku.graph import Graph, Root, Node, Link, Field
 2from hiku.types import TypeRef, Sequence
 3from hiku.engine import pass_context
 4from hiku.sources.aiopg import FieldsQuery, LinkQuery
 5
 6SA_ENGINE_KEY = 'sa-engine'  # context key under which the aiopg.sa engine is supplied
 7
 8character_query = FieldsQuery(SA_ENGINE_KEY, character_table)  # backs the Character fields below
 9
10actor_query = FieldsQuery(SA_ENGINE_KEY, actor_table)  # backs the Actor fields below
11
12character_to_actors_query = LinkQuery(  # used by the Character -> actors link below
13    SA_ENGINE_KEY,
14    from_column=actor_table.c.character_id,  # presumably matched against the character id the link requires
15    to_column=actor_table.c.id,  # presumably the actor ids the link yields
16)
17
18async def direct_link(ids):  # used by the Actor -> character link, which requires 'character_id'
19    return ids  # returned unchanged
20
21@pass_context
22async def to_characters_query(ctx):  # root link: ids of all rows in character_table
23    query = select([character_table.c.id])
24    async with ctx[SA_ENGINE_KEY].acquire() as conn:  # connection from the engine stored in the context
25        rows = await conn.execute(query)
26        return [row.id async for row in rows]  # result rows are iterated asynchronously
27
28@pass_context
29async def to_actors_query(ctx):  # root link: ids of all rows in actor_table
30    query = select([actor_table.c.id])
31    async with ctx[SA_ENGINE_KEY].acquire() as conn:  # connection from the engine stored in the context
32        rows = await conn.execute(query)
33        return [row.id async for row in rows]  # result rows are iterated asynchronously
34
35GRAPH = Graph([
36    Node('Character', [  # all fields are served by character_query
37        Field('id', None, character_query),
38        Field('name', None, character_query),
39        Field('species', None, character_query),
40        Link('actors', Sequence[TypeRef['Actor']], character_to_actors_query,  # one-to-many
41             requires='id'),
42    ]),
43    Node('Actor', [  # all fields are served by actor_query
44        Field('id', None, actor_query),
45        Field('name', None, actor_query),
46        Field('character_id', None, actor_query),
47        Link('character', TypeRef['Character'],  # many-to-one
48             direct_link, requires='character_id'),
49    ]),
50    Root([  # entry points: every character / every actor
51        Link('characters', Sequence[TypeRef['Character']],
52             to_characters_query, requires=None),
53        Link('actors', Sequence[TypeRef['Actor']],
54             to_actors_query, requires=None),
55    ]),
56])

Note that we are using the hiku.sources.aiopg source [4] in our graph definition instead of hiku.sources.sqlalchemy.

All our custom data loading functions [18,22,29] are now coroutine functions and use aiopg.sa.Engine instead of sqlalchemy.engine.Engine to execute SQL queries.

Querying graph#

For testing purposes, let’s define a helper coroutine function, execute:

 1import aiopg.sa
 2
 3from hiku.engine import Engine
 4from hiku.result import denormalize
 5from hiku.readers.simple import read
 6from hiku.executors.asyncio import AsyncIOExecutor
 7
 8async def execute(hiku_engine, sa_engine, graph, query_string):  # helper: run one query string against the graph
 9    query = read(query_string)  # turn the query text into a query object
10    result = await hiku_engine.execute(graph, query, {SA_ENGINE_KEY: sa_engine})  # engine is passed via the context
11    return denormalize(graph, result)  # plain nested data, as compared in the asserts below

Note that the hiku.engine.Engine.execute() method [10] returns an “awaitable” object when it is used with hiku.executors.asyncio.AsyncIOExecutor. Here is how the engine should be constructed:

# Give the executor the event loop, then build the engine on top of it.
executor = AsyncIOExecutor(event_loop)
hiku_engine = Engine(executor)

Testing a one-to-many link:

async with aiopg.sa.create_engine(db_dsn, loop=event_loop) as sa_engine:
    result = await execute(hiku_engine, sa_engine, GRAPH,
                           '[{:characters [:name {:actors [:name]}]}]')
    assert result == {
        'characters': [
            {
                'name': 'James T. Kirk',
                'actors': [
                    {'name': 'William Shatner'},
                    {'name': 'Chris Pine'},
                ],
            },
            {
                'name': 'Spock',
                'actors': [
                    {'name': 'Leonard Nimoy'},
                    {'name': 'Zachary Quinto'},
                ],
            },
            {
                'name': 'Leonard McCoy',
                'actors': [
                    {'name': 'DeForest Kelley'},
                    {'name': 'Karl Urban'},
                ],
            },
        ],
    }

Testing a many-to-one link:

async with aiopg.sa.create_engine(db_dsn, loop=event_loop) as sa_engine:
    result = await execute(hiku_engine, sa_engine, GRAPH,
                           '[{:actors [:name {:character [:name]}]}]')
    assert result == {
        'actors': [
            {
                'name': 'William Shatner',
                'character': {'name': 'James T. Kirk'},
            },
            {
                'name': 'Leonard Nimoy',
                'character': {'name': 'Spock'},
            },
            {
                'name': 'DeForest Kelley',
                'character': {'name': 'Leonard McCoy'},
            },
            {
                'name': 'Chris Pine',
                'character': {'name': 'James T. Kirk'},
            },
            {
                'name': 'Zachary Quinto',
                'character': {'name': 'Spock'},
            },
            {
                'name': 'Karl Urban',
                'character': {'name': 'Leonard McCoy'},
            },
        ],
    }