1
#-------------------------------------------------------------------------
2
# Copyright (c) Microsoft. All rights reserved.
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
#--------------------------------------------------------------------------
17
from datetime import datetime
18
from xml.dom import minidom
19
from azure import (WindowsAzureData,
23
_general_error_handler,
24
_get_entry_properties,
26
_get_children_from_path,
27
_get_first_child_node_value,
28
_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_DELETE,
29
_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_UNLOCK,
30
_ERROR_QUEUE_NOT_FOUND,
31
_ERROR_TOPIC_NOT_FOUND,
33
from azure.http import HTTPError
35
#default rule name for subscription
36
DEFAULT_RULE_NAME='$Default'
38
#-----------------------------------------------------------------------------
39
# Constants for Azure app environment settings.
40
AZURE_SERVICEBUS_NAMESPACE = 'AZURE_SERVICEBUS_NAMESPACE'
41
AZURE_SERVICEBUS_ACCESS_KEY = 'AZURE_SERVICEBUS_ACCESS_KEY'
42
AZURE_SERVICEBUS_ISSUER = 'AZURE_SERVICEBUS_ISSUER'
44
# namespace used for converting rules to objects
45
XML_SCHEMA_NAMESPACE = 'http://www.w3.org/2001/XMLSchema-instance'
47
class Queue(WindowsAzureData):
48
''' Queue class corresponding to Queue Description: http://msdn.microsoft.com/en-us/library/windowsazure/hh780773'''
52
max_size_in_megabytes=None,
53
requires_duplicate_detection=None,
54
requires_session=None,
55
default_message_time_to_live=None,
56
dead_lettering_on_message_expiration=None,
57
duplicate_detection_history_time_window=None,
58
max_delivery_count=None,
59
enable_batched_operations=None,
63
self.lock_duration = lock_duration
64
self.max_size_in_megabytes = max_size_in_megabytes
65
self.requires_duplicate_detection = requires_duplicate_detection
66
self.requires_session = requires_session
67
self.default_message_time_to_live = default_message_time_to_live
68
self.dead_lettering_on_message_expiration = dead_lettering_on_message_expiration
69
self.duplicate_detection_history_time_window = duplicate_detection_history_time_window
70
self.max_delivery_count = max_delivery_count
71
self.enable_batched_operations = enable_batched_operations
72
self.size_in_bytes = size_in_bytes
73
self.message_count = message_count
75
class Topic(WindowsAzureData):
76
''' Topic class corresponding to Topic Description: http://msdn.microsoft.com/en-us/library/windowsazure/hh780749. '''
79
default_message_time_to_live=None,
80
max_size_in_megabytes=None,
81
requires_duplicate_detection=None,
82
duplicate_detection_history_time_window=None,
83
enable_batched_operations=None,
86
self.default_message_time_to_live = default_message_time_to_live
87
self.max_size_in_megabytes = max_size_in_megabytes
88
self.requires_duplicate_detection = requires_duplicate_detection
89
self.duplicate_detection_history_time_window = duplicate_detection_history_time_window
90
self.enable_batched_operations = enable_batched_operations
91
self.size_in_bytes = size_in_bytes
94
def max_size_in_mega_bytes(self):
96
warnings.warn('This attribute has been changed to max_size_in_megabytes.')
97
return self.max_size_in_megabytes
99
@max_size_in_mega_bytes.setter
100
def max_size_in_mega_bytes(self, value):
101
self.max_size_in_megabytes = value
104
class Subscription(WindowsAzureData):
105
''' Subscription class corresponding to Subscription Description: http://msdn.microsoft.com/en-us/library/windowsazure/hh780763. '''
109
requires_session=None,
110
default_message_time_to_live=None,
111
dead_lettering_on_message_expiration=None,
112
dead_lettering_on_filter_evaluation_exceptions=None,
113
enable_batched_operations=None,
114
max_delivery_count=None,
117
self.lock_duration = lock_duration
118
self.requires_session = requires_session
119
self.default_message_time_to_live = default_message_time_to_live
120
self.dead_lettering_on_message_expiration = dead_lettering_on_message_expiration
121
self.dead_lettering_on_filter_evaluation_exceptions = dead_lettering_on_filter_evaluation_exceptions
122
self.enable_batched_operations = enable_batched_operations
123
self.max_delivery_count = max_delivery_count
124
self.message_count = message_count
126
class Rule(WindowsAzureData):
    ''' Rule class corresponding to Rule Description: http://msdn.microsoft.com/en-us/library/windowsazure/hh780753. '''

    def __init__(self, filter_type=None, filter_expression=None, action_type=None, action_expression=None):
        '''
        Stores the filter and action settings of a rule.

        filter_type: type name of the filter; _convert_rule_to_xml writes it
            to the i:type attribute of the Filter element.
        filter_expression: expression text belonging to the filter.
        action_type: type name of the action; _convert_rule_to_xml writes it
            to the i:type attribute of the Action element.
        action_expression: expression text belonging to the action.
        '''
        self.filter_type = filter_type
        self.filter_expression = filter_expression
        self.action_type = action_type
        # Previously this stored action_type, discarding the expression
        # supplied by the caller.
        self.action_expression = action_expression
135
class Message(WindowsAzureData):
136
''' Message class that is used in the send message/get message APIs. '''
138
def __init__(self, body=None, service_bus_service=None, location=None, custom_properties=None,
139
type='application/atom+xml;type=entry;charset=utf-8', broker_properties=None):
141
self.location = location
142
self.broker_properties = broker_properties
143
self.custom_properties = custom_properties
145
self.service_bus_service = service_bus_service
146
self._topic_name = None
147
self._subscription_name = None
148
self._queue_name = None
150
if not service_bus_service:
153
# if location is set, then extracts the queue name for queue message and
154
# extracts the topic and subscriptions name if it is topic message.
156
if '/subscriptions/' in location:
157
pos = location.find('/subscriptions/')
158
pos1 = location.rfind('/', 0, pos-1)
159
self._topic_name = location[pos1+1:pos]
160
pos += len('/subscriptions/')
161
pos1 = location.find('/', pos)
162
self._subscription_name = location[pos:pos1]
163
elif '/messages/' in location:
164
pos = location.find('/messages/')
165
pos1 = location.rfind('/', 0, pos-1)
166
self._queue_name = location[pos1+1:pos]
169
''' Deletes itself if find queue name or topic name and subscription name. '''
171
self.service_bus_service.delete_queue_message(self._queue_name, self.broker_properties['SequenceNumber'], self.broker_properties['LockToken'])
172
elif self._topic_name and self._subscription_name:
173
self.service_bus_service.delete_subscription_message(self._topic_name, self._subscription_name, self.broker_properties['SequenceNumber'], self.broker_properties['LockToken'])
175
raise WindowsAzureError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_DELETE)
178
''' Unlocks itself if find queue name or topic name and subscription name. '''
180
self.service_bus_service.unlock_queue_message(self._queue_name, self.broker_properties['SequenceNumber'], self.broker_properties['LockToken'])
181
elif self._topic_name and self._subscription_name:
182
self.service_bus_service.unlock_subscription_message(self._topic_name, self._subscription_name, self.broker_properties['SequenceNumber'], self.broker_properties['LockToken'])
184
raise WindowsAzureError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_UNLOCK)
186
def add_headers(self, request):
187
''' Adds additional headers to the request for a message request.'''
189
# Adds custom properties
190
if self.custom_properties:
191
for name, value in self.custom_properties.iteritems():
192
if isinstance(value, unicode):
193
request.headers.append((name, '"' + value.encode('utf-8') + '"'))
194
elif isinstance(value, str):
195
request.headers.append((name, '"' + str(value) + '"'))
196
elif isinstance(value, datetime):
197
request.headers.append((name, '"' + value.strftime('%a, %d %b %Y %H:%M:%S GMT') + '"'))
199
request.headers.append((name, str(value).lower()))
202
request.headers.append(('Content-Type', self.type))
204
# Adds BrokerProperties
205
if self.broker_properties:
206
request.headers.append(('BrokerProperties', str(self.broker_properties)))
208
return request.headers
210
def _create_message(response, service_instance):
211
''' Create message from response.
213
response: response from service bus cloud server.
214
service_instance: the service bus client.
216
respbody = response.body
217
custom_properties = {}
218
broker_properties = None
220
message_location = None
222
#gets all information from respheaders.
223
for name, value in response.headers:
224
if name.lower() == 'brokerproperties':
225
broker_properties = ast.literal_eval(value)
226
elif name.lower() == 'content-type':
228
elif name.lower() == 'location':
229
message_location = value
230
elif name.lower() not in ['content-type', 'brokerproperties', 'transfer-encoding', 'server', 'location', 'date']:
234
custom_properties[name] = datetime.strptime(value, '%a, %d %b %Y %H:%M:%S GMT')
236
custom_properties[name] = value
237
else: #only int, float or boolean
238
if value.lower() == 'true':
239
custom_properties[name] = True
240
elif value.lower() == 'false':
241
custom_properties[name] = False
242
elif str(int(float(value))) == value: #int('3.1') doesn't work so need to get float('3.14') first
243
custom_properties[name] = int(value)
245
custom_properties[name] = float(value)
247
if message_type == None:
248
message = Message(respbody, service_instance, message_location, custom_properties, 'application/atom+xml;type=entry;charset=utf-8', broker_properties)
250
message = Message(respbody, service_instance, message_location, custom_properties, message_type, broker_properties)
254
def _convert_response_to_rule(response):
    ''' Builds a rule from a service response by passing response.body to _convert_xml_to_rule. '''
    xml_body = response.body
    return _convert_xml_to_rule(xml_body)
257
def _convert_xml_to_rule(xmlstr):
258
''' Converts response xml to rule object.
260
The format of xml for rule:
261
<entry xmlns='http://www.w3.org/2005/Atom'>
262
<content type='application/xml'>
263
<RuleDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
264
<Filter i:type="SqlFilterExpression">
265
<SqlExpression>MyProperty='XYZ'</SqlExpression>
267
<Action i:type="SqlFilterAction">
268
<SqlExpression>set MyProperty2 = 'ABC'</SqlExpression>
274
xmldoc = minidom.parseString(xmlstr)
277
for rule_desc in _get_children_from_path(xmldoc, 'entry', 'content', 'RuleDescription'):
278
for xml_filter in _get_child_nodes(rule_desc, 'Filter'):
279
filter_type = xml_filter.getAttributeNS(XML_SCHEMA_NAMESPACE, 'type')
280
setattr(rule, 'filter_type', str(filter_type))
281
if xml_filter.childNodes:
283
for expr in _get_child_nodes(xml_filter, 'SqlExpression'):
284
setattr(rule, 'filter_expression', expr.firstChild.nodeValue)
286
for xml_action in _get_child_nodes(rule_desc, 'Action'):
287
action_type = xml_action.getAttributeNS(XML_SCHEMA_NAMESPACE, 'type')
288
setattr(rule, 'action_type', str(action_type))
289
if xml_action.childNodes:
290
action_expression = xml_action.childNodes[0].firstChild
291
if action_expression:
292
setattr(rule, 'action_expression', action_expression.nodeValue)
294
#extract id, updated and name value from feed entry and set them of rule.
295
for name, value in _get_entry_properties(xmlstr, True, '/rules').iteritems():
296
setattr(rule, name, value)
300
def _convert_response_to_queue(response):
    ''' Builds a queue from a service response by passing response.body to _convert_xml_to_queue. '''
    xml_body = response.body
    return _convert_xml_to_queue(xml_body)
