Using AsyncIO with aiopg#
Hiku has several executors. Previous examples used
hiku.executors.sync.SyncExecutor, and they can also be run with
hiku.executors.threads.ThreadsExecutor to execute queries concurrently
using concurrent.futures.ThreadPoolExecutor, without any change in the
graph definition.
However, to load data using the asyncio library, all data loading
functions should be coroutines. We will translate one of the previous
examples to show how to use the asyncio and aiopg libraries.
Prerequisites#
Note
Source code of this example can be found on GitHub.
from sqlalchemy import MetaData, Table, Column
from sqlalchemy import Integer, String, ForeignKey, select
from sqlalchemy.sql.ddl import CreateTable
# Registry that both example tables are attached to.
metadata = MetaData()

# One row per fictional character.
character_table = Table(
    'character', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('species', String),
)

# One row per actor; every actor must reference the character they played.
actor_table = Table(
    'actor', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('character_id', ForeignKey('character.id'), nullable=False),
)
async with aiopg.sa.create_engine(db_dsn, loop=loop) as db_engine:
async with db_engine.acquire() as conn:
await conn.execute(CreateTable(character_table))
await conn.execute(CreateTable(actor_table))
await conn.execute(character_table.insert().values([
dict(id=1, name='James T. Kirk', species='Human'),
dict(id=2, name='Spock', species='Vulcan/Human'),
dict(id=3, name='Leonard McCoy', species='Human'),
]))
await conn.execute(actor_table.insert().values([
dict(id=1, character_id=1, name='William Shatner'),
dict(id=2, character_id=2, name='Leonard Nimoy'),
dict(id=3, character_id=3, name='DeForest Kelley'),
dict(id=4, character_id=1, name='Chris Pine'),
dict(id=5, character_id=2, name='Zachary Quinto'),
dict(id=6, character_id=3, name='Karl Urban'),
]))
Graph definition#
from hiku.graph import Graph, Root, Node, Link, Field
from hiku.types import TypeRef, Sequence
from hiku.engine import pass_context
from hiku.sources.aiopg import FieldsQuery, LinkQuery

# Key under which the aiopg.sa engine is looked up in the query context.
SA_ENGINE_KEY = 'sa-engine'

# Field loaders backed by the ``character`` and ``actor`` tables.
character_query = FieldsQuery(SA_ENGINE_KEY, character_table)

actor_query = FieldsQuery(SA_ENGINE_KEY, actor_table)

# Link loader over the ``actor`` table, going from ``character_id`` to the
# actor's own ``id``; it backs the Character -> actors link in the graph.
character_to_actors_query = LinkQuery(
    SA_ENGINE_KEY,
    from_column=actor_table.c.character_id,
    to_column=actor_table.c.id,
)

async def direct_link(ids):
    """Return ``ids`` unchanged.

    Used for the ``Actor.character`` link, which requires ``character_id``:
    the required values are passed straight through as the linked ids, so
    no query is executed. It is a coroutine function like the other custom
    data loading functions in this example.
    """
    return ids

@pass_context
async def to_characters_query(ctx):
    """Return the ids of all rows in the ``character`` table.

    ``ctx`` is the query context; the aiopg.sa engine is read from it
    under ``SA_ENGINE_KEY``.
    """
    query = select([character_table.c.id])
    async with ctx[SA_ENGINE_KEY].acquire() as conn:
        rows = await conn.execute(query)
        # Consume the result while the connection is still acquired.
        return [row.id async for row in rows]

@pass_context
async def to_actors_query(ctx):
    """Return the ids of all rows in the ``actor`` table.

    ``ctx`` is the query context; the aiopg.sa engine is read from it
    under ``SA_ENGINE_KEY``.
    """
    query = select([actor_table.c.id])
    async with ctx[SA_ENGINE_KEY].acquire() as conn:
        rows = await conn.execute(query)
        # Consume the result while the connection is still acquired.
        return [row.id async for row in rows]

