=== added file '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/.timestamp'
=== added file '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/.timestamp'
=== added file '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/.timestamp'
=== added file '.pc/0004-fix-lp-1362863.patch/.timestamp'
=== added directory '.pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch'
=== added file '.pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/.timestamp'
=== added directory '.pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/oslo'
=== added directory '.pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/oslo/messaging'
=== added directory '.pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/oslo/messaging/_drivers'
=== added file '.pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/oslo/messaging/_drivers/amqp.py'
--- .pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/oslo/messaging/_drivers/amqp.py	1970-01-01 00:00:00 +0000
+++ .pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/oslo/messaging/_drivers/amqp.py	2015-12-31 04:46:54 +0000
@@ -0,0 +1,342 @@
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 - 2012, Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Shared code between AMQP based openstack.common.rpc implementations.
+
+The code in this module is shared between the rpc implementations based on
+AMQP. Specifically, this includes impl_kombu and impl_qpid.  impl_carrot also
+uses AMQP, but is deprecated and predates this code.
+"""
+
+import collections
+import logging
+import threading
+import uuid
+
+from oslo.config import cfg
+import six
+
+from oslo.messaging._drivers import common as rpc_common
+from oslo.messaging._drivers import pool
+
+# FIXME(markmc): remove this
+_ = lambda s: s
+
+amqp_opts = [
+    cfg.BoolOpt('amqp_durable_queues',
+                default=False,
+                deprecated_name='rabbit_durable_queues',
+                deprecated_group='DEFAULT',
+                help='Use durable queues in amqp.'),
+    cfg.BoolOpt('amqp_auto_delete',
+                default=False,
+                help='Auto-delete queues in amqp.'),
+
+    # FIXME(markmc): this was toplevel in openstack.common.rpc
+    cfg.IntOpt('rpc_conn_pool_size',
+               default=30,
+               help='Size of RPC connection pool.'),
+]
+
+UNIQUE_ID = '_unique_id'
+LOG = logging.getLogger(__name__)
+
+
+class ConnectionPool(pool.Pool):
+    """Class that implements a Pool of Connections."""
+    def __init__(self, conf, connection_cls):
+        self.connection_cls = connection_cls
+        self.conf = conf
+        super(ConnectionPool, self).__init__(self.conf.rpc_conn_pool_size)
+        self.reply_proxy = None
+
+    # TODO(comstud): Timeout connections not used in a while
+    def create(self):
+        LOG.debug(_('Pool creating new connection'))
+        return self.connection_cls(self.conf)
+
+    def empty(self):
+        for item in self.iter_free():
+            item.close()
+        # Force a new connection pool to be created.
+        # Note that this was added due to failing unit test cases. The issue
+        # is the above "while loop" gets all the cached connections from the
+        # pool and closes them, but never returns them to the pool, a pool
+        # leak. The unit tests hang waiting for an item to be returned to the
+        # pool. The unit tests get here via the tearDown() method. In the run
+        # time code, it gets here via cleanup() and only appears in service.py
+        # just before doing a sys.exit(), so cleanup() only happens once and
+        # the leakage is not a problem.
+        self.connection_cls.pool = None
+
+
+_pool_create_sem = threading.Lock()
+
+
+def get_connection_pool(conf, connection_cls):
+    with _pool_create_sem:
+        # Make sure only one thread tries to create the connection pool.
+        if not connection_cls.pool:
+            connection_cls.pool = ConnectionPool(conf, connection_cls)
+    return connection_cls.pool
+
+
+class ConnectionContext(rpc_common.Connection):
+    """The class that is actually returned to the create_connection() caller.
+
+    This is essentially a wrapper around Connection that supports 'with'.
+    It can also return a new Connection, or one from a pool.
+
+    The function will also catch when an instance of this class is to be
+    deleted.  With that we can return Connections to the pool on exceptions
+    and so forth without making the caller be responsible for catching them.
+    If possible the function makes sure to return a connection to the pool.
+    """
+
+    def __init__(self, conf, connection_pool, pooled=True, server_params=None):
+        """Create a new connection, or get one from the pool."""
+        self.connection = None
+        self.conf = conf
+        self.connection_pool = connection_pool
+        if pooled:
+            self.connection = connection_pool.get()
+        else:
+            self.connection = connection_pool.connection_cls(
+                conf,
+                server_params=server_params)
+        self.pooled = pooled
+
+    def __enter__(self):
+        """When with ConnectionContext() is used, return self."""
+        return self
+
+    def _done(self):
+        """If the connection came from a pool, clean it up and put it back.
+        If it did not come from a pool, close it.
+        """
+        if self.connection:
+            if self.pooled:
+                # Reset the connection so it's ready for the next caller
+                # to grab from the pool
+                self.connection.reset()
+                self.connection_pool.put(self.connection)
+            else:
+                try:
+                    self.connection.close()
+                except Exception:
+                    pass
+            self.connection = None
+
+    def __exit__(self, exc_type, exc_value, tb):
+        """End of 'with' statement.  We're done here."""
+        self._done()
+
+    def __del__(self):
+        """Caller is done with this connection.  Make sure we cleaned up."""
+        self._done()
+
+    def close(self):
+        """Caller is done with this connection."""
+        self._done()
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        self.connection.create_consumer(topic, proxy, fanout)
+
+    def create_worker(self, topic, proxy, pool_name):
+        self.connection.create_worker(topic, proxy, pool_name)
+
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+        self.connection.join_consumer_pool(callback,
+                                           pool_name,
+                                           topic,
+                                           exchange_name)
+
+    def consume_in_thread(self):
+        self.connection.consume_in_thread()
+
+    def __getattr__(self, key):
+        """Proxy all other calls to the Connection instance."""
+        if self.connection:
+            return getattr(self.connection, key)
+        else:
+            raise rpc_common.InvalidRPCConnectionReuse()
+
+
+class ReplyProxy(ConnectionContext):
+    """Connection class for RPC replies / callbacks."""
+    def __init__(self, conf, connection_pool):
+        self._call_waiters = {}
+        self._num_call_waiters = 0
+        self._num_call_waiters_wrn_threshold = 10
+        self._reply_q = 'reply_' + uuid.uuid4().hex
+        super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
+        self.declare_direct_consumer(self._reply_q, self._process_data)
+        self.consume_in_thread()
+
+    def _process_data(self, message_data):
+        msg_id = message_data.pop('_msg_id', None)
+        waiter = self._call_waiters.get(msg_id)
+        if not waiter:
+            LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s'
+                       ', message : %(data)s'), {'msg_id': msg_id,
+                                                 'data': message_data})
+            LOG.warn(_('_call_waiters: %s') % str(self._call_waiters))
+        else:
+            waiter.put(message_data)
+
+    def add_call_waiter(self, waiter, msg_id):
+        self._num_call_waiters += 1
+        if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
+            LOG.warn(_('Number of call waiters is greater than warning '
+                       'threshold: %d. There could be a MulticallProxyWaiter '
+                       'leak.') % self._num_call_waiters_wrn_threshold)
+            self._num_call_waiters_wrn_threshold *= 2
+        self._call_waiters[msg_id] = waiter
+
+    def del_call_waiter(self, msg_id):
+        self._num_call_waiters -= 1
+        del self._call_waiters[msg_id]
+
+    def get_reply_q(self):
+        return self._reply_q
+
+
+def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
+              failure=None, ending=False, log_failure=True):
+    """Sends a reply or an error on the channel signified by msg_id.
+
+    Failure should be a sys.exc_info() tuple.
+
+    """
+    with ConnectionContext(conf, connection_pool) as conn:
+        if failure:
+            failure = rpc_common.serialize_remote_exception(failure,
+                                                            log_failure)
+
+        msg = {'result': reply, 'failure': failure}
+        if ending:
+            msg['ending'] = True
+        _add_unique_id(msg)
+        # If a reply_q exists, add the msg_id to the reply and pass the
+        # reply_q to direct_send() to use it as the response queue.
+        # Otherwise use the msg_id for backward compatibility.
+        if reply_q:
+            msg['_msg_id'] = msg_id
+            conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
+        else:
+            conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
+
+
+class RpcContext(rpc_common.CommonRpcContext):
+    """Context that supports replying to a rpc.call."""
+    def __init__(self, **kwargs):
+        self.msg_id = kwargs.pop('msg_id', None)
+        self.reply_q = kwargs.pop('reply_q', None)
+        self.conf = kwargs.pop('conf')
+        super(RpcContext, self).__init__(**kwargs)
+
+    def deepcopy(self):
+        values = self.to_dict()
+        values['conf'] = self.conf
+        values['msg_id'] = self.msg_id
+        values['reply_q'] = self.reply_q
+        return self.__class__(**values)
+
+    def reply(self, reply=None, failure=None, ending=False,
+              connection_pool=None, log_failure=True):
+        if self.msg_id:
+            msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
+                      reply, failure, ending, log_failure)
+            if ending:
+                self.msg_id = None
+
+
+def unpack_context(conf, msg):
+    """Unpack context from msg."""
+    context_dict = {}
+    for key in list(msg.keys()):
+        # NOTE(vish): Some versions of Python don't like unicode keys
+        #             in kwargs.
+        key = str(key)
+        if key.startswith('_context_'):
+            value = msg.pop(key)
+            context_dict[key[9:]] = value
+    context_dict['msg_id'] = msg.pop('_msg_id', None)
+    context_dict['reply_q'] = msg.pop('_reply_q', None)
+    context_dict['conf'] = conf
+    ctx = RpcContext.from_dict(context_dict)
+    rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
+    return ctx
+
+
+def pack_context(msg, context):
+    """Pack context into msg.
+
+    Values for message keys need to be less than 255 chars, so we pull
+    context out into a bunch of separate keys. If we want to support
+    more arguments in rabbit messages, we may want to do the same
+    for args at some point.
+
+    """
+    if isinstance(context, dict):
+        context_d = six.iteritems(context)
+    else:
+        context_d = six.iteritems(context.to_dict())
+
+    msg.update(('_context_%s' % key, value)
+               for (key, value) in context_d)
+
+
+class _MsgIdCache(object):
+    """This class checks any duplicate messages."""
+
+    # NOTE: This value is considered can be a configuration item, but
+    #       it is not necessary to change its value in most cases,
+    #       so let this value as static for now.
+    DUP_MSG_CHECK_SIZE = 16
+
+    def __init__(self, **kwargs):
+        self.prev_msgids = collections.deque([],
+                                             maxlen=self.DUP_MSG_CHECK_SIZE)
+
+    def check_duplicate_message(self, message_data):
+        """AMQP consumers may read same message twice when exceptions occur
+           before ack is returned. This method prevents doing it.
+        """
+        try:
+            msg_id = message_data.pop(UNIQUE_ID)
+        except KeyError:
+            return
+        if msg_id in self.prev_msgids:
+            raise rpc_common.DuplicateMessageError(msg_id=msg_id)
+        return msg_id
+
+    def add(self, msg_id):
+        if msg_id and msg_id not in self.prev_msgids:
+            self.prev_msgids.append(msg_id)
+
+
+def _add_unique_id(msg):
+    """Add unique_id for checking duplicate messages."""
+    unique_id = uuid.uuid4().hex
+    msg.update({UNIQUE_ID: unique_id})
+    LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+
+
+def get_control_exchange(conf):
+    return conf.control_exchange

