Tracing#
pjrpc supports request tracing on both the client and the server side.
client#
The following example illustrates OpenTracing integration.
from typing import Any, Mapping, Optional
import opentracing
from opentracing import propagation, tags
from pjrpc.client import MiddlewareHandler
from pjrpc.client.backend import requests as pjrpc_client
from pjrpc.common import AbstractRequest, AbstractResponse, BatchRequest, Request
# Resolved once at import time and shared by every call of the middleware.
tracer = opentracing.global_tracer()


def tracing_middleware(
    request: AbstractRequest,
    request_kwargs: Mapping[str, Any],
    /,
    handler: MiddlewareHandler,
) -> Optional[AbstractResponse]:
    """Wrap a single JSON-RPC call in an OpenTracing client span.

    For a ``Request`` a span named ``jsonrpc.<method>`` is started and made
    active for the duration of the call. If the caller supplied non-empty
    ``headers`` in ``request_kwargs``, the span context is injected into that
    mapping in place. An error response is recorded on the span as tags.
    A ``BatchRequest`` is passed to ``handler`` untouched.

    :param request: the outgoing request (single or batch).
    :param request_kwargs: keyword arguments that accompany the request.
    :param handler: the next handler in the middleware chain.
    :returns: whatever ``handler`` returns (possibly ``None``).
    :raises AssertionError: if ``request`` is neither a ``Request`` nor a
        ``BatchRequest``.
    """
    if isinstance(request, Request):
        # The context manager closes the scope and finishes the span on every
        # exit path, including when ``handler`` raises. Calling only
        # ``span.finish()`` would leave the finished span active.
        with tracer.start_active_span(f'jsonrpc.{request.method}') as scope:
            span = scope.span
            span.set_tag(tags.COMPONENT, 'pjrpc.client')
            span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)

            # NOTE(review): the context is propagated only when the caller
            # passed non-empty headers; otherwise nothing is injected.
            if http_headers := request_kwargs.get('headers', {}):
                tracer.inject(
                    # ``inject`` is specified to take a SpanContext, not a Span.
                    span_context=span.context,
                    format=propagation.Format.HTTP_HEADERS,
                    carrier=http_headers,
                )

            response = handler(request, request_kwargs)

            # ``handler`` may return None (the return type is Optional).
            if response is not None and response.is_error:
                error = response.unwrap_error()
                span.set_tag(tags.ERROR, response.is_error)
                span.set_tag('jsonrpc.error_code', error.code)
                span.set_tag('jsonrpc.error_message', error.message)
    elif isinstance(request, BatchRequest):
        response = handler(request, request_kwargs)
    else:
        raise AssertionError("unreachable")

    return response
# Client for the local endpoint with the tracing middleware installed.
client = pjrpc_client.Client('http://localhost:8080/api/v1', middlewares=[tracing_middleware])

# Call the "sum" method through the client proxy.
result = client.proxy.sum(1, 2)
server#
On the server side you need to implement simple functions (middlewares) and pass them to the JSON-RPC application. The following example illustrates OpenTracing integration:
import asyncio
import opentracing
from aiohttp import web
from aiohttp.typedefs import Handler as HttpHandler
from opentracing import tags
import pjrpc.server
from pjrpc import Request, Response
from pjrpc.server import AsyncHandlerType
from pjrpc.server.integration import aiohttp
# Registry that collects the JSON-RPC methods exposed by this example.
methods = pjrpc.server.MethodRegistry()


@methods.add(pass_context='context')
async def sum(context: web.Request, a: int, b: int) -> int:
    """Return ``a + b`` after a one-second pause.

    The pause and the two prints only make the call's duration visible.
    ``context`` is requested through ``pass_context`` and is not used here.
    """
    print("method started")
    await asyncio.sleep(1)
    print("method finished")

    total = a + b
    return total
async def jsonrpc_tracing_middleware(request: Request, context: web.Request, handler: AsyncHandlerType) -> Response:
    """Trace the handling of one JSON-RPC request with an OpenTracing span.

    A span named ``jsonrpc.<method>`` is started, tagged with the request's
    version, id and method, and kept active while ``handler`` runs. When no
    span is already active, the parent span context is extracted from the
    incoming HTTP headers so the span joins the caller's trace. The outcome
    is recorded in the ``error`` tag, plus the JSON-RPC error code and
    message for an error response.

    :param request: the JSON-RPC request being handled.
    :param context: the HTTP request that carried the JSON-RPC request.
    :param handler: the next handler in the middleware chain.
    :returns: the response produced by ``handler``.
    """
    tracer = opentracing.global_tracer()

    # An already active span becomes the parent automatically, so the headers
    # are consulted only when there is none.
    parent_context = None
    if tracer.active_span is None:
        try:
            parent_context = tracer.extract(
                format=opentracing.Format.HTTP_HEADERS,
                # A plain dict, since some tracers accept nothing else.
                carrier=dict(context.headers),
            )
        except (
            opentracing.InvalidCarrierException,
            opentracing.SpanContextCorruptedException,
            opentracing.UnsupportedFormatException,
        ):
            # Tracing is best effort: fall back to a new root span.
            parent_context = None

    span = tracer.start_span(f'jsonrpc.{request.method}', child_of=parent_context)
    span.set_tag(tags.COMPONENT, 'pjrpc')
    span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
    span.set_tag('jsonrpc.version', request.version)
    span.set_tag('jsonrpc.id', request.id)
    span.set_tag('jsonrpc.method', request.method)

    # finish_on_close=True finishes the span when the scope exits.
    with tracer.scope_manager.activate(span, finish_on_close=True):
        # A falsy response gets no outcome tags.
        if response := await handler(request, context):
            if response.is_error:
                span.set_tag('jsonrpc.error_code', response.error.code)
                span.set_tag('jsonrpc.error_message', response.error.message)
                span.set_tag(tags.ERROR, True)
            else:
                span.set_tag(tags.ERROR, False)

    return response
# JSON-RPC application under /api/v1 with the tracing middleware installed.
jsonrpc_app = aiohttp.Application('/api/v1', middlewares=[jsonrpc_tracing_middleware])
jsonrpc_app.add_methods(methods)

# Start the HTTP server only when the file is run as a script.
if __name__ == "__main__":
    web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)