Examples#
aio_pika client#
import asyncio
from yarl import URL
import pjrpc
from pjrpc.client.backend import aio_pika as pjrpc_client
async def main():
async with pjrpc_client.Client(
broker_url=URL('amqp://guest:guest@localhost:5672/v1'),
routing_key='math-service',
) as client:
response: pjrpc.Response = await client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
result = await client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
result = await client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
await client.notify('ping')
if __name__ == "__main__":
asyncio.run(main())
aio_pika server#
import asyncio
import logging
import aio_pika
from yarl import URL
import pjrpc
from pjrpc.server.integration import aio_pika as integration
methods = pjrpc.server.MethodRegistry()
@methods.add(pass_context='message')
def sum(message: aio_pika.IncomingMessage, a: int, b: int) -> int:
return a + b
@methods.add(pass_context=True)
def sub(context: aio_pika.IncomingMessage, a: int, b: int) -> int:
return a - b
@methods.add()
async def ping() -> None:
logging.info("ping")
executor = integration.Executor(URL('amqp://guest:guest@localhost:5672/v1'), request_queue_name='math-service')
executor.dispatcher.add_methods(methods)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
loop = asyncio.get_event_loop()
loop.run_until_complete(executor.start())
try:
loop.run_forever()
finally:
loop.run_until_complete(executor.shutdown())
aiohttp client#
import asyncio
import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client
async def main():
async with pjrpc_client.Client('http://localhost:8080/api/v1') as client:
response = await client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
result = await client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
result = await client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
await client.notify('ping')
asyncio.run(main())
aiohttp client batch request#
import asyncio
import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client
async def main():
async with pjrpc_client.Client('http://localhost:8080/api/v1') as client:
async with client.batch() as batch:
batch.send(pjrpc.Request('sum', [2, 2], id=1))
batch.send(pjrpc.Request('sub', [2, 2], id=2))
batch.send(pjrpc.Request('div', [2, 2], id=3))
batch.send(pjrpc.Request('mult', [2, 2], id=4))
response = batch.get_response()
print(f"2 + 2 = {response[0].result}")
print(f"2 - 2 = {response[1].result}")
print(f"2 / 2 = {response[2].result}")
print(f"2 * 2 = {response[3].result}")
async with client.batch() as batch:
batch('sum', 2, 2)
batch('sub', 2, 2)
batch('div', 2, 2)
batch('mult', 2, 2)
result = batch.get_results()
print(f"2 + 2 = {result[0]}")
print(f"2 - 2 = {result[1]}")
print(f"2 / 2 = {result[2]}")
print(f"2 * 2 = {result[3]}")
async with client.batch() as batch:
batch.proxy.sum(2, 2)
batch.proxy.sub(2, 2)
batch.proxy.div(2, 2)
batch.proxy.mult(2, 2)
result = batch.get_results()
print(f"2 + 2 = {result[0]}")
print(f"2 - 2 = {result[1]}")
print(f"2 / 2 = {result[2]}")
print(f"2 * 2 = {result[3]}")
async with client.batch() as batch:
batch.notify('tick')
batch.notify('tack')
asyncio.run(main())
aiohttp pytest integration#
from unittest import mock
import pytest
import pjrpc
from pjrpc.client.backend import aiohttp as aiohttp_client
from pjrpc.client.integrations.pytest_aiohttp import PjRpcAiohttpMocker
async def test_using_fixture(pjrpc_aiohttp_mocker):
client = aiohttp_client.Client('http://localhost/api/v1')
pjrpc_aiohttp_mocker.add('http://localhost/api/v1', 'sum', result=2)
result = await client.proxy.sum(1, 1)
assert result == 2
pjrpc_aiohttp_mocker.replace(
'http://localhost/api/v1', 'sum', error=pjrpc.exc.JsonRpcError(code=1, message='error', data='oops'),
)
with pytest.raises(pjrpc.exc.JsonRpcError) as exc_info:
await client.proxy.sum(a=1, b=1)
assert exc_info.type is pjrpc.exc.JsonRpcError
assert exc_info.value.code == 1
assert exc_info.value.message == 'error'
assert exc_info.value.data == 'oops'
localhost_calls = pjrpc_aiohttp_mocker.calls['http://localhost/api/v1']
assert localhost_calls[('2.0', 'sum')].call_count == 2
assert localhost_calls[('2.0', 'sum')].mock_calls == [mock.call(1, 1), mock.call(a=1, b=1)]
async def test_using_resource_manager():
client = aiohttp_client.Client('http://localhost/api/v1')
with PjRpcAiohttpMocker() as mocker:
mocker.add('http://localhost/api/v1', 'div', result=2)
result = await client.proxy.div(4, 2)
assert result == 2
localhost_calls = mocker.calls['http://localhost/api/v1']
assert localhost_calls[('2.0', 'div')].mock_calls == [mock.call(4, 2)]
aiohttp server#
import logging
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
methods = pjrpc.server.MethodRegistry()
@methods.add(pass_context='request')
async def sum(request: web.Request, a: int, b: int) -> int:
return a + b
@methods.add(pass_context='request')
async def sub(request: web.Request, a: int, b: int) -> int:
return a - b
@methods.add(pass_context='request')
async def div(request: web.Request, a: int, b: int) -> float:
return a / b
@methods.add(pass_context='request')
async def mult(request: web.Request, a: int, b: int) -> int:
return a * b
@methods.add()
async def ping() -> None:
logging.info("ping")
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.add_methods(methods)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)
aiohttp versioning#
import logging
import uuid
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
methods_v1 = pjrpc.server.MethodRegistry()
@methods_v1.add(name="add_user", pass_context='request')
async def add_user_v1(request: web.Request, user: dict) -> dict:
user_id = uuid.uuid4().hex
request.config_dict['users'][user_id] = user
return {'id': user_id, **user}
methods_v2 = pjrpc.server.MethodRegistry()
@methods_v2.add(name="add_user", pass_context='request')
async def add_user_v2(request: web.Request, user: dict) -> dict:
user_id = uuid.uuid4().hex
request.config_dict['users'][user_id] = user
return {'id': user_id, **user}
app = web.Application()
app['users'] = {}
app_v1 = aiohttp.Application()
app_v1.add_methods(methods_v1)
app.add_subapp('/api/v1', app_v1.http_app)
app_v2 = aiohttp.Application()
app_v2.add_methods(methods_v2)
app.add_subapp('/api/v2', app_v2.http_app)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
web.run_app(app, host='localhost', port=8080)
client prometheus metrics#
import time
from typing import Any, Mapping, Optional
import prometheus_client as prom_cli
from pjrpc import AbstractRequest, AbstractResponse, BatchRequest, Request
from pjrpc.client import MiddlewareHandler
from pjrpc.client.backend import requests as pjrpc_client
method_latency_hist = prom_cli.Histogram('method_latency', 'Method latency', labelnames=['method'])
method_call_total = prom_cli.Counter('method_call_total', 'Method call count', labelnames=['method'])
method_errors_total = prom_cli.Counter('method_errors_total', 'Method errors count', labelnames=['method', 'code'])
def prometheus_tracing_middleware(
request: AbstractRequest,
request_kwargs: Mapping[str, Any],
/,
handler: MiddlewareHandler,
) -> Optional[AbstractResponse]:
if isinstance(request, Request):
started_at = time.time()
method_call_total.labels(request.method).inc()
response = handler(request, request_kwargs)
if response.is_error:
method_call_total.labels(request.method, response.unwrap_error().code).inc()
method_latency_hist.labels(request.method).observe(time.time() - started_at)
elif isinstance(request, BatchRequest):
response = handler(request, request_kwargs)
else:
raise AssertionError("unreachable")
return response
client = pjrpc_client.Client(
'http://localhost:8080/api/v1',
middlewares=(
prometheus_tracing_middleware,
),
)
result = client.proxy.sum(1, 2)
client tracing#
from typing import Any, Mapping, Optional
import opentracing
from opentracing import propagation, tags
from pjrpc.client import MiddlewareHandler
from pjrpc.client.backend import requests as pjrpc_client
from pjrpc.common import AbstractRequest, AbstractResponse, BatchRequest, Request
tracer = opentracing.global_tracer()
def tracing_middleware(
request: AbstractRequest,
request_kwargs: Mapping[str, Any],
/,
handler: MiddlewareHandler,
) -> Optional[AbstractResponse]:
if isinstance(request, Request):
span = tracer.start_active_span(f'jsonrpc.{request.method}').span
span.set_tag(tags.COMPONENT, 'pjrpc.client')
span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)
if http_headers := request_kwargs.get('headers', {}):
tracer.inject(
span_context=span,
format=propagation.Format.HTTP_HEADERS,
carrier=http_headers,
)
response = handler(request, request_kwargs)
if response.is_error:
span = tracer.active_span
span.set_tag(tags.ERROR, response.is_error)
span.set_tag('jsonrpc.error_code', response.unwrap_error().code)
span.set_tag('jsonrpc.error_message', response.unwrap_error().message)
span.finish()
elif isinstance(request, BatchRequest):
response = handler(request, request_kwargs)
else:
raise AssertionError("unreachable")
return response
client = pjrpc_client.Client(
'http://localhost:8080/api/v1',
middlewares=[
tracing_middleware,
],
)
result = client.proxy.sum(1, 2)
flask server#
import pjrpc
from pjrpc.server.integration import flask as integration
methods = pjrpc.server.MethodRegistry()
@methods.add()
def sum(a: int, b: int) -> int:
return a + b
json_rpc = integration.JsonRPC('/api/v1')
json_rpc.add_methods(methods)
if __name__ == "__main__":
json_rpc.http_app.run(port=8080)
flask versioning#
import uuid
import flask
import pjrpc.server
from pjrpc.server.integration import flask as integration
methods_v1 = pjrpc.server.MethodRegistry()
@methods_v1.add(name="add_user")
def add_user_v1(user: dict):
user_id = uuid.uuid4().hex
flask.current_app.users[user_id] = user
return {'id': user_id, **user}
methods_v2 = pjrpc.server.MethodRegistry()
@methods_v2.add(name="add_user")
def add_user_v2(user: dict):
user_id = uuid.uuid4().hex
flask.current_app.users[user_id] = user
return {'id': user_id, **user}
json_rpc = integration.JsonRPC('/api')
json_rpc.http_app.users = {}
json_rpc_v1 = integration.JsonRPC(http_app=flask.Blueprint("v1", __name__))
json_rpc_v1.add_methods(methods_v1)
json_rpc.add_subapp('/v1', json_rpc_v1)
json_rpc_v2 = integration.JsonRPC(http_app=flask.Blueprint("v2", __name__))
json_rpc_v2.add_methods(methods_v2)
json_rpc.add_subapp('/v2', json_rpc_v2)
if __name__ == "__main__":
json_rpc.http_app.run(port=8080)
httpserver#
import http.server
import socketserver
import pjrpc
import pjrpc.server
class JsonRpcHandler(http.server.BaseHTTPRequestHandler):
"""
JSON-RPC handler.
"""
def do_POST(self):
"""
Handles JSON-RPC request.
"""
content_type = self.headers.get('Content-Type')
if content_type not in pjrpc.common.REQUEST_CONTENT_TYPES:
self.send_error(http.HTTPStatus.UNSUPPORTED_MEDIA_TYPE)
return
try:
content_length = int(self.headers.get('Content-Length', -1))
request_text = self.rfile.read(content_length).decode()
except UnicodeDecodeError:
self.send_error(http.HTTPStatus.BAD_REQUEST)
return
response = self.server.dispatcher.dispatch(request_text, context=self)
if response is None:
self.send_response_only(http.HTTPStatus.OK)
self.end_headers()
else:
response_text, error_codes = response
self.send_response(http.HTTPStatus.OK)
self.send_header("Content-type", pjrpc.common.DEFAULT_CONTENT_TYPE)
self.end_headers()
self.wfile.write(response_text.encode())
class JsonRpcServer(http.server.HTTPServer):
"""
:py:class:`http.server.HTTPServer` based JSON-RPC server.
:param path: JSON-RPC handler base path
:param kwargs: arguments to be passed to the dispatcher :py:class:`pjrpc.server.Dispatcher`
"""
def __init__(self, server_address, RequestHandlerClass=JsonRpcHandler, bind_and_activate=True, **kwargs):
super().__init__(server_address, RequestHandlerClass, bind_and_activate)
self._dispatcher = pjrpc.server.Dispatcher(**kwargs)
@property
def dispatcher(self):
"""
JSON-RPC method dispatcher.
"""
return self._dispatcher
methods = pjrpc.server.MethodRegistry()
@methods.add(pass_context='request')
def sum(request: http.server.BaseHTTPRequestHandler, a: int, b: int) -> int:
return a + b
class ThreadingJsonRpcServer(socketserver.ThreadingMixIn, JsonRpcServer):
users = {}
with ThreadingJsonRpcServer(("localhost", 8080)) as server:
server.dispatcher.add_methods(methods)
server.serve_forever()
middlewares#
from aiohttp import web
import pjrpc.server
from pjrpc.common import Request
from pjrpc.server.integration import aiohttp
from pjrpc.server.typedefs import AsyncHandlerType, ContextType, MiddlewareResponse
methods = pjrpc.server.MethodRegistry()
@methods.add(pass_context='request')
async def sum(request: web.Request, a: int, b: int) -> int:
return a + b
async def middleware1(request: Request, context: ContextType, handler: AsyncHandlerType) -> MiddlewareResponse:
print("middleware1 started")
result = await handler(request, context)
print("middleware1 finished")
return result
async def middleware2(request: Request, context: ContextType, handler: AsyncHandlerType) -> MiddlewareResponse:
print("middleware2 started")
result = await handler(request, context)
print("middleware2 finished")
return result
jsonrpc_app = aiohttp.Application(
'/api/v1',
middlewares=[
middleware1,
middleware2,
],
)
jsonrpc_app.add_methods(methods)
if __name__ == "__main__":
web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)
multiple clients#
from typing import ClassVar
import pjrpc
from pjrpc.client.backend import requests as jrpc_client
class ErrorV1(pjrpc.exc.TypedError, base=True):
pass
class PermissionDenied(ErrorV1):
CODE: ClassVar[int] = 1
MESSAGE: ClassVar[str] = 'permission denied'
class ErrorV2(pjrpc.exc.TypedError, base=True):
pass
class ResourceNotFound(ErrorV2):
CODE: ClassVar[int] = 1
MESSAGE: ClassVar[str] = 'resource not found'
client_v1 = jrpc_client.Client('http://localhost:8080/api/v1', error_cls=ErrorV1)
client_v2 = jrpc_client.Client('http://localhost:8080/api/v2', error_cls=ErrorV2)
try:
client_v1.proxy.add_user(user={})
except PermissionDenied as e:
print(e)
try:
client_v2.proxy.add_user(user={})
except ResourceNotFound as e:
print(e)
pydantic validator#
import enum
import uuid
import pydantic
from aiohttp import web
import pjrpc.server
from pjrpc.server.integration import aiohttp
from pjrpc.server.validators import pydantic as validators
methods = pjrpc.server.MethodRegistry(
validator_factory=validators.PydanticValidatorFactory(exclude=aiohttp.is_aiohttp_request),
)
class ContactType(enum.Enum):
PHONE = 'phone'
EMAIL = 'email'
class Contact(pydantic.BaseModel):
type: ContactType
value: str
class User(pydantic.BaseModel):
name: str
surname: str
age: int
contacts: list[Contact]
class UserOut(User):
id: uuid.UUID
@methods.add(pass_context='request')
async def add_user(request: web.Request, user: User) -> UserOut:
user_id = uuid.uuid4()
request.app['users'][user_id] = user
return UserOut(id=user_id, **user.model_dump())
class JSONEncoder(pjrpc.server.JSONEncoder):
def default(self, o):
if isinstance(o, uuid.UUID):
return o.hex
if isinstance(o, enum.Enum):
return o.value
if isinstance(o, pydantic.BaseModel):
return o.model_dump()
return super().default(o)
jsonrpc_app = aiohttp.Application('/api/v1', json_encoder=JSONEncoder)
jsonrpc_app.add_methods(methods)
jsonrpc_app.http_app['users'] = {}
if __name__ == "__main__":
web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)
requests client#
import pjrpc
from pjrpc.client.backend import requests as pjrpc_client
client = pjrpc_client.Client('http://localhost:8080/api/v1')
response: pjrpc.Response = client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")
result = client('sum', a=1, b=2)
print(f"1 + 2 = {result}")
result = client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")
client.notify('tick')
requests pytest#
from unittest import mock
import pytest
import pjrpc
from pjrpc.client.backend import requests as requests_client
from pjrpc.client.integrations.pytest_requests import PjRpcRequestsMocker
def test_using_fixture(pjrpc_requests_mocker):
client = requests_client.Client('http://localhost/api/v1')
pjrpc_requests_mocker.add('http://localhost/api/v1', 'sum', result=2)
result = client.proxy.sum(1, 1)
assert result == 2
pjrpc_requests_mocker.replace(
'http://localhost/api/v1', 'sum', error=pjrpc.exc.JsonRpcError(code=1, message='error', data='oops'),
)
with pytest.raises(pjrpc.exc.JsonRpcError) as exc_info:
client.proxy.sum(a=1, b=1)
assert exc_info.type is pjrpc.exc.JsonRpcError
assert exc_info.value.code == 1
assert exc_info.value.message == 'error'
assert exc_info.value.data == 'oops'
localhost_calls = pjrpc_requests_mocker.calls['http://localhost/api/v1']
assert localhost_calls[('2.0', 'sum')].call_count == 2
assert localhost_calls[('2.0', 'sum')].mock_calls == [mock.call(1, 1), mock.call(a=1, b=1)]
client = requests_client.Client('http://localhost/api/v2')
with pytest.raises(ConnectionRefusedError):
client.proxy.sum(1, 1)
def test_using_resource_manager():
client = requests_client.Client('http://localhost/api/v1')
with PjRpcRequestsMocker() as mocker:
mocker.add('http://localhost/api/v1', 'mult', result=4)
mocker.add('http://localhost/api/v1', 'div', callback=lambda a, b: a/b)
with client.batch() as batch:
batch.proxy.div(4, 2)
batch.proxy.mult(2, 2)
result = batch.get_results()
assert result == [2, 4]
localhost_calls = mocker.calls['http://localhost/api/v1']
assert localhost_calls[('2.0', 'div')].mock_calls == [mock.call(4, 2)]
assert localhost_calls[('2.0', 'mult')].mock_calls == [mock.call(2, 2)]
with pytest.raises(pjrpc.exc.MethodNotFoundError):
client.proxy.sub(4, 2)
sentry#
import sentry_sdk
from aiohttp import web
import pjrpc.server
from pjrpc.common import Request, Response
from pjrpc.server import AsyncHandlerType
from pjrpc.server.integration import aiohttp
methods = pjrpc.server.MethodRegistry()
@methods.add(pass_context='request')
async def sum(request: web.Request, a: int, b: int) -> int:
return a + b
async def sentry_middleware(request: Request, context: web.Request, handler: AsyncHandlerType) -> Response:
try:
return await handler(request, context)
except pjrpc.exceptions.JsonRpcError as e:
sentry_sdk.capture_exception(e)
raise
jsonrpc_app = aiohttp.Application(
'/api/v1', middlewares=(
sentry_middleware,
),
)
jsonrpc_app.add_methods(methods)
if __name__ == "__main__":
web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)
server prometheus metrics#
import asyncio
import prometheus_client as pc
from aiohttp import web
import pjrpc.server
from pjrpc import Request, Response
from pjrpc.server import AsyncHandlerType
from pjrpc.server.integration import aiohttp
method_error_count = pc.Counter('method_error_count', 'Method error count', labelnames=['method', 'code'])
method_latency_hist = pc.Histogram('method_latency', 'Method latency', labelnames=['method'])
method_active_count = pc.Gauge('method_active_count', 'Method active count', labelnames=['method'])
async def metrics(request):
return web.Response(body=pc.generate_latest())
http_app = web.Application()
http_app.add_routes([web.get('/metrics', metrics)])
methods = pjrpc.server.MethodRegistry()
@methods.add(pass_context='context')
async def sum(context: web.Request, a: int, b: int) -> int:
print("method started")
await asyncio.sleep(1)
print("method finished")
return a + b
async def latency_metric_middleware(request: Request, context: web.Request, handler: AsyncHandlerType) -> Response:
with method_latency_hist.labels(method=request.method).time():
return await handler(request, context)
async def active_count_metric_middleware(request: Request, context: web.Request, handler: AsyncHandlerType) -> Response:
with method_active_count.labels(method=request.method).track_inprogress():
return await handler(request, context)
async def error_counter_middleware(request: Request, context: web.Request, handler: AsyncHandlerType) -> Response:
if response := await handler(request, context):
if response.is_error:
method_error_count.labels(method=request.method, code=response.unwrap_error().code).inc()
return response
jsonrpc_app = aiohttp.Application(
'/api/v1',
http_app=http_app,
middlewares=(
latency_metric_middleware,
active_count_metric_middleware,
error_counter_middleware,
),
)
jsonrpc_app.add_methods(methods)
if __name__ == "__main__":
web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)
server tracing#
import asyncio
import opentracing
from aiohttp import web
from aiohttp.typedefs import Handler as HttpHandler
from opentracing import tags
import pjrpc.server
from pjrpc import Request, Response
from pjrpc.server import AsyncHandlerType
from pjrpc.server.integration import aiohttp
@web.middleware
async def http_tracing_middleware(request: web.Request, handler: HttpHandler) -> web.StreamResponse:
"""
aiohttp server tracer.
"""
tracer = opentracing.global_tracer()
try:
span_ctx = tracer.extract(format=opentracing.Format.HTTP_HEADERS, carrier=request.headers)
except (opentracing.InvalidCarrierException, opentracing.SpanContextCorruptedException):
span_ctx = None
span = tracer.start_span(f'http.{request.method}', child_of=span_ctx)
span.set_tag(tags.COMPONENT, 'aiohttp.server')
span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
span.set_tag(tags.PEER_ADDRESS, request.remote)
span.set_tag(tags.HTTP_URL, str(request.url))
span.set_tag(tags.HTTP_METHOD, request.method)
with tracer.scope_manager.activate(span, finish_on_close=True):
response = await handler(request)
span.set_tag(tags.HTTP_STATUS_CODE, response.status)
span.set_tag(tags.ERROR, response.status >= 400)
return response
http_app = web.Application(
middlewares=(
http_tracing_middleware,
),
)
methods = pjrpc.server.MethodRegistry()
@methods.add(pass_context='context')
async def sum(context: web.Request, a: int, b: int) -> int:
print("method started")
await asyncio.sleep(1)
print("method finished")
return a + b
async def jsonrpc_tracing_middleware(request: Request, context: web.Request, handler: AsyncHandlerType) -> Response:
tracer = opentracing.global_tracer()
span = tracer.start_span(f'jsonrpc.{request.method}')
span.set_tag(tags.COMPONENT, 'pjrpc')
span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
span.set_tag('jsonrpc.version', request.version)
span.set_tag('jsonrpc.id', request.id)
span.set_tag('jsonrpc.method', request.method)
with tracer.scope_manager.activate(span, finish_on_close=True):
if response := await handler(request, context):
if response.is_error:
span.set_tag('jsonrpc.error_code', response.error.code)
span.set_tag('jsonrpc.error_message', response.error.message)
span.set_tag(tags.ERROR, True)
else:
span.set_tag(tags.ERROR, False)
return response
jsonrpc_app = aiohttp.Application(
'/api/v1',
http_app=http_app,
middlewares=[
jsonrpc_tracing_middleware,
],
)
jsonrpc_app.add_methods(methods)
if __name__ == "__main__":
web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)
werkzeug server#
import uuid
import werkzeug
import pjrpc.server
from pjrpc.server.integration import werkzeug as integration
methods = pjrpc.server.MethodRegistry()
@methods.add(pass_context='request')
def add_user(request: werkzeug.Request, user: dict):
user_id = uuid.uuid4().hex
request.environ['app'].users[user_id] = user
return {'id': user_id, **user}
app = integration.JsonRPC('/api/v1')
app.dispatcher.add_methods(methods)
app.users = {}
if __name__ == '__main__':
werkzeug.serving.run_simple('127.0.0.1', 8080, app)
flask OpenAPI specification#
import uuid
from typing import Annotated, Any
import flask
import flask_cors
import pydantic as pd
import pjrpc.server.specs.extractors.pydantic
import pjrpc.server.specs.openapi.ui
from pjrpc.server.integration import flask as integration
from pjrpc.server.specs import extractors, openapi
from pjrpc.server.validators import pydantic as validators
methods = pjrpc.server.MethodRegistry(
validator_factory=validators.PydanticValidatorFactory(),
metadata_processors=[
openapi.MethodSpecificationGenerator(
extractor=extractors.pydantic.PydanticMethodInfoExtractor(),
),
],
)
UserName = Annotated[
str,
pd.Field(description="User name", examples=["John"]),
]
UserSurname = Annotated[
str,
pd.Field(description="User surname", examples=['Doe']),
]
UserAge = Annotated[
int,
pd.Field(description="User age", examples=[25]),
]
UserId = Annotated[
uuid.UUID,
pd.Field(description="User identifier", examples=["c47726c6-a232-45f1-944f-60b98966ff1b"]),
]
class UserIn(pd.BaseModel):
"""
User registration data.
"""
name: UserName
surname: UserSurname
age: UserAge
class UserOut(UserIn):
"""
Registered user data.
"""
id: UserId
class AlreadyExistsError(pjrpc.exc.TypedError):
"""
User already registered error.
"""
CODE = 2001
MESSAGE = "user already exists"
class NotFoundError(pjrpc.exc.TypedError):
"""
User not found error.
"""
CODE = 2002
MESSAGE = "user not found"
@methods.add(
metadata=[
openapi.metadata(
summary='Creates a user',
tags=['users'],
errors=[AlreadyExistsError],
),
],
)
def add_user(user: UserIn) -> UserOut:
"""
Creates a user.
:param object user: user data
:return object: registered user
:raise AlreadyExistsError: user already exists
"""
for existing_user in flask.current_app.users_db.values():
if user.name == existing_user.name:
raise AlreadyExistsError()
user_id = uuid.uuid4()
flask.current_app.users_db[user_id] = user
return UserOut(id=user_id, **user.model_dump())
@methods.add(
metadata=[
openapi.metadata(
summary='Returns a user',
tags=['users'],
errors=[NotFoundError],
),
],
)
def get_user(user_id: UserId) -> UserOut:
"""
Returns a user.
:param object user_id: user id
:return object: registered user
:raise NotFoundError: user not found
"""
user = flask.current_app.users_db.get(user_id.hex)
if not user:
raise NotFoundError()
return UserOut(id=user_id, **user.model_dump())
@methods.add(
metadata=[
openapi.metadata(
summary='Deletes a user',
tags=['users'],
errors=[NotFoundError],
),
],
)
def delete_user(user_id: UserId) -> None:
"""
Deletes a user.
:param object user_id: user id
:raise NotFoundError: user not found
"""
user = flask.current_app.users_db.pop(user_id.hex, None)
if not user:
raise NotFoundError()
class JSONEncoder(pjrpc.server.JSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, pd.BaseModel):
return o.model_dump()
if isinstance(o, uuid.UUID):
return str(o)
return super().default(o)
openapi_spec = openapi.OpenAPI(
info=openapi.Info(version="1.0.0", title="User storage"),
servers=[
openapi.Server(
url='http://127.0.0.1:8080',
),
],
security_schemes=dict(
basicAuth=openapi.SecurityScheme(
type=openapi.SecuritySchemeType.HTTP,
scheme='basic',
),
),
security=[
dict(basicAuth=[]),
],
)
jsonrpc_v1 = integration.JsonRPC('/api/v1', json_encoder=JSONEncoder)
jsonrpc_v1.add_methods(methods)
jsonrpc_v1.add_spec(openapi_spec, path='openapi.json')
jsonrpc_v1.add_spec_ui('swagger', ui=openapi.ui.SwaggerUI(), spec_url='../openapi.json')
flask_cors.CORS(jsonrpc_v1.http_app, resources={"/rpc/api/v1/*": {"origins": "*"}})
jsonrpc_v1.http_app.users_db = {}
if __name__ == "__main__":
jsonrpc_v1.http_app.run(port=8080)
aiohttp OpenAPI specification#
import uuid
from typing import Annotated, Any
import aiohttp.typedefs
import aiohttp_cors
import pydantic as pd
from aiohttp import helpers, web
import pjrpc.server.specs.extractors.pydantic
import pjrpc.server.specs.openapi.ui
from pjrpc.server.integration import aiohttp as integration
from pjrpc.server.specs import extractors
from pjrpc.server.specs import openapi as specs
from pjrpc.server.validators import pydantic as validators
credentials = {"admin": "admin"}
methods = pjrpc.server.MethodRegistry(
validator_factory=validators.PydanticValidatorFactory(exclude=integration.is_aiohttp_request),
metadata_processors=[
specs.MethodSpecificationGenerator(
extractor=extractors.pydantic.PydanticMethodInfoExtractor(
exclude=integration.is_aiohttp_request,
),
),
],
)
UserName = Annotated[
str,
pd.Field(description="User name", examples=["John"]),
]
UserSurname = Annotated[
str,
pd.Field(description="User surname", examples=['Doe']),
]
UserAge = Annotated[
int,
pd.Field(description="User age", examples=[36]),
]
UserId = Annotated[
uuid.UUID,
pd.Field(description="User identifier", examples=["226a2c23-c98b-4729-b398-0dae550e99ff"]),
]
class UserIn(pd.BaseModel):
"""
User registration data.
"""
name: UserName
surname: UserSurname
age: UserAge
class UserOut(UserIn):
"""
Registered user data.
"""
id: UserId
class AlreadyExistsError(pjrpc.exc.TypedError):
"""
User already registered error.
"""
CODE = 2001
MESSAGE = "user already exists"
class NotFoundError(pjrpc.exc.TypedError):
"""
User not found error.
"""
CODE = 2002
MESSAGE = "user not found"
@methods.add(
pass_context='request',
metadata=[
specs.metadata(
summary='Creates a user',
tags=['users'],
errors=[AlreadyExistsError],
),
],
)
def add_user(request: web.Request, user: UserIn) -> UserOut:
"""
Creates a user.
:param request: http request
:param object user: user data
:return object: registered user
:raise AlreadyExistsError: user already exists
"""
for existing_user in request.config_dict['users'].values():
if user.name == existing_user.name:
raise AlreadyExistsError()
user_id = uuid.uuid4()
request.config_dict['users'][user_id] = user
return UserOut(id=user_id, **user.model_dump())
@methods.add(
pass_context='request',
metadata=[
specs.metadata(
summary='Returns a user',
tags=['users'],
errors=[NotFoundError],
),
],
)
def get_user(request: web.Request, user_id: UserId) -> UserOut:
"""
Returns a user.
:param request: http request
:param object user_id: user id
:return object: registered user
:raise NotFoundError: user not found
"""
user = request.config_dict['users'].get(user_id.hex)
if not user:
raise NotFoundError()
return UserOut(id=user_id, **user.model_dump())
@methods.add(
pass_context='request',
metadata=[
specs.metadata(
summary='Deletes a user',
tags=['users'],
errors=[NotFoundError],
),
],
)
def delete_user(request: web.Request, user_id: UserId) -> None:
"""
Deletes a user.
:param request: http request
:param object user_id: user id
:raise NotFoundError: user not found
"""
user = request.config_dict['users'].pop(user_id.hex, None)
if not user:
raise NotFoundError()
class JSONEncoder(pjrpc.server.JSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, pd.BaseModel):
return o.model_dump()
if isinstance(o, uuid.UUID):
return str(o)
return super().default(o)
async def basic_auth_middleware(request: web.Request, handler: aiohttp.typedefs.Handler) -> web.StreamResponse:
try:
auth = helpers.BasicAuth.decode(request.headers.get('Authorization', ''))
except ValueError:
raise web.HTTPUnauthorized
if credentials.get(auth.login) != auth.password:
raise web.HTTPUnauthorized
return await handler(request)
openapi_spec = specs.OpenAPI(
info=specs.Info(version="1.0.0", title="User storage"),
servers=[
specs.Server(
url='http://127.0.0.1:8080',
),
],
security_schemes=dict(
basicAuth=specs.SecurityScheme(
type=specs.SecuritySchemeType.HTTP,
scheme='basic',
),
),
security=[
dict(basicAuth=[]),
],
)
http_app = web.Application()
http_app['users'] = {}
jsonrpc_app = integration.Application('/api')
jsonrpc_app.add_spec(openapi_spec, path='openapi.json')
jsonrpc_app.add_spec_ui('swagger', specs.ui.SwaggerUI(), spec_url='../openapi.json')
jsonrpc_app.add_spec_ui('redoc', specs.ui.ReDoc(), spec_url='../openapi.json')
jsonrpc_v1_app = integration.Application(
http_app=web.Application(
middlewares=[
basic_auth_middleware,
],
),
json_encoder=JSONEncoder,
)
jsonrpc_v1_app.add_methods(methods)
jsonrpc_app.add_subapp('/v1', jsonrpc_v1_app)
http_app.add_subapp('/rpc', jsonrpc_app.http_app)
cors = aiohttp_cors.setup(
http_app, defaults={
'*': aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers='*',
allow_headers='*',
),
},
)
for route in list(http_app.router.routes()):
cors.add(route)
if __name__ == "__main__":
web.run_app(http_app, host='localhost', port=8080)