~tribaal/txaws/xss-hardening

« back to all changes in this revision

Viewing changes to txaws/ec2/tests/test_client.py

Merged 921421-completemultipart [r=oubiwann][f=921421]

This branch adds Complete Multipart method to s3 client.

Show diffs side-by-side

added added

removed removed

Lines of Context:
171
171
        self.assertEquals(reservation.owner_id, "123456789012")
172
172
        # check groups
173
173
        group = reservation.groups[0]
174
 
        self.assertEquals(group[0], "sg-64f9eb08")
175
 
        self.assertEquals(group[1], "default")
 
174
        self.assertEquals(group, "default")
176
175
        # check instance
177
176
        self.assertEquals(instance.instance_id, "i-abcdef01")
178
177
        self.assertEquals(instance.instance_state, "running")
202
201
        self.assertEquals(reservation.owner_id, "123456789012")
203
202
        # check groups
204
203
        group = reservation.groups[0]
205
 
        self.assertEquals(group[0], "sg-64f9eb08")
206
 
        self.assertEquals(group[1], "default")
 
204
        self.assertEquals(group, "default")
207
205
        # check instance
208
206
        self.assertEquals(instance.instance_id, "i-abcdef01")
209
207
        self.assertEquals(instance.instance_state, "running")
333
331
        self.assertEquals(reservation.owner_id, "495219933132")
334
332
        # check groups
335
333
        group = reservation.groups[0]
336
 
        self.assertEquals(group[0], "sg-64f9eb08")
337
 
        self.assertEquals(group[1], "default")
 
334
        self.assertEquals(group, "default")
338
335
        # check instance
339
336
        self.assertEquals(instance.instance_id, "i-2ba64342")
340
337
        self.assertEquals(instance.instance_state, "pending")
380
377
            ramdisk_id=u"r-1234")
381
378
        d.addCallback(self.check_parsed_run_instances)
382
379
 
383
 
    def test_run_instances_with_subnet(self):
384
 
        class StubQuery(object):
385
 
            def __init__(stub, action="", creds=None, endpoint=None,
386
 
                         other_params={}):
387
 
                self.assertEqual(action, "RunInstances")
388
 
                self.assertEqual(creds.access_key, "foo")
389
 
                self.assertEqual(creds.secret_key, "bar")
390
 
                self.assertEquals(
391
 
                    other_params,
392
 
                    {"ImageId": "ami-1234", "MaxCount": "2", "MinCount": "1",
393
 
                     "SecurityGroupId.1": u"sg-a72d9f92e", "KeyName": u"default",
394
 
                     "UserData": "Zm9v", "InstanceType": u"m1.small",
395
 
                     "Placement.AvailabilityZone": u"us-east-1b",
396
 
                     "KernelId": u"k-1234", "RamdiskId": u"r-1234",
397
 
                     "SubnetId": "subnet-a72d829f"})
398
 
 
399
 
            def submit(self):
400
 
                return succeed(
401
 
                    payload.sample_run_instances_result)
402
 
 
403
 
        creds = AWSCredentials("foo", "bar")
404
 
        ec2 = client.EC2Client(creds, query_factory=StubQuery)
405
 
        d = ec2.run_instances("ami-1234", 1, 2, security_group_ids=[u"sg-a72d9f92e"],
406
 
            key_name=u"default", user_data=u"foo", instance_type=u"m1.small",
407
 
            availability_zone=u"us-east-1b", kernel_id=u"k-1234",
408
 
            ramdisk_id=u"r-1234", subnet_id="subnet-a72d829f")
409
 
        d.addCallback(self.check_parsed_run_instances)
410
 
 
411
 
    def test_run_instances_with_subnet_but_without_secgroup_id(self):
412
 
        creds = AWSCredentials("foo", "bar")
413
 
        ec2 = client.EC2Client(creds)
