3
from twisted.internet.defer import inlineCallbacks, returnValue
5
from juju.errors import ConstraintError
6
from juju.lib import serializer
7
from juju.state.agent import AgentStateMixin
8
from juju.state.environment import EnvironmentStateManager
9
from juju.state.errors import MachineStateNotFound, MachineStateInUse
10
from juju.state.base import StateBase
11
from juju.state.utils import remove_tree, YAMLStateNodeMixin
14
class MachineStateManager(StateBase):
15
"""Manages the state of machines in an environment."""
18
def add_machine_state(self, constraints):
19
"""Create a new machine state.
21
@return: MachineState for the created machine.
23
if not constraints.complete:
24
raise ConstraintError(
25
"Unprovisionable machine: incomplete constraints")
26
machine_data = {"constraints": constraints.data}
27
path = yield self._client.create(
28
"/machines/machine-", serializer.dump(machine_data),
29
flags=zookeeper.SEQUENCE)
30
_, internal_id = path.rsplit("/", 1)
32
def add_machine(topology):
33
topology.add_machine(internal_id)
34
yield self._retry_topology_change(add_machine)
36
returnValue(MachineState(self._client, internal_id))
39
def remove_machine_state(self, machine_id):
40
"""Remove machine state identified by `machine_id` if present.
42
Returns True if machine state was actually removed.
44
internal_id = "machine-%010d" % machine_id
47
def remove_machine(topology):
48
# Removing a non-existing machine again won't fail, since
49
# the end intention is preserved. This makes dealing
50
# with concurrency easier.
51
if topology.has_machine(internal_id):
52
if topology.machine_has_units(internal_id):
53
raise MachineStateInUse(machine_id)
54
topology.remove_machine(internal_id)
57
must_delete[0] = False
58
yield self._retry_topology_change(remove_machine)
60
# If the process is interrupted here, this node will stay
61
# around, but it's not a big deal since it's not being
62
# referenced by the topology anymore.
63
yield remove_tree(self._client, "/machines/%s" % (internal_id,))
64
returnValue(must_delete[0])
67
def get_machine_state(self, machine_id):
68
"""Return deferred machine state with the given id.
70
@return MachineState with the given id.
72
@raise MachineStateNotFound if the id is not found.
74
if isinstance(machine_id, str) and machine_id.isdigit():
75
machine_id = int(machine_id)
76
if isinstance(machine_id, int):
77
internal_id = "machine-%010d" % machine_id
79
raise MachineStateNotFound(machine_id)
81
topology = yield self._read_topology()
82
if not topology.has_machine(internal_id):
83
raise MachineStateNotFound(machine_id)
85
machine_state = MachineState(self._client, internal_id)
86
returnValue(machine_state)
89
def get_all_machine_states(self):
90
"""Get information on all machines.
92
@return: list of MachineState instances.
94
topology = yield self._read_topology()
96
for machine_id in topology.get_machines():
97
# topology yields internal ids -> map to public
98
machine_state = MachineState(self._client, machine_id)
99
machines.append(machine_state)
101
returnValue(machines)
103
def watch_machine_states(self, callback):
104
"""Observe changes in the known machines through the watch function.
106
@param callback: A function/method which accepts two sets of machine
107
ids: the old machines, and the new ones. The old machines set
108
variable will be None the first time this function is called.
110
Note that there are no guarantees that this function will be
111
called once for *every* change in the topology, which means
112
that multiple modifications may be observed as a single call.
114
This method currently sets a perpetual watch (errors
115
will make it bail out). To stop the watch cleanly raise an
116
juju.state.errors.StopWatch exception.
119
def watch_topology(old_topology, new_topology):
120
if old_topology is None:
123
old_machines = set(_public_machine_id(x) for x in
124
old_topology.get_machines())
125
new_machines = set(_public_machine_id(x) for x in
126
new_topology.get_machines())
127
if old_machines != new_machines:
128
return callback(old_machines, new_machines)
130
return self._watch_topology(watch_topology)
133
class MachineState(StateBase, AgentStateMixin, YAMLStateNodeMixin):
135
def __init__(self, client, internal_id):
136
super(MachineState, self).__init__(client)
137
self._internal_id = internal_id
142
def __eq__(self, other):
143
if not isinstance(other, MachineState):
145
return self.id == other.id
148
return "<MachineState id:%s>" % self._internal_id
152
"""High-level id built using the sequence as an int."""
153
return _public_machine_id(self._internal_id)
156
def internal_id(self):
157
"""Machine's internal id, of the form machine-NNNNNNNNNN."""
158
return self._internal_id
162
"""Return the path within zookeeper.
164
This attribute should not be used outside of the .state
165
package or for debugging.
167
return "/machines/" + self.internal_id
169
def _get_agent_path(self):
170
"""Get the zookeeper path for the machine agent."""
171
return "%s/agent" % self._zk_path
173
def _node_missing(self):
174
raise MachineStateNotFound(self.id)
176
def set_instance_id(self, instance_id):
177
"""Set the provider-specific machine id in this machine state."""
178
return self._set_node_value("provider-machine-id", instance_id)
180
def get_instance_id(self):
181
"""Retrieve the provider-specific machine id for this machine."""
182
return self._get_node_value("provider-machine-id")
185
def get_constraints(self):
186
"""Get the machine's hardware constraints"""
187
# Note: machine constraints should not be settable; they're a snapshot
188
# of the constraints of the unit state for which they were created. (It
189
# makes no sense to arbitrarily declare that an m1.small is now a
190
# cc2.8xlarge, anyway.)
191
esm = EnvironmentStateManager(self._client)
192
constraint_set = yield esm.get_constraint_set()
193
data = yield self._get_node_value("constraints", {})
194
returnValue(constraint_set.load(data))
196
def watch_assigned_units(self, callback):
197
"""Observe changes in service units assigned to this machine.
199
@param callback: A function/method which accepts two sets of unit
200
names: the old assigned units, and the new ones. The old units
201
set variable will be None the first time this function is called,
202
and the new one will be None if the machine itself is ever
205
Note that there are no guarantees that this function will be
206
called once for *every* change in the topology, which means
207
that multiple modifications may be observed as a single call.
209
This method currently sets a perpetual watch (errors
210
will make it bail out). To stop the watch cleanly raise an
211
juju.state.errors.StopWatch exception.
213
return self._watch_topology(
214
_WatchAssignedUnits(self._internal_id, callback))
217
def get_all_service_unit_states(self):
218
# avoid circular imports by deferring the import until now
219
from juju.state.service import ServiceUnitState
221
topology = yield self._read_topology()
222
service_unit_states = []
223
for internal_service_unit_id in topology.get_service_units_in_machine(
225
internal_service_id = topology.get_service_unit_service(
226
internal_service_unit_id)
227
service_name = topology.get_service_name(internal_service_id)
228
unit_sequence = topology.get_service_unit_sequence(
229
internal_service_id, internal_service_unit_id)
230
service_unit_state = ServiceUnitState(
231
self._client, internal_service_id, service_name, unit_sequence,
232
internal_service_unit_id)
233
service_unit_states.append(service_unit_state)
234
returnValue(service_unit_states)
237
class _WatchAssignedUnits(object):
238
"""Helper to implement MachineState.watch_assigned_units(). See above."""
240
def __init__(self, internal_id, callback):
241
self._internal_id = internal_id
242
self._callback = callback
243
self._old_units = None
245
def __call__(self, old_topology, new_topology):
246
if new_topology.has_machine(self._internal_id):
247
unit_ids = new_topology.get_service_units_in_machine(
249
# Translate the internal ids to nice unit names.
250
new_units = self._get_unit_names(new_topology, unit_ids)
252
# Machine state is gone, so no units there of course. This can
253
# only be visible in practice if the change happens fast
254
# enough for the client to see the unassignment and removal as
255
# a single change, since the topology enforces
256
# unassignment-before-removal.
258
if (new_units or self._old_units) and new_units != self._old_units:
259
maybe_deferred = self._callback(self._old_units, new_units)
260
self._old_units = new_units
261
# The callback can return a deferred, to postpone its execution.
262
# As a side effect, this watch won't fire again until the returned
263
# deferred has not fired.
264
return maybe_deferred
266
def _get_unit_names(self, topology, internal_ids):
267
"""Translate internal ids to nice unit names."""
269
for internal_id in internal_ids:
270
service_id = topology.get_service_unit_service(internal_id)
272
topology.get_service_unit_name(service_id, internal_id))
276
def _public_machine_id(internal_id):
277
"""Convert an internal_id to an external one.
279
That's an implementation detail, and shouldn't be used elsewhere.
281
_, sequence = internal_id.rsplit("-", 1)