~ahasenack/landscape-client/landscape-client-11.02-0ubuntu0.8.04.1

« back to all changes in this revision

Viewing changes to landscape/package/taskhandler.py

  • Committer: Andreas Hasenack
  • Date: 2011-05-05 14:12:15 UTC
  • Revision ID: andreas@canonical.com-20110505141215-5ymuyyh5es9pwa6p
Added hardy files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import os
 
2
import re
 
3
import logging
 
4
 
 
5
from twisted.internet.defer import succeed, Deferred
 
6
 
 
7
from landscape.lib.lock import lock_path, LockError
 
8
from landscape.lib.log import log_failure
 
9
from landscape.lib.lsb_release import LSB_RELEASE_FILENAME, parse_lsb_release
 
10
from landscape.reactor import TwistedReactor
 
11
from landscape.deployment import Configuration, init_logging
 
12
from landscape.package.store import PackageStore, InvalidHashIdDb
 
13
from landscape.broker.amp import RemoteBrokerConnector
 
14
 
 
15
 
 
16
class PackageTaskError(Exception):
    """Signal that a package task could not be completed successfully."""
 
18
 
 
19
 
 
20
class PackageTaskHandlerConfiguration(Configuration):
    """Configuration flavour used by L{PackageTaskHandler}s.

    Every path exposed here is built on top of C{self.data_path}.
    """

    @property
    def package_directory(self):
        """Return the C{package} directory located under C{data_path}."""
        base = self.data_path
        return os.path.join(base, "package")

    @property
    def store_filename(self):
        """Return the path of the SQLite file used by the L{PackageStore}."""
        package_dir = self.package_directory
        return os.path.join(package_dir, "database")

    @property
    def hash_id_directory(self):
        """Return the directory where the stock hash-id stores are kept."""
        package_dir = self.package_directory
        return os.path.join(package_dir, "hash-id")

    @property
    def smart_update_stamp_filename(self):
        """Return the path of the smart-update stamp file."""
        package_dir = self.package_directory
        return os.path.join(package_dir, "smart-update-stamp")
 
42
 
 
43
 
 
44
class LazyRemoteBroker(object):
    """Wrapper class around L{RemoteBroker} providing lazy initialization.

    This class is a wrapper around a regular L{RemoteBroker}. It connects to
    the remote broker object only when one of its attributes is first called.

    @param connector: The L{RemoteBrokerConnector} which will be used
        to connect to the broker.

    @note: This behaviour is needed in particular by the ReleaseUpgrader and
        the PackageChanger, because if they connect early and the
        landscape-client package gets upgraded while they run, they will lose
        the connection and will not be able to reconnect for a potentially long
        window of time (till the new landscape-client package version is fully
        configured and the service is started again).
    """

    def __init__(self, connector):
        self._connector = connector
        self._remote = None

    def __getattr__(self, method):
        """Proxy C{method} to the remote broker, connecting on first use.

        @param method: Name of the attribute being looked up.
        @return: The attribute of the already connected remote, or a
            callable that connects first and then invokes C{method} on the
            remote, returning whatever C{connect().addCallback(...)} returns.
        @raise AttributeError: if one of this object's own attributes
            (C{_connector}, C{_remote}) is missing.
        """
        # __getattr__ only runs when the normal lookup failed. If it is asked
        # for one of our own attributes, the instance was created without
        # running __init__; reading self._remote below would then re-enter
        # this method forever, so fail with a regular AttributeError instead.
        if method in ("_connector", "_remote"):
            raise AttributeError(method)

        # Compare against None explicitly: whether we are connected must not
        # depend on the truth value of the remote object.
        if self._remote is not None:
            return getattr(self._remote, method)

        def wrapper(*args, **kwargs):

            def got_connection(remote):
                # Remember the remote so later lookups bypass the connector.
                self._remote = remote
                return getattr(remote, method)(*args, **kwargs)

            connected = self._connector.connect()
            return connected.addCallback(got_connection)

        return wrapper
 
