Source code for pjrpc.server.integration.aiohttp

"""
aiohttp JSON-RPC server integration.
"""

import functools as ft
import inspect
import json
from typing import Any, Callable, Iterable, Mapping, Optional

from aiohttp import web

import pjrpc
from pjrpc.server import specs, utils
from pjrpc.server.dispatcher import AsyncExecutor, AsyncMiddlewareType, JSONEncoder, MethodRegistry


def is_aiohttp_request(idx: int, name: str, annotation: Optional[type[Any]], default: Optional[Any]) -> bool:
    if annotation is None:
        return False

    return inspect.isclass(annotation) and issubclass(annotation, web.Request)


AioHttpDispatcher = pjrpc.server.AsyncDispatcher[web.Request]


[docs]class Application: """ `aiohttp <https://aiohttp.readthedocs.io/en/stable/web.html>`_ based JSON-RPC server. :param prefix: JSON-RPC handler base path :param http_app: aiohttp application instance :param status_by_error: a function returns http status code by json-rpc error codes, 200 for all errors by default """ def __init__( self, prefix: str = '', http_app: Optional[web.Application] = None, status_by_error: Callable[[tuple[int, ...]], int] = lambda codes: 200, executor: Optional[AsyncExecutor] = None, json_loader: Callable[..., Any] = json.loads, json_dumper: Callable[..., str] = json.dumps, json_encoder: type[JSONEncoder] = JSONEncoder, json_decoder: Optional[type[json.JSONDecoder]] = None, middlewares: Iterable[AsyncMiddlewareType[web.Request]] = (), max_batch_size: Optional[int] = None, ): self._prefix = prefix.rstrip('/') self._http_app = http_app or web.Application() self._status_by_error = status_by_error self._executor: Optional[AsyncExecutor] = executor self._json_loader: Callable[..., Any] = json_loader self._json_dumper: Callable[..., str] = json_dumper self._json_encoder: type[JSONEncoder] = json_encoder self._json_decoder: Optional[type[json.JSONDecoder]] = json_decoder self._middlewares: Iterable[AsyncMiddlewareType[web.Request]] = middlewares self._max_batch_size: Optional[int] = max_batch_size self._endpoints: dict[str, AioHttpDispatcher] = {} self._subapps: dict[str, Application] = {} @property def prefix(self) -> str: return self._prefix @property def http_app(self) -> web.Application: """ aiohttp application. """ return self._http_app @property def endpoints(self) -> Mapping[str, AioHttpDispatcher]: """ JSON-RPC application registered endpoints. """ return self._endpoints
[docs] def add_methods(self, registry: MethodRegistry, endpoint: str = '') -> 'Application': """ Adds methods to the provided endpoint. :param registry: methods registry :param endpoint: endpoint path """ dispatcher = self._get_endpoint(endpoint) dispatcher.add_methods(registry) return self
[docs] def add_subapp(self, prefix: str, subapp: 'Application') -> None: """ Adds sub-application accessible under provided prefix. :param prefix: path under which sub-application is accessed. :param subapp: sub-application instance """ prefix = prefix.rstrip('/') if not prefix: raise ValueError("prefix cannot be empty") for dispatcher in subapp.endpoints.values(): dispatcher.add_middlewares(*self._middlewares, before=True) self._http_app.add_subapp(utils.join_path(self._prefix, prefix), subapp.http_app) self._subapps[prefix] = subapp
[docs] def add_spec(self, spec: specs.Specification, endpoint: str = '', path: str = '') -> None: """ Adds JSON-RPC specification of the provided endpoint to the provided path. :param spec: JSON-RPC specification :param endpoint: specification endpoint :param path: path under witch the specification will be accessible. """ self._http_app.router.add_get( utils.join_path(self._prefix, endpoint, path), ft.partial(self._get_spec, endpoint=endpoint, spec=spec, path=path), )
[docs] def add_spec_ui(self, path: str, ui: specs.BaseUI, spec_url: str) -> None: """ Adds JSON-RPC specification ui. :param path: path under which ui will be accessible. :param ui: specification ui instance :param spec_url: specification url """ ui_app = web.Application() ui_app.router.add_get('/', ft.partial(self._ui_index_page, ui=ui, spec_url=spec_url)) ui_app.router.add_get('/index.html', ft.partial(self._ui_index_page, ui=ui, spec_url=spec_url)) ui_app.router.add_static('/', ui.get_static_folder()) self._http_app.add_subapp(utils.join_path(self._prefix, path), ui_app)
[docs] def generate_spec(self, spec: specs.Specification, base_path: str = '', endpoint: str = '') -> dict[str, Any]: """ Generates JSON-RPC specification of the provided endpoint. :param spec: JSON-RPC specification :param base_path: specification base path :param endpoint: endpoint the specification is generated for """ app_endpoints = self._endpoints for prefix, subapp in self._subapps.items(): for subprefix, dispatcher in subapp.endpoints.items(): app_endpoints[utils.join_path(prefix, subapp._prefix, subprefix)] = dispatcher methods = { utils.remove_prefix(dispatcher_endpoint, endpoint): dispatcher.registry.values() for dispatcher_endpoint, dispatcher in app_endpoints.items() if dispatcher_endpoint.startswith(endpoint) } return spec.generate( root_endpoint=utils.join_path(base_path, endpoint), methods=methods, )
async def _ui_index_page(self, request: web.Request, ui: specs.BaseUI, spec_url: str) -> web.Response: return web.Response(text=ui.get_index_page(spec_url), content_type='text/html') def _get_endpoint(self, endpoint: str) -> AioHttpDispatcher: endpoint = endpoint.rstrip('/') if endpoint not in self._endpoints: self._endpoints[endpoint] = dispatcher = AioHttpDispatcher( executor=self._executor, json_loader=self._json_loader, json_dumper=self._json_dumper, json_encoder=self._json_encoder, json_decoder=self._json_decoder, middlewares=self._middlewares, max_batch_size=self._max_batch_size, ) self._http_app.router.add_post( utils.join_path(self._prefix, endpoint), ft.partial(self._rpc_handle, dispatcher=dispatcher), ) else: dispatcher = self._endpoints[endpoint] return dispatcher async def _get_spec( self, request: web.Request, endpoint: str, spec: specs.Specification, path: str, ) -> web.Response: base_path = utils.remove_suffix(request.path, suffix=utils.join_path(endpoint, path)) schema = self.generate_spec(base_path=base_path, endpoint=endpoint.rstrip('/'), spec=spec) return web.json_response(text=self._json_dumper(schema, cls=self._json_encoder)) async def _rpc_handle(self, http_request: web.Request, dispatcher: AioHttpDispatcher) -> web.Response: """ Handles JSON-RPC request. :param http_request: :py:class:`aiohttp.web.Response` :returns: :py:class:`aiohttp.web.Request` """ if http_request.content_type not in pjrpc.common.REQUEST_CONTENT_TYPES: raise web.HTTPUnsupportedMediaType() try: request_text = await http_request.text() except UnicodeDecodeError as e: raise web.HTTPBadRequest() from e response = await dispatcher.dispatch(request_text, context=http_request) if response is None: return web.Response() else: response_text, error_codes = response return web.json_response(status=self._status_by_error(error_codes), text=response_text)