Examples#

aio_pika client#

import asyncio

from yarl import URL

import pjrpc
from pjrpc.client.backend import aio_pika as pjrpc_client


async def main():
    async with pjrpc_client.Client(
        broker_url=URL('amqp://guest:guest@localhost:5672/v1'),
        routing_key='math-service',
    ) as client:
        response: pjrpc.Response = await client.send(pjrpc.Request('sum', params=[1, 2], id=1))
        print(f"1 + 2 = {response.result}")

        result = await client('sum', a=1, b=2)
        print(f"1 + 2 = {result}")

        result = await client.proxy.sum(1, 2)
        print(f"1 + 2 = {result}")

        await client.notify('ping')


if __name__ == "__main__":
    asyncio.run(main())

aio_pika server#

import asyncio
import logging

import aio_pika
from yarl import URL

import pjrpc
from pjrpc.server.integration import aio_pika as integration

methods = pjrpc.server.MethodRegistry()


@methods.add(pass_context='message')
def sum(message: aio_pika.IncomingMessage, a: int, b: int) -> int:
    return a + b


@methods.add(pass_context=True)
def sub(context: aio_pika.IncomingMessage, a: int, b: int) -> int:
    return a - b


@methods.add()
async def ping() -> None:
    logging.info("ping")


executor = integration.Executor(URL('amqp://guest:guest@localhost:5672/v1'), request_queue_name='math-service')
executor.dispatcher.add_methods(methods)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    loop = asyncio.get_event_loop()

    loop.run_until_complete(executor.start())
    try:
        loop.run_forever()
    finally:
        loop.run_until_complete(executor.shutdown())

aiohttp client#

import asyncio

import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client


async def main():
    async with pjrpc_client.Client('http://localhost:8080/api/v1') as client:
        response = await client.send(pjrpc.Request('sum', params=[1, 2], id=1))
        print(f"1 + 2 = {response.result}")

        result = await client('sum', a=1, b=2)
        print(f"1 + 2 = {result}")

        result = await client.proxy.sum(1, 2)
        print(f"1 + 2 = {result}")

        await client.notify('ping')


asyncio.run(main())

aiohttp client batch request#

import asyncio

import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client


async def main():
    async with pjrpc_client.Client('http://localhost:8080/api/v1') as client:
        async with client.batch() as batch:
            batch.send(pjrpc.Request('sum', [2, 2], id=1))
            batch.send(pjrpc.Request('sub', [2, 2], id=2))
            batch.send(pjrpc.Request('div', [2, 2], id=3))
            batch.send(pjrpc.Request('mult', [2, 2], id=4))

        response = batch.get_response()
        print(f"2 + 2 = {response[0].result}")
        print(f"2 - 2 = {response[1].result}")
        print(f"2 / 2 = {response[2].result}")
        print(f"2 * 2 = {response[3].result}")

        async with client.batch() as batch:
            batch('sum', 2, 2)
            batch('sub', 2, 2)
            batch('div', 2, 2)
            batch('mult', 2, 2)

        result = batch.get_results()
        print(f"2 + 2 = {result[0]}")
        print(f"2 - 2 = {result[1]}")
        print(f"2 / 2 = {result[2]}")
        print(f"2 * 2 = {result[3]}")

        async with client.batch() as batch:
            batch.proxy.sum(2, 2)
            batch.proxy.sub(2, 2)
            batch.proxy.div(2, 2)
            batch.proxy.mult(2, 2)

        result = batch.get_results()
        print(f"2 + 2 = {result[0]}")
        print(f"2 - 2 = {result[1]}")
        print(f"2 / 2 = {result[2]}")
        print(f"2 * 2 = {result[3]}")

        async with client.batch() as batch:
            batch.notify('tick')
            batch.notify('tack')


asyncio.run(main())

aiohttp pytest integration#

from unittest import mock

import pytest

import pjrpc
from pjrpc.client.backend import aiohttp as aiohttp_client
from pjrpc.client.integrations.pytest_aiohttp import PjRpcAiohttpMocker


async def test_using_fixture(pjrpc_aiohttp_mocker):
    client = aiohttp_client.Client('http://localhost/api/v1')

    pjrpc_aiohttp_mocker.add('http://localhost/api/v1', 'sum', result=2)
    result = await client.proxy.sum(1, 1)
    assert result == 2

    pjrpc_aiohttp_mocker.replace(
        'http://localhost/api/v1',
        'sum',
        error=pjrpc.client.exceptions.JsonRpcError(code=1, message='error', data='oops'),
    )
    with pytest.raises(pjrpc.client.exceptions.JsonRpcError) as exc_info:
        await client.proxy.sum(a=1, b=1)

    assert exc_info.type is pjrpc.client.exceptions.JsonRpcError
    assert exc_info.value.code == 1
    assert exc_info.value.message == 'error'
    assert exc_info.value.data == 'oops'

    localhost_calls = pjrpc_aiohttp_mocker.calls['http://localhost/api/v1']
    assert localhost_calls[('2.0', 'sum')].call_count == 2
    assert localhost_calls[('2.0', 'sum')].mock_calls == [mock.call(1, 1), mock.call(a=1, b=1)]


