14
14
__metaclass__ = type
16
"ClusterClientService",
20
from itertools import starmap
22
from operator import itemgetter
24
from urlparse import urlparse
26
from apiclient.utils import ascii_url
27
from provisioningserver.cluster_config import get_maas_url
19
28
from provisioningserver.config import Config
20
29
from provisioningserver.pxe import tftppath
21
30
from provisioningserver.rpc import cluster
22
from twisted.application.internet import StreamServerEndpointService
23
from twisted.internet.endpoints import TCP4ServerEndpoint
31
from twisted.application.internet import (
32
StreamServerEndpointService,
35
from twisted.internet.address import HostnameAddress
36
from twisted.internet.endpoints import (
24
41
from twisted.internet.protocol import Factory
25
42
from twisted.protocols import amp
28
class Cluster(amp.AMP):
43
from twisted.python import log
44
from twisted.web.client import getPage
47
class Cluster(amp.AMP, object):
48
"""The RPC protocol supported by a cluster controller.
50
This can be used on the client or server end of a connection; once a
51
connection is established, AMP is symmetric.
30
54
@cluster.ListBootImages.responder
31
55
def list_boot_images(self):
34
58
return {"images": images}
37
class ClusterFactory(Factory):
42
61
class ClusterService(StreamServerEndpointService):
    """A cluster controller RPC service.

    This is a service - in the Twisted sense - that exposes the
    ``Cluster`` protocol on the given port.
    """

    def __init__(self, reactor, port):
        """Create the service, listening for TCP connections on `port`.

        :param reactor: The reactor handed to `TCP4ServerEndpoint`.
        :param port: The TCP port handed to `TCP4ServerEndpoint`.
        """
        # Each accepted connection gets its own ``Cluster`` protocol
        # instance, built by a stock factory.
        super(ClusterService, self).__init__(
            TCP4ServerEndpoint(reactor, port),
            Factory.forProtocol(Cluster))
74
class ClusterClient(Cluster):
    """The RPC protocol supported by a cluster controller, client version.

    This works hand-in-hand with ``ClusterClientService``, maintaining
    the latter's `connections` map.

    :ivar address: The :class:`HostnameAddress` of the remote endpoint.

    :ivar service: A reference to the :class:`ClusterClientService` that
        this protocol registers itself with.
    """

    def connectionMade(self):
        """Register this connection with the service, or drop it.

        The connection is dropped when the service is not running, or
        when a connection for the same address is already registered.
        Only otherwise is it recorded in the service's `connections`
        map.
        """
        super(ClusterClient, self).connectionMade()
        if not self.service.running:
            self.transport.loseConnection()
        elif self.address in self.service.connections:
            self.transport.loseConnection()
        else:
            # Registering only in this branch keeps a dropped duplicate
            # from overwriting the live connection's entry.
            self.service.connections[self.address] = self

    def connectionLost(self, reason):
        """Remove this connection from the service's map, if present.

        :param reason: Passed through unchanged to the superclass.
        """
        connections = self.service.connections
        # The entry may belong to a different protocol instance for the
        # same address (e.g. this one was dropped as a duplicate), so
        # only delete it when it is this very object.
        if connections.get(self.address) is self:
            del connections[self.address]
        super(ClusterClient, self).connectionLost(reason)
105
class ClusterClientService(TimerService, object):
    """A cluster controller RPC client service.

    This is a service - in the Twisted sense - that connects to a set of
    remote AMP endpoints. The endpoints are obtained from a view in the
    region controller and periodically refreshed; this list is used to
    update the connections maintained in this service.

    :ivar connections: A mapping of endpoint addresses
        (:class:`HostnameAddress`) to the :class:`ClusterClient`
        protocol instances connected to them.
    :ivar clock: The reactor used when opening outgoing connections.
    """

    def __init__(self, reactor):
        """Set up a timer that calls `update` at a random interval.

        :param reactor: The reactor used for outgoing connections.
        """
        super(ClusterClientService, self).__init__(
            self._get_random_interval(), self.update)
        self.connections = {}
        self.clock = reactor

    def update(self):
        """Refresh outgoing connections.

        This obtains a list of endpoints from the region then connects
        to new ones and drops connections to those no longer used.

        :return: The `Deferred` for the refresh. Failures are logged
            rather than propagated.
        """
        # 0. Update interval.
        self._update_interval()
        # 1. Obtain RPC endpoints.
        d = getPage(self._get_rpc_info_url())
        d.addCallback(json.loads)
        d.addCallback(itemgetter("endpoints"))
        # 2. Open connections to new endpoints.
        # 3. Close connections to defunct endpoints.
        d.addCallback(self._update_connections)
        # 4. Log errors.
        d.addErrback(log.err)
        # Returning the Deferred lets the timer wait for this refresh to
        # finish before scheduling the next one.
        return d

    @staticmethod
    def _get_rpc_info_url():
        """Return the URL to the RPC information page on the region."""
        url = urlparse(get_maas_url())
        url = url._replace(path="%s/rpc" % url.path.rstrip("/"))
        # Convert the parsed result back into a URL string before
        # handing it to `ascii_url`.
        return ascii_url(url.geturl())

    @staticmethod
    def _get_random_interval():
        """Return a random interval between 30 and 90 seconds."""
        return random.randint(30, 90)

    def _update_interval(self):
        """Change the interval randomly to avoid stampedes of clusters."""
        self._loop.interval = self.step = self._get_random_interval()

    def _update_connections(self, hostports):
        """Reconcile open connections with the desired endpoints.

        :param hostports: An iterable of ``(hostname, port)`` pairs, each
            of which is turned into a :class:`HostnameAddress`.
        """
        connections_established = set(self.connections)
        connections_desired = set(starmap(HostnameAddress, hostports))
        self._make_connections(connections_desired - connections_established)
        self._drop_connections(connections_established - connections_desired)

    def _make_connections(self, addresses):
        """Start a connection attempt to each of `addresses`.

        :param addresses: An iterable of :class:`HostnameAddress`.
        """
        for address in addresses:
            endpoint = TCP4ClientEndpoint(
                self.clock, address.hostname, address.port)
            protocol = ClusterClient()
            # The protocol adds itself to `self.connections` once the
            # connection is made, so it needs to know both of these.
            protocol.address = address
            protocol.service = self
            connectProtocol(endpoint, protocol)

    def _drop_connections(self, addresses):
        """Close the connection registered for each of `addresses`.

        :param addresses: An iterable of :class:`HostnameAddress`, each
            of which must be a key in `connections`.
        """
        for address in addresses:
            # Close via the transport, as `ClusterClient` itself does.
            # NOTE(review): assumes the protocol has no `loseConnection`
            # of its own - confirm against Twisted's AMP.
            self.connections[address].transport.loseConnection()