=== added file '.pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/oslo/messaging/_drivers/amqpdriver.py'
--- .pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/oslo/messaging/_drivers/amqpdriver.py	1970-01-01 00:00:00 +0000
+++ .pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/oslo/messaging/_drivers/amqpdriver.py	2015-12-31 04:46:54 +0000
@@ -0,0 +1,467 @@
+
+# Copyright 2013 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+__all__ = ['AMQPDriverBase']
+
+import logging
+import threading
+import uuid
+
+from six import moves
+
+from oslo import messaging
+from oslo.messaging._drivers import amqp as rpc_amqp
+from oslo.messaging._drivers import base
+from oslo.messaging._drivers import common as rpc_common
+
+LOG = logging.getLogger(__name__)
+
+
+class AMQPIncomingMessage(base.IncomingMessage):
+
+    def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
+        super(AMQPIncomingMessage, self).__init__(listener, ctxt,
+                                                  dict(message))
+
+        self.unique_id = unique_id
+        self.msg_id = msg_id
+        self.reply_q = reply_q
+        self.acknowledge_callback = message.acknowledge
+        self.requeue_callback = message.requeue
+
+    def _send_reply(self, conn, reply=None, failure=None,
+                    ending=False, log_failure=True):
+        if failure:
+            failure = rpc_common.serialize_remote_exception(failure,
+                                                            log_failure)
+
+        msg = {'result': reply, 'failure': failure}
+        if ending:
+            msg['ending'] = True
+
+        rpc_amqp._add_unique_id(msg)
+
+        # If a reply_q exists, add the msg_id to the reply and pass the
+        # reply_q to direct_send() to use it as the response queue.
+        # Otherwise use the msg_id for backward compatibility.
+        if self.reply_q:
+            msg['_msg_id'] = self.msg_id
+            conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
+        else:
+            conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
+
+    def reply(self, reply=None, failure=None, log_failure=True):
+        if not self.msg_id:
+            # NOTE(Alexei_987) not sending reply, if msg_id is empty
+            #    because reply should not be expected by caller side
+            return
+
+        with self.listener.driver._get_connection() as conn:
+            self._send_reply(conn, reply, failure, log_failure=log_failure)
+            self._send_reply(conn, ending=True)
+
+    def acknowledge(self):
+        self.listener.msg_id_cache.add(self.unique_id)
+        self.acknowledge_callback()
+
+    def requeue(self):
+        # NOTE(sileht): In case of the connection is lost between receiving the
+        # message and requeing it, this requeue call fail
+        # but because the message is not acknowledged and not added to the
+        # msg_id_cache, the message will be reconsumed, the only difference is
+        # the message stay at the beginning of the queue instead of moving to
+        # the end.
+        self.requeue_callback()
+
+
+class AMQPListener(base.Listener):
+
+    def __init__(self, driver, conn):
+        super(AMQPListener, self).__init__(driver)
+        self.conn = conn
+        self.msg_id_cache = rpc_amqp._MsgIdCache()
+        self.incoming = []
+        self._stopped = threading.Event()
+
+    def __call__(self, message):
+        # FIXME(markmc): logging isn't driver specific
+        rpc_common._safe_log(LOG.debug, 'received %s', dict(message))
+
+        unique_id = self.msg_id_cache.check_duplicate_message(message)
+        ctxt = rpc_amqp.unpack_context(self.conf, message)
+
+        self.incoming.append(AMQPIncomingMessage(self,
+                                                 ctxt.to_dict(),
+                                                 message,
+                                                 unique_id,
+                                                 ctxt.msg_id,
+                                                 ctxt.reply_q))
+
+    def poll(self, timeout=None):
+        while not self._stopped.is_set():
+            if self.incoming:
+                return self.incoming.pop(0)
+            try:
+                self.conn.consume(limit=1, timeout=timeout)
+            except rpc_common.Timeout:
+                return None
+
+    def stop(self):
+        self._stopped.set()
+        self.conn.stop_consuming()
+
+    def cleanup(self):
+        # Closes listener connection
+        self.conn.close()
+
+
+class ReplyWaiters(object):
+
+    WAKE_UP = object()
+
+    def __init__(self):
+        self._queues = {}
+        self._wrn_threshold = 10
+
+    def get(self, msg_id, timeout):
+        try:
+            return self._queues[msg_id].get(block=True, timeout=timeout)
+        except moves.queue.Empty:
+            raise messaging.MessagingTimeout('Timed out waiting for a reply '
+                                             'to message ID %s' % msg_id)
+
+    def check(self, msg_id):
+        try:
+            return self._queues[msg_id].get(block=False)
+        except moves.queue.Empty:
+            return None
+
+    def put(self, msg_id, message_data):
+        queue = self._queues.get(msg_id)
+        if not queue:
+            LOG.warn('No calling threads waiting for msg_id : %(msg_id)s'
+                     ', message : %(data)s', {'msg_id': msg_id,
+                                              'data': message_data})
+            LOG.warn('_queues: %s' % str(self._queues))
+        else:
+            queue.put(message_data)
+
+    def wake_all(self, except_id):
+        msg_ids = [i for i in self._queues.keys() if i != except_id]
+        for msg_id in msg_ids:
+            self.put(msg_id, self.WAKE_UP)
+
+    def add(self, msg_id, queue):
+        self._queues[msg_id] = queue
+        if len(self._queues) > self._wrn_threshold:
+            LOG.warn('Number of call queues is greater than warning '
+                     'threshold: %d. There could be a leak.' %
+                     self._wrn_threshold)
+            self._wrn_threshold *= 2
+
+    def remove(self, msg_id):
+        del self._queues[msg_id]
+
+
+class ReplyWaiter(object):
+
+    def __init__(self, conf, reply_q, conn, allowed_remote_exmods):
+        self.conf = conf
+        self.conn = conn
+        self.reply_q = reply_q
+        self.allowed_remote_exmods = allowed_remote_exmods
+
+        self.conn_lock = threading.Lock()
+        self.incoming = []
+        self.msg_id_cache = rpc_amqp._MsgIdCache()
+        self.waiters = ReplyWaiters()
+
+        conn.declare_direct_consumer(reply_q, self)
+
+    def __call__(self, message):
+        message.acknowledge()
+        self.incoming.append(message)
+
+    def listen(self, msg_id):
+        queue = moves.queue.Queue()
+        self.waiters.add(msg_id, queue)
+
+    def unlisten(self, msg_id):
+        self.waiters.remove(msg_id)
+
+    @staticmethod
+    def _raise_timeout_exception(msg_id):
+        raise messaging.MessagingTimeout(
+            'Timed out waiting for a reply to message ID %s' % msg_id)
+
+    def _process_reply(self, data):
+        result = None
+        ending = False
+        self.msg_id_cache.check_duplicate_message(data)
+        if data['failure']:
+            failure = data['failure']
+            result = rpc_common.deserialize_remote_exception(
+                failure, self.allowed_remote_exmods)
+        elif data.get('ending', False):
+            ending = True
+        else:
+            result = data['result']
+        return result, ending
+
+    def _poll_connection(self, msg_id, timer):
+        while True:
+            while self.incoming:
+                message_data = self.incoming.pop(0)
+
+                incoming_msg_id = message_data.pop('_msg_id', None)
+                if incoming_msg_id == msg_id:
+                    return self._process_reply(message_data)
+
+                self.waiters.put(incoming_msg_id, message_data)
+
+            timeout = timer.check_return(self._raise_timeout_exception, msg_id)
+            try:
+                self.conn.consume(limit=1, timeout=timeout)
+            except rpc_common.Timeout:
+                self._raise_timeout_exception(msg_id)
+
+    def _poll_queue(self, msg_id, timer):
+        timeout = timer.check_return(self._raise_timeout_exception, msg_id)
+        message = self.waiters.get(msg_id, timeout=timeout)
+        if message is self.waiters.WAKE_UP:
+            return None, None, True  # lock was released
+
+        reply, ending = self._process_reply(message)
+        return reply, ending, False
+
+    def _check_queue(self, msg_id):
+        while True:
+            message = self.waiters.check(msg_id)
+            if message is self.waiters.WAKE_UP:
+                continue
+            if message is None:
+                return None, None, True  # queue is empty
+
+            reply, ending = self._process_reply(message)
+            return reply, ending, False
+
+    def wait(self, msg_id, timeout):
+        #
+        # NOTE(markmc): we're waiting for a reply for msg_id to come in for on
+        # the reply_q, but there may be other threads also waiting for replies
+        # to other msg_ids
+        #
+        # Only one thread can be consuming from the queue using this connection
+        # and we don't want to hold open a connection per thread, so instead we
+        # have the first thread take responsibility for passing replies not
+        # intended for itself to the appropriate thread.
+        #
+        timer = rpc_common.DecayingTimer(duration=timeout)
+        timer.start()
+        final_reply = None
+        while True:
+            if self.conn_lock.acquire(False):
+                # Ok, we're the thread responsible for polling the connection
+                try:
+                    # Check the queue to see if a previous lock-holding thread
+                    # queued up a reply already
+                    while True:
+                        reply, ending, empty = self._check_queue(msg_id)
+                        if empty:
+                            break
+                        if not ending:
+                            final_reply = reply
+                        else:
+                            return final_reply
+
+                    # Now actually poll the connection
+                    while True:
+                        reply, ending = self._poll_connection(msg_id, timer)
+                        if not ending:
+                            final_reply = reply
+                        else:
+                            return final_reply
+                finally:
+                    self.conn_lock.release()
+                    # We've got our reply, tell the other threads to wake up
+                    # so that one of them will take over the responsibility for
+                    # polling the connection
+                    self.waiters.wake_all(msg_id)
+            else:
+                # We're going to wait for the first thread to pass us our reply
+                reply, ending, trylock = self._poll_queue(msg_id, timer)
+                if trylock:
+                    # The first thread got its reply, let's try and take over
+                    # the responsibility for polling
+                    continue
+                if not ending:
+                    final_reply = reply
+                else:
+                    return final_reply
+
+
+class AMQPDriverBase(base.BaseDriver):
+
+    def __init__(self, conf, url, connection_pool,
+                 default_exchange=None, allowed_remote_exmods=[]):
+        super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
+                                             allowed_remote_exmods)
+
+        self._server_params = self._server_params_from_url(self._url)
+
+        self._default_exchange = default_exchange
+
+        # FIXME(markmc): temp hack
+        if self._default_exchange:
+            self.conf.set_override('control_exchange', self._default_exchange)
+
+        self._connection_pool = connection_pool
+
+        self._reply_q_lock = threading.Lock()
+        self._reply_q = None
+        self._reply_q_conn = None
+        self._waiter = None
+
+    def _server_params_from_url(self, url):
+        sp = {}
+
+        if url.virtual_host is not None:
+            sp['virtual_host'] = url.virtual_host
+
+        if url.hosts:
+            # FIXME(markmc): support multiple hosts
+            host = url.hosts[0]
+
+            sp['hostname'] = host.hostname
+            if host.port is not None:
+                sp['port'] = host.port
+            sp['username'] = host.username or ''
+            sp['password'] = host.password or ''
+
+        return sp
+
+    def _get_connection(self, pooled=True):
+        # FIXME(markmc): we don't yet have a connection pool for each
+        # Transport instance, so we'll only use the pool with the
+        # transport configuration from the config file
+        server_params = self._server_params or None
+        if server_params:
+            pooled = False
+        return rpc_amqp.ConnectionContext(self.conf,
+                                          self._connection_pool,
+                                          pooled=pooled,
+                                          server_params=server_params)
+
+    def _get_reply_q(self):
+        with self._reply_q_lock:
+            if self._reply_q is not None:
+                return self._reply_q
+
+            reply_q = 'reply_' + uuid.uuid4().hex
+
+            conn = self._get_connection(pooled=False)
+
+            self._waiter = ReplyWaiter(self.conf, reply_q, conn,
+                                       self._allowed_remote_exmods)
+
+            self._reply_q = reply_q
+            self._reply_q_conn = conn
+
+        return self._reply_q
+
+    def _send(self, target, ctxt, message,
+              wait_for_reply=None, timeout=None,
+              envelope=True, notify=False):
+
+        # FIXME(markmc): remove this temporary hack
+        class Context(object):
+            def __init__(self, d):
+                self.d = d
+
+            def to_dict(self):
+                return self.d
+
+        context = Context(ctxt)
+        msg = message
+
+        if wait_for_reply:
+            msg_id = uuid.uuid4().hex
+            msg.update({'_msg_id': msg_id})
+            LOG.debug('MSG_ID is %s' % (msg_id))
+            msg.update({'_reply_q': self._get_reply_q()})
+
+        rpc_amqp._add_unique_id(msg)
+        rpc_amqp.pack_context(msg, context)
+
+        if envelope:
+            msg = rpc_common.serialize_msg(msg)
+
+        if wait_for_reply:
+            self._waiter.listen(msg_id)
+
+        try:
+            with self._get_connection() as conn:
+                if notify:
+                    conn.notify_send(target.topic, msg)
+                elif target.fanout:
+                    conn.fanout_send(target.topic, msg)
+                else:
+                    topic = target.topic
+                    if target.server:
+                        topic = '%s.%s' % (target.topic, target.server)
+                    conn.topic_send(topic, msg, timeout=timeout)
+
+            if wait_for_reply:
+                result = self._waiter.wait(msg_id, timeout)
+                if isinstance(result, Exception):
+                    raise result
+                return result
+        finally:
+            if wait_for_reply:
+                self._waiter.unlisten(msg_id)
+
+    def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
+        return self._send(target, ctxt, message, wait_for_reply, timeout)
+
+    def send_notification(self, target, ctxt, message, version):
+        return self._send(target, ctxt, message,
+                          envelope=(version == 2.0), notify=True)
+
+    def listen(self, target):
+        conn = self._get_connection(pooled=False)
+
+        listener = AMQPListener(self, conn)
+
+        conn.declare_topic_consumer(target.topic, listener)
+        conn.declare_topic_consumer('%s.%s' % (target.topic, target.server),
+                                    listener)
+        conn.declare_fanout_consumer(target.topic, listener)
+
+        return listener
+
+    def listen_for_notifications(self, targets_and_priorities):
+        conn = self._get_connection(pooled=False)
+
+        listener = AMQPListener(self, conn)
+        for target, priority in targets_and_priorities:
+            conn.declare_topic_consumer('%s.%s' % (target.topic, priority),
+                                        callback=listener,
+                                        exchange_name=target.exchange)
+        return listener
+
+    def cleanup(self):
+        if self._connection_pool:
+            self._connection_pool.empty()
+        self._connection_pool = None

