~ahasenack/landscape-client/landscape-client-11.02-0ubuntu0.8.04.1

« back to all changes in this revision

Viewing changes to landscape/broker/client.py

  • Committer: Andreas Hasenack
  • Date: 2011-05-05 14:12:15 UTC
  • Revision ID: andreas@canonical.com-20110505141215-5ymuyyh5es9pwa6p
Added hardy files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from logging import info, exception
 
2
 
 
3
from twisted.internet.defer import maybeDeferred
 
4
 
 
5
from landscape.log import format_object
 
6
from landscape.lib.twisted_util import gather_results
 
7
 
 
8
 
 
9
class HandlerNotFoundError(Exception):
 
10
    """A handler for the given message type was not found."""
 
11
 
 
12
 
 
13
class BrokerClientPlugin(object):
 
14
    """A convenience for writing L{BrokerClient} plugins.
 
15
 
 
16
    This provides a register method which will set up a bunch of
 
17
    reactor handlers in the idiomatic way.
 
18
 
 
19
    If C{run} is defined on subclasses, it will be called every C{run_interval}
 
20
    seconds after being registered.
 
21
 
 
22
    @cvar run_interval: The interval, in seconds, to execute the C{run} method.
 
23
        If set to C{None}, then C{run} will not be scheduled.
 
24
    @cvar run_immediately: If C{True} the plugin will be run immediately after
 
25
        it is registered.
 
26
    """
 
27
 
 
28
    run_interval = 5
 
29
    run_immediately = False
 
30
 
 
31
    def register(self, client):
 
32
        self.client = client
 
33
        if getattr(self, "run", None) is not None:
 
34
            if self.run_immediately:
 
35
                self.run()
 
36
            if self.run_interval is not None:
 
37
                self.client.reactor.call_every(self.run_interval, self.run)
 
38
 
 
39
    @property
 
40
    def registry(self):
 
41
        """An alias for the C{client} attribute."""
 
42
        return self.client
 
43
 
 
44
 
 
45
class BrokerClient(object):
 
46
    """Basic plugin registry for clients that have to deal with the broker.
 
47
 
 
48
    This knows about the needs of a client when dealing with the Landscape
 
49
    broker, including interest in messages of a particular type delivered
 
50
    by the broker to the client.
 
51
 
 
52
    @cvar name: The name used when registering to the broker, it must be
 
53
        defined by sub-classes.
 
54
    @ivar broker: A reference to a connected L{RemoteBroker}, it must be set
 
55
        by the connecting machinery at service startup.
 
56
    """
 
57
    name = "client"
 
58
 
 
59
    def __init__(self, reactor):
 
60
        """
 
61
        @param reactor: A L{TwistedReactor}.
 
62
        """
 
63
        super(BrokerClient, self).__init__()
 
64
        self.reactor = reactor
 
65
        self.broker = None
 
66
        self._registered_messages = {}
 
67
        self._plugins = []
 
68
        self._plugin_names = {}
 
69
 
 
70
        # Register event handlers
 
71
        self.reactor.call_on("impending-exchange", self.notify_exchange)
 
72
        self.reactor.call_on("broker-reconnect", self.handle_reconnect)
 
73
 
 
74
    def ping(self):
 
75
        """Return C{True}"""
 
76
        return True
 
77
 
 
78
    def add(self, plugin):
 
79
        """Add a plugin.
 
80
 
 
81
        The plugin's C{register} method will be called with this broker client
 
82
        as its argument.
 
83
 
 
84
        If the plugin has a C{plugin_name} attribute, it will be possible to
 
85
        look up the plugin later with L{get_plugin}.
 
86
        """
 
87
        info("Registering plugin %s.", format_object(plugin))
 
88
        self._plugins.append(plugin)
 
89
        if hasattr(plugin, 'plugin_name'):
 
90
            self._plugin_names[plugin.plugin_name] = plugin
 
