import json
import typing
from ssl import SSLContext
from typing import Any, Callable, Generator, Iterable, Mapping, Optional, TypedDict, Union
from aiohttp import BasicAuth, Fingerprint, client
from aiohttp.typedefs import LooseCookies, LooseHeaders, StrOrURL
from multidict import MultiDict
from pjrpc.client import AbstractAsyncClient, AsyncMiddleware, exceptions
from pjrpc.common import AbstractRequest, AbstractResponse, BatchRequest, BatchResponse, JSONEncoder, Request, Response
from pjrpc.common import generators
from pjrpc.common.typedefs import JsonRpcRequestIdT
[docs]class RequestArgs(TypedDict, total=False):
cookies: LooseCookies
headers: LooseHeaders
skip_auto_headers: Iterable[str]
auth: BasicAuth
allow_redirects: bool
max_redirects: int
compress: Union[str, bool]
chunked: bool
expect100: bool
read_until_eof: bool
proxy: StrOrURL
proxy_auth: BasicAuth
timeout: "Union[client.ClientTimeout, None]"
ssl: Union[SSLContext, bool, Fingerprint]
server_hostname: str
proxy_headers: LooseHeaders
trace_request_ctx: Mapping[str, Any]
read_bufsize: int
auto_decompress: bool
max_line_size: int
max_field_size: int
[docs]class Client(AbstractAsyncClient):
"""
`Aiohttp <https://aiohttp.readthedocs.io/en/stable/client.html>`_ library client backend.
:param url: url to be used as JSON-RPC endpoint
:param session: custom session to be used instead of :py:class:`aiohttp.ClientSession`
:param raise_for_status: should `ClientResponse.raise_for_status()` be called automatically
:param id_gen_impl: identifier generator
:param error_cls: JSON-RPC error base class
:param json_loader: json loader
:param json_dumper: json dumper
:param json_encoder: json encoder
:param json_decoder: json decoder
"""
def __init__(
self,
url: str,
*,
session: Optional[client.ClientSession] = None,
raise_for_status: bool = True,
id_gen_impl: Callable[..., Generator[JsonRpcRequestIdT, None, None]] = generators.sequential,
error_cls: type[exceptions.JsonRpcError] = exceptions.JsonRpcError,
json_loader: Callable[..., Any] = json.loads,
json_dumper: Callable[..., str] = json.dumps,
json_encoder: type[JSONEncoder] = JSONEncoder,
json_decoder: Optional[json.JSONDecoder] = None,
middlewares: Iterable[AsyncMiddleware] = (),
):
super().__init__(
id_gen_impl=id_gen_impl,
error_cls=error_cls,
json_loader=json_loader,
json_dumper=json_dumper,
json_encoder=json_encoder,
json_decoder=json_decoder,
middlewares=middlewares,
)
self._endpoint = url
self._session = session or client.ClientSession()
self._owned_session = session is None
self._raise_for_status = raise_for_status
@typing.overload
async def send(self, request: Request, **kwargs: Any) -> Optional[Response]:
...
@typing.overload
async def send(self, request: BatchRequest, **kwargs: Any) -> Optional[BatchResponse]:
...
[docs] async def send(self, request: AbstractRequest, **kwargs: Any) -> Optional[AbstractResponse]:
"""
Sends a JSON-RPC request.
:param request: request instance
:param kwargs: additional client request argument
:returns: response instance or None if the request is a notification
"""
return await self._send(request, kwargs)
async def _request(
self,
request_text: str,
is_notification: bool,
request_kwargs: Mapping[str, Any],
) -> Optional[str]:
"""
Makes a JSON-RPC request.
:param request_text: request text representation
:param is_notification: is the request a notification
:param request_kwargs: additional client request argument
:returns: response text representation or None if the request is a notification
"""
request_kwargs = typing.cast(RequestArgs, request_kwargs)
request_kwargs['headers'] = headers = MultiDict(request_kwargs.get('headers', {}))
headers['Content-Type'] = self._request_content_type
async with self._session.post(self._endpoint, data=request_text, **request_kwargs) as resp:
if self._raise_for_status:
resp.raise_for_status()
response_text = await resp.text()
if is_notification:
return None
content_type = resp.headers.get('Content-Type', '')
if response_text and content_type.split(';')[0] not in self._response_content_types:
raise exceptions.DeserializationError(f"unexpected response content type: {content_type}")
return response_text
[docs] async def close(self) -> None:
"""
Closes current http session.
"""
if self._owned_session:
await self._session.close()
async def __aenter__(self) -> 'Client':
if self._owned_session:
await self._session.__aenter__()
return self
async def __aexit__(self, *args: Any) -> None:
if self._owned_session:
await self._session.__aexit__(*args)