Using AsyncIO with aiopg

Hiku has several executors. Previous examples used hiku.executors.sync.SyncExecutor; they can also be run with hiku.executors.threads.ThreadsExecutor to execute queries concurrently using concurrent.futures.ThreadPoolExecutor, without any change in the graph definition.

However, to load data using the asyncio library, all data loading functions must be coroutines. We will translate one of the previous examples to show how to use the asyncio and aiopg libraries.

Prerequisites

Note

Source code of this example can be found on GitHub.

from sqlalchemy import MetaData, Table, Column
from sqlalchemy import Integer, String, ForeignKey, select
from sqlalchemy.sql.ddl import CreateTable

# Schema container that both example tables below are registered on.
metadata = MetaData()

# Characters of the example data set: integer primary key plus two
# free-form string attributes.
character_table = Table(
    "character",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
    Column("species", String),
)

# Actors. Every actor row must reference a character: ``character_id`` is a
# non-nullable foreign key to ``character.id``.
actor_table = Table(
    "actor",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
    Column("character_id", ForeignKey("character.id"), nullable=False),
)

c def setup_db(db_dsn, *, loop):
async with aiopg.sa.create_engine(db_dsn, loop=loop) as db_engine:
    async with db_engine.acquire() as conn:
        await conn.execute(CreateTable(character_table))
        await conn.execute(CreateTable(actor_table))

        await conn.execute(
            character_table.insert().values(
                [
                    dict(id=1, name="James T. Kirk", species="Human"),
                    dict(id=2, name="Spock", species="Vulcan/Human"),
                    dict(id=3, name="Leonard McCoy", species="Human"),
                ]
            )
        )
        await conn.execute(
            actor_table.insert().values(

Graph definition

 1
 2    db_dsn = "postgresql://postgres:postgres@postgres:5432/{}".format(db_name)
 3    loop.run_until_complete(setup_db(db_dsn, loop=loop))
 4
 5    def fin():
 6        loop.run_until_complete(drop_db(pg_dsn, db_name, loop=loop))
 7
 8    request.addfinalizer(fin)
 9    return db_dsn
10
11
12# define graph
13
14from hiku.graph import Graph, Root, Node, Link, Field
15from hiku.types import TypeRef, Sequence
16from hiku.engine import pass_context
17from hiku.sources.aiopg import FieldsQuery, LinkQuery
18
19SA_ENGINE_KEY = "sa-engine"
20
21character_query = FieldsQuery(SA_ENGINE_KEY, character_table)
22
23actor_query = FieldsQuery(SA_ENGINE_KEY, actor_table)
24
25character_to_actors_query = LinkQuery(
26    SA_ENGINE_KEY,
27    from_column=actor_table.c.character_id,
28    to_column=actor_table.c.id,
29)
30
31
32async def direct_link(ids):
33    return ids
34
35
36@pass_context
37async def to_characters_query(ctx):
38    query = select([character_table.c.id])
39    async with ctx[SA_ENGINE_KEY].acquire() as conn:
40        rows = await conn.execute(query)
41        return [row.id async for row in rows]
42
43
44@pass_context
45async def to_actors_query(ctx):
46    query = select([actor_table.c.id])
47    async with ctx[SA_ENGINE_KEY].acquire() as conn:
48        rows = await conn.execute(query)
49        return [row.id async for row in rows]
50
51
52GRAPH = Graph(
53    [
54        Node(
55            "Character",
56            [

Note that we are using the hiku.sources.aiopg source [17] in our graph definition, instead of hiku.sources.sqlalchemy.

All our custom data loading functions [32,37,45] are now coroutine functions and use aiopg.sa.Engine instead of sqlalchemy.engine.Engine to execute SQL queries.

Querying graph

For testing purposes, let’s define a helper coroutine function, execute:

 1                Link(
 2                    "actors",
 3                    Sequence[TypeRef["Actor"]],
 4                    character_to_actors_query,
 5                    requires="id",
 6                ),
 7            ],
 8        ),
 9        Node(
10            "Actor",
11            [

Note that the hiku.engine.Engine.execute() method [10] returns an “awaitable” object when it is used with hiku.executors.asyncio.AsyncIOExecutor. Here is how the engine should be constructed:

            Link(

Testing a one-to-many link:

                "character",
                TypeRef["Character"],
                direct_link,
                requires="character_id",
            ),
        ],
    ),
    Root(
        [
            Link(
                "characters",
                Sequence[TypeRef["Character"]],
                to_characters_query,
                requires=None,
            ),
            Link(
                "actors", Sequence[TypeRef["Actor"]], to_actors_query, requires=None
            ),
        ]
    ),
]


# test graph

import aiopg.sa

from hiku.engine import Engine

Testing a many-to-one link:


async def execute(hiku_engine, sa_engine, graph, query_string):
    """Run a textual query against ``graph`` and return the denormalized result.

    ``query_string`` is parsed with ``read``. ``sa_engine`` is placed in the
    execution context under ``SA_ENGINE_KEY``, which is the key the data
    loading coroutines use to look the engine up.
    """
    query = read(query_string)
    # Must be awaited: with AsyncIOExecutor, Engine.execute() returns an
    # awaitable rather than the result itself.
    result = await hiku_engine.execute(graph, query, {SA_ENGINE_KEY: sa_engine})
    return denormalize(graph, result)


est.mark.asyncio(forbid_global_loop=True)
c def test_character_to_actors(db_dsn, event_loop):
hiku_engine = Engine(AsyncIOExecutor(event_loop))
async with aiopg.sa.create_engine(db_dsn, loop=event_loop) as sa_engine:
    result = await execute(
        hiku_engine,
        sa_engine,
        GRAPH,
        "{ characters { name { { actors { name } } } } }",
    )
    assert result == {
        "characters": [
            {
                "name": "James T. Kirk",
                "actors": [
                    {"name": "William Shatner"},
                    {"name": "Chris Pine"},
                ],
            },
            {
                "name": "Spock",
                "actors": [
                    {"name": "Leonard Nimoy"},
                    {"name": "Zachary Quinto"},