Examples#

aio_pika client#

import asyncio

import pjrpc
from pjrpc.client.backend import aio_pika as pjrpc_client


async def main():
    """Call the remote ``sum`` method in each supported style, then send a ``tick`` notification."""
    rpc = pjrpc_client.Client('amqp://guest:guest@localhost:5672/v1', 'jsonrpc')
    await rpc.connect()

    # Style 1: explicit request object; the response object is returned.
    request = pjrpc.Request('sum', params=[1, 2], id=1)
    reply: pjrpc.Response = await rpc.send(request)
    print(f"1 + 2 = {reply.result}")

    # Style 2: call the client directly with keyword parameters.
    by_call = await rpc('sum', a=1, b=2)
    print(f"1 + 2 = {by_call}")

    # Style 3: attribute-style proxy with positional parameters.
    by_proxy = await rpc.proxy.sum(1, 2)
    print(f"1 + 2 = {by_proxy}")

    await rpc.notify('tick')


# Run the client demo only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

aio_pika server#

import asyncio
import logging
import uuid
from dataclasses import dataclass

import aio_pika
from yarl import URL

import pjrpc
from pjrpc.server.integration import aio_pika as integration


@dataclass
class UserInfo:
    """User information dataclass for the add_user example RPC call.

    Holds the data supplied by the caller: ``username``, ``name`` and ``age``.
    """

    username: str
    name: str
    age: int


@dataclass
class AddedUser(UserInfo):
    """User information dataclass (with uuid) for the add_user example RPC call.

    Extends :class:`UserInfo` with the ``uuid`` field that ``add_user`` fills in.
    """

    uuid: uuid.UUID


methods = pjrpc.server.MethodRegistry()


# NOTE(review): shadows the builtin ``sum`` in this module; presumably the registry
# derives the RPC method name from the function name -- confirm before renaming.
@methods.add
def sum(a: int, b: int) -> int:
    """RPC method implementing examples/aio_pika_client.py's calls to sum(1, 2) -> 3.

    :param a: first addend
    :param b: second addend
    :returns: ``a + b``
    """
    return a + b


@methods.add
def tick() -> None:
    """RPC method implementing examples/aio_pika_client.py's notification 'tick'.

    Only prints a line to stdout; nothing is returned.
    """
    print("examples/aio_pika_server.py: received tick")


@methods.add(context='message')
def add_user(message: aio_pika.IncomingMessage, user_info: UserInfo) -> AddedUser:
    """Simulate the creation of a user: receive user info and return it with a uuid4.

    :param message: value bound to the ``'message'`` context argument; not used here
    :param UserInfo user_info: user data
    :returns: user_info with a randomly generated uuid4 added
    :rtype: AddedUser
    """
    # Copy every field of the incoming dataclass and attach a fresh random id.
    return AddedUser(uuid=uuid.uuid4(), **vars(user_info))


# Executor configured with the broker URL and the 'jsonrpc' queue name; the
# methods registered above are attached to its dispatcher.
executor = integration.Executor(
    broker_url=URL('amqp://guest:guest@localhost:5672/v1'), queue_name='jsonrpc',
)
executor.dispatcher.add_methods(methods)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logging.info("Example result from a local call to add_user():")
    # Direct in-process call; None is passed for the message context argument.
    logging.info(add_user(None, UserInfo("username", "firstname lastname", 18)))
    loop = asyncio.new_event_loop()

    # Start the executor, then keep the loop running until it is stopped or interrupted.
    loop.run_until_complete(executor.start())
    try:
        loop.run_forever()
    finally:
        # Shut the executor down even when run_forever() exits through an exception.
        loop.run_until_complete(executor.shutdown())

aiohttp class-based handler#

import uuid

from aiohttp import web

import pjrpc.server
from pjrpc.server.integration import aiohttp

methods = pjrpc.server.MethodRegistry()


@methods.view(context='request', prefix='user')
class UserView(pjrpc.server.ViewMixin):
    """
    Class-based JSON-RPC handler registered with the ``'user'`` prefix and the ``'request'`` context.

    Each instance works on the ``'users'`` mapping stored on the aiohttp application.
    """

    def __init__(self, request: web.Request):
        """
        :param request: aiohttp request; its application must carry a ``'users'`` mapping
        """
        super().__init__()

        self._users = request.app['users']

    async def add(self, user: dict):
        """
        Stores ``user`` under a freshly generated hex uuid4.

        :param user: user data
        :return: the user data extended with the generated ``'id'``
        """
        user_id = uuid.uuid4().hex
        self._users[user_id] = user

        return {'id': user_id, **user}

    async def get(self, user_id: str):
        """
        Looks a user up by id.

        :param user_id: identifier previously returned by ``add``
        :return: the stored user data
        :raise pjrpc.exc.JsonRpcError: (code 1) when no user is stored under ``user_id``
        """
        user = self._users.get(user_id)
        if not user:
            # The error has to be raised; merely constructing it falls through and returns ``None``.
            raise pjrpc.exc.JsonRpcError(code=1, message='not found')

        return user


# JSON-RPC application created with the '/api/v1' path; the registered view
# methods are attached and an empty 'users' store is placed on the aiohttp app.
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.dispatcher.add_methods(methods)
jsonrpc_app.app['users'] = {}

if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)

aiohttp client#

import asyncio

import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client


async def main():
    """Call the remote ``sum`` method in each supported style over HTTP, then send a ``tick`` notification."""
    async with pjrpc_client.Client('http://localhost/api/v1') as rpc:
        # Style 1: explicit request object; the response object is returned.
        request = pjrpc.Request('sum', params=[1, 2], id=1)
        reply = await rpc.send(request)
        print(f"1 + 2 = {reply.result}")

        # Style 2: call the client directly with keyword parameters.
        by_call = await rpc('sum', a=1, b=2)
        print(f"1 + 2 = {by_call}")

        # Style 3: attribute-style proxy with positional parameters.
        by_proxy = await rpc.proxy.sum(1, 2)
        print(f"1 + 2 = {by_proxy}")

        await rpc.notify('tick')


# NOTE: runs unconditionally, i.e. also when this module is imported.
asyncio.run(main())

aiohttp client batch request#

import asyncio

import pjrpc
from pjrpc.client.backend import aiohttp as pjrpc_client


async def main():
    """Send the same four arithmetic calls as a batch in each supported style, then a batch of notifications."""
    labels = ("2 + 2", "2 - 2", "2 / 2", "2 * 2")

    def report(values):
        # Print one line per call, reading positions 0..3 in order.
        for position, label in enumerate(labels):
            print(f"{label} = {values[position]}")

    async with pjrpc_client.Client('http://localhost:8080/api/v1') as client:

        # Style 1: explicit BatchRequest; each response is unwrapped through ``.result``.
        batch_request = pjrpc.BatchRequest(
            pjrpc.Request('sum', [2, 2], id=1),
            pjrpc.Request('sub', [2, 2], id=2),
            pjrpc.Request('div', [2, 2], id=3),
            pjrpc.Request('mult', [2, 2], id=4),
        )
        batch_response = await client.batch.send(batch_request)
        for position, label in enumerate(labels):
            print(f"{label} = {batch_response[position].result}")

        # Style 2: chained calls on the batch object.
        chained = client.batch('sum', 2, 2)('sub', 2, 2)('div', 2, 2)('mult', 2, 2)
        report(await chained.call())

        # Style 3: subscription with a tuple per call.
        report(
            await client.batch[
                ('sum', 2, 2),
                ('sub', 2, 2),
                ('div', 2, 2),
                ('mult', 2, 2),
            ]
        )

        # Style 4: chained attribute-style proxy.
        proxied = client.batch.proxy.sum(2, 2).sub(2, 2).div(2, 2).mult(2, 2)
        report(await proxied.call())

        await client.batch.notify('tick').notify('tack').call()


# NOTE: runs unconditionally, i.e. also when this module is imported.
asyncio.run(main())

aiohttp pytest integration#

from unittest import mock

import pytest

import pjrpc
from pjrpc.client.backend import aiohttp as aiohttp_client
from pjrpc.client.integrations.pytest import PjRpcAiohttpMocker


