290
290
TXAWSTestCase.setUp(self)
291
self.creds = AWSCredentials('foo', 'bar')
291
self.creds = AWSCredentials("foo", "bar")
292
292
self.endpoint = AWSServiceEndpoint(uri=EC2_ENDPOINT_US)
294
294
def test_init_minimum(self):
295
query = client.Query('DescribeInstances', self.creds, self.endpoint)
296
self.assertTrue('Timestamp' in query.params)
297
del query.params['Timestamp']
295
query = client.Query("DescribeInstances", self.creds, self.endpoint)
296
self.assertTrue("Timestamp" in query.params)
297
del query.params["Timestamp"]
298
298
self.assertEqual(
299
{'AWSAccessKeyId': 'foo',
300
'Action': 'DescribeInstances',
301
'SignatureMethod': 'HmacSHA1',
302
'SignatureVersion': '2',
303
'Version': '2008-12-01'},
299
{"AWSAccessKeyId": "foo",
300
"Action": "DescribeInstances",
301
"SignatureMethod": "HmacSHA1",
302
"SignatureVersion": "2",
303
"Version": "2008-12-01"},
306
306
def test_init_requires_action(self):
310
310
self.assertRaises(TypeError, client.Query, None)
312
312
def test_init_other_args_are_params(self):
313
query = client.Query('DescribeInstances', self.creds, self.endpoint,
314
{'InstanceId.0': '12345'},
313
query = client.Query("DescribeInstances", self.creds, self.endpoint,
314
{"InstanceId.0": "12345"},
315
315
time_tuple=(2007,11,12,13,14,15,0,0,0))
316
316
self.assertEqual(
317
{'AWSAccessKeyId': 'foo',
318
'Action': 'DescribeInstances',
319
'InstanceId.0': '12345',
320
'SignatureMethod': 'HmacSHA1',
321
'SignatureVersion': '2',
322
'Timestamp': '2007-11-12T13:14:15Z',
323
'Version': '2008-12-01'},
317
{"AWSAccessKeyId": "foo",
318
"Action": "DescribeInstances",
319
"InstanceId.0": "12345",
320
"SignatureMethod": "HmacSHA1",
321
"SignatureVersion": "2",
322
"Timestamp": "2007-11-12T13:14:15Z",
323
"Version": "2008-12-01"},
326
326
def test_sorted_params(self):
327
query = client.Query('DescribeInstances', self.creds, self.endpoint,
327
query = client.Query("DescribeInstances", self.creds, self.endpoint,
329
329
time_tuple=(2007,11,12,13,14,15,0,0,0))
330
330
self.assertEqual([
331
('AWSAccessKeyId', 'foo'),
332
('Action', 'DescribeInstances'),
333
('SignatureMethod', 'HmacSHA1'),
334
('SignatureVersion', '2'),
335
('Timestamp', '2007-11-12T13:14:15Z'),
336
('Version', '2008-12-01'),
331
("AWSAccessKeyId", "foo"),
332
("Action", "DescribeInstances"),
333
("SignatureMethod", "HmacSHA1"),
334
("SignatureVersion", "2"),
335
("Timestamp", "2007-11-12T13:14:15Z"),
336
("Version", "2008-12-01"),
338
338
], query.sorted_params())
340
340
def test_encode_unreserved(self):
341
all_unreserved = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
342
'abcdefghijklmnopqrstuvwxyz0123456789-_.~')
343
query = client.Query('DescribeInstances', self.creds, self.endpoint)
341
all_unreserved = ("ABCDEFGHIJKLMNOPQRSTUVWXYZ"
342
"abcdefghijklmnopqrstuvwxyz0123456789-_.~")
343
query = client.Query("DescribeInstances", self.creds, self.endpoint)
344
344
self.assertEqual(all_unreserved, query.encode(all_unreserved))
346
346
def test_encode_space(self):
347
"""This may be just 'url encode', but the AWS manual isn't clear."""
348
query = client.Query('DescribeInstances', self.creds, self.endpoint)
349
self.assertEqual('a%20space', query.encode('a space'))
347
"""This may be just "url encode", but the AWS manual isn't clear."""
348
query = client.Query("DescribeInstances", self.creds, self.endpoint)
349
self.assertEqual("a%20space", query.encode("a space"))
351
351
def test_canonical_query(self):
352
query = client.Query('DescribeInstances', self.creds, self.endpoint,
353
{'fu n': 'g/ames', 'argwithnovalue':'',
354
'InstanceId.1': 'i-1234'},
352
query = client.Query("DescribeInstances", self.creds, self.endpoint,
353
{"fu n": "g/ames", "argwithnovalue":"",
354
"InstanceId.1": "i-1234"},
355
355
time_tuple=(2007,11,12,13,14,15,0,0,0))
356
expected_query = ('AWSAccessKeyId=foo&Action=DescribeInstances'
357
'&InstanceId.1=i-1234'
358
'&SignatureMethod=HmacSHA1&SignatureVersion=2&'
359
'Timestamp=2007-11-12T13%3A14%3A15Z&Version=2008-12-01&'
360
'argwithnovalue=&fu%20n=g%2Fames')
356
expected_query = ("AWSAccessKeyId=foo&Action=DescribeInstances"
357
"&InstanceId.1=i-1234"
358
"&SignatureMethod=HmacSHA1&SignatureVersion=2&"
359
"Timestamp=2007-11-12T13%3A14%3A15Z&Version=2008-12-01&"
360
"argwithnovalue=&fu%20n=g%2Fames")
361
361
self.assertEqual(expected_query, query.canonical_query_params())
363
363
def test_signing_text(self):
364
query = client.Query('DescribeInstances', self.creds, self.endpoint,
364
query = client.Query("DescribeInstances", self.creds, self.endpoint,
365
365
time_tuple=(2007,11,12,13,14,15,0,0,0))
366
signing_text = ('GET\n%s\n/\n' % self.endpoint.host +
367
'AWSAccessKeyId=foo&Action=DescribeInstances&'
368
'SignatureMethod=HmacSHA1&SignatureVersion=2&'
369
'Timestamp=2007-11-12T13%3A14%3A15Z&Version=2008-12-01')
366
signing_text = ("GET\n%s\n/\n" % self.endpoint.host +
367
"AWSAccessKeyId=foo&Action=DescribeInstances&"
368
"SignatureMethod=HmacSHA1&SignatureVersion=2&"
369
"Timestamp=2007-11-12T13%3A14%3A15Z&Version=2008-12-01")
370
370
self.assertEqual(signing_text, query.signing_text())
372
372
def test_sign(self):
373
query = client.Query('DescribeInstances', self.creds, self.endpoint,
373
query = client.Query("DescribeInstances", self.creds, self.endpoint,
374
374
time_tuple=(2007,11,12,13,14,15,0,0,0))
376
self.assertEqual('JuCpwFA2H4OVF3Ql/lAQs+V6iMc=',
377
query.params['Signature'])
376
self.assertEqual("JuCpwFA2H4OVF3Ql/lAQs+V6iMc=",
377
query.params["Signature"])
380
380
class TestEBS(TXAWSTestCase):
649
649
d = ec2.attach_volume("vol-4d826724", "i-6058a509", "/dev/sdh")
650
650
d.addCallback(check_parsed_response)
653
def check_parsed_keypairs(self, results):
654
self.assertEquals(len(results), 1)
656
self.assertEquals(keypair.name, "gsg-keypair")
659
"1f:51:ae:28:bf:89:e9:d8:1f:25:5d:37:2d:7d:b8:ca:9f:f5:f1:6f")
661
def test_single_describe_keypairs(self):
663
class StubQuery(object):
664
def __init__(stub, action, creds, endpoint, params):
665
self.assertEqual(action, "DescribeKeyPairs")
666
self.assertEqual("foo", creds)
667
self.assertEquals(params, {})
670
return succeed(payload.sample_single_describe_keypairs_result)
672
ec2 = client.EC2Client(creds="foo", query_factory=StubQuery)
673
d = ec2.describe_keypairs()
674
d.addCallback(self.check_parsed_keypairs)
677
def test_multiple_describe_keypairs(self):
679
def check_parsed_keypairs(results):
680
self.assertEquals(len(results), 2)
681
keypair1, keypair2 = results
682
self.assertEquals(keypair1.name, "gsg-keypair-1")
684
keypair1.fingerprint,
685
"1f:51:ae:28:bf:89:e9:d8:1f:25:5d:37:2d:7d:b8:ca:9f:f5:f1:6f")
686
self.assertEquals(keypair2.name, "gsg-keypair-2")
688
keypair2.fingerprint,
689
"1f:51:ae:28:bf:89:e9:d8:1f:25:5d:37:2d:7d:b8:ca:9f:f5:f1:70")
691
class StubQuery(object):
692
def __init__(stub, action, creds, endpoint, params):
693
self.assertEqual(action, "DescribeKeyPairs")
694
self.assertEqual("foo", creds)
695
self.assertEquals(params, {})
699
payload.sample_multiple_describe_keypairs_result)
701
ec2 = client.EC2Client(creds="foo", query_factory=StubQuery)
702
d = ec2.describe_keypairs()
703
d.addCallback(check_parsed_keypairs)
706
def test_describe_specified_keypairs(self):
708
class StubQuery(object):
709
def __init__(stub, action, creds, endpoint, params):
710
self.assertEqual(action, "DescribeKeyPairs")
711
self.assertEqual("foo", creds)
714
{"KeyPair.1": "gsg-keypair"})
717
return succeed(payload.sample_single_describe_keypairs_result)
719
ec2 = client.EC2Client(creds="foo", query_factory=StubQuery)
720
d = ec2.describe_keypairs("gsg-keypair")
721
d.addCallback(self.check_parsed_keypairs)
724
def test_create_keypair(self):
726
def check_parsed_create_keypair(keypair):
727
self.assertEquals(keypair.name, "example-key-name")
730
"1f:51:ae:28:bf:89:e9:d8:1f:25:5d:37:2d:7d:b8:ca:9f:f5:f1:6f")
731
self.assertTrue(keypair.material.startswith(
732
"-----BEGIN RSA PRIVATE KEY-----"))
733
self.assertTrue(keypair.material.endswith(
734
"-----END RSA PRIVATE KEY-----"))
735
self.assertEquals(len(keypair.material), 1670)
737
class StubQuery(object):
738
def __init__(stub, action, creds, endpoint, params):
739
self.assertEqual(action, "CreateKeyPair")
740
self.assertEqual("foo", creds)
743
{"KeyName": "example-key-name"})
746
return succeed(payload.sample_create_keypair_result)
748
ec2 = client.EC2Client(creds="foo", query_factory=StubQuery)
749
d = ec2.create_keypair("example-key-name")
750
d.addCallback(check_parsed_create_keypair)
753
def test_delete_keypair_true_result(self):
755
class StubQuery(object):
756
def __init__(stub, action, creds, endpoint, params):
757
self.assertEqual(action, "DeleteKeyPair")
758
self.assertEqual("foo", creds)
759
self.assertEqual("http:///", endpoint.get_uri())
762
{"KeyName": "example-key-name"})
765
return succeed(payload.sample_delete_keypair_true_result)
767
ec2 = client.EC2Client(creds="foo", query_factory=StubQuery)
768
d = ec2.delete_keypair("example-key-name")
769
d.addCallback(self.assertTrue)
772
def test_delete_keypair_false_result(self):
774
class StubQuery(object):
775
def __init__(stub, action, creds, endpoint, params):
776
self.assertEqual(action, "DeleteKeyPair")
777
self.assertEqual("foo", creds)
778
self.assertEqual("http:///", endpoint.get_uri())
781
{"KeyName": "example-key-name"})
784
return succeed(payload.sample_delete_keypair_false_result)
786
ec2 = client.EC2Client(creds="foo", query_factory=StubQuery)
787
d = ec2.delete_keypair("example-key-name")
788
d.addCallback(self.assertFalse)
791
def test_delete_keypair_no_result(self):
793
class StubQuery(object):
794
def __init__(stub, action, creds, endpoint, params):
795
self.assertEqual(action, "DeleteKeyPair")
796
self.assertEqual("foo", creds)
797
self.assertEqual("http:///", endpoint.get_uri())
800
{"KeyName": "example-key-name"})
803
return succeed(payload.sample_delete_keypair_no_result)
805
ec2 = client.EC2Client(creds="foo", query_factory=StubQuery)
806
d = ec2.delete_keypair("example-key-name")
807
d.addCallback(self.assertFalse)