Examples
aio_pika client
import asyncio
import pjrpc
from pjrpc.client.backend import aio_pika as pjrpc_client
async def main():
    """Call the remote ``sum`` method through each client calling style, then send a notification."""
    rpc = pjrpc_client.Client('amqp://guest:guest@localhost:5672/v1', 'jsonrpc')
    await rpc.connect()

    # 1. Explicit request object sent with ``send``; the result is read from the reply.
    sum_request = pjrpc.Request('sum', params=[1, 2], id=1)
    reply: pjrpc.Response = await rpc.send(sum_request)
    print(f"1 + 2 = {reply.result}")

    # 2. Calling the client object directly with the method name and keyword params.
    total = await rpc('sum', a=1, b=2)
    print(f"1 + 2 = {total}")

    # 3. Attribute-style call through the proxy with positional params.
    total = await rpc.proxy.sum(1, 2)
    print(f"1 + 2 = {total}")

    # Finally send the 'tick' notification.
    await rpc.notify('tick')


if __name__ == "__main__":
    asyncio.run(main())
aio_pika server
import asyncio
import uuid
import aio_pika
import pjrpc
from pjrpc.server.integration import aio_pika as integration
# Registry the JSON-RPC methods of this example are added to.
methods = pjrpc.server.MethodRegistry()


@methods.add(context='message')
def add_user(message: aio_pika.IncomingMessage, user: dict):
    """Return *user* extended with a freshly generated hex ``id``.

    ``message`` is accepted but not used by this example.
    Keys already present in *user* (including ``id``) take precedence.
    """
    record = {'id': uuid.uuid4().hex}
    record.update(user)
    return record
# Executor bound to the 'jsonrpc' queue; it serves the methods registered above.
executor = integration.Executor('amqp://guest:guest@localhost:5672/v1', queue_name='jsonrpc')
executor.dispatcher.add_methods(methods)


if __name__ == "__main__":
    # NOTE(review): asyncio.get_event_loop() without a running loop is deprecated in
    # recent Python versions; kept as-is because the executor above is created before
    # the loop is obtained - confirm before switching to asyncio.run().
    event_loop = asyncio.get_event_loop()

    event_loop.run_until_complete(executor.start())
    try:
        event_loop.run_forever()
    finally:
        # Always give the executor a chance to shut down, even on interruption.
        event_loop.run_until_complete(executor.shutdown())
aiohttp class-based handler
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
# Registry the class-based view below is added to.
methods = pjrpc.server.MethodRegistry()


@methods.view(context='request', prefix='user')
class UserView(pjrpc.server.ViewMixin):
    """Class-based JSON-RPC handler working on the ``users`` mapping of the aiohttp application."""

    def __init__(self, request: web.Request):
        """Bind the view to the ``users`` mapping stored on ``request.app``."""
        super().__init__()

        self._users = request.app['users']

    async def add(self, user: dict):
        """Store *user* under a new hex id and return it together with that id."""
        user_id = uuid.uuid4().hex
        self._users[user_id] = user

        return {'id': user_id, **user}

    async def get(self, user_id: str):
        """Return the stored user for *user_id*.

        :raises pjrpc.exc.JsonRpcError: with code 1 when no (non-empty) user is stored under *user_id*
        """
        user = self._users.get(user_id)
        if not user:
            # The error must be raised; merely constructing it silently returned ``None``.
            raise pjrpc.exc.JsonRpcError(code=1, message='not found')

        return user
# JSON-RPC application mounted at /api/v1 that serves the registered view.
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.dispatcher.add_methods(methods)

# Shared storage read by UserView through ``request.app['users']``.
jsonrpc_app.app['users'] = dict()


if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)
aiohttp client
import asyncio
import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client
async def main():
    """Call the remote ``sum`` method through each client calling style, then send a notification."""
    async with pjrpc_client.Client('http://localhost/api/v1') as rpc:
        # 1. Explicit request object sent with ``send``.
        sum_request = pjrpc.Request('sum', params=[1, 2], id=1)
        reply = await rpc.send(sum_request)
        print(f"1 + 2 = {reply.result}")

        # 2. Calling the client object directly with keyword params.
        total = await rpc('sum', a=1, b=2)
        print(f"1 + 2 = {total}")

        # 3. Attribute-style call through the proxy with positional params.
        total = await rpc.proxy.sum(1, 2)
        print(f"1 + 2 = {total}")

        # Finally send the 'tick' notification.
        await rpc.notify('tick')