async def test_using_fixture(pjrpc_aiohttp_mocker):
    """Mock JSON-RPC responses through the ``pjrpc_aiohttp_mocker`` fixture and check the recorded calls."""
    client = aiohttp_client.Client('http://localhost/api/v1')

    # Register a successful result for 'sum'.
    pjrpc_aiohttp_mocker.add('http://localhost/api/v1', 'sum', result=2)
    result = await client.proxy.sum(1, 1)
    assert result == 2

    # Replace the mocked result with an error; the client call is expected to raise it.
    pjrpc_aiohttp_mocker.replace(
        'http://localhost/api/v1', 'sum', error=pjrpc.exc.JsonRpcError(code=1, message='error', data='oops'),
    )
    with pytest.raises(pjrpc.exc.JsonRpcError) as exc_info:
        await client.proxy.sum(a=1, b=1)

    assert exc_info.type is pjrpc.exc.JsonRpcError
    assert exc_info.value.code == 1
    assert exc_info.value.message == 'error'
    assert exc_info.value.data == 'oops'

    # Recorded calls are looked up by endpoint, then by a ('2.0', method) key.
    localhost_calls = pjrpc_aiohttp_mocker.calls['http://localhost/api/v1']
    assert localhost_calls[('2.0', 'sum')].call_count == 2
    assert localhost_calls[('2.0', 'sum')].mock_calls == [mock.call(1, 1), mock.call(a=1, b=1)]


async def test_using_resource_manager():
    """Use ``PjRpcAiohttpMocker`` directly as a context manager instead of through the fixture."""
    client = aiohttp_client.Client('http://localhost/api/v1')

    with PjRpcAiohttpMocker() as mocker:
        mocker.add('http://localhost/api/v1', 'div', result=2)
        result = await client.proxy.div(4, 2)
        assert result == 2

        # The single recorded 'div' call carries the positional arguments used above.
        localhost_calls = mocker.calls['http://localhost/api/v1']
        assert localhost_calls[('2.0', 'div')].mock_calls == [mock.call(4, 2)]

aiohttp server#

import uuid

from aiohttp import web

import pjrpc.server
from pjrpc.server.integration import aiohttp

methods = pjrpc.server.MethodRegistry()


@methods.add(context='request')
async def add_user(request: web.Request, user: dict):
    """Store ``user`` in the application's ``'users'`` mapping under a fresh hex uuid4.

    :param request: aiohttp request bound to the ``'request'`` context argument
    :param user: user data
    :return: the user data extended with the generated ``'id'``
    """
    new_id = uuid.uuid4().hex
    store = request.app['users']
    store[new_id] = user

    stored = {'id': new_id, **user}
    return stored


# JSON-RPC application created with the '/api/v1' path; the registered methods
# are attached and an empty 'users' store is placed on the aiohttp app.
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.dispatcher.add_methods(methods)
jsonrpc_app.app['users'] = {}

if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)

aiohttp versioning#

import uuid

from aiohttp import web

import pjrpc.server
from pjrpc.server.integration import aiohttp

methods_v1 = pjrpc.server.MethodRegistry()


@methods_v1.add(context='request')
async def add_user_v1(request: web.Request, user: dict):
    """Version 1 handler: store ``user`` under a fresh hex uuid4 in the ``'users'`` mapping found via ``config_dict``.

    :param request: aiohttp request bound to the ``'request'`` context argument
    :param user: user data
    :return: the user data extended with the generated ``'id'``
    """
    new_id = uuid.uuid4().hex
    store = request.config_dict['users']
    store[new_id] = user

    stored = {'id': new_id, **user}
    return stored


methods_v2 = pjrpc.server.MethodRegistry()


@methods_v2.add(context='request')
async def add_user_v2(request: web.Request, user: dict):
    """Version 2 handler: store ``user`` under a fresh hex uuid4 in the ``'users'`` mapping found via ``config_dict``.

    :param request: aiohttp request bound to the ``'request'`` context argument
    :param user: user data
    :return: the user data extended with the generated ``'id'``
    """
    new_id = uuid.uuid4().hex
    store = request.config_dict['users']
    store[new_id] = user

    stored = {'id': new_id, **user}
    return stored


# Root aiohttp application; it owns the 'users' store that both handler
# versions read through request.config_dict.
app = web.Application()
app['users'] = {}

# Version 1 of the API, mounted under '/api/v1'.
app_v1 = aiohttp.Application()
app_v1.dispatcher.add_methods(methods_v1)
app.add_subapp('/api/v1', app_v1.app)


# Version 2 of the API, mounted under '/api/v2'.
app_v2 = aiohttp.Application()
app_v2.dispatcher.add_methods(methods_v2)
app.add_subapp('/api/v2', app_v2.app)

if __name__ == "__main__":
    web.run_app(app, host='localhost', port=8080)

client prometheus metrics#

import time

import prometheus_client as prom_cli

from pjrpc.client import tracer
from pjrpc.client.backend import requests as pjrpc_client

# Client-side metrics. The histogram and the call counter are labelled by method
# name; the error counter is labelled by method name and error code.
method_latency_hist = prom_cli.Histogram('method_latency', 'Method latency', labelnames=['method'])
method_call_total = prom_cli.Counter('method_call_total', 'Method call count', labelnames=['method'])
method_errors_total = prom_cli.Counter('method_errors_total', 'Method errors count', labelnames=['method', 'code'])


class PrometheusTracer(tracer.Tracer):
    """Client tracer that records call count, latency and error count as Prometheus metrics."""

    def on_request_begin(self, trace_context, request):
        """Remember the start time on ``trace_context`` and count the call."""
        trace_context.started_at = time.time()
        method_call_total.labels(request.method).inc()

    def on_request_end(self, trace_context, request, response):
        """Observe the latency and, for an error response, count the error by method and code."""
        method_latency_hist.labels(request.method).observe(time.time() - trace_context.started_at)
        if response.is_error:
            # Errors go to the (method, code) counter; the call counter has a single
            # 'method' label and was already incremented in on_request_begin.
            method_errors_total.labels(request.method, response.error.code).inc()

    def on_error(self, trace_context, request, error):
        """Observe the latency of a request that ended with ``error``."""
        method_latency_hist.labels(request.method).observe(time.time() - trace_context.started_at)


# Client with the Prometheus tracer attached through the ``tracers`` argument.
client = pjrpc_client.Client(
    'http://localhost/api/v1', tracers=(
        PrometheusTracer(),
    ),
)

# NOTE: executed at import time; performs a real call to the endpoint above.
result = client.proxy.sum(1, 2)

client tracing#

import opentracing
from opentracing import tags

from pjrpc.client import tracer
from pjrpc.client.backend import requests as pjrpc_client


class ClientTracer(tracer.Tracer):
    """Client tracer that wraps every JSON-RPC request in an opentracing span."""

    def __init__(self):
        super().__init__()
        self._tracer = opentracing.global_tracer()

    def on_request_begin(self, trace_context, request):
        """Open an active span named ``jsonrpc.<method>`` and tag it as an RPC client span."""
        # finish_on_close=True: closing the scope later also finishes the span.
        scope = self._tracer.start_active_span(f'jsonrpc.{request.method}', finish_on_close=True)
        span = scope.span
        span.set_tag(tags.COMPONENT, 'pjrpc.client')
        span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)

    def on_request_end(self, trace_context, request, response):
        """Tag the active span with the outcome of the request and close its scope."""
        scope = self._tracer.scope_manager.active
        span = scope.span
        span.set_tag(tags.ERROR, response.is_error)
        if response.is_error:
            span.set_tag('jsonrpc.error_code', response.error.code)
            span.set_tag('jsonrpc.error_message', response.error.message)

        # Close the scope rather than only finishing the span; otherwise the
        # finished span would remain the active one for later requests.
        scope.close()

    def on_error(self, trace_context, request, error):
        """Mark the active span as failed and close its scope."""
        scope = self._tracer.scope_manager.active
        scope.span.set_tag(tags.ERROR, True)
        scope.close()


# Client with the opentracing tracer attached through the ``tracers`` argument.
client = pjrpc_client.Client(
    'http://localhost/api/v1', tracers=(
        ClientTracer(),
    ),
)

# NOTE: executed at import time; performs a real call to the endpoint above.
result = client.proxy.sum(1, 2)

flask class-based handler#

import uuid

import flask

import pjrpc
from pjrpc.server.integration import flask as integration

app = flask.Flask(__name__)

