~andrewjbeach/juju-ci-tools/make-local-patcher

« back to all changes in this revision

Viewing changes to azure-sdk-for-python-master/azure/servicebus/servicebusservice.py

  • Committer: Aaron Bentley
  • Date: 2015-06-15 19:04:10 UTC
  • mfrom: (976.2.4 fix-log-rotation)
  • Revision ID: aaron.bentley@canonical.com-20150615190410-vvhtl7yxn0xbtbiy
Fix error handling in assess_log_rotation.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#-------------------------------------------------------------------------
 
2
# Copyright (c) Microsoft.  All rights reserved.
 
3
#
 
4
# Licensed under the Apache License, Version 2.0 (the "License");
 
5
# you may not use this file except in compliance with the License.
 
6
# You may obtain a copy of the License at
 
7
#   http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
12
# See the License for the specific language governing permissions and
 
13
# limitations under the License.
 
14
#--------------------------------------------------------------------------
 
15
import os
 
16
import time
 
17
 
 
18
from azure import (
 
19
    WindowsAzureError,
 
20
    SERVICE_BUS_HOST_BASE,
 
21
    _convert_response_to_feeds,
 
22
    _dont_fail_not_exist,
 
23
    _dont_fail_on_exist,
 
24
    _get_request_body,
 
25
    _get_request_body_bytes_only,
 
26
    _int_or_none,
 
27
    _str,
 
28
    _update_request_uri_query,
 
29
    url_quote,
 
30
    url_unquote,
 
31
    _validate_not_none,
 
32
    )
 
33
from azure.http import (
 
34
    HTTPError,
 
35
    HTTPRequest,
 
36
    )
 
37
from azure.http.httpclient import _HTTPClient
 
38
from azure.servicebus import (
 
39
    AZURE_SERVICEBUS_NAMESPACE,
 
40
    AZURE_SERVICEBUS_ACCESS_KEY,
 
41
    AZURE_SERVICEBUS_ISSUER,
 
42
    _convert_topic_to_xml,
 
43
    _convert_response_to_topic,
 
44
    _convert_queue_to_xml,
 
45
    _convert_response_to_queue,
 
46
    _convert_subscription_to_xml,
 
47
    _convert_response_to_subscription,
 
48
    _convert_rule_to_xml,
 
49
    _convert_response_to_rule,
 
50
    _convert_xml_to_queue,
 
51
    _convert_xml_to_topic,
 
52
    _convert_xml_to_subscription,
 
53
    _convert_xml_to_rule,
 
54
    _create_message,
 
55
    _service_bus_error_handler,
 
56
    )
 
57
 
 
58
# Token cache for authentication.
 
59
# Lives at module level, so it is shared by the different instances of
# ServiceBusService rather than being per-instance state.
# NOTE(review): the key/value layout is not visible here -- confirm at the
# code that populates it.
 
60
_tokens = {}
 
61
 
 
62
 
 
63
class ServiceBusService(object):
 
64
 
 
65
    def __init__(self, service_namespace=None, account_key=None, issuer=None,
 
66
                 x_ms_version='2011-06-01', host_base=SERVICE_BUS_HOST_BASE):
 
67
        # x_ms_version is not used, but the parameter is kept for backwards
 
68
        # compatibility
 
69
        self.requestid = None
 
70
        self.service_namespace = service_namespace
 
71
        self.account_key = account_key
 
72
        self.issuer = issuer
 
73
        self.host_base = host_base
 
74
 
 
75
        # Get service namespace, account key and issuer.
 
76
        # If they are set when constructing, then use them, else find them
 
77
        # from environment variables.
 
78
        if not self.service_namespace:
 
79
            self.service_namespace = os.environ.get(AZURE_SERVICEBUS_NAMESPACE)
 
80
        if not self.account_key:
 
81
            self.account_key = os.environ.get(AZURE_SERVICEBUS_ACCESS_KEY)
 
82
        if not self.issuer:
 
83
            self.issuer = os.environ.get(AZURE_SERVICEBUS_ISSUER)
 
84
 
 
85
        if not self.service_namespace or \
 
86
           not self.account_key or not self.issuer:
 
87
            raise WindowsAzureError(
 
88
                'You need to provide servicebus namespace, access key and Issuer')
 
89
 
 
90
        self._httpclient = _HTTPClient(service_instance=self,
 
91
                                       service_namespace=self.service_namespace,
 
92
                                       account_key=self.account_key,
 
93
                                       issuer=self.issuer)
 
94
        self._filter = self._httpclient.perform_request
 
95
 
 
96
    def with_filter(self, filter):
 
97
        '''
 
98
        Returns a new service which will process requests with the specified
 
99
        filter.  Filtering operations can include logging, automatic retrying,
 
100
        etc...  The filter is a lambda which receives the HTTPRequest and
 
101
        another lambda.  The filter can perform any pre-processing on the
 
102
        request, pass it off to the next lambda, and then perform any
 
103
        post-processing on the response.
 
104
        '''
 
105
        res = ServiceBusService(self.service_namespace, self.account_key,
 
106
                                self.issuer)
 
107
        old_filter = self._filter
 
108
 
 
109
        def new_filter(request):
 
