Using GraphQL¶
Note
Hiku is a general-purpose library for exposing data as a graph of linked nodes, and it can be used to implement a GraphQL server.
To parse GraphQL queries you will need to install the graphql-core library:
$ pip install graphql-core
To implement a GraphQL server, we need to add GraphQL introspection to our graph and implement the GraphQL query execution process:
read GraphQL query
validate query against graph definition
execute query using Engine
denormalize result into simple data structure
serialize result and send back to the client
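The five steps above can be sketched as a single handler function. The sketch below uses plain-Python stand-ins for every stage (`read_query`, `validate_query`, `execute_query` and `denormalize_result` are hypothetical placeholders, not Hiku functions), so it only illustrates the control flow; the Hiku calls that fill each stage are covered in the sections that follow.

```python
import json
from typing import Any, Callable, Dict, List


def make_handler(
    read_query: Callable[[str], Any],
    validate_query: Callable[[Any], List[str]],
    execute_query: Callable[[Any], Any],
    denormalize_result: Callable[[Any, Any], Dict[str, Any]],
) -> Callable[[str], str]:
    """Wire the stages together; every stage is a hypothetical stand-in."""

    def handle(body: str) -> str:
        payload = json.loads(body)
        query = read_query(payload['query'])       # 1. read
        errors = validate_query(query)             # 2. validate
        if errors:
            return json.dumps({'errors': [{'message': e} for e in errors]})
        raw = execute_query(query)                 # 3. execute
        data = denormalize_result(query, raw)      # 4. denormalize
        return json.dumps({'data': data})          # 5. serialize

    return handle


# Toy stages: a "query" is a list of field names, the "graph" is a dict.
KNOWN_FIELDS = {'hello': 'Hello World!'}

handler = make_handler(
    read_query=lambda src: src.strip('{} ').split(),
    validate_query=lambda q: [
        'Unknown field: {}'.format(f) for f in q if f not in KNOWN_FIELDS
    ],
    execute_query=lambda q: {f: KNOWN_FIELDS[f] for f in q},
    denormalize_result=lambda q, raw: {f: raw[f] for f in q},
)
```

Passing the stages in as callables keeps the ordering logic separate from any particular library, which makes it easy to see that validation errors short-circuit execution.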
Query¶
To create a query graph:
query_graph = Graph([
    Root([
        Field('value', String, value_func),
    ]),
])
Mutation¶
A GraphQL schema may have a separate root object type for each operation type:
query, mutation, subscription… Hiku has only one Root
node to represent the entry point into a graph. So, to implement mutations, we
need a second Root node and a second graph, which is
identical to the query graph except for the Root node:
query_graph = Graph([
    Root([
        Field('value', String, value_func),
    ]),
])
mutation_graph = Graph(query_graph.nodes + [
    Root([
        Field('action', Boolean, action_func),
    ]),
])
Schema¶
To expose a Hiku graph as a GraphQL schema, you need to create a Schema object:
from hiku.graph import Graph, Root, Field
from hiku.types import String
from hiku.schema import Schema
from hiku.executors.sync import SyncExecutor
def value_func(*_):
    return 'Hello, World!'
graph = Graph([
    Root([
        Field('value', String, value_func),
    ]),
])
schema = Schema(SyncExecutor(), graph)
To learn more about the Schema API, see the Schema docs.
Endpoint¶
Hiku provides a GraphQL endpoint, which can be used to expose a Hiku graph over HTTP.
Here is an example of how to use it:
from hiku.graph import Graph, Root, Field
from hiku.types import String
from hiku.schema import Schema
from hiku.executors.sync import SyncExecutor
from hiku.endpoint.graphql import GraphQLEndpoint
def say_hello(fields):
    return ['Hello World!' for _ in fields]
QUERY_GRAPH = Graph([
    Root([Field('hello', String, say_hello)]),
])
schema = Schema(SyncExecutor(), QUERY_GRAPH)
endpoint = GraphQLEndpoint(schema)
assert endpoint.dispatch({
    'query': "{ hello }",
    'variables': None,
    'operationName': "GetHello",
}) == {
    'data': {
        'hello': 'Hello World!',
    },
}
from hiku.types import String
from hiku.graph import Graph, Root, Field
from hiku.schema import Schema
from hiku.endpoint.graphql import AsyncGraphQLEndpoint
from hiku.executors.asyncio import AsyncIOExecutor
async def say_hello(fields):
    return ['Hello World!' for _ in fields]
QUERY_GRAPH = Graph([
    Root([Field('hello', String, say_hello)]),
])
schema = Schema(AsyncIOExecutor(), QUERY_GRAPH)
endpoint = AsyncGraphQLEndpoint(schema)
# `await` is only valid inside a coroutine (or an asyncio REPL)
assert await endpoint.dispatch({
    'query': "{ hello }",
    'variables': None,
    'operationName': "GetHello",
}) == {
    'data': {
        'hello': 'Hello World!',
    },
}
Sync endpoint¶
- class hiku.endpoint.graphql.GraphQLEndpoint(schema: Schema, batching: bool = False)
Async endpoint¶
- class hiku.endpoint.graphql.AsyncGraphQLEndpoint(schema: Schema, batching: bool = False)
Introspection¶
GraphQL introspection is enabled by default, but you can disable it by setting the introspection argument to False:
endpoint = GraphQLEndpoint(schema, introspection=False)
Validation¶
GraphQL validation is enabled by default, but you can disable it by setting the validation argument to False:
endpoint = GraphQLEndpoint(schema, validation=False)
Context¶
The endpoint's dispatch method accepts a context as its second argument, which is passed to the query execution process:
db = Database()
endpoint = GraphQLEndpoint(schema)
result = endpoint.dispatch(query, context={'db': db})
If you do not want to pass a context to the dispatch method on every query, you can use the hiku.extensions.context.CustomContext extension.
It accepts a callback function that is called on every query execution and should return a context object:
db = Database()
def get_context(execution_context: ExecutionContext) -> dict:
    return {'db': db}
schema = Schema(
    SyncExecutor(),
    graph,
    extensions=[CustomContext(get_context)],
)
endpoint = GraphQLEndpoint(schema)
result = endpoint.dispatch(query)
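To illustrate why a callback is convenient here, the following plain-Python sketch mimics the pattern: the context is built anew for every dispatch call instead of being supplied by the caller. `TinyEndpoint` and its `dispatch` method are hypothetical stand-ins, and the rule that an explicitly passed context takes precedence is an assumption of this sketch, not documented Hiku behavior.

```python
from typing import Any, Callable, Dict, List, Optional


class TinyEndpoint:
    """Hypothetical stand-in for an endpoint; not part of Hiku."""

    def __init__(
        self,
        get_context: Optional[Callable[[], Dict[str, Any]]] = None,
    ) -> None:
        self._get_context = get_context

    def dispatch(
        self,
        query: str,
        context: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        # Assumption: an explicit context wins; otherwise ask the callback.
        if context is None and self._get_context is not None:
            context = self._get_context()
        # A real endpoint would execute the query; here we echo context keys.
        return {'query': query, 'context_keys': sorted(context or {})}


calls: List[int] = []


def get_context() -> Dict[str, Any]:
    calls.append(1)           # record that the callback ran
    return {'db': object()}   # placeholder for a real database handle


endpoint = TinyEndpoint(get_context)
first = endpoint.dispatch('{ hello }')
second = endpoint.dispatch('{ hello }')
```

After the two calls, `calls` has two entries, which shows that the callback runs once per query rather than once per endpoint.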
Batching¶
The GraphQL endpoint has a batching option, which is disabled by default. When enabled, the endpoint collects all queries and executes them in one batch:
endpoint = GraphQLEndpoint(schema, batching=True)
assert endpoint.dispatch({
    "query": ["{ a }", "{ b }"],
}) == {"data": ["a", "b"]}
Introspection¶
Note
Fields with underscore-prefixed names are hidden in GraphQL introspection.
- class hiku.introspection.graphql.GraphQLIntrospection(query_graph: Graph, mutation_graph: Graph | None = None)¶
Adds GraphQL introspection into a synchronous graph
Example:
from hiku.graph import apply
from hiku.introspection.graphql import GraphQLIntrospection
graph = apply(graph, [GraphQLIntrospection(graph)])
- Parameters:
query_graph – graph, where Root node represents Query root operation type
mutation_graph – graph, where Root node represents Mutation root operation type
- class hiku.introspection.graphql.AsyncGraphQLIntrospection(query_graph: Graph, mutation_graph: Graph | None = None)¶
Adds GraphQL introspection into an asynchronous graph
Example:
from hiku.graph import apply
from hiku.introspection.graphql import AsyncGraphQLIntrospection
graph = apply(graph, [AsyncGraphQLIntrospection(graph)])
- Parameters:
query_graph – graph, where Root node represents Query root operation type
mutation_graph – graph, where Root node represents Mutation root operation type
Types that are incompatible with GraphQL are represented as the hiku.types.Any
type.
Record data types are represented as interfaces and input objects with
distinct prefixes. Given these data types:
graph = Graph([...], data_types={'Foo': Record[{'x': Integer}]})
You will see Foo data type via introspection as:
interface IFoo {
    x: Integer
}
input IOFoo {
    x: Integer
}
This is because Hiku’s data types universally can be used in field and option definitions, as long as they don’t have references to nodes.
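As an illustration of the naming scheme only, the sketch below renders one record definition into both forms. It is a plain-Python approximation and not how Hiku builds its introspection result: `render_record` is a hypothetical helper, and field types are passed as strings rather than Hiku type objects.

```python
from typing import Dict


def render_record(name: str, fields: Dict[str, str]) -> str:
    """Render a record as an `I`-prefixed interface and an `IO`-prefixed input."""
    body = '\n'.join('  {}: {}'.format(f, t) for f, t in fields.items())
    blocks = []
    for keyword, prefix in (('interface', 'I'), ('input', 'IO')):
        # Doubled braces in the format string produce literal { and }.
        blocks.append('{} {}{} {{\n{}\n}}'.format(keyword, prefix, name, body))
    return '\n\n'.join(blocks)


sdl = render_record('Foo', {'x': 'Integer'})
print(sdl)
```

The same field list feeds both blocks, which mirrors the point above: a single record definition has to be usable both as an output type and as an input type.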
Reading¶
There are two options:
read(): for simple queries, when only query operations are expected
read_operation(): when different operations are expected: queries, mutations, etc.
hiku.readers.graphql¶
Support for queries encoded using GraphQL syntax.
- class hiku.readers.graphql.Operation(type_: OperationType, query: Node, name: str | None = None)¶
Represents requested GraphQL operation
- type¶
type of the operation
- query¶
operation’s query
- name¶
optional name of the operation
- class hiku.readers.graphql.OperationType(value)¶
Enumerates GraphQL operation types
- QUERY = OperationType.QUERY¶
query operation
- MUTATION = OperationType.MUTATION¶
mutation operation
- SUBSCRIPTION = OperationType.SUBSCRIPTION¶
subscription operation
- hiku.readers.graphql.read(src: str, variables: Dict | None = None, operation_name: str | None = None) → Node¶
Reads a query from the GraphQL document
Example:
query = read('{ foo bar }')
result = engine.execute(graph, query)
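- Parameters:
src – GraphQL document
variables – query variables
operation_name – name of the operation to execute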
- Returns:
hiku.query.Node, ready to execute query object
- hiku.readers.graphql.read_operation(src: str | DocumentNode, variables: Dict | ImmutableDict | None = None, operation_name: str | None = None) → Operation¶
Reads an operation from the GraphQL document
Example:
op = read_operation('{ foo bar }')
if op.type is OperationType.QUERY:
    result = engine.execute(query_graph, op.query)
- Returns:
Operation
Validation¶
Like any other query, a GraphQL query should be validated, and any errors can be sent back to the client:
from hiku.validate.query import validate
def handler(request):
    ... # read
    errors = validate(graph, query)
    if errors:
        return {'errors': [{'message': e} for e in errors]}
    ... # execute
Execution¶
Depending on the operation type, you execute the query against one graph or the other:
if op.type is OperationType.QUERY:
    result = engine.execute_query(query_graph, op.query)
elif op.type is OperationType.MUTATION:
    result = engine.execute_mutation(mutation_graph, op.query)
else:
    return {'errors': [{'message': ('Unsupported operation type: {!r}'
                                    .format(op.type))}]}
Denormalization¶
The most common serialization format for GraphQL is JSON. Before the execution result can be serialized into JSON, it has to be denormalized, which replaces references (possibly cyclic) with actual data:
from hiku.result import denormalize
def handler(request):
    ... # execute
    result = {'data': denormalize(graph, result)}
    return jsonify(result)
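To make the idea of denormalization concrete, here is a small self-contained sketch that does not use Hiku. It assumes a hypothetical normalized store in which linked objects are held as `Ref` placeholders, and it walks a query shape to replace each reference with a nested dictionary. Because only the requested fields are followed, a cycle in the store does not cause infinite recursion. `Ref`, `STORE` and `denormalize_sketch` are illustrative names, not Hiku's internal structures.

```python
import json
from typing import Any, Dict, NamedTuple


class Ref(NamedTuple):
    """Hypothetical reference to an object stored under STORE[node][ident]."""
    node: str
    ident: int


# Normalized data: each object is stored once; links are Ref values.
# Note the cycle: user 1 -> post 10 -> user 1.
STORE = {
    'User': {1: {'name': 'alice', 'post': Ref('Post', 10)}},
    'Post': {10: {'title': 'Hello', 'author': Ref('User', 1)}},
}
ROOT = {'me': Ref('User', 1)}


def denormalize_sketch(obj: Dict[str, Any], shape: Dict[str, Any]) -> Dict[str, Any]:
    """Copy the fields named in `shape`, expanding Ref values recursively.

    `shape` maps a field name to None (plain value) or a nested shape (link).
    """
    out = {}
    for field, sub_shape in shape.items():
        value = obj[field]
        if isinstance(value, Ref):
            target = STORE[value.node][value.ident]
            out[field] = denormalize_sketch(target, sub_shape)
        else:
            out[field] = value
    return out


query_shape = {
    'me': {'name': None, 'post': {'title': None, 'author': {'name': None}}},
}
result = denormalize_sketch(ROOT, query_shape)
serialized = json.dumps({'data': result})  # plain dicts serialize directly
```

The query shape bounds the recursion depth, so the output is always a finite tree even though the stored data forms a cycle.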