methods = pjrpc.server.MethodRegistry()


@methods.view(prefix='user')
class UserView(pjrpc.server.ViewMixin):
    """
    Class-based JSON-RPC handler registered with the ``'user'`` prefix.

    Each instance works on the ``users`` mapping stored on the current flask application.
    """

    def __init__(self):
        super().__init__()

        self._users = flask.current_app.users

    def add(self, user: dict):
        """
        Stores ``user`` under a freshly generated hex uuid4.

        :param user: user data
        :return: the user data extended with the generated ``'id'``
        """
        user_id = uuid.uuid4().hex
        self._users[user_id] = user

        return {'id': user_id, **user}

    def get(self, user_id: str):
        """
        Looks a user up by id.

        :param user_id: identifier previously returned by ``add``
        :return: the stored user data
        :raise pjrpc.exc.JsonRpcError: (code 1) when no user is stored under ``user_id``
        """
        user = self._users.get(user_id)
        if not user:
            # The error has to be raised; merely constructing it falls through and returns ``None``.
            raise pjrpc.exc.JsonRpcError(code=1, message='not found')

        return user


# JSON-RPC integration created with the '/api/v1' path and the registered view methods.
json_rpc = integration.JsonRPC('/api/v1')
json_rpc.dispatcher.add_methods(methods)

# In-memory user store read by UserView through flask.current_app.
app.users = {}

json_rpc.init_app(app)

if __name__ == "__main__":
    app.run(port=8080)

flask server#

import uuid

import flask

import pjrpc
from pjrpc.server.integration import flask as integration

app = flask.Flask(__name__)

methods = pjrpc.server.MethodRegistry()


@methods.add
def add_user(user: dict):
    """Store ``user`` in the current flask application's ``users`` mapping under a fresh hex uuid4.

    :param user: user data
    :return: the user data extended with the generated ``'id'``
    """
    new_id = uuid.uuid4().hex
    store = flask.current_app.users
    store[new_id] = user

    stored = {'id': new_id, **user}
    return stored


# JSON-RPC integration created with the '/api/v1' path and the registered methods.
json_rpc = integration.JsonRPC('/api/v1')
json_rpc.dispatcher.add_methods(methods)

# In-memory user store read by add_user through flask.current_app.
app.users = {}

json_rpc.init_app(app)

if __name__ == "__main__":
    app.run(port=8080)

flask versioning#

import uuid

import flask

import pjrpc.server
from pjrpc.server.integration import flask as integration

methods_v1 = pjrpc.server.MethodRegistry()


@methods_v1.add
def add_user_v1(user: dict):
    """Version 1 handler: store ``user`` in the current flask application's ``users`` mapping under a fresh hex uuid4.

    :param user: user data
    :return: the user data extended with the generated ``'id'``
    """
    new_id = uuid.uuid4().hex
    store = flask.current_app.users
    store[new_id] = user

    stored = {'id': new_id, **user}
    return stored


methods_v2 = pjrpc.server.MethodRegistry()


@methods_v2.add
def add_user_v2(user: dict):
    """Version 2 handler: store ``user`` in the current flask application's ``users`` mapping under a fresh hex uuid4.

    :param user: user data
    :return: the user data extended with the generated ``'id'``
    """
    new_id = uuid.uuid4().hex
    store = flask.current_app.users
    store[new_id] = user

    stored = {'id': new_id, **user}
    return stored


# Version 1: a blueprint with its own JSON-RPC integration created with '/api/v1'.
app_v1 = flask.blueprints.Blueprint('v1', __name__)

json_rpc = integration.JsonRPC('/api/v1')
json_rpc.dispatcher.add_methods(methods_v1)
json_rpc.init_app(app_v1)


# Version 2: same layout with '/api/v2'. The name ``json_rpc`` is rebound here;
# the v1 integration has already been initialised on its blueprint above.
app_v2 = flask.blueprints.Blueprint('v2', __name__)

json_rpc = integration.JsonRPC('/api/v2')
json_rpc.dispatcher.add_methods(methods_v2)
json_rpc.init_app(app_v2)


# The flask application registers both blueprints and owns the shared user store.
app = flask.Flask(__name__)
app.register_blueprint(app_v1)
app.register_blueprint(app_v2)
app.users = {}


if __name__ == "__main__":
    app.run(port=8080)

httpserver#

import http.server
import socketserver
import uuid

import pjrpc
import pjrpc.server


class JsonRpcHandler(http.server.BaseHTTPRequestHandler):
    """
    JSON-RPC handler.
    """

    def do_POST(self):
        """
        Handles JSON-RPC request.

        Replies ``415`` for an unsupported media type and ``400`` for a malformed
        ``Content-Length`` header or an undecodable body. Otherwise replies ``200``
        with the dispatcher's response text, or an empty ``200`` when the dispatcher
        returns ``None``.
        """

        # Compare the bare media type so that parameters such as "; charset=utf-8"
        # do not cause a rejection.
        content_type = self.headers.get_content_type()
        if content_type not in pjrpc.common.REQUEST_CONTENT_TYPES:
            self.send_error(http.HTTPStatus.UNSUPPORTED_MEDIA_TYPE)
            return

        try:
            content_length = int(self.headers.get('Content-Length', -1))
            request_text = self.rfile.read(content_length).decode()
        except ValueError:
            # Covers a non-numeric Content-Length as well as UnicodeDecodeError,
            # which is a subclass of ValueError.
            self.send_error(http.HTTPStatus.BAD_REQUEST)
            return

        # The handler itself is passed to the dispatcher as the request context.
        response_text = self.server.dispatcher.dispatch(request_text, context=self)
        if response_text is None:
            self.send_response_only(http.HTTPStatus.OK)
            self.end_headers()
        else:
            self.send_response(http.HTTPStatus.OK)
            self.send_header("Content-type", pjrpc.common.DEFAULT_CONTENT_TYPE)
            self.end_headers()

            self.wfile.write(response_text.encode())


class JsonRpcServer(http.server.HTTPServer):
    """
    :py:class:`http.server.HTTPServer` based JSON-RPC server.

    :param server_address: passed through to :py:class:`http.server.HTTPServer`
    :param RequestHandlerClass: request handler class, :py:class:`JsonRpcHandler` by default
    :param bind_and_activate: passed through to :py:class:`http.server.HTTPServer`
    :param kwargs: arguments to be passed to the dispatcher :py:class:`pjrpc.server.Dispatcher`
    """

    def __init__(self, server_address, RequestHandlerClass=JsonRpcHandler, bind_and_activate=True, **kwargs):
        super().__init__(server_address, RequestHandlerClass, bind_and_activate)
        # Created after the base initialiser has run.
        self._dispatcher = pjrpc.server.Dispatcher(**kwargs)

    @property
    def dispatcher(self):
        """
        JSON-RPC method dispatcher.
        """

        return self._dispatcher


methods = pjrpc.server.MethodRegistry()


@methods.add(context='request')
def add_user(request: http.server.BaseHTTPRequestHandler, user: dict):
    """Store ``user`` in the server's ``users`` mapping under a fresh hex uuid4.

    :param request: request handler bound to the ``'request'`` context argument
    :param user: user data
    :return: the user data extended with the generated ``'id'``
    """
    new_id = uuid.uuid4().hex
    store = request.server.users
    store[new_id] = user

    stored = {'id': new_id, **user}
    return stored


class ThreadingJsonRpcServer(socketserver.ThreadingMixIn, JsonRpcServer):
    """JSON-RPC server combined with :py:class:`socketserver.ThreadingMixIn`."""

    # Class-level mapping, therefore shared by every instance; add_user writes to it.
    users = {}


# NOTE: runs unconditionally (no __main__ guard): importing this module starts serving.
with ThreadingJsonRpcServer(("localhost", 8080)) as server:
    server.dispatcher.add_methods(methods)

    server.serve_forever()

jsonschema validator#

import uuid

from aiohttp import web

import pjrpc.server
from pjrpc.server.integration import aiohttp
from pjrpc.server.validators import jsonschema as validators

methods = pjrpc.server.MethodRegistry()
validator = validators.JsonSchemaValidator()


# A contact: an object with a 'type' ('phone' or 'email') and a string 'value', both required.
contact_schema = {
    'type': 'object',
    'properties': {
        'type': {
            'type': 'string',
            'enum': ['phone', 'email'],
        },
        'value': {'type': 'string'},
    },
    'required': ['type', 'value'],
}