110
            return filter(request, old_filter)
 
111
 
 
112
        res._filter = new_filter
 
113
        return res
 
114
 
 
115
    def set_proxy(self, host, port, user=None, password=None):
 
116
        '''
 
117
        Sets the proxy server host and port for the HTTP CONNECT Tunnelling.
 
118
 
 
119
        host: Address of the proxy. Ex: '192.168.0.100'
 
120
        port: Port of the proxy. Ex: 6000
 
121
        user: User for proxy authorization.
 
122
        password: Password for proxy authorization.
 
123
        '''
 
124
        self._httpclient.set_proxy(host, port, user, password)
 
125
 
 
126
    def create_queue(self, queue_name, queue=None, fail_on_exist=False):
 
127
        '''
 
128
        Creates a new queue. Once created, this queue's resource manifest is
 
129
        immutable.
 
130
 
 
131
        queue_name: Name of the queue to create.
 
132
        queue: Queue object to create.
 
133
        fail_on_exist:
 
134
            Specify whether to throw an exception when the queue exists.
 
135
        '''
 
136
        _validate_not_none('queue_name', queue_name)
 
137
        request = HTTPRequest()
 
138
        request.method = 'PUT'
 
139
        request.host = self._get_host()
 
140
        request.path = '/' + _str(queue_name) + ''
 
141
        request.body = _get_request_body(_convert_queue_to_xml(queue))
 
142
        request.path, request.query = _update_request_uri_query(request)
 
143
        request.headers = self._update_service_bus_header(request)
 
144
        if not fail_on_exist:
 
145
            try:
 
146
                self._perform_request(request)
 
147
                return True
 
148
            except WindowsAzureError as ex:
 
149
                _dont_fail_on_exist(ex)
 
150
                return False
 
151
        else:
 
152
            self._perform_request(request)
 
153
            return True
 
154
 
 
155
    def delete_queue(self, queue_name, fail_not_exist=False):
 
156
        '''
 
157
        Deletes an existing queue. This operation will also remove all
 
158
        associated state including messages in the queue.
 
159
 
 
160
        queue_name: Name of the queue to delete.
 
161
        fail_not_exist:
 
162
            Specify whether to throw an exception if the queue doesn't exist.
 
163
        '''
 
164
        _validate_not_none('queue_name', queue_name)
 
165
        request = HTTPRequest()
 
166
        request.method = 'DELETE'
 
167
        request.host = self._get_host()
 
168
        request.path = '/' + _str(queue_name) + ''
 
169
        request.path, request.query = _update_request_uri_query(request)
 
170
        request.headers = self._update_service_bus_header(request)
 
171
        if not fail_not_exist:
 
172
            try:
 
173
                self._perform_request(request)
 
174
                return True
 
175
            except WindowsAzureError as ex:
 
176
                _dont_fail_not_exist(ex)
 
177
                return False
 
178
        else:
 
179
            self._perform_request(request)
 
180
            return True
 
181
 
 
182
    def get_queue(self, queue_name):
 
183
        '''
 
184
        Retrieves an existing queue.
 
185
 
 
186
        queue_name: Name of the queue.
 
187
        '''
 
188
        _validate_not_none('queue_name', queue_name)
 
189
        request = HTTPRequest()
 
190
        request.method = 'GET'
 
191
        request.host = self._get_host()
 
192
        request.path = '/' + _str(queue_name) + ''
 
193
        request.path, request.query = _update_request_uri_query(request)
 
194
        request.headers = self._update_service_bus_header(request)
 
195
        response = self._perform_request(request)
 
196
 
 
197
        return _convert_response_to_queue(response)
 
198
 
 
199
    def list_queues(self):
 
200
        '''
 
201
        Enumerates the queues in the service namespace.
 
202
        '''
 
203
        request = HTTPRequest()
 
204
        request.method = 'GET'
 
205
        request.host = self._get_host()
 
206
        request.path = '/$Resources/Queues'
 
207
        request.path, request.query = _update_request_uri_query(request)
 
208
        request.headers = self._update_service_bus_header(request)
 
209
        response = self._perform_request(request)
 
210
 
 
211
        return _convert_response_to_feeds(response, _convert_xml_to_queue)
 
212
 
 
213
    def create_topic(self, topic_name, topic=None, fail_on_exist=False):
 
214
        '''
 
215
        Creates a new topic. Once created, this topic resource manifest is
 
216
        immutable.
 
217
 
 
218
        topic_name: Name of the topic to create.
 
219
        topic: Topic object to create.
 
220
        fail_on_exist:
 
221
            Specify whether to throw an exception when the topic exists.
 
222
        '''
 
223
        _validate_not_none('topic_name', topic_name)
 
224
        request = HTTPRequest()
 
225
        request.method = 'PUT'
 
226
        request.host = self._get_host()
 
227
        request.path = '/' + _str(topic_name) + ''
 
228
        request.body = _get_request_body(_convert_topic_to_xml(topic))
 
229
        request.path, request.query = _update_request_uri_query(request)
 
230
        request.headers = self._update_service_bus_header(request)
 
231
        if not fail_on_exist:
 
232
            try:
 
