1
# Copyright 2012 Canonical Ltd. This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
4
"""Command: start the cluster controller."""
6
from __future__ import (
18
from grp import getgrnam
22
from pwd import getpwnam
23
from time import sleep
29
from apiclient.maas_client import (
34
from celery.app import app_or_default
35
from provisioningserver.logging import task_logger
36
from provisioningserver.network import discover_networks
39
class ClusterControllerRejected(Exception):
40
"""Request to become a cluster controller has been rejected."""
43
def add_arguments(parser):
44
"""For use by :class:`MainScript`."""
46
'server_url', metavar='URL', help="URL to the MAAS region controller.")
48
'--user', '-u', metavar='USER', default='maas',
49
help="System user identity that should run the cluster controller.")
51
'--group', '-g', metavar='GROUP', default='maas',
52
help="System group that should run the cluster controller.")
55
def log_error(exception):
57
"Could not register with region controller: %s."
61
def make_anonymous_api_client(server_url):
62
"""Create an unauthenticated API client."""
63
return MAASClient(NoAuth(), MAASDispatcher(), server_url)
66
def get_cluster_uuid():
67
"""Read this cluster's UUID from the config."""
68
return app_or_default().conf.CLUSTER_UUID
71
def get_maas_celery_log():
72
"""Read location for MAAS Celery log file from the config."""
73
return app_or_default().conf.MAAS_CELERY_LOG
76
def get_maas_celerybeat_db():
77
"""Read location for MAAS Celery schedule file from the config."""
78
return app_or_default().conf.MAAS_CLUSTER_CELERY_DB
81
def register(server_url):
82
"""Request Rabbit connection details from the domain controller.
84
Offers this machine to the region controller as a potential cluster
87
:param server_url: URL to the region controller's MAAS API.
88
:return: A dict of connection details if this cluster controller has been
89
accepted, or `None` if there is no definite response yet. If there
90
is no definite response, retry this call later.
91
:raise ClusterControllerRejected: if this system has been rejected as a
94
known_responses = {httplib.OK, httplib.FORBIDDEN, httplib.ACCEPTED}
96
interfaces = json.dumps(discover_networks())
97
client = make_anonymous_api_client(server_url)
98
cluster_uuid = get_cluster_uuid()
100
response = client.post(
101
'api/1.0/nodegroups/', 'register',
102
interfaces=interfaces, uuid=cluster_uuid)
103
except HTTPError as e:
105
if e.code not in known_responses:
107
# Unknown error. Keep trying.
109
except URLError as e:
111
# Unknown error. Keep trying.
114
status_code = response.getcode()
116
if status_code == httplib.OK:
117
# Our application has been approved. Proceed.
118
return json.loads(response.read())
119
elif status_code == httplib.ACCEPTED:
120
# Our application is still waiting for approval. Keep trying.
122
elif status_code == httplib.FORBIDDEN:
123
# Our application has been rejected. Give up.
124
raise ClusterControllerRejected(
125
"This system has been rejected as a cluster controller.")
127
raise AssertionError("Unexpected return code: %r" % status_code)
130
def start_celery(connection_details, user, group):
131
broker_url = connection_details['BROKER_URL']
132
uid = getpwnam(user).pw_uid
133
gid = getgrnam(group).gr_gid
135
# Copy environment, but also tell celeryd what broker to listen to.
136
env = dict(os.environ, CELERY_BROKER_URL=broker_url)
140
'--logfile=%s' % get_maas_celery_log(),
141
'--schedule=%s' % get_maas_celerybeat_db(),
144
'-Q', get_cluster_uuid(),
147
# Change gid first, just in case changing the uid might deprive
148
# us of the privileges required to setgid.
152
os.execvpe(command[0], command, env=env)
155
def request_refresh(server_url):
156
client = make_anonymous_api_client(server_url)
158
client.post('api/1.0/nodegroups/', 'refresh_workers')
159
except URLError as e:
161
"Could not request secrets from region controller: %s"
165
def start_up(server_url, connection_details, user, group):
166
"""We've been accepted as a cluster controller; start doing the job.
168
This starts up celeryd, listening to the broker that the region
169
controller pointed us to, and on the appropriate queue.
171
# Get the region controller to send out credentials. If it arrives
172
# before celeryd has started up, we should find the message waiting
173
# in our queue. Even if we're new and the queue did not exist yet,
174
# the arriving task will create the queue.
175
request_refresh(server_url)
176
start_celery(connection_details, user=user, group=group)
180
"""Start the cluster controller.
182
If this system is still awaiting approval as a cluster controller, this
183
command will keep looping until it gets a definite answer.
185
connection_details = register(args.server_url)
186
while connection_details is None:
188
connection_details = register(args.server_url)
190
args.server_url, connection_details, user=args.user, group=args.group)