# A user: name, surname, age and a list of contacts, all required.
user_schema = {
    'type': 'object',
    'properties': {
        'name': {'type': 'string'},
        'surname': {'type': 'string'},
        'age': {'type': 'integer'},
        'contacts': {
            'type': 'array',
            'items': contact_schema,
        },
    },
    'required': ['name', 'surname', 'age', 'contacts'],
}

# Schema handed to the validator for add_user: an object with a required 'user' entry.
params_schema = {
    'type': 'object',
    'properties': {
        'user': user_schema,
    },
    'required': ['user'],
}


@methods.add(context='request')
@validator.validate(schema=params_schema)
async def add_user(request: web.Request, user):
    """Store ``user`` in the application's ``'users'`` mapping under a fresh hex uuid4.

    :param request: aiohttp request bound to the ``'request'`` context argument
    :param user: user data; the validator decorator is given ``params_schema``
    :return: the user data extended with the generated ``'id'``
    """
    new_id = uuid.uuid4().hex
    store = request.app['users']
    store[new_id] = user

    stored = {'id': new_id, **user}
    return stored


# JSON-RPC application created with the '/api/v1' path; the registered methods
# are attached and an empty 'users' store is placed on the aiohttp app.
jsonrpc_app = aiohttp.Application('/api/v1')
jsonrpc_app.dispatcher.add_methods(methods)
jsonrpc_app.app['users'] = {}

if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)

kombu client#

import pjrpc
from pjrpc.client.backend import kombu as pjrpc_client

# Synchronous kombu client created with the broker URL and the 'jsonrpc' queue name.
client = pjrpc_client.Client(
    'amqp://guest:guest@localhost:5672/v1',
    'jsonrpc',
    # Compatible with queue of examples/aio_pika_*
    # and works better with examples/kombu_server.py:
    result_queue_args={"durable": False},
)


# Style 1: explicit request object; the response object is returned.
response: pjrpc.Response = client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")

# Style 2: call the client directly with keyword parameters.
result = client('sum', a=1, b=2)
print(f"1 + 2 = {result}")

# Style 3: attribute-style proxy with positional parameters.
result = client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")

client.notify('tick')

kombu server#

import uuid

import kombu

import pjrpc
from pjrpc.server.integration import kombu as integration

methods = pjrpc.server.MethodRegistry()


@methods.add(context='message')
def add_user(message: kombu.Message, user: dict):
    """Return ``user`` extended with a fresh hex uuid4 ``'id'``; nothing is stored.

    :param message: value bound to the ``'message'`` context argument; not used here
    :param user: user data
    :return: the user data extended with the generated ``'id'``
    """
    new_id = uuid.uuid4().hex

    created = {'id': new_id, **user}
    return created


@methods.add  # type: ignore
def sum(a: int, b: int) -> int:
    """RPC method sum(a, b) for kombu_client.py and aio_pika_client.py

    :param a: first addend
    :param b: second addend
    :returns: ``a + b``
    """
    return a + b


@methods.add  # type: ignore
def tick() -> None:
    """RPC notification "tick" for kombu_client.py and aio_pika_client.py

    Only prints a line to stdout; nothing is returned.
    """
    print("received tick")


# Note: The server may not work well with examples/kombu_client.py yet.
# Use with examples/aio_pika_client.py in case server or client gets stuck.

if __name__ == "__main__":
    # Executor created with the broker URL and the 'jsonrpc' queue name.
    executor = integration.Executor(
        "amqp://guest:guest@localhost:5672/v1",
        queue_name="jsonrpc",
        # Compatible with queue of examples/aio_pika_*
        # and works better with examples/kombu_client.py:
        queue_args={"durable": False},
    )
    executor.dispatcher.add_methods(methods)
    executor.run()

middlewares#

from typing import Any

from aiohttp import web

import pjrpc.server
from pjrpc.common import Request
from pjrpc.server.integration import aiohttp
from pjrpc.server.typedefs import AsyncHandlerType, ContextType, MiddlewareResponse

methods = pjrpc.server.MethodRegistry()


@methods.add(context='request')
async def method(request: Any) -> None:
    """Demo RPC method: prints ``method`` and returns nothing."""
    print("method")


async def middleware1(
    request: Request, context: ContextType, handler: AsyncHandlerType,
) -> MiddlewareResponse:
    """Print a line before and after awaiting ``handler`` and pass its result through unchanged."""
    print("middleware1 started")
    result = await handler(request, context)
    print("middleware1 finished")

    return result


async def middleware2(
    request: Request, context: ContextType, handler: AsyncHandlerType,
) -> MiddlewareResponse:
    """Print a line before and after awaiting ``handler`` and pass its result through unchanged."""
    print("middleware2 started")
    result = await handler(request, context)
    print("middleware2 finished")

    return result

# JSON-RPC application created with the '/api/v1' path and both middlewares.
jsonrpc_app = aiohttp.Application(
    '/api/v1', middlewares=(
        middleware1,
        middleware2,
    ),
)
jsonrpc_app.dispatcher.add_methods(methods)

if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)

multiple clients#

import pjrpc
from pjrpc.client.backend import requests as jrpc_client


class ErrorV1(pjrpc.exc.JsonRpcError):
    """Base error of the v1 API; resolves error codes among its own direct subclasses."""

    @classmethod
    def get_error_cls(cls, code, default):
        """Return the first direct subclass whose ``code`` equals ``code``, otherwise ``default``."""
        for candidate in cls.__subclasses__():
            if getattr(candidate, 'code', None) == code:
                return candidate
        return default


class PermissionDenied(ErrorV1):
    """v1 error with code 1."""

    code = 1
    message = 'permission denied'


class ErrorV2(pjrpc.exc.JsonRpcError):
    """Base error of the v2 API; resolves error codes among its own direct subclasses."""

    @classmethod
    def get_error_cls(cls, code, default):
        """Return the first direct subclass whose ``code`` equals ``code``, otherwise ``default``."""
        for candidate in cls.__subclasses__():
            if getattr(candidate, 'code', None) == code:
                return candidate
        return default


class ResourceNotFound(ErrorV2):
    """v2 error with code 1 (the same code as ``PermissionDenied`` has under ``ErrorV1``)."""

    code = 1
    message = 'resource not found'


# One client per API version, each given its own base error class.
client_v1 = jrpc_client.Client('http://localhost:8080/api/v1', error_cls=ErrorV1)
client_v2 = jrpc_client.Client('http://localhost:8080/api/v2', error_cls=ErrorV2)

# NOTE(review): other examples in this file assign the value of a ``client.proxy``
# call to a plain ``result``; the ``pjrpc.Response`` annotation may be inaccurate -- confirm.
try:
    response: pjrpc.Response = client_v1.proxy.add_user(user={})
except PermissionDenied as e:
    print(e)

try:
    response: pjrpc.Response = client_v2.proxy.add_user(user={})
except ResourceNotFound as e:
    print(e)

pydantic validator#

import enum
import uuid
from typing import List

import pydantic
from aiohttp import web

import pjrpc.server
from pjrpc.server.integration import aiohttp
from pjrpc.server.validators import pydantic as validators

methods = pjrpc.server.MethodRegistry()
validator = validators.PydanticValidator()


# Closed set of contact kinds; the values are the strings 'phone' and 'email'.
class ContactType(enum.Enum):
    PHONE = 'phone'
    EMAIL = 'email'


# A single contact entry: its kind and its string value.
class Contact(pydantic.BaseModel):
    type: ContactType
    value: str


# User payload accepted by add_user: name, surname, age and a list of contacts.
class User(pydantic.BaseModel):
    name: str
    surname: str
    age: int
    contacts: List[Contact]


@methods.add(context='request')
@validator.validate
async def add_user(request: web.Request, user: User):
    """Store ``user`` in the application's ``'users'`` mapping under a fresh uuid4.

    :param request: aiohttp request bound to the ``'request'`` context argument
    :param user: user model
    :return: the user's fields extended with the generated ``'id'`` (a ``uuid.UUID``)
    """
    new_id = uuid.uuid4()
    store = request.app['users']
    store[new_id] = user

    created = {'id': new_id, **user.dict()}
    return created


