Source code for pjrpc.server.integration.aio_pika
import logging
from typing import Any, Dict, Optional
import aio_pika
from yarl import URL
import pjrpc.server
logger = logging.getLogger(__package__)
[docs]class Executor:
"""
`aio_pika <https://aio-pika.readthedocs.io/en/latest/>`_ based JSON-RPC server.
:param broker_url: broker connection url
:param queue_name: requests queue name
:param prefetch_count: worker prefetch count
:param kwargs: dispatcher additional arguments
"""
def __init__(self, broker_url: URL, queue_name: str, prefetch_count: int = 0, **kwargs: Any):
self._broker_url = broker_url
self._queue_name = queue_name
self._prefetch_count = prefetch_count
self._connection = aio_pika.connection.Connection(broker_url)
self._channel: Optional[aio_pika.abc.AbstractChannel] = None
self._queue: Optional[aio_pika.abc.AbstractQueue] = None
self._consumer_tag: Optional[str] = None
self._dispatcher = pjrpc.server.AsyncDispatcher(**kwargs)
@property
def dispatcher(self) -> pjrpc.server.AsyncDispatcher:
"""
JSON-RPC method dispatcher.
"""
return self._dispatcher
[docs] async def start(self, queue_args: Optional[Dict[str, Any]] = None) -> None:
"""
Starts executor.
:param queue_args: queue arguments
"""
await self._connection.connect()
self._channel = channel = await self._connection.channel()
self._queue = queue = await channel.declare_queue(self._queue_name, **(queue_args or {}))
await channel.set_qos(prefetch_count=self._prefetch_count)
self._consumer_tag = await queue.consume(self._rpc_handle)
[docs] async def shutdown(self) -> None:
"""
Stops executor.
"""
if self._consumer_tag and self._queue:
await self._queue.cancel(self._consumer_tag)
if self._channel:
await self._channel.close()
await self._connection.close()
async def _rpc_handle(self, message: aio_pika.abc.AbstractIncomingMessage) -> None:
"""
Handles JSON-RPC request.
:param message: incoming message
"""
try:
reply_to = message.reply_to
response_text = await self._dispatcher.dispatch(message.body.decode(), context=message)
if response_text is not None:
if reply_to is None:
logger.warning("property 'reply_to' is missing")
else:
async with self._connection.channel() as channel:
await channel.default_exchange.publish(
aio_pika.Message(
body=response_text.encode(),
reply_to=reply_to,
correlation_id=message.correlation_id,
content_type=pjrpc.common.DEFAULT_CONTENT_TYPE,
),
routing_key=reply_to,
)
await message.ack()
except Exception as e:
logger.exception("jsonrpc request handling error: %s", e)