15
16
ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse,
16
17
BadDescriptor, BadServiceStateName, NoUnusedMachines,
17
18
ServiceUnitDebugAlreadyEnabled, ServiceUnitResolvedAlreadyEnabled,
18
ServiceUnitRelationResolvedAlreadyEnabled, StopWatcher)
19
ServiceUnitRelationResolvedAlreadyEnabled, StopWatcher,
20
ServiceUnitUpgradeAlreadyEnabled)
19
21
from juju.state.charm import CharmStateManager
20
22
from juju.state.relation import ServiceRelationState, RelationStateManager
21
23
from juju.state.machine import _public_machine_id, MachineState
22
from juju.state.utils import remove_tree, dict_merge, YAMLState
24
from juju.state.utils import (
25
remove_tree, dict_merge, YAMLState, YAMLStateNodeMixin)
31
def _series_constraints(base_constraints, charm_id):
32
series = CharmURL.parse(charm_id).collection.series
33
return base_constraints.with_series(series)
28
36
class ServiceStateManager(StateBase):
29
37
"""Manages the state of services in an environment."""
32
def add_service_state(self, service_name, charm_state):
40
def add_service_state(self, service_name, charm_state, constraints):
33
41
"""Create a new service with the given name.
35
43
@param service_name: Unique name of the service created.
36
44
@param charm_state: CharmState for the service.
45
@param constraint_strs: list of constraint strings
38
47
@return: ServiceState for the created service.
49
charm_id = charm_state.id
50
constraints = _series_constraints(constraints, charm_id)
52
"charm": charm_id, "constraints": constraints.data}
40
53
# charm metadata is always decoded into unicode, ensure any
41
54
# serialized state references strings to avoid tying to py runtime.
42
service_details = {"charm": charm_state.id}
43
55
node_data = yaml.safe_dump(service_details)
44
56
path = yield self._client.create("/services/service-", node_data,
45
57
flags=zookeeper.SEQUENCE)
175
189
relation_type=spec["interface"],
176
190
relation_name=relation_name,
177
191
relation_role=relation_role))
193
# Add in implicit relations
196
query_service_name, "juju-info", "juju-info", "server"))
198
# when offering matches implicit relations should be considered last.
199
# this cmpfunc pushes implicit methods to the end of the list of possible
201
def low_priority_implicit_cmp(rel1, rel2):
202
if rel1.relation_name.startswith("juju-"):
204
if rel2.relation_name.startswith("juju-"):
206
return cmp(rel1.relation_name, rel2.relation_name)
208
endpoints = sorted(endpoints, low_priority_implicit_cmp)
178
210
returnValue(endpoints)
181
213
def join_descriptors(self, descriptor1, descriptor2):
182
214
"""Return a list of pairs of RelationEndpoints joining descriptors."""
184
relation_set1 = yield self.get_relation_endpoints(descriptor1)
185
relation_set2 = yield self.get_relation_endpoints(descriptor2)
186
for relation1 in relation_set1:
187
for relation2 in relation_set2:
216
relations_1 = yield self.get_relation_endpoints(descriptor1)
217
relations_2 = yield self.get_relation_endpoints(descriptor2)
219
for relation1 in relations_1:
220
for relation2 in relations_2:
188
221
if relation1.may_relate_to(relation2):
189
222
result.append((relation1, relation2))
190
223
returnValue(result)
293
326
# Verify its a valid charm id
294
327
CharmURL.parse(charm_id).assert_revision()
296
def update_charm_id(content, stat):
297
data = yaml.load(content)
298
data["charm"] = charm_id
299
return yaml.safe_dump(data)
302
self._client, "/services/%s" % self.internal_id, update_charm_id)
328
yield self._set_node_value("charm", charm_id)
305
331
def get_charm_state(self):
306
332
"""Return the CharmState for this service."""
307
forumla_state_manager = CharmStateManager(self._client)
308
333
charm_id = yield self.get_charm_id()
309
charm = yield forumla_state_manager.get_charm_state(charm_id)
334
formula_state_manager = CharmStateManager(self._client)
335
charm = yield formula_state_manager.get_charm_state(charm_id)
310
336
returnValue(charm)
339
def set_constraints(self, constraints):
340
"""Set hardware requirements for any new machines running this service.
342
:param constraint_strs: a list of constraint strings describing the
343
*service*-level requirements for new hardware.
345
charm_id = yield self.get_charm_id()
346
constraints = _series_constraints(constraints, charm_id)
347
yield self._set_node_value("constraints", constraints.data)
350
def get_constraints(self):
351
"""Get *service*-level constraints for new hardware.
353
:return: a list of constraint strings describing the service-level
354
hardware constraints.
356
data = yield self._get_node_value("constraints")
357
returnValue(Constraints(data))
313
360
def add_unit_state(self):
314
361
"""Add a new service unit to this state.
316
363
@return: ServiceUnitState for the created unit.
365
constraints = yield self.get_constraints()
318
366
charm_id = yield self.get_charm_id()
319
unit_data = {"charm": charm_id}
367
unit_data = {"charm": charm_id, "constraints": constraints.data}
320
368
path = yield self._client.create(
321
369
"/units/unit-", yaml.dump(unit_data), flags=zookeeper.SEQUENCE)
723
771
return "%s/%d" % (self._service_name, self._unit_sequence)
775
return "/units/%s" % self._internal_id
726
778
def _ports_path(self):
727
779
"""The path for the open ports for this service unit."""
728
return "/units/%s/ports" % self._internal_id
780
return "%s/ports" % self._zk_path
730
782
def _get_agent_path(self):
731
783
"""Get the zookeeper path for the service unit agent."""
732
return "/units/%s/agent" % self._internal_id
784
return "%s/agent" % self._zk_path
786
def _check_valid_in(self, topology):
787
ok = topology.has_service(self._internal_service_id)
788
ok = ok and topology.has_service_unit(
789
self._internal_service_id, self._internal_id)
793
def _node_missing(self):
794
raise ServiceUnitStateNotFound(self._service_name)
735
796
def get_public_address(self):
736
797
"""Get the public address of the unit.
738
799
If the unit is unassigned, or its unit agent hasn't started this
739
800
value maybe None.
741
unit_data, stat = yield self._client.get(
742
"/units/%s" % self.internal_id)
743
data = yaml.load(unit_data)
744
returnValue(data.get("public-address"))
802
return self._get_node_value("public-address")
747
804
def set_public_address(self, public_address):
748
805
"""A unit's public address can be utilized to access the service.
750
807
The service must have been exposed for the service to be reachable
751
808
outside of the environment.
753
def update_private_address(content, stat):
754
data = yaml.load(content)
755
data["public-address"] = public_address
756
return yaml.safe_dump(data)
760
"/units/%s" % self.internal_id,
761
update_private_address)
810
return self._set_node_value("public-address", public_address)
764
812
def get_private_address(self):
765
813
"""Get the private address of the unit.
767
815
If the unit is unassigned, or its unit agent hasn't started this
768
816
value maybe None.
770
unit_data, stat = yield self._client.get(
771
"/units/%s" % self.internal_id)
772
data = yaml.load(unit_data)
773
returnValue(data.get("private-address"))
818
return self._get_node_value("private-address")
776
820
def set_private_address(self, private_address):
777
821
"""A unit's address private to the environment.
779
823
Other service will see and utilize this address for relations.
782
def update_private_address(content, stat):
783
data = yaml.load(content)
784
data["private-address"] = private_address
785
return yaml.safe_dump(data)
789
"/units/%s" % self.internal_id,
790
update_private_address)
825
return self._set_node_value("private-address", private_address)
793
827
def get_charm_id(self):
794
"""Get the charm identifier that the unit is currently running."""
795
unit_data, stat = yield self._client.get(
796
"/units/%s" % self.internal_id)
797
data = yaml.load(unit_data)
798
returnValue(data["charm"])
828
"""The id of the charm currently deployed on the unit"""
829
return self._get_node_value("charm")
801
832
def set_charm_id(self, charm_id):
802
833
"""Set the charm identifier that the unit is currently running."""
804
# Verify its a valid charm id
805
834
CharmURL.parse(charm_id).assert_revision()
807
def update_charm_id(content, stat):
808
data = yaml.load(content)
809
data["charm"] = charm_id
810
return yaml.safe_dump(data)
813
self._client, "/units/%s" % self.internal_id, update_charm_id)
835
yield self._set_node_value("charm", charm_id)
838
def get_constraints(self):
839
"""The hardware constraints for this unit"""
840
data = yield self._get_node_value("constraints")
841
returnValue(Constraints(data))
816
844
def get_assigned_machine_id(self):
817
845
"""Get the assigned machine id or None if the unit is not assigned.
819
847
topology = yield self._read_topology()
820
if not topology.has_service(self._internal_service_id):
822
if not topology.has_service_unit(self._internal_service_id,
848
self._check_valid_in(topology)
825
849
machine_id = topology.get_service_unit_machine(
826
850
self._internal_service_id, self._internal_id)
827
851
if machine_id is not None:
868
889
`assign_to_machine`.
870
891
# used to provide a writable result for the callback
871
unused_machine_internal_id_wrapper = [None]
892
scope_escaper = [None]
893
unit_constraints = yield self.get_constraints()
873
896
def assign_unused_unit(topology):
874
if not topology.has_service(self._internal_service_id) or \
875
not topology.has_service_unit(self._internal_service_id,
897
self._check_valid_in(topology)
879
899
# XXX We cannot reuse the "root" machine (used by the
880
900
# provisioning agent), but the topology metadata does not
881
901
# properly reflect its allocation. In the future, once it
882
902
# is managed like any other service, this special case can
884
root_machine = "machine-%010d" % 0
885
unused_machines = sorted([
886
m for m in topology.get_machines()
887
if not (m == root_machine or
888
topology.machine_has_units(m))])
889
if not unused_machines:
904
root_machine_id = "machine-%010d" % 0
905
for internal_id in topology.get_machines():
906
if internal_id == root_machine_id:
908
if topology.machine_has_units(internal_id):
910
machine_state = MachineState(self._client, internal_id)
911
machine_constraints = yield machine_state.get_constraints()
912
if machine_constraints.can_satisfy(unit_constraints):
890
915
raise NoUnusedMachines()
891
unused_machine_internal_id = unused_machines[0]
892
917
topology.assign_service_unit_to_machine(
893
918
self._internal_service_id,
894
919
self._internal_id,
895
unused_machine_internal_id)
896
unused_machine_internal_id_wrapper[0] = \
897
unused_machine_internal_id
920
machine_state.internal_id)
921
scope_escaper[0] = machine_state
899
923
yield self._retry_topology_change(assign_unused_unit)
900
returnValue(MachineState(
901
self._client, unused_machine_internal_id_wrapper[0]))
924
returnValue(scope_escaper[0])
903
926
def unassign_from_machine(self):
904
927
"""Unassign this service unit from whatever machine it's assigned to.
907
930
def unassign_unit(topology):
908
if not topology.has_service(self._internal_service_id) or \
909
not topology.has_service_unit(self._internal_service_id,
931
self._check_valid_in(topology)
913
# If for whatever reason it's already not assigned to a
933
# If for whatever reason it's not already assigned to a
914
934
# machine, ignore it and move forward so that we don't
915
935
# have to deal with conflicts.
916
936
machine_id = topology.get_service_unit_machine(
998
1020
of the current state. It is only a reflection of some change
999
1021
happening, to inform watch users should fetch the current value.
1001
debug_path = "/units/%s/debug" % self._internal_id
1003
1024
@inlineCallbacks
1004
1025
def watcher(change_event):
1005
1026
if permanent and self._client.connected:
1006
exists_d, watch_d = self._client.exists_and_watch(debug_path)
1027
exists_d, watch_d = self._client.exists_and_watch(
1028
self._hook_debug_path)
1008
1030
yield callback(change_event)
1011
1033
watch_d.addCallback(watcher)
1013
exists_d, watch_d = self._client.exists_and_watch(debug_path)
1035
exists_d, watch_d = self._client.exists_and_watch(
1036
self._hook_debug_path)
1014
1037
exists = yield exists_d
1015
1038
# Setup the watch deferred callback after the user defined callback
1016
1039
# has returned successfully from the existence invocation.
1020
1043
# Wait on the first callback, reflecting present state, not a zk watch
1021
1044
yield callback_d
1047
def _upgrade_flag_path(self):
1048
return "%s/upgrade" % self._zk_path
1023
1050
@inlineCallbacks
1024
def set_upgrade_flag(self):
1051
def set_upgrade_flag(self, force=False):
1025
1052
"""Inform the unit it should perform an upgrade.
1027
upgrade_path = "/units/%s/upgrade" % self._internal_id
1029
yield self._client.create(upgrade_path)
1030
except zookeeper.NodeExistsException:
1031
# We get to the same end state
1054
assert isinstance(force, bool), "Invalid force upgrade flag"
1056
def update(content, stat):
1058
flags = dict(force=force)
1059
return yaml.dump(flags)
1061
flags = yaml.load(content)
1062
if not isinstance(flags, dict):
1063
flags = dict(force=force)
1064
return yaml.dump(flags)
1066
if flags['force'] != force:
1067
raise ServiceUnitUpgradeAlreadyEnabled(self.unit_name)
1071
yield retry_change(self._client,
1072
self._upgrade_flag_path,
1034
1075
@inlineCallbacks
1035
1076
def get_upgrade_flag(self):
1036
"""Returns a boolean denoting if the upgrade flag is set.
1077
"""Returns a dictionary containing the upgrade flag or False.
1038
upgrade_path = "/units/%s/upgrade" % self._internal_id
1039
stat = yield self._client.exists(upgrade_path)
1040
returnValue(bool(stat))
1080
content, stat = yield self._client.get(self._upgrade_flag_path)
1083
except zookeeper.NoNodeException:
1085
returnValue(yaml.load(content))
1042
1087
@inlineCallbacks
1043
1088
def clear_upgrade_flag(self):