303
def _parse_bool(value):
304
if value.lower() == 'true':
308
def _convert_xml_to_queue(xmlstr):
309
''' Converts xml response to queue object.
311
The format of xml response for queue:
312
<QueueDescription xmlns=\"http://schemas.microsoft.com/netservices/2010/10/servicebus/connect\">
313
<MaxSizeInBytes>10000</MaxSizeInBytes>
314
<DefaultMessageTimeToLive>PT5M</DefaultMessageTimeToLive>
315
<LockDuration>PT2M</LockDuration>
316
<RequiresGroupedReceives>False</RequiresGroupedReceives>
317
<SupportsDuplicateDetection>False</SupportsDuplicateDetection>
322
xmldoc = minidom.parseString(xmlstr)
326
#get node for each attribute in Queue class, if nothing found then the response is not valid xml for Queue.
327
for desc in _get_children_from_path(xmldoc, 'entry', 'content', 'QueueDescription'):
328
node_value = _get_first_child_node_value(desc, 'LockDuration')
329
if node_value is not None:
330
queue.lock_duration = node_value
331
invalid_queue = False
332
node_value = _get_first_child_node_value(desc, 'MaxSizeInMegabytes')
333
if node_value is not None:
334
queue.max_size_in_megabytes = int(node_value)
335
invalid_queue = False
336
node_value = _get_first_child_node_value(desc, 'RequiresDuplicateDetection')
337
if node_value is not None:
338
queue.requires_duplicate_detection = _parse_bool(node_value)
339
invalid_queue = False
340
node_value = _get_first_child_node_value(desc, 'RequiresSession')
341
if node_value is not None:
342
queue.requires_session = _parse_bool(node_value)
343
invalid_queue = False
344
node_value = _get_first_child_node_value(desc, 'DefaultMessageTimeToLive')
345
if node_value is not None:
346
queue.default_message_time_to_live = node_value
347
invalid_queue = False
348
node_value = _get_first_child_node_value(desc, 'DeadLetteringOnMessageExpiration')
349
if node_value is not None:
350
queue.dead_lettering_on_message_expiration = _parse_bool(node_value)
351
invalid_queue = False
352
node_value = _get_first_child_node_value(desc, 'DuplicateDetectionHistoryTimeWindow')
353
if node_value is not None:
354
queue.duplicate_detection_history_time_window = node_value
355
invalid_queue = False
356
node_value = _get_first_child_node_value(desc, 'EnableBatchedOperations')
357
if node_value is not None:
358
queue.enable_batched_operations = _parse_bool(node_value)
359
invalid_queue = False
360
node_value = _get_first_child_node_value(desc, 'MaxDeliveryCount')
361
if node_value is not None:
362
queue.max_delivery_count = int(node_value)
363
invalid_queue = False
364
node_value = _get_first_child_node_value(desc, 'MessageCount')
365
if node_value is not None:
366
queue.message_count = int(node_value)
367
invalid_queue = False
368
node_value = _get_first_child_node_value(desc, 'SizeInBytes')
369
if node_value is not None:
370
queue.size_in_bytes = int(node_value)
371
invalid_queue = False
374
raise WindowsAzureError(_ERROR_QUEUE_NOT_FOUND)
376
#extract id, updated and name value from feed entry and set them of queue.
377
for name, value in _get_entry_properties(xmlstr, True).iteritems():
378
setattr(queue, name, value)
382
def _convert_response_to_topic(response):
    ''' Builds a topic from a service response by passing response.body to _convert_xml_to_topic. '''
    xml_body = response.body
    return _convert_xml_to_topic(xml_body)
385
def _convert_xml_to_topic(xmlstr):
386
'''Converts xml response to topic
388
The xml format for topic:
389
<entry xmlns='http://www.w3.org/2005/Atom'>
390
<content type='application/xml'>
391
<TopicDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
392
<DefaultMessageTimeToLive>P10675199DT2H48M5.4775807S</DefaultMessageTimeToLive>
393
<MaxSizeInMegabytes>1024</MaxSizeInMegabytes>
394
<RequiresDuplicateDetection>false</RequiresDuplicateDetection>
395
<DuplicateDetectionHistoryTimeWindow>P7D</DuplicateDetectionHistoryTimeWindow>
396
<DeadLetteringOnFilterEvaluationExceptions>true</DeadLetteringOnFilterEvaluationExceptions>
401
xmldoc = minidom.parseString(xmlstr)
406
#get node for each attribute in Topic class, if nothing found then the response is not valid xml for Topic.
407
for desc in _get_children_from_path(xmldoc, 'entry', 'content', 'TopicDescription'):
409
node_value = _get_first_child_node_value(desc, 'DefaultMessageTimeToLive')
410
if node_value is not None:
411
topic.default_message_time_to_live = node_value
412
invalid_topic = False
413
node_value = _get_first_child_node_value(desc, 'MaxSizeInMegabytes')
414
if node_value is not None:
415
topic.max_size_in_megabytes = int(node_value)
416
invalid_topic = False
417
node_value = _get_first_child_node_value(desc, 'RequiresDuplicateDetection')
418
if node_value is not None:
419
topic.requires_duplicate_detection = _parse_bool(node_value)
420
invalid_topic = False
421
node_value = _get_first_child_node_value(desc, 'DuplicateDetectionHistoryTimeWindow')
422
if node_value is not None:
423
topic.duplicate_detection_history_time_window = node_value
424
invalid_topic = False
425
node_value = _get_first_child_node_value(desc, 'EnableBatchedOperations')
426
if node_value is not None:
427
topic.enable_batched_operations = _parse_bool(node_value)
428
invalid_topic = False
429
node_value = _get_first_child_node_value(desc, 'SizeInBytes')
430
if node_value is not None:
431
topic.size_in_bytes = int(node_value)
432
invalid_topic = False
435
raise WindowsAzureError(_ERROR_TOPIC_NOT_FOUND)
437
#extract id, updated and name value from feed entry and set them of topic.
438
for name, value in _get_entry_properties(xmlstr, True).iteritems():
439
setattr(topic, name, value)
442
def _convert_response_to_subscription(response):
    ''' Builds a subscription from a service response by passing response.body to _convert_xml_to_subscription. '''
    xml_body = response.body
    return _convert_xml_to_subscription(xml_body)