async def test_using_resource_manager():
    client = aiohttp_client.Client('http://localhost/api/v1')

    with PjRpcAiohttpMocker() as mocker:
        mocker.add('http://localhost/api/v1', 'div', result=2)
        result = await client.proxy.div(4, 2)
        assert result == 2

        localhost_calls = mocker.calls['http://localhost/api/v1']
        assert localhost_calls[('2.0', 'div')].mock_calls == [mock.call(4, 2)]

aiohttp server#

import logging

from aiohttp import web

import pjrpc.server
from pjrpc.server.integration import aiohttp

methods = pjrpc.server.MethodRegistry()


@methods.add(pass_context='request')
async def sum(request: web.Request, a: int, b: int) -> int:
    return a + b


@methods.add(pass_context='request')
async def sub(request: web.Request, a: int, b: int) -> int:
    return a - b


@methods.add(pass_context='request')
async def div(request: web.Request, a: int, b: int) -> float:
    return a / b


@methods.add(pass_context='request')
async def mult(request: web.Request, a: int, b: int) -> int:
    return a * b


@methods.add()
async def ping() -> None:
    logging.info("ping")


jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.add_methods(methods)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)

aiohttp versioning#

import logging
import uuid

from aiohttp import web

import pjrpc.server
from pjrpc.server.integration import aiohttp

methods_v1 = pjrpc.server.MethodRegistry()


@methods_v1.add(name="add_user", pass_context='request')
async def add_user_v1(request: web.Request, user: dict) -> dict:
    user_id = uuid.uuid4().hex
    request.config_dict['users'][user_id] = user

    return {'id': user_id, **user}


methods_v2 = pjrpc.server.MethodRegistry()


@methods_v2.add(name="add_user", pass_context='request')
async def add_user_v2(request: web.Request, user: dict) -> dict:
    user_id = uuid.uuid4().hex
    request.config_dict['users'][user_id] = user

    return {'id': user_id, **user}


app = web.Application()
app['users'] = {}

app_v1 = aiohttp.Application()
app_v1.add_methods(methods_v1)
app.add_subapp('/api/v1', app_v1.http_app)


app_v2 = aiohttp.Application()
app_v2.add_methods(methods_v2)
app.add_subapp('/api/v2', app_v2.http_app)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    web.run_app(app, host='localhost', port=8080)

client prometheus metrics#

import time
from typing import Any, Mapping, Optional

import prometheus_client as prom_cli

from pjrpc import AbstractRequest, AbstractResponse, BatchRequest, Request
from pjrpc.client import MiddlewareHandler
from pjrpc.client.backend import requests as pjrpc_client

method_latency_hist = prom_cli.Histogram('method_latency', 'Method latency', labelnames=['method'])
method_call_total = prom_cli.Counter('method_call_total', 'Method call count', labelnames=['method'])
method_errors_total = prom_cli.Counter('method_errors_total', 'Method errors count', labelnames=['method', 'code'])


def prometheus_tracing_middleware(
    request: AbstractRequest,
    request_kwargs: Mapping[str, Any],
    /,
    handler: MiddlewareHandler,
) -> Optional[AbstractResponse]:
    if isinstance(request, Request):
        started_at = time.time()
        method_call_total.labels(request.method).inc()
        response = handler(request, request_kwargs)
        if response.is_error:
            method_call_total.labels(request.method, response.unwrap_error().code).inc()

        method_latency_hist.labels(request.method).observe(time.time() - started_at)

    elif isinstance(request, BatchRequest):
        response = handler(request, request_kwargs)

    else:
        raise AssertionError("unreachable")

    return response


client = pjrpc_client.Client(
    'http://localhost:8080/api/v1',
    middlewares=(
        prometheus_tracing_middleware,
    ),
)

result = client.proxy.sum(1, 2)

client tracing#

from typing import Any, Mapping, Optional

import opentracing
from opentracing import propagation, tags

from pjrpc.client import MiddlewareHandler
from pjrpc.client.backend import requests as pjrpc_client
from pjrpc.common import AbstractRequest, AbstractResponse, BatchRequest, Request

tracer = opentracing.global_tracer()


