Extensions¶
Extensions allow you to add custom functionality to your graph.
Extensions are classes that inherit from hiku.extensions.Extension.
Each extension has a set of methods that are called at different stages of graph processing.
Here are all the methods that can be implemented:
on_init() - when the schema is created
on_operation() - when a query is executed by the schema
on_parse() - when the query string is parsed into an AST and then into a query Node
on_validate() - when the query is validated
on_execute() - when the query is executed by the engine
Custom extension¶
To write your own extension you need to inherit from hiku.extensions.Extension
and implement methods that you need.
Note
You can pass either an instance or a class of the Extension to the schema's extensions argument.
At runtime, if an extension is passed as a class, hiku will instantiate it.
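The behaviour described in the note can be pictured roughly as follows. This is a minimal sketch, not hiku's actual code: `Extension` here is a stand-in class, and `instantiate_extensions` is a hypothetical helper name used only for illustration.

```python
from typing import List, Type, Union


class Extension:
    """Stand-in for hiku.extensions.Extension (assumption, illustration only)."""


class TimeIt(Extension):
    pass


def instantiate_extensions(
    extensions: List[Union[Extension, Type[Extension]]],
) -> List[Extension]:
    """Return instances only: classes are called with no arguments,
    instances are passed through unchanged."""
    return [ext() if isinstance(ext, type) else ext for ext in extensions]


# A class and an instance can be mixed in the same list.
prepared = instantiate_extensions([TimeIt, TimeIt()])
```

In this sketch a class is instantiated without arguments, so an extension that needs constructor arguments would be passed as an instance.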
Here is an example of a custom extension that measures query execution time:
import time
from typing import Iterator

from hiku.context import ExecutionContext
from hiku.extensions.base_extension import Extension
from hiku.schema import Schema

class TimeItExtension(Extension):
    def on_execute(self, execution_context: ExecutionContext) -> Iterator[None]:
        start = time.perf_counter()
        yield
        print('Query execution took {:.3f} seconds'.format(time.perf_counter() - start))

# `graph` is assumed to be a hiku Graph defined elsewhere.
schema = Schema(graph, extensions=[TimeItExtension()])
In this example we use hiku.extensions.Extension as a base class and implement
the on_execute() method.
This method is called when the query is executed by the engine. The code before yield runs before execution and the code after it runs once execution has finished, so we record the time before, compute the elapsed time after, and print the result.
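The before/after behaviour of a generator-style hook can be sketched with the standard library alone. This is an illustration under stated assumptions, not hiku's implementation: `run_stage` is a hypothetical driver, and `on_execute_hook` mimics the shape of the on_execute() method above without the `self` and `execution_context` arguments.

```python
import time
from contextlib import contextmanager
from typing import Callable, Iterator, List

events: List[str] = []


def on_execute_hook() -> Iterator[None]:
    """Generator-style hook: code before `yield` runs first, code after it runs last."""
    start = time.perf_counter()
    events.append("before")
    yield
    elapsed = time.perf_counter() - start
    events.append("after")
    print("Stage took {:.3f} seconds".format(elapsed))


def run_stage(hook: Callable[[], Iterator[None]], stage: Callable[[], str]) -> str:
    """Hypothetical driver: wrap `stage` with the hook using a context manager."""
    with contextmanager(hook)():
        return stage()


def execute() -> str:
    events.append("execute")
    return "result"


result = run_stage(on_execute_hook, execute)
```

In this sketch the code after `yield` is skipped if the stage raises; wrapping the `yield` in `try`/`finally` would make the timing run in that case as well.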
Built-in extensions¶
QueryParseCache¶
Hiku uses the graphql-core library to parse queries. Parsing the same query again and again wastes resources and time.
Hiku provides a way to cache parsed queries. To enable it, use the QueryParserCache extension.
schema = Schema(
    graph,
    extensions=[QueryParserCache(maxsize=50)],
)
Note that for the cache to be effective, you need to pass variables separately from the query. With inlined values, every distinct value produces a different query string, so the cache is rarely hit.
Query with inlined variables is bad for caching.
query User {
  user(id: 1) {
    name
    photo(size: 50)
  }
}
Query with separated variables is good for caching.
query User($id: ID!, $photoSize: Int) {
  user(id: $id) {
    name
    photo(size: $photoSize)
  }
}
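The difference between the two query styles can be illustrated with a cache keyed on the query string. This is a sketch that assumes such a keying scheme: `parse` is a stand-in function, not hiku's parser, and functools.lru_cache is used only to make hits and misses visible.

```python
from functools import lru_cache


@lru_cache(maxsize=50)
def parse(query: str) -> tuple:
    """Stand-in parser (assumption): a real parser would build an AST here."""
    return tuple(query.split())


# Inlined values: every distinct id produces a distinct query string, so nothing is reused.
for user_id in (1, 2, 3):
    parse("query User { user(id: %d) { name } }" % user_id)
inlined = parse.cache_info()

parse.cache_clear()  # also resets the hit and miss counters

# Separated variables: the query string stays constant, values travel separately.
QUERY = "query User($id: ID!) { user(id: $id) { name } }"
for user_id in (1, 2, 3):
    variables = {"id": user_id}  # sent alongside the query, not part of the cache key
    parse(QUERY)
separated = parse.cache_info()

print(inlined.hits, inlined.misses)      # 0 3
print(separated.hits, separated.misses)  # 2 1
```

With inlined values all three calls miss the cache, while with separated variables only the first call has to parse the query.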
QueryParserCache exposes metrics for cache hits and misses:
Gauge('hiku_query_cache_hits', 'Query cache hits')
Gauge('hiku_query_cache_misses', 'Query cache misses')
QueryValidationCache¶
QueryValidationCache caches the result of query validation.
QueryDepthValidator¶
QueryDepthValidator validates query depth. If the query depth is greater than the max_depth argument, it returns an error
saying that the query depth is too big.
schema = Schema(
    graph,
    extensions=[QueryDepthValidator(max_depth=10)],
)
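To make the idea of query depth concrete, here is a minimal sketch of a depth check. It assumes a simplified query shape (nested dicts, with None marking a leaf field) and a particular way of counting levels; `query_depth`, `validate_depth` and the error text are hypothetical, and hiku's own counting rules and message may differ.

```python
from typing import Dict, List, Optional

# Hypothetical query shape: field name -> nested selection (dict) or None for a leaf.
Selection = Dict[str, Optional[dict]]


def query_depth(selection: Selection) -> int:
    """Depth of the deepest field; a flat selection has depth 1."""
    if not selection:
        return 0
    return 1 + max(
        query_depth(child) if child else 0 for child in selection.values()
    )


def validate_depth(selection: Selection, max_depth: int) -> List[str]:
    """Return a list of error messages, empty when the query is shallow enough."""
    depth = query_depth(selection)
    if depth > max_depth:
        return ["Query depth {} exceeds maximum allowed depth {}".format(depth, max_depth)]
    return []


# user -> friends -> name is three levels deep.
query = {"user": {"name": None, "friends": {"name": None}}}
errors_strict = validate_depth(query, max_depth=2)
errors_relaxed = validate_depth(query, max_depth=10)
```

Here `errors_strict` contains one message because the query is three levels deep, while `errors_relaxed` is empty.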
PrometheusMetrics¶
PrometheusMetrics is a wrapper around the GraphMetrics visitor. It exposes metrics for query execution time.
from hiku.extensions.prometheus import PrometheusMetrics

schema = Schema(
    graph,
    extensions=[PrometheusMetrics('user_graph')],
)
Custom metric¶
By default, PrometheusMetrics uses the built-in metric graph_field_time:
Summary("graph_field_time", "Graph field time (seconds)", ["graph", "node", "field"])
You can pass your own metric to PrometheusMetrics with the metric argument:
from prometheus_client import Histogram

from hiku.extensions.prometheus import PrometheusMetrics

metric = Histogram("my_custom_metric", "Graph field time (seconds)", ["graph", "node", "field"])

schema = Schema(
    graph,
    extensions=[PrometheusMetrics('user_graph', metric=metric)],
)
Custom labels¶
PrometheusMetrics has a ctx_var argument, which accepts a custom ContextVar variable
that will be set to execution_context.context. This can be used, for example, to expose different labels based on the context.
Here we add a new label, os, to the metric and take its value from the context:
from contextvars import ContextVar

from prometheus_client import Histogram

from hiku.endpoint.graphql import GraphQLEndpoint
from hiku.extensions.prometheus import PrometheusMetrics
from hiku.telemetry.prometheus import GraphMetrics

metric = Histogram("my_custom_metric", "Graph field time (seconds)", ["graph", "node", "field", "os"])
metrics_ctx = ContextVar('os')

class CustomGraphqMetrics(GraphMetrics):
    def get_labels(
        self, graph_name: str, node_name: str, field_name: str, ctx: dict
    ) -> list:
        # The label order must match the label names declared on the metric.
        return [graph_name, node_name, field_name, ctx.get('os', 'unknown')]

schema = Schema(
    graph,
    extensions=[
        PrometheusMetrics(
            'user_graph',
            metric=metric,
            ctx_var=metrics_ctx,
            transformer_cls=CustomGraphqMetrics
        )
    ],
)

endpoint = GraphQLEndpoint(schema)

# `app`, `Request` and `get_os` come from your web framework and application code.
@app.post('/graphql')
def graphql(request: Request):
    os = get_os(request)
    return endpoint.dispatch(request.body, context={'os': os})
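The role of the ContextVar in the example above can be shown in isolation. The following is a standard-library sketch, not hiku code: `handle_request` is a hypothetical handler, and the free function `get_labels` mirrors the method above but reads the context from the variable instead of receiving it as an argument.

```python
from contextvars import ContextVar
from typing import List

# Holds the per-request context; None outside of a request.
metrics_ctx = ContextVar("metrics_ctx", default=None)


def get_labels(graph_name: str, node_name: str, field_name: str) -> List[str]:
    """Build label values, reading the extra `os` label from the current context."""
    ctx = metrics_ctx.get() or {}
    return [graph_name, node_name, field_name, ctx.get("os", "unknown")]


def handle_request(context: dict) -> List[str]:
    """Hypothetical request handler: set the variable, do the work, then reset it."""
    token = metrics_ctx.set(context)
    try:
        return get_labels("user_graph", "User", "name")
    finally:
        metrics_ctx.reset(token)


inside = handle_request({"os": "ios"})
outside = get_labels("user_graph", "User", "name")
```

Inside the handler the last label is taken from the request context; outside of it the variable is back to its default and the label falls back to 'unknown'.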
PrometheusMetricsAsync¶
PrometheusMetricsAsync is a wrapper around the AsyncGraphMetrics visitor. It exposes metrics for query execution time.
CustomContext¶
CustomContext allows you to define a custom context for query execution.
If you do not want to pass a context to the dispatch method on every query, you can use the hiku.extensions.context.CustomContext extension.
It accepts a callback function that is called on every query execution and should return a context object:
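from hiku.context import ExecutionContext
from hiku.extensions.context import CustomContext

# `Database` stands for your application's database client.
db = Database()

def get_context(execution_context: ExecutionContext) -> dict:
    return {'db': db}

schema = Schema(
    graph,
    extensions=[CustomContext(get_context)],
)

result = schema.execute_sync(query)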