Source code for pjrpc.server.integration.kombu

"""
kombu JSON-RPC server integration.
"""

import logging
from typing import Any, Dict, List, Optional, Type

import kombu.mixins
from kombu.transport import virtual

import pjrpc.server

# Module-level logger; it is named after the enclosing package (``__package__``).
logger = logging.getLogger(__package__)


class Executor(kombu.mixins.ConsumerProducerMixin):
    """
    JSON-RPC server built on top of `kombu <http://kombu.readthedocs.org/>`_.

    Requests are consumed from a single queue, passed to a
    :py:class:`pjrpc.server.Dispatcher` and the response (if there is one)
    is published to the queue named by the request ``reply_to`` property.

    :param broker_url: broker connection url
    :param queue_name: requests queue name
    :param conn_args: additional connection arguments
    :param queue_args: queue arguments
    :param publish_args: message publish additional arguments
    :param prefetch_count: worker prefetch count
    :param kwargs: dispatcher additional arguments
    """

    def __init__(
        self,
        broker_url: str,
        queue_name: str,
        conn_args: Optional[Dict[str, Any]] = None,
        queue_args: Optional[Dict[str, Any]] = None,
        publish_args: Optional[Dict[str, Any]] = None,
        prefetch_count: int = 0,
        **kwargs: Any,
    ):
        # ``None`` means "no extra arguments" for both optional mappings.
        connection_options = conn_args or {}
        queue_options = queue_args or {}

        self.connection = kombu.Connection(broker_url, **connection_options)
        self._rpc_queue = kombu.Queue(queue_name, **queue_options)

        self._prefetch_count = prefetch_count
        self._publish_args = publish_args

        self._dispatcher = pjrpc.server.Dispatcher(**kwargs)

    @property
    def dispatcher(self) -> pjrpc.server.Dispatcher:
        """
        Dispatcher instance used to process incoming JSON-RPC requests.
        """

        return self._dispatcher

    def get_consumers(self, Consumer: Type[kombu.Consumer], channel: virtual.AbstractChannel) -> List[kombu.Consumer]:
        """
        Creates the single consumer bound to the requests queue.

        :param Consumer: consumer class (or factory) used to build the consumer
        :param channel: channel the consumer is attached to
        :returns: one-element list with the requests queue consumer
        """

        rpc_consumer = Consumer(
            channel=channel,
            queues=[self._rpc_queue],
            on_message=self._rpc_handle,
            accept={pjrpc.common.DEFAULT_CONTENT_TYPE},
            prefetch_count=self._prefetch_count,
        )

        return [rpc_consumer]

    def _rpc_handle(self, message: kombu.Message) -> None:
        """
        Processes a single JSON-RPC request message.

        The message body is dispatched; a non-empty response is published to
        the ``reply_to`` queue together with the request ``correlation_id``,
        after which the request message is acknowledged. Any exception raised
        along the way is logged and the message is left unacknowledged.

        :param message: kombu message :py:class:`kombu.message.Message`
        """

        try:
            reply_queue = message.properties.get('reply_to')
            response = self._dispatcher.dispatch(message.body, context=message)
            has_response = response is not None

            if has_response and reply_queue is None:
                # There is a response but nowhere to send it.
                logger.warning("property 'reply_to' is missing")
            elif has_response:
                self.producer.publish(
                    response,
                    routing_key=reply_queue,
                    correlation_id=message.properties.get('correlation_id'),
                    content_type=pjrpc.common.DEFAULT_CONTENT_TYPE,
                    content_encoding='utf8',
                    **(self._publish_args or {}),
                )

            # Acknowledge only after the response (if any) has been handed over.
            message.ack()

        except Exception as exc:
            logger.exception("jsonrpc request handling error: %s", exc)