Examples¶
aio_pika client¶
import asyncio
import pjrpc
from pjrpc.client.backend import aio_pika as pjrpc_client
async def main():
client = pjrpc_client.Client('amqp://guest:guest@localhost:5672/v1', 'jsonrpc')
await client.connect()
response: pjrpc.Response = await client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
result = await client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
result = await client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
await client.notify('tick')
if __name__ == "__main__":
asyncio.run(main())
aio_pika server¶
import asyncio
import uuid
import aio_pika
import pjrpc
from pjrpc.server.integration import aio_pika as integration
methods = pjrpc.server.MethodRegistry()
@methods.add(context='message')
def add_user(message: aio_pika.IncomingMessage, user: dict):
user_id = uuid.uuid4().hex
return {'id': user_id, **user}
executor = integration.Executor('amqp://guest:guest@localhost:5672/v1', queue_name='jsonrpc')
executor.dispatcher.add_methods(methods)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(executor.start())
try:
loop.run_forever()
finally:
loop.run_until_complete(executor.shutdown())
aiohttp class-based handler¶
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
methods = pjrpc.server.MethodRegistry()
@methods.view(context='request', prefix='user')
class UserView(pjrpc.server.ViewMixin):
def __init__(self, request: web.Request):
super().__init__()
self._users = request.app['users']
async def add(self, user: dict):
user_id = uuid.uuid4().hex
self._users[user_id] = user
return {'id': user_id, **user}
async def get(self, user_id: str):
user = self._users.get(user_id)
if not user:
pjrpc.exc.JsonRpcError(code=1, message='not found')
return user
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.dispatcher.add_methods(methods)
jsonrpc_app.app['users'] = {}
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
aiohttp client¶
import asyncio
import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client
async def main():
async with pjrpc_client.Client('http://localhost/api/v1') as client:
response = await client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
result = await client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
result = await client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
await client.notify('tick')
asyncio.run(main())
aiohttp client batch request¶
import asyncio
import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client
async def main():
async with pjrpc_client.Client('http://localhost:8080/api/v1') as client:
batch_response = await client.batch.send(
pjrpc.BatchRequest(
pjrpc.Request('sum', [2, 2], id=1),
pjrpc.Request('sub', [2, 2], id=2),
pjrpc.Request('div', [2, 2], id=3),
pjrpc.Request('mult', [2, 2], id=4),
),
)
print(f"2 + 2 = {batch_response[0].result}")
print(f"2 - 2 = {batch_response[1].result}")
print(f"2 / 2 = {batch_response[2].result}")
print(f"2 * 2 = {batch_response[3].result}")
result = await client.batch('sum', 2, 2)('sub', 2, 2)('div', 2, 2)('mult', 2, 2).call()
print(f"2 + 2 = {result[0]}")
print(f"2 - 2 = {result[1]}")
print(f"2 / 2 = {result[2]}")
print(f"2 * 2 = {result[3]}")
result = await client.batch[
('sum', 2, 2),
('sub', 2, 2),
('div', 2, 2),
('mult', 2, 2),
]
print(f"2 + 2 = {result[0]}")
print(f"2 - 2 = {result[1]}")
print(f"2 / 2 = {result[2]}")
print(f"2 * 2 = {result[3]}")
result = await client.batch.proxy.sum(2, 2).sub(2, 2).div(2, 2).mult(2, 2).call()
print(f"2 + 2 = {result[0]}")
print(f"2 - 2 = {result[1]}")
print(f"2 / 2 = {result[2]}")
print(f"2 * 2 = {result[3]}")
await client.batch.notify('tick').notify('tack').call()
asyncio.run(main())
aiohttp pytest integration¶
import pytest
from unittest import mock
import pjrpc
from pjrpc.client.integrations.pytest import PjRpcAiohttpMocker
from pjrpc.client.backend import aiohttp as aiohttp_client
async def test_using_fixture(pjrpc_aiohttp_mocker):
client = aiohttp_client.Client('http://localhost/api/v1')
pjrpc_aiohttp_mocker.add('http://localhost/api/v1', 'sum', result=2)
result = await client.proxy.sum(1, 1)
assert result == 2
pjrpc_aiohttp_mocker.replace(
'http://localhost/api/v1', 'sum', error=pjrpc.exc.JsonRpcError(code=1, message='error', data='oops'),
)
with pytest.raises(pjrpc.exc.JsonRpcError) as exc_info:
await client.proxy.sum(a=1, b=1)
assert exc_info.type is pjrpc.exc.JsonRpcError
assert exc_info.value.code == 1
assert exc_info.value.message == 'error'
assert exc_info.value.data == 'oops'
localhost_calls = pjrpc_aiohttp_mocker.calls['http://localhost/api/v1']
assert localhost_calls[('2.0', 'sum')].call_count == 2
assert localhost_calls[('2.0', 'sum')].mock_calls == [mock.call(1, 1), mock.call(a=1, b=1)]
async def test_using_resource_manager():
client = aiohttp_client.Client('http://localhost/api/v1')
with PjRpcAiohttpMocker() as mocker:
mocker.add('http://localhost/api/v1', 'div', result=2)
result = await client.proxy.div(4, 2)
assert result == 2
localhost_calls = mocker.calls['http://localhost/api/v1']
assert localhost_calls[('2.0', 'div')].mock_calls == [mock.call(4, 2)]
aiohttp server¶
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
methods = pjrpc.server.MethodRegistry()
@methods.add(context='request')
async def add_user(request: web.Request, user: dict):
user_id = uuid.uuid4().hex
request.app['users'][user_id] = user
return {'id': user_id, **user}
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.dispatcher.add_methods(methods)
jsonrpc_app.app['users'] = {}
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
aiohttp versioning¶
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
methods_v1 = pjrpc.server.MethodRegistry()
@methods_v1.add(context='request')
async def add_user_v1(request: web.Request, user: dict):
user_id = uuid.uuid4().hex
request.config_dict['users'][user_id] = user
return {'id': user_id, **user}
methods_v2 = pjrpc.server.MethodRegistry()
@methods_v2.add(context='request')
async def add_user_v2(request: web.Request, user: dict):
user_id = uuid.uuid4().hex
request.config_dict['users'][user_id] = user
return {'id': user_id, **user}
app = web.Application()
app['users'] = {}
app_v1 = aiohttp.Application()
app_v1.dispatcher.add_methods(methods_v1)
app.add_subapp('/api/v1', app_v1.app)
app_v2 = aiohttp.Application()
app_v2.dispatcher.add_methods(methods_v2)
app.add_subapp('/api/v2', app_v2.app)
if __name__ == "__main__":
web.run_app(app, host='localhost', port=8080)
client prometheus metrics¶
import time
import prometheus_client as prom_cli
from pjrpc.client import tracer
from pjrpc.client.backend import requests as pjrpc_client
method_latency_hist = prom_cli.Histogram('method_latency', 'Method latency', labelnames=['method'])
method_call_total = prom_cli.Counter('method_call_total', 'Method call count', labelnames=['method'])
method_errors_total = prom_cli.Counter('method_errors_total', 'Method errors count', labelnames=['method', 'code'])
class PrometheusTracer(tracer.Tracer):
def on_request_begin(self, trace_context, request):
trace_context.started_at = time.time()
method_call_total.labels(request.method).inc()
def on_request_end(self, trace_context, request, response):
method_latency_hist.labels(request.method).observe(time.time() - trace_context.started_at)
if response.is_error:
method_call_total.labels(request.method, response.error.code).inc()
def on_error(self, trace_context, request, error):
method_latency_hist.labels(request.method).observe(time.time() - trace_context.started_at)
client = pjrpc_client.Client(
'http://localhost/api/v1', tracers=(
PrometheusTracer(),
),
)
result = client.proxy.sum(1, 2)
client tracing¶
import opentracing
from opentracing import tags
from pjrpc.client import tracer
from pjrpc.client.backend import requests as pjrpc_client
class ClientTracer(tracer.Tracer):
def __init__(self):
super().__init__()
self._tracer = opentracing.global_tracer()
def on_request_begin(self, trace_context, request):
span = self._tracer.start_active_span(f'jsonrpc.{request.method}').span
span.set_tag(tags.COMPONENT, 'pjrpc.client')
span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)
def on_request_end(self, trace_context, request, response):
span = self._tracer.active_span
span.set_tag(tags.ERROR, response.is_error)
if response.is_error:
span.set_tag('jsonrpc.error_code', response.error.code)
span.set_tag('jsonrpc.error_message', response.error.message)
span.finish()
def on_error(self, trace_context, request, error):
span = self._tracer.active_span
span.set_tag(tags.ERROR, True)
span.finish()
client = pjrpc_client.Client(
'http://localhost/api/v1', tracers=(
ClientTracer(),
),
)
result = client.proxy.sum(1, 2)
flask class-based handler¶
import uuid
import flask
import pjrpc
from pjrpc.server.integration import flask as integration
app = flask.Flask(__name__)
methods = pjrpc.server.MethodRegistry()
@methods.view(prefix='user')
class UserView(pjrpc.server.ViewMixin):
def __init__(self):
super().__init__()
self._users = flask.current_app.users
def add(self, user: dict):
user_id = uuid.uuid4().hex
self._users[user_id] = user
return {'id': user_id, **user}
def get(self, user_id: str):
user = self._users.get(user_id)
if not user:
pjrpc.exc.JsonRpcError(code=1, message='not found')
return user
json_rpc = integration.JsonRPC('/api/v1')
json_rpc.dispatcher.add_methods(methods)
app.users = {}
json_rpc.init_app(app)
if __name__ == "__main__":
app.run(port=8080)
flask server¶
import uuid
import flask
import pjrpc
from pjrpc.server.integration import flask as integration
app = flask.Flask(__name__)
methods = pjrpc.server.MethodRegistry()
@methods.add
def add_user(user: dict):
user_id = uuid.uuid4().hex
flask.current_app.users[user_id] = user
return {'id': user_id, **user}
json_rpc = integration.JsonRPC('/api/v1')
json_rpc.dispatcher.add_methods(methods)
app.users = {}
json_rpc.init_app(app)
if __name__ == "__main__":
app.run(port=8080)
flask versioning¶
import uuid
import flask
import pjrpc.server
from pjrpc.server.integration import flask as integration
methods_v1 = pjrpc.server.MethodRegistry()
@methods_v1.add
def add_user_v1(user: dict):
user_id = uuid.uuid4().hex
flask.current_app.users[user_id] = user
return {'id': user_id, **user}
methods_v2 = pjrpc.server.MethodRegistry()
@methods_v2.add
def add_user_v2(user: dict):
user_id = uuid.uuid4().hex
flask.current_app.users[user_id] = user
return {'id': user_id, **user}
app_v1 = flask.blueprints.Blueprint('v1', __name__)
json_rpc = integration.JsonRPC('/api/v1')
json_rpc.dispatcher.add_methods(methods_v1)
json_rpc.init_app(app_v1)
app_v2 = flask.blueprints.Blueprint('v2', __name__)
json_rpc = integration.JsonRPC('/api/v2')
json_rpc.dispatcher.add_methods(methods_v2)
json_rpc.init_app(app_v2)
app = flask.Flask(__name__)
app.register_blueprint(app_v1)
app.register_blueprint(app_v2)
app.users = {}
if __name__ == "__main__":
app.run(port=8080)
httpserver¶
import uuid
import http.server
import socketserver
import pjrpc
import pjrpc.server
class JsonRpcHandler(http.server.BaseHTTPRequestHandler):
"""
JSON-RPC handler.
"""
def do_POST(self):
"""
Handles JSON-RPC request.
"""
content_type = self.headers.get('Content-Type')
if content_type != 'application/json':
self.send_response(http.HTTPStatus.UNSUPPORTED_MEDIA_TYPE)
return
try:
content_length = int(self.headers.get('Content-Length', -1))
request_text = self.rfile.read(content_length).decode()
except UnicodeDecodeError:
self.send_response(http.HTTPStatus.BAD_REQUEST)
return
response_text = self.server.dispatcher.dispatch(request_text, context=self)
if response_text is None:
self.send_response(http.HTTPStatus.OK)
else:
self.send_response(http.HTTPStatus.OK)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(response_text.encode())
class JsonRpcServer(http.server.HTTPServer):
"""
:py:class:`http.server.HTTPServer` based JSON-RPC server.
:param path: JSON-RPC handler base path
:param kwargs: arguments to be passed to the dispatcher :py:class:`pjrpc.server.Dispatcher`
"""
def __init__(self, server_address, RequestHandlerClass=JsonRpcHandler, bind_and_activate=True, **kwargs):
super().__init__(server_address, RequestHandlerClass, bind_and_activate)
self._dispatcher = pjrpc.server.Dispatcher(**kwargs)
@property
def dispatcher(self):
"""
JSON-RPC method dispatcher.
"""
return self._dispatcher
methods = pjrpc.server.MethodRegistry()
@methods.add(context='request')
def add_user(request: http.server.BaseHTTPRequestHandler, user: dict):
user_id = uuid.uuid4().hex
request.server.users[user_id] = user
return {'id': user_id, **user}
class ThreadingJsonRpcServer(socketserver.ThreadingMixIn, JsonRpcServer):
users = {}
with ThreadingJsonRpcServer(("localhost", 8080)) as server:
server.dispatcher.add_methods(methods)
server.serve_forever()
jsonschema validator¶
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.validators import jsonschema as validators
from pjrpc.server.integration import aiohttp
methods = pjrpc.server.MethodRegistry()
validator = validators.JsonSchemaValidator()
contact_schema = {
'type': 'object',
'properties': {
'type': {
'type': 'string',
'enum': ['phone', 'email'],
},
'value': {'type': 'string'},
},
'required': ['type', 'value'],
}
user_schema = {
'type': 'object',
'properties': {
'name': {'type': 'string'},
'surname': {'type': 'string'},
'age': {'type': 'integer'},
'contacts': {
'type': 'array',
'items': contact_schema,
},
},
'required': ['name', 'surname', 'age', 'contacts'],
}
params_schema = {
'type': 'object',
'properties': {
'user': user_schema,
},
'required': ['user'],
}
@methods.add(context='request')
@validator.validate(schema=params_schema)
async def add_user(request: web.Request, user):
user_id = uuid.uuid4().hex
request.app['users'][user_id] = user
return {'id': user_id, **user}
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.dispatcher.add_methods(methods)
jsonrpc_app.app['users'] = {}
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
kombu client¶
import pjrpc
from pjrpc.client.backend import kombu as pjrpc_client
client = pjrpc_client.Client('amqp://guest:guest@localhost:5672/v1', 'jsonrpc')
response: pjrpc.Response = client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
result = client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
result = client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
client.notify('tick')
kombu server¶
import uuid
import kombu
import pjrpc
from pjrpc.server.integration import kombu as integration
methods = pjrpc.server.MethodRegistry()
@methods.add(context='message')
def add_user(message: kombu.Message, user: dict):
user_id = uuid.uuid4().hex
return {'id': user_id, **user}
executor = integration.Executor('amqp://guest:guest@localhost:5672/v1', queue_name='jsonrpc')
executor.dispatcher.add_methods(methods)
if __name__ == "__main__":
executor.run()
middlewares¶
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
methods = pjrpc.server.MethodRegistry()
@methods.add(context='request')
async def method(request):
print("method")
async def middleware1(request, context, handler):
print("middleware1 started")
result = await handler(request, context)
print("middleware1 finished")
return result
async def middleware2(request, context, handler):
print("middleware2 started")
result = await handler(request, context)
print("middleware2 finished")
return result
jsonrpc_app = aiohttp.Application(
'/api/v1', middlewares=(
middleware1,
middleware2,
),
)
jsonrpc_app.dispatcher.add_methods(methods)
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
multiple clients¶
import pjrpc
from pjrpc.client.backend import requests as jrpc_client
class ErrorV1(pjrpc.exc.JsonRpcError):
@classmethod
def get_error_cls(cls, code, default):
return next(iter((c for c in cls.__subclasses__() if getattr(c, 'code', None) == code)), default)
class PermissionDenied(ErrorV1):
code = 1
message = 'permission denied'
class ErrorV2(pjrpc.exc.JsonRpcError):
@classmethod
def get_error_cls(cls, code, default):
return next(iter((c for c in cls.__subclasses__() if getattr(c, 'code', None) == code)), default)
class ResourceNotFound(ErrorV2):
code = 1
message = 'resource not found'
client_v1 = jrpc_client.Client('http://localhost:8080/api/v1', error_cls=ErrorV1)
client_v2 = jrpc_client.Client('http://localhost:8080/api/v2', error_cls=ErrorV2)
try:
response: pjrpc.Response = client_v1.proxy.add_user(user={})
except PermissionDenied as e:
print(e)
try:
response: pjrpc.Response = client_v2.proxy.add_user(user={})
except ResourceNotFound as e:
print(e)
pydantic validator¶
import enum
import uuid
from typing import List
import pydantic
from aiohttp import web
import pjrpc.server
from pjrpc.server.validators import pydantic as validators
from pjrpc.server.integration import aiohttp
methods = pjrpc.server.MethodRegistry()
validator = validators.PydanticValidator()
class ContactType(enum.Enum):
PHONE = 'phone'
EMAIL = 'email'
class Contact(pydantic.BaseModel):
type: ContactType
value: str
class User(pydantic.BaseModel):
name: str
surname: str
age: int
contacts: List[Contact]
@methods.add(context='request')
@validator.validate
async def add_user(request: web.Request, user: User):
user_id = uuid.uuid4()
request.app['users'][user_id] = user
return {'id': user_id, **user.dict()}
class JSONEncoder(pjrpc.server.JSONEncoder):
def default(self, o):
if isinstance(o, uuid.UUID):
return o.hex
if isinstance(o, enum.Enum):
return o.value
return super().default(o)
jsonrpc_app = aiohttp.Application('/api/v1', json_encoder=JSONEncoder)
jsonrpc_app.dispatcher.add_methods(methods)
jsonrpc_app.app['users'] = {}
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
requests client¶
import pjrpc
from pjrpc.client.backend import requests as pjrpc_client
client = pjrpc_client.Client('http://localhost/api/v1')
response: pjrpc.Response = client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
result = client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
result = client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
client.notify('tick')
requests pytest¶
import pytest
from unittest import mock
import pjrpc
from pjrpc.client.integrations.pytest import PjRpcRequestsMocker
from pjrpc.client.backend import requests as requests_client
def test_using_fixture(pjrpc_requests_mocker):
client = requests_client.Client('http://localhost/api/v1')
pjrpc_requests_mocker.add('http://localhost/api/v1', 'sum', result=2)
result = client.proxy.sum(1, 1)
assert result == 2
pjrpc_requests_mocker.replace(
'http://localhost/api/v1', 'sum', error=pjrpc.exc.JsonRpcError(code=1, message='error', data='oops'),
)
with pytest.raises(pjrpc.exc.JsonRpcError) as exc_info:
client.proxy.sum(a=1, b=1)
assert exc_info.type is pjrpc.exc.JsonRpcError
assert exc_info.value.code == 1
assert exc_info.value.message == 'error'
assert exc_info.value.data == 'oops'
localhost_calls = pjrpc_requests_mocker.calls['http://localhost/api/v1']
assert localhost_calls[('2.0', 'sum')].call_count == 2
assert localhost_calls[('2.0', 'sum')].mock_calls == [mock.call(1, 1), mock.call(a=1, b=1)]
client = requests_client.Client('http://localhost/api/v2')
with pytest.raises(ConnectionRefusedError):
client.proxy.sum(1, 1)
def test_using_resource_manager():
client = requests_client.Client('http://localhost/api/v1')
with PjRpcRequestsMocker() as mocker:
mocker.add('http://localhost/api/v1', 'mult', result=4)
mocker.add('http://localhost/api/v1', 'div', callback=lambda a, b: a/b)
result = client.batch.proxy.div(4, 2).mult(2, 2).call()
assert result == (2, 4)
localhost_calls = mocker.calls['http://localhost/api/v1']
assert localhost_calls[('2.0', 'div')].mock_calls == [mock.call(4, 2)]
assert localhost_calls[('2.0', 'mult')].mock_calls == [mock.call(2, 2)]
with pytest.raises(pjrpc.exc.MethodNotFoundError):
client.proxy.sub(4, 2)
sentry¶
import sentry_sdk
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
methods = pjrpc.server.MethodRegistry()
@methods.add(context='request')
async def method(request):
print("method")
async def sentry_middleware(request, context, handler):
try:
return await handler(request, context)
except pjrpc.exceptions.JsonRpcError as e:
sentry_sdk.capture_exception(e)
raise
jsonrpc_app = aiohttp.Application(
'/api/v1', middlewares=(
sentry_middleware,
),
)
jsonrpc_app.dispatcher.add_methods(methods)
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
server prometheus metrics¶
import asyncio
from typing import Any, Callable
import prometheus_client as pc
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
method_error_count = pc.Counter('method_error_count', 'Method error count', labelnames=['method', 'code'])
method_latency_hist = pc.Histogram('method_latency', 'Method latency', labelnames=['method'])
method_active_count = pc.Gauge('method_active_count', 'Method active count', labelnames=['method'])
async def metrics(request):
return web.Response(body=pc.generate_latest())
http_app = web.Application()
http_app.add_routes([web.get('/metrics', metrics)])
methods = pjrpc.server.MethodRegistry()
@methods.add(context='context')
async def method(context: web.Request):
print("method started")
await asyncio.sleep(1)
print("method finished")
async def latency_metric_middleware(request: pjrpc.Request, context: web.Request, handler: Callable) -> Any:
with method_latency_hist.labels(method=request.method).time():
return await handler(request, context)
async def active_count_metric_middleware(request: pjrpc.Request, context: web.Request, handler: Callable) -> Any:
with method_active_count.labels(method=request.method).track_inprogress():
return await handler(request, context)
async def any_error_handler(
request: pjrpc.Request, context: web.Request, error: pjrpc.exceptions.JsonRpcError,
) -> pjrpc.exceptions.JsonRpcError:
method_error_count.labels(method=request.method, code=error.code).inc()
return error
async def validation_error_handler(
request: pjrpc.Request, context: web.Request, error: pjrpc.exceptions.JsonRpcError,
) -> pjrpc.exceptions.JsonRpcError:
print("validation error occurred")
return error
jsonrpc_app = aiohttp.Application(
'/api/v1',
app=http_app,
middlewares=(
latency_metric_middleware,
active_count_metric_middleware,
),
error_handlers={
-32602: [validation_error_handler],
None: [any_error_handler],
},
)
jsonrpc_app.dispatcher.add_methods(methods)
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
server tracing¶
import asyncio
import opentracing
from opentracing import tags
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
@web.middleware
async def http_tracing_middleware(request, handler):
"""
aiohttp server tracer.
"""
tracer = opentracing.global_tracer()
try:
span_ctx = tracer.extract(format=opentracing.Format.HTTP_HEADERS, carrier=request.headers)
except (opentracing.InvalidCarrierException, opentracing.SpanContextCorruptedException):
span_ctx = None
span = tracer.start_span(f'http.{request.method}', child_of=span_ctx)
span.set_tag(tags.COMPONENT, 'aiohttp.server')
span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
span.set_tag(tags.PEER_ADDRESS, request.remote)
span.set_tag(tags.HTTP_URL, str(request.url))
span.set_tag(tags.HTTP_METHOD, request.method)
with tracer.scope_manager.activate(span, finish_on_close=True):
response: web.Response = await handler(request)
span.set_tag(tags.HTTP_STATUS_CODE, response.status)
span.set_tag(tags.ERROR, response.status >= 400)
return response
http_app = web.Application(
middlewares=(
http_tracing_middleware,
),
)
methods = pjrpc.server.MethodRegistry()
@methods.add(context='context')
async def method(context):
print("method started")
await asyncio.sleep(1)
print("method finished")
async def jsonrpc_tracing_middleware(request, context, handler):
tracer = opentracing.global_tracer()
span = tracer.start_span(f'jsonrpc.{request.method}')
span.set_tag(tags.COMPONENT, 'pjrpc')
span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
span.set_tag('jsonrpc.version', request.version)
span.set_tag('jsonrpc.id', request.id)
span.set_tag('jsonrpc.method', request.method)
with tracer.scope_manager.activate(span, finish_on_close=True):
response = await handler(request, context)
if response.is_error:
span.set_tag('jsonrpc.error_code', response.error.code)
span.set_tag('jsonrpc.error_message', response.error.message)
span.set_tag(tags.ERROR, True)
else:
span.set_tag(tags.ERROR, False)
return response
jsonrpc_app = aiohttp.Application(
'/api/v1', app=http_app, middlewares=(
jsonrpc_tracing_middleware,
),
)
jsonrpc_app.dispatcher.add_methods(methods)
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
werkzeug server¶
import uuid
import werkzeug
import pjrpc.server
from pjrpc.server.integration import werkzeug as integration
methods = pjrpc.server.MethodRegistry()
@methods.add(context='request')
def add_user(request: werkzeug.Request, user: dict):
user_id = uuid.uuid4().hex
request.environ['app'].users[user_id] = user
return {'id': user_id, **user}
app = integration.JsonRPC('/api/v1')
app.dispatcher.add_methods(methods)
app.users = {}
if __name__ == '__main__':
werkzeug.serving.run_simple('127.0.0.1', 8080, app)
flask OpenAPI specification¶
import uuid
from typing import Any, Optional
import flask
import flask_httpauth
import pydantic
import flask_cors
from werkzeug import security
import pjrpc.server.specs.extractors.pydantic
from pjrpc.server.integration import flask as integration
from pjrpc.server.validators import pydantic as validators
from pjrpc.server.specs import extractors, openapi as specs
app = flask.Flask('myapp')
flask_cors.CORS(app, resources={"/myapp/api/v1/*": {"origins": "*"}})
methods = pjrpc.server.MethodRegistry()
validator = validators.PydanticValidator()
auth = flask_httpauth.HTTPBasicAuth()
credentials = {"admin": security.generate_password_hash("admin")}
@auth.verify_password
def verify_password(username: str, password: str) -> Optional[str]:
if username in credentials and security.check_password_hash(credentials.get(username), password):
return username
class AuthenticatedJsonRPC(integration.JsonRPC):
@auth.login_required
def _rpc_handle(self, dispatcher: pjrpc.server.Dispatcher) -> flask.Response:
return super()._rpc_handle(dispatcher)
class JSONEncoder(pjrpc.JSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, pydantic.BaseModel):
return o.dict()
if isinstance(o, uuid.UUID):
return str(o)
return super().default(o)
class UserIn(pydantic.BaseModel):
"""
User registration data.
"""
name: str
surname: str
age: int
class UserOut(UserIn):
"""
Registered user data.
"""
id: uuid.UUID
class AlreadyExistsError(pjrpc.exc.JsonRpcError):
"""
User already registered error.
"""
code = 2001
message = "user already exists"
class NotFoundError(pjrpc.exc.JsonRpcError):
"""
User not found error.
"""
code = 2002
message = "user not found"
@specs.annotate(
tags=['users'],
errors=[AlreadyExistsError],
examples=[
specs.MethodExample(
summary="Simple example",
params=dict(
user={
'name': 'Alex',
'surname': 'Smith',
'age': 25,
},
),
result={
'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
'name': 'Alex',
'surname': 'Smith',
'age': 25,
},
),
],
)
@methods.add
@validator.validate
def add_user(user: UserIn) -> UserOut:
"""
Creates a user.
:param object user: user data
:return object: registered user
:raise AlreadyExistsError: user already exists
"""
for existing_user in flask.current_app.users_db.values():
if user.name == existing_user.name:
raise AlreadyExistsError()
user_id = uuid.uuid4().hex
flask.current_app.users_db[user_id] = user
return UserOut(id=user_id, **user.dict())
@specs.annotate(
tags=['users'],
errors=[NotFoundError],
examples=[
specs.MethodExample(
summary='Simple example',
params=dict(
user_id='c47726c6-a232-45f1-944f-60b98966ff1b',
),
result={
'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
'name': 'Alex',
'surname': 'Smith',
'age': 25,
},
),
],
)
@methods.add
@validator.validate
def get_user(user_id: uuid.UUID) -> UserOut:
"""
Returns a user.
:param object user_id: user id
:return object: registered user
:raise NotFoundError: user not found
"""
user = flask.current_app.users_db.get(user_id.hex)
if not user:
raise NotFoundError()
return UserOut(id=user_id, **user.dict())
@specs.annotate(
tags=['users'],
errors=[NotFoundError],
examples=[
specs.MethodExample(
summary='Simple example',
params=dict(
user_id='c47726c6-a232-45f1-944f-60b98966ff1b',
),
result=None,
),
],
)
@methods.add
@validator.validate
def delete_user(user_id: uuid.UUID) -> None:
"""
Deletes a user.
:param object user_id: user id
:raise NotFoundError: user not found
"""
user = flask.current_app.users_db.pop(user_id.hex, None)
if not user:
raise NotFoundError()
json_rpc = AuthenticatedJsonRPC(
'/api/v1',
json_encoder=JSONEncoder,
spec=specs.OpenAPI(
info=specs.Info(version="1.0.0", title="User storage"),
servers=[
specs.Server(
url='http://127.0.0.1:8080',
),
],
security_schemes=dict(
basicAuth=specs.SecurityScheme(
type=specs.SecuritySchemeType.HTTP,
scheme='basic',
),
),
security=[
dict(basicAuth=[]),
],
schema_extractor=extractors.pydantic.PydanticSchemaExtractor(),
ui=specs.SwaggerUI(),
# ui=specs.RapiDoc(),
# ui=specs.ReDoc(),
),
)
json_rpc.dispatcher.add_methods(methods)
app.users_db = {}
myapp = flask.Blueprint('myapp', __name__, url_prefix='/myapp')
json_rpc.init_app(myapp)
app.register_blueprint(myapp)
if __name__ == "__main__":
app.run(port=8080)
aiohttp OpenAPI specification¶
import uuid
from typing import Any
import pydantic
from aiohttp import helpers, web
import pjrpc.server.specs.extractors.pydantic
from pjrpc.server.integration import aiohttp as integration
from pjrpc.server.validators import pydantic as validators
from pjrpc.server.specs import extractors, openapi as specs
methods = pjrpc.server.MethodRegistry()
validator = validators.PydanticValidator()
credentials = {"admin": "admin"}
class JSONEncoder(pjrpc.JSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, pydantic.BaseModel):
return o.dict()
if isinstance(o, uuid.UUID):
return str(o)
return super().default(o)
class AuthenticatedJsonRPC(integration.Application):
async def _rpc_handle(self, http_request: web.Request, dispatcher: pjrpc.server.Dispatcher) -> web.Response:
try:
auth = helpers.BasicAuth.decode(http_request.headers.get('Authorization', ''))
except ValueError:
raise web.HTTPUnauthorized
if credentials.get(auth.login) != auth.password:
raise web.HTTPUnauthorized
return await super()._rpc_handle(http_request=http_request, dispatcher=dispatcher)
class UserIn(pydantic.BaseModel):
"""
User registration data.
"""
name: str
surname: str
age: int
class UserOut(UserIn):
"""
Registered user data.
"""
id: uuid.UUID
class AlreadyExistsError(pjrpc.exc.JsonRpcError):
"""
User already registered error.
"""
code = 2001
message = "user already exists"
class NotFoundError(pjrpc.exc.JsonRpcError):
"""
User not found error.
"""
code = 2002
message = "user not found"
@specs.annotate(
tags=['users'],
errors=[AlreadyExistsError],
examples=[
specs.MethodExample(
summary="Simple example",
params=dict(
user={
'name': 'Alex',
'surname': 'Smith',
'age': 25,
},
),
result={
'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
'name': 'Alex',
'surname': 'Smith',
'age': 25,
},
),
],
)
@methods.add(context='request')
@validator.validate
def add_user(request: web.Request, user: UserIn) -> UserOut:
"""
Creates a user.
:param request: http request
:param object user: user data
:return object: registered user
:raise AlreadyExistsError: user already exists
"""
for existing_user in request.config_dict['users'].values():
if user.name == existing_user.name:
raise AlreadyExistsError()
user_id = uuid.uuid4().hex
request.config_dict['users'][user_id] = user
return UserOut(id=user_id, **user.dict())
@specs.annotate(
tags=['users'],
errors=[NotFoundError],
examples=[
specs.MethodExample(
summary='Simple example',
params=dict(
user_id='c47726c6-a232-45f1-944f-60b98966ff1b',
),
result={
'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
'name': 'Alex',
'surname': 'Smith',
'age': 25,
},
),
],
)
@methods.add(context='request')
@validator.validate
def get_user(request: web.Request, user_id: uuid.UUID) -> UserOut:
"""
Returns a user.
:param request: http request
:param object user_id: user id
:return object: registered user
:raise NotFoundError: user not found
"""
user = request.config_dict['users'].get(user_id.hex)
if not user:
raise NotFoundError()
return UserOut(id=user_id, **user.dict())
@specs.annotate(
tags=['users'],
errors=[NotFoundError],
examples=[
specs.MethodExample(
summary='Simple example',
params=dict(
user_id='c47726c6-a232-45f1-944f-60b98966ff1b',
),
result=None,
),
],
)
@methods.add(context='request')
@validator.validate
def delete_user(request: web.Request, user_id: uuid.UUID) -> None:
"""
Deletes a user.
:param request: http request
:param object user_id: user id
:raise NotFoundError: user not found
"""
user = request.config_dict['users'].pop(user_id.hex, None)
if not user:
raise NotFoundError()
app = web.Application()
app['users'] = {}
jsonrpc_app = AuthenticatedJsonRPC(
'/api/v1',
json_encoder=JSONEncoder,
spec=specs.OpenAPI(
info=specs.Info(version="1.0.0", title="User storage"),
servers=[
specs.Server(
url='http://127.0.0.1:8080',
),
],
security_schemes=dict(
basicAuth=specs.SecurityScheme(
type=specs.SecuritySchemeType.HTTP,
scheme='basic',
),
),
security=[
dict(basicAuth=[]),
],
schema_extractor=extractors.pydantic.PydanticSchemaExtractor(),
ui=specs.SwaggerUI(),
# ui=specs.RapiDoc(),
# ui=specs.ReDoc(),
),
)
jsonrpc_app.dispatcher.add_methods(methods)
app.add_subapp('/myapp', jsonrpc_app.app)
if __name__ == "__main__":
web.run_app(app, host='localhost', port=8080)
flask OpenRPC specification¶
import uuid
from typing import Any
import flask
import pydantic
from flask_cors import CORS
import pjrpc.server.specs.extractors.pydantic
import pjrpc.server.specs.extractors.docstring
from pjrpc.server.integration import flask as integration
from pjrpc.server.validators import pydantic as validators
from pjrpc.server.specs import extractors, openrpc as specs
app = flask.Flask(__name__)
CORS(app, resources={r"/api/v1/*": {"origins": "*"}})
methods = pjrpc.server.MethodRegistry()
validator = validators.PydanticValidator()
class JsonEncoder(pjrpc.JSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, pydantic.BaseModel):
return o.dict()
if isinstance(o, uuid.UUID):
return str(o)
return super().default(o)
class UserIn(pydantic.BaseModel):
"""
User registration data.
"""
name: str
surname: str
age: int
class UserOut(UserIn):
"""
Registered user data.
"""
id: uuid.UUID
class AlreadyExistsError(pjrpc.exc.JsonRpcError):
"""
User already registered error.
"""
code = 2001
message = "user already exists"
class NotFoundError(pjrpc.exc.JsonRpcError):
"""
User not found error.
"""
code = 2002
message = "user not found"
@specs.annotate(
errors=[AlreadyExistsError],
tags=['users'],
examples=[
specs.MethodExample(
name='Simple user',
params=[
specs.ExampleObject(
name='user',
value={
'name': 'Alex',
'surname': 'Smith',
'age': 25,
},
),
],
result=specs.ExampleObject(
name='result',
value={
'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
'name': 'Alex',
'surname': 'Smith',
'age': 25,
},
),
),
],
)
@methods.add
@validator.validate
def add_user(user: UserIn) -> UserOut:
"""
Adds a new user.
:param object user: user data
:return object: registered user
:raise AlreadyExistsError: user already exists
"""
for existing_user in flask.current_app.users_db.values():
if user.name == existing_user.name:
raise AlreadyExistsError()
user_id = uuid.uuid4().hex
flask.current_app.users_db[user_id] = user
return UserOut(id=user_id, **user.dict())
@specs.annotate(
tags=['users'],
errors=[NotFoundError],
examples=[
specs.MethodExample(
name='Simple example',
params=[
specs.ExampleObject(
name='user',
value={
'user_id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
},
),
],
result=specs.ExampleObject(
name="result",
value={
'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
'name': 'Alex',
'surname': 'Smith',
'age': 25,
},
),
),
],
)
@methods.add
@validator.validate
def get_user(user_id: uuid.UUID) -> UserOut:
"""
Returns a user.
:param object user_id: user id
:return object: registered user
:raise NotFoundError: user not found
"""
user = flask.current_app.users_db.get(user_id.hex)
if not user:
raise NotFoundError()
return UserOut(id=user_id, **user.dict())
@specs.annotate(
tags=['users'],
errors=[NotFoundError],
examples=[
specs.MethodExample(
name='Simple example',
summary='Simple example',
params=[
specs.ExampleObject(
name='user',
value={
'user_id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
},
),
],
result=specs.ExampleObject(
name="result",
value=None,
),
),
],
)
@methods.add
@validator.validate
def delete_user(user_id: uuid.UUID) -> None:
"""
Deletes a user.
:param object user_id: user id
:raise NotFoundError: user not found
"""
user = flask.current_app.users_db.pop(user_id.hex, None)
if not user:
raise NotFoundError()
json_rpc = integration.JsonRPC(
'/api/v1',
json_encoder=JsonEncoder,
spec=specs.OpenRPC(
info=specs.Info(version="1.0.0", title="User storage"),
servers=[
specs.Server(
name='test',
url='http://127.0.0.1:8080/api/v1/',
summary='test server',
),
],
schema_extractor=extractors.pydantic.PydanticSchemaExtractor(),
),
)
json_rpc.dispatcher.add_methods(methods)
app.users_db = {}
json_rpc.init_app(app)
if __name__ == "__main__":
app.run(port=8080)