~juju/ubuntu/quantal/juju/0.6

« back to all changes in this revision

Viewing changes to juju/state/machine.py

  • Committer: Martin Packman
  • Date: 2013-04-03 17:01:05 UTC
  • mfrom: (0.1.600 0.6)
  • Revision ID: martin.packman@canonical.com-20130403170105-pk2c8cgtsgaj8ykx
New upstream 0.6.1 release and fix history

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import zookeeper
 
2
 
 
3
from twisted.internet.defer import inlineCallbacks, returnValue
 
4
 
 
5
from juju.errors import ConstraintError
 
6
from juju.lib import serializer
 
7
from juju.state.agent import AgentStateMixin
 
8
from juju.state.environment import EnvironmentStateManager
 
9
from juju.state.errors import MachineStateNotFound, MachineStateInUse
 
10
from juju.state.base import StateBase
 
11
from juju.state.utils import remove_tree, YAMLStateNodeMixin
 
12
 
 
13
 
 
14
class MachineStateManager(StateBase):
 
15
    """Manages the state of machines in an environment."""
 
16
 
 
17
    @inlineCallbacks
 
18
    def add_machine_state(self, constraints):
 
19
        """Create a new machine state.
 
20
 
 
21
        @return: MachineState for the created machine.
 
22
        """
 
23
        if not constraints.complete:
 
24
            raise ConstraintError(
 
25
                "Unprovisionable machine: incomplete constraints")
 
26
        machine_data = {"constraints": constraints.data}
 
27
        path = yield self._client.create(
 
28
            "/machines/machine-", serializer.dump(machine_data),
 
29
            flags=zookeeper.SEQUENCE)
 
30
        _, internal_id = path.rsplit("/", 1)
 
31
 
 
32
        def add_machine(topology):
 
33
            topology.add_machine(internal_id)
 
34
        yield self._retry_topology_change(add_machine)
 
35
 
 
36
        returnValue(MachineState(self._client, internal_id))
 
37
 
 
38
    @inlineCallbacks
 
39
    def remove_machine_state(self, machine_id):
 
40
        """Remove machine state identified by `machine_id` if present.
 
41
 
 
42
        Returns True if machine state was actually removed.
 
43
        """
 
44
        internal_id = "machine-%010d" % machine_id
 
45
        must_delete = [False]
 
46
 
 
47
        def remove_machine(topology):
 
48
            # Removing a non-existing machine again won't fail, since
 
49
            # the end intention is preserved.  This makes dealing
 
50
            # with concurrency easier.
 
51
            if topology.has_machine(internal_id):
 
52
                if topology.machine_has_units(internal_id):
 
53
                    raise MachineStateInUse(machine_id)
 
54
                topology.remove_machine(internal_id)
 
55
                must_delete[0] = True
 
56
            else:
 
57
                must_delete[0] = False
 
58
        yield self._retry_topology_change(remove_machine)
 
59
        if must_delete[0]:
 
60
            # If the process is interrupted here, this node will stay
 
61
            # around, but it's not a big deal since it's not being
 
62
            # referenced by the topology anymore.
 
63
            yield remove_tree(self._client, "/machines/%s" % (internal_id,))
 
64
        returnValue(must_delete[0])
 
65
 
 
66
    @inlineCallbacks
 
67
    def get_machine_state(self, machine_id):
 
68
        """Return deferred machine state with the given id.
 
69
 
 
70
        @return MachineState with the given id.
 
71
 
 
72
        @raise MachineStateNotFound if the id is not found.
 
73
        """
 
74
        if isinstance(machine_id, str) and machine_id.isdigit():
 
75
            machine_id = int(machine_id)
 
76
        if isinstance(machine_id, int):
 
77
            internal_id = "machine-%010d" % machine_id
 
78
        else:
 
79
            raise MachineStateNotFound(machine_id)
 
80
 
 
81
        topology = yield self._read_topology()
 
82
        if not topology.has_machine(internal_id):
 
83
            raise MachineStateNotFound(machine_id)
 
84
 
 
85
        machine_state = MachineState(self._client, internal_id)
 
86
        returnValue(machine_state)
 
87
 
 
88
    @inlineCallbacks
 