def tracing_middleware(
    request: AbstractRequest,
    request_kwargs: Mapping[str, Any],
    /,
    handler: MiddlewareHandler,
) -> Optional[AbstractResponse]:
    if isinstance(request, Request):
        span = tracer.start_active_span(f'jsonrpc.{request.method}').span
        span.set_tag(tags.COMPONENT, 'pjrpc.client')
        span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)
        if http_headers := request_kwargs.get('headers', {}):
            tracer.inject(
                span_context=span,
                format=propagation.Format.HTTP_HEADERS,
                carrier=http_headers,
            )

        response = handler(request, request_kwargs)
        if response.is_error:
            span = tracer.active_span
            span.set_tag(tags.ERROR, response.is_error)
            span.set_tag('jsonrpc.error_code', response.unwrap_error().code)
            span.set_tag('jsonrpc.error_message', response.unwrap_error().message)

            span.finish()

    elif isinstance(request, BatchRequest):
        response = handler(request, request_kwargs)

    else:
        raise AssertionError("unreachable")

    return response


client = pjrpc_client.Client(
    'http://localhost:8080/api/v1',
    middlewares=[
        tracing_middleware,
    ],
)

result = client.proxy.sum(1, 2)

flask server#

import pjrpc
from pjrpc.server.integration import flask as integration

methods = pjrpc.server.MethodRegistry()


@methods.add()
def sum(a: int, b: int) -> int:
    return a + b


json_rpc = integration.JsonRPC('/api/v1')
json_rpc.add_methods(methods)

if __name__ == "__main__":
    json_rpc.http_app.run(port=8080)

flask versioning#

import uuid

import flask

import pjrpc.server
from pjrpc.server.integration import flask as integration

methods_v1 = pjrpc.server.MethodRegistry()


@methods_v1.add(name="add_user")
def add_user_v1(user: dict):
    user_id = uuid.uuid4().hex
    flask.current_app.users[user_id] = user

    return {'id': user_id, **user}


methods_v2 = pjrpc.server.MethodRegistry()


@methods_v2.add(name="add_user")
def add_user_v2(user: dict):
    user_id = uuid.uuid4().hex
    flask.current_app.users[user_id] = user

    return {'id': user_id, **user}


json_rpc = integration.JsonRPC('/api')
json_rpc.http_app.users = {}

json_rpc_v1 = integration.JsonRPC(http_app=flask.Blueprint("v1", __name__))
json_rpc_v1.add_methods(methods_v1)
json_rpc.add_subapp('/v1', json_rpc_v1)

json_rpc_v2 = integration.JsonRPC(http_app=flask.Blueprint("v2", __name__))
json_rpc_v2.add_methods(methods_v2)
json_rpc.add_subapp('/v2', json_rpc_v2)


if __name__ == "__main__":
    json_rpc.http_app.run(port=8080)

httpserver#

import http.server
import socketserver

import pjrpc
import pjrpc.server


class JsonRpcHandler(http.server.BaseHTTPRequestHandler):
    """
    JSON-RPC handler.
    """

    def do_POST(self):
        """
        Handles JSON-RPC request.
        """

        content_type = self.headers.get('Content-Type')
        if content_type not in pjrpc.common.REQUEST_CONTENT_TYPES:
            self.send_error(http.HTTPStatus.UNSUPPORTED_MEDIA_TYPE)
            return

        try:
            content_length = int(self.headers.get('Content-Length', -1))
            request_text = self.rfile.read(content_length).decode()
        except UnicodeDecodeError:
            self.send_error(http.HTTPStatus.BAD_REQUEST)
            return

        response = self.server.dispatcher.dispatch(request_text, context=self)
        if response is None:
            self.send_response_only(http.HTTPStatus.OK)
            self.end_headers()
        else:
            response_text, error_codes = response
            self.send_response(http.HTTPStatus.OK)
            self.send_header("Content-type", pjrpc.common.DEFAULT_CONTENT_TYPE)
            self.end_headers()

            self.wfile.write(response_text.encode())


class JsonRpcServer(http.server.HTTPServer):
    """
    :py:class:`http.server.HTTPServer` based JSON-RPC server.

    :param path: JSON-RPC handler base path
    :param kwargs: arguments to be passed to the dispatcher :py:class:`pjrpc.server.Dispatcher`
    """

    def __init__(self, server_address, RequestHandlerClass=JsonRpcHandler, bind_and_activate=True, **kwargs):
        super().__init__(server_address, RequestHandlerClass, bind_and_activate)
        self._dispatcher = pjrpc.server.Dispatcher(**kwargs)

    @property
    def dispatcher(self):
        """
        JSON-RPC method dispatcher.
        """

        return self._dispatcher


