~lutostag/ubuntu/utopic/maas/1.5.2+packagefix

« back to all changes in this revision

Viewing changes to src/provisioningserver/rpc/clusterservice.py

  • Committer: Package Import Robot
  • Author(s): Andres Rodriguez, Graham Binns, Andres Rodriguez, Julian Edwards, Seth Arnold
  • Date: 2014-02-15 12:08:23 UTC
  • mfrom: (1.2.22)
  • Revision ID: package-import@ubuntu.com-20140215120823-kew2lqrw5qgww721
Tags: 1.5+bzr1948-0ubuntu1
* New upstream release.

[ Graham Binns ]
* debian/control: Depends on python-jsonschema.

[ Andres Rodriguez ]
* debian/maas-region-controller-min.postinst: Make txlongpoll.yaml only
  readable by the app and not world readable.
* debian/patches/02-pserv-config.patch: Refreshed.

[ Julian Edwards ]
* debian/extras/maas-cli renamed to debian/extras/maas, and introduce
  a deprecation warning in favour of using maas over maas-cli.
* debian/extras/maas renamed to debian/extras/maas-region-admin
* debian/maas-cli.install: install debian/extras/maas
* debian/maas-dns.postinst: Invoke maas-region-admin instead of maas
* debian/maas-region-controller-min.install: install maas-region-admin
  instead of maas
* debian/maas-region-controller.postinst: Invoke maas-region-admin instead
  of maas
* debian/maas-cli.links: Link from maas to maas-cli for backward compat.

[ Seth Arnold ]
* debian/maas-region-controller-min.postinst: Make sure txlongpoll.yaml
  gets correct permissions on upgrade (LP: #1254034)

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
 
14
14
__metaclass__ = type
15
15
__all__ = [
 
16
    "ClusterClientService",
16
17
    "ClusterService",
17
18
]
18
19
 
 
20
from itertools import starmap
 
21
import json
 
22
from operator import itemgetter
 
23
import random
 
24
from urlparse import urlparse
 
25
 
 
26
from apiclient.utils import ascii_url
 
27
from provisioningserver.cluster_config import get_maas_url
19
28
from provisioningserver.config import Config
20
29
from provisioningserver.pxe import tftppath
21
30
from provisioningserver.rpc import cluster
22
 
from twisted.application.internet import StreamServerEndpointService
23
 
from twisted.internet.endpoints import TCP4ServerEndpoint
 
31
from twisted.application.internet import (
 
32
    StreamServerEndpointService,
 
33
    TimerService,
 
34
    )
 
35
from twisted.internet.address import HostnameAddress
 
36
from twisted.internet.endpoints import (
 
37
    connectProtocol,
 
38
    TCP4ClientEndpoint,
 
39
    TCP4ServerEndpoint,
 
40
    )
24
41
from twisted.internet.protocol import Factory
25
42
from twisted.protocols import amp
26
 
 
27
 
 
28
 
class Cluster(amp.AMP):
 
43
from twisted.python import log
 
44
from twisted.web.client import getPage
 
45
 
 
46
 
 
47
class Cluster(amp.AMP, object):
 
48
    """The RPC protocol supported by a cluster controller.
 
49
 
 
50
    This can be used on the client or server end of a connection; once a
 
51
    connection is established, AMP is symmetric.
 
52
    """
29
53
 
30
54
    @cluster.ListBootImages.responder
31
55
    def list_boot_images(self):
34
58
        return {"images": images}
35
59
 
36
60
 
37
 
class ClusterFactory(Factory):
38
 
 
39
 
    protocol = Cluster
40
 
 
41
 
 
42
61
class ClusterService(StreamServerEndpointService):
 
62
    """A cluster controller RPC service.
 
63
 
 
64
    This is a service - in the Twisted sense - that exposes the
 
65
    ``Cluster`` protocol on the given port.
 
66
    """
43
67
 
44
68
    def __init__(self, reactor, port):
45
69
        super(ClusterService, self).__init__(
46
 
            TCP4ServerEndpoint(reactor, port), ClusterFactory())
 
70
            TCP4ServerEndpoint(reactor, port),
 
71
            Factory.forProtocol(Cluster))
 