233
                self._perform_request(request)
 
234
                return True
 
235
            except WindowsAzureError as ex:
 
236
                _dont_fail_on_exist(ex)
 
237
                return False
 
238
        else:
 
239
            self._perform_request(request)
 
240
            return True
 
241
 
 
242
    def delete_topic(self, topic_name, fail_not_exist=False):
 
243
        '''
 
244
        Deletes an existing topic. This operation will also remove all
 
245
        associated state including associated subscriptions.
 
246
 
 
247
        topic_name: Name of the topic to delete.
 
248
        fail_not_exist:
 
249
            Specify whether throw exception when topic doesn't exist.
 
250
        '''
 
251
        _validate_not_none('topic_name', topic_name)
 
252
        request = HTTPRequest()
 
253
        request.method = 'DELETE'
 
254
        request.host = self._get_host()
 
255
        request.path = '/' + _str(topic_name) + ''
 
256
        request.path, request.query = _update_request_uri_query(request)
 
257
        request.headers = self._update_service_bus_header(request)
 
258
        if not fail_not_exist:
 
259
            try:
 
260
                self._perform_request(request)
 
261
                return True
 
262
            except WindowsAzureError as ex:
 
263
                _dont_fail_not_exist(ex)
 
264
                return False
 
265
        else:
 
266
            self._perform_request(request)
 
267
            return True
 
268
 
 
269
    def get_topic(self, topic_name):
 
270
        '''
 
271
        Retrieves the description for the specified topic.
 
272
 
 
273
        topic_name: Name of the topic.
 
274
        '''
 
275
        _validate_not_none('topic_name', topic_name)
 
276
        request = HTTPRequest()
 
277
        request.method = 'GET'
 
278
        request.host = self._get_host()
 
279
        request.path = '/' + _str(topic_name) + ''
 
280
        request.path, request.query = _update_request_uri_query(request)
 
281
        request.headers = self._update_service_bus_header(request)
 
282
        response = self._perform_request(request)
 
283
 
 
284
        return _convert_response_to_topic(response)
 
285
 
 
286
    def list_topics(self):
 
287
        '''
 
288
        Retrieves the topics in the service namespace.
 
289
        '''
 
290
        request = HTTPRequest()
 
291
        request.method = 'GET'
 
292
        request.host = self._get_host()
 
293
        request.path = '/$Resources/Topics'
 
294
        request.path, request.query = _update_request_uri_query(request)
 
295
        request.headers = self._update_service_bus_header(request)
 
296
        response = self._perform_request(request)
 
297
 
 
298
        return _convert_response_to_feeds(response, _convert_xml_to_topic)
 
299
 
 
300
    def create_rule(self, topic_name, subscription_name, rule_name, rule=None,
 
301
                    fail_on_exist=False):
 
302
        '''
 
303
        Creates a new rule. Once created, this rule's resource manifest is
 
304
        immutable.
 
305
 
 
306
        topic_name: Name of the topic.
 
307
        subscription_name: Name of the subscription.
 
308
        rule_name: Name of the rule.
 
309
        fail_on_exist:
 
310
            Specify whether to throw an exception when the rule exists.
 
311
        '''
 
312
        _validate_not_none('topic_name', topic_name)
 
313
        _validate_not_none('subscription_name', subscription_name)
 
314
        _validate_not_none('rule_name', rule_name)
 
315
        request = HTTPRequest()
 
316
        request.method = 'PUT'
 
317
        request.host = self._get_host()
 
318
        request.path = '/' + _str(topic_name) + '/subscriptions/' + \
 
319
            _str(subscription_name) + \
 
320
            '/rules/' + _str(rule_name) + ''
 
321
        request.body = _get_request_body(_convert_rule_to_xml(rule))
 
322
        request.path, request.query = _update_request_uri_query(request)
 
323
        request.headers = self._update_service_bus_header(request)
 
324
        if not fail_on_exist:
 
325
            try:
 
326
                self._perform_request(request)
 
327
                return True
 
328
            except WindowsAzureError as ex:
 
329
                _dont_fail_on_exist(ex)
 
330
                return False
 
331
        else:
 
332
            self._perform_request(request)
 
333
            return True
 
334
 
 
335
    def delete_rule(self, topic_name, subscription_name, rule_name,
 
336
                    fail_not_exist=False):
 
337
        '''
 
338
        Deletes an existing rule.
 
339
 
 
340
        topic_name: Name of the topic.
 
341
        subscription_name: Name of the subscription.
 
342
        rule_name:
 
343
            Name of the rule to delete.  DEFAULT_RULE_NAME=$Default.
 
344
            Use DEFAULT_RULE_NAME to delete default rule for the subscription.
 
345
        fail_not_exist:
 
346
            Specify whether throw exception when rule doesn't exist.
 
347
        '''
 
348
        _validate_not_none('topic_name', topic_name)
 
349
        _validate_not_none('subscription_name', subscription_name)
 
350
        _validate_not_none('rule_name', rule_name)
 
351
        request = HTTPRequest()
 
352
        request.method = 'DELETE'
 
353
        request.host = self._get_host()
 
354
        request.path = '/' + _str(topic_name) + '/subscriptions/' + \
 