# Graph with two nodes (Character, Actor) and two root links listing them.
GRAPH = Graph([
    Node('Character', [
        Field('id', None, character_query),
        Field('name', None, character_query),
        Field('species', None, character_query),
        # One-to-many: resolved by a LinkQuery from the character's ``id``.
        Link('actors', Sequence[TypeRef['Actor']], character_to_actors_query,
             requires='id'),
    ]),
    Node('Actor', [
        Field('id', None, actor_query),
        Field('name', None, actor_query),
        Field('character_id', None, actor_query),
        # Many-to-one: ``character_id`` is passed through by ``direct_link``.
        Link('character', TypeRef['Character'],
             direct_link, requires='character_id'),
    ]),
    Root([
        # Entry points; they require nothing and list every id in a table.
        Link('characters', Sequence[TypeRef['Character']],
             to_characters_query, requires=None),
        Link('actors', Sequence[TypeRef['Actor']],
             to_actors_query, requires=None),
    ]),
])
Note that we are using the hiku.sources.aiopg source [4] in our graph
definition, instead of hiku.sources.sqlalchemy.
All our custom data loading functions (direct_link, to_characters_query
and to_actors_query) [18,22,29] are now coroutine functions, and those
that run SQL queries use aiopg.sa.Engine instead of
sqlalchemy.engine.Engine to execute them.
Querying graph#
For testing purposes, let’s define a helper coroutine function, execute:
import aiopg.sa

from hiku.engine import Engine
from hiku.result import denormalize
from hiku.readers.simple import read
from hiku.executors.asyncio import AsyncIOExecutor


async def execute(hiku_engine, sa_engine, graph, query_string):
    """Run ``query_string`` against ``graph`` and return the denormalized result.

    The query text is parsed with ``read``, executed by ``hiku_engine`` with
    ``sa_engine`` placed in the context under ``SA_ENGINE_KEY``, and the
    result is passed through ``denormalize``.
    """
    query = read(query_string)
    context = {SA_ENGINE_KEY: sa_engine}
    # The engine's execute() result is awaited before denormalizing.
    result = await hiku_engine.execute(graph, query, context)
    return denormalize(graph, result)
Note that the hiku.engine.Engine.execute() method [10] returns an
“awaitable” object when it is used with
hiku.executors.asyncio.AsyncIOExecutor. Here is how the engine should be
constructed:
# Build the Hiku engine around an AsyncIOExecutor created with ``event_loop``
# (``event_loop`` comes from the surrounding code, which is not shown here).
hiku_engine = Engine(AsyncIOExecutor(event_loop))
Testing a one-to-many link:
async with aiopg.sa.create_engine(db_dsn, loop=event_loop) as sa_engine:
result = await execute(hiku_engine, sa_engine, GRAPH,
'[{:characters [:name {:actors [:name]}]}]')
assert result == {
'characters': [
{
'name': 'James T. Kirk',
'actors': [
{'name': 'William Shatner'},
{'name': 'Chris Pine'},
],
},
{
'name': 'Spock',
'actors': [
{'name': 'Leonard Nimoy'},
{'name': 'Zachary Quinto'},
],
},
{
'name': 'Leonard McCoy',
'actors': [
{'name': 'DeForest Kelley'},
{'name': 'Karl Urban'},
],
},
],
}
Testing a many-to-one link:
async with aiopg.sa.create_engine(db_dsn, loop=event_loop) as sa_engine:
result = await execute(hiku_engine, sa_engine, GRAPH,
'[{:actors [:name {:character [:name]}]}]')
assert result == {
'actors': [
{
'name': 'William Shatner',
'character': {'name': 'James T. Kirk'},
},
{
'name': 'Leonard Nimoy',
'character': {'name': 'Spock'},
},
{
'name': 'DeForest Kelley',
'character': {'name': 'Leonard McCoy'},
},
{
'name': 'Chris Pine',
'character': {'name': 'James T. Kirk'},
},
{
'name': 'Zachary Quinto',
'character': {'name': 'Spock'},
},
{
'name': 'Karl Urban',
'character': {'name': 'Leonard McCoy'},
},
],
}