=== added file '.pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/oslo/messaging/_drivers/impl_rabbit.py'
--- .pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/oslo/messaging/_drivers/impl_rabbit.py	1970-01-01 00:00:00 +0000
+++ .pc/0005-fix-rabbit-starvation-of-connections-for-reply.patch/oslo/messaging/_drivers/impl_rabbit.py	2015-12-31 04:46:54 +0000
@@ -0,0 +1,834 @@
+#    Copyright 2011 OpenStack Foundation
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import functools
+import itertools
+import logging
+import socket
+import ssl
+import time
+import uuid
+
+import kombu
+import kombu.connection
+import kombu.entity
+import kombu.messaging
+from oslo.config import cfg
+import six
+
+from oslo.messaging._drivers import amqp as rpc_amqp
+from oslo.messaging._drivers import amqpdriver
+from oslo.messaging._drivers import common as rpc_common
+from oslo.messaging.openstack.common import network_utils
+
+# FIXME(markmc): remove this
+_ = lambda s: s
+
+rabbit_opts = [
+    cfg.StrOpt('kombu_ssl_version',
+               default='',
+               help='SSL version to use (valid only if SSL enabled). '
+                    'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
+                    'be available on some distributions.'
+               ),
+    cfg.StrOpt('kombu_ssl_keyfile',
+               default='',
+               help='SSL key file (valid only if SSL enabled).'),
+    cfg.StrOpt('kombu_ssl_certfile',
+               default='',
+               help='SSL cert file (valid only if SSL enabled).'),
+    cfg.StrOpt('kombu_ssl_ca_certs',
+               default='',
+               help=('SSL certification authority file '
+                     '(valid only if SSL enabled).')),
+    cfg.FloatOpt('kombu_reconnect_delay',
+                 default=1.0,
+                 help='How long to wait before reconnecting in response to an '
+                      'AMQP consumer cancel notification.'),
+    cfg.StrOpt('rabbit_host',
+               default='localhost',
+               help='The RabbitMQ broker address where a single node is '
+                    'used.'),
+    cfg.IntOpt('rabbit_port',
+               default=5672,
+               help='The RabbitMQ broker port where a single node is used.'),
+    cfg.ListOpt('rabbit_hosts',
+                default=['$rabbit_host:$rabbit_port'],
+                help='RabbitMQ HA cluster host:port pairs.'),
+    cfg.BoolOpt('rabbit_use_ssl',
+                default=False,
+                help='Connect over SSL for RabbitMQ.'),
+    cfg.StrOpt('rabbit_userid',
+               default='guest',
+               help='The RabbitMQ userid.'),
+    cfg.StrOpt('rabbit_password',
+               default='guest',
+               help='The RabbitMQ password.',
+               secret=True),
+    cfg.StrOpt('rabbit_login_method',
+               default='AMQPLAIN',
+               help='the RabbitMQ login method'),
+    cfg.StrOpt('rabbit_virtual_host',
+               default='/',
+               help='The RabbitMQ virtual host.'),
+    cfg.IntOpt('rabbit_retry_interval',
+               default=1,
+               help='How frequently to retry connecting with RabbitMQ.'),
+    cfg.IntOpt('rabbit_retry_backoff',
+               default=2,
+               help='How long to backoff for between retries when connecting '
+                    'to RabbitMQ.'),
+    cfg.IntOpt('rabbit_max_retries',
+               default=0,
+               help='Maximum number of RabbitMQ connection retries. '
+                    'Default is 0 (infinite retry count).'),
+    cfg.BoolOpt('rabbit_ha_queues',
+                default=False,
+                help='Use HA queues in RabbitMQ (x-ha-policy: all). '
+                     'If you change this option, you must wipe the '
+                     'RabbitMQ database.'),
+
+    # FIXME(markmc): this was toplevel in openstack.common.rpc
+    cfg.BoolOpt('fake_rabbit',
+                default=False,
+                help='If passed, use a fake RabbitMQ provider.'),
+]
+
+LOG = logging.getLogger(__name__)
+
+
+def _get_queue_arguments(conf):
+    """Construct the arguments for declaring a queue.
+
+    If the rabbit_ha_queues option is set, we declare a mirrored queue
+    as described here:
+
+      http://www.rabbitmq.com/ha.html
+
+    Setting x-ha-policy to all means that the queue will be mirrored
+    to all nodes in the cluster.
+    """
+    return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
+
+
+class RabbitMessage(dict):
+    def __init__(self, raw_message):
+        super(RabbitMessage, self).__init__(
+            rpc_common.deserialize_msg(raw_message.payload))
+        self._raw_message = raw_message
+
+    def acknowledge(self):
+        self._raw_message.ack()
+
+    def requeue(self):
+        self._raw_message.requeue()
+
+
+class ConsumerBase(object):
+    """Consumer base class."""
+
+    def __init__(self, channel, callback, tag, **kwargs):
+        """Declare a queue on an amqp channel.
+
+        'channel' is the amqp channel to use
+        'callback' is the callback to call when messages are received
+        'tag' is a unique ID for the consumer on the channel
+
+        queue name, exchange name, and other kombu options are
+        passed in here as a dictionary.
+        """
+        self.callback = callback
+        self.tag = str(tag)
+        self.kwargs = kwargs
+        self.queue = None
+        self.reconnect(channel)
+
+    def reconnect(self, channel):
+        """Re-declare the queue after a rabbit reconnect."""
+        self.channel = channel
+        self.kwargs['channel'] = channel
+        self.queue = kombu.entity.Queue(**self.kwargs)
+        self.queue.declare()
+
+    def _callback_handler(self, message, callback):
+        """Call callback with deserialized message.
+
+        Messages that are processed and ack'ed.
+        """
+
+        try:
+            callback(RabbitMessage(message))
+        except Exception:
+            LOG.exception(_("Failed to process message"
+                            " ... skipping it."))
+            message.ack()
+
+    def consume(self, *args, **kwargs):
+        """Actually declare the consumer on the amqp channel.  This will
+        start the flow of messages from the queue.  Using the
+        Connection.iterconsume() iterator will process the messages,
+        calling the appropriate callback.
+
+        If a callback is specified in kwargs, use that.  Otherwise,
+        use the callback passed during __init__()
+
+        If kwargs['nowait'] is True, then this call will block until
+        a message is read.
+
+        """
+
+        options = {'consumer_tag': self.tag}
+        options['nowait'] = kwargs.get('nowait', False)
+        callback = kwargs.get('callback', self.callback)
+        if not callback:
+            raise ValueError("No callback defined")
+
+        def _callback(raw_message):
+            message = self.channel.message_to_python(raw_message)
+            self._callback_handler(message, callback)
+
+        self.queue.consume(*args, callback=_callback, **options)
+
+    def cancel(self):
+        """Cancel the consuming from the queue, if it has started."""
+        try:
+            self.queue.cancel(self.tag)
+        except KeyError as e:
+            # NOTE(comstud): Kludge to get around a amqplib bug
+            if str(e) != "u'%s'" % self.tag:
+                raise
+        self.queue = None
+
+
+class DirectConsumer(ConsumerBase):
+    """Queue/consumer class for 'direct'."""
+
+    def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
+        """Init a 'direct' queue.
+
+        'channel' is the amqp channel to use
+        'msg_id' is the msg_id to listen on
+        'callback' is the callback to call when messages are received
+        'tag' is a unique ID for the consumer on the channel
+
+        Other kombu options may be passed
+        """
+        # Default options
+        options = {'durable': False,
+                   'queue_arguments': _get_queue_arguments(conf),
+                   'auto_delete': True,
+                   'exclusive': False}
+        options.update(kwargs)
+        exchange = kombu.entity.Exchange(name=msg_id,
+                                         type='direct',
+                                         durable=options['durable'],
+                                         auto_delete=options['auto_delete'])
+        super(DirectConsumer, self).__init__(channel,
+                                             callback,
+                                             tag,
+                                             name=msg_id,
+                                             exchange=exchange,
+                                             routing_key=msg_id,
+                                             **options)
+
+
+class TopicConsumer(ConsumerBase):
+    """Consumer class for 'topic'."""
+
+    def __init__(self, conf, channel, topic, callback, tag, name=None,
+                 exchange_name=None, **kwargs):
+        """Init a 'topic' queue.
+
+        :param channel: the amqp channel to use
+        :param topic: the topic to listen on
+        :paramtype topic: str
+        :param callback: the callback to call when messages are received
+        :param tag: a unique ID for the consumer on the channel
+        :param name: optional queue name, defaults to topic
+        :paramtype name: str
+
+        Other kombu options may be passed as keyword arguments
+        """
+        # Default options
+        options = {'durable': conf.amqp_durable_queues,
+                   'queue_arguments': _get_queue_arguments(conf),
+                   'auto_delete': conf.amqp_auto_delete,
+                   'exclusive': False}
+        options.update(kwargs)
+        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
+        exchange = kombu.entity.Exchange(name=exchange_name,
+                                         type='topic',
+                                         durable=options['durable'],
+                                         auto_delete=options['auto_delete'])
+        super(TopicConsumer, self).__init__(channel,
+                                            callback,
+                                            tag,
+                                            name=name or topic,
+                                            exchange=exchange,
+                                            routing_key=topic,
+                                            **options)
+
+
+class FanoutConsumer(ConsumerBase):
+    """Consumer class for 'fanout'."""
+
+    def __init__(self, conf, channel, topic, callback, tag, **kwargs):
+        """Init a 'fanout' queue.
+
+        'channel' is the amqp channel to use
+        'topic' is the topic to listen on
+        'callback' is the callback to call when messages are received
+        'tag' is a unique ID for the consumer on the channel
+
+        Other kombu options may be passed
+        """
+        unique = uuid.uuid4().hex
+        exchange_name = '%s_fanout' % topic
+        queue_name = '%s_fanout_%s' % (topic, unique)
+
+        # Default options
+        options = {'durable': False,
+                   'queue_arguments': _get_queue_arguments(conf),
+                   'auto_delete': True,
+                   'exclusive': False}
+        options.update(kwargs)
+        exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
+                                         durable=options['durable'],
+                                         auto_delete=options['auto_delete'])
+        super(FanoutConsumer, self).__init__(channel, callback, tag,
+                                             name=queue_name,
+                                             exchange=exchange,
+                                             routing_key=topic,
+                                             **options)
+
+
+class Publisher(object):
+    """Base Publisher class."""
+
+    def __init__(self, channel, exchange_name, routing_key, **kwargs):
+        """Init the Publisher class with the exchange_name, routing_key,
+        and other options
+        """
+        self.exchange_name = exchange_name
+        self.routing_key = routing_key
+        self.kwargs = kwargs
+        self.reconnect(channel)
+
+    def reconnect(self, channel):
+        """Re-establish the Producer after a rabbit reconnection."""
+        self.exchange = kombu.entity.Exchange(name=self.exchange_name,
+                                              **self.kwargs)
+        self.producer = kombu.messaging.Producer(exchange=self.exchange,
+                                                 channel=channel,
+                                                 routing_key=self.routing_key)
+
+    def send(self, msg, timeout=None):
+        """Send a message."""
+        if timeout:
+            #
+            # AMQP TTL is in milliseconds when set in the header.
+            #
+            self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
+        else:
+            self.producer.publish(msg)
+
+
+class DirectPublisher(Publisher):
+    """Publisher class for 'direct'."""
+    def __init__(self, conf, channel, msg_id, **kwargs):
+        """Init a 'direct' publisher.
+
+        Kombu options may be passed as keyword args to override defaults
+        """
+
+        options = {'durable': False,
+                   'auto_delete': True,
+                   'exclusive': False,
+                   'passive': True}
+        options.update(kwargs)
+        super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
+                                              type='direct', **options)
+
+
+class TopicPublisher(Publisher):
+    """Publisher class for 'topic'."""
+    def __init__(self, conf, channel, topic, **kwargs):
+        """Init a 'topic' publisher.
+
+        Kombu options may be passed as keyword args to override defaults
+        """
+        options = {'durable': conf.amqp_durable_queues,
+                   'auto_delete': conf.amqp_auto_delete,
+                   'exclusive': False}
+
+        options.update(kwargs)
+        exchange_name = rpc_amqp.get_control_exchange(conf)
+        super(TopicPublisher, self).__init__(channel,
+                                             exchange_name,
+                                             topic,
+                                             type='topic',
+                                             **options)
+
+
+class FanoutPublisher(Publisher):
+    """Publisher class for 'fanout'."""
+    def __init__(self, conf, channel, topic, **kwargs):
+        """Init a 'fanout' publisher.
+
+        Kombu options may be passed as keyword args to override defaults
+        """
+        options = {'durable': False,
+                   'auto_delete': True,
+                   'exclusive': False}
+        options.update(kwargs)
+        super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
+                                              None, type='fanout', **options)
+
+
+class NotifyPublisher(TopicPublisher):
+    """Publisher class for 'notify'."""
+
+    def __init__(self, conf, channel, topic, **kwargs):
+        self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
+        self.queue_arguments = _get_queue_arguments(conf)
+        super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
+
+    def reconnect(self, channel):
+        super(NotifyPublisher, self).reconnect(channel)
+
+        # NOTE(jerdfelt): Normally the consumer would create the queue, but
+        # we do this to ensure that messages don't get dropped if the
+        # consumer is started after we do
+        queue = kombu.entity.Queue(channel=channel,
+                                   exchange=self.exchange,
+                                   durable=self.durable,
+                                   name=self.routing_key,
+                                   routing_key=self.routing_key,
+                                   queue_arguments=self.queue_arguments)
+        queue.declare()
+
+
+class Connection(object):
+    """Connection object."""
+
+    pool = None
+
+    def __init__(self, conf, server_params=None):
+        self.consumers = []
+        self.conf = conf
+        self.max_retries = self.conf.rabbit_max_retries
+        # Try forever?
+        if self.max_retries <= 0:
+            self.max_retries = None
+        self.interval_start = self.conf.rabbit_retry_interval
+        self.interval_stepping = self.conf.rabbit_retry_backoff
+        # max retry-interval = 30 seconds
+        self.interval_max = 30
+        self.memory_transport = False
+
+        if server_params is None:
+            server_params = {}
+        # Keys to translate from server_params to kombu params
+        server_params_to_kombu_params = {'username': 'userid'}
+
+        ssl_params = self._fetch_ssl_params()
+        params_list = []
+        for adr in self.conf.rabbit_hosts:
+            hostname, port = network_utils.parse_host_port(
+                adr, default_port=self.conf.rabbit_port)
+
+            params = {
+                'hostname': hostname,
+                'port': port,
+                'userid': self.conf.rabbit_userid,
+                'password': self.conf.rabbit_password,
+                'login_method': self.conf.rabbit_login_method,
+                'virtual_host': self.conf.rabbit_virtual_host,
+            }
+
+            for sp_key, value in six.iteritems(server_params):
+                p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+                params[p_key] = value
+
+            if self.conf.fake_rabbit:
+                params['transport'] = 'memory'
+            if self.conf.rabbit_use_ssl:
+                params['ssl'] = ssl_params
+
+            params_list.append(params)
+
+        self.params_list = itertools.cycle(params_list)
+
+        self.memory_transport = self.conf.fake_rabbit
+
+        self.connection = None
+        self.do_consume = None
+        self._consume_loop_stopped = False
+
+        self.reconnect()
+
+    # FIXME(markmc): use oslo sslutils when it is available as a library
+    _SSL_PROTOCOLS = {
+        "tlsv1": ssl.PROTOCOL_TLSv1,
+        "sslv23": ssl.PROTOCOL_SSLv23,
+        "sslv3": ssl.PROTOCOL_SSLv3
+    }
+
+    try:
+        _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
+    except AttributeError:
+        pass
+
+    @classmethod
+    def validate_ssl_version(cls, version):
+        key = version.lower()
+        try:
+            return cls._SSL_PROTOCOLS[key]
+        except KeyError:
+            raise RuntimeError(_("Invalid SSL version : %s") % version)
+
+    def _fetch_ssl_params(self):
+        """Handles fetching what ssl params should be used for the connection
+        (if any).
+        """
+        ssl_params = dict()
+
+        # http://docs.python.org/library/ssl.html - ssl.wrap_socket
+        if self.conf.kombu_ssl_version:
+            ssl_params['ssl_version'] = self.validate_ssl_version(
+                self.conf.kombu_ssl_version)
+        if self.conf.kombu_ssl_keyfile:
+            ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
+        if self.conf.kombu_ssl_certfile:
+            ssl_params['certfile'] = self.conf.kombu_ssl_certfile
+        if self.conf.kombu_ssl_ca_certs:
+            ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
+            # We might want to allow variations in the
+            # future with this?
+            ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
+
+        # Return the extended behavior or just have the default behavior
+        return ssl_params or True
+
+    def _connect(self, params):
+        """Connect to rabbit.  Re-establish any queues that may have
+        been declared before if we are reconnecting.  Exceptions should
+        be handled by the caller.
+        """
+        if self.connection:
+            LOG.info(_("Reconnecting to AMQP server on "
+                     "%(hostname)s:%(port)d") % params)
+            try:
+                # XXX(nic): when reconnecting to a RabbitMQ cluster
+                # with mirrored queues in use, the attempt to release the
+                # connection can hang "indefinitely" somewhere deep down
+                # in Kombu.  Blocking the thread for a bit prior to
+                # release seems to kludge around the problem where it is
+                # otherwise reproduceable.
+                if self.conf.kombu_reconnect_delay > 0:
+                    LOG.info(_("Delaying reconnect for %1.1f seconds...") %
+                             self.conf.kombu_reconnect_delay)
+                    time.sleep(self.conf.kombu_reconnect_delay)
+
+                self.connection.release()
+            except self.connection_errors:
+                pass
+            # Setting this in case the next statement fails, though
+            # it shouldn't be doing any network operations, yet.
+            self.connection = None
+        self.connection = kombu.connection.BrokerConnection(**params)
+        self.connection_errors = self.connection.connection_errors
+        self.channel_errors = self.connection.channel_errors
+        if self.memory_transport:
+            # Kludge to speed up tests.
+            self.connection.transport.polling_interval = 0.0
+        self.do_consume = True
+        self.consumer_num = itertools.count(1)
+        self.connection.connect()
+        self.channel = self.connection.channel()
+        # work around 'memory' transport bug in 1.1.3
+        if self.memory_transport:
+            self.channel._new_queue('ae.undeliver')
+        for consumer in self.consumers:
+            consumer.reconnect(self.channel)
+        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+                 params)
+
+    def reconnect(self):
+        """Handles reconnecting and re-establishing queues.
+        Will retry up to self.max_retries number of times.
+        self.max_retries = 0 means to retry forever.
+        Sleep between tries, starting at self.interval_start
+        seconds, backing off self.interval_stepping number of seconds
+        each attempt.
+        """
+
+        attempt = 0
+        while True:
+            params = six.next(self.params_list)
+            attempt += 1
+            try:
+                self._connect(params)
+                return
+            except IOError as e:
+                pass
+            except self.connection_errors as e:
+                pass
+            except Exception as e:
+                # NOTE(comstud): Unfortunately it's possible for amqplib
+                # to return an error not covered by its transport
+                # connection_errors in the case of a timeout waiting for
+                # a protocol response.  (See paste link in LP888621)
+                # So, we check all exceptions for 'timeout' in them
+                # and try to reconnect in this case.
+                if 'timeout' not in str(e):
+                    raise
+
+            log_info = {}
+            log_info['err_str'] = str(e)
+            log_info['max_retries'] = self.max_retries
+            log_info.update(params)
+
+            if self.max_retries and attempt == self.max_retries:
+                msg = _('Unable to connect to AMQP server on '
+                        '%(hostname)s:%(port)d after %(max_retries)d '
+                        'tries: %(err_str)s') % log_info
+                LOG.error(msg)
+                raise rpc_common.RPCException(msg)
+
+            if attempt == 1:
+                sleep_time = self.interval_start or 1
+            elif attempt > 1:
+                sleep_time += self.interval_stepping
+            if self.interval_max:
+                sleep_time = min(sleep_time, self.interval_max)
+
+            log_info['sleep_time'] = sleep_time
+            LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
+                        'unreachable: %(err_str)s. Trying again in '
+                        '%(sleep_time)d seconds.') % log_info)
+            time.sleep(sleep_time)
+
+    def ensure(self, error_callback, method, *args, **kwargs):
+        while True:
+            try:
+                return method(*args, **kwargs)
+            except self.connection_errors as e:
+                if error_callback:
+                    error_callback(e)
+            except self.channel_errors as e:
+                if error_callback:
+                    error_callback(e)
+            except (socket.timeout, IOError) as e:
+                if error_callback:
+                    error_callback(e)
+            except Exception as e:
+                # NOTE(comstud): Unfortunately it's possible for amqplib
+                # to return an error not covered by its transport
+                # connection_errors in the case of a timeout waiting for
+                # a protocol response.  (See paste link in LP888621)
+                # So, we check all exceptions for 'timeout' in them
+                # and try to reconnect in this case.
+                if 'timeout' not in str(e):
+                    raise
+                if error_callback:
+                    error_callback(e)
+            self.reconnect()
+
+    def get_channel(self):
+        """Convenience call for bin/clear_rabbit_queues."""
+        return self.channel
+
+    def close(self):
+        """Close/release this connection."""
+        self.connection.release()
+        self.connection = None
+
+    def reset(self):
+        """Reset a connection so it can be used again."""
+        self.channel.close()
+        self.channel = self.connection.channel()
+        # work around 'memory' transport bug in 1.1.3
+        if self.memory_transport:
+            self.channel._new_queue('ae.undeliver')
+        self.consumers = []
+
+    def declare_consumer(self, consumer_cls, topic, callback):
+        """Create a Consumer using the class that was passed in and
+        add it to our list of consumers
+        """
+
+        def _connect_error(exc):
+            log_info = {'topic': topic, 'err_str': str(exc)}
+            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+                      "%(err_str)s") % log_info)
+
+        def _declare_consumer():
+            consumer = consumer_cls(self.conf, self.channel, topic, callback,
+                                    six.next(self.consumer_num))
+            self.consumers.append(consumer)
+            return consumer
+
+        return self.ensure(_connect_error, _declare_consumer)
+
+    def iterconsume(self, limit=None, timeout=None):
+        """Return an iterator that will consume from all queues/consumers."""
+        timer = rpc_common.DecayingTimer(duration=timeout)
+        timer.start()
+
+        def _raise_timeout(exc):
+            LOG.debug('Timed out waiting for RPC response: %s', exc)
+            raise rpc_common.Timeout()
+
+        def _error_callback(exc):
+            self.do_consume = True
+            timer.check_return(_raise_timeout, exc)
+            LOG.exception(_('Failed to consume message from queue: %s'),
+                          exc)
+
+        def _consume():
+            # NOTE(sileht): in case the acknowledgement or requeue of a
+            # message fail, the kombu transport can be disconnected
+            # In this case, we must redeclare our consumers, so raise
+            # a recoverable error to trigger the reconnection code.
+            if not self.connection.connected:
+                raise self.connection.recoverable_connection_errors[0]
+
+            if self.do_consume:
+                queues_head = self.consumers[:-1]  # not fanout.
+                queues_tail = self.consumers[-1]  # fanout
+                for queue in queues_head:
+                    queue.consume(nowait=True)
+                queues_tail.consume(nowait=False)
+                self.do_consume = False
+
+            poll_timeout = 1 if timeout is None else min(timeout, 1)
+            while True:
+                if self._consume_loop_stopped:
+                    self._consume_loop_stopped = False
+                    raise StopIteration
+                try:
+                    return self.connection.drain_events(timeout=poll_timeout)
+                except socket.timeout as exc:
+                    poll_timeout = timer.check_return(_raise_timeout, exc,
+                                                      maximum=1)
+
+        for iteration in itertools.count(0):
+            if limit and iteration >= limit:
+                raise StopIteration
+            yield self.ensure(_error_callback, _consume)
+
+    def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
+        """Send to a publisher based on the publisher class."""
+
+        def _error_callback(exc):
+            log_info = {'topic': topic, 'err_str': str(exc)}
+            LOG.exception(_("Failed to publish message to topic "
+                          "'%(topic)s': %(err_str)s") % log_info)
+
+        def _publish():
+            publisher = cls(self.conf, self.channel, topic, **kwargs)
+            publisher.send(msg, timeout)
+
+        self.ensure(_error_callback, _publish)
+
+    def declare_direct_consumer(self, topic, callback):
+        """Create a 'direct' queue.
+        In nova's use, this is generally a msg_id queue used for
+        responses for call/multicall
+        """
+        self.declare_consumer(DirectConsumer, topic, callback)
+
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+                               exchange_name=None):
+        """Create a 'topic' consumer."""
+        self.declare_consumer(functools.partial(TopicConsumer,
+                                                name=queue_name,
+                                                exchange_name=exchange_name,
+                                                ),
+                              topic, callback)
+
+    def declare_fanout_consumer(self, topic, callback):
+        """Create a 'fanout' consumer."""
+        self.declare_consumer(FanoutConsumer, topic, callback)
+
+    def direct_send(self, msg_id, msg):
+        """Send a 'direct' message."""
+
+        timer = rpc_common.DecayingTimer(duration=60)
+        timer.start()
+        # NOTE(sileht): retry at least 60sec, after we have a good change
+        # that the caller is really dead too...
+
+        while True:
+            try:
+                self.publisher_send(DirectPublisher, msg_id, msg)
+            except self.connection.channel_errors as exc:
+                # NOTE(noelbk/sileht):
+                # If rabbit dies, the consumer can be disconnected before the
+                # publisher sends, and if the consumer hasn't declared the
+                # queue, the publisher's will send a message to an exchange
+                # that's not bound to a queue, and the message wll be lost.
+                # So we set passive=True to the publisher exchange and catch
+                # the 404 kombu ChannelError and retry until the exchange
+                # appears
+                if exc.code == 404 and timer.check_return() > 0:
+                    LOG.info(_("The exchange to reply to %s doesn't "
+                               "exist yet, retrying...") % msg_id)
+                    time.sleep(1)
+                    continue
+                raise
+            return
+
+    def topic_send(self, topic, msg, timeout=None):
+        """Send a 'topic' message."""
+        self.publisher_send(TopicPublisher, topic, msg, timeout)
+
+    def fanout_send(self, topic, msg):
+        """Send a 'fanout' message."""
+        self.publisher_send(FanoutPublisher, topic, msg)
+
+    def notify_send(self, topic, msg, **kwargs):
+        """Send a notify message on a topic."""
+        self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
+
+    def consume(self, limit=None, timeout=None):
+        """Consume from all queues/consumers."""
+        it = self.iterconsume(limit=limit, timeout=timeout)
+        while True:
+            try:
+                six.next(it)
+            except StopIteration:
+                return
+
+    def stop_consuming(self):
+        self._consume_loop_stopped = True
+
+
+class RabbitDriver(amqpdriver.AMQPDriverBase):
+
+    def __init__(self, conf, url, default_exchange=None,
+                 allowed_remote_exmods=[]):
+        conf.register_opts(rabbit_opts)
+        conf.register_opts(rpc_amqp.amqp_opts)
+
+        connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
+
+        super(RabbitDriver, self).__init__(conf, url,
+                                           connection_pool,
+                                           default_exchange,
+                                           allowed_remote_exmods)
+
+    def require_features(self, requeue=True):
+        pass

