7
from keystoneclient.v2_0 import ec2
8
from tests import utils
11
class EC2Tests(utils.TestCase):
13
super(EC2Tests, self).setUp()
14
self.TEST_REQUEST_HEADERS = {
15
'X-Auth-Token': 'aToken',
16
'User-Agent': 'python-keystoneclient',
18
self.TEST_POST_HEADERS = {
19
'Content-Type': 'application/json',
20
'X-Auth-Token': 'aToken',
21
'User-Agent': 'python-keystoneclient',
24
def test_create(self):
28
"tenant_id": tenant_id,
34
"tenant_id": tenant_id,
35
"created": "12/12/12",
39
resp = utils.TestResponse({
41
"text": json.dumps(resp_body),
44
url = urlparse.urljoin(self.TEST_URL,
45
'v2.0/users/%s/credentials/OS-EC2' % user_id)
46
kwargs = copy.copy(self.TEST_REQUEST_BASE)
47
kwargs['headers'] = self.TEST_POST_HEADERS
48
kwargs['data'] = json.dumps(req_body)
49
requests.request('POST',
51
**kwargs).AndReturn((resp))
54
cred = self.client.ec2.create(user_id, tenant_id)
55
self.assertTrue(isinstance(cred, ec2.EC2))
56
self.assertEqual(cred.tenant_id, tenant_id)
57
self.assertEqual(cred.enabled, True)
58
self.assertEqual(cred.access, 'access')
59
self.assertEqual(cred.secret, 'secret')
68
"tenant_id": tenant_id,
69
"created": "12/12/12",
73
resp = utils.TestResponse({
75
"text": json.dumps(resp_body),
78
url = urlparse.urljoin(self.TEST_URL,
79
'v2.0/users/%s/credentials/OS-EC2/%s' %
81
kwargs = copy.copy(self.TEST_REQUEST_BASE)
82
kwargs['headers'] = self.TEST_REQUEST_HEADERS
83
requests.request('GET',
85
**kwargs).AndReturn((resp))
88
cred = self.client.ec2.get(user_id, 'access')
89
self.assertTrue(isinstance(cred, ec2.EC2))
90
self.assertEqual(cred.tenant_id, tenant_id)
91
self.assertEqual(cred.enabled, True)
92
self.assertEqual(cred.access, 'access')
93
self.assertEqual(cred.secret, 'secret')
104
"tenant_id": tenant_id,
105
"created": "12/12/12",
111
"tenant_id": tenant_id,
112
"created": "12/12/31",
119
resp = utils.TestResponse({
121
"text": json.dumps(resp_body),
124
url = urlparse.urljoin(self.TEST_URL,
125
'v2.0/users/%s/credentials/OS-EC2' % user_id)
126
kwargs = copy.copy(self.TEST_REQUEST_BASE)
127
kwargs['headers'] = self.TEST_REQUEST_HEADERS
128
requests.request('GET',
130
**kwargs).AndReturn((resp))
133
creds = self.client.ec2.list(user_id)
134
self.assertTrue(len(creds), 2)
136
self.assertTrue(isinstance(cred, ec2.EC2))
137
self.assertEqual(cred.tenant_id, tenant_id)
138
self.assertEqual(cred.enabled, True)
139
self.assertEqual(cred.access, 'access')
140
self.assertEqual(cred.secret, 'secret')
142
def test_delete(self):
145
resp = utils.TestResponse({
150
url = urlparse.urljoin(self.TEST_URL,
151
'v2.0/users/%s/credentials/OS-EC2/%s' %
153
kwargs = copy.copy(self.TEST_REQUEST_BASE)
154
kwargs['headers'] = self.TEST_REQUEST_HEADERS
155
requests.request('DELETE',
157
**kwargs).AndReturn((resp))
160
self.client.ec2.delete(user_id, access)