class JSONEncoder(pjrpc.server.JSONEncoder):
    """JSON encoder that additionally handles ``uuid.UUID`` and ``enum.Enum`` values."""

    def default(self, o):
        """Encode a UUID as its hex string and an enum member as its value; defer anything else to the base class."""
        if isinstance(o, uuid.UUID):
            encoded = o.hex
        elif isinstance(o, enum.Enum):
            encoded = o.value
        else:
            encoded = super().default(o)

        return encoded


# JSON-RPC application created with the '/api/v1' path and the custom encoder above.
jsonrpc_app = aiohttp.Application('/api/v1', json_encoder=JSONEncoder)
jsonrpc_app.dispatcher.add_methods(methods)
jsonrpc_app.app['users'] = {}

if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)

requests client#

import pjrpc
from pjrpc.client.backend import requests as pjrpc_client

# Synchronous requests-based client.
client = pjrpc_client.Client('http://localhost/api/v1')

# Style 1: explicit request object; the response object is returned.
response: pjrpc.Response = client.send(pjrpc.Request('sum', params=[1, 2], id=1))
print(f"1 + 2 = {response.result}")

# Style 2: call the client directly with keyword parameters.
result = client('sum', a=1, b=2)
print(f"1 + 2 = {result}")

# Style 3: attribute-style proxy with positional parameters.
result = client.proxy.sum(1, 2)
print(f"1 + 2 = {result}")

client.notify('tick')

requests pytest#

from unittest import mock

import pytest

import pjrpc
from pjrpc.client.backend import requests as requests_client
from pjrpc.client.integrations.pytest import PjRpcRequestsMocker


def test_using_fixture(pjrpc_requests_mocker):
    """Mock JSON-RPC responses through the ``pjrpc_requests_mocker`` fixture and check the recorded calls."""
    client = requests_client.Client('http://localhost/api/v1')

    # Register a successful result for 'sum'.
    pjrpc_requests_mocker.add('http://localhost/api/v1', 'sum', result=2)
    result = client.proxy.sum(1, 1)
    assert result == 2

    # Replace the mocked result with an error; the client call is expected to raise it.
    pjrpc_requests_mocker.replace(
        'http://localhost/api/v1', 'sum', error=pjrpc.exc.JsonRpcError(code=1, message='error', data='oops'),
    )
    with pytest.raises(pjrpc.exc.JsonRpcError) as exc_info:
        client.proxy.sum(a=1, b=1)

    assert exc_info.type is pjrpc.exc.JsonRpcError
    assert exc_info.value.code == 1
    assert exc_info.value.message == 'error'
    assert exc_info.value.data == 'oops'

    # Recorded calls are looked up by endpoint, then by a ('2.0', method) key.
    localhost_calls = pjrpc_requests_mocker.calls['http://localhost/api/v1']
    assert localhost_calls[('2.0', 'sum')].call_count == 2
    assert localhost_calls[('2.0', 'sum')].mock_calls == [mock.call(1, 1), mock.call(a=1, b=1)]

    # Nothing is registered for the v2 endpoint; the test expects ConnectionRefusedError.
    client = requests_client.Client('http://localhost/api/v2')
    with pytest.raises(ConnectionRefusedError):
        client.proxy.sum(1, 1)


def test_using_resource_manager():
    """Use ``PjRpcRequestsMocker`` directly as a context manager, including a batch call and a callback mock."""
    client = requests_client.Client('http://localhost/api/v1')

    with PjRpcRequestsMocker() as mocker:
        # 'mult' returns a fixed result; 'div' is computed by the callback.
        mocker.add('http://localhost/api/v1', 'mult', result=4)
        mocker.add('http://localhost/api/v1', 'div', callback=lambda a, b: a/b)

        result = client.batch.proxy.div(4, 2).mult(2, 2).call()
        assert result == (2, 4)

        localhost_calls = mocker.calls['http://localhost/api/v1']
        assert localhost_calls[('2.0', 'div')].mock_calls == [mock.call(4, 2)]
        assert localhost_calls[('2.0', 'mult')].mock_calls == [mock.call(2, 2)]

        # 'sub' was never registered; the test expects MethodNotFoundError.
        with pytest.raises(pjrpc.exc.MethodNotFoundError):
            client.proxy.sub(4, 2)

sentry#

import sentry_sdk
from aiohttp import web

import pjrpc.server
from pjrpc.server.integration import aiohttp

methods = pjrpc.server.MethodRegistry()


@methods.add(context='request')
async def method(request):
    """Demo RPC method: prints ``method`` and returns nothing."""
    print("method")


async def sentry_middleware(request, context, handler):
    """Await ``handler``; report a raised ``JsonRpcError`` to Sentry and re-raise it."""
    try:
        return await handler(request, context)
    except pjrpc.exceptions.JsonRpcError as e:
        # Only JsonRpcError is captured here; other exception types propagate untouched.
        sentry_sdk.capture_exception(e)
        raise


# JSON-RPC application created with the '/api/v1' path and the Sentry middleware.
jsonrpc_app = aiohttp.Application(
    '/api/v1', middlewares=(
        sentry_middleware,
    ),
)
jsonrpc_app.dispatcher.add_methods(methods)

if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)

server prometheus metrics#

import asyncio
from typing import Any, Callable

import prometheus_client as pc
from aiohttp import web

import pjrpc.server
from pjrpc.server.integration import aiohttp

# Server-side metrics: an error counter labelled by method and code, and a
# latency histogram and an in-progress gauge labelled by method.
method_error_count = pc.Counter('method_error_count', 'Method error count', labelnames=['method', 'code'])
method_latency_hist = pc.Histogram('method_latency', 'Method latency', labelnames=['method'])
method_active_count = pc.Gauge('method_active_count', 'Method active count', labelnames=['method'])


async def metrics(request):
    """aiohttp handler that responds with the output of ``pc.generate_latest()``."""
    return web.Response(body=pc.generate_latest())

# Plain aiohttp application exposing the metrics handler at GET /metrics.
http_app = web.Application()
http_app.add_routes([web.get('/metrics', metrics)])


methods = pjrpc.server.MethodRegistry()


@methods.add(context='context')
async def method(context: web.Request):
    """Demo RPC method: prints a line, sleeps for one second, prints another line."""
    print("method started")
    await asyncio.sleep(1)
    print("method finished")


async def latency_metric_middleware(request: pjrpc.Request, context: web.Request, handler: Callable) -> Any:
    """Await ``handler`` inside the latency histogram's ``time()`` context, labelled by the request's method."""
    with method_latency_hist.labels(method=request.method).time():
        return await handler(request, context)


async def active_count_metric_middleware(request: pjrpc.Request, context: web.Request, handler: Callable) -> Any:
    """Await ``handler`` inside the gauge's ``track_inprogress()`` context, labelled by the request's method."""
    with method_active_count.labels(method=request.method).track_inprogress():
        return await handler(request, context)


async def any_error_handler(
    request: pjrpc.Request, context: web.Request, error: pjrpc.exceptions.JsonRpcError,
) -> pjrpc.exceptions.JsonRpcError:
    """Count ``error`` by method and error code, then return it unchanged."""
    method_error_count.labels(method=request.method, code=error.code).inc()

    return error


async def validation_error_handler(
    request: pjrpc.Request, context: web.Request, error: pjrpc.exceptions.JsonRpcError,
) -> pjrpc.exceptions.JsonRpcError:
    """Print a notice and return ``error`` unchanged; registered below under code -32602."""
    print("validation error occurred")

    return error


# JSON-RPC application created with the '/api/v1' path on top of http_app.
# Error handlers are keyed by error code; None is the key used for any_error_handler.
jsonrpc_app = aiohttp.Application(
    '/api/v1',
    app=http_app,
    middlewares=(
        latency_metric_middleware,
        active_count_metric_middleware,
    ),
    error_handlers={
        -32602: [validation_error_handler],
        None: [any_error_handler],
    },
)
jsonrpc_app.dispatcher.add_methods(methods)

if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)

server tracing#

import asyncio

import opentracing
from aiohttp import web
from opentracing import tags

import pjrpc.server
from pjrpc.server.integration import aiohttp