89
    def get_all_machine_states(self):
 
90
        """Get information on all machines.
 
91
 
 
92
        @return: list of MachineState instances.
 
93
        """
 
94
        topology = yield self._read_topology()
 
95
        machines = []
 
96
        for machine_id in topology.get_machines():
 
97
            # topology yields internal ids -> map to public
 
98
            machine_state = MachineState(self._client, machine_id)
 
99
            machines.append(machine_state)
 
100
 
 
101
        returnValue(machines)
 
102
 
 
103
    def watch_machine_states(self, callback):
 
104
        """Observe changes in the known machines through the watch function.
 
105
 
 
106
        @param callback: A function/method which accepts two sets of machine
 
107
            ids: the old machines, and the new ones. The old machines set
 
108
            variable will be None the first time this function is called.
 
109
 
 
110
        Note that there are no guarantees that this function will be
 
111
        called once for *every* change in the topology, which means
 
112
        that multiple modifications may be observed as a single call.
 
113
 
 
114
        This method currently sets a perpetual watch (errors
 
115
        will make it bail out). To stop the watch cleanly raise an
 
116
        juju.state.errors.StopWatch exception.
 
117
        """
 
118
 
 
119
        def watch_topology(old_topology, new_topology):
 
120
            if old_topology is None:
 
121
                old_machines = None
 
122
            else:
 
123
                old_machines = set(_public_machine_id(x) for x in
 
124
                                   old_topology.get_machines())
 
125
            new_machines = set(_public_machine_id(x) for x in
 
126
                               new_topology.get_machines())
 
127
            if old_machines != new_machines:
 
128
                return callback(old_machines, new_machines)
 
129
 
 
130
        return self._watch_topology(watch_topology)
 
131
 
 
132
 
 
133
class MachineState(StateBase, AgentStateMixin, YAMLStateNodeMixin):
 
134
 
 
135
    def __init__(self, client, internal_id):
 
136
        super(MachineState, self).__init__(client)
 
137
        self._internal_id = internal_id
 
138
 
 
139
    def __hash__(self):
 
140
        return hash(self.id)
 
141
 
 
142
    def __eq__(self, other):
 
143
        if not isinstance(other, MachineState):
 
144
            return False
 
145
        return self.id == other.id
 
146
 
 
147
    def __str__(self):
 
148
        return "<MachineState id:%s>" % self._internal_id
 
149
 
 
150
    @property
 
151
    def id(self):
 
152
        """High-level id built using the sequence as an int."""
 
153
        return _public_machine_id(self._internal_id)
 
154
 
 
155
    @property
 
156
    def internal_id(self):
 
157
        """Machine's internal id, of the form machine-NNNNNNNNNN."""
 
158
        return self._internal_id
 
159
 
 
160
    @property
 
161
    def _zk_path(self):
 
162
        """Return the path within zookeeper.
 
163
 
 
164
        This attribute should not be used outside of the .state
 
165
        package or for debugging.
 
166
        """
 
167
        return "/machines/" + self.internal_id
 
168
 
 
169
    def _get_agent_path(self):
 
170
        """Get the zookeeper path for the machine agent."""
 
171
        return "%s/agent" % self._zk_path
 
172
 
 
173
    def _node_missing(self):
 
174
        raise MachineStateNotFound(self.id)
 
175
 
 
176
    def set_instance_id(self, instance_id):
 
177
        """Set the provider-specific machine id in this machine state."""
 
178
        return self._set_node_value("provider-machine-id", instance_id)
 
179
 
 
180
    def get_instance_id(self):
 
181
        """Retrieve the provider-specific machine id for this machine."""
 
182
        return self._get_node_value("provider-machine-id")
 
183
 
 
184
    @inlineCallbacks
 
185
    def get_constraints(self):
 
186
        """Get the machine's hardware constraints"""
 
187
        # Note: machine constraints should not be settable; they're a snapshot
 
188
        # of the constraints of the unit state for which they were created. (It
 
189
        # makes no sense to arbitrarily declare that an m1.small is now a
 
190
        # cc2.8xlarge, anyway.)
 
191
        esm = EnvironmentStateManager(self._client)
 
