import logging
from typing import Any, Dict, Optional, Union
import kombu.mixins
import pjrpc
from pjrpc.client import AbstractClient
from pjrpc.common import UNSET, MaybeSet, UnsetType
# Module-level logger, named after the containing package (``__package__``).
logger = logging.getLogger(__package__)
class Client(AbstractClient):
    """
    `kombu <https://kombu.readthedocs.io/en/stable/>`_ based JSON-RPC client.

    Note: the client is not thread-safe.

    :param broker_url: broker connection url
    :param queue_name: queue name to publish requests to. Used as the request routing key
                       when ``routing_key`` is not provided
    :param conn_args: additional broker connection arguments
    :param exchange_name: exchange to publish requests to. If ``None`` default exchange is used
    :param exchange_args: exchange arguments (only used when ``exchange_name`` is provided)
    :param routing_key: request message routing key. If ``None`` queue name is used
    :param result_queue_name: result queue name. If ``None`` random exclusive queue is declared for each request
    :param result_queue_args: result queue arguments
    :param kwargs: parameters to be passed to :py:class:`pjrpc.client.AbstractClient`
    """

    def __init__(
        self,
        broker_url: str,
        queue_name: Optional[str] = None,
        conn_args: Optional[Dict[str, Any]] = None,
        exchange_name: Optional[str] = None,
        exchange_args: Optional[Dict[str, Any]] = None,
        routing_key: Optional[str] = None,
        result_queue_name: Optional[str] = None,
        result_queue_args: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ):
        # NOTE(review): this check is skipped when Python runs with ``-O``; kept as an
        # assert so the exception type seen by existing callers does not change.
        assert queue_name or routing_key, "queue_name or routing_key must be provided"

        super().__init__(**kwargs)

        self._connection = kombu.Connection(broker_url, **(conn_args or {}))
        # An explicit routing key wins; otherwise requests are routed by queue name.
        self._routing_key = routing_key or queue_name

        self._result_queue = None
        self._result_queue_args = result_queue_args
        self._exchange = None
        self._exchange_args = exchange_args

        if exchange_name:
            self._exchange = kombu.Exchange(exchange_name, **(exchange_args or {}))

        if result_queue_name:
            self._result_queue = kombu.Queue(result_queue_name, **(result_queue_args or {}))

    def close(self) -> None:
        """
        Closes the current broker connection.
        """

        self._connection.close()

    def _request(self, request_text: str, is_notification: bool = False, **kwargs: Any) -> Optional[str]:
        """
        Sends a JSON-RPC request.

        :param request_text: request text
        :param is_notification: is the request a notification
        :param kwargs: publish additional arguments. ``timeout``, if present, is also used
                       as the timeout of each wait for the response
        :returns: response text, or ``None`` if the request is a notification
        :raises pjrpc.exc.DeserializationError: if the response content type is unexpected
        """

        if is_notification:
            # Notifications expect no reply: publish and return immediately.
            with kombu.Producer(self._connection) as producer:
                producer.publish(
                    request_text,
                    exchange=self._exchange or '',
                    routing_key=self._routing_key,
                    content_type=pjrpc.common.DEFAULT_CONTENT_TYPE,
                    **kwargs,
                )

            return None

        # The request id doubles as the correlation id and, when no result queue
        # was configured, as the name of a per-request exclusive reply queue.
        # NOTE(review): the per-request queue is not explicitly deleted here;
        # presumably the broker drops it when the connection closes - confirm.
        request_id = kombu.uuid()
        result_queue = self._result_queue or kombu.Queue(
            exclusive=True, name=request_id, **(self._result_queue_args or {}),
        )

        with kombu.Producer(self._connection) as producer:
            # Declare the reply queue before publishing so it exists when the reply is sent.
            result_queue.declare(channel=self._connection.default_channel)
            producer.publish(
                request_text,
                exchange=self._exchange or '',
                routing_key=self._routing_key,
                reply_to=result_queue.name,
                correlation_id=request_id,
                content_type=pjrpc.common.DEFAULT_CONTENT_TYPE,
                **kwargs,
            )

        # Stays UNSET until the consumer callback stores a body or an exception.
        response: MaybeSet[Union[None, str, Exception]] = UNSET

        def on_response(message: kombu.Message) -> None:
            nonlocal response

            try:
                if message.properties.get('correlation_id') != request_id:
                    # Not the reply to this request: log it and keep waiting.
                    logger.warning("unexpected message received: %r", message)
                    return

                if message.content_type not in pjrpc.common.RESPONSE_CONTENT_TYPES:
                    raise pjrpc.exc.DeserializationError(f"unexpected response content type: {message.content_type}")
                else:
                    response = message.body

            except Exception as e:
                # Errors are stored and re-raised from ``_request`` itself
                # rather than from inside the consumer callback.
                response = e

        with kombu.Consumer(self._connection, on_message=on_response, queues=result_queue, no_ack=True):
            while response is UNSET:
                # The timeout applies to each wait separately and is ``None`` when not supplied.
                self._connection.drain_events(timeout=kwargs.get('timeout', None))

        if isinstance(response, Exception):
            raise response

        # Narrows the type for static checkers; the loop above only exits once response is set.
        assert not isinstance(response, UnsetType), "response is unset"

        return response