Examples#
aio_pika client#
import asyncio
import pjrpc
from pjrpc.client.backend import aio_pika as pjrpc_client
async def main():
client = pjrpc_client.Client('amqp://guest:guest@localhost:5672/v1', 'jsonrpc')
await client.connect()
response: pjrpc.Response = await client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
result = await client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
result = await client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
await client.notify('tick')
if __name__ == "__main__":
asyncio.run(main())
aio_pika server#
import asyncio
import logging
import uuid
from dataclasses import dataclass
import aio_pika
from yarl import URL
import pjrpc
from pjrpc.server.integration import aio_pika as integration
@dataclass
class UserInfo:
"""User information dataclass for the add_user example RPC call"""
username: str
name: str
age: int
@dataclass
class AddedUser(UserInfo):
"""User information dataclass (with uuid) for the add_user example RPC call"""
uuid: uuid.UUID
methods = pjrpc.server.MethodRegistry()
@methods.add
def sum(a: int, b: int) -> int:
"""RPC method implementing examples/aio_pika_client.py's calls to sum(1, 2) -> 3"""
return a + b
@methods.add
def tick() -> None:
"""RPC method implementing examples/aio_pika_client.py's notification 'tick'"""
print("examples/aio_pika_server.py: received tick")
@methods.add(context='message')
def add_user(message: aio_pika.IncomingMessage, user_info: UserInfo) -> AddedUser:
"""Simluate the creation of a user: Receive user info and return it with an uuid4.
:param UserInfo user_info: user data
:returns: user_info with a randomly generated uuid4 added
:rtype: AddedUser"""
return AddedUser(**user_info.__dict__, uuid=uuid.uuid4())
executor = integration.Executor(
broker_url=URL('amqp://guest:guest@localhost:5672/v1'), queue_name='jsonrpc',
)
executor.dispatcher.add_methods(methods)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
logging.info("Example result from a local call to add_user():")
logging.info(add_user(None, UserInfo("username", "firstname lastname", 18)))
loop = asyncio.new_event_loop()
loop.run_until_complete(executor.start())
try:
loop.run_forever()
finally:
loop.run_until_complete(executor.shutdown())
aiohttp class-based handler#
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
methods = pjrpc.server.MethodRegistry()
@methods.view(context='request', prefix='user')
class UserView(pjrpc.server.ViewMixin):
def __init__(self, request: web.Request):
super().__init__()
self._users = request.app['users']
async def add(self, user: dict):
user_id = uuid.uuid4().hex
self._users[user_id] = user
return {'id': user_id, **user}
async def get(self, user_id: str):
user = self._users.get(user_id)
if not user:
pjrpc.exc.JsonRpcError(code=1, message='not found')
return user
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.dispatcher.add_methods(methods)
jsonrpc_app.app['users'] = {}
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
aiohttp client#
import asyncio
import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client
async def main():
async with pjrpc_client.Client('http://localhost/api/v1') as client:
response = await client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
result = await client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
result = await client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
await client.notify('tick')
asyncio.run(main())
aiohttp client batch request#
import asyncio
import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client
async def main():
async with pjrpc_client.Client('http://localhost:8080/api/v1') as client:
batch_response = await client.batch.send(
pjrpc.BatchRequest(
pjrpc.Request('sum', [2, 2], id=1),
pjrpc.Request('sub', [2, 2], id=2),
pjrpc.Request('div', [2, 2], id=3),
pjrpc.Request('mult', [2, 2], id=4),
),
)
print(f"2 + 2 = {batch_response[0].result}")
print(f"2 - 2 = {batch_response[1].result}")
print(f"2 / 2 = {batch_response[2].result}")
print(f"2 * 2 = {batch_response[3].result}")
result = await client.batch('sum', 2, 2)('sub', 2, 2)('div', 2, 2)('mult', 2, 2).call()
print(f"2 + 2 = {result[0]}")
print(f"2 - 2 = {result[1]}")
print(f"2 / 2 = {result[2]}")
print(f"2 * 2 = {result[3]}")
result = await client.batch[
('sum', 2, 2),
('sub', 2, 2),
('div', 2, 2),
('mult', 2, 2),
]
print(f"2 + 2 = {result[0]}")
print(f"2 - 2 = {result[1]}")
print(f"2 / 2 = {result[2]}")
print(f"2 * 2 = {result[3]}")
result = await client.batch.proxy.sum(2, 2).sub(2, 2).div(2, 2).mult(2, 2).call()
print(f"2 + 2 = {result[0]}")
print(f"2 - 2 = {result[1]}")
print(f"2 / 2 = {result[2]}")
print(f"2 * 2 = {result[3]}")
await client.batch.notify('tick').notify('tack').call()
asyncio.run(main())
aiohttp pytest integration#
from unittest import mock
import pytest
import pjrpc
from pjrpc.client.backend import aiohttp as aiohttp_client
from pjrpc.client.integrations.pytest import PjRpcAiohttpMocker
async def test_using_fixture(pjrpc_aiohttp_mocker):
client = aiohttp_client.Client('http://localhost/api/v1')
pjrpc_aiohttp_mocker.add('http://localhost/api/v1', 'sum', result=2)
result = await client.proxy.sum(1, 1)
assert result == 2
pjrpc_aiohttp_mocker.replace(
'http://localhost/api/v1', 'sum', error=pjrpc.exc.JsonRpcError(code=1, message='error', data='oops'),
)
with pytest.raises(pjrpc.exc.JsonRpcError) as exc_info:
await client.proxy.sum(a=1, b=1)
assert exc_info.type is pjrpc.exc.JsonRpcError
assert exc_info.value.code == 1
assert exc_info.value.message == 'error'
assert exc_info.value.data == 'oops'
localhost_calls = pjrpc_aiohttp_mocker.calls['http://localhost/api/v1']
assert localhost_calls[('2.0', 'sum')].call_count == 2
assert localhost_calls[('2.0', 'sum')].mock_calls == [mock.call(1, 1), mock.call(a=1, b=1)]
async def test_using_resource_manager():
client = aiohttp_client.Client('http://localhost/api/v1')
with PjRpcAiohttpMocker() as mocker:
mocker.add('http://localhost/api/v1', 'div', result=2)
result = await client.proxy.div(4, 2)
assert result == 2
localhost_calls = mocker.calls['http://localhost/api/v1']
assert localhost_calls[('2.0', 'div')].mock_calls == [mock.call(4, 2)]
aiohttp server#
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
methods = pjrpc.server.MethodRegistry()
@methods.add(context='request')
async def add_user(request: web.Request, user: dict):
user_id = uuid.uuid4().hex
request.app['users'][user_id] = user
return {'id': user_id, **user}
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.dispatcher.add_methods(methods)
jsonrpc_app.app['users'] = {}
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
aiohttp versioning#
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
methods_v1 = pjrpc.server.MethodRegistry()
@methods_v1.add(context='request')
async def add_user_v1(request: web.Request, user: dict):
user_id = uuid.uuid4().hex
request.config_dict['users'][user_id] = user
return {'id': user_id, **user}
methods_v2 = pjrpc.server.MethodRegistry()
@methods_v2.add(context='request')
async def add_user_v2(request: web.Request, user: dict):
user_id = uuid.uuid4().hex
request.config_dict['users'][user_id] = user
return {'id': user_id, **user}
app = web.Application()
app['users'] = {}
app_v1 = aiohttp.Application()
app_v1.dispatcher.add_methods(methods_v1)
app.add_subapp('/api/v1', app_v1.app)
app_v2 = aiohttp.Application()
app_v2.dispatcher.add_methods(methods_v2)
app.add_subapp('/api/v2', app_v2.app)
if __name__ == "__main__":
web.run_app(app, host='localhost', port=8080)
client prometheus metrics#
import time
import prometheus_client as prom_cli
from pjrpc.client import tracer
from pjrpc.client.backend import requests as pjrpc_client
method_latency_hist = prom_cli.Histogram('method_latency', 'Method latency', labelnames=['method'])
method_call_total = prom_cli.Counter('method_call_total', 'Method call count', labelnames=['method'])
method_errors_total = prom_cli.Counter('method_errors_total', 'Method errors count', labelnames=['method', 'code'])
class PrometheusTracer(tracer.Tracer):
def on_request_begin(self, trace_context, request):
trace_context.started_at = time.time()
method_call_total.labels(request.method).inc()
def on_request_end(self, trace_context, request, response):
method_latency_hist.labels(request.method).observe(time.time() - trace_context.started_at)
if response.is_error:
method_call_total.labels(request.method, response.error.code).inc()
def on_error(self, trace_context, request, error):
method_latency_hist.labels(request.method).observe(time.time() - trace_context.started_at)
client = pjrpc_client.Client(
'http://localhost/api/v1', tracers=(
PrometheusTracer(),
),
)
result = client.proxy.sum(1, 2)
client tracing#
import opentracing
from opentracing import tags
from pjrpc.client import tracer
from pjrpc.client.backend import requests as pjrpc_client
class ClientTracer(tracer.Tracer):
def __init__(self):
super().__init__()
self._tracer = opentracing.global_tracer()
def on_request_begin(self, trace_context, request):
span = self._tracer.start_active_span(f'jsonrpc.{request.method}').span
span.set_tag(tags.COMPONENT, 'pjrpc.client')
span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)
def on_request_end(self, trace_context, request, response):
span = self._tracer.active_span
span.set_tag(tags.ERROR, response.is_error)
if response.is_error:
span.set_tag('jsonrpc.error_code', response.error.code)
span.set_tag('jsonrpc.error_message', response.error.message)
span.finish()
def on_error(self, trace_context, request, error):
span = self._tracer.active_span
span.set_tag(tags.ERROR, True)
span.finish()
client = pjrpc_client.Client(
'http://localhost/api/v1', tracers=(
ClientTracer(),
),
)
result = client.proxy.sum(1, 2)
flask class-based handler#
import uuid
import flask
import pjrpc
from pjrpc.server.integration import flask as integration
app = flask.Flask(__name__)
methods = pjrpc.server.MethodRegistry()
@methods.view(prefix='user')
class UserView(pjrpc.server.ViewMixin):
def __init__(self):
super().__init__()
self._users = flask.current_app.users
def add(self, user: dict):
user_id = uuid.uuid4().hex
self._users[user_id] = user
return {'id': user_id, **user}
def get(self, user_id: str):
user = self._users.get(user_id)
if not user:
pjrpc.exc.JsonRpcError(code=1, message='not found')
return user
json_rpc = integration.JsonRPC('/api/v1')
json_rpc.dispatcher.add_methods(methods)
app.users = {}
json_rpc.init_app(app)
if __name__ == "__main__":
app.run(port=8080)
flask server#
import uuid
import flask
import pjrpc
from pjrpc.server.integration import flask as integration
app = flask.Flask(__name__)
methods = pjrpc.server.MethodRegistry()
@methods.add
def add_user(user: dict):
user_id = uuid.uuid4().hex
flask.current_app.users[user_id] = user
return {'id': user_id, **user}
json_rpc = integration.JsonRPC('/api/v1')
json_rpc.dispatcher.add_methods(methods)
app.users = {}
json_rpc.init_app(app)
if __name__ == "__main__":
app.run(port=8080)
flask versioning#
import uuid
import flask
import pjrpc.server
from pjrpc.server.integration import flask as integration
methods_v1 = pjrpc.server.MethodRegistry()
@methods_v1.add
def add_user_v1(user: dict):
user_id = uuid.uuid4().hex
flask.current_app.users[user_id] = user
return {'id': user_id, **user}
methods_v2 = pjrpc.server.MethodRegistry()
@methods_v2.add
def add_user_v2(user: dict):
user_id = uuid.uuid4().hex
flask.current_app.users[user_id] = user
return {'id': user_id, **user}
app_v1 = flask.blueprints.Blueprint('v1', __name__)
json_rpc = integration.JsonRPC('/api/v1')
json_rpc.dispatcher.add_methods(methods_v1)
json_rpc.init_app(app_v1)
app_v2 = flask.blueprints.Blueprint('v2', __name__)
json_rpc = integration.JsonRPC('/api/v2')
json_rpc.dispatcher.add_methods(methods_v2)
json_rpc.init_app(app_v2)
app = flask.Flask(__name__)
app.register_blueprint(app_v1)
app.register_blueprint(app_v2)
app.users = {}
if __name__ == "__main__":
app.run(port=8080)
httpserver#
import http.server
import socketserver
import uuid
import pjrpc
import pjrpc.server
class JsonRpcHandler(http.server.BaseHTTPRequestHandler):
"""
JSON-RPC handler.
"""
def do_POST(self):
"""
Handles JSON-RPC request.
"""
content_type = self.headers.get('Content-Type')
if content_type not in pjrpc.common.REQUEST_CONTENT_TYPES:
self.send_error(http.HTTPStatus.UNSUPPORTED_MEDIA_TYPE)
return
try:
content_length = int(self.headers.get('Content-Length', -1))
request_text = self.rfile.read(content_length).decode()
except UnicodeDecodeError:
self.send_error(http.HTTPStatus.BAD_REQUEST)
return
response_text = self.server.dispatcher.dispatch(request_text, context=self)
if response_text is None:
self.send_response_only(http.HTTPStatus.OK)
self.end_headers()
else:
self.send_response(http.HTTPStatus.OK)
self.send_header("Content-type", pjrpc.common.DEFAULT_CONTENT_TYPE)
self.end_headers()
self.wfile.write(response_text.encode())
class JsonRpcServer(http.server.HTTPServer):
"""
:py:class:`http.server.HTTPServer` based JSON-RPC server.
:param path: JSON-RPC handler base path
:param kwargs: arguments to be passed to the dispatcher :py:class:`pjrpc.server.Dispatcher`
"""
def __init__(self, server_address, RequestHandlerClass=JsonRpcHandler, bind_and_activate=True, **kwargs):
super().__init__(server_address, RequestHandlerClass, bind_and_activate)
self._dispatcher = pjrpc.server.Dispatcher(**kwargs)
@property
def dispatcher(self):
"""
JSON-RPC method dispatcher.
"""
return self._dispatcher
methods = pjrpc.server.MethodRegistry()
@methods.add(context='request')
def add_user(request: http.server.BaseHTTPRequestHandler, user: dict):
user_id = uuid.uuid4().hex
request.server.users[user_id] = user
return {'id': user_id, **user}
class ThreadingJsonRpcServer(socketserver.ThreadingMixIn, JsonRpcServer):
users = {}
with ThreadingJsonRpcServer(("localhost", 8080)) as server:
server.dispatcher.add_methods(methods)
server.serve_forever()
jsonschema validator#
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
from pjrpc.server.validators import jsonschema as validators
methods = pjrpc.server.MethodRegistry()
validator = validators.JsonSchemaValidator()
contact_schema = {
'type': 'object',
'properties': {
'type': {
'type': 'string',
'enum': ['phone', 'email'],
},
'value': {'type': 'string'},
},
'required': ['type', 'value'],
}
user_schema = {
'type': 'object',
'properties': {
'name': {'type': 'string'},
'surname': {'type': 'string'},
'age': {'type': 'integer'},
'contacts': {
'type': 'array',
'items': contact_schema,
},
},
'required': ['name', 'surname', 'age', 'contacts'],
}
params_schema = {
'type': 'object',
'properties': {
'user': user_schema,
},
'required': ['user'],
}
@methods.add(context='request')
@validator.validate(schema=params_schema)
async def add_user(request: web.Request, user):
user_id = uuid.uuid4().hex
request.app['users'][user_id] = user
return {'id': user_id, **user}
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.dispatcher.add_methods(methods)
jsonrpc_app.app['users'] = {}
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
kombu client#
import pjrpc
from pjrpc.client.backend import kombu as pjrpc_client
client = pjrpc_client.Client(
'amqp://guest:guest@localhost:5672/v1',
'jsonrpc',
# Compatible with queue of examples/aio_pika_*
# and works better with examples/kombu_server.py:
result_queue_args={"durable": False},
)
response: pjrpc.Response = client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
result = client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
result = client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
client.notify('tick')
kombu server#
import uuid
import kombu
import pjrpc
from pjrpc.server.integration import kombu as integration
methods = pjrpc.server.MethodRegistry()
@methods.add(context='message')
def add_user(message: kombu.Message, user: dict):
user_id = uuid.uuid4().hex
return {'id': user_id, **user}
@methods.add # type: ignore
def sum(a: int, b: int) -> int:
"""RPC method sum(a, b) for kombu_client.py and aio_pika_client.py"""
return a + b
@methods.add # type: ignore
def tick() -> None:
"""RPC notification "tick" for kombu_client.py and aio_pika_client.py"""
print("received tick")
# Note: The server may not work well with examples/kombu_client.py yet.
# Use with examples/aio_pika_client.py in case server or client gets stuck.
if __name__ == "__main__":
executor = integration.Executor(
"amqp://guest:guest@localhost:5672/v1",
queue_name="jsonrpc",
# Compatible with queue of examples/aio_pika_*
# and works better with examples/kombu_client.py:
queue_args={"durable": False},
)
executor.dispatcher.add_methods(methods)
executor.run()
middlewares#
from typing import Any
from aiohttp import web
import pjrpc.server
from pjrpc.common import Request
from pjrpc.server.integration import aiohttp
from pjrpc.server.typedefs import AsyncHandlerType, ContextType, MiddlewareResponse
methods = pjrpc.server.MethodRegistry()
@methods.add(context='request')
async def method(request: Any) -> None:
print("method")
async def middleware1(
request: Request, context: ContextType, handler: AsyncHandlerType,
) -> MiddlewareResponse:
print("middleware1 started")
result = await handler(request, context)
print("middleware1 finished")
return result
async def middleware2(
request: Request, context: ContextType, handler: AsyncHandlerType,
) -> MiddlewareResponse:
print("middleware2 started")
result = await handler(request, context)
print("middleware2 finished")
return result
jsonrpc_app = aiohttp.Application(
'/api/v1', middlewares=(
middleware1,
middleware2,
),
)
jsonrpc_app.dispatcher.add_methods(methods)
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
multiple clients#
import pjrpc
from pjrpc.client.backend import requests as jrpc_client
class ErrorV1(pjrpc.exc.JsonRpcError):
@classmethod
def get_error_cls(cls, code, default):
return next(iter((c for c in cls.__subclasses__() if getattr(c, 'code', None) == code)), default)
class PermissionDenied(ErrorV1):
code = 1
message = 'permission denied'
class ErrorV2(pjrpc.exc.JsonRpcError):
@classmethod
def get_error_cls(cls, code, default):
return next(iter((c for c in cls.__subclasses__() if getattr(c, 'code', None) == code)), default)
class ResourceNotFound(ErrorV2):
code = 1
message = 'resource not found'
client_v1 = jrpc_client.Client('http://localhost:8080/api/v1', error_cls=ErrorV1)
client_v2 = jrpc_client.Client('http://localhost:8080/api/v2', error_cls=ErrorV2)
try:
response: pjrpc.Response = client_v1.proxy.add_user(user={})
except PermissionDenied as e:
print(e)
try:
response: pjrpc.Response = client_v2.proxy.add_user(user={})
except ResourceNotFound as e:
print(e)
pydantic validator#
import enum
import uuid
from typing import List
import pydantic
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
from pjrpc.server.validators import pydantic as validators
methods = pjrpc.server.MethodRegistry()
validator = validators.PydanticValidator()
class ContactType(enum.Enum):
PHONE = 'phone'
EMAIL = 'email'
class Contact(pydantic.BaseModel):
type: ContactType
value: str
class User(pydantic.BaseModel):
name: str
surname: str
age: int
contacts: List[Contact]
@methods.add(context='request')
@validator.validate
async def add_user(request: web.Request, user: User):
user_id = uuid.uuid4()
request.app['users'][user_id] = user
return {'id': user_id, **user.dict()}
class JSONEncoder(pjrpc.server.JSONEncoder):
def default(self, o):
if isinstance(o, uuid.UUID):
return o.hex
if isinstance(o, enum.Enum):
return o.value
return super().default(o)
jsonrpc_app = aiohttp.Application('/api/v1', json_encoder=JSONEncoder)
jsonrpc_app.dispatcher.add_methods(methods)
jsonrpc_app.app['users'] = {}
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
requests client#
import pjrpc
from pjrpc.client.backend import requests as pjrpc_client
client = pjrpc_client.Client('http://localhost/api/v1')
response: pjrpc.Response = client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
result = client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
result = client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
client.notify('tick')
requests pytest#
from unittest import mock
import pytest
import pjrpc
from pjrpc.client.backend import requests as requests_client
from pjrpc.client.integrations.pytest import PjRpcRequestsMocker
def test_using_fixture(pjrpc_requests_mocker):
client = requests_client.Client('http://localhost/api/v1')
pjrpc_requests_mocker.add('http://localhost/api/v1', 'sum', result=2)
result = client.proxy.sum(1, 1)
assert result == 2
pjrpc_requests_mocker.replace(
'http://localhost/api/v1', 'sum', error=pjrpc.exc.JsonRpcError(code=1, message='error', data='oops'),
)
with pytest.raises(pjrpc.exc.JsonRpcError) as exc_info:
client.proxy.sum(a=1, b=1)
assert exc_info.type is pjrpc.exc.JsonRpcError
assert exc_info.value.code == 1
assert exc_info.value.message == 'error'
assert exc_info.value.data == 'oops'
localhost_calls = pjrpc_requests_mocker.calls['http://localhost/api/v1']
assert localhost_calls[('2.0', 'sum')].call_count == 2
assert localhost_calls[('2.0', 'sum')].mock_calls == [mock.call(1, 1), mock.call(a=1, b=1)]
client = requests_client.Client('http://localhost/api/v2')
with pytest.raises(ConnectionRefusedError):
client.proxy.sum(1, 1)
def test_using_resource_manager():
client = requests_client.Client('http://localhost/api/v1')
with PjRpcRequestsMocker() as mocker:
mocker.add('http://localhost/api/v1', 'mult', result=4)
mocker.add('http://localhost/api/v1', 'div', callback=lambda a, b: a/b)
result = client.batch.proxy.div(4, 2).mult(2, 2).call()
assert result == (2, 4)
localhost_calls = mocker.calls['http://localhost/api/v1']
assert localhost_calls[('2.0', 'div')].mock_calls == [mock.call(4, 2)]
assert localhost_calls[('2.0', 'mult')].mock_calls == [mock.call(2, 2)]
with pytest.raises(pjrpc.exc.MethodNotFoundError):
client.proxy.sub(4, 2)
sentry#
import sentry_sdk
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
methods = pjrpc.server.MethodRegistry()
@methods.add(context='request')
async def method(request):
print("method")
async def sentry_middleware(request, context, handler):
try:
return await handler(request, context)
except pjrpc.exceptions.JsonRpcError as e:
sentry_sdk.capture_exception(e)
raise
jsonrpc_app = aiohttp.Application(
'/api/v1', middlewares=(
sentry_middleware,
),
)
jsonrpc_app.dispatcher.add_methods(methods)
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
server prometheus metrics#
import asyncio
from typing import Any, Callable
import prometheus_client as pc
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
method_error_count = pc.Counter('method_error_count', 'Method error count', labelnames=['method', 'code'])
method_latency_hist = pc.Histogram('method_latency', 'Method latency', labelnames=['method'])
method_active_count = pc.Gauge('method_active_count', 'Method active count', labelnames=['method'])
async def metrics(request):
return web.Response(body=pc.generate_latest())
http_app = web.Application()
http_app.add_routes([web.get('/metrics', metrics)])
methods = pjrpc.server.MethodRegistry()
@methods.add(context='context')
async def method(context: web.Request):
print("method started")
await asyncio.sleep(1)
print("method finished")
async def latency_metric_middleware(request: pjrpc.Request, context: web.Request, handler: Callable) -> Any:
with method_latency_hist.labels(method=request.method).time():
return await handler(request, context)
async def active_count_metric_middleware(request: pjrpc.Request, context: web.Request, handler: Callable) -> Any:
with method_active_count.labels(method=request.method).track_inprogress():
return await handler(request, context)
async def any_error_handler(
request: pjrpc.Request, context: web.Request, error: pjrpc.exceptions.JsonRpcError,
) -> pjrpc.exceptions.JsonRpcError:
method_error_count.labels(method=request.method, code=error.code).inc()
return error
async def validation_error_handler(
request: pjrpc.Request, context: web.Request, error: pjrpc.exceptions.JsonRpcError,
) -> pjrpc.exceptions.JsonRpcError:
print("validation error occurred")
return error
jsonrpc_app = aiohttp.Application(
'/api/v1',
app=http_app,
middlewares=(
latency_metric_middleware,
active_count_metric_middleware,
),
error_handlers={
-32602: [validation_error_handler],
None: [any_error_handler],
},
)
jsonrpc_app.dispatcher.add_methods(methods)
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
server tracing#
import asyncio
import opentracing
from aiohttp import web
from opentracing import tags
import pjrpc.server
from pjrpc.server.integration import aiohttp
@web.middleware
async def http_tracing_middleware(request, handler):
"""
aiohttp server tracer.
"""
tracer = opentracing.global_tracer()
try:
span_ctx = tracer.extract(format=opentracing.Format.HTTP_HEADERS, carrier=request.headers)
except (opentracing.InvalidCarrierException, opentracing.SpanContextCorruptedException):
span_ctx = None
span = tracer.start_span(f'http.{request.method}', child_of=span_ctx)
span.set_tag(tags.COMPONENT, 'aiohttp.server')
span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
span.set_tag(tags.PEER_ADDRESS, request.remote)
span.set_tag(tags.HTTP_URL, str(request.url))
span.set_tag(tags.HTTP_METHOD, request.method)
with tracer.scope_manager.activate(span, finish_on_close=True):
response: web.Response = await handler(request)
span.set_tag(tags.HTTP_STATUS_CODE, response.status)
span.set_tag(tags.ERROR, response.status >= 400)
return response
http_app = web.Application(
middlewares=(
http_tracing_middleware,
),
)
methods = pjrpc.server.MethodRegistry()
@methods.add(context='context')
async def method(context):
print("method started")
await asyncio.sleep(1)
print("method finished")
async def jsonrpc_tracing_middleware(request, context, handler):
tracer = opentracing.global_tracer()
span = tracer.start_span(f'jsonrpc.{request.method}')
span.set_tag(tags.COMPONENT, 'pjrpc')
span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
span.set_tag('jsonrpc.version', request.version)
span.set_tag('jsonrpc.id', request.id)
span.set_tag('jsonrpc.method', request.method)
with tracer.scope_manager.activate(span, finish_on_close=True):
response = await handler(request, context)
if response.is_error:
span.set_tag('jsonrpc.error_code', response.error.code)
span.set_tag('jsonrpc.error_message', response.error.message)
span.set_tag(tags.ERROR, True)
else:
span.set_tag(tags.ERROR, False)
return response
jsonrpc_app = aiohttp.Application(
'/api/v1', app=http_app, middlewares=(
jsonrpc_tracing_middleware,
),
)
jsonrpc_app.dispatcher.add_methods(methods)
if __name__ == "__main__":
web.run_app(jsonrpc_app.app, host='localhost', port=8080)
werkzeug server#
import uuid
import werkzeug
import pjrpc.server
from pjrpc.server.integration import werkzeug as integration
methods = pjrpc.server.MethodRegistry()
@methods.add(context='request')
def add_user(request: werkzeug.Request, user: dict):
user_id = uuid.uuid4().hex
request.environ['app'].users[user_id] = user
return {'id': user_id, **user}
app = integration.JsonRPC('/api/v1')
app.dispatcher.add_methods(methods)
app.users = {}
if __name__ == '__main__':
werkzeug.serving.run_simple('127.0.0.1', 8080, app)
flask OpenAPI specification#
import uuid
from typing import Any, Optional
import flask
import flask_cors
import flask_httpauth
import pydantic
from werkzeug import security
import pjrpc.server.specs.extractors.docstring
import pjrpc.server.specs.extractors.pydantic
from pjrpc.server.integration import flask as integration
from pjrpc.server.specs import extractors
from pjrpc.server.specs import openapi as specs
from pjrpc.server.validators import pydantic as validators
app = flask.Flask('myapp')
flask_cors.CORS(app, resources={"/myapp/api/v1/*": {"origins": "*"}})
methods = pjrpc.server.MethodRegistry()
validator = validators.PydanticValidator()
auth = flask_httpauth.HTTPBasicAuth()
credentials = {"admin": security.generate_password_hash("admin")}
@auth.verify_password
def verify_password(username: str, password: str) -> Optional[str]:
if username in credentials and security.check_password_hash(credentials.get(username), password):
return username
class AuthenticatedJsonRPC(integration.JsonRPC):
@auth.login_required
def _rpc_handle(self, dispatcher: pjrpc.server.Dispatcher) -> flask.Response:
return super()._rpc_handle(dispatcher)
class JSONEncoder(pjrpc.JSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, pydantic.BaseModel):
return o.dict()
if isinstance(o, uuid.UUID):
return str(o)
return super().default(o)
class UserIn(pydantic.BaseModel):
"""
User registration data.
"""
name: str
surname: str
age: int
class UserOut(UserIn):
"""
Registered user data.
"""
id: uuid.UUID
class AlreadyExistsError(pjrpc.exc.JsonRpcError):
"""
User already registered error.
"""
code = 2001
message = "user already exists"
class NotFoundError(pjrpc.exc.JsonRpcError):
"""
User not found error.
"""
code = 2002
message = "user not found"
@specs.annotate(
tags=['users'],
errors=[AlreadyExistsError],
examples=[
specs.MethodExample(
summary="Simple example",
params=dict(
user={
'name': 'John',
'surname': 'Doe',
'age': 25,
},
),
result={
'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
'name': 'John',
'surname': 'Doe',
'age': 25,
},
),
],
)
@methods.add
@validator.validate
def add_user(user: UserIn) -> UserOut:
"""
Creates a user.
:param object user: user data
:return object: registered user
:raise AlreadyExistsError: user already exists
"""
for existing_user in flask.current_app.users_db.values():
if user.name == existing_user.name:
raise AlreadyExistsError()
user_id = uuid.uuid4().hex
flask.current_app.users_db[user_id] = user
return UserOut(id=user_id, **user.dict())
@specs.annotate(
tags=['users'],
errors=[NotFoundError],
examples=[
specs.MethodExample(
summary='Simple example',
params=dict(
user_id='c47726c6-a232-45f1-944f-60b98966ff1b',
),
result={
'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
'name': 'John',
'surname': 'Doe',
'age': 25,
},
),
],
)
@methods.add
@validator.validate
def get_user(user_id: uuid.UUID) -> UserOut:
"""
Returns a user.
:param object user_id: user id
:return object: registered user
:raise NotFoundError: user not found
"""
user = flask.current_app.users_db.get(user_id.hex)
if not user:
raise NotFoundError()
return UserOut(id=user_id, **user.dict())
@specs.annotate(
tags=['users'],
errors=[NotFoundError],
examples=[
specs.MethodExample(
summary='Simple example',
params=dict(
user_id='c47726c6-a232-45f1-944f-60b98966ff1b',
),
result=None,
),
],
)
@methods.add
@validator.validate
def delete_user(user_id: uuid.UUID) -> None:
"""
Deletes a user.
:param object user_id: user id
:raise NotFoundError: user not found
"""
user = flask.current_app.users_db.pop(user_id.hex, None)
if not user:
raise NotFoundError()
json_rpc = AuthenticatedJsonRPC(
'/api/v1',
json_encoder=JSONEncoder,
spec=specs.OpenAPI(
info=specs.Info(version="1.0.0", title="User storage"),
servers=[
specs.Server(
url='http://127.0.0.1:8080',
),
],
security_schemes=dict(
basicAuth=specs.SecurityScheme(
type=specs.SecuritySchemeType.HTTP,
scheme='basic',
),
),
security=[
dict(basicAuth=[]),
],
schema_extractors=[
extractors.docstring.DocstringSchemaExtractor(),
extractors.pydantic.PydanticSchemaExtractor(),
],
ui=specs.SwaggerUI(),
# ui=specs.RapiDoc(),
# ui=specs.ReDoc(),
),
)
json_rpc.dispatcher.add_methods(methods)
app.users_db = {}
myapp = flask.Blueprint('myapp', __name__, url_prefix='/myapp')
json_rpc.init_app(myapp)
app.register_blueprint(myapp)
if __name__ == "__main__":
app.run(port=8080)
aiohttp OpenAPI specification#
import uuid
from typing import Any
import aiohttp_cors
import pydantic
from aiohttp import helpers, web
import pjrpc.server.specs.extractors.docstring
import pjrpc.server.specs.extractors.pydantic
from pjrpc.server.integration import aiohttp as integration
from pjrpc.server.specs import extractors
from pjrpc.server.specs import openapi as specs
from pjrpc.server.validators import pydantic as validators
methods = pjrpc.server.MethodRegistry()
validator = validators.PydanticValidator()
credentials = {"admin": "admin"}
class JSONEncoder(pjrpc.JSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, pydantic.BaseModel):
return o.dict()
if isinstance(o, uuid.UUID):
return str(o)
return super().default(o)
class AuthenticatedJsonRPC(integration.Application):
async def _rpc_handle(self, http_request: web.Request, dispatcher: pjrpc.server.Dispatcher) -> web.Response:
try:
auth = helpers.BasicAuth.decode(http_request.headers.get('Authorization', ''))
except ValueError:
raise web.HTTPUnauthorized
if credentials.get(auth.login) != auth.password:
raise web.HTTPUnauthorized
return await super()._rpc_handle(http_request=http_request, dispatcher=dispatcher)
class UserIn(pydantic.BaseModel):
"""
User registration data.
"""
name: str
surname: str
age: int
class UserOut(UserIn):
"""
Registered user data.
"""
id: uuid.UUID
class AlreadyExistsError(pjrpc.exc.JsonRpcError):
"""
User already registered error.
"""
code = 2001
message = "user already exists"
class NotFoundError(pjrpc.exc.JsonRpcError):
"""
User not found error.
"""
code = 2002
message = "user not found"
@specs.annotate(
tags=['users'],
errors=[AlreadyExistsError],
examples=[
specs.MethodExample(
summary="Simple example",
params=dict(
user={
'name': 'John',
'surname': 'Doe',
'age': 25,
},
),
result={
'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
'name': 'John',
'surname': 'Doe',
'age': 25,
},
),
],
)
@methods.add(context='request')
@validator.validate
def add_user(request: web.Request, user: UserIn) -> UserOut:
"""
Creates a user.
:param request: http request
:param object user: user data
:return object: registered user
:raise AlreadyExistsError: user already exists
"""
for existing_user in request.config_dict['users'].values():
if user.name == existing_user.name:
raise AlreadyExistsError()
user_id = uuid.uuid4().hex
request.config_dict['users'][user_id] = user
return UserOut(id=user_id, **user.dict())
@specs.annotate(
tags=['users'],
errors=[NotFoundError],
examples=[
specs.MethodExample(
summary='Simple example',
params=dict(
user_id='c47726c6-a232-45f1-944f-60b98966ff1b',
),
result={
'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
'name': 'John',
'surname': 'Doe',
'age': 25,
},
),
],
)
@methods.add(context='request')
@validator.validate
def get_user(request: web.Request, user_id: uuid.UUID) -> UserOut:
"""
Returns a user.
:param request: http request
:param object user_id: user id
:return object: registered user
:raise NotFoundError: user not found
"""
user = request.config_dict['users'].get(user_id.hex)
if not user:
raise NotFoundError()
return UserOut(id=user_id, **user.dict())
@specs.annotate(
tags=['users'],
errors=[NotFoundError],
examples=[
specs.MethodExample(
summary='Simple example',
params=dict(
user_id='c47726c6-a232-45f1-944f-60b98966ff1b',
),
result=None,
),
],
)
@methods.add(context='request')
@validator.validate
def delete_user(request: web.Request, user_id: uuid.UUID) -> None:
"""
Deletes a user.
:param request: http request
:param object user_id: user id
:raise NotFoundError: user not found
"""
user = request.config_dict['users'].pop(user_id.hex, None)
if not user:
raise NotFoundError()
app = web.Application()
app['users'] = {}
jsonrpc_app = AuthenticatedJsonRPC(
'/api/v1',
json_encoder=JSONEncoder,
spec=specs.OpenAPI(
info=specs.Info(version="1.0.0", title="User storage"),
servers=[
specs.Server(
url='http://127.0.0.1:8080',
),
],
security_schemes=dict(
basicAuth=specs.SecurityScheme(
type=specs.SecuritySchemeType.HTTP,
scheme='basic',
),
),
security=[
dict(basicAuth=[]),
],
schema_extractors=[
extractors.docstring.DocstringSchemaExtractor(),
extractors.pydantic.PydanticSchemaExtractor(),
],
ui=specs.SwaggerUI(),
# ui=specs.RapiDoc(),
# ui=specs.ReDoc(),
),
)
jsonrpc_app.dispatcher.add_methods(methods)
app.add_subapp('/myapp', jsonrpc_app.app)
cors = aiohttp_cors.setup(
app, defaults={
'*': aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers='*',
allow_headers='*',
),
},
)
for route in list(app.router.routes()):
cors.add(route)
if __name__ == "__main__":
web.run_app(app, host='localhost', port=8080)
flask OpenRPC specification#
import uuid
from typing import Any
import flask
import pydantic
from flask_cors import CORS
import pjrpc.server.specs.extractors.docstring
import pjrpc.server.specs.extractors.pydantic
from pjrpc.server.integration import flask as integration
from pjrpc.server.specs import extractors
from pjrpc.server.specs import openrpc as specs
from pjrpc.server.validators import pydantic as validators
app = flask.Flask(__name__)
CORS(app, resources={r"/api/v1/*": {"origins": "*"}})
methods = pjrpc.server.MethodRegistry()
validator = validators.PydanticValidator()
class JsonEncoder(pjrpc.JSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, pydantic.BaseModel):
return o.dict()
if isinstance(o, uuid.UUID):
return str(o)
return super().default(o)
class UserIn(pydantic.BaseModel):
"""
User registration data.
"""
name: str
surname: str
age: int
class UserOut(UserIn):
"""
Registered user data.
"""
id: uuid.UUID
class AlreadyExistsError(pjrpc.exc.JsonRpcError):
"""
User already registered error.
"""
code = 2001
message = "user already exists"
class NotFoundError(pjrpc.exc.JsonRpcError):
"""
User not found error.
"""
code = 2002
message = "user not found"
@specs.annotate(
errors=[AlreadyExistsError],
tags=['users'],
examples=[
specs.MethodExample(
name='Simple user',
params=[
specs.ExampleObject(
name='user',
value={
'name': 'John',
'surname': 'Doe',
'age': 25,
},
),
],
result=specs.ExampleObject(
name='result',
value={
'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
'name': 'John',
'surname': 'Doe',
'age': 25,
},
),
),
],
)
@methods.add
@validator.validate
def add_user(user: UserIn) -> UserOut:
"""
Adds a new user.
:param object user: user data
:return object: registered user
:raise AlreadyExistsError: user already exists
"""
for existing_user in flask.current_app.users_db.values():
if user.name == existing_user.name:
raise AlreadyExistsError()
user_id = uuid.uuid4().hex
flask.current_app.users_db[user_id] = user
return UserOut(id=user_id, **user.dict())
@specs.annotate(
tags=['users'],
errors=[NotFoundError],
examples=[
specs.MethodExample(
name='Simple example',
params=[
specs.ExampleObject(
name='user',
value={
'user_id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
},
),
],
result=specs.ExampleObject(
name="result",
value={
'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
'name': 'John',
'surname': 'Doe',
'age': 25,
},
),
),
],
)
@methods.add
@validator.validate
def get_user(user_id: uuid.UUID) -> UserOut:
"""
Returns a user.
:param object user_id: user id
:return object: registered user
:raise NotFoundError: user not found
"""
user = flask.current_app.users_db.get(user_id.hex)
if not user:
raise NotFoundError()
return UserOut(id=user_id, **user.dict())
@specs.annotate(
tags=['users'],
errors=[NotFoundError],
examples=[
specs.MethodExample(
name='Simple example',
summary='Simple example',
params=[
specs.ExampleObject(
name='user',
value={
'user_id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
},
),
],
result=specs.ExampleObject(
name="result",
value=None,
),
),
],
)
@methods.add
@validator.validate
def delete_user(user_id: uuid.UUID) -> None:
"""
Deletes a user.
:param object user_id: user id
:raise NotFoundError: user not found
"""
user = flask.current_app.users_db.pop(user_id.hex, None)
if not user:
raise NotFoundError()
json_rpc = integration.JsonRPC(
'/api/v1',
json_encoder=JsonEncoder,
spec=specs.OpenRPC(
info=specs.Info(version="1.0.0", title="User storage"),
servers=[
specs.Server(
name='test',
url='http://127.0.0.1:8080/api/v1/',
summary='test server',
),
],
schema_extractor=extractors.pydantic.PydanticSchemaExtractor(),
),
)
json_rpc.dispatcher.add_methods(methods)
app.users_db = {}
json_rpc.init_app(app)
if __name__ == "__main__":
app.run(port=8080)