1
#-------------------------------------------------------------------------
2
# Copyright (c) Microsoft. All rights reserved.
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
#--------------------------------------------------------------------------
18
import ast
import json
import sys
from datetime import datetime
from xml.dom import minidom

from azure import (
    WindowsAzureData,
    WindowsAzureError,
    xml_escape,
    _create_entry,
    _general_error_handler,
    _get_entry_properties,
    _get_child_nodes,
    _get_children_from_path,
    _get_first_child_node_value,
    _ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_DELETE,
    _ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_UNLOCK,
    _ERROR_QUEUE_NOT_FOUND,
    _ERROR_TOPIC_NOT_FOUND,
)
from azure.http import HTTPError
37
# Default rule name for a subscription.
DEFAULT_RULE_NAME = '$Default'

#-----------------------------------------------------------------------------
# Constants for Azure app environment settings.
AZURE_SERVICEBUS_NAMESPACE = 'AZURE_SERVICEBUS_NAMESPACE'
AZURE_SERVICEBUS_ACCESS_KEY = 'AZURE_SERVICEBUS_ACCESS_KEY'
AZURE_SERVICEBUS_ISSUER = 'AZURE_SERVICEBUS_ISSUER'

# XML namespace of the ``i:type`` attribute read when converting rules to
# objects.
XML_SCHEMA_NAMESPACE = 'http://www.w3.org/2001/XMLSchema-instance'
50
class Queue(WindowsAzureData):

    ''' Queue class corresponding to Queue Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780773

    Every constructor argument is optional and is stored unchanged on the
    attribute of the same name; None means "not set". '''

    def __init__(self, lock_duration=None, max_size_in_megabytes=None,
                 requires_duplicate_detection=None, requires_session=None,
                 default_message_time_to_live=None,
                 dead_lettering_on_message_expiration=None,
                 duplicate_detection_history_time_window=None,
                 max_delivery_count=None, enable_batched_operations=None,
                 size_in_bytes=None, message_count=None):

        self.lock_duration = lock_duration
        self.max_size_in_megabytes = max_size_in_megabytes
        self.requires_duplicate_detection = requires_duplicate_detection
        self.requires_session = requires_session
        self.default_message_time_to_live = default_message_time_to_live
        self.dead_lettering_on_message_expiration = \
            dead_lettering_on_message_expiration
        self.duplicate_detection_history_time_window = \
            duplicate_detection_history_time_window
        self.max_delivery_count = max_delivery_count
        self.enable_batched_operations = enable_batched_operations
        self.size_in_bytes = size_in_bytes
        self.message_count = message_count
78
class Topic(WindowsAzureData):

    ''' Topic class corresponding to Topic Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780749.

    Every constructor argument is optional and is stored unchanged on the
    attribute of the same name; None means "not set". '''

    def __init__(self, default_message_time_to_live=None,
                 max_size_in_megabytes=None, requires_duplicate_detection=None,
                 duplicate_detection_history_time_window=None,
                 enable_batched_operations=None, size_in_bytes=None):

        self.default_message_time_to_live = default_message_time_to_live
        self.max_size_in_megabytes = max_size_in_megabytes
        self.requires_duplicate_detection = requires_duplicate_detection
        self.duplicate_detection_history_time_window = \
            duplicate_detection_history_time_window
        self.enable_batched_operations = enable_batched_operations
        self.size_in_bytes = size_in_bytes

    @property
    def max_size_in_mega_bytes(self):
        ''' Deprecated alias of max_size_in_megabytes; reading it emits a
        warning and returns the current max_size_in_megabytes value. '''
        import warnings
        warnings.warn(
            'This attribute has been changed to max_size_in_megabytes.')
        return self.max_size_in_megabytes

    @max_size_in_mega_bytes.setter
    def max_size_in_mega_bytes(self, value):
        # Writes through to the renamed attribute so both spellings agree.
        self.max_size_in_megabytes = value