asyncio.run(main())
aiohttp client batch request
import asyncio
import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client
async def main():
    """Send the same four arithmetic calls as a batch using each batch-building style."""
    async with pjrpc_client.Client('http://localhost:8080/api/v1') as client:
        # 1. Explicit BatchRequest made of individual requests; results are read per reply.
        arithmetic_batch = pjrpc.BatchRequest(
            pjrpc.Request('sum', [2, 2], id=1),
            pjrpc.Request('sub', [2, 2], id=2),
            pjrpc.Request('div', [2, 2], id=3),
            pjrpc.Request('mult', [2, 2], id=4),
        )
        replies = await client.batch.send(arithmetic_batch)
        print(f"2 + 2 = {replies[0].result}")
        print(f"2 - 2 = {replies[1].result}")
        print(f"2 / 2 = {replies[2].result}")
        print(f"2 * 2 = {replies[3].result}")

        # 2. Chained call notation finished with ``call()``.
        chained = client.batch('sum', 2, 2)('sub', 2, 2)('div', 2, 2)('mult', 2, 2)
        values = await chained.call()
        print(f"2 + 2 = {values[0]}")
        print(f"2 - 2 = {values[1]}")
        print(f"2 / 2 = {values[2]}")
        print(f"2 * 2 = {values[3]}")

        # 3. Subscription notation: one tuple per call.
        values = await client.batch[
            ('sum', 2, 2),
            ('sub', 2, 2),
            ('div', 2, 2),
            ('mult', 2, 2),
        ]
        print(f"2 + 2 = {values[0]}")
        print(f"2 - 2 = {values[1]}")
        print(f"2 / 2 = {values[2]}")
        print(f"2 * 2 = {values[3]}")

        # 4. Proxy notation finished with ``call()``.
        values = await client.batch.proxy.sum(2, 2).sub(2, 2).div(2, 2).mult(2, 2).call()
        print(f"2 + 2 = {values[0]}")
        print(f"2 - 2 = {values[1]}")
        print(f"2 / 2 = {values[2]}")
        print(f"2 * 2 = {values[3]}")

        # A batch made only of notifications.
        await client.batch.notify('tick').notify('tack').call()


asyncio.run(main())
aiohttp pytest integration
import pytest
from unittest import mock
import pjrpc
from pjrpc.client.integrations.pytest import PjRpcAiohttpMocker
from pjrpc.client.backend import aiohttp as aiohttp_client
async def test_using_fixture(pjrpc_aiohttp_mocker):
    """Mock ``sum`` through the pytest fixture: first a result, then an error, then inspect the calls."""
    endpoint = 'http://localhost/api/v1'
    client = aiohttp_client.Client(endpoint)

    # Successful call served by the mocker.
    pjrpc_aiohttp_mocker.add(endpoint, 'sum', result=2)
    value = await client.proxy.sum(1, 1)
    assert value == 2

    # Swap the mocked result for an error and check it reaches the caller intact.
    pjrpc_aiohttp_mocker.replace(
        endpoint, 'sum', error=pjrpc.exc.JsonRpcError(code=1, message='error', data='oops'),
    )
    with pytest.raises(pjrpc.exc.JsonRpcError) as exc_info:
        await client.proxy.sum(a=1, b=1)

    assert exc_info.type is pjrpc.exc.JsonRpcError
    raised = exc_info.value
    assert raised.code == 1
    assert raised.message == 'error'
    assert raised.data == 'oops'

    # Both calls were recorded under the (version, method) key.
    recorded = pjrpc_aiohttp_mocker.calls[endpoint]
    assert recorded[('2.0', 'sum')].call_count == 2
    assert recorded[('2.0', 'sum')].mock_calls == [mock.call(1, 1), mock.call(a=1, b=1)]
async def test_using_resource_manager():
    """Mock ``div`` with the mocker used as a context manager and inspect the recorded call."""
    endpoint = 'http://localhost/api/v1'
    client = aiohttp_client.Client(endpoint)

    with PjRpcAiohttpMocker() as mocker:
        mocker.add(endpoint, 'div', result=2)

        quotient = await client.proxy.div(4, 2)
        assert quotient == 2

        recorded = mocker.calls[endpoint]
        assert recorded[('2.0', 'div')].mock_calls == [mock.call(4, 2)]
aiohttp server
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
# Registry the JSON-RPC methods of this example are added to.
methods = pjrpc.server.MethodRegistry()


@methods.add(context='request')
async def add_user(request: web.Request, user: dict):
    """Store *user* in ``request.app['users']`` under a new hex id and return it with that id.

    Keys already present in *user* (including ``id``) take precedence in the returned mapping.
    """
    new_id = uuid.uuid4().hex
    request.app['users'][new_id] = user

    record = {'id': new_id}
    record.update(user)
    return record
# JSON-RPC application mounted at /api/v1 that serves the registered methods.
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.dispatcher.add_methods(methods)

# Storage that add_user writes to through ``request.app['users']``.
jsonrpc_app.app['users'] = dict()


if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)
aiohttp versioning
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
# Methods exposed by version 1 of the API.
methods_v1 = pjrpc.server.MethodRegistry()


@methods_v1.add(context='request')
async def add_user_v1(request: web.Request, user: dict):
    """Store *user* in ``request.config_dict['users']`` under a new hex id and return it with that id."""
    new_id = uuid.uuid4().hex
    request.config_dict['users'][new_id] = user

    record = {'id': new_id}
    record.update(user)
    return record
# Methods exposed by version 2 of the API.
methods_v2 = pjrpc.server.MethodRegistry()


@methods_v2.add(context='request')
async def add_user_v2(request: web.Request, user: dict):
    """Store *user* in ``request.config_dict['users']`` under a new hex id and return it with that id."""
    new_id = uuid.uuid4().hex
    request.config_dict['users'][new_id] = user

    record = {'id': new_id}
    record.update(user)
    return record
