Using AsyncIO with aiopg¶
Hiku has several executors, previous examples were using
hiku.executors.sync.SyncExecutor and they can also be used with
hiku.executors.threads.ThreadsExecutor to execute query
concurrently using concurrent.futures.ThreadPoolExecutor
without any change in the graph definition.
But, to be able to load data using asyncio library, all data
loading functions should be coroutines. We will translate one of the
previous examples to show how to use asyncio
and aiopg libraries.
Prerequisites¶
Note
Source code of this example can be found on GitHub.
from sqlalchemy import MetaData, Table, Column
from sqlalchemy import Integer, String, ForeignKey, select
from sqlalchemy.sql.ddl import CreateTable
metadata = MetaData()
character_table = Table(
"character",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
Column("species", String),
)
actor_table = Table(
"actor",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
Column("character_id", ForeignKey("character.id"), nullable=False),
)
c def setup_db(db_dsn, *, loop):
async with aiopg.sa.create_engine(db_dsn, loop=loop) as db_engine:
async with db_engine.acquire() as conn:
await conn.execute(CreateTable(character_table))
await conn.execute(CreateTable(actor_table))
await conn.execute(
character_table.insert().values(
[
dict(id=1, name="James T. Kirk", species="Human"),
dict(id=2, name="Spock", species="Vulcan/Human"),
dict(id=3, name="Leonard McCoy", species="Human"),
]
)
)
await conn.execute(
actor_table.insert().values(
Graph definition¶
1
2 db_dsn = "postgresql://postgres:postgres@postgres:5432/{}".format(db_name)
3 loop.run_until_complete(setup_db(db_dsn, loop=loop))
4
5 def fin():
6 loop.run_until_complete(drop_db(pg_dsn, db_name, loop=loop))
7
8 request.addfinalizer(fin)
9 return db_dsn
10
11
12# define graph
13
14from hiku.graph import Graph, Root, Node, Link, Field
15from hiku.types import TypeRef, Sequence
16from hiku.engine import pass_context
17from hiku.sources.aiopg import FieldsQuery, LinkQuery
18
19SA_ENGINE_KEY = "sa-engine"
20
21character_query = FieldsQuery(SA_ENGINE_KEY, character_table)
22
23actor_query = FieldsQuery(SA_ENGINE_KEY, actor_table)
24
25character_to_actors_query = LinkQuery(
26 SA_ENGINE_KEY,
27 from_column=actor_table.c.character_id,
28 to_column=actor_table.c.id,
29)
30
31
32async def direct_link(ids):
33 return ids
34
35
36@pass_context
37async def to_characters_query(ctx):
38 query = select([character_table.c.id])
39 async with ctx[SA_ENGINE_KEY].acquire() as conn:
40 rows = await conn.execute(query)
41 return [row.id async for row in rows]
42
43
44@pass_context
45async def to_actors_query(ctx):
46 query = select([actor_table.c.id])
47 async with ctx[SA_ENGINE_KEY].acquire() as conn:
48 rows = await conn.execute(query)
49 return [row.id async for row in rows]
50
51
52GRAPH = Graph(
53 [
54 Node(
55 "Character",
56 [
Note that we are using hiku.sources.aiopg source [4]
in our graph definition, instead of hiku.sources.sqlalchemy.
All our custom data loading functions [18,22,29] are coroutine
functions now and using aiopg.sa.Engine instead of
sqlalchemy.engine.Engine to execute SQL queries.
Querying graph¶
For testing purposes let’s define helper coroutine function execute:
1 Link(
2 "actors",
3 Sequence[TypeRef["Actor"]],
4 character_to_actors_query,
5 requires="id",
6 ),
7 ],
8 ),
9 Node(
10 "Actor",
11 [
Note that hiku.engine.Engine.execute() method [10]
returns “awaitable” object, when it is using with
hiku.executors.asyncio.AsyncIOExecutor. Here is how it
should be constructed:
Link(
Testing one to many link:
"character",
TypeRef["Character"],
direct_link,
requires="character_id",
),
],
),
Root(
[
Link(
"characters",
Sequence[TypeRef["Character"]],
to_characters_query,
requires=None,
),
Link(
"actors", Sequence[TypeRef["Actor"]], to_actors_query, requires=None
),
]
),
]
st graph
rt aiopg.sa
hiku.engine import Engine
Testing many to one link:
c def execute(hiku_engine, sa_engine, graph, query_string):
query = read(query_string)
result = await hiku_engine.execute(graph, query, {SA_ENGINE_KEY: sa_engine})
return denormalize(graph, result)
est.mark.asyncio(forbid_global_loop=True)
c def test_character_to_actors(db_dsn, event_loop):
hiku_engine = Engine(AsyncIOExecutor(event_loop))
async with aiopg.sa.create_engine(db_dsn, loop=event_loop) as sa_engine:
result = await execute(
hiku_engine,
sa_engine,
GRAPH,
"{ characters { name { { actors { name } } } } }",
)
assert result == {
"characters": [
{
"name": "James T. Kirk",
"actors": [
{"name": "William Shatner"},
{"name": "Chris Pine"},
],
},
{
"name": "Spock",
"actors": [
{"name": "Leonard Nimoy"},
{"name": "Zachary Quinto"},