414
 
        error = self.assertRaises(ValueError, ec2.run_instances, "ami-1234", 1, 2,
415
 
            key_name=u"default", user_data=u"foo", instance_type=u"m1.small",
416
 
            availability_zone=u"us-east-1b", kernel_id=u"k-1234",
417
 
            ramdisk_id=u"r-1234", subnet_id="subnet-a72d829f")
418
 
        self.assertEqual(
419
 
            str(error),
420
 
            "You must specify the security_group_ids with the subnet_id"
421
 
        )
422
 
 
423
 
    def test_run_instances_without_subnet_and_secgroups(self):
424
 
        creds = AWSCredentials("foo", "bar")
425
 
        ec2 = client.EC2Client(creds)
426
 
        error = self.assertRaises(ValueError, ec2.run_instances, "ami-1234", 1, 2,
427
 
            key_name=u"default", user_data=u"foo", instance_type=u"m1.small",
428
 
            availability_zone=u"us-east-1b", kernel_id=u"k-1234",
429
 
            ramdisk_id=u"r-1234")
430
 
        self.assertEqual(
431
 
            str(error),
432
 
            ("You must specify either the subnet_id and "
433
 
             "security_group_ids or security_groups")
434
 
        )
435
 
 
436
380
 
437
381
class EC2ClientSecurityGroupsTestCase(TXAWSTestCase):
438
382
 
456
400
 
457
401
        def check_results(security_groups):
458
402
            [security_group] = security_groups
459
 
            self.assertEquals(security_group.id, "sg-a1a1a1")
460
403
            self.assertEquals(security_group.owner_id,
461
404
                              "UYY3TLBUXIEON5NQVUUX6OMPWBZIQNFM")
462
405
            self.assertEquals(security_group.name, "WebServers")
497
440
            security_group = security_groups[0]
498
441
            self.assertEquals(security_group.owner_id,
499
442
                              "UYY3TLBUXIEON5NQVUUX6OMPWBZIQNFM")
500
 
            self.assertEquals(security_group.id, "sg-a1a1a1")
501
443
            self.assertEquals(security_group.name, "MessageServers")
502
444
            self.assertEquals(security_group.description, "Message Servers")
503
445
            self.assertEquals(security_group.allowed_groups, [])
509
451
            security_group = security_groups[1]
510
452
            self.assertEquals(security_group.owner_id,
511
453
                              "UYY3TLBUXIEON5NQVUUX6OMPWBZIQNFM")
512
 
            self.assertEquals(security_group.id, "sg-c3c3c3")
513
454
            self.assertEquals(security_group.name, "WebServers")
514
455
            self.assertEquals(security_group.description, "Web Servers")