# Root aiohttp application; it owns the user storage that both API versions
# read through ``request.config_dict``.
app = web.Application()
app['users'] = dict()

# Version 1 is mounted as a sub-application under /api/v1.
app_v1 = aiohttp.Application()
app_v1.dispatcher.add_methods(methods_v1)
app.add_subapp('/api/v1', app_v1.app)

# Version 2 is mounted as a sub-application under /api/v2.
app_v2 = aiohttp.Application()
app_v2.dispatcher.add_methods(methods_v2)
app.add_subapp('/api/v2', app_v2.app)


if __name__ == "__main__":
    web.run_app(app, host='localhost', port=8080)
client prometheus metrics
import time
import prometheus_client as prom_cli
from pjrpc.client import tracer
from pjrpc.client.backend import requests as pjrpc_client
# Prometheus collectors used by the tracer below.
# Latency observations, labelled by JSON-RPC method name.
method_latency_hist = prom_cli.Histogram('method_latency', 'Method latency', labelnames=['method'])
# Number of calls started, labelled by JSON-RPC method name.
method_call_total = prom_cli.Counter('method_call_total', 'Method call count', labelnames=['method'])
# Number of error responses, labelled by method name and error code.
method_errors_total = prom_cli.Counter('method_errors_total', 'Method errors count', labelnames=['method', 'code'])
class PrometheusTracer(tracer.Tracer):
    """Client tracer that records call counts, error counts and latency in Prometheus collectors."""

    def on_request_begin(self, trace_context, request):
        """Remember when the request started and count the call."""
        trace_context.started_at = time.time()
        method_call_total.labels(request.method).inc()

    def on_request_end(self, trace_context, request, response):
        """Observe the request latency and count error responses by method and error code."""
        method_latency_hist.labels(request.method).observe(time.time() - trace_context.started_at)
        if response.is_error:
            # Errors belong to the two-label error counter; ``method_call_total``
            # declares only the ``method`` label and rejects a second label value.
            method_errors_total.labels(request.method, response.error.code).inc()

    def on_error(self, trace_context, request, error):
        """Observe the latency of a request that ended with an exception."""
        method_latency_hist.labels(request.method).observe(time.time() - trace_context.started_at)
# Client whose requests are reported to the Prometheus tracer.
client = pjrpc_client.Client('http://localhost/api/v1', tracers=(PrometheusTracer(),))

result = client.proxy.sum(1, 2)
client tracing
import opentracing
from opentracing import tags
from pjrpc.client import tracer
from pjrpc.client.backend import requests as pjrpc_client
class ClientTracer(tracer.Tracer):
    """Client tracer that wraps every JSON-RPC request in an OpenTracing span."""

    def __init__(self):
        super().__init__()
        self._tracer = opentracing.global_tracer()

    def on_request_begin(self, trace_context, request):
        """Start and activate a ``jsonrpc.<method>`` span for the request."""
        scope = self._tracer.start_active_span(f'jsonrpc.{request.method}', finish_on_close=True)
        # Keep the scope so the matching end/error hook can close it. Finishing only
        # the span would leave it registered as the active span, and later spans
        # would be parented to an already finished one.
        trace_context.scope = scope

        span = scope.span
        span.set_tag(tags.COMPONENT, 'pjrpc.client')
        span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)

    def on_request_end(self, trace_context, request, response):
        """Tag the span with the response outcome, then finish and deactivate it."""
        scope = trace_context.scope
        span = scope.span

        span.set_tag(tags.ERROR, response.is_error)
        if response.is_error:
            span.set_tag('jsonrpc.error_code', response.error.code)
            span.set_tag('jsonrpc.error_message', response.error.message)

        # Closing the scope finishes the span (finish_on_close=True) and deactivates it.
        scope.close()

    def on_error(self, trace_context, request, error):
        """Mark the span as failed, then finish and deactivate it."""
        scope = trace_context.scope
        scope.span.set_tag(tags.ERROR, True)
        scope.close()
# Client whose requests are reported to the OpenTracing tracer.
client = pjrpc_client.Client('http://localhost/api/v1', tracers=(ClientTracer(),))

result = client.proxy.sum(1, 2)
flask class-based handler
import uuid
import flask
import pjrpc
from pjrpc.server.integration import flask as integration
app = flask.Flask(__name__)

# Registry the class-based view below is added to.
methods = pjrpc.server.MethodRegistry()


@methods.view(prefix='user')
class UserView(pjrpc.server.ViewMixin):
    """Class-based JSON-RPC handler working on the ``users`` mapping of the current Flask application."""

    def __init__(self):
        """Bind the view to ``flask.current_app.users``."""
        super().__init__()

        self._users = flask.current_app.users

    def add(self, user: dict):
        """Store *user* under a new hex id and return it together with that id."""
        user_id = uuid.uuid4().hex
        self._users[user_id] = user

        return {'id': user_id, **user}

    def get(self, user_id: str):
        """Return the stored user for *user_id*.

        :raises pjrpc.exc.JsonRpcError: with code 1 when no (non-empty) user is stored under *user_id*
        """
        user = self._users.get(user_id)
        if not user:
            # The error must be raised; merely constructing it silently returned ``None``.
            raise pjrpc.exc.JsonRpcError(code=1, message='not found')

        return user