355
            _str(subscription_name) + \
 
356
            '/rules/' + _str(rule_name) + ''
 
357
        request.path, request.query = _update_request_uri_query(request)
 
358
        request.headers = self._update_service_bus_header(request)
 
359
        if not fail_not_exist:
 
360
            try:
 
361
                self._perform_request(request)
 
362
                return True
 
363
            except WindowsAzureError as ex:
 
364
                _dont_fail_not_exist(ex)
 
365
                return False
 
366
        else:
 
367
            self._perform_request(request)
 
368
            return True
 
369
 
 
370
    def get_rule(self, topic_name, subscription_name, rule_name):
 
371
        '''
 
372
        Retrieves the description for the specified rule.
 
373
 
 
374
        topic_name: Name of the topic.
 
375
        subscription_name: Name of the subscription.
 
376
        rule_name: Name of the rule.
 
377
        '''
 
378
        _validate_not_none('topic_name', topic_name)
 
379
        _validate_not_none('subscription_name', subscription_name)
 
380
        _validate_not_none('rule_name', rule_name)
 
381
        request = HTTPRequest()
 
382
        request.method = 'GET'
 
383
        request.host = self._get_host()
 
384
        request.path = '/' + _str(topic_name) + '/subscriptions/' + \
 
385
            _str(subscription_name) + \
 
386
            '/rules/' + _str(rule_name) + ''
 
387
        request.path, request.query = _update_request_uri_query(request)
 
388
        request.headers = self._update_service_bus_header(request)
 
389
        response = self._perform_request(request)
 
390
 
 
391
        return _convert_response_to_rule(response)
 
392
 
 
393
    def list_rules(self, topic_name, subscription_name):
 
394
        '''
 
395
        Retrieves the rules that exist under the specified subscription.
 
396
 
 
397
        topic_name: Name of the topic.
 
398
        subscription_name: Name of the subscription.
 
399
        '''
 
400
        _validate_not_none('topic_name', topic_name)
 
401
        _validate_not_none('subscription_name', subscription_name)
 
402
        request = HTTPRequest()
 
403
        request.method = 'GET'
 
404
        request.host = self._get_host()
 
405
        request.path = '/' + \
 
406
            _str(topic_name) + '/subscriptions/' + \
 
407
            _str(subscription_name) + '/rules/'
 
408
        request.path, request.query = _update_request_uri_query(request)
 
409
        request.headers = self._update_service_bus_header(request)
 
410
        response = self._perform_request(request)
 
411
 
 
412
        return _convert_response_to_feeds(response, _convert_xml_to_rule)
 
413
 
 
414
    def create_subscription(self, topic_name, subscription_name,
 
415
                            subscription=None, fail_on_exist=False):
 
416
        '''
 
417
        Creates a new subscription. Once created, this subscription resource
 
418
        manifest is immutable.
 
419
 
 
420
        topic_name: Name of the topic.
 
421
        subscription_name: Name of the subscription.
 
422
        fail_on_exist:
 
423
            Specify whether throw exception when subscription exists.
 
424
        '''
 
425
        _validate_not_none('topic_name', topic_name)
 
426
        _validate_not_none('subscription_name', subscription_name)
 
427
        request = HTTPRequest()
 
428
        request.method = 'PUT'
 
429
        request.host = self._get_host()
 
430
        request.path = '/' + \
 
431
            _str(topic_name) + '/subscriptions/' + _str(subscription_name) + ''
 
432
        request.body = _get_request_body(
 
433
            _convert_subscription_to_xml(subscription))
 
434
        request.path, request.query = _update_request_uri_query(request)
 
435
        request.headers = self._update_service_bus_header(request)
 
436
        if not fail_on_exist:
 
437
            try:
 
438
                self._perform_request(request)
 
439
                return True
 
440
            except WindowsAzureError as ex:
 
441
                _dont_fail_on_exist(ex)
 
442
                return False
 
443
        else:
 
444
            self._perform_request(request)
 
445
            return True
 
446
 
 
447
    def delete_subscription(self, topic_name, subscription_name,
 
448
                            fail_not_exist=False):
 
449
        '''
 
450
        Deletes an existing subscription.
 
451
 
 
452
        topic_name: Name of the topic.
 
453
        subscription_name: Name of the subscription to delete.
 
454
        fail_not_exist:
 
455
            Specify whether to throw an exception when the subscription
 
456
            doesn't exist.
 
457
        '''
 
458
        _validate_not_none('topic_name', topic_name)
 
459
        _validate_not_none('subscription_name', subscription_name)
 
460
        request = HTTPRequest()
 
461
        request.method = 'DELETE'
 
462
        request.host = self._get_host()
 
463
        request.path = '/' + \
 
464
            _str(topic_name) + '/subscriptions/' + _str(subscription_name) + ''
 
465
        request.path, request.query = _update_request_uri_query(request)
 
466
        request.headers = self._update_service_bus_header(request)
 
467
        if not fail_not_exist:
 
468
            try:
 
469
                self._perform_request(request)
 
470
                return True
 
471
            except WindowsAzureError as ex:
 
472
                _dont_fail_not_exist(ex)
 