515
456
            self.assertEquals([(pair.user_id, pair.group_name)
642
583
            def submit(self):
643
584
                return succeed(payload.sample_create_security_group)
644
585
 
645
 
        def check_result(id):
646
 
            self.assertEquals(id, "sg-1a2b3c4d")
647
 
 
648
586
        creds = AWSCredentials("foo", "bar")
649
587
        ec2 = client.EC2Client(creds, query_factory=StubQuery)
650
588
        d = ec2.create_security_group(
651
589
            "WebServers",
652
590
            "The group for the web server farm.")
653
 
        return d.addCallback(check_result)
654
 
 
655
 
    def test_create_security_group_with_VPC(self):
656
 
        class StubQuery(object):
657
 
 
658
 
            def __init__(stub, action="", creds=None, endpoint=None,
659
 
                         other_params={}):
660
 
                self.assertEqual(action, "CreateSecurityGroup")
661
 
                self.assertEqual(creds.access_key, "foo")
662
 
                self.assertEqual(creds.secret_key, "bar")
663
 
                self.assertEqual(other_params, {
664
 
                    "GroupName": "WebServers",
665
 
                    "GroupDescription": "The group for the web server farm.",
666
 
                    "VpcId": "vpc-a4f2",
667
 
                    })
668
 
 
669
 
            def submit(self):
670
 
                return succeed(payload.sample_create_security_group)
671
 
 
672
 
        def check_result(id):
673
 
            self.assertEquals(id, "sg-1a2b3c4d")
674
 
 
675
 
        creds = AWSCredentials("foo", "bar")
676
 
        ec2 = client.EC2Client(creds, query_factory=StubQuery)
677
 
        d = ec2.create_security_group(
678
 
            "WebServers",
679
 
            "The group for the web server farm.",
680
 
            "vpc-a4f2")
681
 
        return d.addCallback(check_result)
682
 
 
683
 
    def test_delete_security_group_using_name(self):
 
591
        return self.assertTrue(d)
 
592
 
 
593
    def test_delete_security_group(self):
684
594
        """
685
595
        L{EC2Client.delete_security_group} returns a C{Deferred} that
686
596
        eventually fires with a true value, indicating the success of the
705
615
        d = ec2.delete_security_group("WebServers")
706
616
        return self.assertTrue(d)
707
617
 
708
 
    def test_delete_security_group_using_id(self):
709
 
        """
710
 
        L{EC2Client.delete_security_group} returns a C{Deferred} that
711
 
        eventually fires with a true value, indicating the success of the
712
 
        operation.
713
 
        """
714
 
        class StubQuery(object):
715
 
 
716
 
            def __init__(stub, action="", creds=None, endpoint=None,
717
 
                         other_params={}):
718
 
                self.assertEqual(action, "DeleteSecurityGroup")
719
 
                self.assertEqual(creds.access_key, "foo")
720
 
                self.assertEqual(creds.secret_key, "bar")
721
 
                self.assertEqual(other_params, {
722
 
                    "GroupId": "sg-a1a1a1",
723
 
                    })
724
 
 
725
 
            def submit(self):
726
 
                return succeed(payload.sample_delete_security_group)
727
 
 
728
 
        creds = AWSCredentials("foo", "bar")
729
 
        ec2 = client.EC2Client(creds, query_factory=StubQuery)
730
 
        d = ec2.delete_security_group(id="sg-a1a1a1")
731
 
        return self.assertTrue(d)
732
 
 
733
 
    def test_delete_security_group_without_id_and_name(self):
734
 
        creds = AWSCredentials("foo", "bar")
735
 
        ec2 = client.EC2Client(creds)
736
 
        error = self.assertRaises(ValueError, ec2.delete_security_group)
737
 
        self.assertEquals(
738
 
            str(error),
739
 
            "You must provide either the security group name or id",
740
 
        )
741
 
 
742
618
    def test_delete_security_group_failure(self):
743
619
        """
744
620
        L{EC2Client.delete_security_group} returns a C{Deferred} that
800
676
        creds = AWSCredentials("foo", "bar")
801
677
        ec2 = client.EC2Client(creds, query_factory=StubQuery)
802
678
        d = ec2.authorize_security_group(
803
 
            group_name="WebServers", source_group_name="AppServers",
804
 
            source_group_owner_id="123456789123")
805
 
        return self.assertTrue(d)
806
 
 
807
 
    def test_authorize_security_group_using_group_id(self):
808
 
        class StubQuery(object):
809
 
 
810
 
            def __init__(stub, action="", creds=None, endpoint=None,
811
 
                         other_params={}):
812
 
                self.assertEqual(action, "AuthorizeSecurityGroupIngress")
813
 
                self.assertEqual(creds.access_key, "foo")
814
 
                self.assertEqual(creds.secret_key, "bar")
815
 
                self.assertEqual(other_params, {
816
 
                    "GroupId": "sg-a1b2c3d4e5f6",
817
 
                    "SourceSecurityGroupName": "AppServers",
818
 
                    "SourceSecurityGroupOwnerId": "123456789123",
819
 
                    })
820
 
 
821
 
            def submit(self):
822
 
                return succeed(payload.sample_authorize_security_group)
823
 
 
824
 
        creds = AWSCredentials("foo", "bar")
825
 
        ec2 = client.EC2Client(creds, query_factory=StubQuery)
826
 
        d = ec2.authorize_security_group(
827
 
            group_id="sg-a1b2c3d4e5f6", source_group_name="AppServers",
828
 
            source_group_owner_id="123456789123")
829
 
        return self.assertTrue(d)
830
 
 
831
 
    def test_authorize_security_group_without_group_id_and_group_name(self):
832
 
        creds = AWSCredentials("foo", "bar")
833
 
        ec2 = client.EC2Client(creds)
834
 
        error = self.assertRaises(ValueError, ec2.authorize_security_group,
835
 
                source_group_name="AppServers", source_group_owner_id="123456789123")
836
 
        self.assertEquals(
837
 
            str(error),
838
 
            "You must specify either the group name of the group id.")
 
679
            "WebServers", source_group_name="AppServers",
 
680
            source_group_owner_id="123456789123")
 
681
        return self.assertTrue(d)
839
682
 
840
683
    def test_authorize_security_group_with_ip_permissions(self):
841
684
        """
864
707
        creds = AWSCredentials("foo", "bar")
865
708
        ec2 = client.EC2Client(creds, query_factory=StubQuery)
866
709
        d = ec2.authorize_security_group(
867
 
            group_name="WebServers", ip_protocol="tcp", from_port="22", to_port="80",
 
710
            "WebServers", ip_protocol="tcp", from_port="22", to_port="80",
868
711
            cidr_ip="0.0.0.0/0")
869
712
        return self.assertTrue(d)
870
713
 
879
722
        """
880
723
        creds = AWSCredentials("foo", "bar")
881
724
        ec2 = client.EC2Client(creds)
882
 
        error = self.assertRaises(ValueError, ec2.authorize_security_group,
883
 
                group_name="WebServers", ip_protocol="tcp", from_port="22")
884
 
        self.assertEquals(
885
 
            str(error),
886
 
            ("You must specify either both group parameters or all the "
887
 
             "ip parameters."))
 
725
        self.assertRaises(ValueError, ec2.authorize_security_group,
 
726
                "WebServers", ip_protocol="tcp", from_port="22")
 
727
        try:
 
728
            ec2.authorize_security_group(
 
729
                "WebServers", ip_protocol="tcp", from_port="22")
 
730
        except Exception, error:
 
731
            self.assertEquals(
 
732
                str(error),
 
733
                ("You must specify either both group parameters or all the "
 
734
                 "ip parameters."))
888
735
 
889
736
    def test_authorize_group_permission(self):
890
737
        """
975
822
            source_group_owner_id="123456789123")
976
823
        return self.assertTrue(d)
977
824
 
978
 
    def test_revoke_security_group_using_group_id(self):
979
 
        class StubQuery(object):
980
 
 
981
 
            def __init__(stub, action="", creds=None, endpoint=None,
982
 
                         other_params={}):
983
 
                self.assertEqual(action, "RevokeSecurityGroupIngress")
984
 
                self.assertEqual(creds.access_key, "foo")
985
 
                self.assertEqual(creds.secret_key, "bar")
986
 
                self.assertEqual(other_params, {
987
 
                    "GroupId": "sg-a1a1a1",
988
 
                    "SourceSecurityGroupName": "AppServers",
989
 
                    "SourceSecurityGroupOwnerId": "123456789123",
990
 
                    })
991
 
 
992
 
            def submit(self):
993
 
                return succeed(payload.sample_revoke_security_group)
994
 
 
995
 
        creds = AWSCredentials("foo", "bar")
996
 
        ec2 = client.EC2Client(creds, query_factory=StubQuery)
997
 
        d = ec2.revoke_security_group(
998
 
            group_id="sg-a1a1a1", source_group_name="AppServers",
999
 
            source_group_owner_id="123456789123")
1000
 
        return self.assertTrue(d)
1001
 
 
1002
825
    def test_revoke_security_group_with_ip_permissions(self):
1003
826
        """
1004
827
        L{EC2Client.revoke_security_group} returns a C{Deferred} that
1030
853
            cidr_ip="0.0.0.0/0")