# JSON-RPC endpoint at /api/v1 that serves the registered view.
json_rpc = integration.JsonRPC('/api/v1')
json_rpc.dispatcher.add_methods(methods)

# Storage read by UserView through ``flask.current_app.users``.
app.users = dict()

json_rpc.init_app(app)


if __name__ == "__main__":
    app.run(port=8080)
flask server
import uuid
import flask
import pjrpc
from pjrpc.server.integration import flask as integration
app = flask.Flask(__name__)

# Registry the JSON-RPC methods of this example are added to.
methods = pjrpc.server.MethodRegistry()


@methods.add
def add_user(user: dict):
    """Store *user* in ``flask.current_app.users`` under a new hex id and return it with that id.

    Keys already present in *user* (including ``id``) take precedence in the returned mapping.
    """
    new_id = uuid.uuid4().hex
    flask.current_app.users[new_id] = user

    record = {'id': new_id}
    record.update(user)
    return record
# JSON-RPC endpoint at /api/v1 that serves the registered methods.
json_rpc = integration.JsonRPC('/api/v1')
json_rpc.dispatcher.add_methods(methods)

# Storage that add_user writes to through ``flask.current_app.users``.
app.users = dict()

json_rpc.init_app(app)


if __name__ == "__main__":
    app.run(port=8080)
flask versioning
import uuid
import flask
import pjrpc.server
from pjrpc.server.integration import flask as integration
# Methods exposed by version 1 of the API.
methods_v1 = pjrpc.server.MethodRegistry()


@methods_v1.add
def add_user_v1(user: dict):
    """Store *user* in ``flask.current_app.users`` under a new hex id and return it with that id."""
    new_id = uuid.uuid4().hex
    flask.current_app.users[new_id] = user

    record = {'id': new_id}
    record.update(user)
    return record
# Methods exposed by version 2 of the API.
methods_v2 = pjrpc.server.MethodRegistry()


@methods_v2.add
def add_user_v2(user: dict):
    """Store *user* in ``flask.current_app.users`` under a new hex id and return it with that id."""
    new_id = uuid.uuid4().hex
    flask.current_app.users[new_id] = user

    record = {'id': new_id}
    record.update(user)
    return record
# Version 1: its own blueprint with a JSON-RPC endpoint at /api/v1.
app_v1 = flask.blueprints.Blueprint('v1', __name__)

json_rpc = integration.JsonRPC('/api/v1')
json_rpc.dispatcher.add_methods(methods_v1)
json_rpc.init_app(app_v1)

# Version 2: a second blueprint with a JSON-RPC endpoint at /api/v2.
# ``json_rpc`` is deliberately rebound; the v1 instance is already initialised.
app_v2 = flask.blueprints.Blueprint('v2', __name__)

json_rpc = integration.JsonRPC('/api/v2')
json_rpc.dispatcher.add_methods(methods_v2)
json_rpc.init_app(app_v2)

# The Flask application hosts both blueprints and the shared user storage.
app = flask.Flask(__name__)
app.register_blueprint(app_v1)
app.register_blueprint(app_v2)
app.users = dict()


if __name__ == "__main__":
    app.run(port=8080)
