#-------------------------------------------------------------------------
# Copyright (c) Microsoft. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#--------------------------------------------------------------------------
import os
import time

from azure import (
    WindowsAzureError,
    SERVICE_BUS_HOST_BASE,
    _convert_response_to_feeds,
    _dont_fail_not_exist,
    _dont_fail_on_exist,
    _get_request_body,
    _get_request_body_bytes_only,
    _int_or_none,
    _str,
    _update_request_uri_query,
    url_quote,
    url_unquote,
    _validate_not_none,
)
from azure.http import (
    HTTPError,
    HTTPRequest,
)
from azure.http.httpclient import _HTTPClient
from azure.servicebus import (
    AZURE_SERVICEBUS_NAMESPACE,
    AZURE_SERVICEBUS_ACCESS_KEY,
    AZURE_SERVICEBUS_ISSUER,
    _convert_topic_to_xml,
    _convert_response_to_topic,
    _convert_queue_to_xml,
    _convert_response_to_queue,
    _convert_subscription_to_xml,
    _convert_response_to_subscription,
    _convert_rule_to_xml,
    _convert_response_to_rule,
    _convert_xml_to_queue,
    _convert_xml_to_topic,
    _convert_xml_to_subscription,
    _convert_xml_to_rule,
    _create_message,
    _service_bus_error_handler,
)