91
        plugin.register(self)
 
92
 
 
93
    def get_plugins(self):
 
94
        """Get the list of plugins."""
 
95
        return self._plugins[:]
 
96
 
 
97
    def get_plugin(self, name):
 
98
        """Get a particular plugin by name."""
 
99
        return self._plugin_names[name]
 
100
 
 
101
    def register_message(self, type, handler):
 
102
        """
 
103
        Register interest in a particular type of Landscape server->client
 
104
        message.
 
105
 
 
106
        @param type: The type of message to register C{handler} for.
 
107
        @param handler: A callable taking a message as a parameter, called
 
108
            when messages of C{type} are received.
 
109
        @return: A C{Deferred} that will fire when registration completes.
 
110
        """
 
111
        self._registered_messages[type] = handler
 
112
        return self.broker.register_client_accepted_message_type(type)
 
113
 
 
114
    def dispatch_message(self, message):
 
115
        """Run the handler registered for the type of the given message.
 
116
 
 
117
        @return: The return value of the handler, if found.
 
118
        @raises: HandlerNotFoundError if the handler was not found
 
119
        """
 
120
        type = message["type"]
 
121
        handler = self._registered_messages.get(type)
 
122
        if handler is None:
 
123
            raise HandlerNotFoundError(type)
 
124
        try:
 
125
            return handler(message)
 
126
        except:
 
127
            exception("Error running message handler for type %r: %r"
 
128
                      % (type, handler))
 
129
 
 
130
    def message(self, message):
 
131
        """Call C{dispatch_message} for the given C{message}.
 
132
 
 
133
        @return: A boolean indicating if a handler for the message was found.
 
134
        """
 
135
        try:
 
136
            self.dispatch_message(message)
 
137
            return True
 
138
        except HandlerNotFoundError:
 
139
            return False
 
140
 
 
141
    def exchange(self):
 
142
        """Call C{exchange} on all plugins."""
 
143
        for plugin in self.get_plugins():
 
144
            if hasattr(plugin, "exchange"):
 
145
                try:
 
146
                    plugin.exchange()
 
147
                except:
 
148
                    exception("Error during plugin exchange")
 
149
 
 
150
    def notify_exchange(self):
 
151
        """Notify all plugins about an impending exchange."""
 
152
        info("Got notification of impending exchange. Notifying all plugins.")
 
153
        self.exchange()
 
154
 
 
155
    def fire_event(self, event_type, *args, **kwargs):
 
156
        """Fire an event of a given type.
 
157
 
 
158
        @return: A L{Deferred} resulting in a list of returns values of
 
159
            the fired event handlers, in the order they were fired.
 
160
        """
 
161
        if event_type == "message-type-acceptance-changed":
 
162
            message_type = args[0]
 
163
            acceptance = args[1]
 
164
            results = self.reactor.fire((event_type, message_type), acceptance)
 
165
        else:
 
166
            results = self.reactor.fire(event_type, *args, **kwargs)
 
167
        return gather_results([
 
168
            maybeDeferred(lambda x: x, result) for result in results])
 
169
 
 
170
    def handle_reconnect(self):
 
171
        """Called when the connection with the broker is established again.
 
172
 
 
173
        The following needs to be done:
 
174
 
 
175
          - Re-register any previously registered message types, so the broker
 
176
            knows we have interest on them.
 
177
 
 
178
          - Re-register ourselves as client, so the broker knows we exist and
 
179
            will talk to us firing events and dispatching messages.
 
180
        """
 
181
        for type in self._registered_messages:
 
182
            self.broker.register_client_accepted_message_type(type)
 
183
        self.broker.register_client(self.name)
 
184
 
 
185
    def exit(self):
 
186
        """Stop the reactor and exit the process."""
 
187
        # Stop with a short delay to give a chance to reply to the
 
188
        # caller over AMP.
 
189
        self.reactor.call_later(0.1, self.reactor.stop)