445
def _convert_xml_to_subscription(xmlstr):
446
'''Converts xml response to subscription
448
The xml format for subscription:
449
<entry xmlns='http://www.w3.org/2005/Atom'>
450
<content type='application/xml'>
451
<SubscriptionDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
452
<LockDuration>PT5M</LockDuration>
453
<RequiresSession>false</RequiresSession>
454
<DefaultMessageTimeToLive>P10675199DT2H48M5.4775807S</DefaultMessageTimeToLive>
455
<DeadLetteringOnMessageExpiration>false</DeadLetteringOnMessageExpiration> <DeadLetteringOnFilterEvaluationExceptions>true</DeadLetteringOnFilterEvaluationExceptions>
456
</SubscriptionDescription>
460
xmldoc = minidom.parseString(xmlstr)
461
subscription = Subscription()
463
for desc in _get_children_from_path(xmldoc, 'entry', 'content', 'SubscriptionDescription'):
464
node_value = _get_first_child_node_value(desc, 'LockDuration')
465
if node_value is not None:
466
subscription.lock_duration = node_value
467
node_value = _get_first_child_node_value(desc, 'RequiresSession')
468
if node_value is not None:
469
subscription.requires_session = _parse_bool(node_value)
470
node_value = _get_first_child_node_value(desc, 'DefaultMessageTimeToLive')
471
if node_value is not None:
472
subscription.default_message_time_to_live = node_value
473
node_value = _get_first_child_node_value(desc, 'DeadLetteringOnFilterEvaluationExceptions')
474
if node_value is not None:
475
subscription.dead_lettering_on_filter_evaluation_exceptions = _parse_bool(node_value)
476
node_value = _get_first_child_node_value(desc, 'DeadLetteringOnMessageExpiration')
477
if node_value is not None:
478
subscription.dead_lettering_on_message_expiration = _parse_bool(node_value)
479
node_value = _get_first_child_node_value(desc, 'EnableBatchedOperations')
480
if node_value is not None:
481
subscription.enable_batched_operations = _parse_bool(node_value)
482
node_value = _get_first_child_node_value(desc, 'MaxDeliveryCount')
483
if node_value is not None:
484
subscription.max_delivery_count = int(node_value)
485
node_value = _get_first_child_node_value(desc, 'MessageCount')
486
if node_value is not None:
487
subscription.message_count = int(node_value)
489
for name, value in _get_entry_properties(xmlstr, True, '/subscriptions').iteritems():
490
setattr(subscription, name, value)
494
def _convert_subscription_to_xml(subscription):
    '''
    Converts a subscription object to xml to send. The order of the fields
    in the xml is very important, so we cannot simply call
    convert_class_to_xml.

    subscription: the subscription object to be converted.

    Returns the result of _create_entry applied to the SubscriptionDescription xml.
    '''
    # (element name, value, whether the text is lower-cased), in the order
    # the elements must appear. Lower-casing turns True/False into true/false.
    ordered_fields = (
        ('LockDuration', subscription.lock_duration, False),
        ('RequiresSession', subscription.requires_session, True),
        ('DefaultMessageTimeToLive', subscription.default_message_time_to_live, False),
        ('DeadLetteringOnMessageExpiration', subscription.dead_lettering_on_message_expiration, True),
        ('DeadLetteringOnFilterEvaluationExceptions', subscription.dead_lettering_on_filter_evaluation_exceptions, True),
        ('EnableBatchedOperations', subscription.enable_batched_operations, True),
        ('MaxDeliveryCount', subscription.max_delivery_count, False),
        ('MessageCount', subscription.message_count, False),
    )

    pieces = ['<SubscriptionDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">']
    for tag, value, lowercase in ordered_fields:
        # Fields left at None are omitted from the xml entirely.
        if value is None:
            continue
        text = str(value)
        if lowercase:
            text = text.lower()
        pieces.append(''.join(['<', tag, '>', text, '</', tag, '>']))
    pieces.append('</SubscriptionDescription>')

    return _create_entry(''.join(pieces))
524
def _convert_rule_to_xml(rule):
526
Converts a rule object to xml to send. The order of each field of rule
527
in xml is very important so we cannot simply call convert_class_to_xml.
529
rule: the rule object to be converted.
531
rule_body = '<RuleDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">'
534
rule_body += ''.join(['<Filter i:type="', xml_escape(rule.filter_type), '">'])
535
if rule.filter_type == 'CorrelationFilter':
536
rule_body += ''.join(['<CorrelationId>', xml_escape(rule.filter_expression), '</CorrelationId>'])
538
rule_body += ''.join(['<SqlExpression>', xml_escape(rule.filter_expression), '</SqlExpression>'])
539
rule_body += '<CompatibilityLevel>20</CompatibilityLevel>'
540
rule_body += '</Filter>'
542
rule_body += ''.join(['<Action i:type="', xml_escape(rule.action_type), '">'])
543
if rule.action_type == 'SqlRuleAction':
544
rule_body += ''.join(['<SqlExpression>', xml_escape(rule.action_expression), '</SqlExpression>'])
545
rule_body += '<CompatibilityLevel>20</CompatibilityLevel>'
546
rule_body += '</Action>'
547
rule_body += '</RuleDescription>'
549
return _create_entry(rule_body)
551
def _convert_topic_to_xml(topic):
    '''
    Converts a topic object to xml to send. The order of the fields in the
    xml is very important, so we cannot simply call convert_class_to_xml.

    topic: the topic object to be converted.

    Returns the result of _create_entry applied to the TopicDescription xml.
    '''
    # (element name, value, whether the text is lower-cased), in the order
    # the elements must appear. Lower-casing turns True/False into true/false.
    ordered_fields = (
        ('DefaultMessageTimeToLive', topic.default_message_time_to_live, False),
        ('MaxSizeInMegabytes', topic.max_size_in_megabytes, False),
        ('RequiresDuplicateDetection', topic.requires_duplicate_detection, True),
        ('DuplicateDetectionHistoryTimeWindow', topic.duplicate_detection_history_time_window, False),
        ('EnableBatchedOperations', topic.enable_batched_operations, True),
        ('SizeInBytes', topic.size_in_bytes, False),
    )

    pieces = ['<TopicDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">']
    for tag, value, lowercase in ordered_fields:
        # Fields left at None are omitted from the xml entirely.
        if value is None:
            continue
        text = str(value)
        if lowercase:
            text = text.lower()
        pieces.append(''.join(['<', tag, '>', text, '</', tag, '>']))
    pieces.append('</TopicDescription>')

    return _create_entry(''.join(pieces))
577
def _convert_queue_to_xml(queue):
    '''
    Converts a queue object to xml to send. The order of the fields in the
    xml is very important, so we cannot simply call convert_class_to_xml.

    queue: the queue object to be converted.

    Returns the result of _create_entry applied to the QueueDescription xml.
    '''
    pieces = ['<QueueDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">']

    # LockDuration is tested for truthiness, unlike the remaining fields
    # which are only skipped when they are None.
    if queue.lock_duration:
        pieces.append(''.join(['<LockDuration>', str(queue.lock_duration), '</LockDuration>']))

    # (element name, value, whether the text is lower-cased), in the order
    # the elements must appear. Lower-casing turns True/False into true/false.
    ordered_fields = (
        ('MaxSizeInMegabytes', queue.max_size_in_megabytes, False),
        ('RequiresDuplicateDetection', queue.requires_duplicate_detection, True),
        ('RequiresSession', queue.requires_session, True),
        ('DefaultMessageTimeToLive', queue.default_message_time_to_live, False),
        ('DeadLetteringOnMessageExpiration', queue.dead_lettering_on_message_expiration, True),
        ('DuplicateDetectionHistoryTimeWindow', queue.duplicate_detection_history_time_window, False),
        ('MaxDeliveryCount', queue.max_delivery_count, False),
        ('EnableBatchedOperations', queue.enable_batched_operations, True),
        ('SizeInBytes', queue.size_in_bytes, False),
        ('MessageCount', queue.message_count, False),
    )
    for tag, value, lowercase in ordered_fields:
        if value is None:
            continue
        text = str(value)
        if lowercase:
            text = text.lower()
        pieces.append(''.join(['<', tag, '>', text, '</', tag, '>']))

    pieces.append('</QueueDescription>')
    return _create_entry(''.join(pieces))
612
def _service_bus_error_handler(http_error):
    ''' Error handler for the service bus service; currently hands the error straight to _general_error_handler and returns its result. More specific cases are to be added. '''
    handled = _general_error_handler(http_error)
    return handled
616
from azure.servicebus.servicebusservice import ServiceBusService