Source code for networkapi.queue_tools.rabbitmq.queue_manager

# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import types

from kombu import Connection
from kombu import Exchange
from kombu import Producer
from kombu import Queue

from networkapi.settings import BROKER_CONNECT_TIMEOUT
from networkapi.settings import BROKER_DESTINATION
from networkapi.settings import BROKER_URL


[docs]class QueueManager(object): """Object to manager objects sent to queue.""" log = logging.getLogger(__name__) def __init__(self, broker_vhost='/', queue_name=None, exchange_name=None, queue_type='direct', routing_key=None): """ Create a new instance QueueManager and initialize with parameters of destination and broker uri from settings or set default settings. """ self._msgs = [] self._broker_timeout = BROKER_CONNECT_TIMEOUT self._queue_name = queue_name self._exchange_name = exchange_name if exchange_name is not None \ else BROKER_DESTINATION self._routing_key = routing_key if routing_key is not None else \ BROKER_DESTINATION self._queue_type = queue_type self.broker = 'amqp://{}/{}'.format( BROKER_URL, broker_vhost, )
[docs] def append(self, dict_obj): """ Appended in list object a dictionary that represents the body of the message that will be sent to queue. :param dict_obj: Dict object """ try: if not isinstance(dict_obj, types.DictType): raise ValueError( u'QueueManagerError - The type must be a instance of Dict') self._msgs.append(dict_obj) self.log.debug('dict_obj:%s', dict_obj) except Exception, e: self.log.error( u'QueueManagerError - Error on appending objects to queue.') self.log.error(e) raise Exception( 'QueueManagerError - Error on appending objects to queue.')
[docs] def send(self): try: # Connection conn = Connection(self.broker) # Channel channel = conn.channel() # Exchange task_exchange = Exchange(self._exchange_name, type=self._queue_type) # Queues if self._queue_name: queue = Queue(name=self._queue_name, channel=channel, exchange=task_exchange, routing_key=self._routing_key) queue.declare() # Producer producer = Producer(exchange=task_exchange, channel=channel, routing_key=self._routing_key) # Send message for message in self._msgs: serialized_message = json.dumps(message, ensure_ascii=False) producer.publish(serialized_message) conn.close() except Exception, e: self.log.error( u'QueueManagerError - Error on sending objects from queue.') self.log.debug(e) raise Exception( 'QueueManagerError - Error on sending objects to queue.')