1031
854
        return self.assertTrue(d)
1032
855
 
1033
 
    def test_revoke_security_group_without_group_id_and_group_name(self):
1034
 
        creds = AWSCredentials("foo", "bar")
1035
 
        ec2 = client.EC2Client(creds)
1036
 
        error = self.assertRaises(ValueError, ec2.revoke_security_group,
1037
 
                source_group_name="AppServers", source_group_owner_id="123456789123")
1038
 
        self.assertEquals(
1039
 
            str(error),
1040
 
            "You must specify either the group name of the group id.")
1041
 
 
1042
856
    def test_revoke_security_group_with_missing_parameters(self):
1043
857
        """
1044
858
        L{EC2Client.revoke_security_group} returns a C{Deferred} that
1050
864
        """
1051
865
        creds = AWSCredentials("foo", "bar")
1052
866
        ec2 = client.EC2Client(creds)
1053
 
        error = self.assertRaises(ValueError, ec2.revoke_security_group,
1054
 
                group_name="WebServers", ip_protocol="tcp", from_port="22")
1055
 
        self.assertEquals(
1056
 
            str(error),
1057
 
            ("You must specify either both group parameters or all the "
1058
 
             "ip parameters."))
 
867
        self.assertRaises(ValueError, ec2.authorize_security_group,
 
868
                "WebServers", ip_protocol="tcp", from_port="22")
 