@web.middleware
async def http_tracing_middleware(request, handler):
    """
    aiohttp server tracer.

    Starts a span named ``http.<METHOD>`` (child of the context extracted from the
    request headers, when there is one), tags it with request details, awaits
    ``handler`` with the span active and tags the response status.
    """

    tracer = opentracing.global_tracer()
    try:
        span_ctx = tracer.extract(format=opentracing.Format.HTTP_HEADERS, carrier=request.headers)
    except (opentracing.InvalidCarrierException, opentracing.SpanContextCorruptedException):
        # No usable parent context in the headers: start the span without a parent.
        span_ctx = None

    span = tracer.start_span(f'http.{request.method}', child_of=span_ctx)
    span.set_tag(tags.COMPONENT, 'aiohttp.server')
    span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
    span.set_tag(tags.PEER_ADDRESS, request.remote)
    span.set_tag(tags.HTTP_URL, str(request.url))
    span.set_tag(tags.HTTP_METHOD, request.method)

    with tracer.scope_manager.activate(span, finish_on_close=True):
        response: web.Response = await handler(request)
        span.set_tag(tags.HTTP_STATUS_CODE, response.status)
        # Any status of 400 or above is tagged as an error.
        span.set_tag(tags.ERROR, response.status >= 400)

    return response

# aiohttp application with the HTTP tracing middleware installed.
http_app = web.Application(
    middlewares=(
        http_tracing_middleware,
    ),
)

methods = pjrpc.server.MethodRegistry()


@methods.add(context='context')
async def method(context):
    """Demo RPC method: prints a line, sleeps for one second, prints another line."""
    print("method started")
    await asyncio.sleep(1)
    print("method finished")


async def jsonrpc_tracing_middleware(request, context, handler):
    """
    JSON-RPC server tracer.

    Starts a span named ``jsonrpc.<method>``, tags it with the request's version, id
    and method, awaits ``handler`` with the span active and tags the outcome.
    """
    tracer = opentracing.global_tracer()
    span = tracer.start_span(f'jsonrpc.{request.method}')

    span.set_tag(tags.COMPONENT, 'pjrpc')
    span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
    span.set_tag('jsonrpc.version', request.version)
    span.set_tag('jsonrpc.id', request.id)
    span.set_tag('jsonrpc.method', request.method)

    with tracer.scope_manager.activate(span, finish_on_close=True):
        response = await handler(request, context)
        if response.is_error:
            span.set_tag('jsonrpc.error_code', response.error.code)
            span.set_tag('jsonrpc.error_message', response.error.message)
            span.set_tag(tags.ERROR, True)
        else:
            span.set_tag(tags.ERROR, False)

    return response

# JSON-RPC application created with the '/api/v1' path on top of http_app,
# with the JSON-RPC tracing middleware.
jsonrpc_app = aiohttp.Application(
    '/api/v1', app=http_app, middlewares=(
        jsonrpc_tracing_middleware,
    ),
)
jsonrpc_app.dispatcher.add_methods(methods)

if __name__ == "__main__":
    web.run_app(jsonrpc_app.app, host='localhost', port=8080)

werkzeug server#

import uuid

import werkzeug

import pjrpc.server
from pjrpc.server.integration import werkzeug as integration

methods = pjrpc.server.MethodRegistry()


@methods.add(context='request')
def add_user(request: werkzeug.Request, user: dict):
    """Store ``user`` under a fresh hex uuid4 in the ``users`` mapping of the object found at ``request.environ['app']``.

    :param request: werkzeug request bound to the ``'request'`` context argument
    :param user: user data
    :return: the user data extended with the generated ``'id'``
    """
    new_id = uuid.uuid4().hex
    store = request.environ['app'].users
    store[new_id] = user

    stored = {'id': new_id, **user}
    return stored


# JSON-RPC application created with the '/api/v1' path; the user store is kept on it.
app = integration.JsonRPC('/api/v1')
app.dispatcher.add_methods(methods)
app.users = {}


if __name__ == '__main__':
    werkzeug.serving.run_simple('127.0.0.1', 8080, app)

flask OpenAPI specification#

import uuid
from typing import Any, Optional

import flask
import flask_cors
import flask_httpauth
import pydantic
from werkzeug import security

import pjrpc.server.specs.extractors.docstring
import pjrpc.server.specs.extractors.pydantic
from pjrpc.server.integration import flask as integration
from pjrpc.server.specs import extractors
from pjrpc.server.specs import openapi as specs
from pjrpc.server.validators import pydantic as validators

# Flask application with CORS configured for the '/myapp/api/v1/*' resources (any origin).
app = flask.Flask('myapp')
flask_cors.CORS(app, resources={"/myapp/api/v1/*": {"origins": "*"}})

methods = pjrpc.server.MethodRegistry()
validator = validators.PydanticValidator()

# HTTP basic auth with a single in-memory account; only the password hash is kept.
auth = flask_httpauth.HTTPBasicAuth()
credentials = {"admin": security.generate_password_hash("admin")}


@auth.verify_password
def verify_password(username: str, password: str) -> Optional[str]:
    """Return ``username`` when it is known and ``password`` matches its stored hash, otherwise ``None``."""
    if username not in credentials:
        return None
    if not security.check_password_hash(credentials.get(username), password):
        return None

    return username


class AuthenticatedJsonRPC(integration.JsonRPC):
    """JSON-RPC integration whose request handler is wrapped in ``auth.login_required``."""

    @auth.login_required
    def _rpc_handle(self, dispatcher: pjrpc.server.Dispatcher) -> flask.Response:
        # Delegates to the base implementation; the decorator is the only addition.
        return super()._rpc_handle(dispatcher)


class JSONEncoder(pjrpc.JSONEncoder):
    """JSON encoder that additionally serializes pydantic models and UUIDs."""

    def default(self, o: Any) -> Any:
        """
        Convert objects the base encoder cannot serialize.

        :param o: object to encode
        :return: ``o.dict()`` for pydantic models, ``str(o)`` for UUIDs,
                 otherwise whatever the parent encoder returns
        """

        if isinstance(o, pydantic.BaseModel):
            encoded = o.dict()
        elif isinstance(o, uuid.UUID):
            encoded = str(o)
        else:
            # Everything else is left to the parent implementation.
            encoded = super().default(o)

        return encoded


class UserIn(pydantic.BaseModel):
    """
    User registration data.
    """

    # Fields supplied by the caller when registering a user.
    name: str
    surname: str
    age: int


class UserOut(UserIn):
    """
    Registered user data.
    """

    # Identifier of the stored user, added on top of the inherited UserIn fields.
    id: uuid.UUID


class AlreadyExistsError(pjrpc.exc.JsonRpcError):
    """
    User already registered error.
    """

    # Application-defined error code and message; add_user raises this error
    # when a stored user already has the same name.
    code = 2001
    message = "user already exists"


class NotFoundError(pjrpc.exc.JsonRpcError):
    """
    User not found error.
    """

    # Application-defined error code and message; get_user and delete_user raise
    # this error when no user is stored under the requested id.
    code = 2002
    message = "user not found"


@specs.annotate(
    tags=['users'],
    errors=[AlreadyExistsError],
    examples=[
        specs.MethodExample(
            summary="Simple example",
            params={
                'user': {
                    'name': 'John',
                    'surname': 'Doe',
                    'age': 25,
                },
            },
            result={
                'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
                'name': 'John',
                'surname': 'Doe',
                'age': 25,
            },
        ),
    ],
)
@methods.add
@validator.validate
def add_user(user: UserIn) -> UserOut:
    """
    Creates a user.

    :param object user: user data
    :return object: registered user
    :raise AlreadyExistsError: user already exists
    """

    users_db = flask.current_app.users_db

    # A user is considered a duplicate when a stored user has the same name.
    if any(user.name == stored.name for stored in users_db.values()):
        raise AlreadyExistsError()

    # Users are keyed by the hex form of a fresh UUID4.
    new_id = uuid.uuid4().hex
    users_db[new_id] = user

    return UserOut(id=new_id, **user.dict())


@specs.annotate(
    tags=['users'],
    errors=[NotFoundError],
    examples=[
        specs.MethodExample(
            summary='Simple example',
            params={
                'user_id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
            },
            result={
                'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
                'name': 'John',
                'surname': 'Doe',
                'age': 25,
            },
        ),
    ],
)
@methods.add
@validator.validate
def get_user(user_id: uuid.UUID) -> UserOut:
    """
    Returns a user.

    :param object user_id: user id
    :return object: registered user
    :raise NotFoundError: user not found
    """

    # The storage is keyed by the hex form of the UUID (see add_user).
    stored = flask.current_app.users_db.get(user_id.hex)
    if stored:
        return UserOut(id=user_id, **stored.dict())

    raise NotFoundError()