httpserver
import uuid
import http.server
import socketserver
import pjrpc
import pjrpc.server
class JsonRpcHandler(http.server.BaseHTTPRequestHandler):
    """
    JSON-RPC handler.
    """

    def _send_status(self, status):
        """
        Sends a complete response that has no body.

        ``send_response`` only buffers the status line and headers; they reach
        the client when ``end_headers`` is called.
        """

        self.send_response(status)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def do_POST(self):
        """
        Handles JSON-RPC request.

        Responds with 415 when the content type is not ``application/json``,
        411 when ``Content-Length`` is missing, 400 when it is not a
        non-negative integer or the body cannot be decoded, and 200 otherwise
        (with the dispatcher output as body unless the dispatcher returns ``None``).
        """

        content_type = self.headers.get('Content-Type')
        if content_type != 'application/json':
            self._send_status(http.HTTPStatus.UNSUPPORTED_MEDIA_TYPE)
            return

        raw_length = self.headers.get('Content-Length')
        if raw_length is None:
            # Without a length, ``rfile.read`` would block until the peer closes the connection.
            self._send_status(http.HTTPStatus.LENGTH_REQUIRED)
            return

        try:
            content_length = int(raw_length)
        except ValueError:
            self._send_status(http.HTTPStatus.BAD_REQUEST)
            return

        if content_length < 0:
            self._send_status(http.HTTPStatus.BAD_REQUEST)
            return

        try:
            request_text = self.rfile.read(content_length).decode()
        except UnicodeDecodeError:
            self._send_status(http.HTTPStatus.BAD_REQUEST)
            return

        response_text = self.server.dispatcher.dispatch(request_text, context=self)
        if response_text is None:
            # The dispatcher produced nothing to send back: reply with an empty 200.
            self._send_status(http.HTTPStatus.OK)
            return

        payload = response_text.encode()
        self.send_response(http.HTTPStatus.OK)
        self.send_header("Content-type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)
class JsonRpcServer(http.server.HTTPServer):
    """
    :py:class:`http.server.HTTPServer` based JSON-RPC server.

    :param server_address: address passed to :py:class:`http.server.HTTPServer`
    :param RequestHandlerClass: request handler class, :py:class:`JsonRpcHandler` by default
    :param bind_and_activate: passed to :py:class:`http.server.HTTPServer`
    :param kwargs: arguments to be passed to the dispatcher :py:class:`pjrpc.server.Dispatcher`
    """

    def __init__(self, server_address, RequestHandlerClass=JsonRpcHandler, bind_and_activate=True, **kwargs):
        # The HTTP server is initialised first, then the dispatcher is built from the extra arguments.
        super().__init__(server_address, RequestHandlerClass, bind_and_activate)

        dispatcher = pjrpc.server.Dispatcher(**kwargs)
        self._dispatcher = dispatcher

    @property
    def dispatcher(self):
        """
        JSON-RPC method dispatcher created in the constructor.
        """

        return self._dispatcher
# Registry the JSON-RPC methods of this example are added to.
methods = pjrpc.server.MethodRegistry()


@methods.add(context='request')
def add_user(request: http.server.BaseHTTPRequestHandler, user: dict):
    """Store *user* in ``request.server.users`` under a new hex id and return it with that id.

    Keys already present in *user* (including ``id``) take precedence in the returned mapping.
    """
    new_id = uuid.uuid4().hex
    request.server.users[new_id] = user

    record = {'id': new_id}
    record.update(user)
    return record
class ThreadingJsonRpcServer(socketserver.ThreadingMixIn, JsonRpcServer):
    # Class-level storage that add_user reaches through ``request.server.users``.
    users = {}


with ThreadingJsonRpcServer(("localhost", 8080)) as rpc_server:
    rpc_server.dispatcher.add_methods(methods)
    rpc_server.serve_forever()
jsonschema validator
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.validators import jsonschema as validators
from pjrpc.server.integration import aiohttp
# Registry the JSON-RPC methods of this example are added to.
methods = pjrpc.server.MethodRegistry()
# JSON Schema validator; its ``validate`` decorator is applied to ``add_user``.
validator = validators.JsonSchemaValidator()
def _object_schema(properties, required):
    """Build a JSON Schema ``object`` node with the given properties and required keys."""
    return {'type': 'object', 'properties': properties, 'required': required}


# A single contact: a kind ('phone' or 'email') and its string value.
contact_schema = _object_schema(
    properties={
        'type': {'type': 'string', 'enum': ['phone', 'email']},
        'value': {'type': 'string'},
    },
    required=['type', 'value'],
)

# A user: name, surname, age and a list of contacts; every field is required.
user_schema = _object_schema(
    properties={
        'name': {'type': 'string'},
        'surname': {'type': 'string'},
        'age': {'type': 'integer'},
        'contacts': {'type': 'array', 'items': contact_schema},
    },
    required=['name', 'surname', 'age', 'contacts'],
)

# Method parameters: an object holding the required ``user``.
params_schema = _object_schema(
    properties={'user': user_schema},
    required=['user'],
)
@methods.add(context='request')
@validator.validate(schema=params_schema)
async def add_user(request: web.Request, user):
    """Store *user* in ``request.app['users']`` under a new hex id and return it with that id.

    The parameters are checked against ``params_schema`` by the validator decorator.
    """
    new_id = uuid.uuid4().hex
    request.app['users'][new_id] = user

    record = {'id': new_id}
    record.update(user)
    return record
# JSON-RPC application mounted at /api/v1 that serves the validated method.
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.dispatcher.add_methods(methods)

# Storage that add_user writes to through ``request.app['users']``.
jsonrpc_app.app['users'] = dict()


if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)
kombu client
import pjrpc
from pjrpc.client.backend import kombu as pjrpc_client
# Synchronous client talking to the 'jsonrpc' queue of the AMQP broker.
client = pjrpc_client.Client('amqp://guest:guest@localhost:5672/v1', 'jsonrpc')
# 1. Explicit request object sent with ``send``; the result is read from the response.
response: pjrpc.Response = client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
# 2. Calling the client object directly with the method name and keyword params.
result = client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
# 3. Attribute-style call through the proxy with positional params.
result = client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
# Finally send the 'tick' notification.
client.notify('tick')
kombu server
import uuid
import kombu
import pjrpc
from pjrpc.server.integration import kombu as integration
# Registry the JSON-RPC methods of this example are added to.
methods = pjrpc.server.MethodRegistry()


@methods.add(context='message')
def add_user(message: kombu.Message, user: dict):
    """Return *user* extended with a freshly generated hex ``id``.

    ``message`` is accepted but not used by this example.
    Keys already present in *user* (including ``id``) take precedence.
    """
    record = {'id': uuid.uuid4().hex}
    record.update(user)
    return record
# Executor bound to the 'jsonrpc' queue; it serves the methods registered above.
executor = integration.Executor(
    'amqp://guest:guest@localhost:5672/v1',
    queue_name='jsonrpc',
)
executor.dispatcher.add_methods(methods)


if __name__ == "__main__":
    executor.run()
middlewares
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
# Registry the JSON-RPC methods of this example are added to.
methods = pjrpc.server.MethodRegistry()