473
                return False
 
474
        else:
 
475
            self._perform_request(request)
 
476
            return True
 
477
 
 
478
    def get_subscription(self, topic_name, subscription_name):
 
479
        '''
 
480
        Gets an existing subscription.
 
481
 
 
482
        topic_name: Name of the topic.
 
483
        subscription_name: Name of the subscription.
 
484
        '''
 
485
        _validate_not_none('topic_name', topic_name)
 
486
        _validate_not_none('subscription_name', subscription_name)
 
487
        request = HTTPRequest()
 
488
        request.method = 'GET'
 
489
        request.host = self._get_host()
 
490
        request.path = '/' + \
 
491
            _str(topic_name) + '/subscriptions/' + _str(subscription_name) + ''
 
492
        request.path, request.query = _update_request_uri_query(request)
 
493
        request.headers = self._update_service_bus_header(request)
 
494
        response = self._perform_request(request)
 
495
 
 
496
        return _convert_response_to_subscription(response)
 
497
 
 
498
    def list_subscriptions(self, topic_name):
 
499
        '''
 
500
        Retrieves the subscriptions in the specified topic.
 
501
 
 
502
        topic_name: Name of the topic.
 
503
        '''
 
504
        _validate_not_none('topic_name', topic_name)
 
505
        request = HTTPRequest()
 
506
        request.method = 'GET'
 
507
        request.host = self._get_host()
 
508
        request.path = '/' + _str(topic_name) + '/subscriptions/'
 
509
        request.path, request.query = _update_request_uri_query(request)
 
510
        request.headers = self._update_service_bus_header(request)
 
511
        response = self._perform_request(request)
 
512
 
 
513
        return _convert_response_to_feeds(response,
 
514
                                          _convert_xml_to_subscription)
 
515
 
 
516
    def send_topic_message(self, topic_name, message=None):
 
517
        '''
 
518
        Enqueues a message into the specified topic. The limit to the number
 
519
        of messages which may be present in the topic is governed by the
 
520
        message size in MaxTopicSizeInBytes. If this message causes the topic
 
521
        to exceed its quota, a quota exceeded error is returned and the
 
522
        message will be rejected.
 
523
 
 
524
        topic_name: Name of the topic.
 
525
        message: Message object containing message body and properties.
 
526
        '''
 
527
        _validate_not_none('topic_name', topic_name)
 
528
        _validate_not_none('message', message)
 
529
        request = HTTPRequest()
 
530
        request.method = 'POST'
 
531
        request.host = self._get_host()
 
532
        request.path = '/' + _str(topic_name) + '/messages'
 
533
        request.headers = message.add_headers(request)
 
534
        request.body = _get_request_body_bytes_only(
 
535
            'message.body', message.body)
 
536
        request.path, request.query = _update_request_uri_query(request)
 
537
        request.headers = self._update_service_bus_header(request)
 
538
        self._perform_request(request)
 
539
 
 
540
    def peek_lock_subscription_message(self, topic_name, subscription_name,
 
541
                                       timeout='60'):
 
542
        '''
 
543
        This operation is used to atomically retrieve and lock a message for
 
544
        processing. The message is guaranteed not to be delivered to other
 
545
        receivers during the lock duration period specified in buffer
 
546
        description. Once the lock expires, the message will be available to
 
547
        other receivers (on the same subscription only) during the lock
 
548
        duration period specified in the topic description. Once the lock
 
549
        expires, the message will be available to other receivers. In order to
 
550
        complete processing of the message, the receiver should issue a delete
 
551
        command with the lock ID received from this operation. To abandon
 
552
        processing of the message and unlock it for other receivers, an Unlock
 
553
        Message command should be issued, or the lock duration period can
 
554
        expire.
 
555
 
 
556
        topic_name: Name of the topic.
 
557
        subscription_name: Name of the subscription.
 
558
        timeout: Optional. The timeout parameter is expressed in seconds.
 
559
        '''
 
560
        _validate_not_none('topic_name', topic_name)
 
561
        _validate_not_none('subscription_name', subscription_name)
 
562
        request = HTTPRequest()
 
563
        request.method = 'POST'
 
564
        request.host = self._get_host()
 
565
        request.path = '/' + \
 
566
            _str(topic_name) + '/subscriptions/' + \
 
567
            _str(subscription_name) + '/messages/head'
 
568
        request.query = [('timeout', _int_or_none(timeout))]
 
569
        request.path, request.query = _update_request_uri_query(request)
 
570
        request.headers = self._update_service_bus_header(request)
 
571
        response = self._perform_request(request)
 
572
 
 
573
        return _create_message(response, self)
 
574
 
 
575
    def unlock_subscription_message(self, topic_name, subscription_name,
 
576
                                    sequence_number, lock_token):
 
577
        '''
 
578
        Unlock a message for processing by other receivers on a given
 
579
        subscription. This operation deletes the lock object, causing the
 
580
        message to be unlocked. A message must have first been locked by a
 
581
        receiver before this operation is called.
 
582
 
 
583
        topic_name: Name of the topic.
 
584
        subscription_name: Name of the subscription.
 
585
        sequence_number:
 
586
            The sequence number of the message to be unlocked as returned in
 
587
            BrokerProperties['SequenceNumber'] by the Peek Message operation.
 
588
        lock_token:
 
589
            The ID of the lock as returned by the Peek Message operation in
 
590
            BrokerProperties['LockToken']
 
591
        '''
 