methods = pjrpc.server.MethodRegistry()


@methods.add(pass_context='request')
def sum(request: http.server.BaseHTTPRequestHandler, a: int, b: int) -> int:
    return a + b


class ThreadingJsonRpcServer(socketserver.ThreadingMixIn, JsonRpcServer):
    users = {}


with ThreadingJsonRpcServer(("localhost", 8080)) as server:
    server.dispatcher.add_methods(methods)

    server.serve_forever()

middlewares#

from aiohttp import web

import pjrpc.server
from pjrpc.common import Request
from pjrpc.server.integration import aiohttp
from pjrpc.server.typedefs import AsyncHandlerType, ContextType, MiddlewareResponse

methods = pjrpc.server.MethodRegistry()


@methods.add(pass_context='request')
async def sum(request: web.Request, a: int, b: int) -> int:
    return a + b


async def middleware1(request: Request, context: ContextType, handler: AsyncHandlerType) -> MiddlewareResponse:
    print("middleware1 started")
    result = await handler(request, context)
    print("middleware1 finished")

    return result


async def middleware2(request: Request, context: ContextType, handler: AsyncHandlerType) -> MiddlewareResponse:
    print("middleware2 started")
    result = await handler(request, context)
    print("middleware2 finished")

    return result

jsonrpc_app = aiohttp.Application(
    '/api/v1',
    middlewares=[
        middleware1,
        middleware2,
    ],
)
jsonrpc_app.add_methods(methods)

if __name__ == "__main__":
    web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)

multiple clients#

from typing import ClassVar

import pjrpc
from pjrpc.client.backend import requests as jrpc_client


class ErrorV1(pjrpc.client.exceptions.TypedError, base=True):
    pass


class PermissionDenied(ErrorV1):
    CODE: ClassVar[int] = 1
    MESSAGE: ClassVar[str] = 'permission denied'


class ErrorV2(pjrpc.client.exceptions.TypedError, base=True):
    pass


class ResourceNotFound(ErrorV2):
    CODE: ClassVar[int] = 1
    MESSAGE: ClassVar[str] = 'resource not found'


client_v1 = jrpc_client.Client('http://localhost:8080/api/v1', error_cls=ErrorV1)
client_v2 = jrpc_client.Client('http://localhost:8080/api/v2', error_cls=ErrorV2)

try:
    client_v1.proxy.add_user(user={})
except PermissionDenied as e:
    print(e)

try:
    client_v2.proxy.add_user(user={})
except ResourceNotFound as e:
    print(e)

pydantic validator#

import enum
import uuid

import pydantic
from aiohttp import web

import pjrpc.server
from pjrpc.server.integration import aiohttp
from pjrpc.server.validators import pydantic as validators

methods = pjrpc.server.MethodRegistry(
    validator_factory=validators.PydanticValidatorFactory(exclude=aiohttp.is_aiohttp_request),
)


class ContactType(enum.Enum):
    PHONE = 'phone'
    EMAIL = 'email'


class Contact(pydantic.BaseModel):
    type: ContactType
    value: str


class User(pydantic.BaseModel):
    name: str
    surname: str
    age: int
    contacts: list[Contact]


class UserOut(User):
    id: uuid.UUID


@methods.add(pass_context='request')
async def add_user(request: web.Request, user: User) -> UserOut:
    user_id = uuid.uuid4()
    request.app['users'][user_id] = user

    return UserOut(id=user_id, **user.model_dump())


class JSONEncoder(pjrpc.server.JSONEncoder):
    def default(self, o):
        if isinstance(o, uuid.UUID):
            return o.hex
        if isinstance(o, enum.Enum):
            return o.value
        if isinstance(o, pydantic.BaseModel):
            return o.model_dump()

        return super().default(o)


jsonrpc_app = aiohttp.Application('/api/v1', json_encoder=JSONEncoder)
jsonrpc_app.add_methods(methods)
jsonrpc_app.http_app['users'] = {}

if __name__ == "__main__":
    web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)

requests client#

import pjrpc
from pjrpc.client.backend import requests as pjrpc_client

client = pjrpc_client.Client('http://localhost:8080/api/v1')

response: pjrpc.Response = client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")

result = client('sum', a=1, b=2)
print(f"1 + 2 = {result}")

result = client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")

client.notify('tick')

requests pytest#

from unittest import mock

import pytest

import pjrpc
from pjrpc.client.backend import requests as requests_client
from pjrpc.client.integrations.pytest_requests import PjRpcRequestsMocker