108
class Subscription(WindowsAzureData):

    ''' Subscription class corresponding to Subscription Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780763.

    Every constructor argument is optional and is stored unchanged on the
    attribute of the same name; None means "not set". '''

    def __init__(self, lock_duration=None, requires_session=None,
                 default_message_time_to_live=None,
                 dead_lettering_on_message_expiration=None,
                 dead_lettering_on_filter_evaluation_exceptions=None,
                 enable_batched_operations=None, max_delivery_count=None,
                 message_count=None):

        self.lock_duration = lock_duration
        self.requires_session = requires_session
        self.default_message_time_to_live = default_message_time_to_live
        self.dead_lettering_on_message_expiration = \
            dead_lettering_on_message_expiration
        self.dead_lettering_on_filter_evaluation_exceptions = \
            dead_lettering_on_filter_evaluation_exceptions
        self.enable_batched_operations = enable_batched_operations
        self.max_delivery_count = max_delivery_count
        self.message_count = message_count
132
class Rule(WindowsAzureData):

    ''' Rule class corresponding to Rule Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780753.

    filter_type / filter_expression describe the rule's filter and
    action_type / action_expression describe its action; all are optional
    and stored unchanged. '''

    def __init__(self, filter_type=None, filter_expression=None,
                 action_type=None, action_expression=None):
        self.filter_type = filter_type
        self.filter_expression = filter_expression
        self.action_type = action_type
        # Previously this stored action_type here, silently discarding the
        # action_expression argument.
        self.action_expression = action_expression