592
        _validate_not_none('topic_name', topic_name)
 
593
        _validate_not_none('subscription_name', subscription_name)
 
594
        _validate_not_none('sequence_number', sequence_number)
 
595
        _validate_not_none('lock_token', lock_token)
 
596
        request = HTTPRequest()
 
597
        request.method = 'PUT'
 
598
        request.host = self._get_host()
 
599
        request.path = '/' + _str(topic_name) + \
 
600
                       '/subscriptions/' + str(subscription_name) + \
 
601
                       '/messages/' + _str(sequence_number) + \
 
602
                       '/' + _str(lock_token) + ''
 
603
        request.path, request.query = _update_request_uri_query(request)
 
604
        request.headers = self._update_service_bus_header(request)
 
605
        self._perform_request(request)
 
606
 
 
607
    def read_delete_subscription_message(self, topic_name, subscription_name,
 
608
                                         timeout='60'):
 
609
        '''
 
610
        Read and delete a message from a subscription as an atomic operation.
 
611
        This operation should be used when a best-effort guarantee is
 
612
        sufficient for an application; that is, using this operation it is
 
613
        possible for messages to be lost if processing fails.
 
614
 
 
615
        topic_name: Name of the topic.
 
616
        subscription_name: Name of the subscription.
 
617
        timeout: Optional. The timeout parameter is expressed in seconds.
 
618
        '''
 
619
        _validate_not_none('topic_name', topic_name)
 
620
        _validate_not_none('subscription_name', subscription_name)
 
621
        request = HTTPRequest()
 
622
        request.method = 'DELETE'
 
623
        request.host = self._get_host()
 
624
        request.path = '/' + _str(topic_name) + \
 
625
                       '/subscriptions/' + _str(subscription_name) + \
 
626
                       '/messages/head'
 
627
        request.query = [('timeout', _int_or_none(timeout))]
 
628
        request.path, request.query = _update_request_uri_query(request)
 
629
        request.headers = self._update_service_bus_header(request)
 
630
        response = self._perform_request(request)
 
631
 
 
632
        return _create_message(response, self)
 
633
 
 
634
    def delete_subscription_message(self, topic_name, subscription_name,
 
635
                                    sequence_number, lock_token):
 
636
        '''
 
637
        Completes processing on a locked message and delete it from the
 
638
        subscription. This operation should only be called after processing a
 
639
        previously locked message is successful to maintain At-Least-Once
 
640
        delivery assurances.
 
641
 
 
642
        topic_name: Name of the topic.
 
643
        subscription_name: Name of the subscription.
 
644
        sequence_number:
 
645
            The sequence number of the message to be deleted as returned in
 
646
            BrokerProperties['SequenceNumber'] by the Peek Message operation.
 
647
        lock_token:
 
648
            The ID of the lock as returned by the Peek Message operation in
 
649
            BrokerProperties['LockToken']
 
650
        '''
 
651
        _validate_not_none('topic_name', topic_name)
 
652
        _validate_not_none('subscription_name', subscription_name)
 
653
        _validate_not_none('sequence_number', sequence_number)
 
654
        _validate_not_none('lock_token', lock_token)
 
655
        request = HTTPRequest()
 
656
        request.method = 'DELETE'
 
657
        request.host = self._get_host()
 
658
        request.path = '/' + _str(topic_name) + \
 
659
                       '/subscriptions/' + _str(subscription_name) + \
 
660
                       '/messages/' + _str(sequence_number) + \
 
661
                       '/' + _str(lock_token) + ''
 
662
        request.path, request.query = _update_request_uri_query(request)
 
663
        request.headers = self._update_service_bus_header(request)
 
664
        self._perform_request(request)
 
665
 
 
666
    def send_queue_message(self, queue_name, message=None):
 
667
        '''
 
668
        Sends a message into the specified queue. The limit to the number of
 
669
        messages which may be present in the topic is governed by the message
 
670
        size the MaxTopicSizeInMegaBytes. If this message will cause the queue
 
671
        to exceed its quota, a quota exceeded error is returned and the
 
672
        message will be rejected.
 
673
 
 
674
        queue_name: Name of the queue.
 
675
        message: Message object containing message body and properties.
 
676
        '''
 
677
        _validate_not_none('queue_name', queue_name)
 
678
        _validate_not_none('message', message)
 
679
        request = HTTPRequest()
 
680
        request.method = 'POST'
 
681
        request.host = self._get_host()
 
682
        request.path = '/' + _str(queue_name) + '/messages'
 
683
        request.headers = message.add_headers(request)
 
684
        request.body = _get_request_body_bytes_only('message.body',
 
685
                                                    message.body)
 
686
        request.path, request.query = _update_request_uri_query(request)
 
687
        request.headers = self._update_service_bus_header(request)
 
688
        self._perform_request(request)
 
689
 
 
690
    def peek_lock_queue_message(self, queue_name, timeout='60'):
 
