3
#-------------------------------------------------------------------------
4
# Copyright (c) Microsoft. All rights reserved.
6
# Licensed under the Apache License, Version 2.0 (the "License");
7
# you may not use this file except in compliance with the License.
8
# You may obtain a copy of the License at
9
# http://www.apache.org/licenses/LICENSE-2.0
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16
#--------------------------------------------------------------------------
24
from datetime import datetime
25
from azure import WindowsAzureError
26
from azure.http import HTTPError
27
from azure.servicebus import (
28
AZURE_SERVICEBUS_NAMESPACE,
29
AZURE_SERVICEBUS_ACCESS_KEY,
30
AZURE_SERVICEBUS_ISSUER,
45
#------------------------------------------------------------------------------
48
class ServiceBusTest(AzureTestCase):
51
self.sbs = ServiceBusService(credentials.getServiceBusNamespace(),
52
credentials.getServiceBusKey(),
54
set_service_options(self.sbs)
56
self.queue_name = getUniqueName('utqueue')
57
self.topic_name = getUniqueName('uttopic')
59
self.additional_queue_names = []
60
self.additional_topic_names = []
64
return super(ServiceBusTest, self).tearDown()
68
self.sbs.delete_queue(self.queue_name)
72
for name in self.additional_queue_names:
74
self.sbs.delete_queue(name)
79
self.sbs.delete_topic(self.topic_name)
83
for name in self.additional_topic_names:
85
self.sbs.delete_topic(name)
89
#--Helpers-----------------------------------------------------------------
90
def _create_queue(self, queue_name):
91
self.sbs.create_queue(queue_name, None, True)
93
def _create_queue_and_send_msg(self, queue_name, msg):
94
self._create_queue(queue_name)
95
self.sbs.send_queue_message(queue_name, msg)
97
def _create_topic(self, topic_name):
98
self.sbs.create_topic(topic_name, None, True)
100
def _create_topic_and_subscription(self, topic_name, subscription_name):
101
self._create_topic(topic_name)
102
self._create_subscription(topic_name, subscription_name)
104
def _create_subscription(self, topic_name, subscription_name):
105
self.sbs.create_subscription(topic_name, subscription_name, None, True)
107
#--Test cases for service bus service -------------------------------------
108
def test_create_service_bus_missing_arguments(self):
110
if AZURE_SERVICEBUS_NAMESPACE in os.environ:
111
del os.environ[AZURE_SERVICEBUS_NAMESPACE]
112
if AZURE_SERVICEBUS_ACCESS_KEY in os.environ:
113
del os.environ[AZURE_SERVICEBUS_ACCESS_KEY]
114
if AZURE_SERVICEBUS_ISSUER in os.environ:
115
del os.environ[AZURE_SERVICEBUS_ISSUER]
118
with self.assertRaises(WindowsAzureError):
119
sbs = ServiceBusService()
123
def test_create_service_bus_env_variables(self):
126
AZURE_SERVICEBUS_NAMESPACE] = credentials.getServiceBusNamespace()
128
AZURE_SERVICEBUS_ACCESS_KEY] = credentials.getServiceBusKey()
129
os.environ[AZURE_SERVICEBUS_ISSUER] = 'owner'
132
sbs = ServiceBusService()
134
if AZURE_SERVICEBUS_NAMESPACE in os.environ:
135
del os.environ[AZURE_SERVICEBUS_NAMESPACE]
136
if AZURE_SERVICEBUS_ACCESS_KEY in os.environ:
137
del os.environ[AZURE_SERVICEBUS_ACCESS_KEY]
138
if AZURE_SERVICEBUS_ISSUER in os.environ:
139
del os.environ[AZURE_SERVICEBUS_ISSUER]
142
self.assertIsNotNone(sbs)
143
self.assertEqual(sbs.service_namespace,
144
credentials.getServiceBusNamespace())
145
self.assertEqual(sbs.account_key, credentials.getServiceBusKey())
146
self.assertEqual(sbs.issuer, 'owner')
148
#--Test cases for queues --------------------------------------------------
149
def test_create_queue_no_options(self):
153
created = self.sbs.create_queue(self.queue_name)
156
self.assertTrue(created)
158
def test_create_queue_no_options_fail_on_exist(self):
162
created = self.sbs.create_queue(self.queue_name, None, True)
165
self.assertTrue(created)
167
def test_create_queue_with_options(self):
171
queue_options = Queue()
172
queue_options.default_message_time_to_live = 'PT1M'
173
queue_options.duplicate_detection_history_time_window = 'PT5M'
174
queue_options.enable_batched_operations = False
175
queue_options.dead_lettering_on_message_expiration = False
176
queue_options.lock_duration = 'PT1M'
177
queue_options.max_delivery_count = 15
178
queue_options.max_size_in_megabytes = 5120
179
queue_options.message_count = 0
180
queue_options.requires_duplicate_detection = False
181
queue_options.requires_session = False
182
queue_options.size_in_bytes = 0
183
created = self.sbs.create_queue(self.queue_name, queue_options)
186
self.assertTrue(created)
187
queue = self.sbs.get_queue(self.queue_name)
188
self.assertEqual('PT1M', queue.default_message_time_to_live)
189
self.assertEqual('PT5M', queue.duplicate_detection_history_time_window)
190
self.assertEqual(False, queue.enable_batched_operations)
191
self.assertEqual(False, queue.dead_lettering_on_message_expiration)
192
self.assertEqual('PT1M', queue.lock_duration)
193
self.assertEqual(15, queue.max_delivery_count)
194
self.assertEqual(5120, queue.max_size_in_megabytes)
195
self.assertEqual(0, queue.message_count)
196
self.assertEqual(False, queue.requires_duplicate_detection)
197
self.assertEqual(False, queue.requires_session)
198
self.assertEqual(0, queue.size_in_bytes)
200
def test_create_queue_with_already_existing_queue(self):
204
created1 = self.sbs.create_queue(self.queue_name)
205
created2 = self.sbs.create_queue(self.queue_name)
208
self.assertTrue(created1)
209
self.assertFalse(created2)
211
def test_create_queue_with_already_existing_queue_fail_on_exist(self):
215
created = self.sbs.create_queue(self.queue_name)
216
with self.assertRaises(WindowsAzureError):
217
self.sbs.create_queue(self.queue_name, None, True)
220
self.assertTrue(created)
222
def test_get_queue_with_existing_queue(self):
224
self._create_queue(self.queue_name)
227
queue = self.sbs.get_queue(self.queue_name)
230
self.assertIsNotNone(queue)
231
self.assertEqual(queue.name, self.queue_name)
233
def test_get_queue_with_non_existing_queue(self):
237
with self.assertRaises(WindowsAzureError):
238
resp = self.sbs.get_queue(self.queue_name)
242
def test_list_queues(self):
244
self._create_queue(self.queue_name)
247
queues = self.sbs.list_queues()
252
self.assertIsNotNone(queues)
253
self.assertNamedItemInContainer(queues, self.queue_name)
255
def test_list_queues_with_special_chars(self):
257
# Name must start and end with an alphanumeric and can only contain
258
# letters, numbers, periods, hyphens, forward slashes and underscores.
259
other_queue_name = self.queue_name + 'txt/.-_123'
260
self.additional_queue_names = [other_queue_name]
261
self._create_queue(other_queue_name)
264
queues = self.sbs.list_queues()
267
self.assertIsNotNone(queues)
268
self.assertNamedItemInContainer(queues, other_queue_name)
270
def test_delete_queue_with_existing_queue(self):
272
self._create_queue(self.queue_name)
275
deleted = self.sbs.delete_queue(self.queue_name)
278
self.assertTrue(deleted)
279
queues = self.sbs.list_queues()
280
self.assertNamedItemNotInContainer(queues, self.queue_name)
282
def test_delete_queue_with_existing_queue_fail_not_exist(self):
284
self._create_queue(self.queue_name)
287
deleted = self.sbs.delete_queue(self.queue_name, True)
290
self.assertTrue(deleted)
291
queues = self.sbs.list_queues()
292
self.assertNamedItemNotInContainer(queues, self.queue_name)
294
def test_delete_queue_with_non_existing_queue(self):
298
deleted = self.sbs.delete_queue(self.queue_name)
301
self.assertFalse(deleted)
303
def test_delete_queue_with_non_existing_queue_fail_not_exist(self):
307
with self.assertRaises(WindowsAzureError):
308
self.sbs.delete_queue(self.queue_name, True)
312
def test_send_queue_message(self):
314
self._create_queue(self.queue_name)
315
sent_msg = Message(b'send message')
318
self.sbs.send_queue_message(self.queue_name, sent_msg)
322
def test_receive_queue_message_read_delete_mode(self):
324
sent_msg = Message(b'receive message')
325
self._create_queue_and_send_msg(self.queue_name, sent_msg)
328
received_msg = self.sbs.receive_queue_message(self.queue_name, False)
331
self.assertIsNotNone(received_msg)
332
self.assertEqual(sent_msg.body, received_msg.body)
334
def test_receive_queue_message_read_delete_mode_throws_on_delete(self):
336
sent_msg = Message(b'receive message')
337
self._create_queue_and_send_msg(self.queue_name, sent_msg)
340
received_msg = self.sbs.receive_queue_message(self.queue_name, False)
341
with self.assertRaises(WindowsAzureError):
342
received_msg.delete()
346
def test_receive_queue_message_read_delete_mode_throws_on_unlock(self):
348
sent_msg = Message(b'receive message')
349
self._create_queue_and_send_msg(self.queue_name, sent_msg)
352
received_msg = self.sbs.receive_queue_message(self.queue_name, False)
353
with self.assertRaises(WindowsAzureError):
354
received_msg.unlock()
358
def test_receive_queue_message_peek_lock_mode(self):
360
sent_msg = Message(b'peek lock message')
361
self._create_queue_and_send_msg(self.queue_name, sent_msg)
364
received_msg = self.sbs.receive_queue_message(self.queue_name, True)
367
self.assertIsNotNone(received_msg)
368
self.assertEqual(sent_msg.body, received_msg.body)
370
def test_receive_queue_message_delete(self):
372
sent_msg = Message(b'peek lock message delete')
373
self._create_queue_and_send_msg(self.queue_name, sent_msg)
376
received_msg = self.sbs.receive_queue_message(self.queue_name, True)
377
received_msg.delete()
380
self.assertIsNotNone(received_msg)
381
self.assertEqual(sent_msg.body, received_msg.body)
383
def test_receive_queue_message_unlock(self):
385
sent_msg = Message(b'peek lock message unlock')
386
self._create_queue_and_send_msg(self.queue_name, sent_msg)
389
received_msg = self.sbs.receive_queue_message(self.queue_name, True)
390
received_msg.unlock()
393
received_again_msg = self.sbs.receive_queue_message(
394
self.queue_name, True)
395
received_again_msg.delete()
396
self.assertIsNotNone(received_msg)
397
self.assertIsNotNone(received_again_msg)
398
self.assertEqual(sent_msg.body, received_msg.body)
399
self.assertEqual(received_again_msg.body, received_msg.body)
401
def test_send_queue_message_with_custom_message_type(self):
403
self._create_queue(self.queue_name)
407
b'<text>peek lock message custom message type</text>',
409
self.sbs.send_queue_message(self.queue_name, sent_msg)
410
received_msg = self.sbs.receive_queue_message(self.queue_name, True, 5)
411
received_msg.delete()
414
self.assertIsNotNone(received_msg)
415
self.assertEqual('text/xml', received_msg.type)
417
def test_send_queue_message_with_custom_message_properties(self):
419
self._create_queue(self.queue_name)
422
props = {'hello': 'world',
428
'dob': datetime(2011, 12, 14)}
429
sent_msg = Message(b'message with properties', custom_properties=props)
430
self.sbs.send_queue_message(self.queue_name, sent_msg)
431
received_msg = self.sbs.receive_queue_message(self.queue_name, True, 5)
432
received_msg.delete()
435
self.assertIsNotNone(received_msg)
436
self.assertEqual(received_msg.custom_properties['hello'], 'world')
437
self.assertEqual(received_msg.custom_properties['number'], 42)
438
self.assertEqual(received_msg.custom_properties['active'], True)
439
self.assertEqual(received_msg.custom_properties['deceased'], False)
440
self.assertEqual(received_msg.custom_properties['large'], 8555111000)
441
self.assertEqual(received_msg.custom_properties['floating'], 3.14)
443
received_msg.custom_properties['dob'], datetime(2011, 12, 14))
445
def test_receive_queue_message_timeout_5(self):
447
self._create_queue(self.queue_name)
451
received_msg = self.sbs.receive_queue_message(self.queue_name, True, 5)
452
duration = time.clock() - start
455
self.assertTrue(duration > 3 and duration < 7)
456
self.assertIsNotNone(received_msg)
457
self.assertIsNone(received_msg.body)
459
def test_receive_queue_message_timeout_50(self):
461
self._create_queue(self.queue_name)
465
received_msg = self.sbs.receive_queue_message(
466
self.queue_name, True, 50)
467
duration = time.clock() - start
470
self.assertTrue(duration > 48 and duration < 52)
471
self.assertIsNotNone(received_msg)
472
self.assertIsNone(received_msg.body)
474
#--Test cases for topics/subscriptions ------------------------------------
475
def test_create_topic_no_options(self):
479
created = self.sbs.create_topic(self.topic_name)
482
self.assertTrue(created)
484
def test_create_topic_no_options_fail_on_exist(self):
488
created = self.sbs.create_topic(self.topic_name, None, True)
491
self.assertTrue(created)
493
def test_create_topic_with_options(self):
497
topic_options = Topic()
498
topic_options.default_message_time_to_live = 'PT1M'
499
topic_options.duplicate_detection_history_time_window = 'PT5M'
500
topic_options.enable_batched_operations = False
501
topic_options.max_size_in_megabytes = 5120
502
topic_options.requires_duplicate_detection = False
503
topic_options.size_in_bytes = 0
504
# TODO: MaximumNumberOfSubscriptions is not supported?
505
created = self.sbs.create_topic(self.topic_name, topic_options)
508
self.assertTrue(created)
509
topic = self.sbs.get_topic(self.topic_name)
510
self.assertEqual('PT1M', topic.default_message_time_to_live)
511
self.assertEqual('PT5M', topic.duplicate_detection_history_time_window)
512
self.assertEqual(False, topic.enable_batched_operations)
513
self.assertEqual(5120, topic.max_size_in_megabytes)
514
self.assertEqual(False, topic.requires_duplicate_detection)
515
self.assertEqual(0, topic.size_in_bytes)
517
def test_create_topic_with_already_existing_topic(self):
521
created1 = self.sbs.create_topic(self.topic_name)
522
created2 = self.sbs.create_topic(self.topic_name)
525
self.assertTrue(created1)
526
self.assertFalse(created2)
528
def test_create_topic_with_already_existing_topic_fail_on_exist(self):
532
created = self.sbs.create_topic(self.topic_name)
533
with self.assertRaises(WindowsAzureError):
534
self.sbs.create_topic(self.topic_name, None, True)
537
self.assertTrue(created)
539
def test_topic_backwards_compatibility_warning(self):
541
topic_options = Topic()
542
topic_options.max_size_in_megabytes = 5120
545
val = topic_options.max_size_in_mega_bytes
548
self.assertEqual(val, 5120)
551
topic_options.max_size_in_mega_bytes = 1024
554
self.assertEqual(topic_options.max_size_in_megabytes, 1024)
556
def test_get_topic_with_existing_topic(self):
558
self._create_topic(self.topic_name)
561
topic = self.sbs.get_topic(self.topic_name)
564
self.assertIsNotNone(topic)
565
self.assertEqual(topic.name, self.topic_name)
567
def test_get_topic_with_non_existing_topic(self):
571
with self.assertRaises(WindowsAzureError):
572
self.sbs.get_topic(self.topic_name)
576
def test_list_topics(self):
578
self._create_topic(self.topic_name)
581
topics = self.sbs.list_topics()
586
self.assertIsNotNone(topics)
587
self.assertNamedItemInContainer(topics, self.topic_name)
589
def test_list_topics_with_special_chars(self):
591
# Name must start and end with an alphanumeric and can only contain
592
# letters, numbers, periods, hyphens, forward slashes and underscores.
593
other_topic_name = self.topic_name + 'txt/.-_123'
594
self.additional_topic_names = [other_topic_name]
595
self._create_topic(other_topic_name)
598
topics = self.sbs.list_topics()
601
self.assertIsNotNone(topics)
602
self.assertNamedItemInContainer(topics, other_topic_name)
604
def test_delete_topic_with_existing_topic(self):
606
self._create_topic(self.topic_name)
609
deleted = self.sbs.delete_topic(self.topic_name)
612
self.assertTrue(deleted)
613
topics = self.sbs.list_topics()
614
self.assertNamedItemNotInContainer(topics, self.topic_name)
616
def test_delete_topic_with_existing_topic_fail_not_exist(self):
618
self._create_topic(self.topic_name)
621
deleted = self.sbs.delete_topic(self.topic_name, True)
624
self.assertTrue(deleted)
625
topics = self.sbs.list_topics()
626
self.assertNamedItemNotInContainer(topics, self.topic_name)
628
def test_delete_topic_with_non_existing_topic(self):
632
deleted = self.sbs.delete_topic(self.topic_name)
635
self.assertFalse(deleted)
637
def test_delete_topic_with_non_existing_topic_fail_not_exist(self):
641
with self.assertRaises(WindowsAzureError):
642
self.sbs.delete_topic(self.topic_name, True)
646
def test_create_subscription(self):
648
self._create_topic(self.topic_name)
651
created = self.sbs.create_subscription(
652
self.topic_name, 'MySubscription')
655
self.assertTrue(created)
657
def test_create_subscription_with_options(self):
659
self._create_topic(self.topic_name)
662
subscription_options = Subscription()
663
subscription_options.dead_lettering_on_filter_evaluation_exceptions = False
664
subscription_options.dead_lettering_on_message_expiration = False
665
subscription_options.default_message_time_to_live = 'PT15M'
666
subscription_options.enable_batched_operations = False
667
subscription_options.lock_duration = 'PT1M'
668
subscription_options.max_delivery_count = 15
669
#message_count is read-only
670
subscription_options.message_count = 0
671
subscription_options.requires_session = False
672
created = self.sbs.create_subscription(
673
self.topic_name, 'MySubscription', subscription_options)
676
self.assertTrue(created)
677
subscription = self.sbs.get_subscription(
678
self.topic_name, 'MySubscription')
680
False, subscription.dead_lettering_on_filter_evaluation_exceptions)
682
False, subscription.dead_lettering_on_message_expiration)
683
self.assertEqual('PT15M', subscription.default_message_time_to_live)
684
self.assertEqual(False, subscription.enable_batched_operations)
685
self.assertEqual('PT1M', subscription.lock_duration)
686
# self.assertEqual(15, subscription.max_delivery_count) #no idea why
687
# max_delivery_count is always 10
688
self.assertEqual(0, subscription.message_count)
689
self.assertEqual(False, subscription.requires_session)
691
def test_create_subscription_fail_on_exist(self):
693
self._create_topic(self.topic_name)
696
created = self.sbs.create_subscription(
697
self.topic_name, 'MySubscription', None, True)
700
self.assertTrue(created)
702
def test_create_subscription_with_already_existing_subscription(self):
704
self._create_topic(self.topic_name)
707
created1 = self.sbs.create_subscription(
708
self.topic_name, 'MySubscription')
709
created2 = self.sbs.create_subscription(
710
self.topic_name, 'MySubscription')
713
self.assertTrue(created1)
714
self.assertFalse(created2)
716
def test_create_subscription_with_already_existing_subscription_fail_on_exist(self):
718
self._create_topic(self.topic_name)
721
created = self.sbs.create_subscription(
722
self.topic_name, 'MySubscription')
723
with self.assertRaises(WindowsAzureError):
724
self.sbs.create_subscription(
725
self.topic_name, 'MySubscription', None, True)
728
self.assertTrue(created)
730
def test_list_subscriptions(self):
732
self._create_topic_and_subscription(self.topic_name, 'MySubscription2')
735
subscriptions = self.sbs.list_subscriptions(self.topic_name)
738
self.assertIsNotNone(subscriptions)
739
self.assertEqual(len(subscriptions), 1)
740
self.assertEqual(subscriptions[0].name, 'MySubscription2')
742
def test_get_subscription_with_existing_subscription(self):
744
self._create_topic_and_subscription(self.topic_name, 'MySubscription3')
747
subscription = self.sbs.get_subscription(
748
self.topic_name, 'MySubscription3')
751
self.assertIsNotNone(subscription)
752
self.assertEqual(subscription.name, 'MySubscription3')
754
def test_get_subscription_with_non_existing_subscription(self):
756
self._create_topic_and_subscription(self.topic_name, 'MySubscription3')
759
with self.assertRaises(WindowsAzureError):
760
self.sbs.get_subscription(self.topic_name, 'MySubscription4')
764
def test_delete_subscription_with_existing_subscription(self):
766
self._create_topic(self.topic_name)
767
self._create_subscription(self.topic_name, 'MySubscription4')
768
self._create_subscription(self.topic_name, 'MySubscription5')
771
deleted = self.sbs.delete_subscription(
772
self.topic_name, 'MySubscription4')
775
self.assertTrue(deleted)
776
subscriptions = self.sbs.list_subscriptions(self.topic_name)
777
self.assertIsNotNone(subscriptions)
778
self.assertEqual(len(subscriptions), 1)
779
self.assertEqual(subscriptions[0].name, 'MySubscription5')
781
def test_delete_subscription_with_existing_subscription_fail_not_exist(self):
783
self._create_topic(self.topic_name)
784
self._create_subscription(self.topic_name, 'MySubscription4')
785
self._create_subscription(self.topic_name, 'MySubscription5')
788
deleted = self.sbs.delete_subscription(
789
self.topic_name, 'MySubscription4', True)
792
self.assertTrue(deleted)
793
subscriptions = self.sbs.list_subscriptions(self.topic_name)
794
self.assertIsNotNone(subscriptions)
795
self.assertEqual(len(subscriptions), 1)
796
self.assertEqual(subscriptions[0].name, 'MySubscription5')
798
def test_delete_subscription_with_non_existing_subscription(self):
800
self._create_topic(self.topic_name)
803
deleted = self.sbs.delete_subscription(
804
self.topic_name, 'MySubscription')
807
self.assertFalse(deleted)
809
def test_delete_subscription_with_non_existing_subscription_fail_not_exist(self):
811
self._create_topic(self.topic_name)
814
with self.assertRaises(WindowsAzureError):
815
self.sbs.delete_subscription(
816
self.topic_name, 'MySubscription', True)
820
def test_create_rule_no_options(self):
822
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
825
created = self.sbs.create_rule(
826
self.topic_name, 'MySubscription', 'MyRule1')
829
self.assertTrue(created)
831
def test_create_rule_no_options_fail_on_exist(self):
833
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
836
created = self.sbs.create_rule(
837
self.topic_name, 'MySubscription', 'MyRule1', None, True)
840
self.assertTrue(created)
842
def test_create_rule_with_already_existing_rule(self):
844
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
847
created1 = self.sbs.create_rule(
848
self.topic_name, 'MySubscription', 'MyRule1')
849
created2 = self.sbs.create_rule(
850
self.topic_name, 'MySubscription', 'MyRule1')
853
self.assertTrue(created1)
854
self.assertFalse(created2)
856
def test_create_rule_with_already_existing_rule_fail_on_exist(self):
858
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
861
created = self.sbs.create_rule(
862
self.topic_name, 'MySubscription', 'MyRule1')
863
with self.assertRaises(WindowsAzureError):
864
self.sbs.create_rule(
865
self.topic_name, 'MySubscription', 'MyRule1', None, True)
868
self.assertTrue(created)
870
def test_create_rule_with_options_sql_filter(self):
872
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
876
rule1.filter_type = 'SqlFilter'
877
rule1.filter_expression = 'number > 40'
878
created = self.sbs.create_rule(
879
self.topic_name, 'MySubscription', 'MyRule1', rule1)
882
self.assertTrue(created)
884
def test_create_rule_with_options_true_filter(self):
886
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
890
rule1.filter_type = 'TrueFilter'
891
rule1.filter_expression = '1=1'
892
created = self.sbs.create_rule(
893
self.topic_name, 'MySubscription', 'MyRule1', rule1)
896
self.assertTrue(created)
898
def test_create_rule_with_options_false_filter(self):
900
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
904
rule1.filter_type = 'FalseFilter'
905
rule1.filter_expression = '1=0'
906
created = self.sbs.create_rule(
907
self.topic_name, 'MySubscription', 'MyRule1', rule1)
910
self.assertTrue(created)
912
def test_create_rule_with_options_correlation_filter(self):
914
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
918
rule1.filter_type = 'CorrelationFilter'
919
rule1.filter_expression = 'myid'
920
created = self.sbs.create_rule(
921
self.topic_name, 'MySubscription', 'MyRule1', rule1)
924
self.assertTrue(created)
926
def test_create_rule_with_options_empty_rule_action(self):
928
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
932
rule1.action_type = 'EmptyRuleAction'
933
rule1.action_expression = ''
934
created = self.sbs.create_rule(
935
self.topic_name, 'MySubscription', 'MyRule1', rule1)
938
self.assertTrue(created)
940
def test_create_rule_with_options_sql_rule_action(self):
942
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
946
rule1.action_type = 'SqlRuleAction'
947
rule1.action_expression = "SET number = 5"
948
created = self.sbs.create_rule(
949
self.topic_name, 'MySubscription', 'MyRule1', rule1)
952
self.assertTrue(created)
954
def test_list_rules(self):
956
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
957
resp = self.sbs.create_rule(
958
self.topic_name, 'MySubscription', 'MyRule2')
961
rules = self.sbs.list_rules(self.topic_name, 'MySubscription')
964
self.assertEqual(len(rules), 2)
966
def test_get_rule_with_existing_rule(self):
968
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
971
rule = self.sbs.get_rule(self.topic_name, 'MySubscription', '$Default')
974
self.assertIsNotNone(rule)
975
self.assertEqual(rule.name, '$Default')
977
def test_get_rule_with_non_existing_rule(self):
979
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
982
with self.assertRaises(WindowsAzureError):
983
self.sbs.get_rule(self.topic_name,
984
'MySubscription', 'NonExistingRule')
988
def test_get_rule_with_existing_rule_with_options(self):
990
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
992
sent_rule.filter_type = 'SqlFilter'
993
sent_rule.filter_expression = 'number > 40'
994
sent_rule.action_type = 'SqlRuleAction'
995
sent_rule.action_expression = 'SET number = 5'
996
self.sbs.create_rule(
997
self.topic_name, 'MySubscription', 'MyRule1', sent_rule)
1000
received_rule = self.sbs.get_rule(
1001
self.topic_name, 'MySubscription', 'MyRule1')
1004
self.assertIsNotNone(received_rule)
1005
self.assertEqual(received_rule.name, 'MyRule1')
1006
self.assertEqual(received_rule.filter_type, sent_rule.filter_type)
1007
self.assertEqual(received_rule.filter_expression,
1008
sent_rule.filter_expression)
1009
self.assertEqual(received_rule.action_type, sent_rule.action_type)
1010
self.assertEqual(received_rule.action_expression,
1011
sent_rule.action_expression)
1013
def test_delete_rule_with_existing_rule(self):
1015
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1016
resp = self.sbs.create_rule(
1017
self.topic_name, 'MySubscription', 'MyRule3')
1018
resp = self.sbs.create_rule(
1019
self.topic_name, 'MySubscription', 'MyRule4')
1022
deleted1 = self.sbs.delete_rule(
1023
self.topic_name, 'MySubscription', 'MyRule4')
1024
deleted2 = self.sbs.delete_rule(
1025
self.topic_name, 'MySubscription', '$Default')
1028
self.assertTrue(deleted1)
1029
self.assertTrue(deleted2)
1030
rules = self.sbs.list_rules(self.topic_name, 'MySubscription')
1031
self.assertIsNotNone(rules)
1032
self.assertEqual(len(rules), 1)
1033
self.assertEqual(rules[0].name, 'MyRule3')
1035
def test_delete_rule_with_existing_rule_fail_not_exist(self):
1037
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1038
resp = self.sbs.create_rule(
1039
self.topic_name, 'MySubscription', 'MyRule3')
1040
resp = self.sbs.create_rule(
1041
self.topic_name, 'MySubscription', 'MyRule4')
1044
deleted1 = self.sbs.delete_rule(
1045
self.topic_name, 'MySubscription', 'MyRule4', True)
1046
deleted2 = self.sbs.delete_rule(
1047
self.topic_name, 'MySubscription', '$Default', True)
1050
self.assertTrue(deleted1)
1051
self.assertTrue(deleted2)
1052
rules = self.sbs.list_rules(self.topic_name, 'MySubscription')
1053
self.assertIsNotNone(rules)
1054
self.assertEqual(len(rules), 1)
1055
self.assertEqual(rules[0].name, 'MyRule3')
1057
def test_delete_rule_with_non_existing_rule(self):
1059
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1062
deleted = self.sbs.delete_rule(
1063
self.topic_name, 'MySubscription', 'NonExistingRule')
1066
self.assertFalse(deleted)
1068
def test_delete_rule_with_non_existing_rule_fail_not_exist(self):
1070
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1073
with self.assertRaises(WindowsAzureError):
1074
self.sbs.delete_rule(
1075
self.topic_name, 'MySubscription', 'NonExistingRule', True)
1079
def test_send_topic_message(self):
1081
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1082
sent_msg = Message(b'subscription message')
1085
self.sbs.send_topic_message(self.topic_name, sent_msg)
1089
def test_receive_subscription_message_read_delete_mode(self):
1091
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1092
sent_msg = Message(b'subscription message')
1093
self.sbs.send_topic_message(self.topic_name, sent_msg)
1096
received_msg = self.sbs.receive_subscription_message(
1097
self.topic_name, 'MySubscription', False)
1100
self.assertIsNotNone(received_msg)
1101
self.assertEqual(sent_msg.body, received_msg.body)
1103
def test_receive_subscription_message_read_delete_mode_throws_on_delete(self):
1105
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1106
sent_msg = Message(b'subscription message')
1107
self.sbs.send_topic_message(self.topic_name, sent_msg)
1110
received_msg = self.sbs.receive_subscription_message(
1111
self.topic_name, 'MySubscription', False)
1112
with self.assertRaises(WindowsAzureError):
1113
received_msg.delete()
1117
def test_receive_subscription_message_read_delete_mode_throws_on_unlock(self):
1119
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1120
sent_msg = Message(b'subscription message')
1121
self.sbs.send_topic_message(self.topic_name, sent_msg)
1124
received_msg = self.sbs.receive_subscription_message(
1125
self.topic_name, 'MySubscription', False)
1126
with self.assertRaises(WindowsAzureError):
1127
received_msg.unlock()
1131
def test_receive_subscription_message_peek_lock_mode(self):
1133
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1134
sent_msg = Message(b'subscription message')
1135
self.sbs.send_topic_message(self.topic_name, sent_msg)
1138
received_msg = self.sbs.receive_subscription_message(
1139
self.topic_name, 'MySubscription', True, 5)
1142
self.assertIsNotNone(received_msg)
1143
self.assertEqual(sent_msg.body, received_msg.body)
1145
def test_receive_subscription_message_delete(self):
1147
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1148
sent_msg = Message(b'subscription message')
1149
self.sbs.send_topic_message(self.topic_name, sent_msg)
1152
received_msg = self.sbs.receive_subscription_message(
1153
self.topic_name, 'MySubscription', True, 5)
1154
received_msg.delete()
1157
self.assertIsNotNone(received_msg)
1158
self.assertEqual(sent_msg.body, received_msg.body)
1160
def test_receive_subscription_message_unlock(self):
1162
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1163
sent_msg = Message(b'subscription message')
1164
self.sbs.send_topic_message(self.topic_name, sent_msg)
1167
received_msg = self.sbs.receive_subscription_message(
1168
self.topic_name, 'MySubscription', True)
1169
received_msg.unlock()
1172
received_again_msg = self.sbs.receive_subscription_message(
1173
self.topic_name, 'MySubscription', True)
1174
received_again_msg.delete()
1175
self.assertIsNotNone(received_msg)
1176
self.assertIsNotNone(received_again_msg)
1177
self.assertEqual(sent_msg.body, received_msg.body)
1178
self.assertEqual(received_again_msg.body, received_msg.body)
1180
def test_with_filter(self):
1184
def my_filter(request, next):
1186
return next(request)
1188
sbs = self.sbs.with_filter(my_filter)
1189
sbs.create_topic(self.topic_name + '0', None, True)
1191
self.assertTrue(called)
1195
sbs.delete_topic(self.topic_name + '0')
1197
self.assertTrue(called)
1201
def filter_a(request, next):
1203
return next(request)
1205
def filter_b(request, next):
1207
return next(request)
1209
sbs = self.sbs.with_filter(filter_a).with_filter(filter_b)
1210
sbs.create_topic(self.topic_name + '0', None, True)
1212
self.assertEqual(called, ['b', 'a'])
1214
sbs.delete_topic(self.topic_name + '0')
1216
self.assertEqual(called, ['b', 'a', 'b', 'a'])
1218
def test_two_identities(self):
1219
# In order to run this test, 2 service bus service identities are
1220
# created using the sbaztool available at:
1221
# http://code.msdn.microsoft.com/windowsazure/Authorization-SBAzTool-6fd76d93
1223
# Use the following commands to create 2 identities and grant access
1225
# Replace <servicebusnamespace> with the namespace specified in the
1227
# Replace <servicebuskey> with the key specified in the test .json file
1228
# This only needs to be executed once, after the service bus namespace
1231
# sbaztool makeid user1 NoHEoD6snlvlhZm7yek9Etxca3l0CYjfc19ICIJZoUg= -n <servicebusnamespace> -k <servicebuskey>
1232
# sbaztool grant Send /path1 user1 -n <servicebusnamespace> -k <servicebuskey>
1233
# sbaztool grant Listen /path1 user1 -n <servicebusnamespace> -k <servicebuskey>
1234
# sbaztool grant Manage /path1 user1 -n <servicebusnamespace> -k
1237
# sbaztool makeid user2 Tb6K5qEgstyRBwp86JEjUezKj/a+fnkLFnibfgvxvdg= -n <servicebusnamespace> -k <servicebuskey>
1238
# sbaztool grant Send /path2 user2 -n <servicebusnamespace> -k <servicebuskey>
1239
# sbaztool grant Listen /path2 user2 -n <servicebusnamespace> -k <servicebuskey>
1240
# sbaztool grant Manage /path2 user2 -n <servicebusnamespace> -k
1243
sbs1 = ServiceBusService(credentials.getServiceBusNamespace(),
1244
'NoHEoD6snlvlhZm7yek9Etxca3l0CYjfc19ICIJZoUg=',
1246
sbs2 = ServiceBusService(credentials.getServiceBusNamespace(),
1247
'Tb6K5qEgstyRBwp86JEjUezKj/a+fnkLFnibfgvxvdg=',
1250
queue1_name = 'path1/queue' + str(random.randint(1, 10000000))
1251
queue2_name = 'path2/queue' + str(random.randint(1, 10000000))
1254
# Create queues, success
1255
sbs1.create_queue(queue1_name)
1256
sbs2.create_queue(queue2_name)
1258
# Receive messages, success
1259
msg = sbs1.receive_queue_message(queue1_name, True, 1)
1260
self.assertIsNone(msg.body)
1261
msg = sbs1.receive_queue_message(queue1_name, True, 1)
1262
self.assertIsNone(msg.body)
1263
msg = sbs2.receive_queue_message(queue2_name, True, 1)
1264
self.assertIsNone(msg.body)
1265
msg = sbs2.receive_queue_message(queue2_name, True, 1)
1266
self.assertIsNone(msg.body)
1268
# Receive messages, failure
1269
with self.assertRaises(HTTPError):
1270
msg = sbs1.receive_queue_message(queue2_name, True, 1)
1271
with self.assertRaises(HTTPError):
1272
msg = sbs2.receive_queue_message(queue1_name, True, 1)
1275
sbs1.delete_queue(queue1_name)
1279
sbs2.delete_queue(queue2_name)
1283
def test_unicode_create_queue_unicode_name(self):
1285
self.queue_name = self.queue_name + u'啊齄丂狛狜'
1288
with self.assertRaises(WindowsAzureError):
1289
created = self.sbs.create_queue(self.queue_name)
1293
def test_send_queue_message_unicode_python_27(self):
1294
'''Test for auto-encoding of unicode text (backwards compatibility).'''
1295
if sys.version_info >= (3,):
1299
data = u'receive message啊齄丂狛狜'
1300
sent_msg = Message(data)
1301
self._create_queue(self.queue_name)
1304
self.sbs.send_queue_message(self.queue_name, sent_msg)
1307
received_msg = self.sbs.receive_queue_message(self.queue_name, False)
1308
self.assertIsNotNone(received_msg)
1309
self.assertEqual(received_msg.body, data.encode('utf-8'))
1311
def test_send_queue_message_unicode_python_33(self):
1312
if sys.version_info < (3,):
1316
data = u'receive message啊齄丂狛狜'
1317
sent_msg = Message(data)
1318
self._create_queue(self.queue_name)
1321
with self.assertRaises(TypeError):
1322
self.sbs.send_queue_message(self.queue_name, sent_msg)
1326
def test_unicode_receive_queue_message_unicode_data(self):
1328
sent_msg = Message(u'receive message啊齄丂狛狜'.encode('utf-8'))
1329
self._create_queue_and_send_msg(self.queue_name, sent_msg)
1332
received_msg = self.sbs.receive_queue_message(self.queue_name, False)
1335
self.assertIsNotNone(received_msg)
1336
self.assertEqual(sent_msg.body, received_msg.body)
1338
def test_unicode_receive_queue_message_binary_data(self):
1340
base64_data = 'AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8gISIjJCUmJygpKissLS4vMDEyMzQ1Njc4OTo7PD0+P0BBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWltcXV5fYGFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6e3x9fn+AgYKDhIWGh4iJiouMjY6PkJGSk5SVlpeYmZqbnJ2en6ChoqOkpaanqKmqq6ytrq+wsbKztLW2t7i5uru8vb6/wMHCw8TFxsfIycrLzM3Oz9DR0tPU1dbX2Nna29zd3t/g4eLj5OXm5+jp6uvs7e7v8PHy8/T19vf4+fr7/P3+/wABAgMEBQYHCAkKCwwNDg8QERITFBUWFxgZGhscHR4fICEiIyQlJicoKSorLC0uLzAxMjM0NTY3ODk6Ozw9Pj9AQUJDREVGR0hJSktMTU5PUFFSU1RVVldYWVpbXF1eX2BhYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ent8fX5/gIGCg4SFhoeIiYqLjI2Oj5CRkpOUlZaXmJmam5ydnp+goaKjpKWmp6ipqqusra6vsLGys7S1tre4ubq7vL2+v8DBwsPExcbHyMnKy8zNzs/Q0dLT1NXW19jZ2tvc3d7f4OHi4+Tl5ufo6err7O3u7/Dx8vP09fb3+Pn6+/z9/v8AAQIDBAUGBwgJCgsMDQ4PEBESExQVFhcYGRobHB0eHyAhIiMkJSYnKCkqKywtLi8wMTIzNDU2Nzg5Ojs8PT4/QEFCQ0RFRkdISUpLTE1OT1BRUlNUVVZXWFlaW1xdXl9gYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXp7fH1+f4CBgoOEhYaHiImKi4yNjo+QkZKTlJWWl5iZmpucnZ6foKGio6SlpqeoqaqrrK2ur7CxsrO0tba3uLm6u7y9vr/AwcLDxMXGx8jJysvMzc7P0NHS09TV1tfY2drb3N3e3+Dh4uPk5ebn6Onq6+zt7u/w8fLz9PX29/j5+vv8/f7/AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8gISIjJCUmJygpKissLS4vMDEyMzQ1Njc4OTo7PD0+P0BBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWltcXV5fYGFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6e3x9fn+AgYKDhIWGh4iJiouMjY6PkJGSk5SVlpeYmZqbnJ2en6ChoqOkpaanqKmqq6ytrq+wsbKztLW2t7i5uru8vb6/wMHCw8TFxsfIycrLzM3Oz9DR0tPU1dbX2Nna29zd3t/g4eLj5OXm5+jp6uvs7e7v8PHy8/T19vf4+fr7/P3+/w=='
1341
binary_data = base64.b64decode(base64_data)
1342
sent_msg = Message(binary_data)
1343
self._create_queue_and_send_msg(self.queue_name, sent_msg)
1346
received_msg = self.sbs.receive_queue_message(self.queue_name, False)
1349
self.assertIsNotNone(received_msg)
1350
self.assertEqual(sent_msg.body, received_msg.body)
1352
def test_unicode_create_subscription_unicode_name(self):
1354
self._create_topic(self.topic_name)
1357
with self.assertRaises(WindowsAzureError):
1358
created = self.sbs.create_subscription(
1359
self.topic_name, u'MySubscription啊齄丂狛狜')
1363
def test_unicode_create_rule_unicode_name(self):
1365
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1368
with self.assertRaises(WindowsAzureError):
1369
created = self.sbs.create_rule(
1370
self.topic_name, 'MySubscription', 'MyRule啊齄丂狛狜')
1374
def test_send_topic_message_unicode_python_27(self):
1375
'''Test for auto-encoding of unicode text (backwards compatibility).'''
1376
if sys.version_info >= (3,):
1380
data = u'receive message啊齄丂狛狜'
1381
sent_msg = Message(data)
1382
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1385
self.sbs.send_topic_message(self.topic_name, sent_msg)
1388
received_msg = self.sbs.receive_subscription_message(
1389
self.topic_name, 'MySubscription', False)
1390
self.assertIsNotNone(received_msg)
1391
self.assertEqual(received_msg.body, data.encode('utf-8'))
1393
def test_send_topic_message_unicode_python_33(self):
1394
if sys.version_info < (3,):
1398
data = u'receive message啊齄丂狛狜'
1399
sent_msg = Message(data)
1400
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1403
with self.assertRaises(TypeError):
1404
self.sbs.send_topic_message(self.topic_name, sent_msg)
1408
def test_unicode_receive_subscription_message_unicode_data(self):
1410
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1411
sent_msg = Message(u'subscription message啊齄丂狛狜'.encode('utf-8'))
1412
self.sbs.send_topic_message(self.topic_name, sent_msg)
1415
received_msg = self.sbs.receive_subscription_message(
1416
self.topic_name, 'MySubscription', False)
1419
self.assertIsNotNone(received_msg)
1420
self.assertEqual(sent_msg.body, received_msg.body)
1422
def test_unicode_receive_subscription_message_binary_data(self):
1424
base64_data = 'AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8gISIjJCUmJygpKissLS4vMDEyMzQ1Njc4OTo7PD0+P0BBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWltcXV5fYGFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6e3x9fn+AgYKDhIWGh4iJiouMjY6PkJGSk5SVlpeYmZqbnJ2en6ChoqOkpaanqKmqq6ytrq+wsbKztLW2t7i5uru8vb6/wMHCw8TFxsfIycrLzM3Oz9DR0tPU1dbX2Nna29zd3t/g4eLj5OXm5+jp6uvs7e7v8PHy8/T19vf4+fr7/P3+/wABAgMEBQYHCAkKCwwNDg8QERITFBUWFxgZGhscHR4fICEiIyQlJicoKSorLC0uLzAxMjM0NTY3ODk6Ozw9Pj9AQUJDREVGR0hJSktMTU5PUFFSU1RVVldYWVpbXF1eX2BhYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ent8fX5/gIGCg4SFhoeIiYqLjI2Oj5CRkpOUlZaXmJmam5ydnp+goaKjpKWmp6ipqqusra6vsLGys7S1tre4ubq7vL2+v8DBwsPExcbHyMnKy8zNzs/Q0dLT1NXW19jZ2tvc3d7f4OHi4+Tl5ufo6err7O3u7/Dx8vP09fb3+Pn6+/z9/v8AAQIDBAUGBwgJCgsMDQ4PEBESExQVFhcYGRobHB0eHyAhIiMkJSYnKCkqKywtLi8wMTIzNDU2Nzg5Ojs8PT4/QEFCQ0RFRkdISUpLTE1OT1BRUlNUVVZXWFlaW1xdXl9gYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXp7fH1+f4CBgoOEhYaHiImKi4yNjo+QkZKTlJWWl5iZmpucnZ6foKGio6SlpqeoqaqrrK2ur7CxsrO0tba3uLm6u7y9vr/AwcLDxMXGx8jJysvMzc7P0NHS09TV1tfY2drb3N3e3+Dh4uPk5ebn6Onq6+zt7u/w8fLz9PX29/j5+vv8/f7/AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8gISIjJCUmJygpKissLS4vMDEyMzQ1Njc4OTo7PD0+P0BBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWltcXV5fYGFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6e3x9fn+AgYKDhIWGh4iJiouMjY6PkJGSk5SVlpeYmZqbnJ2en6ChoqOkpaanqKmqq6ytrq+wsbKztLW2t7i5uru8vb6/wMHCw8TFxsfIycrLzM3Oz9DR0tPU1dbX2Nna29zd3t/g4eLj5OXm5+jp6uvs7e7v8PHy8/T19vf4+fr7/P3+/w=='
1425
binary_data = base64.b64decode(base64_data)
1426
self._create_topic_and_subscription(self.topic_name, 'MySubscription')
1427
sent_msg = Message(binary_data)
1428
self.sbs.send_topic_message(self.topic_name, sent_msg)
1431
received_msg = self.sbs.receive_subscription_message(
1432
self.topic_name, 'MySubscription', False)
1435
self.assertIsNotNone(received_msg)
1436
self.assertEqual(sent_msg.body, received_msg.body)
1438
#------------------------------------------------------------------------------
1439
if __name__ == '__main__':