869
        try:
 
870
            ec2.authorize_security_group(
 
871
                "WebServers", ip_protocol="tcp", from_port="22")
 
872
        except Exception, error:
 
873
            self.assertEquals(
 
874
                str(error),
 
875
                ("You must specify either both group parameters or all the "
 
876
                 "ip parameters."))
1059
877
 
1060
878
    def test_revoke_group_permission(self):
1061
879
        """
1743
1561
            {"AWSAccessKeyId": "foo",
1744
1562
             "Action": "DescribeInstances",
1745
1563
             "SignatureVersion": "2",
1746
 
             "Version": "2012-08-15"})
 
1564
             "Version": "2009-11-30"})
1747
1565
 
1748
1566
    def test_init_other_args_are_params(self):
1749
1567
        query = client.Query(
1757
1575
             "InstanceId.0": "12345",
1758
1576
             "SignatureVersion": "2",
1759
1577
             "Timestamp": "2007-11-12T13:14:15Z",
1760
 
             "Version": "2012-08-15"})
 
1578
             "Version": "2009-11-30"})
1761
1579
 
1762
1580
    def test_no_timestamp_if_expires_in_other_params(self):
1763
1581
        """
1775
1593
             "Action": "DescribeInstances",
1776
1594
             "SignatureVersion": "2",
1777
1595
             "Expires": "2007-11-12T13:14:15Z",
1778
 
             "Version": "2012-08-15"})
 
1596
             "Version": "2009-11-30"})
1779
1597
 
1780
1598
    def test_sign(self):
1781
1599
        query = client.Query(
1783
1601
            endpoint=self.endpoint,
1784
1602
            time_tuple=(2007, 11, 12, 13, 14, 15, 0, 0, 0))
1785
1603
        query.sign()
1786
 
        self.assertEqual("c0gbkemrGEJdqxWOl2UZYaygYiBLVjrpWBs7bTN7Ndo=",
 
1604
        self.assertEqual("G4c2NtQaFNhWWT8EWPVIIOpHVr0mGUYwJVYss9krsMU=",
1787
1605
            query.params["Signature"])
1788
1606
 
1789
1607
    def test_old_sign(self):
1794
1612
            other_params={"SignatureVersion": "1"})
1795
1613
        query.sign()
1796
1614
        self.assertEqual(
1797
 
            "7tWrIC5VYvXOjVE+roVoyDUt2Yw=", query.params["Signature"])
 
1615
            "9xP+PIs/3QXW+4mWX6WGR4nGqfE=", query.params["Signature"])
1798
1616
 
1799
1617
    def test_unsupported_sign(self):
1800
1618
        query = client.Query(