80
 
 
81
 
 
82
class PackageTaskHandler(object):
    """Base class for handlers processing a queue of package tasks.

    Tasks are fetched from the queue named C{queue_name} of the given
    package store and handed over, one at a time, to L{handle_task}.

    @cvar config_factory: Configuration class used by L{run_task_handler}.
    @cvar queue_name: Name of the task queue this handler consumes.
    @cvar lsb_release_filename: File parsed to find out the distribution
        code-name when picking a hash=>id database.
    """

    config_factory = PackageTaskHandlerConfiguration

    queue_name = "default"
    lsb_release_filename = LSB_RELEASE_FILENAME

    def __init__(self, package_store, package_facade, remote_broker, config):
        """
        @param package_store: Store providing the task queue and the
            hash=>id databases.
        @param package_facade: Facade used to query the package system.
        @param remote_broker: Broker proxy used to get the server UUID.
        @param config: Configuration providing C{hash_id_directory}.
        """
        self._store = package_store
        self._facade = package_facade
        self._broker = remote_broker
        self._config = config
        self._count = 0

    def run(self):
        """Process the queued tasks, see L{handle_tasks}."""
        return self.handle_tasks()

    def handle_tasks(self):
        """Handle the tasks in the queue.

        The tasks will be handed over one by one to L{handle_task} until the
        queue is empty or a task fails.

        @see: L{handle_task}
        """
        return self._handle_next_task(None)

    def _handle_next_task(self, result, last_task=None):
        """Pick the next task from the queue and pass it to C{handle_task}.

        @param result: Result of the previous callback, ignored.
        @param last_task: The task that has just been handled successfully,
            if any. It gets removed from the queue and counted.
        """

        if last_task is not None:
            # Last task succeeded.  We can safely kill it now.
            last_task.remove()
            self._count += 1

        task = self._store.get_next_task(self.queue_name)

        if task:
            # We have another task.  Let's handle it.
            result = self.handle_task(task)
            result.addCallback(self._handle_next_task, last_task=task)
            result.addErrback(self._handle_task_failure)
            return result

        else:
            # No more tasks!  We're done!
            return succeed(None)

    def _handle_task_failure(self, failure):
        """Gracefully handle a L{PackageTaskError} and stop handling tasks.

        Any other kind of failure is propagated by C{failure.trap}.
        """
        failure.trap(PackageTaskError)

    def handle_task(self, task):
        """Handle a single task.

        Sub-classes must override this method in order to trigger task-specific
        actions.

        This method must return a L{Deferred} firing the task result. If the
        deferred is successful the task will be removed from the queue and the
        next one will be picked. If the task can't be completed, this method
        must raise a L{PackageTaskError}, in this case the handler will stop
        processing tasks and the failed task won't be removed from the queue.
        """
        return succeed(None)

    @property
    def handled_tasks_count(self):
        """
        Return the number of tasks that have been successfully handled so far.
        """
        return self._count

    def use_hash_id_db(self):
        """
        Attach the appropriate pre-canned hash=>id database to our store.

        @return: a deferred firing once the database has been attached or
            it has been determined that no usable database is available.
        """

        def use_it(hash_id_db_filename):

            if hash_id_db_filename is None:
                # Couldn't determine which hash=>id database to use,
                # just ignore the failure and go on
                return

            if not os.path.exists(hash_id_db_filename):
                # The appropriate database isn't there, but nevermind
                # and just go on
                return

            try:
                self._store.add_hash_id_db(hash_id_db_filename)
            except InvalidHashIdDb:
                # The appropriate database is there but broken,
                # let's remove it and go on
                logging.warning("Invalid hash=>id database %s" %
                                hash_id_db_filename)
                os.remove(hash_id_db_filename)
                return

        result = self._determine_hash_id_db_filename()
        result.addCallback(use_it)
        return result

    def _determine_hash_id_db_filename(self):
        """Build up the filename of the hash=>id database to use.

        The name is made of the server UUID, the distribution code-name and
        the package architecture, and lives in C{config.hash_id_directory}.

        @return: a deferred resulting in the filename to use or C{None}
            in case of errors.
        """
        import sys

        def got_server_uuid(server_uuid):

            warning = "Couldn't determine which hash=>id database to use: %s"

            if server_uuid is None:
                logging.warning(warning % "server UUID not available")
                return None

            try:
                lsb_release_info = parse_lsb_release(self.lsb_release_filename)
            except IOError:
                # sys.exc_info() is used instead of binding the exception in
                # the except clause, as this spelling is valid on every
                # Python version, including the 2.5 shipped with Hardy.
                error = sys.exc_info()[1]
                logging.warning(warning % str(error))
                return None
            try:
                codename = lsb_release_info["code-name"]
            except KeyError:
                # Build the reason first, rather than chaining two "%"
                # operators and relying on their left-associativity.
                reason = ("missing code-name key in %s" %
                          self.lsb_release_filename)
                logging.warning(warning % reason)
                return None

            arch = self._facade.get_arch()
            if arch is None:
                # The Smart code should always return a proper string, so this
                # branch shouldn't get executed at all. However this check is
                # kept as an extra paranoia sanity check.
                logging.warning(warning % "unknown dpkg architecture")
                return None

            return os.path.join(self._config.hash_id_directory,
                                "%s_%s_%s" % (server_uuid, codename, arch))

        result = self._broker.get_server_uuid()
        result.addCallback(got_server_uuid)
        return result
 
