7
7
from landscape.broker.registration import (
8
8
InvalidCredentialsError, RegistrationHandler, is_cloud_managed, EC2_HOST,
11
from landscape.broker.deployment import BrokerConfiguration
12
from landscape.tests.helpers import LandscapeTest, ExchangeHelper
11
from landscape.broker.config import BrokerConfiguration
12
from landscape.tests.helpers import LandscapeTest
13
from landscape.broker.tests.helpers import (
14
BrokerConfigurationHelper, RegistrationHelper)
13
15
from landscape.lib.bpickle import dumps
14
16
from landscape.lib.fetch import HTTPCodeError, FetchError
17
class RegistrationTest(LandscapeTest):
19
helpers = [ExchangeHelper]
17
from landscape.lib.persist import Persist
20
class IdentityTest(LandscapeTest):
22
helpers = [BrokerConfigurationHelper]
22
super(RegistrationTest, self).setUp()
23
self.config = self.broker_service.config
24
self.identity = self.broker_service.identity
25
self.handler = self.broker_service.registration
26
logging.getLogger().setLevel(logging.INFO)
27
self.hostname = "ooga.local"
28
self.addCleanup(setattr, socket, "getfqdn", socket.getfqdn)
29
socket.getfqdn = lambda: self.hostname
25
super(IdentityTest, self).setUp()
26
self.persist = Persist(filename=self.makePersistFile())
27
self.identity = Identity(self.config, self.persist)
31
29
def check_persist_property(self, attr, persist_name):
37
35
self.assertEquals(getattr(self.identity, attr), value,
38
36
"%r attribute should be %r, not %r" %
39
37
(attr, value, getattr(self.identity, attr)))
40
self.assertEquals(self.persist.get(persist_name), value,
41
"%r not set to %r in persist" % (persist_name, value))
39
self.persist.get(persist_name), value,
40
"%r not set to %r in persist" % (persist_name, value))
43
42
def check_config_property(self, attr):
47
46
"%r attribute should be %r, not %r" %
48
47
(attr, value, getattr(self.identity, attr)))
50
def get_user_data(self, otps=None,
51
exchange_url="https://example.com/message-system",
52
ping_url="http://example.com/ping"):
55
return {"otps": otps, "exchange-url": exchange_url,
58
49
def test_secure_id(self):
59
50
self.check_persist_property("secure_id",
60
51
"registration.secure-id")
75
66
def test_client_tags(self):
76
67
self.check_config_property("tags")
70
class RegistrationHandlerTestBase(LandscapeTest):
72
helpers = [RegistrationHelper]
75
super(RegistrationHandlerTestBase, self).setUp()
76
logging.getLogger().setLevel(logging.INFO)
77
self.hostname = "ooga.local"
78
self.addCleanup(setattr, socket, "getfqdn", socket.getfqdn)
79
socket.getfqdn = lambda: self.hostname
82
class RegistrationHandlerTest(RegistrationHandlerTestBase):
78
84
def test_server_initiated_id_changing(self):
80
86
The server must be able to ask a client to change its secure
146
152
"account_name": "account_name",
147
153
"registration_password": None,
148
154
"hostname": "ooga.local",
151
156
self.assertEquals(self.logfile.getvalue().strip(),
152
157
"INFO: Queueing message to register with account "
153
158
"'account_name' without a password.")
165
170
"account_name": "account_name",
166
171
"registration_password": "SEKRET",
167
172
"hostname": "ooga.local",
170
174
self.assertEquals(self.logfile.getvalue().strip(),
171
175
"INFO: Queueing message to register with account "
172
176
"'account_name' with a password.")
188
192
"account_name": "account_name",
189
193
"registration_password": "SEKRET",
190
194
"hostname": "ooga.local",
191
"tags": u"computer,tag"}
195
"tags": u"computer,tag"}])
193
196
self.assertEquals(self.logfile.getvalue().strip(),
194
197
"INFO: Queueing message to register with account "
195
198
"'account_name' and tags computer,tag "
214
217
"account_name": "account_name",
215
218
"registration_password": "SEKRET",
216
219
"hostname": "ooga.local",
219
221
self.assertEquals(self.logfile.getvalue().strip(),
220
222
"ERROR: Invalid tags provided for cloud "
221
223
"registration.\n "
233
235
self.config.registration_password = "SEKRET"
234
236
self.config.tags = u"prova\N{LATIN SMALL LETTER J WITH CIRCUMFLEX}o"
235
237
self.reactor.fire("pre-exchange")
236
self.assertMessages(self.mstore.get_pending_messages(),
237
[{"type": "register",
238
"computer_title": "Computer Title",
239
"account_name": "account_name",
240
"registration_password": "SEKRET",
241
"hostname": "ooga.local",
242
"tags": u"prova\N{LATIN SMALL LETTER J WITH CIRCUMFLEX}o"}
239
self.mstore.get_pending_messages(),
240
[{"type": "register",
241
"computer_title": "Computer Title",
242
"account_name": "account_name",
243
"registration_password": "SEKRET",
244
"hostname": "ooga.local",
245
"tags": u"prova\N{LATIN SMALL LETTER J WITH CIRCUMFLEX}o"}])
244
246
self.assertEquals(self.logfile.getvalue().strip(),
245
247
"INFO: Queueing message to register with account "
246
248
"'account_name' and tags prova\xc4\xb5o "
423
431
"account_name": "account_name",
424
432
"registration_password": "SEKRET",
425
433
"hostname": socket.getfqdn(),
429
def get_registration_handler_for_cloud(
437
class CloudRegistrationHandlerTest(RegistrationHandlerTestBase):
442
super(CloudRegistrationHandlerTest, self).setUp()
443
self.query_results = {}
446
value = self.query_results[url]
447
if isinstance(value, Exception):
450
return succeed(value)
452
self.fetch_func = fetch_stub
454
def get_user_data(self, otps=None,
455
exchange_url="https://example.com/message-system",
456
ping_url="http://example.com/ping"):
459
return {"otps": otps, "exchange-url": exchange_url,
460
"ping-url": ping_url}
462
def prepare_query_results(
430
463
self, user_data=None, instance_key="key1", launch_index=0,
431
464
local_hostname="ooga.local", public_hostname="ooga.amazon.com",
432
465
reservation_key=u"res1", ramdisk_key=u"ram1", kernel_key=u"kernel1",
436
469
if not isinstance(user_data, Exception):
437
470
user_data = dumps(user_data)
438
471
api_base = "http://169.254.169.254/latest"
472
self.query_results.clear()
440
473
for url_suffix, value in [
441
474
("/user-data", user_data),
442
475
("/meta-data/instance-id", instance_key),
448
481
("/meta-data/ramdisk-id", ramdisk_key),
449
482
("/meta-data/ami-id", image_key),
451
query_results[api_base + url_suffix] = value
454
value = query_results[url]
455
if isinstance(value, Exception):
458
return succeed(value)
460
exchanger = self.broker_service.exchanger
461
handler = RegistrationHandler(self.broker_service.config,
462
self.broker_service.identity,
463
self.broker_service.reactor,
465
self.broker_service.pinger,
466
self.broker_service.message_store,
468
fetch_async=fetch_stub)
471
def prepare_cloud_registration(self, handler, account_name=None,
484
self.query_results[api_base + url_suffix] = value
486
def prepare_cloud_registration(self, account_name=None,
472
487
registration_password=None, tags=None):
473
488
# Set things up so that the client thinks it should register
474
mstore = self.broker_service.message_store
475
mstore.set_accepted_types(list(mstore.get_accepted_types())
476
+ ["register-cloud-vm"])
477
config = self.broker_service.config
478
config.account_name = account_name
479
config.registration_password = registration_password
480
config.computer_title = None
482
self.broker_service.identity.secure_id = None
483
self.assertTrue(handler.should_register())
489
self.mstore.set_accepted_types(list(self.mstore.get_accepted_types())
490
+ ["register-cloud-vm"])
491
self.config.account_name = account_name
492
self.config.registration_password = registration_password
493
self.config.computer_title = None
494
self.config.tags = tags
495
self.identity.secure_id = None
496
self.assertTrue(self.handler.should_register())
485
498
def get_expected_cloud_message(self, **kwargs):
516
529
immediately accepting the computer, instead of going through the
517
530
pending computer stage.
519
handler = self.get_registration_handler_for_cloud()
532
self.prepare_query_results()
521
config = self.broker_service.config
522
self.prepare_cloud_registration(handler, tags=u"server,london")
534
self.prepare_cloud_registration(tags=u"server,london")
524
536
# metadata is fetched and stored at reactor startup:
525
537
self.reactor.fire("run")
527
539
# And the metadata returned determines the URLs that are used
528
540
self.assertEquals(self.transport.get_url(),
529
541
"https://example.com/message-system")
530
self.assertEquals(self.broker_service.pinger.get_url(),
542
self.assertEquals(self.pinger.get_url(),
531
543
"http://example.com/ping")
532
544
# Let's make sure those values were written back to the config file
533
545
new_config = BrokerConfiguration()
536
548
self.assertEquals(new_config.ping_url, "http://example.com/ping")
538
550
# Okay! Exchange should cause the registration to happen.
539
self.broker_service.exchanger.exchange()
551
self.exchanger.exchange()
540
552
# This *should* be asynchronous, but I think a billion tests are
541
553
# written like this
542
554
self.assertEquals(len(self.transport.payloads), 1)
543
self.assertMessages(self.transport.payloads[0]["messages"],
544
[self.get_expected_cloud_message(tags=u"server,london")])
556
self.transport.payloads[0]["messages"],
557
[self.get_expected_cloud_message(tags=u"server,london")])
546
559
def test_cloud_registration_with_invalid_tags(self):
548
561
Invalid tags in the configuration should result in the tags not being
551
564
self.log_helper.ignore_errors("Invalid tags provided for cloud "
553
handler = self.get_registration_handler_for_cloud()
554
config = self.broker_service.config
555
self.prepare_cloud_registration(handler,
556
tags=u"<script>alert()</script>,hardy")
566
self.prepare_query_results()
567
self.prepare_cloud_registration(tags=u"<script>alert()</script>,hardy")
558
569
# metadata is fetched and stored at reactor startup:
559
570
self.reactor.fire("run")
560
self.broker_service.exchanger.exchange()
571
self.exchanger.exchange()
561
572
self.assertEquals(len(self.transport.payloads), 1)
562
573
self.assertMessages(self.transport.payloads[0]["messages"],
563
574
[self.get_expected_cloud_message(tags=None)])
570
581
"INFO: Message exchange completed in 0.00s.")
572
583
def test_wrong_user_data(self):
573
handler = self.get_registration_handler_for_cloud(
574
user_data="other stuff, not a bpickle")
575
config = self.broker_service.config
577
exchanger = self.broker_service.exchanger
579
self.prepare_cloud_registration(handler)
584
self.prepare_query_results(user_data="other stuff, not a bpickle")
585
self.prepare_cloud_registration()
581
587
# Mock registration-failed call
582
588
reactor_mock = self.mocker.patch(self.reactor)
584
590
self.mocker.replay()
586
592
self.reactor.fire("run")
593
self.exchanger.exchange()
589
595
def test_wrong_object_type_in_user_data(self):
590
handler = self.get_registration_handler_for_cloud(
592
config = self.broker_service.config
594
exchanger = self.broker_service.exchanger
596
self.prepare_cloud_registration(handler)
596
self.prepare_query_results(user_data=True)
597
self.prepare_cloud_registration()
598
599
# Mock registration-failed call
599
600
reactor_mock = self.mocker.patch(self.reactor)
601
602
self.mocker.replay()
603
604
self.reactor.fire("run")
605
self.exchanger.exchange()
606
607
def test_user_data_with_not_enough_elements(self):
608
609
If the AMI launch index isn't represented in the list of OTPs in the
609
610
user data then BOOM.
611
handler = self.get_registration_handler_for_cloud(launch_index=1)
613
self.prepare_cloud_registration(handler)
612
self.prepare_query_results(launch_index=1)
613
self.prepare_cloud_registration()
615
615
# Mock registration-failed call
616
616
reactor_mock = self.mocker.patch(self.reactor)
618
618
self.mocker.replay()
620
620
self.reactor.fire("run")
621
self.broker_service.exchanger.exchange()
621
self.exchanger.exchange()
623
623
def test_user_data_bpickle_without_otp(self):
624
handler = self.get_registration_handler_for_cloud(
625
user_data={"foo": "bar"})
626
self.prepare_cloud_registration(handler)
624
self.prepare_query_results(user_data={"foo": "bar"})
625
self.prepare_cloud_registration()
628
627
# Mock registration-failed call
629
628
reactor_mock = self.mocker.patch(self.reactor)
631
630
self.mocker.replay()
633
632
self.reactor.fire("run")
634
self.broker_service.exchanger.exchange()
633
self.exchanger.exchange()
636
635
def test_no_otp_fallback_to_account(self):
637
handler = self.get_registration_handler_for_cloud(
638
user_data="other stuff, not a bpickle",
639
instance_key=u"key1")
640
self.prepare_cloud_registration(handler,
641
account_name=u"onward",
636
self.prepare_query_results(user_data="other stuff, not a bpickle",
637
instance_key=u"key1")
638
self.prepare_cloud_registration(account_name=u"onward",
642
639
registration_password=u"password",
643
640
tags=u"london,server")
645
642
self.reactor.fire("run")
646
self.broker_service.exchanger.exchange()
643
self.exchanger.exchange()
648
645
self.assertEquals(len(self.transport.payloads), 1)
649
646
self.assertMessages(self.transport.payloads[0]["messages"],
685
682
back to, we fire 'registration-failed'.
687
684
self.log_helper.ignore_errors(pycurl.error)
688
config = self.broker_service.config
690
686
def fetch_stub(url):
691
687
return fail(pycurl.error(7, "couldn't connect to host"))
693
exchanger = self.broker_service.exchanger
694
handler = RegistrationHandler(self.broker_service.config,
695
self.broker_service.identity,
696
self.broker_service.reactor,
698
self.broker_service.pinger,
699
self.broker_service.message_store,
701
fetch_async=fetch_stub)
703
self.prepare_cloud_registration(handler)
689
self.handler = RegistrationHandler(
690
self.config, self.identity, self.reactor, self.exchanger,
691
self.pinger, self.mstore, fetch_async=fetch_stub)
693
self.fetch_stub = fetch_stub
694
self.prepare_query_results()
695
self.fetch_stub = fetch_stub
697
self.prepare_cloud_registration()
706
self.reactor.call_on("registration-failed", lambda: failed.append(True))
700
self.reactor.call_on(
701
"registration-failed", lambda: failed.append(True))
708
703
self.log_helper.ignore_errors("Got error while fetching meta-data")
709
704
self.reactor.fire("run")
705
self.exchanger.exchange()
711
706
self.assertEquals(failed, [True])
712
707
self.assertIn('error: (7, "couldn\'t connect to host")',
713
708
self.logfile.getvalue())
718
713
register-cloud-vm still occurs.
720
715
self.log_helper.ignore_errors(HTTPCodeError)
721
handler = self.get_registration_handler_for_cloud(
722
user_data=HTTPCodeError(404, "ohno"))
723
self.prepare_cloud_registration(handler,
724
account_name="onward",
716
self.prepare_query_results(user_data=HTTPCodeError(404, "ohno"))
717
self.prepare_cloud_registration(account_name="onward",
725
718
registration_password="password")
727
720
self.reactor.fire("run")
728
self.broker_service.exchanger.exchange()
721
self.exchanger.exchange()
729
722
self.assertIn("HTTPCodeError: Server returned HTTP code 404",
730
723
self.logfile.getvalue())
731
724
self.assertEquals(len(self.transport.payloads), 1)
735
728
account_name=u"onward",
736
729
registration_password=u"password")])
731
def test_cloud_registration_continues_without_ramdisk(self):
733
If the instance doesn't have a ramdisk (ie, the query for ramdisk
734
returns a 404), then register-cloud-vm still occurs.
736
self.log_helper.ignore_errors(HTTPCodeError)
737
self.prepare_query_results(ramdisk_key=HTTPCodeError(404, "ohno"))
738
self.prepare_cloud_registration()
740
self.reactor.fire("run")
741
self.exchanger.exchange()
742
self.assertIn("HTTPCodeError: Server returned HTTP code 404",
743
self.logfile.getvalue())
744
self.assertEquals(len(self.transport.payloads), 1)
745
self.assertMessages(self.transport.payloads[0]["messages"],
746
[self.get_expected_cloud_message(
738
749
def test_fall_back_to_normal_registration_when_metadata_fetch_fails(self):
740
751
If fetching metadata fails, but we do have an account name, then we
743
754
self.mstore.set_accepted_types(["register"])
744
755
self.log_helper.ignore_errors(HTTPCodeError)
745
handler = self.get_registration_handler_for_cloud(
756
self.prepare_query_results(
746
757
public_hostname=HTTPCodeError(404, "ohnoes"))
747
self.prepare_cloud_registration(handler,
748
account_name="onward",
758
self.prepare_cloud_registration(account_name="onward",
749
759
registration_password="password")
750
self.broker_service.config.computer_title = "whatever"
760
self.config.computer_title = "whatever"
751
761
self.reactor.fire("run")
752
self.broker_service.exchanger.exchange()
762
self.exchanger.exchange()
753
763
self.assertIn("HTTPCodeError: Server returned HTTP code 404",
754
764
self.logfile.getvalue())
755
765
self.assertEquals(len(self.transport.payloads), 1)
759
769
"account_name": u"onward",
760
770
"registration_password": u"password",
761
771
"hostname": socket.getfqdn(),
764
774
def test_should_register_in_cloud(self):
766
776
The client should register when it's in the cloud even though
767
777
it doesn't have the normal account details.
769
config = self.broker_service.config
770
handler = RegistrationHandler(self.broker_service.config,
771
self.broker_service.identity,
772
self.broker_service.reactor,
773
self.broker_service.exchanger,
774
self.broker_service.pinger,
775
self.broker_service.message_store,
778
mstore = self.broker_service.message_store
779
mstore.set_accepted_types(mstore.get_accepted_types()
780
+ ("register-cloud-vm",))
781
config.account_name = None
782
config.registration_password = None
783
config.computer_title = None
784
self.broker_service.identity.secure_id = None
785
self.assertTrue(handler.should_register())
779
self.mstore.set_accepted_types(self.mstore.get_accepted_types()
780
+ ("register-cloud-vm",))
781
self.config.account_name = None
782
self.config.registration_password = None
783
self.config.computer_title = None
784
self.identity.secure_id = None
785
self.assertTrue(self.handler.should_register())
787
787
def test_launch_index(self):
790
790
appropriate OTP in the user data.
792
792
otp = "correct otp for launch index"
793
handler = self.get_registration_handler_for_cloud(
794
user_data=self.get_user_data(otps=["wrong index",
793
self.prepare_query_results(
794
user_data=self.get_user_data(otps=["wrong index", otp,
796
795
"wrong again"],),
797
796
instance_key="key1",
800
self.prepare_cloud_registration(handler)
799
self.prepare_cloud_registration()
802
801
self.reactor.fire("run")
803
self.broker_service.exchanger.exchange()
802
self.exchanger.exchange()
804
803
self.assertEquals(len(self.transport.payloads), 1)
805
804
self.assertMessages(self.transport.payloads[0]["messages"],
806
805
[self.get_expected_cloud_message(otp=otp,
811
810
Having a secure ID means we shouldn't register, even in the cloud.
813
config = self.broker_service.config
814
handler = RegistrationHandler(self.broker_service.config,
815
self.broker_service.identity,
816
self.broker_service.reactor,
817
self.broker_service.exchanger,
818
self.broker_service.pinger,
819
self.broker_service.message_store,
822
mstore = self.broker_service.message_store
823
mstore.set_accepted_types(mstore.get_accepted_types()
824
+ ("register-cloud-vm",))
825
config.account_name = None
826
config.registration_password = None
827
config.computer_title = None
828
self.broker_service.identity.secure_id = "hello"
829
self.assertFalse(handler.should_register())
812
self.mstore.set_accepted_types(self.mstore.get_accepted_types()
813
+ ("register-cloud-vm",))
814
self.config.account_name = None
815
self.config.registration_password = None
816
self.config.computer_title = None
817
self.identity.secure_id = "hello"
818
self.assertFalse(self.handler.should_register())
831
820
def test_should_not_register_without_register_cloud_vm(self):
833
822
If the server isn't accepting a 'register-cloud-vm' message,
834
823
we shouldn't register.
836
config = self.broker_service.config
837
handler = RegistrationHandler(self.broker_service.config,
838
self.broker_service.identity,
839
self.broker_service.reactor,
840
self.broker_service.exchanger,
841
self.broker_service.pinger,
842
self.broker_service.message_store,
845
config.account_name = None
846
config.registration_password = None
847
config.computer_title = None
848
self.broker_service.identity.secure_id = None
849
self.assertFalse(handler.should_register())
825
self.config.account_name = None
826
self.config.registration_password = None
827
self.config.computer_title = None
828
self.identity.secure_id = None
829
self.assertFalse(self.handler.should_register())
852
832
class IsCloudManagedTests(LandscapeTest):
873
853
def test_is_managed(self):
875
L{is_cloud_managed} returns True if the EC2 user-data contains Landscape
876
instance information. It fetches the EC2 data with low timeouts.
855
L{is_cloud_managed} returns True if the EC2 user-data contains
856
Landscape instance information. It fetches the EC2 data with low
878
859
user_data = {"otps": ["otp1"], "exchange-url": "http://exchange",
879
860
"ping-url": "http://ping"}
932
913
self.assertFalse(is_cloud_managed(self.fake_fetch))
934
915
def test_is_managed_fetch_not_found(self):
935
917
def fake_fetch(url, connect_timeout=None):
936
918
raise HTTPCodeError(404, "ohnoes")
937
920
self.mock_socket()
938
921
self.mocker.replay()
939
922
self.assertFalse(is_cloud_managed(fake_fetch))
941
924
def test_is_managed_fetch_error(self):
942
926
def fake_fetch(url, connect_timeout=None):
943
927
raise FetchError(7, "couldn't connect to host")
944
929
self.mock_socket()
945
930
self.mocker.replay()
946
931
self.assertFalse(is_cloud_managed(fake_fetch))