=== modified file '.pc/applied-patches'
--- .pc/applied-patches	2015-07-13 15:59:14 +0000
+++ .pc/applied-patches	2015-12-31 04:46:54 +0000
@@ -4,3 +4,4 @@
 0003-Declare-DirectPublisher-exchanges-with-passive-True.patch
 redeclare-consumers-when-ack-requeue-fails.patch
 0004-fix-lp-1362863.patch
+0005-fix-rabbit-starvation-of-connections-for-reply.patch

=== added file '.pc/redeclare-consumers-when-ack-requeue-fails.patch/.timestamp'
=== added file '.pc/skip-qpid-tests.patch/.timestamp'
=== modified file 'debian/changelog'
--- debian/changelog	2015-07-13 15:59:14 +0000
+++ debian/changelog	2015-12-31 04:46:54 +0000
@@ -1,3 +1,11 @@
+oslo.messaging (1.3.0-0ubuntu1.4) trusty; urgency=medium
+
+  * Backport fixes to support AMQP HA failover cases.
+    - d/p/0005-fix-rabbit-starvation-of-connections-for-reply:
+      To solve nova-conductor infinitely loop issue (LP: #1521958). 
+
+ -- Hui Xiang <hui.xiang@canonical.com>  Thu, 31 Dec 2015 12:11:26 +0800
+
 oslo.messaging (1.3.0-0ubuntu1.3) trusty; urgency=medium
 
   * Backport various fixes for AMQP listener/executor. (LP: #1362863).

=== added file 'debian/patches/0005-fix-rabbit-starvation-of-connections-for-reply.patch'
--- debian/patches/0005-fix-rabbit-starvation-of-connections-for-reply.patch	1970-01-01 00:00:00 +0000
+++ debian/patches/0005-fix-rabbit-starvation-of-connections-for-reply.patch	2015-12-31 04:46:54 +0000
@@ -0,0 +1,123 @@
+--- a/oslo/messaging/_drivers/amqp.py
++++ b/oslo/messaging/_drivers/amqp.py
+@@ -340,3 +340,7 @@
+ 
+ def get_control_exchange(conf):
+     return conf.control_exchange
++
++
++class AMQPDestinationNotFound(Exception):
++    pass
+--- a/oslo/messaging/_drivers/amqpdriver.py
++++ b/oslo/messaging/_drivers/amqpdriver.py
+@@ -17,6 +17,7 @@
+ 
+ import logging
+ import threading
++import time
+ import uuid
+ 
+ from six import moves
+@@ -68,9 +69,36 @@
+             #    because reply should not be expected by caller side
+             return
+ 
+-        with self.listener.driver._get_connection() as conn:
+-            self._send_reply(conn, reply, failure, log_failure=log_failure)
+-            self._send_reply(conn, ending=True)
++        # NOTE(sileht): we read the configuration value from the driver
++        # to be able to backport this change in previous version that
++        # still have the qpid driver
++        duration = self.listener.driver.missing_destination_retry_timeout
++        timer = rpc_common.DecayingTimer(duration=duration)
++        timer.start()
++
++        while True:
++            try:
++                with self.listener.driver._get_connection() as conn:
++                    self._send_reply(conn, reply, failure,
++                                     log_failure=log_failure)
++                    self._send_reply(conn, ending=True)
++                return
++            except rpc_amqp.AMQPDestinationNotFound:
++                if timer.check_return() > 0:
++                    LOG.info("The reply %(msg_id)s cannot be sent  "
++                             "%(reply_q)s reply queue don't exist, "
++                             "retrying..." % {
++                                 'msg_id': self.msg_id,
++                                 'reply_q': self.reply_q})
++                    time.sleep(0.25)
++                else:
++                    LOG.info("** The reply %(msg_id)s cannot be sent  "
++                             "%(reply_q)s reply queue don't exist after "
++                             "%(duration)s sec abandoning..." % {
++                                 'msg_id': self.msg_id,
++                                 'reply_q': self.reply_q,
++                                 'duration': duration})
++                    return
+ 
+     def acknowledge(self):
+         self.listener.msg_id_cache.add(self.unique_id)
+@@ -313,6 +341,7 @@
+ 
+ 
+ class AMQPDriverBase(base.BaseDriver):
++    missing_destination_retry_timeout = 0
+ 
+     def __init__(self, conf, url, connection_pool,
+                  default_exchange=None, allowed_remote_exmods=[]):
+--- a/oslo/messaging/_drivers/impl_rabbit.py
++++ b/oslo/messaging/_drivers/impl_rabbit.py
+@@ -733,6 +733,10 @@
+         """Send to a publisher based on the publisher class."""
+ 
+         def _error_callback(exc):
++            if exc.code == 404:
++                raise rpc_amqp.AMQPDestinationNotFound(
++                    "exchange doesn't exists" )
++
+             log_info = {'topic': topic, 'err_str': str(exc)}
+             LOG.exception(_("Failed to publish message to topic "
+                           "'%(topic)s': %(err_str)s") % log_info)
+@@ -766,30 +770,8 @@
+     def direct_send(self, msg_id, msg):
+         """Send a 'direct' message."""
+ 
+-        timer = rpc_common.DecayingTimer(duration=60)
+-        timer.start()
+-        # NOTE(sileht): retry at least 60sec, after we have a good change
+-        # that the caller is really dead too...
+-
+-        while True:
+-            try:
+-                self.publisher_send(DirectPublisher, msg_id, msg)
+-            except self.connection.channel_errors as exc:
+-                # NOTE(noelbk/sileht):
+-                # If rabbit dies, the consumer can be disconnected before the
+-                # publisher sends, and if the consumer hasn't declared the
+-                # queue, the publisher's will send a message to an exchange
+-                # that's not bound to a queue, and the message wll be lost.
+-                # So we set passive=True to the publisher exchange and catch
+-                # the 404 kombu ChannelError and retry until the exchange
+-                # appears
+-                if exc.code == 404 and timer.check_return() > 0:
+-                    LOG.info(_("The exchange to reply to %s doesn't "
+-                               "exist yet, retrying...") % msg_id)
+-                    time.sleep(1)
+-                    continue
+-                raise
+-            return
++        self.publisher_send(DirectPublisher, msg_id, msg)
++        return
+ 
+     def topic_send(self, topic, msg, timeout=None):
+         """Send a 'topic' message."""
+@@ -823,6 +805,8 @@
+         conf.register_opts(rabbit_opts)
+         conf.register_opts(rpc_amqp.amqp_opts)
+ 
++        self.missing_destination_retry_timeout = 60
++
+         connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
+ 
+         super(RabbitDriver, self).__init__(conf, url,

=== modified file 'debian/patches/series'
--- debian/patches/series	2015-07-13 15:59:14 +0000
+++ debian/patches/series	2015-12-31 04:46:54 +0000
@@ -4,3 +4,4 @@
 0003-Declare-DirectPublisher-exchanges-with-passive-True.patch
 redeclare-consumers-when-ack-requeue-fails.patch
 0004-fix-lp-1362863.patch
+0005-fix-rabbit-starvation-of-connections-for-reply.patch

=== modified file 'oslo/messaging/_drivers/amqp.py'
--- oslo/messaging/_drivers/amqp.py	2014-03-27 13:01:34 +0000
+++ oslo/messaging/_drivers/amqp.py	2015-12-31 04:46:54 +0000
@@ -340,3 +340,7 @@
 
 def get_control_exchange(conf):
     return conf.control_exchange
+
+
+class AMQPDestinationNotFound(Exception):
+    pass

=== modified file 'oslo/messaging/_drivers/amqpdriver.py'
--- oslo/messaging/_drivers/amqpdriver.py	2015-07-13 15:59:14 +0000
+++ oslo/messaging/_drivers/amqpdriver.py	2015-12-31 04:46:54 +0000
@@ -17,6 +17,7 @@
 
 import logging
 import threading
+import time
 import uuid
 
 from six import moves
@@ -68,9 +69,36 @@
             #    because reply should not be expected by caller side
             return
 
-        with self.listener.driver._get_connection() as conn:
-            self._send_reply(conn, reply, failure, log_failure=log_failure)
-            self._send_reply(conn, ending=True)
+        # NOTE(sileht): we read the configuration value from the driver
+        # to be able to backport this change in previous version that
+        # still have the qpid driver
+        duration = self.listener.driver.missing_destination_retry_timeout
+        timer = rpc_common.DecayingTimer(duration=duration)
+        timer.start()
+
+        while True:
+            try:
+                with self.listener.driver._get_connection() as conn:
+                    self._send_reply(conn, reply, failure,
+                                     log_failure=log_failure)
+                    self._send_reply(conn, ending=True)
+                return
+            except rpc_amqp.AMQPDestinationNotFound:
+                if timer.check_return() > 0:
+                    LOG.info("The reply %(msg_id)s cannot be sent  "
+                             "%(reply_q)s reply queue don't exist, "
+                             "retrying..." % {
+                                 'msg_id': self.msg_id,
+                                 'reply_q': self.reply_q})
+                    time.sleep(0.25)
+                else:
+                    LOG.info("** The reply %(msg_id)s cannot be sent  "
+                             "%(reply_q)s reply queue don't exist after "
+                             "%(duration)s sec abandoning..." % {
+                                 'msg_id': self.msg_id,
+                                 'reply_q': self.reply_q,
+                                 'duration': duration})
+                    return
 
     def acknowledge(self):
         self.listener.msg_id_cache.add(self.unique_id)
@@ -313,6 +341,7 @@
 
 
 class AMQPDriverBase(base.BaseDriver):
+    missing_destination_retry_timeout = 0
 
     def __init__(self, conf, url, connection_pool,
                  default_exchange=None, allowed_remote_exmods=[]):

=== modified file 'oslo/messaging/_drivers/impl_rabbit.py'
--- oslo/messaging/_drivers/impl_rabbit.py	2015-07-13 15:59:14 +0000
+++ oslo/messaging/_drivers/impl_rabbit.py	2015-12-31 04:46:54 +0000
@@ -733,6 +733,10 @@
         """Send to a publisher based on the publisher class."""
 
         def _error_callback(exc):
+            if exc.code == 404:
+                raise rpc_amqp.AMQPDestinationNotFound(
+                    "exchange doesn't exists" )
+
             log_info = {'topic': topic, 'err_str': str(exc)}
             LOG.exception(_("Failed to publish message to topic "
                           "'%(topic)s': %(err_str)s") % log_info)
@@ -766,30 +770,8 @@
     def direct_send(self, msg_id, msg):
         """Send a 'direct' message."""
 
-        timer = rpc_common.DecayingTimer(duration=60)
-        timer.start()
-        # NOTE(sileht): retry at least 60sec, after we have a good change
-        # that the caller is really dead too...
-
-        while True:
-            try:
-                self.publisher_send(DirectPublisher, msg_id, msg)
-            except self.connection.channel_errors as exc:
-                # NOTE(noelbk/sileht):
-                # If rabbit dies, the consumer can be disconnected before the
-                # publisher sends, and if the consumer hasn't declared the
-                # queue, the publisher's will send a message to an exchange
-                # that's not bound to a queue, and the message wll be lost.
-                # So we set passive=True to the publisher exchange and catch
-                # the 404 kombu ChannelError and retry until the exchange
-                # appears
-                if exc.code == 404 and timer.check_return() > 0:
-                    LOG.info(_("The exchange to reply to %s doesn't "
-                               "exist yet, retrying...") % msg_id)
-                    time.sleep(1)
-                    continue
-                raise
-            return
+        self.publisher_send(DirectPublisher, msg_id, msg)
+        return
 
     def topic_send(self, topic, msg, timeout=None):
         """Send a 'topic' message."""
@@ -823,6 +805,8 @@
         conf.register_opts(rabbit_opts)
         conf.register_opts(rpc_amqp.amqp_opts)
 
+        self.missing_destination_retry_timeout = 60
+
         connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
 
         super(RabbitDriver, self).__init__(conf, url,