145
class Message(WindowsAzureData):

    ''' Message class that is used in the send message/get message apis. '''

    def __init__(self, body=None, service_bus_service=None, location=None,
                 custom_properties=None,
                 type='application/atom+xml;type=entry;charset=utf-8',
                 broker_properties=None):
        ''' Stores the message fields and, when both a service client and a
        location are given, derives the owning queue (or topic and
        subscription) name from the location url.

        body: the message payload.
        service_bus_service: client used later by delete() and unlock().
        location: url of a peek-locked message, if any.
        custom_properties: dict of user-defined properties sent as headers.
        type: value of the Content-Type header.
        broker_properties: broker properties of the message.
        '''
        self.body = body
        self.location = location
        self.broker_properties = broker_properties
        self.custom_properties = custom_properties
        self.type = type
        self.service_bus_service = service_bus_service
        self._topic_name = None
        self._subscription_name = None
        self._queue_name = None

        # Without a client the message can never delete/unlock itself, so
        # there is no point in parsing the location.
        if not service_bus_service:
            return

        # if location is set, then extracts the queue name for queue message
        # and extracts the topic and subscriptions name if it is topic
        # message.
        if location:
            if '/subscriptions/' in location:
                pos = location.find('/subscriptions/')
                pos1 = location.rfind('/', 0, pos - 1)
                self._topic_name = location[pos1 + 1:pos]
                pos += len('/subscriptions/')
                pos1 = location.find('/', pos)
                if pos1 == -1:
                    # No trailing path segment: the name runs to the end of
                    # the url (slicing with -1 would drop its last char).
                    pos1 = len(location)
                self._subscription_name = location[pos:pos1]
            elif '/messages/' in location:
                pos = location.find('/messages/')
                pos1 = location.rfind('/', 0, pos - 1)
                self._queue_name = location[pos1 + 1:pos]

    def delete(self):
        ''' Deletes itself if find queue name or topic name and subscription
        name.

        Raises WindowsAzureError when neither was derived from the location
        (the message was not peek-locked). '''
        if self._queue_name:
            self.service_bus_service.delete_queue_message(
                self._queue_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        elif self._topic_name and self._subscription_name:
            self.service_bus_service.delete_subscription_message(
                self._topic_name,
                self._subscription_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        else:
            raise WindowsAzureError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_DELETE)

    def unlock(self):
        ''' Unlocks itself if find queue name or topic name and subscription
        name.

        Raises WindowsAzureError when neither was derived from the location
        (the message was not peek-locked). '''
        if self._queue_name:
            self.service_bus_service.unlock_queue_message(
                self._queue_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        elif self._topic_name and self._subscription_name:
            self.service_bus_service.unlock_subscription_message(
                self._topic_name,
                self._subscription_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        else:
            raise WindowsAzureError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_UNLOCK)

    def add_headers(self, request):
        ''' Adds the additional headers of this message to request.

        Appends one header per custom property, the Content-Type header and,
        when set, the BrokerProperties header; returns request.headers. '''

        # Adds custom properties. Strings and datetimes are wrapped in double
        # quotes; everything else is sent as its lower-cased str() form.
        if self.custom_properties:
            for name, value in self.custom_properties.items():
                if sys.version_info < (3,) and isinstance(value, unicode):
                    request.headers.append(
                        (name, '"' + value.encode('utf-8') + '"'))
                elif isinstance(value, str):
                    request.headers.append((name, '"' + str(value) + '"'))
                elif isinstance(value, datetime):
                    request.headers.append(
                        (name,
                         '"' + value.strftime('%a, %d %b %Y %H:%M:%S GMT') + '"'))
                else:
                    request.headers.append((name, str(value).lower()))

        # Adds content-type
        request.headers.append(('Content-Type', self.type))

        # Adds BrokerProperties
        if self.broker_properties:
            if isinstance(self.broker_properties, dict):
                # str() of a dict is a Python repr (single quotes,
                # True/None), not JSON, so serialize mappings explicitly.
                broker_properties = json.dumps(self.broker_properties)
            else:
                broker_properties = str(self.broker_properties)
            request.headers.append(('BrokerProperties', broker_properties))

        return request.headers
243
def _create_message(response, service_instance):
    ''' Create message from response.

    response: response from service bus cloud server.
    service_instance: the service bus client.

    Returns a Message built from the response body and headers: the
    BrokerProperties, Content-Type and Location headers fill the matching
    Message fields, remaining non-transport headers become custom
    properties.
    '''
    respbody = response.body
    custom_properties = {}
    broker_properties = None
    message_type = None
    message_location = None

    # NOTE(review): header names excluded from custom properties; this list
    # is restored from the upstream SDK - confirm against the original file.
    non_custom_headers = ('content-type',
                          'brokerproperties',
                          'transfer-encoding',
                          'server',
                          'location',
                          'date')

    # gets all information from respheaders.
    for name, value in response.headers:
        header = name.lower()
        if header == 'brokerproperties':
            try:
                broker_properties = json.loads(value)
            except ValueError:
                # Fall back to the previous parser for payloads written as
                # Python literals; literal_eval only evaluates literals.
                broker_properties = ast.literal_eval(value)
        elif header == 'content-type':
            message_type = value
        elif header == 'location':
            message_location = value
        elif header not in non_custom_headers:
            if '"' in value:
                # Quoted values are strings or datetimes.
                value = value[1:-1]
                try:
                    custom_properties[name] = datetime.strptime(
                        value, '%a, %d %b %Y %H:%M:%S GMT')
                except ValueError:
                    custom_properties[name] = value
            elif value.lower() == 'true':
                custom_properties[name] = True
            elif value.lower() == 'false':
                custom_properties[name] = False
            else:
                try:
                    # int('3.1') doesn't work so need to get float('3.14')
                    # first
                    if str(int(float(value))) == value:
                        custom_properties[name] = int(value)
                    else:
                        custom_properties[name] = float(value)
                except (ValueError, OverflowError):
                    # Unquoted and not numeric (e.g. an unexpected transport
                    # header): keep the raw text instead of failing.
                    custom_properties[name] = value

    if message_type is None:
        message_type = 'application/atom+xml;type=entry;charset=utf-8'

    return Message(respbody, service_instance, message_location,
                   custom_properties, message_type, broker_properties)
299
def _convert_response_to_rule(response):
    ''' Converts the body of response to a rule object. '''
    return _convert_xml_to_rule(response.body)
303
def _convert_xml_to_rule(xmlstr):
    ''' Converts response xml to rule object.

    The format of xml for rule:
<entry xmlns='http://www.w3.org/2005/Atom'>
<content type='application/xml'>
<RuleDescription
    xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
    <Filter i:type="SqlFilterExpression">
        <SqlExpression>MyProperty='XYZ'</SqlExpression>
    </Filter>
    <Action i:type="SqlFilterAction">
        <SqlExpression>set MyProperty2 = 'ABC'</SqlExpression>
    </Action>
</RuleDescription>
</content>
</entry>
    '''
    xmldoc = minidom.parseString(xmlstr)
    rule = Rule()

    for rule_desc in _get_children_from_path(xmldoc,
                                             'entry',
                                             'content',
                                             'RuleDescription'):
        for xml_filter in _get_child_nodes(rule_desc, 'Filter'):
            filter_type = xml_filter.getAttributeNS(
                XML_SCHEMA_NAMESPACE, 'type')
            rule.filter_type = str(filter_type)
            if xml_filter.childNodes:
                for expr in _get_child_nodes(xml_filter, 'SqlExpression'):
                    # An empty <SqlExpression/> has no text child; skip it,
                    # as is already done for the action expression.
                    if expr.firstChild:
                        rule.filter_expression = expr.firstChild.nodeValue

        for xml_action in _get_child_nodes(rule_desc, 'Action'):
            action_type = xml_action.getAttributeNS(
                XML_SCHEMA_NAMESPACE, 'type')
            rule.action_type = str(action_type)
            if xml_action.childNodes:
                action_expression = xml_action.childNodes[0].firstChild
                if action_expression:
                    rule.action_expression = action_expression.nodeValue

    # extract id, updated and name value from feed entry and set them of rule.
    for name, value in _get_entry_properties(xmlstr, True, '/rules').items():
        setattr(rule, name, value)

    return rule
356
def _convert_response_to_queue(response):
    ''' Converts the body of response to a queue object. '''
    return _convert_xml_to_queue(response.body)
360
def _parse_bool(value):
361
if value.lower() == 'true':
366
def _convert_xml_to_queue(xmlstr):
    ''' Converts xml response to queue object.

    Reads the child elements of entry/content/QueueDescription listed in
    the table below and stores each one that is present on a new Queue;
    the entry properties (id, updated, name) are then copied onto it.

    Raises WindowsAzureError(_ERROR_QUEUE_NOT_FOUND) when none of the
    expected elements is present.
    '''
    # (xml element, Queue attribute, converter); None keeps the raw text.
    fields = (
        ('LockDuration', 'lock_duration', None),
        ('MaxSizeInMegabytes', 'max_size_in_megabytes', int),
        ('RequiresDuplicateDetection', 'requires_duplicate_detection',
         _parse_bool),
        ('RequiresSession', 'requires_session', _parse_bool),
        ('DefaultMessageTimeToLive', 'default_message_time_to_live', None),
        ('DeadLetteringOnMessageExpiration',
         'dead_lettering_on_message_expiration', _parse_bool),
        ('DuplicateDetectionHistoryTimeWindow',
         'duplicate_detection_history_time_window', None),
        ('EnableBatchedOperations', 'enable_batched_operations',
         _parse_bool),
        ('MaxDeliveryCount', 'max_delivery_count', int),
        ('MessageCount', 'message_count', int),
        ('SizeInBytes', 'size_in_bytes', int),
    )

    xmldoc = minidom.parseString(xmlstr)
    queue = Queue()
    invalid_queue = True

    # get node for each attribute in Queue class, if nothing found then the
    # response is not valid xml for Queue.
    for desc in _get_children_from_path(xmldoc,
                                        'entry',
                                        'content',
                                        'QueueDescription'):
        for tag, attr, convert in fields:
            node_value = _get_first_child_node_value(desc, tag)
            if node_value is not None:
                if convert is not None:
                    node_value = convert(node_value)
                setattr(queue, attr, node_value)
                invalid_queue = False

    if invalid_queue:
        raise WindowsAzureError(_ERROR_QUEUE_NOT_FOUND)

    # extract id, updated and name value from feed entry and set them of queue.
    for name, value in _get_entry_properties(xmlstr, True).items():
        setattr(queue, name, value)

    return queue
461
def _convert_response_to_topic(response):
    ''' Converts the body of response to a topic object. '''
    return _convert_xml_to_topic(response.body)
465
def _convert_xml_to_topic(xmlstr):
    '''Converts xml response to topic

    The xml format for topic:
<entry xmlns='http://www.w3.org/2005/Atom'>
<content type='application/xml'>
<TopicDescription
    xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
    <DefaultMessageTimeToLive>P10675199DT2H48M5.4775807S</DefaultMessageTimeToLive>
    <MaxSizeInMegabytes>1024</MaxSizeInMegabytes>
    <RequiresDuplicateDetection>false</RequiresDuplicateDetection>
    <DuplicateDetectionHistoryTimeWindow>P7D</DuplicateDetectionHistoryTimeWindow>
</TopicDescription>
</content>
</entry>

    Raises WindowsAzureError(_ERROR_TOPIC_NOT_FOUND) when none of the
    expected elements is present.
    '''
    # (xml element, Topic attribute, converter); None keeps the raw text.
    fields = (
        ('DefaultMessageTimeToLive', 'default_message_time_to_live', None),
        ('MaxSizeInMegabytes', 'max_size_in_megabytes', int),
        ('RequiresDuplicateDetection', 'requires_duplicate_detection',
         _parse_bool),
        ('DuplicateDetectionHistoryTimeWindow',
         'duplicate_detection_history_time_window', None),
        ('EnableBatchedOperations', 'enable_batched_operations',
         _parse_bool),
        ('SizeInBytes', 'size_in_bytes', int),
    )

    xmldoc = minidom.parseString(xmlstr)
    topic = Topic()
    invalid_topic = True

    # get node for each attribute in Topic class, if nothing found then the
    # response is not valid xml for Topic.
    for desc in _get_children_from_path(xmldoc,
                                        'entry',
                                        'content',
                                        'TopicDescription'):
        for tag, attr, convert in fields:
            node_value = _get_first_child_node_value(desc, tag)
            if node_value is not None:
                if convert is not None:
                    node_value = convert(node_value)
                setattr(topic, attr, node_value)
                invalid_topic = False

    if invalid_topic:
        raise WindowsAzureError(_ERROR_TOPIC_NOT_FOUND)

    # extract id, updated and name value from feed entry and set them of topic.
    for name, value in _get_entry_properties(xmlstr, True).items():
        setattr(topic, name, value)

    return topic
533
def _convert_response_to_subscription(response):
    ''' Converts the body of response to a subscription object. '''
    return _convert_xml_to_subscription(response.body)
537
def _convert_xml_to_subscription(xmlstr):
    '''Converts xml response to subscription

    The xml format for subscription:
<entry xmlns='http://www.w3.org/2005/Atom'>
<content type='application/xml'>
<SubscriptionDescription
    xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
    <LockDuration>PT5M</LockDuration>
    <RequiresSession>false</RequiresSession>
    <DefaultMessageTimeToLive>P10675199DT2H48M5.4775807S</DefaultMessageTimeToLive>
    <DeadLetteringOnMessageExpiration>false</DeadLetteringOnMessageExpiration>
    <DeadLetteringOnFilterEvaluationExceptions>true</DeadLetteringOnFilterEvaluationExceptions>
</SubscriptionDescription>
</content>
</entry>

    Elements that are absent leave the matching attribute at None; no
    error is raised for an empty description.
    '''
    # (xml element, Subscription attribute, converter); None keeps raw text.
    fields = (
        ('LockDuration', 'lock_duration', None),
        ('RequiresSession', 'requires_session', _parse_bool),
        ('DefaultMessageTimeToLive', 'default_message_time_to_live', None),
        ('DeadLetteringOnFilterEvaluationExceptions',
         'dead_lettering_on_filter_evaluation_exceptions', _parse_bool),
        ('DeadLetteringOnMessageExpiration',
         'dead_lettering_on_message_expiration', _parse_bool),
        ('EnableBatchedOperations', 'enable_batched_operations',
         _parse_bool),
        ('MaxDeliveryCount', 'max_delivery_count', int),
        ('MessageCount', 'message_count', int),
    )

    xmldoc = minidom.parseString(xmlstr)
    subscription = Subscription()

    for desc in _get_children_from_path(xmldoc,
                                        'entry',
                                        'content',
                                        'SubscriptionDescription'):
        for tag, attr, convert in fields:
            node_value = _get_first_child_node_value(desc, tag)
            if node_value is not None:
                if convert is not None:
                    node_value = convert(node_value)
                setattr(subscription, attr, node_value)

    # extract id, updated and name value from feed entry and set them of
    # subscription.
    for name, value in _get_entry_properties(xmlstr,
                                             True,
                                             '/subscriptions').items():
        setattr(subscription, name, value)

    return subscription
611
def _convert_subscription_to_xml(subscription):
    '''
    Converts a subscription object to xml to send.  The order of each field of
    subscription in xml is very important so we can't simply call
    convert_class_to_xml.

    subscription: the subscription object to be converted; when it is falsy
        (e.g. None) an empty SubscriptionDescription is produced.

    Returns the description wrapped by _create_entry.
    '''
    # (xml element, attribute, lower-case the text?) in the required order.
    # Booleans are lower-cased so True/False become true/false.
    fields = (
        ('LockDuration', 'lock_duration', False),
        ('RequiresSession', 'requires_session', True),
        ('DefaultMessageTimeToLive', 'default_message_time_to_live', False),
        ('DeadLetteringOnMessageExpiration',
         'dead_lettering_on_message_expiration', True),
        ('DeadLetteringOnFilterEvaluationExceptions',
         'dead_lettering_on_filter_evaluation_exceptions', True),
        ('EnableBatchedOperations', 'enable_batched_operations', True),
        ('MaxDeliveryCount', 'max_delivery_count', False),
        ('MessageCount', 'message_count', False),
    )

    parts = ['<SubscriptionDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">']
    if subscription:
        for tag, attr, lower in fields:
            value = getattr(subscription, attr)
            if value is None:
                continue
            text = str(value)
            if lower:
                text = text.lower()
            parts.extend(['<', tag, '>', text, '</', tag, '>'])
    parts.append('</SubscriptionDescription>')

    return _create_entry(''.join(parts))
674
def _convert_rule_to_xml(rule):
    '''
    Converts a rule object to xml to send.  The order of each field of rule
    in xml is very important so we can't simply call convert_class_to_xml.

    rule: the rule object to be converted.

    Returns the description wrapped by _create_entry.
    '''
    # NOTE(review): the guards, the opening/closing tag literals and the
    # nesting of CompatibilityLevel are restored from the upstream SDK
    # layout - confirm against the original file.
    parts = ['<RuleDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">']
    if rule:
        if rule.filter_type:
            parts.extend(['<Filter i:type="',
                          xml_escape(rule.filter_type),
                          '">'])
            if rule.filter_type == 'CorrelationFilter':
                parts.extend(['<CorrelationId>',
                              xml_escape(rule.filter_expression),
                              '</CorrelationId>'])
            else:
                parts.extend(['<SqlExpression>',
                              xml_escape(rule.filter_expression),
                              '</SqlExpression>'])
                parts.append('<CompatibilityLevel>20</CompatibilityLevel>')
            parts.append('</Filter>')
        if rule.action_type:
            parts.extend(['<Action i:type="',
                          xml_escape(rule.action_type),
                          '">'])
            if rule.action_type == 'SqlRuleAction':
                parts.extend(['<SqlExpression>',
                              xml_escape(rule.action_expression),
                              '</SqlExpression>'])
                parts.append('<CompatibilityLevel>20</CompatibilityLevel>')
            parts.append('</Action>')
    parts.append('</RuleDescription>')

    return _create_entry(''.join(parts))
717
def _convert_topic_to_xml(topic):
    '''
    Converts a topic object to xml to send.  The order of each field of topic
    in xml is very important so we can't simply call convert_class_to_xml.

    topic: the topic object to be converted; when it is falsy (e.g. None)
        an empty TopicDescription is produced.

    Returns the description wrapped by _create_entry.
    '''
    # (xml element, attribute, lower-case the text?) in the required order.
    # Booleans are lower-cased so True/False become true/false.
    fields = (
        ('DefaultMessageTimeToLive', 'default_message_time_to_live', False),
        ('MaxSizeInMegabytes', 'max_size_in_megabytes', False),
        ('RequiresDuplicateDetection', 'requires_duplicate_detection', True),
        ('DuplicateDetectionHistoryTimeWindow',
         'duplicate_detection_history_time_window', False),
        ('EnableBatchedOperations', 'enable_batched_operations', True),
        ('SizeInBytes', 'size_in_bytes', False),
    )

    parts = ['<TopicDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">']
    if topic:
        for tag, attr, lower in fields:
            value = getattr(topic, attr)
            if value is None:
                continue
            text = str(value)
            if lower:
                text = text.lower()
            parts.extend(['<', tag, '>', text, '</', tag, '>'])
    parts.append('</TopicDescription>')

    return _create_entry(''.join(parts))
768
def _convert_queue_to_xml(queue):
    '''
    Converts a queue object to xml to send.  The order of each field of queue
    in xml is very important so we can't simply call convert_class_to_xml.

    queue: the queue object to be converted; when it is falsy (e.g. None)
        an empty QueueDescription is produced.

    Returns the description wrapped by _create_entry.
    '''
    # (xml element, attribute, lower-case the text?) in the required order.
    # Booleans are lower-cased so True/False become true/false.
    fields = (
        ('LockDuration', 'lock_duration', False),
        ('MaxSizeInMegabytes', 'max_size_in_megabytes', False),
        ('RequiresDuplicateDetection', 'requires_duplicate_detection', True),
        ('RequiresSession', 'requires_session', True),
        ('DefaultMessageTimeToLive', 'default_message_time_to_live', False),
        ('DeadLetteringOnMessageExpiration',
         'dead_lettering_on_message_expiration', True),
        ('DuplicateDetectionHistoryTimeWindow',
         'duplicate_detection_history_time_window', False),
        ('MaxDeliveryCount', 'max_delivery_count', False),
        ('EnableBatchedOperations', 'enable_batched_operations', True),
        ('SizeInBytes', 'size_in_bytes', False),
        ('MessageCount', 'message_count', False),
    )

    parts = ['<QueueDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">']
    if queue:
        for tag, attr, lower in fields:
            value = getattr(queue, attr)
            # lock_duration is tested for truthiness (an empty string is
            # skipped too); every other field is only skipped when None.
            if value is None or (attr == 'lock_duration' and not value):
                continue
            text = str(value)
            if lower:
                text = text.lower()
            parts.extend(['<', tag, '>', text, '</', tag, '>'])
    parts.append('</QueueDescription>')

    return _create_entry(''.join(parts))
847
def _service_bus_error_handler(http_error):
    ''' Simple error handler for service bus service; delegates to
    _general_error_handler and returns its result. '''
    return _general_error_handler(http_error)
851
from azure.servicebus.servicebusservice import ServiceBusService