192
        constraint_set = yield esm.get_constraint_set()
 
193
        data = yield self._get_node_value("constraints", {})
 
194
        returnValue(constraint_set.load(data))
 
195
 
 
196
    def watch_assigned_units(self, callback):
 
197
        """Observe changes in service units assigned to this machine.
 
198
 
 
199
        @param callback: A function/method which accepts two sets of unit
 
200
            names: the old assigned units, and the new ones. The old units
 
201
            set variable will be None the first time this function is called,
 
202
            and the new one will be None if the machine itself is ever
 
203
            deleted.
 
204
 
 
205
        Note that there are no guarantees that this function will be
 
206
        called once for *every* change in the topology, which means
 
207
        that multiple modifications may be observed as a single call.
 
208
 
 
209
        This method currently sets a perpetual watch (errors
 
210
        will make it bail out). To stop the watch cleanly raise an
 
211
        juju.state.errors.StopWatch exception.
 
212
        """
 
213
        return self._watch_topology(
 
214
            _WatchAssignedUnits(self._internal_id, callback))
 
215
 
 
216
    @inlineCallbacks
 
217
    def get_all_service_unit_states(self):
 
218
        # avoid circular imports by deferring the import until now
 
219
        from juju.state.service import ServiceUnitState
 
220
 
 
221
        topology = yield self._read_topology()
 
222
        service_unit_states = []
 
223
        for internal_service_unit_id in topology.get_service_units_in_machine(
 
224
                self.internal_id):
 
225
            internal_service_id = topology.get_service_unit_service(
 
226
                internal_service_unit_id)
 
227
            service_name = topology.get_service_name(internal_service_id)
 
228
            unit_sequence = topology.get_service_unit_sequence(
 
229
                internal_service_id, internal_service_unit_id)
 
230
            service_unit_state = ServiceUnitState(
 
231
                self._client, internal_service_id, service_name, unit_sequence,
 
232
                internal_service_unit_id)
 
233
            service_unit_states.append(service_unit_state)
 
234
        returnValue(service_unit_states)
 
235
 
 
236
 
 
237
class _WatchAssignedUnits(object):
 
238
    """Helper to implement MachineState.watch_assigned_units(). See above."""
 
239
 
 
240
    def __init__(self, internal_id, callback):
 
241
        self._internal_id = internal_id
 
242
        self._callback = callback
 
243
        self._old_units = None
 
244
 
 
245
    def __call__(self, old_topology, new_topology):
 
246
        if new_topology.has_machine(self._internal_id):
 
247
            unit_ids = new_topology.get_service_units_in_machine(
 
248
                self._internal_id)
 
249
            # Translate the internal ids to nice unit names.
 
250
            new_units = self._get_unit_names(new_topology, unit_ids)
 
251
        else:
 
252
            # Machine state is gone, so no units there of course. This can
 
253
            # only be visible in practice if the change happens fast
 
254
            # enough for the client to see the unassignment and removal as
 
255
            # a single change, since the topology enforces
 
256
            # unassignment-before-removal.
 
257
            new_units = set()
 
258
        if (new_units or self._old_units) and new_units != self._old_units:
 
259
            maybe_deferred = self._callback(self._old_units, new_units)
 
260
            self._old_units = new_units
 
261
            # The callback can return a deferred, to postpone its execution.
 
262
            # As a side effect, this watch won't fire again until the returned
 
263
            # deferred has not fired.
 
264
            return maybe_deferred
 
265
 
 
266
    def _get_unit_names(self, topology, internal_ids):
 
267
        """Translate internal ids to nice unit names."""
 
268
        unit_names = set()
 
269
        for internal_id in internal_ids:
 
270
            service_id = topology.get_service_unit_service(internal_id)
 
271
            unit_names.add(
 
272
                topology.get_service_unit_name(service_id, internal_id))
 
273
        return unit_names
 
274
 
 
275
 
 
276
def _public_machine_id(internal_id):
 
277
    """Convert an internal_id to an external one.
 
278
 
 
279
    That's an implementation detail, and shouldn't be used elsewhere.
 
280
    """
 
281
    _, sequence = internal_id.rsplit("-", 1)
 
282
    return int(sequence)