@methods.add(context='request')
async def method(request):
    """Example method: prints a marker so the middleware call order is visible."""
    print("method")
async def middleware1(request, context, handler):
    """Print a marker before and after delegating to *handler* and pass its result through."""
    print("middleware1 started")
    outcome = await handler(request, context)
    print("middleware1 finished")

    return outcome
async def middleware2(request, context, handler):
    """Print a marker before and after delegating to *handler* and pass its result through."""
    print("middleware2 started")
    outcome = await handler(request, context)
    print("middleware2 finished")

    return outcome
# JSON-RPC application at /api/v1 with both middlewares installed in this order.
jsonrpc_app = aiohttp.Application('/api/v1', middlewares=(middleware1, middleware2))
jsonrpc_app.dispatcher.add_methods(methods)


if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)
multiple clients
import pjrpc
from pjrpc.client.backend import requests as jrpc_client
class ErrorV1(pjrpc.exc.JsonRpcError):
    """Base error of the v1 API; resolves error codes among its own direct subclasses."""

    @classmethod
    def get_error_cls(cls, code, default):
        """Return the first direct subclass whose ``code`` equals *code*, or *default* if none matches."""
        for candidate in cls.__subclasses__():
            if getattr(candidate, 'code', None) == code:
                return candidate

        return default


class PermissionDenied(ErrorV1):
    # Code 1 in the v1 API.
    code = 1
    message = 'permission denied'
class ErrorV2(pjrpc.exc.JsonRpcError):
    """Base error of the v2 API; resolves error codes among its own direct subclasses."""

    @classmethod
    def get_error_cls(cls, code, default):
        """Return the first direct subclass whose ``code`` equals *code*, or *default* if none matches."""
        for candidate in cls.__subclasses__():
            if getattr(candidate, 'code', None) == code:
                return candidate

        return default


class ResourceNotFound(ErrorV2):
    # Code 1 in the v2 API; the same number means something else in v1.
    code = 1
    message = 'resource not found'
# One client per API version, each with its own error base class, so the same
# error code is mapped to a different exception type per version.
client_v1 = jrpc_client.Client(
    'http://localhost:8080/api/v1',
    error_cls=ErrorV1,
)
client_v2 = jrpc_client.Client(
    'http://localhost:8080/api/v2',
    error_cls=ErrorV2,
)

try:
    response: pjrpc.Response = client_v1.proxy.add_user(user={})
except PermissionDenied as e:
    print(e)

try:
    response: pjrpc.Response = client_v2.proxy.add_user(user={})
except ResourceNotFound as e:
    print(e)
pydantic validator
import enum
import uuid
from typing import List
import pydantic
from aiohttp import web
import pjrpc.server
from pjrpc.server.validators import pydantic as validators
from pjrpc.server.integration import aiohttp
# Registry the JSON-RPC methods of this example are added to.
methods = pjrpc.server.MethodRegistry()
# Pydantic-based validator; its ``validate`` decorator is applied to ``add_user``.
validator = validators.PydanticValidator()
class ContactType(enum.Enum):
    """Kinds of contact a user can have."""

    PHONE = 'phone'
    EMAIL = 'email'
# A single contact entry: its kind and the contact value itself.
class Contact(pydantic.BaseModel):
    type: ContactType
    value: str


# The user payload accepted by ``add_user``.
class User(pydantic.BaseModel):
    name: str
    surname: str
    age: int
    contacts: List[Contact]
@methods.add(context='request')
@validator.validate
async def add_user(request: web.Request, user: User):
    """Store the validated *user* model under a new UUID and return its fields together with that id.

    The id is returned as a ``uuid.UUID`` object, not as a string.
    """
    new_id = uuid.uuid4()
    request.app['users'][new_id] = user

    record = {'id': new_id}
    record.update(user.dict())
    return record
class JSONEncoder(pjrpc.server.JSONEncoder):
    """JSON encoder that additionally serialises UUIDs (as hex) and enum members (as their value)."""

    def default(self, o):
        """Encode UUID and Enum objects; defer everything else to the base encoder."""
        if isinstance(o, uuid.UUID):
            encoded = o.hex
        elif isinstance(o, enum.Enum):
            encoded = o.value
        else:
            encoded = super().default(o)

        return encoded
# JSON-RPC application at /api/v1 using the custom encoder defined above.
jsonrpc_app = aiohttp.Application('/api/v1', json_encoder=JSONEncoder)
jsonrpc_app.dispatcher.add_methods(methods)

# Storage that add_user writes to through ``request.app['users']``.
jsonrpc_app.app['users'] = dict()