@specs.annotate(
    tags=['users'],
    errors=[NotFoundError],
    examples=[
        specs.MethodExample(
            summary='Simple example',
            params={
                'user_id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
            },
            result=None,
        ),
    ],
)
@methods.add
@validator.validate
def delete_user(user_id: uuid.UUID) -> None:
    """
    Deletes a user.

    :param object user_id: user id
    :raise NotFoundError: user not found
    """

    users_db = flask.current_app.users_db

    # pop() removes the entry if present; a falsy result means nothing was stored.
    if not users_db.pop(user_id.hex, None):
        raise NotFoundError()


# JSON-RPC endpoint using the authenticated handler, the custom JSON encoder
# and an OpenAPI specification.
json_rpc = AuthenticatedJsonRPC(
    '/api/v1',
    json_encoder=JSONEncoder,
    spec=specs.OpenAPI(
        info=specs.Info(version="1.0.0", title="User storage"),
        servers=[
            specs.Server(
                url='http://127.0.0.1:8080',
            ),
        ],
        # Declare HTTP basic auth as a security scheme ...
        security_schemes=dict(
            basicAuth=specs.SecurityScheme(
                type=specs.SecuritySchemeType.HTTP,
                scheme='basic',
            ),
        ),
        # ... and list it as the security requirement of the spec.
        security=[
            dict(basicAuth=[]),
        ],
        # Both a docstring-based and a pydantic-based extractor are configured.
        schema_extractors=[
            extractors.docstring.DocstringSchemaExtractor(),
            extractors.pydantic.PydanticSchemaExtractor(),
        ],
        ui=specs.SwaggerUI(),
        # Alternative UIs; enable one of these instead of SwaggerUI:
        # ui=specs.RapiDoc(),
        # ui=specs.ReDoc(),
    ),
)
json_rpc.dispatcher.add_methods(methods)

# In-memory user storage read by the RPC methods via flask.current_app.users_db.
app.users_db = {}

# Attach the endpoint to a blueprint with url_prefix '/myapp' and register it on the app.
myapp = flask.Blueprint('myapp', __name__, url_prefix='/myapp')
json_rpc.init_app(myapp)

app.register_blueprint(myapp)

if __name__ == "__main__":
    app.run(port=8080)

aiohttp OpenAPI specification#

import uuid
from typing import Any

import aiohttp_cors
import pydantic
from aiohttp import helpers, web

import pjrpc.server.specs.extractors.docstring
import pjrpc.server.specs.extractors.pydantic
from pjrpc.server.integration import aiohttp as integration
from pjrpc.server.specs import extractors
from pjrpc.server.specs import openapi as specs
from pjrpc.server.validators import pydantic as validators

# Registry for the RPC methods declared below and the validator used to decorate them.
methods = pjrpc.server.MethodRegistry()
validator = validators.PydanticValidator()

# Login -> plain-text password table checked by AuthenticatedJsonRPC.
credentials = {"admin": "admin"}


class JSONEncoder(pjrpc.JSONEncoder):
    """JSON encoder that additionally serializes pydantic models and UUIDs."""

    def default(self, o: Any) -> Any:
        """
        Convert objects the base encoder cannot serialize.

        :param o: object to encode
        :return: ``o.dict()`` for pydantic models, ``str(o)`` for UUIDs,
                 otherwise whatever the parent encoder returns
        """

        if isinstance(o, pydantic.BaseModel):
            encoded = o.dict()
        elif isinstance(o, uuid.UUID):
            encoded = str(o)
        else:
            # Everything else is left to the parent implementation.
            encoded = super().default(o)

        return encoded


class AuthenticatedJsonRPC(integration.Application):
    """JSON-RPC aiohttp application that checks HTTP Basic credentials before handling a call."""

    async def _rpc_handle(self, http_request: web.Request, dispatcher: pjrpc.server.Dispatcher) -> web.Response:
        """
        Authenticate the request, then delegate to the parent handler.

        :param http_request: incoming aiohttp request
        :param dispatcher: dispatcher handed to the parent implementation
        :return: response produced by the parent implementation
        :raise web.HTTPUnauthorized: the Authorization header cannot be decoded
                                     or the credentials do not match
        """

        # Local import keeps this example block self-contained.
        import hmac

        try:
            # A missing header is decoded as an empty string, which fails with ValueError.
            auth = helpers.BasicAuth.decode(http_request.headers.get('Authorization', ''))
        except ValueError:
            raise web.HTTPUnauthorized

        expected_password = credentials.get(auth.login)

        # The password comes from an untrusted header: compare it in constant time
        # instead of with `!=`. Both sides are encoded because compare_digest only
        # accepts ASCII-only str or bytes-like arguments.
        if expected_password is None or not hmac.compare_digest(
            expected_password.encode(), auth.password.encode(),
        ):
            raise web.HTTPUnauthorized

        return await super()._rpc_handle(http_request=http_request, dispatcher=dispatcher)


class UserIn(pydantic.BaseModel):
    """
    User registration data.
    """

    # Fields supplied by the caller when registering a user.
    name: str
    surname: str
    age: int


class UserOut(UserIn):
    """
    Registered user data.
    """

    # Identifier of the stored user, added on top of the inherited UserIn fields.
    id: uuid.UUID


class AlreadyExistsError(pjrpc.exc.JsonRpcError):
    """
    User already registered error.
    """

    # Application-defined error code and message; add_user raises this error
    # when a stored user already has the same name.
    code = 2001
    message = "user already exists"


class NotFoundError(pjrpc.exc.JsonRpcError):
    """
    User not found error.
    """

    # Application-defined error code and message; get_user and delete_user raise
    # this error when no user is stored under the requested id.
    code = 2002
    message = "user not found"


@specs.annotate(
    tags=['users'],
    errors=[AlreadyExistsError],
    examples=[
        specs.MethodExample(
            summary="Simple example",
            params={
                'user': {
                    'name': 'John',
                    'surname': 'Doe',
                    'age': 25,
                },
            },
            result={
                'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
                'name': 'John',
                'surname': 'Doe',
                'age': 25,
            },
        ),
    ],
)
@methods.add(context='request')
@validator.validate
def add_user(request: web.Request, user: UserIn) -> UserOut:
    """
    Creates a user.

    :param request: http request
    :param object user: user data
    :return object: registered user
    :raise AlreadyExistsError: user already exists
    """

    users = request.config_dict['users']

    # A user is considered a duplicate when a stored user has the same name.
    if any(user.name == stored.name for stored in users.values()):
        raise AlreadyExistsError()

    # Users are keyed by the hex form of a fresh UUID4.
    new_id = uuid.uuid4().hex
    users[new_id] = user

    return UserOut(id=new_id, **user.dict())


@specs.annotate(
    tags=['users'],
    errors=[NotFoundError],
    examples=[
        specs.MethodExample(
            summary='Simple example',
            params={
                'user_id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
            },
            result={
                'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
                'name': 'John',
                'surname': 'Doe',
                'age': 25,
            },
        ),
    ],
)
@methods.add(context='request')
@validator.validate
def get_user(request: web.Request, user_id: uuid.UUID) -> UserOut:
    """
    Returns a user.

    :param request: http request
    :param object user_id: user id
    :return object: registered user
    :raise NotFoundError: user not found
    """

    # The storage is keyed by the hex form of the UUID (see add_user).
    stored = request.config_dict['users'].get(user_id.hex)
    if stored:
        return UserOut(id=user_id, **stored.dict())

    raise NotFoundError()


@specs.annotate(
    tags=['users'],
    errors=[NotFoundError],
    examples=[
        specs.MethodExample(
            summary='Simple example',
            params={
                'user_id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
            },
            result=None,
        ),
    ],
)
@methods.add(context='request')
@validator.validate
def delete_user(request: web.Request, user_id: uuid.UUID) -> None:
    """
    Deletes a user.

    :param request: http request
    :param object user_id: user id
    :raise NotFoundError: user not found
    """

    users = request.config_dict['users']

    # pop() removes the entry if present; a falsy result means nothing was stored.
    if not users.pop(user_id.hex, None):
        raise NotFoundError()