72
 
 
73
 
 
74
class ClusterClient(Cluster):
    """Client-side flavour of the cluster controller RPC protocol.

    Instances cooperate with ``ClusterClientService``: each one adds
    itself to, and removes itself from, that service's `connections`
    map as its connection comes and goes.

    :ivar address: The :class:`HostnameAddress` of the remote endpoint.
    :ivar service: A reference to the :class:`ClusterClientService` that
        made self.
    """

    address = None
    service = None

    def connectionMade(self):
        """Register with the service, or hang up if not wanted.

        The connection is dropped when the service is not running, or
        when the service already holds a connection for this address.
        """
        super(ClusterClient, self).connectionMade()
        service = self.service
        unwanted = (
            not service.running or self.address in service.connections)
        if unwanted:
            self.transport.loseConnection()
        else:
            service.connections[self.address] = self

    def connectionLost(self, reason):
        """Deregister from the service, but only if self is registered.

        Another protocol instance may be registered for the same address;
        in that case the service's map is left untouched.
        """
        registered = self.service.connections
        if registered.get(self.address) is self:
            del registered[self.address]
        super(ClusterClient, self).connectionLost(reason)
 
103
 
 
104
 
 
105
class ClusterClientService(TimerService, object):
    """A cluster controller RPC client service.

    This is a service - in the Twisted sense - that connects to a set of
    remote AMP endpoints. The endpoints are obtained from a view in the
    region controller and periodically refreshed; this list is used to
    update the connections maintained in this service.

    :ivar connections: A mapping of endpoints to protocol instances
        connected to it.
    :ivar clock: The reactor given to the constructor; it is used when
        creating outgoing client endpoints.
    """

    def __init__(self, reactor):
        """Set up the periodic call to `update` with a random interval.

        :param reactor: The reactor used for outgoing connections.
        """
        super(ClusterClientService, self).__init__(
            self._get_random_interval(), self.update)
        self.connections = {}
        self.clock = reactor

    def update(self):
        """Refresh outgoing connections.

        This obtains a list of endpoints from the region then connects
        to new ones and drops connections to those no longer used.

        :return: A `Deferred` that fires once the refresh has been
            processed; errors are logged rather than propagated.
        """
        # 0. Update interval.
        self._update_interval()
        # 1. Obtain RPC endpoints.
        d = getPage(self._get_rpc_info_url())
        d.addCallback(json.loads)
        d.addCallback(itemgetter("endpoints"))
        # 2. Open connections to new endpoints.
        # 3. Close connections to defunct endpoints.
        d.addCallback(self._update_connections)
        # 4. Log errors.
        d.addErrback(log.err)
        return d

    @staticmethod
    def _get_rpc_info_url():
        """Return the URL to the RPC information page on the region."""
        url = urlparse(get_maas_url())
        url = url._replace(path="%s/rpc" % url.path.rstrip("/"))
        url = url.geturl()
        return ascii_url(url)

    @staticmethod
    def _get_random_interval():
        """Return a random interval between 30 and 90 seconds."""
        return random.randint(30, 90)

    def _update_interval(self):
        """Change the interval randomly to avoid stampedes of clusters."""
        self._loop.interval = self.step = self._get_random_interval()

    def _update_connections(self, hostports):
        """Reconcile the open connections with the desired endpoints.

        :param hostports: An iterable of ``(host, port)`` pairs; each is
            turned into a `HostnameAddress`.
        """
        connections_established = set(self.connections)
        connections_desired = set(starmap(HostnameAddress, hostports))
        self._make_connections(connections_desired - connections_established)
        self._drop_connections(connections_established - connections_desired)

    def _make_connections(self, addresses):
        """Start a connection attempt to each of `addresses`.

        The new `ClusterClient` adds itself to `connections` once its
        connection has been made.
        """
        for address in addresses:
            endpoint = TCP4ClientEndpoint(
                self.clock, address.hostname, address.port)
            protocol = ClusterClient()
            protocol.address = address
            protocol.service = self
            d = connectProtocol(endpoint, protocol)
            # Log failed connection attempts explicitly instead of
            # leaving an unhandled error in a discarded Deferred.
            d.addErrback(log.err)

    def _drop_connections(self, addresses):
        """Close the connection to each of `addresses`.

        The entry in `connections` is removed by the protocol itself
        when it is told that the connection has been lost.
        """
        for address in addresses:
            # `connections` holds protocol instances; the connection is
            # closed via the protocol's transport, as `ClusterClient`
            # does itself.
            self.connections[address].transport.loseConnection()