691
        '''
 
692
        Automically retrieves and locks a message from a queue for processing.
 
693
        The message is guaranteed not to be delivered to other receivers (on
 
694
        the same subscription only) during the lock duration period specified
 
695
        in the queue description. Once the lock expires, the message will be
 
696
        available to other receivers. In order to complete processing of the
 
697
        message, the receiver should issue a delete command with the lock ID
 
698
        received from this operation. To abandon processing of the message and
 
699
        unlock it for other receivers, an Unlock Message command should be
 
700
        issued, or the lock duration period can expire.
 
701
 
 
702
        queue_name: Name of the queue.
 
703
        timeout: Optional. The timeout parameter is expressed in seconds.
 
704
        '''
 
705
        _validate_not_none('queue_name', queue_name)
 
706
        request = HTTPRequest()
 
707
        request.method = 'POST'
 
708
        request.host = self._get_host()
 
709
        request.path = '/' + _str(queue_name) + '/messages/head'
 
710
        request.query = [('timeout', _int_or_none(timeout))]
 
711
        request.path, request.query = _update_request_uri_query(request)
 
712
        request.headers = self._update_service_bus_header(request)
 
713
        response = self._perform_request(request)
 
714
 
 
715
        return _create_message(response, self)
 
716
 
 
717
    def unlock_queue_message(self, queue_name, sequence_number, lock_token):
 
718
        '''
 
719
        Unlocks a message for processing by other receivers on a given
 
720
        subscription. This operation deletes the lock object, causing the
 
721
        message to be unlocked. A message must have first been locked by a
 
722
        receiver before this operation is called.
 
723
 
 
724
        queue_name: Name of the queue.
 
725
        sequence_number:
 
726
            The sequence number of the message to be unlocked as returned in
 
727
            BrokerProperties['SequenceNumber'] by the Peek Message operation.
 
728
        lock_token:
 
729
            The ID of the lock as returned by the Peek Message operation in
 
730
            BrokerProperties['LockToken']
 
731
        '''
 
732
        _validate_not_none('queue_name', queue_name)
 
733
        _validate_not_none('sequence_number', sequence_number)
 
734
        _validate_not_none('lock_token', lock_token)
 
735
        request = HTTPRequest()
 
736
        request.method = 'PUT'
 
737
        request.host = self._get_host()
 
738
        request.path = '/' + _str(queue_name) + \
 
739
                       '/messages/' + _str(sequence_number) + \
 
740
                       '/' + _str(lock_token) + ''
 
741
        request.path, request.query = _update_request_uri_query(request)
 
742
        request.headers = self._update_service_bus_header(request)
 
743
        self._perform_request(request)
 
744
 
 
745
    def read_delete_queue_message(self, queue_name, timeout='60'):
 
746
        '''
 
747
        Reads and deletes a message from a queue as an atomic operation. This
 
748
        operation should be used when a best-effort guarantee is sufficient
 
749
        for an application; that is, using this operation it is possible for
 
750
        messages to be lost if processing fails.
 
751
 
 
752
        queue_name: Name of the queue.
 
753
        timeout: Optional. The timeout parameter is expressed in seconds.
 
754
        '''
 
755
        _validate_not_none('queue_name', queue_name)
 
756
        request = HTTPRequest()
 
757
        request.method = 'DELETE'
 
758
        request.host = self._get_host()
 
759
        request.path = '/' + _str(queue_name) + '/messages/head'
 
760
        request.query = [('timeout', _int_or_none(timeout))]
 
761
        request.path, request.query = _update_request_uri_query(request)
 
762
        request.headers = self._update_service_bus_header(request)
 
763
        response = self._perform_request(request)
 
764
 
 
765
        return _create_message(response, self)
 
766
 
 
767
    def delete_queue_message(self, queue_name, sequence_number, lock_token):
 
768
        '''
 
769
        Completes processing on a locked message and delete it from the queue.
 
770
        This operation should only be called after processing a previously
 
771
        locked message is successful to maintain At-Least-Once delivery
 
772
        assurances.
 
773
 
 
774
        queue_name: Name of the queue.
 
775
        sequence_number:
 
776
            The sequence number of the message to be deleted as returned in
 
777
            BrokerProperties['SequenceNumber'] by the Peek Message operation.
 
778
        lock_token:
 
779
            The ID of the lock as returned by the Peek Message operation in
 
780
            BrokerProperties['LockToken']
 
781
        '''
 
782
        _validate_not_none('queue_name', queue_name)
 
783
        _validate_not_none('sequence_number', sequence_number)
 
784
        _validate_not_none('lock_token', lock_token)
 
785
        request = HTTPRequest()
 
786
        request.method = 'DELETE'
 
787
        request.host = self._get_host()
 
788
        request.path = '/' + _str(queue_name) + \
 
789
                       '/messages/' + _str(sequence_number) + \
 
790
                       '/' + _str(lock_token) + ''
 
791
        request.path, request.query = _update_request_uri_query(request)
 
792
        request.headers = self._update_service_bus_header(request)
 
793
        self._perform_request(request)
 
794
 
 
795
    def receive_queue_message(self, queue_name, peek_lock=True, timeout=60):
 