def test_using_fixture(pjrpc_requests_mocker):
    client = requests_client.Client('http://localhost/api/v1')

    pjrpc_requests_mocker.add('http://localhost/api/v1', 'sum', result=2)
    result = client.proxy.sum(1, 1)
    assert result == 2

    pjrpc_requests_mocker.replace(
        'http://localhost/api/v1',
        'sum',
        error=pjrpc.client.exceptions.JsonRpcError(code=1, message='error', data='oops'),
    )
    with pytest.raises(pjrpc.client.exceptions.JsonRpcError) as exc_info:
        client.proxy.sum(a=1, b=1)

    assert exc_info.type is pjrpc.client.exceptions.JsonRpcError
    assert exc_info.value.code == 1
    assert exc_info.value.message == 'error'
    assert exc_info.value.data == 'oops'

    localhost_calls = pjrpc_requests_mocker.calls['http://localhost/api/v1']
    assert localhost_calls[('2.0', 'sum')].call_count == 2
    assert localhost_calls[('2.0', 'sum')].mock_calls == [mock.call(1, 1), mock.call(a=1, b=1)]

    client = requests_client.Client('http://localhost/api/v2')
    with pytest.raises(ConnectionRefusedError):
        client.proxy.sum(1, 1)


def test_using_resource_manager():
    client = requests_client.Client('http://localhost/api/v1')

    with PjRpcRequestsMocker() as mocker:
        mocker.add('http://localhost/api/v1', 'mult', result=4)
        mocker.add('http://localhost/api/v1', 'div', callback=lambda a, b: a/b)

        with client.batch() as batch:
            batch.proxy.div(4, 2)
            batch.proxy.mult(2, 2)

        result = batch.get_results()
        assert result == [2, 4]

        localhost_calls = mocker.calls['http://localhost/api/v1']
        assert localhost_calls[('2.0', 'div')].mock_calls == [mock.call(4, 2)]
        assert localhost_calls[('2.0', 'mult')].mock_calls == [mock.call(2, 2)]

        with pytest.raises(pjrpc.client.exceptions.MethodNotFoundError):
            client.proxy.sub(4, 2)

sentry#

import sentry_sdk
from aiohttp import web

import pjrpc.server
from pjrpc.common import Request, Response
from pjrpc.server import AsyncHandlerType
from pjrpc.server.integration import aiohttp

methods = pjrpc.server.MethodRegistry()


@methods.add(pass_context='request')
async def sum(request: web.Request, a: int, b: int) -> int:
    return a + b


async def sentry_middleware(request: Request, context: web.Request, handler: AsyncHandlerType) -> Response:
    try:
        return await handler(request, context)
    except pjrpc.server.exceptions.JsonRpcError as e:
        sentry_sdk.capture_exception(e)
        raise


jsonrpc_app = aiohttp.Application(
    '/api/v1', middlewares=(
        sentry_middleware,
    ),
)
jsonrpc_app.add_methods(methods)

if __name__ == "__main__":
    web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)

server prometheus metrics#

import asyncio

import prometheus_client as pc
from aiohttp import web

import pjrpc.server
from pjrpc import Request, Response
from pjrpc.server import AsyncHandlerType
from pjrpc.server.integration import aiohttp

method_error_count = pc.Counter('method_error_count', 'Method error count', labelnames=['method', 'code'])
method_latency_hist = pc.Histogram('method_latency', 'Method latency', labelnames=['method'])
method_active_count = pc.Gauge('method_active_count', 'Method active count', labelnames=['method'])


async def metrics(request):
    return web.Response(body=pc.generate_latest())

http_app = web.Application()
http_app.add_routes([web.get('/metrics', metrics)])


methods = pjrpc.server.MethodRegistry()


@methods.add(pass_context='context')
async def sum(context: web.Request, a: int, b: int) -> int:
    print("method started")
    await asyncio.sleep(1)
    print("method finished")

    return a + b


async def latency_metric_middleware(request: Request, context: web.Request, handler: AsyncHandlerType) -> Response:
    with method_latency_hist.labels(method=request.method).time():
        return await handler(request, context)


async def active_count_metric_middleware(request: Request, context: web.Request, handler: AsyncHandlerType) -> Response:
    with method_active_count.labels(method=request.method).track_inprogress():
        return await handler(request, context)


async def error_counter_middleware(request: Request, context: web.Request, handler: AsyncHandlerType) -> Response:
    if response := await handler(request, context):
        if response.is_error:
            method_error_count.labels(method=request.method, code=response.unwrap_error().code).inc()

    return response


jsonrpc_app = aiohttp.Application(
    '/api/v1',
    http_app=http_app,
    middlewares=(
        latency_metric_middleware,
        active_count_metric_middleware,
        error_counter_middleware,
    ),
)
jsonrpc_app.add_methods(methods)