if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)
requests client
import pjrpc
from pjrpc.client.backend import requests as pjrpc_client
# Synchronous HTTP client for the JSON-RPC endpoint.
client = pjrpc_client.Client('http://localhost/api/v1')
# 1. Explicit request object sent with ``send``; the result is read from the response.
response: pjrpc.Response = client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
# 2. Calling the client object directly with the method name and keyword params.
result = client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
# 3. Attribute-style call through the proxy with positional params.
result = client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
# Finally send the 'tick' notification.
client.notify('tick')
requests pytest
import pytest
from unittest import mock
import pjrpc
from pjrpc.client.integrations.pytest import PjRpcRequestsMocker
from pjrpc.client.backend import requests as requests_client
def test_using_fixture(pjrpc_requests_mocker):
    """Mock ``sum`` through the pytest fixture: a result, then an error, then an unmocked endpoint."""
    endpoint = 'http://localhost/api/v1'
    client = requests_client.Client(endpoint)

    # Successful call served by the mocker.
    pjrpc_requests_mocker.add(endpoint, 'sum', result=2)
    value = client.proxy.sum(1, 1)
    assert value == 2

    # Swap the mocked result for an error and check it reaches the caller intact.
    pjrpc_requests_mocker.replace(
        endpoint, 'sum', error=pjrpc.exc.JsonRpcError(code=1, message='error', data='oops'),
    )
    with pytest.raises(pjrpc.exc.JsonRpcError) as exc_info:
        client.proxy.sum(a=1, b=1)

    assert exc_info.type is pjrpc.exc.JsonRpcError
    raised = exc_info.value
    assert raised.code == 1
    assert raised.message == 'error'
    assert raised.data == 'oops'

    # Both calls were recorded under the (version, method) key.
    recorded = pjrpc_requests_mocker.calls[endpoint]
    assert recorded[('2.0', 'sum')].call_count == 2
    assert recorded[('2.0', 'sum')].mock_calls == [mock.call(1, 1), mock.call(a=1, b=1)]

    # An endpoint nothing was registered for is expected to fail with a connection error.
    client = requests_client.Client('http://localhost/api/v2')
    with pytest.raises(ConnectionRefusedError):
        client.proxy.sum(1, 1)
def test_using_resource_manager():
    """Mock ``mult`` and ``div`` with the mocker used as a context manager and run them as one batch."""
    endpoint = 'http://localhost/api/v1'
    client = requests_client.Client(endpoint)

    with PjRpcRequestsMocker() as mocker:
        # ``mult`` returns a fixed result, ``div`` is computed by a callback.
        mocker.add(endpoint, 'mult', result=4)
        mocker.add(endpoint, 'div', callback=lambda a, b: a/b)

        values = client.batch.proxy.div(4, 2).mult(2, 2).call()
        assert values == (2, 4)

        recorded = mocker.calls[endpoint]
        assert recorded[('2.0', 'div')].mock_calls == [mock.call(4, 2)]
        assert recorded[('2.0', 'mult')].mock_calls == [mock.call(2, 2)]

        # A method that was never registered is expected to raise MethodNotFoundError.
        with pytest.raises(pjrpc.exc.MethodNotFoundError):
            client.proxy.sub(4, 2)
sentry
import sentry_sdk
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
# Registry the JSON-RPC methods of this example are added to.
methods = pjrpc.server.MethodRegistry()


@methods.add(context='request')
async def method(request):
    """Example method: only prints a marker."""
    print("method")
async def sentry_middleware(request, context, handler):
    """Delegate to *handler*; report any ``JsonRpcError`` it raises to Sentry and re-raise it."""
    try:
        outcome = await handler(request, context)
    except pjrpc.exceptions.JsonRpcError as rpc_error:
        sentry_sdk.capture_exception(rpc_error)
        raise

    return outcome
# JSON-RPC application at /api/v1 with the Sentry reporting middleware installed.
jsonrpc_app = aiohttp.Application('/api/v1', middlewares=(sentry_middleware,))
jsonrpc_app.dispatcher.add_methods(methods)


if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)
server prometheus metrics
import asyncio
from typing import Any, Callable
import prometheus_client as pc
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
# Prometheus collectors used by the middlewares and error handlers of this example.
# Number of errors, labelled by method name and error code.
method_error_count = pc.Counter('method_error_count', 'Method error count', labelnames=['method', 'code'])
# Latency observations, labelled by method name.
method_latency_hist = pc.Histogram('method_latency', 'Method latency', labelnames=['method'])
# Gauge of calls currently in progress, labelled by method name.
method_active_count = pc.Gauge('method_active_count', 'Method active count', labelnames=['method'])
async def metrics(request):
    """Serve the latest Prometheus metrics as the response body."""
    exposition = pc.generate_latest()
    return web.Response(body=exposition)


# Plain aiohttp application exposing the metrics endpoint.
http_app = web.Application()
http_app.add_routes([web.get('/metrics', metrics)])
# Registry the JSON-RPC methods of this example are added to.
methods = pjrpc.server.MethodRegistry()


@methods.add(context='context')
async def method(context: web.Request):
    """Example method: prints markers around a one-second sleep so latency is measurable."""
    print("method started")
    await asyncio.sleep(1)
    print("method finished")
async def latency_metric_middleware(request: pjrpc.Request, context: web.Request, handler: Callable) -> Any:
    """Run *handler* inside the latency histogram timer of the called method."""
    timer = method_latency_hist.labels(method=request.method).time()
    with timer:
        return await handler(request, context)
async def active_count_metric_middleware(request: pjrpc.Request, context: web.Request, handler: Callable) -> Any:
    """Run *handler* while the in-progress gauge of the called method is being tracked."""
    in_progress = method_active_count.labels(method=request.method).track_inprogress()
    with in_progress:
        return await handler(request, context)
