~ubuntu-branches/ubuntu/raring/nova/raring-proposed

« back to all changes in this revision

Viewing changes to nova/openstack/common/rpc/impl_qpid.py

  • Committer: Package Import Robot
  • Author(s): Adam Gandelman
  • Date: 2013-08-09 10:12:27 UTC
  • mto: This revision was merged to the branch mainline in revision 107.
  • Revision ID: package-import@ubuntu.com-20130809101227-flqfubhwpot76pob
Tags: upstream-2013.1.3
Import upstream version 2013.1.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
69
69
 
70
70
cfg.CONF.register_opts(qpid_opts)
71
71
 
 
72
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
 
73
 
72
74
 
73
75
class ConsumerBase(object):
74
76
    """Consumer base class."""
123
125
        self.receiver = session.receiver(self.address)
124
126
        self.receiver.capacity = 1
125
127
 
 
128
    def _unpack_json_msg(self, msg):
 
129
        """Load the JSON data in msg if msg.content_type indicates that it
 
130
           is necessary.  Put the loaded data back into msg.content and
 
131
           update msg.content_type appropriately.
 
132
 
 
133
        A Qpid Message containing a dict will have a content_type of
 
134
        'amqp/map', whereas one containing a string that needs to be converted
 
135
        back from JSON will have a content_type of JSON_CONTENT_TYPE.
 
136
 
 
137
        :param msg: a Qpid Message object
 
138
        :returns: None
 
139
        """
 
140
        if msg.content_type == JSON_CONTENT_TYPE:
 
141
            msg.content = jsonutils.loads(msg.content)
 
142
            msg.content_type = 'amqp/map'
 
143
 
126
144
    def consume(self):
127
145
        """Fetch the message and pass it to the callback object"""
128
146
        message = self.receiver.fetch()
129
147
        try:
 
148
            self._unpack_json_msg(message)
130
149
            msg = rpc_common.deserialize_msg(message.content)
131
150
            self.callback(msg)
132
151
        except Exception:
331
350
 
332
351
    def reconnect(self):
333
352
        """Handles reconnecting and re-establishing sessions and queues"""
334
 
        if self.connection.opened():
335
 
            try:
336
 
                self.connection.close()
337
 
            except qpid_exceptions.ConnectionError:
338
 
                pass
339
 
 
340
353
        attempt = 0
341
354
        delay = 1
342
355
        while True:
 
356
            # Close the session if necessary
 
357
            if self.connection.opened():
 
358
                try:
 
359
                    self.connection.close()
 
360
                except qpid_exceptions.ConnectionError:
 
361
                    pass
 
362
 
343
363
            broker = self.brokers[attempt % len(self.brokers)]
344
364
            attempt += 1
345
365
 
383
403
        """Close/release this connection"""
384
404
        self.cancel_consumer_thread()
385
405
        self.wait_on_proxy_callbacks()
386
 
        self.connection.close()
 
406
        try:
 
407
            self.connection.close()
 
408
        except Exception:
 
409
            # NOTE(dripton) Logging exceptions that happen during cleanup just
 
410
            # causes confusion; there's really nothing useful we can do with
 
411
            # them.
 
412
            pass
387
413
        self.connection = None
388
414
 
389
415
    def reset(self):