if __name__ == "__main__":
    web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)

server tracing#

import asyncio

import opentracing
from aiohttp import web
from aiohttp.typedefs import Handler as HttpHandler
from opentracing import tags

import pjrpc.server
from pjrpc import Request, Response
from pjrpc.server import AsyncHandlerType
from pjrpc.server.integration import aiohttp


@web.middleware
async def http_tracing_middleware(request: web.Request, handler: HttpHandler) -> web.StreamResponse:
    """
    aiohttp server tracer.
    """

    tracer = opentracing.global_tracer()
    try:
        span_ctx = tracer.extract(format=opentracing.Format.HTTP_HEADERS, carrier=request.headers)
    except (opentracing.InvalidCarrierException, opentracing.SpanContextCorruptedException):
        span_ctx = None

    span = tracer.start_span(f'http.{request.method}', child_of=span_ctx)
    span.set_tag(tags.COMPONENT, 'aiohttp.server')
    span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
    span.set_tag(tags.PEER_ADDRESS, request.remote)
    span.set_tag(tags.HTTP_URL, str(request.url))
    span.set_tag(tags.HTTP_METHOD, request.method)

    with tracer.scope_manager.activate(span, finish_on_close=True):
        response = await handler(request)
        span.set_tag(tags.HTTP_STATUS_CODE, response.status)
        span.set_tag(tags.ERROR, response.status >= 400)

    return response

http_app = web.Application(
    middlewares=(
        http_tracing_middleware,
    ),
)

methods = pjrpc.server.MethodRegistry()


@methods.add(pass_context='context')
async def sum(context: web.Request, a: int, b: int) -> int:
    print("method started")
    await asyncio.sleep(1)
    print("method finished")

    return a + b


async def jsonrpc_tracing_middleware(request: Request, context: web.Request, handler: AsyncHandlerType) -> Response:
    tracer = opentracing.global_tracer()
    span = tracer.start_span(f'jsonrpc.{request.method}')

    span.set_tag(tags.COMPONENT, 'pjrpc')
    span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
    span.set_tag('jsonrpc.version', request.version)
    span.set_tag('jsonrpc.id', request.id)
    span.set_tag('jsonrpc.method', request.method)

    with tracer.scope_manager.activate(span, finish_on_close=True):
        if response := await handler(request, context):
            if response.is_error:
                span.set_tag('jsonrpc.error_code', response.error.code)
                span.set_tag('jsonrpc.error_message', response.error.message)
                span.set_tag(tags.ERROR, True)
            else:
                span.set_tag(tags.ERROR, False)

    return response

jsonrpc_app = aiohttp.Application(
    '/api/v1',
    http_app=http_app,
    middlewares=[
        jsonrpc_tracing_middleware,
    ],
)
jsonrpc_app.add_methods(methods)

if __name__ == "__main__":
    web.run_app(jsonrpc_app.http_app, host='localhost', port=8080)

werkzeug server#

import uuid

import werkzeug

import pjrpc.server
from pjrpc.server.integration import werkzeug as integration

methods = pjrpc.server.MethodRegistry()


@methods.add(pass_context='request')
def add_user(request: werkzeug.Request, user: dict):
    user_id = uuid.uuid4().hex
    request.environ['app'].users[user_id] = user

    return {'id': user_id, **user}


app = integration.JsonRPC('/api/v1')
app.dispatcher.add_methods(methods)
app.users = {}


if __name__ == '__main__':
    werkzeug.serving.run_simple('127.0.0.1', 8080, app)

flask OpenAPI specification#

import uuid
from typing import Annotated, Any

import flask
import flask_cors
import pydantic as pd

import pjrpc.server.specs.extractors.pydantic
import pjrpc.server.specs.openapi.ui
from pjrpc.server.integration import flask as integration
from pjrpc.server.specs import extractors, openapi
from pjrpc.server.validators import pydantic as validators

methods = pjrpc.server.MethodRegistry(
    validator_factory=validators.PydanticValidatorFactory(),
    metadata_processors=[
        openapi.MethodSpecificationGenerator(
            extractor=extractors.pydantic.PydanticMethodInfoExtractor(),
        ),
    ],
)


UserName = Annotated[
    str,
    pd.Field(description="User name", examples=["John"]),
]

UserSurname = Annotated[
    str,
    pd.Field(description="User surname", examples=['Doe']),
]

UserAge = Annotated[
    int,
    pd.Field(description="User age", examples=[25]),
]

UserId = Annotated[
    uuid.UUID,
    pd.Field(description="User identifier", examples=["c47726c6-a232-45f1-944f-60b98966ff1b"]),
]