796
        '''
 
797
        Receive a message from a queue for processing.
 
798
 
 
799
        queue_name: Name of the queue.
 
800
        peek_lock:
 
801
            Optional. True to retrieve and lock the message. False to read and
 
802
            delete the message. Default is True (lock).
 
803
        timeout: Optional. The timeout parameter is expressed in seconds.
 
804
        '''
 
805
        if peek_lock:
 
806
            return self.peek_lock_queue_message(queue_name, timeout)
 
807
        else:
 
808
            return self.read_delete_queue_message(queue_name, timeout)
 
809
 
 
810
    def receive_subscription_message(self, topic_name, subscription_name,
 
811
                                     peek_lock=True, timeout=60):
 
812
        '''
 
813
        Receive a message from a subscription for processing.
 
814
 
 
815
        topic_name: Name of the topic.
 
816
        subscription_name: Name of the subscription.
 
817
        peek_lock:
 
818
            Optional. True to retrieve and lock the message. False to read and
 
819
            delete the message. Default is True (lock).
 
820
        timeout: Optional. The timeout parameter is expressed in seconds.
 
821
        '''
 
822
        if peek_lock:
 
823
            return self.peek_lock_subscription_message(topic_name,
 
824
                                                       subscription_name,
 
825
                                                       timeout)
 
826
        else:
 
827
            return self.read_delete_subscription_message(topic_name,
 
828
                                                         subscription_name,
 
829
                                                         timeout)
 
830
 
 
831
    def _get_host(self):
 
832
        return self.service_namespace + self.host_base
 
833
 
 
834
    def _perform_request(self, request):
 
835
        try:
 
836
            resp = self._filter(request)
 
837
        except HTTPError as ex:
 
838
            return _service_bus_error_handler(ex)
 
839
 
 
840
        return resp
 
841
 
 
842
    def _update_service_bus_header(self, request):
 
843
        ''' Add additional headers for service bus. '''
 
844
 
 
845
        if request.method in ['PUT', 'POST', 'MERGE', 'DELETE']:
 
846
            request.headers.append(('Content-Length', str(len(request.body))))
 
847
 
 
848
        # if it is not GET or HEAD request, must set content-type.
 
849
        if not request.method in ['GET', 'HEAD']:
 
850
            for name, _ in request.headers:
 
851
                if 'content-type' == name.lower():
 
852
                    break
 
853
            else:
 
854
                request.headers.append(
 
855
                    ('Content-Type',
 
856
                     'application/atom+xml;type=entry;charset=utf-8'))
 
857
 
 
858
        # Adds authoriaztion header for authentication.
 
859
        request.headers.append(
 
860
            ('Authorization', self._sign_service_bus_request(request)))
 
861
 
 
862
        return request.headers
 
863
 
 
864
    def _sign_service_bus_request(self, request):
 
865
        ''' return the signed string with token. '''
 
866
 
 
867
        return 'WRAP access_token="' + \
 
868
               self._get_token(request.host, request.path) + '"'
 
869
 
 
870
    def _token_is_expired(self, token):
 
871
        ''' Check if token expires or not. '''
 
872
        time_pos_begin = token.find('ExpiresOn=') + len('ExpiresOn=')
 
873
        time_pos_end = token.find('&', time_pos_begin)
 
874
        token_expire_time = int(token[time_pos_begin:time_pos_end])
 
875
        time_now = time.mktime(time.localtime())
 
876
 
 
877
        # Adding 30 seconds so the token wouldn't be expired when we send the
 
878
        # token to server.
 
879
        return (token_expire_time - time_now) < 30
 
880
 
 
881
    def _get_token(self, host, path):
 
882
        '''
 
883
        Returns token for the request.
 
884
 
 
885
        host: the service bus service request.
 
886
        path: the service bus service request.
 
887
        '''
 
888
        wrap_scope = 'http://' + host + path + self.issuer + self.account_key
 
889
 
 
890
        # Check whether has unexpired cache, return cached token if it is still
 
891
        # usable.
 
892
        if wrap_scope in _tokens:
 
893
            token = _tokens[wrap_scope]
 
894
            if not self._token_is_expired(token):
 
895
                return token
 
896
 
 
897
        # get token from accessconstrol server
 
898
        request = HTTPRequest()
 
899
        request.protocol_override = 'https'
 
900
        request.host = host.replace('.servicebus.', '-sb.accesscontrol.')
 
901
        request.method = 'POST'
 
902
        request.path = '/WRAPv0.9'
 
903
        request.body = ('wrap_name=' + url_quote(self.issuer) +
 
904
                        '&wrap_password=' + url_quote(self.account_key) +
 
905
                        '&wrap_scope=' +
 
906
                        url_quote('http://' + host + path)).encode('utf-8')
 
907
        request.headers.append(('Content-Length', str(len(request.body))))
 
908
        resp = self._httpclient.perform_request(request)
 
909
 
 
910
        token = resp.body.decode('utf-8')
 
911
        token = url_unquote(token[token.find('=') + 1:token.rfind('&')])
 
912
        _tokens[wrap_scope] = token
 
913
 
 
914
        return token