~free.ekanayaka/landscape-client/jaunty-1.5.0-0ubuntu0.9.04.0

« back to all changes in this revision

Viewing changes to landscape/broker/registration.py

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2009-03-18 20:42:05 UTC
  • Revision ID: james.westby@ubuntu.com-20090318204205-v17ejhu5urdsrm7j
Tags: 1.0.28-0ubuntu1
New upstream release. (LP: #343954)

Show diffs side-by-side

added added

removed removed

Lines of Context:
6
6
 
7
7
from landscape.lib.twisted_util import gather_results
8
8
from landscape.lib.bpickle import loads
 
9
from landscape.lib.log import log_failure
9
10
 
10
11
 
11
12
EC2_API = "http://169.254.169.254/latest"
39
40
    computer_title = config_property("computer_title")
40
41
    account_name = config_property("account_name")
41
42
    registration_password = config_property("registration_password")
42
 
    otp = None
43
 
    instance_key = None
44
 
    hostname = None
45
43
 
46
44
    def __init__(self, config, persist):
47
45
        self._config = config
53
51
    An object from which registration can be requested of the server,
54
52
    and which will handle forced ID changes from the server.
55
53
 
56
 
    L{register} should be used to initial registration.
 
54
    L{register} should be used to perform initial registration.
57
55
    """
58
56
 
59
 
    def __init__(self, identity, reactor, exchange, message_store, cloud=False,
60
 
                 fetch_async=None):
 
57
    def __init__(self, config, identity, reactor, exchange, pinger,
 
58
                 message_store, cloud=False, fetch_async=None):
 
59
        self._config = config
61
60
        self._identity = identity
62
61
        self._reactor = reactor
63
62
        self._exchange = exchange
 
63
        self._pinger = pinger
64
64
        self._message_store = message_store
65
 
        self._reactor.call_on("run", self._handle_run)
 
65
        self._reactor.call_on("run", self._fetch_ec2_data)
66
66
        self._reactor.call_on("pre-exchange", self._handle_pre_exchange)
67
67
        self._reactor.call_on("exchange-done", self._handle_exchange_done)
68
68
        self._exchange.register_message("set-id", self._handle_set_id)
72
72
        self._should_register = None
73
73
        self._cloud = cloud
74
74
        self._fetch_async = fetch_async
 
75
        self._otp = None
 
76
        self._ec2_data = None
75
77
 
76
78
    def should_register(self):
77
79
        id = self._identity
96
98
        self._exchange.exchange()
97
99
        return result
98
100
 
99
 
    def _handle_run(self):
 
101
    def _extract_ec2_instance_data(self, raw_user_data, launch_index):
 
102
        """
 
103
        Given the raw string of EC2 User Data, parse it and return the dict of
 
104
        instance data for this particular instance.
 
105
 
 
106
        If the data can't be parsed, a debug message will be logged and None
 
107
        will be returned.
 
108
        """
 
109
        try:
 
110
            user_data = loads(raw_user_data)
 
111
        except ValueError:
 
112
            logging.debug("Got invalid user-data %r" % (raw_user_data,))
 
113
            return
 
114
 
 
115
        if not isinstance(user_data, dict):
 
116
            logging.debug("user-data %r is not a dict" % (user_data,))
 
117
            return
 
118
        for key in "otps", "exchange-url", "ping-url":
 
119
            if key not in user_data:
 
120
                logging.debug("user-data %r doesn't have key %r."
 
121
                              % (user_data, key))
 
122
                return
 
123
        if len(user_data["otps"]) <= launch_index:
 
124
            logging.debug("user-data %r doesn't have OTP for launch index %d"
 
125
                          % (user_data, launch_index))
 
126
            return
 
127
        return {"otp": user_data["otps"][launch_index],
 
128
                "exchange-url": user_data["exchange-url"],
 
129
                "ping-url": user_data["ping-url"]}
 
130
 
 
131
    def _fetch_ec2_data(self):
100
132
        id = self._identity
101
133
        if self._cloud and not id.secure_id:
102
134
            # Fetch data from the EC2 API, to be used later in the registration
103
135
            # process
104
 
            userdata_deferred = self._fetch_async(EC2_API + "/user-data")
105
 
            instance_key_deferred = self._fetch_async(
106
 
                EC2_API + "/meta-data/instance-id")
107
 
            hostname_deferred = self._fetch_async(
108
 
                EC2_API + "/meta-data/local-hostname")
109
 
            launch_index_deferred = self._fetch_async(
110
 
                EC2_API + "/meta-data/ami-launch-index")
111
 
            registration_data = gather_results([userdata_deferred,
112
 
                                                instance_key_deferred,
113
 
                                                hostname_deferred,
114
 
                                                launch_index_deferred],
115
 
                                                consume_errors=True)
116
 
            def got_data(results):
117
 
                got_otp = True
118
 
                launch_index = int(results[3])
119
 
                try:
120
 
                    user_data = loads(results[0])
121
 
                except ValueError:
122
 
                    logging.debug("Got invalid user-data %r" % (results[0],))
123
 
                    got_otp = False
124
 
                    user_data = {}
125
 
                if (not isinstance(user_data, (tuple, list))
126
 
                    or len(user_data) < launch_index + 1
127
 
                    or not "otp" in user_data[launch_index]):
128
 
                    logging.debug(
129
 
                        "OTP not present in user-data %r" % (user_data,))
130
 
                    got_otp = False
131
 
                if got_otp:
132
 
                    id.otp = user_data[launch_index]["otp"]
133
 
                id.instance_key = unicode(results[1])
134
 
                id.hostname= results[2]
135
 
 
136
 
            def got_error(error):
137
 
                logging.error(
138
 
                    "Got error while fetching meta-data: %r" % (error.value,))
139
 
 
140
 
            registration_data.addCallback(got_data)
141
 
            registration_data.addErrback(got_error)
 
136
            registration_data = gather_results([
 
137
                # We ignore errors from user-data because it's common for the
 
138
                # URL to return a 404 when the data is unavailable.
 
139
                self._fetch_async(EC2_API + "/user-data")
 
140
                    .addErrback(log_failure),
 
141
                # The rest of the fetches don't get protected because we just
 
142
                # fall back to regular registration if any of them don't work.
 
143
                self._fetch_async(EC2_API + "/meta-data/instance-id"),
 
144
                self._fetch_async(EC2_API + "/meta-data/reservation-id"),
 
145
                self._fetch_async(EC2_API + "/meta-data/local-hostname"),
 
146
                self._fetch_async(EC2_API + "/meta-data/public-hostname"),
 
147
                self._fetch_async(EC2_API + "/meta-data/ami-launch-index"),
 
148
                self._fetch_async(EC2_API + "/meta-data/kernel-id"),
 
149
                self._fetch_async(EC2_API + "/meta-data/ramdisk-id"),
 
150
                self._fetch_async(EC2_API + "/meta-data/ami-id"),
 
151
                ],
 
152
                consume_errors=True)
 
153
 
 
154
            def record_data(ec2_data):
 
155
                """Record the instance data returned by the EC2 API."""
 
156
                (raw_user_data, instance_key, reservation_key,
 
157
                 local_hostname, public_hostname, launch_index,
 
158
                 kernel_key, ramdisk_key, ami_key) = ec2_data
 
159
                self._ec2_data = {
 
160
                    "instance_key": instance_key,
 
161
                    "reservation_key": reservation_key,
 
162
                    "local_hostname": local_hostname,
 
163
                    "public_hostname": public_hostname,
 
164
                    "launch_index": launch_index,
 
165
                    "kernel_key": kernel_key,
 
166
                    "ramdisk_key": ramdisk_key,
 
167
                    "image_key": ami_key}
 
168
                for k, v in self._ec2_data.items():
 
169
                    self._ec2_data[k] = v.decode("utf-8")
 
170
                self._ec2_data["launch_index"] = int(
 
171
                    self._ec2_data["launch_index"])
 
172
 
 
173
                instance_data = self._extract_ec2_instance_data(
 
174
                    raw_user_data, int(launch_index))
 
175
                if instance_data is not None:
 
176
                    self._otp = instance_data["otp"]
 
177
                    exchange_url = instance_data["exchange-url"]
 
178
                    ping_url = instance_data["ping-url"]
 
179
                    self._exchange._transport.set_url(exchange_url)
 
180
                    self._pinger.set_url(ping_url)
 
181
                    self._config.url = exchange_url
 
182
                    self._config.ping_url = ping_url
 
183
                    self._config.write()
 
184
 
 
185
            def log_error(error):
 
186
                log_failure(error, msg="Got error while fetching meta-data: %r"
 
187
                            % (error.value,))
 
188
 
 
189
            # It sucks that this deferred is never returned
 
190
            registration_data.addCallback(record_data)
 
191
            registration_data.addErrback(log_error)
142
192
 
143
193
    def _handle_exchange_done(self):
144
194
        if self.should_register() and not self._should_register:
163
213
            id = self._identity
164
214
 
165
215
            self._message_store.delete_all_messages()
166
 
            if self._cloud:
167
 
                if id.otp:
 
216
            if self._cloud and self._ec2_data is not None:
 
217
                if self._otp:
168
218
                    logging.info("Queueing message to register with OTP")
169
219
                    message = {"type": "register-cloud-vm",
170
 
                               "otp": id.otp,
171
 
                               "instance_key": id.instance_key,
172
 
                               "hostname": id.hostname,
 
220
                               "otp": self._otp,
 
221
                               "hostname": socket.gethostname(),
173
222
                               "account_name": None,
174
 
                               "registration_password": None}
 
223
                               "registration_password": None,
 
224
                               }
 
225
                    message.update(self._ec2_data)
175
226
                    self._exchange.send(message)
176
227
                elif id.account_name:
177
228
                    logging.info("Queueing message to register with account "
178
229
                                 "%r as an EC2 instance." % (id.account_name,))
179
230
                    message = {"type": "register-cloud-vm",
180
231
                               "otp": None,
181
 
                               "instance_key": id.instance_key,
182
 
                               "hostname": id.hostname,
 
232
                               "hostname": socket.gethostname(),
183
233
                               "account_name": id.account_name,
184
234
                               "registration_password": \
185
235
                                   id.registration_password}
 
236
                    message.update(self._ec2_data)
186
237
                    self._exchange.send(message)
187
238
                else:
188
239
                    self._reactor.fire("registration-failed")
189
 
            else:
 
240
            elif id.account_name:
190
241
                with_word = ["without", "with"][bool(id.registration_password)]
191
242
                logging.info("Queueing message to register with account %r %s "
192
243
                             "a password." % (id.account_name, with_word))
196
247
                           "account_name": id.account_name,
197
248
                           "registration_password": id.registration_password,
198
249
                           "hostname": socket.gethostname()}
199
 
 
200
250
                self._exchange.send(message)
 
251
            else:
 
252
                self._reactor.fire("registration-failed")
201
253
 
202
254
    def _handle_set_id(self, message):
203
255
        """