~ubuntu-branches/ubuntu/wily/neutron-lbaas/wily-proposed

« back to all changes in this revision

Viewing changes to neutron_lbaas/tests/tempest/lib/services/compute/json/floating_ips_client.py

  • Committer: Package Import Robot
  • Author(s): Corey Bryant
  • Date: 2015-08-19 14:26:04 UTC
  • mfrom: (1.1.6)
  • Revision ID: package-import@ubuntu.com-20150819142604-928sn8gp2urb2bmh
Tags: 2:7.0.0~b2-0ubuntu1
* New upstream milestone for OpenStack Liberty.
* d/control: Align (build-)depends with upstream.
* d/rules: Remove .eggs directory in override_dh_auto_clean.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2012 OpenStack Foundation
 
2
# All Rights Reserved.
 
3
#
 
4
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
5
#    not use this file except in compliance with the License. You may obtain
 
6
#    a copy of the License at
 
7
#
 
8
#         http://www.apache.org/licenses/LICENSE-2.0
 
9
#
 
10
#    Unless required by applicable law or agreed to in writing, software
 
11
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
12
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
13
#    License for the specific language governing permissions and limitations
 
14
#    under the License.
 
15
 
 
16
import json
 
17
 
 
18
from six.moves.urllib import parse as urllib
 
19
from tempest_lib import exceptions as lib_exc
 
20
 
 
21
from neutron_lbaas.tests.tempest.lib.api_schema.response.compute.v2_1 import floating_ips as schema
 
22
from neutron_lbaas.tests.tempest.lib.common import service_client
 