class UserIn(pd.BaseModel):
    """
    User registration data.
    """

    name: UserName
    surname: UserSurname
    age: UserAge


class UserOut(UserIn):
    """
    Registered user data.
    """

    id: UserId


class AlreadyExistsError(pjrpc.server.exceptions.TypedError):
    """
    User already registered error.
    """

    CODE = 2001
    MESSAGE = "user already exists"


class NotFoundError(pjrpc.server.exceptions.TypedError):
    """
    User not found error.
    """

    CODE = 2002
    MESSAGE = "user not found"


@methods.add(
    metadata=[
        openapi.metadata(
            summary='Creates a user',
            tags=['users'],
            errors=[AlreadyExistsError],
        ),
    ],
)
def add_user(user: UserIn) -> UserOut:
    """
    Creates a user.

    :param object user: user data
    :return object: registered user
    :raise AlreadyExistsError: user already exists
    """

    for existing_user in flask.current_app.users_db.values():
        if user.name == existing_user.name:
            raise AlreadyExistsError()

    user_id = uuid.uuid4()
    flask.current_app.users_db[user_id] = user

    return UserOut(id=user_id, **user.model_dump())


@methods.add(
    metadata=[
        openapi.metadata(
            summary='Returns a user',
            tags=['users'],
            errors=[NotFoundError],
        ),
    ],
)
def get_user(user_id: UserId) -> UserOut:
    """
    Returns a user.

    :param object user_id: user id
    :return object: registered user
    :raise NotFoundError: user not found
    """

    user = flask.current_app.users_db.get(user_id.hex)
    if not user:
        raise NotFoundError()

    return UserOut(id=user_id, **user.model_dump())


@methods.add(
    metadata=[
        openapi.metadata(
            summary='Deletes a user',
            tags=['users'],
            errors=[NotFoundError],
        ),
    ],
)
def delete_user(user_id: UserId) -> None:
    """
    Deletes a user.

    :param object user_id: user id
    :raise NotFoundError: user not found
    """

    user = flask.current_app.users_db.pop(user_id.hex, None)
    if not user:
        raise NotFoundError()


class JSONEncoder(pjrpc.server.JSONEncoder):
    def default(self, o: Any) -> Any:
        if isinstance(o, pd.BaseModel):
            return o.model_dump()
        if isinstance(o, uuid.UUID):
            return str(o)

        return super().default(o)


openapi_spec = openapi.OpenAPI(
    info=openapi.Info(version="1.0.0", title="User storage"),
    servers=[
        openapi.Server(
            url='http://127.0.0.1:8080',
        ),
    ],
    security_schemes=dict(
        basicAuth=openapi.SecurityScheme(
            type=openapi.SecuritySchemeType.HTTP,
            scheme='basic',
        ),
    ),
    security=[
        dict(basicAuth=[]),
    ],
)


jsonrpc_v1 = integration.JsonRPC('/api/v1', json_encoder=JSONEncoder)
jsonrpc_v1.add_methods(methods)
jsonrpc_v1.add_spec(openapi_spec, path='openapi.json')
jsonrpc_v1.add_spec_ui('swagger', ui=openapi.ui.SwaggerUI(), spec_url='../openapi.json')

flask_cors.CORS(jsonrpc_v1.http_app, resources={"/rpc/api/v1/*": {"origins": "*"}})
jsonrpc_v1.http_app.users_db = {}


if __name__ == "__main__":
    jsonrpc_v1.http_app.run(port=8080)

aiohttp OpenAPI specification#

import uuid
from typing import Annotated, Any

import aiohttp.typedefs
import aiohttp_cors
import pydantic as pd
from aiohttp import helpers, web

import pjrpc.server.specs.extractors.pydantic
import pjrpc.server.specs.openapi.ui
from pjrpc.server.integration import aiohttp as integration
from pjrpc.server.specs import extractors
from pjrpc.server.specs import openapi as specs
from pjrpc.server.validators import pydantic as validators

credentials = {"admin": "admin"}


methods = pjrpc.server.MethodRegistry(
    validator_factory=validators.PydanticValidatorFactory(exclude=integration.is_aiohttp_request),
    metadata_processors=[
        specs.MethodSpecificationGenerator(
            extractor=extractors.pydantic.PydanticMethodInfoExtractor(
                exclude=integration.is_aiohttp_request,
            ),
        ),
    ],
)


UserName = Annotated[
    str,
    pd.Field(description="User name", examples=["John"]),
]

UserSurname = Annotated[
    str,
    pd.Field(description="User surname", examples=['Doe']),
]

UserAge = Annotated[
    int,
    pd.Field(description="User age", examples=[36]),
]