async def any_error_handler(
    request: pjrpc.Request,
    context: web.Request,
    error: pjrpc.exceptions.JsonRpcError,
) -> pjrpc.exceptions.JsonRpcError:
    """Count *error* under its method and code, then return it unchanged."""
    counter = method_error_count.labels(method=request.method, code=error.code)
    counter.inc()

    return error
async def validation_error_handler(
    request: pjrpc.Request,
    context: web.Request,
    error: pjrpc.exceptions.JsonRpcError,
) -> pjrpc.exceptions.JsonRpcError:
    """Print a notice about the validation error and return *error* unchanged."""
    print("validation error occurred")

    return error
# JSON-RPC application mounted on the metrics-serving aiohttp app. Error code
# -32602 gets a dedicated handler; the ``None`` key registers ``any_error_handler``.
jsonrpc_app = aiohttp.Application(
    '/api/v1',
    app=http_app,
    middlewares=(latency_metric_middleware, active_count_metric_middleware),
    error_handlers={
        -32602: [validation_error_handler],
        None: [any_error_handler],
    },
)
jsonrpc_app.dispatcher.add_methods(methods)


if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)
server tracing
import asyncio
import opentracing
from opentracing import tags
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
@web.middleware
async def http_tracing_middleware(request, handler):
    """
    aiohttp server tracer.

    Starts an ``http.<METHOD>`` span (continuing the trace found in the request
    headers, if any), runs the handler with the span active and tags the span
    with the response status.
    """

    tracer = opentracing.global_tracer()

    # A missing or corrupted trace context simply starts a span without that parent.
    try:
        parent_ctx = tracer.extract(format=opentracing.Format.HTTP_HEADERS, carrier=request.headers)
    except (opentracing.InvalidCarrierException, opentracing.SpanContextCorruptedException):
        parent_ctx = None

    span = tracer.start_span(f'http.{request.method}', child_of=parent_ctx)
    request_tags = (
        (tags.COMPONENT, 'aiohttp.server'),
        (tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER),
        (tags.PEER_ADDRESS, request.remote),
        (tags.HTTP_URL, str(request.url)),
        (tags.HTTP_METHOD, request.method),
    )
    for tag_name, tag_value in request_tags:
        span.set_tag(tag_name, tag_value)

    with tracer.scope_manager.activate(span, finish_on_close=True):
        response: web.Response = await handler(request)
        status = response.status
        span.set_tag(tags.HTTP_STATUS_CODE, status)
        span.set_tag(tags.ERROR, status >= 400)

    return response
# aiohttp application with the HTTP tracing middleware installed.
http_app = web.Application(middlewares=(http_tracing_middleware,))
# Registry the JSON-RPC methods of this example are added to.
methods = pjrpc.server.MethodRegistry()


@methods.add(context='context')
async def method(context):
    """Example method: prints markers around a one-second sleep so the span has a duration."""
    print("method started")
    await asyncio.sleep(1)
    print("method finished")
async def jsonrpc_tracing_middleware(request, context, handler):
    """Run *handler* inside a ``jsonrpc.<method>`` span tagged with the request data and the outcome."""
    tracer = opentracing.global_tracer()
    span = tracer.start_span(f'jsonrpc.{request.method}')

    request_tags = (
        (tags.COMPONENT, 'pjrpc'),
        (tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER),
        ('jsonrpc.version', request.version),
        ('jsonrpc.id', request.id),
        ('jsonrpc.method', request.method),
    )
    for tag_name, tag_value in request_tags:
        span.set_tag(tag_name, tag_value)

    with tracer.scope_manager.activate(span, finish_on_close=True):
        response = await handler(request, context)
        if not response.is_error:
            span.set_tag(tags.ERROR, False)
        else:
            span.set_tag('jsonrpc.error_code', response.error.code)
            span.set_tag('jsonrpc.error_message', response.error.message)
            span.set_tag(tags.ERROR, True)

    return response
# JSON-RPC application mounted on the traced aiohttp app, with its own tracing middleware.
jsonrpc_app = aiohttp.Application('/api/v1', app=http_app, middlewares=(jsonrpc_tracing_middleware,))
jsonrpc_app.dispatcher.add_methods(methods)


if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)
werkzeug server
import uuid
import werkzeug
import pjrpc.server
from pjrpc.server.integration import werkzeug as integration
# Registry the JSON-RPC methods of this example are added to.
methods = pjrpc.server.MethodRegistry()


@methods.add(context='request')
def add_user(request: werkzeug.Request, user: dict):
    """Store *user* in the ``users`` mapping of ``request.environ['app']`` and return it with its new id.

    Keys already present in *user* (including ``id``) take precedence in the returned mapping.
    """
    new_id = uuid.uuid4().hex
    request.environ['app'].users[new_id] = user

    record = {'id': new_id}
    record.update(user)
    return record
# WSGI JSON-RPC application at /api/v1 that serves the registered methods.
app = integration.JsonRPC('/api/v1')
app.dispatcher.add_methods(methods)

# Storage that add_user writes to.
app.users = dict()


if __name__ == '__main__':
    werkzeug.serving.run_simple('127.0.0.1', 8080, app)