23
 
 
24
 
 
25
class FloatingIPsClientJSON(service_client.ServiceClient):
    """JSON client for the floating IP related compute API resources.

    Each method issues one request through the inherited ``get`` / ``post`` /
    ``put`` / ``delete`` helpers, checks the reply with ``validate_response``
    against the matching entry of the ``floating_ips`` response schema module
    and wraps the result in a ``service_client`` response container.
    """

    def list_floating_ips(self, params=None):
        """List floating IPs, optionally filtered by ``params``.

        :param params: optional mapping that is URL-encoded into the query
                       string; a falsy value sends no query string at all.
        :returns: ``ResponseBodyList`` built from the ``floating_ips`` entry
                  of the decoded reply.
        """
        endpoint = 'os-floating-ips'
        if params:
            endpoint = endpoint + '?' + urllib.urlencode(params)

        resp, raw = self.get(endpoint)
        parsed = json.loads(raw)
        self.validate_response(schema.list_floating_ips, resp, parsed)
        return service_client.ResponseBodyList(resp, parsed['floating_ips'])

    def show_floating_ip(self, floating_ip_id):
        """Fetch a single floating IP.

        :param floating_ip_id: identifier appended (via ``str``) to the
                               ``os-floating-ips/`` path.
        :returns: ``ResponseBody`` built from the ``floating_ip`` entry of
                  the decoded reply.
        """
        resp, raw = self.get("os-floating-ips/" + str(floating_ip_id))
        parsed = json.loads(raw)
        self.validate_response(schema.create_get_floating_ip, resp, parsed)
        return service_client.ResponseBody(resp, parsed['floating_ip'])

    def create_floating_ip(self, pool_name=None):
        """Request a new floating IP.

        :param pool_name: value sent under the ``pool`` key of the request
                          body; it is sent even when left as ``None``.
        :returns: ``ResponseBody`` built from the ``floating_ip`` entry of
                  the decoded reply.
        """
        payload = json.dumps({'pool': pool_name})
        resp, raw = self.post('os-floating-ips', payload)
        parsed = json.loads(raw)
        self.validate_response(schema.create_get_floating_ip, resp, parsed)
        return service_client.ResponseBody(resp, parsed['floating_ip'])

    def delete_floating_ip(self, floating_ip_id):
        """Delete the floating IP identified by ``floating_ip_id``.

        The reply body is validated and returned as received; it is not
        JSON-decoded here.

        :returns: ``ResponseBody`` wrapping the response and its raw body.
        """
        resp, raw = self.delete("os-floating-ips/" + str(floating_ip_id))
        self.validate_response(schema.add_remove_floating_ip, resp, raw)
        return service_client.ResponseBody(resp, raw)

    def associate_floating_ip_to_server(self, floating_ip, server_id):
        """Post an ``addFloatingIp`` action for ``server_id``.

        :param floating_ip: value sent as the action's ``address``.
        :param server_id: server whose ``action`` URL receives the request.
        :returns: ``ResponseBody`` wrapping the response and its raw body
                  (the body is not JSON-decoded here).
        """
        action = {'addFloatingIp': {'address': floating_ip}}
        target = "servers/" + str(server_id) + "/action"

        resp, raw = self.post(target, json.dumps(action))
        self.validate_response(schema.add_remove_floating_ip, resp, raw)
        return service_client.ResponseBody(resp, raw)

    def disassociate_floating_ip_from_server(self, floating_ip, server_id):
        """Post a ``removeFloatingIp`` action for ``server_id``.

        :param floating_ip: value sent as the action's ``address``.
        :param server_id: server whose ``action`` URL receives the request.
        :returns: ``ResponseBody`` wrapping the response and its raw body
                  (the body is not JSON-decoded here).
        """
        action = {'removeFloatingIp': {'address': floating_ip}}
        target = "servers/" + str(server_id) + "/action"

        resp, raw = self.post(target, json.dumps(action))
        self.validate_response(schema.add_remove_floating_ip, resp, raw)
        return service_client.ResponseBody(resp, raw)

    def is_resource_deleted(self, id):
        """Tell whether the floating IP ``id`` can no longer be fetched.

        :returns: ``True`` when ``show_floating_ip`` raises
                  ``lib_exc.NotFound``; ``False`` when the lookup succeeds.
                  Any other exception propagates to the caller.
        """
        try:
            self.show_floating_ip(id)
        except lib_exc.NotFound:
            return True
        else:
            return False

    @property
    def resource_type(self):
        """Name of the primary resource type handled by this client."""
        return 'floating_ip'

    def list_floating_ip_pools(self, params=None):
        """List floating IP pools, optionally filtered by ``params``.

        :param params: optional mapping that is URL-encoded into the query
                       string; a falsy value sends no query string at all.
        :returns: ``ResponseBodyList`` built from the ``floating_ip_pools``
                  entry of the decoded reply.
        """
        endpoint = 'os-floating-ip-pools'
        if params:
            endpoint = endpoint + '?' + urllib.urlencode(params)

        resp, raw = self.get(endpoint)
        parsed = json.loads(raw)
        self.validate_response(schema.list_floating_ip_pools, resp, parsed)
        return service_client.ResponseBodyList(resp,
                                               parsed['floating_ip_pools'])

    def create_floating_ips_bulk(self, ip_range, pool, interface):
        """Create floating IPs in bulk.

        The three arguments are sent unchanged under the ``ip_range``,
        ``pool`` and ``interface`` keys of a ``floating_ips_bulk_create``
        request object.

        :returns: ``ResponseBody`` built from the
                  ``floating_ips_bulk_create`` entry of the decoded reply.
        """
        bulk_spec = {
            'ip_range': ip_range,
            'pool': pool,
            'interface': interface
        }
        payload = json.dumps({'floating_ips_bulk_create': bulk_spec})
        resp, raw = self.post('os-floating-ips-bulk', payload)
        parsed = json.loads(raw)
        self.validate_response(schema.create_floating_ips_bulk, resp, parsed)
        created = parsed['floating_ips_bulk_create']
        return service_client.ResponseBody(resp, created)

    def list_floating_ips_bulk(self):
        """List the entries exposed by the bulk floating IP endpoint.

        :returns: ``ResponseBodyList`` built from the ``floating_ip_info``
                  entry of the decoded reply.
        """
        resp, raw = self.get('os-floating-ips-bulk')
        parsed = json.loads(raw)
        self.validate_response(schema.list_floating_ips_bulk, resp, parsed)
        return service_client.ResponseBodyList(resp,
                                               parsed['floating_ip_info'])

    def delete_floating_ips_bulk(self, ip_range):
        """Delete floating IPs in bulk for ``ip_range``.

        The request is a PUT to ``os-floating-ips-bulk/delete`` carrying
        ``ip_range`` in its JSON body.

        :returns: ``ResponseBodyData`` built from the
                  ``floating_ips_bulk_delete`` entry of the decoded reply.
        """
        payload = json.dumps({'ip_range': ip_range})
        resp, raw = self.put('os-floating-ips-bulk/delete', payload)
        parsed = json.loads(raw)
        self.validate_response(schema.delete_floating_ips_bulk, resp, parsed)
        return service_client.ResponseBodyData(
            resp, parsed['floating_ips_bulk_delete'])