app = web.Application()
# In-memory user storage read by the RPC methods via request.config_dict['users'].
app['users'] = {}

# JSON-RPC application using the authenticated handler, the custom JSON encoder
# and an OpenAPI specification.
jsonrpc_app = AuthenticatedJsonRPC(
    '/api/v1',
    json_encoder=JSONEncoder,
    spec=specs.OpenAPI(
        info=specs.Info(version="1.0.0", title="User storage"),
        servers=[
            specs.Server(
                url='http://127.0.0.1:8080',
            ),
        ],
        # Declare HTTP basic auth as a security scheme ...
        security_schemes=dict(
            basicAuth=specs.SecurityScheme(
                type=specs.SecuritySchemeType.HTTP,
                scheme='basic',
            ),
        ),
        # ... and list it as the security requirement of the spec.
        security=[
            dict(basicAuth=[]),
        ],
        # Both a docstring-based and a pydantic-based extractor are configured.
        schema_extractors=[
            extractors.docstring.DocstringSchemaExtractor(),
            extractors.pydantic.PydanticSchemaExtractor(),
        ],
        ui=specs.SwaggerUI(),
        # Alternative UIs; enable one of these instead of SwaggerUI:
        # ui=specs.RapiDoc(),
        # ui=specs.ReDoc(),
    ),
)
jsonrpc_app.dispatcher.add_methods(methods)
# Mount the JSON-RPC application as a sub-application under '/myapp'.
app.add_subapp('/myapp', jsonrpc_app.app)

# CORS defaults applied to every origin ('*').
cors = aiohttp_cors.setup(
    app, defaults={
        '*': aiohttp_cors.ResourceOptions(
            allow_credentials=True,
            expose_headers='*',
            allow_headers='*',
        ),
    },
)
# Iterate over a snapshot of the routes — presumably because cors.add() may
# register additional routes while we loop; confirm before removing list().
for route in list(app.router.routes()):
    cors.add(route)


if __name__ == "__main__":
    web.run_app(app, host='localhost', port=8080)

flask OpenRPC specification#

import uuid
from typing import Any

import flask
import pydantic
from flask_cors import CORS

import pjrpc.server.specs.extractors.docstring
import pjrpc.server.specs.extractors.pydantic
from pjrpc.server.integration import flask as integration
from pjrpc.server.specs import extractors
from pjrpc.server.specs import openrpc as specs
from pjrpc.server.validators import pydantic as validators

app = flask.Flask(__name__)
# Enable CORS for any origin on the paths under /api/v1/.
CORS(app, resources={r"/api/v1/*": {"origins": "*"}})

# Registry for the RPC methods declared below and the validator used to decorate them.
methods = pjrpc.server.MethodRegistry()
validator = validators.PydanticValidator()


class JsonEncoder(pjrpc.JSONEncoder):
    """JSON encoder that additionally serializes pydantic models and UUIDs."""

    def default(self, o: Any) -> Any:
        """
        Convert objects the base encoder cannot serialize.

        :param o: object to encode
        :return: ``o.dict()`` for pydantic models, ``str(o)`` for UUIDs,
                 otherwise whatever the parent encoder returns
        """

        if isinstance(o, pydantic.BaseModel):
            encoded = o.dict()
        elif isinstance(o, uuid.UUID):
            encoded = str(o)
        else:
            # Everything else is left to the parent implementation.
            encoded = super().default(o)

        return encoded


class UserIn(pydantic.BaseModel):
    """
    User registration data.
    """

    # Fields supplied by the caller when registering a user.
    name: str
    surname: str
    age: int


class UserOut(UserIn):
    """
    Registered user data.
    """

    # Identifier of the stored user, added on top of the inherited UserIn fields.
    id: uuid.UUID


class AlreadyExistsError(pjrpc.exc.JsonRpcError):
    """
    User already registered error.
    """

    # Application-defined error code and message; add_user raises this error
    # when a stored user already has the same name.
    code = 2001
    message = "user already exists"


class NotFoundError(pjrpc.exc.JsonRpcError):
    """
    User not found error.
    """

    # Application-defined error code and message; get_user and delete_user raise
    # this error when no user is stored under the requested id.
    code = 2002
    message = "user not found"


@specs.annotate(
    errors=[AlreadyExistsError],
    tags=['users'],
    examples=[
        specs.MethodExample(
            name='Simple user',
            params=[
                specs.ExampleObject(
                    name='user',
                    value={
                        'name': 'John',
                        'surname': 'Doe',
                        'age': 25,
                    },
                ),
            ],
            result=specs.ExampleObject(
                name='result',
                value={
                    'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
                    'name': 'John',
                    'surname': 'Doe',
                    'age': 25,
                },
            ),
        ),
    ],
)
@methods.add
@validator.validate
def add_user(user: UserIn) -> UserOut:
    """
    Adds a new user.

    :param object user: user data
    :return object: registered user
    :raise AlreadyExistsError: user already exists
    """

    users_db = flask.current_app.users_db

    # A user is considered a duplicate when a stored user has the same name.
    if any(user.name == stored.name for stored in users_db.values()):
        raise AlreadyExistsError()

    # Users are keyed by the hex form of a fresh UUID4.
    new_id = uuid.uuid4().hex
    users_db[new_id] = user

    return UserOut(id=new_id, **user.dict())


@specs.annotate(
    tags=['users'],
    errors=[NotFoundError],
    examples=[
        specs.MethodExample(
            name='Simple example',
            params=[
                # One example object per method parameter: the method takes a
                # single `user_id` parameter whose value is the UUID itself.
                specs.ExampleObject(
                    name='user_id',
                    value='c47726c6-a232-45f1-944f-60b98966ff1b',
                ),
            ],
            result=specs.ExampleObject(
                name="result",
                value={
                    'id': 'c47726c6-a232-45f1-944f-60b98966ff1b',
                    'name': 'John',
                    'surname': 'Doe',
                    'age': 25,
                },
            ),
        ),
    ],
)
@methods.add
@validator.validate
def get_user(user_id: uuid.UUID) -> UserOut:
    """
    Returns a user.

    :param object user_id: user id
    :return object: registered user
    :raise NotFoundError: user not found
    """

    # The storage is keyed by the hex form of the UUID (see add_user).
    user = flask.current_app.users_db.get(user_id.hex)
    if not user:
        raise NotFoundError()

    return UserOut(id=user_id, **user.dict())


@specs.annotate(
    tags=['users'],
    errors=[NotFoundError],
    examples=[
        specs.MethodExample(
            name='Simple example',
            summary='Simple example',
            params=[
                # One example object per method parameter: the method takes a
                # single `user_id` parameter whose value is the UUID itself.
                specs.ExampleObject(
                    name='user_id',
                    value='c47726c6-a232-45f1-944f-60b98966ff1b',
                ),
            ],
            result=specs.ExampleObject(
                name="result",
                value=None,
            ),
        ),
    ],
)
@methods.add
@validator.validate
def delete_user(user_id: uuid.UUID) -> None:
    """
    Deletes a user.

    :param object user_id: user id
    :raise NotFoundError: user not found
    """

    # pop() removes the entry if present; a falsy result means nothing was stored.
    user = flask.current_app.users_db.pop(user_id.hex, None)
    if not user:
        raise NotFoundError()


# JSON-RPC endpoint using the custom JSON encoder and an OpenRPC specification.
json_rpc = integration.JsonRPC(
    '/api/v1',
    json_encoder=JsonEncoder,
    spec=specs.OpenRPC(
        info=specs.Info(version="1.0.0", title="User storage"),
        servers=[
            specs.Server(
                name='test',
                url='http://127.0.0.1:8080/api/v1/',
                summary='test server',
            ),
        ],
        # Only the pydantic-based extractor is configured for this spec.
        schema_extractor=extractors.pydantic.PydanticSchemaExtractor(),
    ),
)
json_rpc.dispatcher.add_methods(methods)

# In-memory user storage read by the RPC methods via flask.current_app.users_db.
app.users_db = {}

# Attach the endpoint directly to the Flask application.
json_rpc.init_app(app)

if __name__ == "__main__":
    app.run(port=8080)