53
51
An object from which registration can be requested of the server,
54
52
and which will handle forced ID changes from the server.
56
L{register} should be used to initial registration.
54
L{register} should be used to perform initial registration.
59
def __init__(self, identity, reactor, exchange, message_store, cloud=False,
57
def __init__(self, config, identity, reactor, exchange, pinger,
58
message_store, cloud=False, fetch_async=None):
61
60
self._identity = identity
62
61
self._reactor = reactor
63
62
self._exchange = exchange
64
64
self._message_store = message_store
65
self._reactor.call_on("run", self._handle_run)
65
self._reactor.call_on("run", self._fetch_ec2_data)
66
66
self._reactor.call_on("pre-exchange", self._handle_pre_exchange)
67
67
self._reactor.call_on("exchange-done", self._handle_exchange_done)
68
68
self._exchange.register_message("set-id", self._handle_set_id)
96
98
self._exchange.exchange()
99
def _handle_run(self):
101
def _extract_ec2_instance_data(self, raw_user_data, launch_index):
103
Given the raw string of EC2 User Data, parse it and return the dict of
104
instance data for this particular instance.
106
If the data can't be parsed, a debug message will be logged and None
110
user_data = loads(raw_user_data)
112
logging.debug("Got invalid user-data %r" % (raw_user_data,))
115
if not isinstance(user_data, dict):
116
logging.debug("user-data %r is not a dict" % (user_data,))
118
for key in "otps", "exchange-url", "ping-url":
119
if key not in user_data:
120
logging.debug("user-data %r doesn't have key %r."
123
if len(user_data["otps"]) <= launch_index:
124
logging.debug("user-data %r doesn't have OTP for launch index %d"
125
% (user_data, launch_index))
127
return {"otp": user_data["otps"][launch_index],
128
"exchange-url": user_data["exchange-url"],
129
"ping-url": user_data["ping-url"]}
131
def _fetch_ec2_data(self):
100
132
id = self._identity
101
133
if self._cloud and not id.secure_id:
102
134
# Fetch data from the EC2 API, to be used later in the registration
104
userdata_deferred = self._fetch_async(EC2_API + "/user-data")
105
instance_key_deferred = self._fetch_async(
106
EC2_API + "/meta-data/instance-id")
107
hostname_deferred = self._fetch_async(
108
EC2_API + "/meta-data/local-hostname")
109
launch_index_deferred = self._fetch_async(
110
EC2_API + "/meta-data/ami-launch-index")
111
registration_data = gather_results([userdata_deferred,
112
instance_key_deferred,
114
launch_index_deferred],
116
def got_data(results):
118
launch_index = int(results[3])
120
user_data = loads(results[0])
122
logging.debug("Got invalid user-data %r" % (results[0],))
125
if (not isinstance(user_data, (tuple, list))
126
or len(user_data) < launch_index + 1
127
or not "otp" in user_data[launch_index]):
129
"OTP not present in user-data %r" % (user_data,))
132
id.otp = user_data[launch_index]["otp"]
133
id.instance_key = unicode(results[1])
134
id.hostname= results[2]
136
def got_error(error):
138
"Got error while fetching meta-data: %r" % (error.value,))
140
registration_data.addCallback(got_data)
141
registration_data.addErrback(got_error)
136
registration_data = gather_results([
137
# We ignore errors from user-data because it's common for the
138
# URL to return a 404 when the data is unavailable.
139
self._fetch_async(EC2_API + "/user-data")
140
.addErrback(log_failure),
141
# The rest of the fetches don't get protected because we just
142
# fall back to regular registration if any of them don't work.
143
self._fetch_async(EC2_API + "/meta-data/instance-id"),
144
self._fetch_async(EC2_API + "/meta-data/reservation-id"),
145
self._fetch_async(EC2_API + "/meta-data/local-hostname"),
146
self._fetch_async(EC2_API + "/meta-data/public-hostname"),
147
self._fetch_async(EC2_API + "/meta-data/ami-launch-index"),
148
self._fetch_async(EC2_API + "/meta-data/kernel-id"),
149
self._fetch_async(EC2_API + "/meta-data/ramdisk-id"),
150
self._fetch_async(EC2_API + "/meta-data/ami-id"),
154
def record_data(ec2_data):
155
"""Record the instance data returned by the EC2 API."""
156
(raw_user_data, instance_key, reservation_key,
157
local_hostname, public_hostname, launch_index,
158
kernel_key, ramdisk_key, ami_key) = ec2_data
160
"instance_key": instance_key,
161
"reservation_key": reservation_key,
162
"local_hostname": local_hostname,
163
"public_hostname": public_hostname,
164
"launch_index": launch_index,
165
"kernel_key": kernel_key,
166
"ramdisk_key": ramdisk_key,
167
"image_key": ami_key}
168
for k, v in self._ec2_data.items():
169
self._ec2_data[k] = v.decode("utf-8")
170
self._ec2_data["launch_index"] = int(
171
self._ec2_data["launch_index"])
173
instance_data = self._extract_ec2_instance_data(
174
raw_user_data, int(launch_index))
175
if instance_data is not None:
176
self._otp = instance_data["otp"]
177
exchange_url = instance_data["exchange-url"]
178
ping_url = instance_data["ping-url"]
179
self._exchange._transport.set_url(exchange_url)
180
self._pinger.set_url(ping_url)
181
self._config.url = exchange_url
182
self._config.ping_url = ping_url
185
def log_error(error):
186
log_failure(error, msg="Got error while fetching meta-data: %r"
189
# It sucks that this deferred is never returned
190
registration_data.addCallback(record_data)
191
registration_data.addErrback(log_error)
143
193
def _handle_exchange_done(self):
144
194
if self.should_register() and not self._should_register:
163
213
id = self._identity
165
215
self._message_store.delete_all_messages()
216
if self._cloud and self._ec2_data is not None:
168
218
logging.info("Queueing message to register with OTP")
169
219
message = {"type": "register-cloud-vm",
171
"instance_key": id.instance_key,
172
"hostname": id.hostname,
221
"hostname": socket.gethostname(),
173
222
"account_name": None,
174
"registration_password": None}
223
"registration_password": None,
225
message.update(self._ec2_data)
175
226
self._exchange.send(message)
176
227
elif id.account_name:
177
228
logging.info("Queueing message to register with account "
178
229
"%r as an EC2 instance." % (id.account_name,))
179
230
message = {"type": "register-cloud-vm",
181
"instance_key": id.instance_key,
182
"hostname": id.hostname,
232
"hostname": socket.gethostname(),
183
233
"account_name": id.account_name,
184
234
"registration_password": \
185
235
id.registration_password}
236
message.update(self._ec2_data)
186
237
self._exchange.send(message)
188
239
self._reactor.fire("registration-failed")
240
elif id.account_name:
190
241
with_word = ["without", "with"][bool(id.registration_password)]
191
242
logging.info("Queueing message to register with account %r %s "
192
243
"a password." % (id.account_name, with_word))