227
 
 
228
 
 
229
def run_task_handler(cls, args, reactor=None):
    """Run the package task handler C{cls} inside a reactor.

    The configuration is loaded, the package directories are created, a
    lock named after the handler's queue is taken, logging is set up and
    finally the handler is run once the reactor is running.

    @param cls: The L{PackageTaskHandler} (sub-)class to run. Its
        C{config_factory} and C{queue_name} attributes are used.
    @param args: The command line arguments handed to C{config.load}.
    @param reactor: Optionally, the reactor to use. Please only pass it when
        you have totally mangled everything with mocker. Otherwise bad
        things will happen.
    @return: The L{Deferred} chain running the handler, returned after
        C{reactor.run()} has returned.
    @raise SystemExit: If the lock for this handler can't be acquired.
    """
    import stat

    if reactor is None:
        reactor = TwistedReactor()

    config = cls.config_factory()
    config.load(args)

    for directory in [config.package_directory, config.hash_id_directory]:
        # Handlers with different queue names share these directories and
        # may start at the same time, before any lock is taken. Checking for
        # the directory and then creating it would race, so just try to
        # create it and tolerate the failure if it turns out to be there.
        try:
            os.mkdir(directory)
        except OSError:
            if not os.path.isdir(directory):
                raise

    program_name = cls.queue_name
    lock_filename = os.path.join(config.package_directory,
                                 program_name + ".lock")
    try:
        lock_path(lock_filename)
    except LockError:
        if config.quiet:
            raise SystemExit()
        raise SystemExit("error: package %s is already running"
                         % program_name)

    # The log name is the handler class name, split on capitalized words,
    # lower-cased and joined with dashes.
    words = re.findall("[A-Z][a-z]+", cls.__name__)
    init_logging(config, "-".join(word.lower() for word in words))

    # Setup our umask for Smart to use, this needs to setup file permissions to
    # 0644 so group and others lose the write bit. The stat constants spell
    # octal 022 in a way that is valid on every Python version.
    os.umask(stat.S_IWGRP | stat.S_IWOTH)

    # Delay importing of the facade so that we don't
    # import Smart unless we need to.
    from landscape.package.facade import SmartFacade

    package_store = PackageStore(config.store_filename)
    package_facade = SmartFacade()

    def finish():
        connector.disconnect()
        # For some obscure reason our TwistedReactor.stop method calls
        # reactor.crash() instead of reactor.stop(), which doesn't work
        # here. Maybe TwistedReactor.stop should simply use reactor.stop().
        reactor.call_later(0, reactor._reactor.stop)

    def got_error(failure):
        # Log the failure and shut down anyway, so the process terminates.
        log_failure(failure)
        finish()

    connector = RemoteBrokerConnector(reactor, config, retry_on_reconnect=True)
    remote = LazyRemoteBroker(connector)
    handler = cls(package_store, package_facade, remote, config)
    result = Deferred()
    result.addCallback(lambda x: handler.run())
    result.addCallback(lambda x: finish())
    result.addErrback(got_error)
    # Fire the chain only once the reactor is actually running.
    reactor.call_when_running(lambda: result.callback(None))
    reactor.run()

    return result