# Token cache for Authentication
59
# Shared by the different instances of ServiceBusService
63
class ServiceBusService(object):

    '''
    Client for the Service Bus REST endpoints: management of queues, topics,
    subscriptions and rules, plus sending and receiving messages.
    '''

    def __init__(self, service_namespace=None, account_key=None, issuer=None,
                 x_ms_version='2011-06-01', host_base=SERVICE_BUS_HOST_BASE):
        '''
        service_namespace:
            Service bus namespace. When not given, read from the environment
            variable named by AZURE_SERVICEBUS_NAMESPACE.
        account_key:
            Access key. When not given, read from the environment variable
            named by AZURE_SERVICEBUS_ACCESS_KEY.
        issuer:
            Issuer. When not given, read from the environment variable named
            by AZURE_SERVICEBUS_ISSUER.
        x_ms_version:
            Not used; kept only for backwards compatibility.
        host_base:
            Suffix appended to the namespace to form the request host.

        Raises WindowsAzureError when the namespace, key or issuer cannot be
        resolved.
        '''
        # NOTE(review): this attribute is reconstructed from a line that was
        # missing in the damaged source -- confirm it is still expected.
        self.requestid = None
        self.service_namespace = service_namespace
        self.account_key = account_key
        self.issuer = issuer
        self.host_base = host_base

        # Explicit constructor arguments win; otherwise fall back to the
        # environment variables.
        if not self.service_namespace:
            self.service_namespace = os.environ.get(AZURE_SERVICEBUS_NAMESPACE)
        if not self.account_key:
            self.account_key = os.environ.get(AZURE_SERVICEBUS_ACCESS_KEY)
        if not self.issuer:
            self.issuer = os.environ.get(AZURE_SERVICEBUS_ISSUER)

        if not (self.service_namespace and self.account_key and self.issuer):
            raise WindowsAzureError(
                'You need to provide servicebus namespace, access key and Issuer')

        self._httpclient = _HTTPClient(service_instance=self,
                                       service_namespace=self.service_namespace,
                                       account_key=self.account_key,
                                       issuer=self.issuer)
        self._filter = self._httpclient.perform_request

    def with_filter(self, filter):
        '''
        Returns a new service which will process requests with the specified
        filter. Filtering operations can include logging, automatic retrying,
        etc... The filter is a lambda which receives the HTTPRequest and
        another lambda. The filter can perform any pre-processing on the
        request, pass it off to the next lambda, and then perform any
        post-processing on the response.
        '''
        # host_base is forwarded so the filtered service targets the same
        # endpoint as this one.
        res = ServiceBusService(self.service_namespace, self.account_key,
                                self.issuer, host_base=self.host_base)
        old_filter = self._filter

        def new_filter(request):
            return filter(request, old_filter)

        res._filter = new_filter
        return res

    def set_proxy(self, host, port, user=None, password=None):
        '''
        Sets the proxy server host and port for the HTTP CONNECT Tunnelling.

        host: Address of the proxy. Ex: '192.168.0.100'
        port: Port of the proxy. Ex: 6000
        user: User for proxy authorization.
        password: Password for proxy authorization.
        '''
        self._httpclient.set_proxy(host, port, user, password)

    # ---------------------------------------------------------------- queues

    def create_queue(self, queue_name, queue=None, fail_on_exist=False):
        '''
        Creates a new queue. Once created, this queue's resource manifest is
        immutable.

        queue_name: Name of the queue to create.
        queue: Queue object to create.
        fail_on_exist:
            Specify whether to throw an exception when the queue exists.
        '''
        _validate_not_none('queue_name', queue_name)
        request = self._build_request(
            'PUT', '/' + _str(queue_name),
            body=_get_request_body(_convert_queue_to_xml(queue)))
        return self._perform_create(request, fail_on_exist)

    def delete_queue(self, queue_name, fail_not_exist=False):
        '''
        Deletes an existing queue. This operation will also remove all
        associated state including messages in the queue.

        queue_name: Name of the queue to delete.
        fail_not_exist:
            Specify whether to throw an exception if the queue doesn't exist.
        '''
        _validate_not_none('queue_name', queue_name)
        request = self._build_request('DELETE', '/' + _str(queue_name))
        return self._perform_delete(request, fail_not_exist)

    def get_queue(self, queue_name):
        '''
        Retrieves an existing queue.

        queue_name: Name of the queue.
        '''
        _validate_not_none('queue_name', queue_name)
        request = self._build_request('GET', '/' + _str(queue_name))
        response = self._perform_request(request)
        return _convert_response_to_queue(response)

    def list_queues(self):
        '''
        Enumerates the queues in the service namespace.
        '''
        request = self._build_request('GET', '/$Resources/Queues')
        response = self._perform_request(request)
        return _convert_response_to_feeds(response, _convert_xml_to_queue)

    # ---------------------------------------------------------------- topics

    def create_topic(self, topic_name, topic=None, fail_on_exist=False):
        '''
        Creates a new topic. Once created, this topic resource manifest is
        immutable.

        topic_name: Name of the topic to create.
        topic: Topic object to create.
        fail_on_exist:
            Specify whether to throw an exception when the topic exists.
        '''
        _validate_not_none('topic_name', topic_name)
        request = self._build_request(
            'PUT', '/' + _str(topic_name),
            body=_get_request_body(_convert_topic_to_xml(topic)))
        return self._perform_create(request, fail_on_exist)

    def delete_topic(self, topic_name, fail_not_exist=False):
        '''
        Deletes an existing topic. This operation will also remove all
        associated state including associated subscriptions.

        topic_name: Name of the topic to delete.
        fail_not_exist:
            Specify whether throw exception when topic doesn't exist.
        '''
        _validate_not_none('topic_name', topic_name)
        request = self._build_request('DELETE', '/' + _str(topic_name))
        return self._perform_delete(request, fail_not_exist)

    def get_topic(self, topic_name):
        '''
        Retrieves the description for the specified topic.

        topic_name: Name of the topic.
        '''
        _validate_not_none('topic_name', topic_name)
        request = self._build_request('GET', '/' + _str(topic_name))
        response = self._perform_request(request)
        return _convert_response_to_topic(response)

    def list_topics(self):
        '''
        Retrieves the topics in the service namespace.
        '''
        request = self._build_request('GET', '/$Resources/Topics')
        response = self._perform_request(request)
        return _convert_response_to_feeds(response, _convert_xml_to_topic)

    # ----------------------------------------------------------------- rules

    def create_rule(self, topic_name, subscription_name, rule_name, rule=None,
                    fail_on_exist=False):
        '''
        Creates a new rule. Once created, this rule's resource manifest is
        immutable.

        topic_name: Name of the topic.
        subscription_name: Name of the subscription.
        rule_name: Name of the rule.
        rule: Rule object to create.
        fail_on_exist:
            Specify whether to throw an exception when the rule exists.
        '''
        _validate_not_none('topic_name', topic_name)
        _validate_not_none('subscription_name', subscription_name)
        _validate_not_none('rule_name', rule_name)
        path = self._subscription_path(topic_name, subscription_name) + \
            '/rules/' + _str(rule_name)
        request = self._build_request(
            'PUT', path,
            body=_get_request_body(_convert_rule_to_xml(rule)))
        return self._perform_create(request, fail_on_exist)

    def delete_rule(self, topic_name, subscription_name, rule_name,
                    fail_not_exist=False):
        '''
        Deletes an existing rule.

        topic_name: Name of the topic.
        subscription_name: Name of the subscription.
        rule_name:
            Name of the rule to delete. DEFAULT_RULE_NAME=$Default.
            Use DEFAULT_RULE_NAME to delete default rule for the subscription.
        fail_not_exist:
            Specify whether throw exception when rule doesn't exist.
        '''
        _validate_not_none('topic_name', topic_name)
        _validate_not_none('subscription_name', subscription_name)
        _validate_not_none('rule_name', rule_name)
        path = self._subscription_path(topic_name, subscription_name) + \
            '/rules/' + _str(rule_name)
        request = self._build_request('DELETE', path)
        return self._perform_delete(request, fail_not_exist)

    def get_rule(self, topic_name, subscription_name, rule_name):
        '''
        Retrieves the description for the specified rule.

        topic_name: Name of the topic.
        subscription_name: Name of the subscription.
        rule_name: Name of the rule.
        '''
        _validate_not_none('topic_name', topic_name)
        _validate_not_none('subscription_name', subscription_name)
        _validate_not_none('rule_name', rule_name)
        path = self._subscription_path(topic_name, subscription_name) + \
            '/rules/' + _str(rule_name)
        request = self._build_request('GET', path)
        response = self._perform_request(request)
        return _convert_response_to_rule(response)

    def list_rules(self, topic_name, subscription_name):
        '''
        Retrieves the rules that exist under the specified subscription.

        topic_name: Name of the topic.
        subscription_name: Name of the subscription.
        '''
        _validate_not_none('topic_name', topic_name)
        _validate_not_none('subscription_name', subscription_name)
        path = self._subscription_path(topic_name, subscription_name) + \
            '/rules/'
        request = self._build_request('GET', path)
        response = self._perform_request(request)
        return _convert_response_to_feeds(response, _convert_xml_to_rule)

    # --------------------------------------------------------- subscriptions

    def create_subscription(self, topic_name, subscription_name,
                            subscription=None, fail_on_exist=False):
        '''
        Creates a new subscription. Once created, this subscription resource
        manifest is immutable.

        topic_name: Name of the topic.
        subscription_name: Name of the subscription.
        subscription: Subscription object to create.
        fail_on_exist:
            Specify whether throw exception when subscription exists.
        '''
        _validate_not_none('topic_name', topic_name)
        _validate_not_none('subscription_name', subscription_name)
        request = self._build_request(
            'PUT', self._subscription_path(topic_name, subscription_name),
            body=_get_request_body(
                _convert_subscription_to_xml(subscription)))
        return self._perform_create(request, fail_on_exist)

    def delete_subscription(self, topic_name, subscription_name,
                            fail_not_exist=False):
        '''
        Deletes an existing subscription.

        topic_name: Name of the topic.
        subscription_name: Name of the subscription to delete.
        fail_not_exist:
            Specify whether to throw an exception when the subscription
            doesn't exist.
        '''
        _validate_not_none('topic_name', topic_name)
        _validate_not_none('subscription_name', subscription_name)
        request = self._build_request(
            'DELETE', self._subscription_path(topic_name, subscription_name))
        return self._perform_delete(request, fail_not_exist)

    def get_subscription(self, topic_name, subscription_name):
        '''
        Gets an existing subscription.

        topic_name: Name of the topic.
        subscription_name: Name of the subscription.
        '''
        _validate_not_none('topic_name', topic_name)
        _validate_not_none('subscription_name', subscription_name)
        request = self._build_request(
            'GET', self._subscription_path(topic_name, subscription_name))
        response = self._perform_request(request)
        return _convert_response_to_subscription(response)

    def list_subscriptions(self, topic_name):
        '''
        Retrieves the subscriptions in the specified topic.

        topic_name: Name of the topic.
        '''
        _validate_not_none('topic_name', topic_name)
        request = self._build_request(
            'GET', '/' + _str(topic_name) + '/subscriptions/')
        response = self._perform_request(request)
        return _convert_response_to_feeds(response,
                                          _convert_xml_to_subscription)

    # ------------------------------------------- topic/subscription messages

    def send_topic_message(self, topic_name, message=None):
        '''
        Enqueues a message into the specified topic. The limit to the number
        of messages which may be present in the topic is governed by the
        message size in MaxTopicSizeInBytes. If this message causes the topic
        to exceed its quota, a quota exceeded error is returned and the
        message will be rejected.

        topic_name: Name of the topic.
        message: Message object containing message body and properties.
        '''
        _validate_not_none('topic_name', topic_name)
        _validate_not_none('message', message)
        request = self._build_request(
            'POST', '/' + _str(topic_name) + '/messages', message=message)
        self._perform_request(request)

    def peek_lock_subscription_message(self, topic_name, subscription_name,
                                       timeout='60'):
        '''
        This operation is used to atomically retrieve and lock a message for
        processing. The message is guaranteed not to be delivered to other
        receivers (on the same subscription only) during the lock duration
        period. Once the lock expires, the message will be available to other
        receivers. In order to complete processing of the message, the
        receiver should issue a delete command with the lock ID received from
        this operation. To abandon processing of the message and unlock it
        for other receivers, an Unlock Message command should be issued, or
        the lock duration period can expire.

        topic_name: Name of the topic.
        subscription_name: Name of the subscription.
        timeout: Optional. The timeout parameter is expressed in seconds.
        '''
        _validate_not_none('topic_name', topic_name)
        _validate_not_none('subscription_name', subscription_name)
        path = self._subscription_path(topic_name, subscription_name) + \
            '/messages/head'
        request = self._build_request(
            'POST', path, query=[('timeout', _int_or_none(timeout))])
        response = self._perform_request(request)
        return _create_message(response, self)

    def unlock_subscription_message(self, topic_name, subscription_name,
                                    sequence_number, lock_token):
        '''
        Unlock a message for processing by other receivers on a given
        subscription. This operation deletes the lock object, causing the
        message to be unlocked. A message must have first been locked by a
        receiver before this operation is called.

        topic_name: Name of the topic.
        subscription_name: Name of the subscription.
        sequence_number:
            The sequence number of the message to be unlocked as returned in
            BrokerProperties['SequenceNumber'] by the Peek Message operation.
        lock_token:
            The ID of the lock as returned by the Peek Message operation in
            BrokerProperties['LockToken']
        '''
        _validate_not_none('topic_name', topic_name)
        _validate_not_none('subscription_name', subscription_name)
        _validate_not_none('sequence_number', sequence_number)
        _validate_not_none('lock_token', lock_token)
        # The subscription name goes through _str() like every other path
        # segment (previously the builtin str() was used here only).
        path = self._subscription_path(topic_name, subscription_name) + \
            '/messages/' + _str(sequence_number) + '/' + _str(lock_token)
        request = self._build_request('PUT', path)
        self._perform_request(request)

    def read_delete_subscription_message(self, topic_name, subscription_name,
                                         timeout='60'):
        '''
        Read and delete a message from a subscription as an atomic operation.
        This operation should be used when a best-effort guarantee is
        sufficient for an application; that is, using this operation it is
        possible for messages to be lost if processing fails.

        topic_name: Name of the topic.
        subscription_name: Name of the subscription.
        timeout: Optional. The timeout parameter is expressed in seconds.
        '''
        _validate_not_none('topic_name', topic_name)
        _validate_not_none('subscription_name', subscription_name)
        path = self._subscription_path(topic_name, subscription_name) + \
            '/messages/head'
        request = self._build_request(
            'DELETE', path, query=[('timeout', _int_or_none(timeout))])
        response = self._perform_request(request)
        return _create_message(response, self)

    def delete_subscription_message(self, topic_name, subscription_name,
                                    sequence_number, lock_token):
        '''
        Completes processing on a locked message and delete it from the
        subscription. This operation should only be called after processing a
        previously locked message is successful to maintain At-Least-Once
        delivery assurances.

        topic_name: Name of the topic.
        subscription_name: Name of the subscription.
        sequence_number:
            The sequence number of the message to be deleted as returned in
            BrokerProperties['SequenceNumber'] by the Peek Message operation.
        lock_token:
            The ID of the lock as returned by the Peek Message operation in
            BrokerProperties['LockToken']
        '''
        _validate_not_none('topic_name', topic_name)
        _validate_not_none('subscription_name', subscription_name)
        _validate_not_none('sequence_number', sequence_number)
        _validate_not_none('lock_token', lock_token)
        path = self._subscription_path(topic_name, subscription_name) + \
            '/messages/' + _str(sequence_number) + '/' + _str(lock_token)
        request = self._build_request('DELETE', path)
        self._perform_request(request)

    # -------------------------------------------------------- queue messages

    def send_queue_message(self, queue_name, message=None):
        '''
        Sends a message into the specified queue. The limit to the number of
        messages which may be present in the topic is governed by the message
        size the MaxTopicSizeInMegaBytes. If this message will cause the queue
        to exceed its quota, a quota exceeded error is returned and the
        message will be rejected.

        queue_name: Name of the queue.
        message: Message object containing message body and properties.
        '''
        _validate_not_none('queue_name', queue_name)
        _validate_not_none('message', message)
        request = self._build_request(
            'POST', '/' + _str(queue_name) + '/messages', message=message)
        self._perform_request(request)

    def peek_lock_queue_message(self, queue_name, timeout='60'):
        '''
        Atomically retrieves and locks a message from a queue for processing.
        The message is guaranteed not to be delivered to other receivers (on
        the same subscription only) during the lock duration period specified
        in the queue description. Once the lock expires, the message will be
        available to other receivers. In order to complete processing of the
        message, the receiver should issue a delete command with the lock ID
        received from this operation. To abandon processing of the message and
        unlock it for other receivers, an Unlock Message command should be
        issued, or the lock duration period can expire.

        queue_name: Name of the queue.
        timeout: Optional. The timeout parameter is expressed in seconds.
        '''
        _validate_not_none('queue_name', queue_name)
        request = self._build_request(
            'POST', '/' + _str(queue_name) + '/messages/head',
            query=[('timeout', _int_or_none(timeout))])
        response = self._perform_request(request)
        return _create_message(response, self)

    def unlock_queue_message(self, queue_name, sequence_number, lock_token):
        '''
        Unlocks a message for processing by other receivers on a given
        subscription. This operation deletes the lock object, causing the
        message to be unlocked. A message must have first been locked by a
        receiver before this operation is called.

        queue_name: Name of the queue.
        sequence_number:
            The sequence number of the message to be unlocked as returned in
            BrokerProperties['SequenceNumber'] by the Peek Message operation.
        lock_token:
            The ID of the lock as returned by the Peek Message operation in
            BrokerProperties['LockToken']
        '''
        _validate_not_none('queue_name', queue_name)
        _validate_not_none('sequence_number', sequence_number)
        _validate_not_none('lock_token', lock_token)
        path = '/' + _str(queue_name) + \
            '/messages/' + _str(sequence_number) + '/' + _str(lock_token)
        request = self._build_request('PUT', path)
        self._perform_request(request)

    def read_delete_queue_message(self, queue_name, timeout='60'):
        '''
        Reads and deletes a message from a queue as an atomic operation. This
        operation should be used when a best-effort guarantee is sufficient
        for an application; that is, using this operation it is possible for
        messages to be lost if processing fails.

        queue_name: Name of the queue.
        timeout: Optional. The timeout parameter is expressed in seconds.
        '''
        _validate_not_none('queue_name', queue_name)
        request = self._build_request(
            'DELETE', '/' + _str(queue_name) + '/messages/head',
            query=[('timeout', _int_or_none(timeout))])
        response = self._perform_request(request)
        return _create_message(response, self)

    def delete_queue_message(self, queue_name, sequence_number, lock_token):
        '''
        Completes processing on a locked message and delete it from the queue.
        This operation should only be called after processing a previously
        locked message is successful to maintain At-Least-Once delivery
        assurances.

        queue_name: Name of the queue.
        sequence_number:
            The sequence number of the message to be deleted as returned in
            BrokerProperties['SequenceNumber'] by the Peek Message operation.
        lock_token:
            The ID of the lock as returned by the Peek Message operation in
            BrokerProperties['LockToken']
        '''
        _validate_not_none('queue_name', queue_name)
        _validate_not_none('sequence_number', sequence_number)
        _validate_not_none('lock_token', lock_token)
        path = '/' + _str(queue_name) + \
            '/messages/' + _str(sequence_number) + '/' + _str(lock_token)
        request = self._build_request('DELETE', path)
        self._perform_request(request)

    # ------------------------------------------------- convenience receivers

    def receive_queue_message(self, queue_name, peek_lock=True, timeout=60):
        '''
        Receive a message from a queue for processing.

        queue_name: Name of the queue.
        peek_lock:
            Optional. True to retrieve and lock the message. False to read and
            delete the message. Default is True (lock).
        timeout: Optional. The timeout parameter is expressed in seconds.
        '''
        if peek_lock:
            return self.peek_lock_queue_message(queue_name, timeout)
        return self.read_delete_queue_message(queue_name, timeout)

    def receive_subscription_message(self, topic_name, subscription_name,
                                     peek_lock=True, timeout=60):
        '''
        Receive a message from a subscription for processing.

        topic_name: Name of the topic.
        subscription_name: Name of the subscription.
        peek_lock:
            Optional. True to retrieve and lock the message. False to read and
            delete the message. Default is True (lock).
        timeout: Optional. The timeout parameter is expressed in seconds.
        '''
        if peek_lock:
            return self.peek_lock_subscription_message(topic_name,
                                                       subscription_name,
                                                       timeout)
        return self.read_delete_subscription_message(topic_name,
                                                     subscription_name,
                                                     timeout)

    # ------------------------------------------------------- private helpers

    def _get_host(self):
        ''' Return the request host: the namespace followed by host_base. '''
        return self.service_namespace + self.host_base

    @staticmethod
    def _subscription_path(topic_name, subscription_name):
        ''' Return the resource path of a subscription under a topic. '''
        return '/' + _str(topic_name) + \
            '/subscriptions/' + _str(subscription_name)

    def _build_request(self, method, path, body=None, query=None,
                       message=None):
        '''
        Build a ready-to-send HTTPRequest for this namespace.

        method: HTTP verb.
        path: Resource path (before query-string processing).
        body: Optional request body; left at the HTTPRequest default if None.
        query: Optional list of (name, value) query pairs.
        message:
            Optional message object; when given, its headers are added via
            message.add_headers() and its body becomes the request body.
        '''
        request = HTTPRequest()
        request.method = method
        request.host = self._get_host()
        request.path = path
        if message is not None:
            request.headers = message.add_headers(request)
            request.body = _get_request_body_bytes_only(
                'message.body', message.body)
        if body is not None:
            request.body = body
        if query is not None:
            request.query = query
        # The URI/query must be finalised before the service bus headers are
        # added, because the Authorization header is derived from
        # request.host and request.path.
        request.path, request.query = _update_request_uri_query(request)
        request.headers = self._update_service_bus_header(request)
        return request

    def _perform_create(self, request, fail_on_exist):
        '''
        Send a create request.

        Returns True when the request succeeds. When fail_on_exist is false,
        a WindowsAzureError is handed to _dont_fail_on_exist() and, if that
        helper does not re-raise, False is returned instead.
        '''
        if fail_on_exist:
            self._perform_request(request)
            return True
        try:
            self._perform_request(request)
        except WindowsAzureError as ex:
            _dont_fail_on_exist(ex)
            return False
        return True

    def _perform_delete(self, request, fail_not_exist):
        '''
        Send a delete request.

        Returns True when the request succeeds. When fail_not_exist is false,
        a WindowsAzureError is handed to _dont_fail_not_exist() and, if that
        helper does not re-raise, False is returned instead.
        '''
        if fail_not_exist:
            self._perform_request(request)
            return True
        try:
            self._perform_request(request)
        except WindowsAzureError as ex:
            _dont_fail_not_exist(ex)
            return False
        return True

    def _perform_request(self, request):
        '''
        Run the request through the filter chain and return the response.
        An HTTPError is passed to _service_bus_error_handler() and that
        handler's result is returned.
        '''
        try:
            resp = self._filter(request)
        except HTTPError as ex:
            return _service_bus_error_handler(ex)

        return resp

    def _update_service_bus_header(self, request):
        ''' Add additional headers for service bus. '''

        if request.method in ['PUT', 'POST', 'MERGE', 'DELETE']:
            request.headers.append(('Content-Length', str(len(request.body))))

        # If it is not a GET or HEAD request, a content-type must be set;
        # only add the default when the caller has not supplied one.
        if request.method not in ['GET', 'HEAD']:
            has_content_type = any(
                name.lower() == 'content-type' for name, _ in request.headers)
            if not has_content_type:
                request.headers.append(
                    ('Content-Type',
                     'application/atom+xml;type=entry;charset=utf-8'))

        # Adds authorization header for authentication.
        request.headers.append(
            ('Authorization', self._sign_service_bus_request(request)))

        return request.headers

    def _sign_service_bus_request(self, request):
        ''' Return the Authorization header value carrying the token. '''

        return 'WRAP access_token="' + \
            self._get_token(request.host, request.path) + '"'

    def _token_is_expired(self, token):
        ''' Check if token expires or not. '''
        time_pos_begin = token.find('ExpiresOn=') + len('ExpiresOn=')
        time_pos_end = token.find('&', time_pos_begin)
        if time_pos_end == -1:
            # ExpiresOn is the last field: read to the end of the token
            # rather than letting the -1 index drop the final digit.
            time_pos_end = len(token)
        token_expire_time = int(token[time_pos_begin:time_pos_end])
        time_now = time.mktime(time.localtime())

        # Treat the token as expired 30 seconds early so it is still valid
        # by the time it reaches the server.
        return (token_expire_time - time_now) < 30

    def _get_token(self, host, path):
        '''
        Returns token for the request.

        host: Host of the service bus request.
        path: Path of the service bus request.
        '''
        wrap_scope = 'http://' + host + path + self.issuer + self.account_key

        # Return the cached token if there is one and it is still usable.
        if wrap_scope in _tokens:
            token = _tokens[wrap_scope]
            if not self._token_is_expired(token):
                return token

        # Get a token from the access control server.
        request = HTTPRequest()
        request.protocol_override = 'https'
        request.host = host.replace('.servicebus.', '-sb.accesscontrol.')
        request.method = 'POST'
        request.path = '/WRAPv0.9'
        request.body = ('wrap_name=' + url_quote(self.issuer) +
                        '&wrap_password=' + url_quote(self.account_key) +
                        '&wrap_scope=' +
                        url_quote('http://' + host + path)).encode('utf-8')
        request.headers.append(('Content-Length', str(len(request.body))))
        # Sent through the raw HTTP client, bypassing the filter chain.
        resp = self._httpclient.perform_request(request)

        # Keep what lies between the first '=' and the last '&' of the
        # response body, URL-unquoted.
        token = resp.body.decode('utf-8')
        token = url_unquote(token[token.find('=') + 1:token.rfind('&')])
        _tokens[wrap_scope] = token

        return token