"""
aiohttp JSON-RPC server integration.
"""
import functools as ft
import json
from typing import Any, Dict, Mapping, Optional
import aiohttp.web
from aiohttp import web
import pjrpc
from pjrpc.server import specs, utils
[docs]class Application:
"""
`aiohttp <https://aiohttp.readthedocs.io/en/stable/web.html>`_ based JSON-RPC server.
:param path: JSON-RPC handler base path
:param app_args: arguments to be passed to :py:class:`aiohttp.web.Application`
:param kwargs: arguments to be passed to the dispatcher :py:class:`pjrpc.server.AsyncDispatcher`
"""
def __init__(
self,
path: str = '',
spec: Optional[specs.Specification] = None,
app: Optional[web.Application] = None,
**kwargs: Any,
):
self._path = path.rstrip('/')
self._spec = spec
self._app = app or web.Application()
self._dispatcher = pjrpc.server.AsyncDispatcher(**kwargs)
self._endpoints: Dict[str, pjrpc.server.AsyncDispatcher] = {'': self._dispatcher}
self._app.router.add_post(self._path, ft.partial(self._rpc_handle, dispatcher=self._dispatcher))
if self._spec:
self._app.router.add_get(utils.join_path(self._path, self._spec.path), self._generate_spec)
if self._spec.ui and self._spec.ui_path:
ui_app = web.Application()
ui_app.router.add_get('/', self._ui_index_page)
ui_app.router.add_get('/index.html', self._ui_index_page)
ui_app.router.add_static('/', self._spec.ui.get_static_folder())
self._app.add_subapp(utils.join_path(self._path, self._spec.ui_path), ui_app)
@property
def app(self) -> web.Application:
"""
aiohttp application.
"""
return self._app
@property
def dispatcher(self) -> pjrpc.server.AsyncDispatcher:
"""
JSON-RPC method dispatcher.
"""
return self._dispatcher
@property
def endpoints(self) -> Mapping[str, pjrpc.server.AsyncDispatcher]:
"""
JSON-RPC application registered endpoints.
"""
return self._endpoints
[docs] def add_endpoint(
self,
prefix: str,
subapp: Optional[aiohttp.web.Application] = None,
**kwargs: Any,
) -> pjrpc.server.AsyncDispatcher:
"""
Adds additional endpoint.
:param prefix: endpoint prefix
:param subapp: aiohttp subapp the endpoint will be served on
:param kwargs: arguments to be passed to the dispatcher :py:class:`pjrpc.server.Dispatcher`
:return: dispatcher
"""
prefix = prefix.rstrip('/')
dispatcher = pjrpc.server.AsyncDispatcher(**kwargs)
self._endpoints[prefix] = dispatcher
if subapp:
subapp.router.add_post('', ft.partial(self._rpc_handle, dispatcher=dispatcher))
self._app.add_subapp(utils.join_path(self._path, prefix), subapp)
else:
self._app.router.add_post(
utils.join_path(self._path, prefix),
ft.partial(self._rpc_handle, dispatcher=dispatcher),
)
return dispatcher
async def _generate_spec(self, request: web.Request) -> web.Response:
assert self._spec is not None, "spec is not set"
endpoint_path = utils.remove_suffix(request.path, suffix=self._spec.path)
methods = {path: dispatcher.registry.values() for path, dispatcher in self._endpoints.items()}
schema = self._spec.schema(path=endpoint_path, methods_map=methods)
return web.json_response(text=json.dumps(schema, cls=specs.JSONEncoder))
async def _ui_index_page(self, request: web.Request) -> web.Response:
assert self._spec is not None and self._spec.ui is not None, "spec is not set"
app_path = request.path.rsplit(self._spec.ui_path, maxsplit=1)[0]
spec_full_path = utils.join_path(app_path, self._spec.path)
return web.Response(
text=self._spec.ui.get_index_page(spec_url=spec_full_path),
content_type='text/html',
)
async def _rpc_handle(self, http_request: web.Request, dispatcher: pjrpc.server.AsyncDispatcher) -> web.Response:
"""
Handles JSON-RPC request.
:param http_request: :py:class:`aiohttp.web.Response`
:returns: :py:class:`aiohttp.web.Request`
"""
if http_request.content_type not in pjrpc.common.REQUEST_CONTENT_TYPES:
raise web.HTTPUnsupportedMediaType()
try:
request_text = await http_request.text()
except UnicodeDecodeError as e:
raise web.HTTPBadRequest() from e
response_text = await dispatcher.dispatch(request_text, context=http_request)
if response_text is None:
return web.Response()
else:
return web.json_response(text=response_text)