Source code for pjrpc.client.client

import abc
import contextlib as cl
import functools as ft
import json
import logging
from typing import Any, AsyncGenerator, Awaitable, Callable, Generator, Iterable, Mapping, Optional, Protocol, TypeVar

from pjrpc import common
from pjrpc.common import UNSET, AbstractRequest, AbstractResponse, BatchRequest, BatchResponse, MaybeSet, Request
from pjrpc.common import Response, generators
from pjrpc.common.typedefs import JsonRpcRequestIdT, JsonT

from . import exceptions

logger = logging.getLogger(__package__)


ReturnT = TypeVar('ReturnT', covariant=True)


class ProxyCall(Protocol[ReturnT]):
    def __call__(self, *args: JsonT, **kwargs: JsonT) -> ReturnT: pass


[docs]class Batch: """ Batch object. Provides syntactic sugar to send batch requests. """
[docs] class Proxy: """ Proxy object. Provides syntactic sugar to make method call using dot notation. :param batch: batch object """ def __init__(self, batch: 'Batch'): self._batch = batch def __getattr__(self, attr: str) -> ProxyCall[JsonT]: return ft.partial(self._batch.call, attr)
def __init__( self, id_gen_impl: Callable[..., Generator[JsonRpcRequestIdT, None, None]], ): self._id_gen = id_gen_impl() self._requests: list[Request] = [] self._response: MaybeSet[Optional[BatchResponse]] = UNSET @property def proxy(self) -> Proxy: """ Client proxy object. """ return Batch.Proxy(self) @property def requests(self) -> list[Request]: """ Batch requests. """ return self._requests def __call__(self, method: str, *args: JsonT, **kwargs: JsonT) -> None: """ Makes a JSON-RPC call. :param method: method name :param args: method positional arguments :param kwargs: method named arguments :returns: response result """ assert not (args and kwargs), "positional and keyword arguments are mutually exclusive" self._requests.append(Request(id=next(self._id_gen), method=method, params=args or kwargs)) def send(self, request: Request) -> None: self._requests.append(request)
[docs] def notify(self, method: str, *args: JsonT, **kwargs: JsonT) -> None: """ Makes a notification request :param method: method name :param args: method positional arguments :param kwargs: method named arguments """ assert not (args and kwargs), "positional and keyword arguments are mutually exclusive" self._requests.append(Request(id=None, method=method, params=args or kwargs))
[docs] def call(self, method: str, *args: JsonT, **kwargs: JsonT) -> None: """ Makes a JSON-RPC call. :param method: method name :param args: method positional arguments :param kwargs: method named arguments :returns: response result """ assert not (args and kwargs), "positional and keyword arguments are mutually exclusive" self._requests.append(Request(id=next(self._id_gen), method=method, params=args or kwargs))
[docs] def set_response(self, response: Optional[BatchResponse]) -> None: """ Sets batch response """ self._response = response
[docs] def get_response(self) -> Optional[BatchResponse]: """ Returns a batch response. """ if self._response is UNSET: raise RuntimeError("batch reqeust is not sent yet") return self._response
[docs] def get_results(self) -> Iterable[Any]: """ Returns the batch results preserving requests order (skipping notification request). """ if self._response is UNSET: raise RuntimeError("batch reqeust is not sent yet") if self._response is None: return [] if self._response.is_error: raise self._response.unwrap_error() response_map = {response.id: response for response in self._response} results: list[Any] = [] for request in self._requests: if request.id is not None: if (response := response_map.get(request.id)) is None: raise exceptions.IdentityError(f"response '{request.id}' is missing") results.append(response.unwrap_result()) return results
MiddlewareHandler = Callable[[AbstractRequest, Mapping[str, Any]], Optional[AbstractResponse]]
[docs]class Middleware(Protocol): """ JSON-RPC client middleware. """ def __call__( self, request: AbstractRequest, request_kwargs: Mapping[str, Any], /, handler: MiddlewareHandler, ) -> Optional[AbstractResponse]: pass
[docs]class AbstractClient(abc.ABC): """ Abstract synchronous JSON-RPC client. :param id_gen_impl: identifier generator :param error_cls: JSON-RPC error base class :param json_loader: json loader :param json_dumper: json dumper :param json_encoder: json encoder :param json_decoder: json decoder :param middlewares: client reqeust middlewares """
[docs] class Proxy: """ Proxy object. Provides syntactic sugar to make method call using dot notation. :param client: JSON-RPC client instance """ def __init__(self, client: 'AbstractClient'): self._client = client def __getattr__(self, attr: str) -> ProxyCall[JsonT]: return ft.partial(self._client.call, attr)
@property def proxy(self) -> Proxy: """ Client proxy object. """ return AbstractClient.Proxy(self)
[docs] @cl.contextmanager def batch(self) -> Generator[Batch, None, None]: """ Client batch wrapper. """ batch = Batch(self._id_gen_impl) yield batch response = self._send(BatchRequest(*batch.requests), {}) assert isinstance(response, (BatchResponse, type(None))), "unexpected response type" batch.set_response(response)
def __init__( self, *, id_gen_impl: Callable[..., Generator[JsonRpcRequestIdT, None, None]] = generators.sequential, error_cls: type[exceptions.JsonRpcError] = exceptions.JsonRpcError, json_loader: Callable[..., Any] = json.loads, json_dumper: Callable[..., str] = json.dumps, json_encoder: type[common.JSONEncoder] = common.JSONEncoder, json_decoder: Optional[json.JSONDecoder] = None, middlewares: Iterable[Middleware] = (), request_content_type: str = common.DEFAULT_CONTENT_TYPE, response_content_types: Iterable[str] = common.RESPONSE_CONTENT_TYPES, ): self._id_gen_impl = id_gen_impl self._error_cls = error_cls self._json_loader = json_loader self._json_dumper = json_dumper self._json_encoder = json_encoder self._json_decoder = json_decoder self._request_content_type = request_content_type self._response_content_types = set(response_content_types) send = self._send_request for middleware in reversed(list(middlewares)): send = ft.partial(middleware, handler=send) self._send = send def __call__(self, method: str, *args: JsonT, **kwargs: JsonT) -> JsonT: """ Makes a JSON-RPC call. :param method: method name :param args: method positional arguments :param kwargs: method named arguments :returns: response result """ return self.call(method, *args, **kwargs) @abc.abstractmethod def _request( self, request_text: str, is_notification: bool, request_kwargs: Mapping[str, Any], ) -> Optional[str]: """ Makes a JSON-RPC request. :param request_text: request text representation :param is_notification: is the request a notification :param request_kwargs: additional client request argument :returns: response text representation """ def _send_request( self, request: AbstractRequest, request_kwargs: Mapping[str, Any], ) -> Optional[AbstractResponse]: """ Sends a JSON-RPC request. :param request: request instance :param request_kwargs: additional client request argument :returns: response instance or None if the request is a notification """ response_cls: type[AbstractResponse] = BatchResponse if isinstance(request, BatchRequest) else Response request_text = self._json_dumper(request, cls=self._json_encoder) response_text = self._request(request_text, request.is_notification, request_kwargs) if not request.is_notification: response = response_cls.from_json( self._json_loader(response_text, cls=self._json_decoder), error_cls=self._error_cls, ) else: if response_text: raise exceptions.ProtocolError("unexpected response") response = None return response
[docs] def notify(self, method: str, *args: JsonT, **kwargs: JsonT) -> None: """ Makes a notification request :param method: method name :param args: method positional arguments :param kwargs: method named arguments """ assert not (args and kwargs), "positional and keyword arguments are mutually exclusive" request = Request( id=None, method=method, params=args or kwargs, ) self._send(request, {})
[docs] def call(self, method: str, *args: JsonT, **kwargs: JsonT) -> JsonT: """ Makes a JSON-RPC call. :param method: method name :param args: method positional arguments :param kwargs: method named arguments :returns: response result """ assert not (args and kwargs), "positional and keyword arguments are mutually exclusive" request = Request( id=next(self._id_gen_impl()), method=method, params=args or kwargs, ) response = self._send(request, {}) assert response is not None, "response is not set" return response.unwrap_result()
AsyncMiddlewareHandler = Callable[[AbstractRequest, Mapping[str, Any]], Awaitable[Optional[AbstractResponse]]]
[docs]class AsyncMiddleware(Protocol): """ Asynchronous JSON-RPC client middleware. """ async def __call__( self, request: AbstractRequest, request_kwargs: Mapping[str, Any], /, handler: AsyncMiddlewareHandler, ) -> Optional[AbstractResponse]: pass
[docs]class AbstractAsyncClient(abc.ABC): """ Abstract asynchronous JSON-RPC client. :param id_gen_impl: identifier generator :param error_cls: JSON-RPC error base class :param json_loader: json loader :param json_dumper: json dumper :param json_encoder: json encoder :param json_decoder: json decoder :param middlewares: client reqeust middlewares """
[docs] class Proxy: """ Proxy object. Provides syntactic sugar to make method call using dot notation. :param client: JSON-RPC client instance """ def __init__(self, client: 'AbstractAsyncClient'): self._client = client def __getattr__(self, attr: str) -> Callable[..., Awaitable[JsonT]]: return ft.partial(self._client.call, attr)
@property def proxy(self) -> Proxy: """ Client proxy object. """ return AbstractAsyncClient.Proxy(self)
[docs] @cl.asynccontextmanager async def batch(self) -> AsyncGenerator[Batch, None]: """ Client batch wrapper. """ batch = Batch(self._id_gen_impl) yield batch response = await self._send(BatchRequest(*batch.requests), {}) assert isinstance(response, (BatchResponse, type(None))), "unexpected response type" batch.set_response(response)
def __init__( self, *, id_gen_impl: Callable[..., Generator[JsonRpcRequestIdT, None, None]] = generators.sequential, error_cls: type[exceptions.JsonRpcError] = exceptions.JsonRpcError, json_loader: Callable[..., Any] = json.loads, json_dumper: Callable[..., str] = json.dumps, json_encoder: type[common.JSONEncoder] = common.JSONEncoder, json_decoder: Optional[json.JSONDecoder] = None, middlewares: Iterable[AsyncMiddleware] = (), request_content_type: str = common.DEFAULT_CONTENT_TYPE, response_content_types: Iterable[str] = common.RESPONSE_CONTENT_TYPES, ): self._id_gen_impl = id_gen_impl self._error_cls = error_cls self._json_loader = json_loader self._json_dumper = json_dumper self._json_encoder = json_encoder self._json_decoder = json_decoder self._request_content_type = request_content_type self._response_content_types = set(response_content_types) send = self._send_request for middleware in reversed(list(middlewares)): send = ft.partial(middleware, handler=send) self._send = send def __call__(self, method: str, *args: JsonT, **kwargs: JsonT) -> Awaitable[JsonT]: """ Makes a JSON-RPC call. :param method: method name :param args: method positional arguments :param kwargs: method named arguments :returns: response result """ return self.call(method, *args, **kwargs) @abc.abstractmethod async def _request( self, request_text: str, is_notification: bool, request_kwargs: Mapping[str, Any], ) -> Optional[str]: """ Makes a JSON-RPC request. :param request_text: request text representation :param is_notification: is the request a notification :param request_kwargs: additional client request argument :returns: response text representation or None if the request is a notification """ async def _send_request( self, request: AbstractRequest, request_kwargs: Mapping[str, Any], ) -> Optional[AbstractResponse]: """ Sends a JSON-RPC request. :param request: request instance :param request_kwargs: additional client request argument :returns: response instance or None if the request is a notification """ response_cls: type[AbstractResponse] = BatchResponse if isinstance(request, BatchRequest) else Response request_text = self._json_dumper(request, cls=self._json_encoder) response_text = await self._request(request_text, request.is_notification, request_kwargs) if not request.is_notification: response = response_cls.from_json( self._json_loader(response_text, cls=self._json_decoder), error_cls=self._error_cls, ) else: if response_text: raise exceptions.ProtocolError("unexpected response") response = None return response
[docs] async def notify(self, method: str, *args: JsonT, **kwargs: JsonT) -> None: """ Makes a notification request :param method: method name :param args: method positional arguments :param kwargs: method named arguments """ assert not (args and kwargs), "positional and keyword arguments are mutually exclusive" request = Request( id=None, method=method, params=args or kwargs, ) await self._send(request, {})
[docs] async def call(self, method: str, *args: JsonT, **kwargs: JsonT) -> JsonT: """ Makes a JSON-RPC call. :param method: method name :param args: method positional arguments :param kwargs: method named arguments :returns: response result """ assert not (args and kwargs), "positional and keyword arguments are mutually exclusive" request = Request( id=next(self._id_gen_impl()), method=method, params=args or kwargs, ) response = await self._send(request, {}) assert response is not None, "response is empty" return response.unwrap_result()