UserId = Annotated[
    uuid.UUID,
    pd.Field(description="User identifier", examples=["226a2c23-c98b-4729-b398-0dae550e99ff"]),
]


class UserIn(pd.BaseModel):
    """
    User registration data.
    """

    name: UserName
    surname: UserSurname
    age: UserAge


class UserOut(UserIn):
    """
    Registered user data.
    """

    id: UserId


class AlreadyExistsError(pjrpc.server.exceptions.TypedError):
    """
    User already registered error.
    """

    CODE = 2001
    MESSAGE = "user already exists"


class NotFoundError(pjrpc.server.exceptions.TypedError):
    """
    User not found error.
    """

    CODE = 2002
    MESSAGE = "user not found"


@methods.add(
    pass_context='request',
    metadata=[
        specs.metadata(
            summary='Creates a user',
            tags=['users'],
            errors=[AlreadyExistsError],
        ),
    ],
)
def add_user(request: web.Request, user: UserIn) -> UserOut:
    """
    Creates a user.

    :param request: http request
    :param object user: user data
    :return object: registered user
    :raise AlreadyExistsError: user already exists
    """

    for existing_user in request.config_dict['users'].values():
        if user.name == existing_user.name:
            raise AlreadyExistsError()

    user_id = uuid.uuid4()
    request.config_dict['users'][user_id] = user

    return UserOut(id=user_id, **user.model_dump())


@methods.add(
    pass_context='request',
    metadata=[
        specs.metadata(
            summary='Returns a user',
            tags=['users'],
            errors=[NotFoundError],
        ),
    ],
)
def get_user(request: web.Request, user_id: UserId) -> UserOut:
    """
    Returns a user.

    :param request: http request
    :param object user_id: user id
    :return object: registered user
    :raise NotFoundError: user not found
    """

    user = request.config_dict['users'].get(user_id.hex)
    if not user:
        raise NotFoundError()

    return UserOut(id=user_id, **user.model_dump())


@methods.add(
    pass_context='request',
    metadata=[
        specs.metadata(
            summary='Deletes a user',
            tags=['users'],
            errors=[NotFoundError],
        ),
    ],
)
def delete_user(request: web.Request, user_id: UserId) -> None:
    """
    Deletes a user.

    :param request: http request
    :param object user_id: user id
    :raise NotFoundError: user not found
    """

    user = request.config_dict['users'].pop(user_id.hex, None)
    if not user:
        raise NotFoundError()


class JSONEncoder(pjrpc.server.JSONEncoder):
    def default(self, o: Any) -> Any:
        if isinstance(o, pd.BaseModel):
            return o.model_dump()
        if isinstance(o, uuid.UUID):
            return str(o)

        return super().default(o)


async def basic_auth_middleware(request: web.Request, handler: aiohttp.typedefs.Handler) -> web.StreamResponse:
    try:
        auth = helpers.BasicAuth.decode(request.headers.get('Authorization', ''))
    except ValueError:
        raise web.HTTPUnauthorized

    if credentials.get(auth.login) != auth.password:
        raise web.HTTPUnauthorized

    return await handler(request)


openapi_spec = specs.OpenAPI(
    info=specs.Info(version="1.0.0", title="User storage"),
    servers=[
        specs.Server(
            url='http://127.0.0.1:8080',
        ),
    ],
    security_schemes=dict(
        basicAuth=specs.SecurityScheme(
            type=specs.SecuritySchemeType.HTTP,
            scheme='basic',
        ),
    ),
    security=[
        dict(basicAuth=[]),
    ],
)

http_app = web.Application()
http_app['users'] = {}

jsonrpc_app = integration.Application('/api')
jsonrpc_app.add_spec(openapi_spec, path='openapi.json')
jsonrpc_app.add_spec_ui('swagger', specs.ui.SwaggerUI(), spec_url='../openapi.json')
jsonrpc_app.add_spec_ui('redoc', specs.ui.ReDoc(), spec_url='../openapi.json')

jsonrpc_v1_app = integration.Application(
    http_app=web.Application(
        middlewares=[
            basic_auth_middleware,
        ],
    ),
    json_encoder=JSONEncoder,
)
jsonrpc_v1_app.add_methods(methods)


jsonrpc_app.add_subapp('/v1', jsonrpc_v1_app)
http_app.add_subapp('/rpc', jsonrpc_app.http_app)

cors = aiohttp_cors.setup(
    http_app, defaults={
        '*': aiohttp_cors.ResourceOptions(
            allow_credentials=True,
            expose_headers='*',
            allow_headers='*',
        ),
    },
)
for route in list(http_app.router.routes()):
    cors.add(route)


if __name__ == "__main__":
    web.run_app(http_app, host='localhost', port=8080)