5
from twisted.internet.defer import succeed, Deferred
7
from landscape.lib.lock import lock_path, LockError
8
from landscape.lib.log import log_failure
9
from landscape.lib.lsb_release import LSB_RELEASE_FILENAME, parse_lsb_release
10
from landscape.reactor import TwistedReactor
11
from landscape.deployment import Configuration, init_logging
12
from landscape.package.store import PackageStore, InvalidHashIdDb
13
from landscape.broker.amp import RemoteBrokerConnector
16
class PackageTaskError(Exception):
    """Raised when a task hasn't been successfully completed."""
20
class PackageTaskHandlerConfiguration(Configuration):
    """Specialized configuration for L{PackageTaskHandler}s.

    Every path is built on top of C{self.data_path} and is exposed as a
    read-only property, because the rest of the module consumes them as
    plain attribute values (e.g. C{config.package_directory}).
    """

    @property
    def package_directory(self):
        """Get the path to the package directory."""
        return os.path.join(self.data_path, "package")

    @property
    def store_filename(self):
        """Get the path to the SQLite file for the L{PackageStore}."""
        return os.path.join(self.package_directory, "database")

    @property
    def hash_id_directory(self):
        """Get the path to the directory holding the stock hash-id stores."""
        return os.path.join(self.package_directory, "hash-id")

    @property
    def smart_update_stamp_filename(self):
        """Get the path to the smart-update stamp file."""
        return os.path.join(self.package_directory, "smart-update-stamp")
44
class LazyRemoteBroker(object):
    """Wrapper class around L{RemoteBroker} providing lazy initialization.

    This class is a wrapper around a regular L{RemoteBroker}. It connects to
    the remote broker object only when one of its attributes is first accessed.

    @param connector: The L{RemoteBrokerConnector} which will be used
        to connect to the broker.

    @note: This behaviour is needed in particular by the ReleaseUpgrader and
        the PackageChanger, because if they connect early and the
        landscape-client package gets upgraded while they run, they will lose
        the connection and will not be able to reconnect for a potentially long
        window of time (till the new landscape-client package version is fully
        configured and the service is started again).
    """

    def __init__(self, connector):
        self._connector = connector
        # Must be set here: __getattr__ reads self._remote, and a missing
        # instance attribute would make that lookup recurse into __getattr__.
        self._remote = None

    def __getattr__(self, method):
        """Return the remote attribute C{method}, connecting if needed.

        Once a connection has been established the attribute is looked up
        directly on the remote object. Before that, a callable is returned
        which connects first and then invokes the remote method.

        @param method: Name of the attribute to fetch from the remote broker.
        @return: The remote attribute, or a wrapper whose call returns the
            result of C{connector.connect()} with the remote call chained
            on it through C{addCallback}.
        """
        if self._remote is not None:
            return getattr(self._remote, method)

        def wrapper(*args, **kwargs):

            def got_connection(remote):
                # Remember the connection so later attribute accesses go
                # straight to the remote object without reconnecting.
                self._remote = remote
                return getattr(self._remote, method)(*args, **kwargs)

            result = self._connector.connect()
            return result.addCallback(got_connection)

        return wrapper
82
class PackageTaskHandler(object):
    """Base class for handlers consuming tasks from a package store queue.

    Tasks are fetched from the store queue named C{queue_name} and handed
    one at a time to L{handle_task}, which sub-classes are expected to
    override.

    @cvar config_factory: Configuration class instantiated for the handler.
    @cvar queue_name: Name of the store queue the tasks are read from.
    @cvar lsb_release_filename: File parsed to find out the distribution
        code-name.
    """

    config_factory = PackageTaskHandlerConfiguration

    queue_name = "default"
    lsb_release_filename = LSB_RELEASE_FILENAME

    def __init__(self, package_store, package_facade, remote_broker, config):
        self._store = package_store
        self._facade = package_facade
        self._broker = remote_broker
        self._config = config
        # Number of tasks successfully handled so far.
        self._count = 0

    def run(self):
        """Start handling the queued tasks.

        @return: The deferred returned by L{handle_tasks}.
        """
        return self.handle_tasks()

    def handle_tasks(self):
        """Handle the tasks in the queue.

        The tasks will be handed over one by one to L{handle_task} until the
        queue is empty or a task fails.

        @see: L{handle_task}
        """
        return self._handle_next_task(None)

    def _handle_next_task(self, result, last_task=None):
        """Pick the next task from the queue and pass it to C{handle_task}."""

        if last_task is not None:
            # Last task succeeded. We can safely kill it now.
            # NOTE(review): the removal call was lost from the source this
            # block was recovered from; confirm the task removal API.
            last_task.remove()
            self._count += 1

        task = self._store.get_next_task(self.queue_name)

        if task:
            # We have another task. Let's handle it.
            result = self.handle_task(task)
            result.addCallback(self._handle_next_task, last_task=task)
            result.addErrback(self._handle_task_failure)
            return result

        # No more tasks! We're done!
        return succeed(None)

    def _handle_task_failure(self, failure):
        """Gracefully handle a L{PackageTaskError} and stop handling tasks."""
        failure.trap(PackageTaskError)

    def handle_task(self, task):
        """Handle a single task.

        Sub-classes must override this method in order to trigger task-specific
        actions.

        This method must return a L{Deferred} firing the task result. If the
        deferred is successful the task will be removed from the queue and the
        next one will be picked. If the task can't be completed, this method
        must raise a L{PackageTaskError}, in this case the handler will stop
        processing tasks and the failed task won't be removed from the queue.
        """
        # NOTE(review): default body reconstructed; it simply reports success.
        return succeed(None)

    @property
    def handled_tasks_count(self):
        """
        Return the number of tasks that have been successfully handled so far.
        """
        return self._count

    def use_hash_id_db(self):
        """
        Attach the appropriate pre-canned hash=>id database to our store.

        @return: The deferred from L{_determine_hash_id_db_filename}, with
            the attach step chained on it.
        """

        def use_it(hash_id_db_filename):

            if hash_id_db_filename is None:
                # Couldn't determine which hash=>id database to use,
                # just ignore the failure and go on
                return

            if not os.path.exists(hash_id_db_filename):
                # The appropriate database isn't there, but nevermind
                # and just go on
                return

            try:
                self._store.add_hash_id_db(hash_id_db_filename)
            except InvalidHashIdDb:
                # The appropriate database is there but broken,
                # let's remove it and go on
                logging.warning("Invalid hash=>id database %s" %
                                hash_id_db_filename)
                os.remove(hash_id_db_filename)
                return

        result = self._determine_hash_id_db_filename()
        result.addCallback(use_it)
        return result

    def _determine_hash_id_db_filename(self):
        """Build up the filename of the hash=>id database to use.

        @return: a deferred resulting in the filename to use or C{None}
            in case of errors.
        """

        def got_server_uuid(server_uuid):

            warning = "Couldn't determine which hash=>id database to use: %s"

            if server_uuid is None:
                logging.warning(warning % "server UUID not available")
                return None

            try:
                lsb_release_info = parse_lsb_release(self.lsb_release_filename)
            except IOError as error:
                logging.warning(warning % str(error))
                return None

            try:
                codename = lsb_release_info["code-name"]
            except KeyError:
                reason = ("missing code-name key in %s" %
                          self.lsb_release_filename)
                logging.warning(warning % reason)
                return None

            arch = self._facade.get_arch()
            if arch is None:
                # The Smart code should always return a proper string, so this
                # branch shouldn't get executed at all. However this check is
                # kept as an extra paranoia sanity check.
                logging.warning(warning % "unknown dpkg architecture")
                return None

            return os.path.join(self._config.hash_id_directory,
                                "%s_%s_%s" % (server_uuid, codename, arch))

        result = self._broker.get_server_uuid()
        result.addCallback(got_server_uuid)
        return result
229
def run_task_handler(cls, args, reactor=None):
230
# please only pass reactor when you have totally mangled everything with
231
# mocker. Otherwise bad things will happen.
233
reactor = TwistedReactor()
235
config = cls.config_factory()
238
for directory in [config.package_directory, config.hash_id_directory]:
239
if not os.path.isdir(directory):
242
program_name = cls.queue_name
243
lock_filename = os.path.join(config.package_directory,
244
program_name + ".lock")
246
lock_path(lock_filename)
250
raise SystemExit("error: package %s is already running"
253
words = re.findall("[A-Z][a-z]+", cls.__name__)
254
init_logging(config, "-".join(word.lower() for word in words))
256
# Setup our umask for Smart to use, this needs to setup file permissions to
260
# Delay importing of the facade so that we don't
261
# import Smart unless we need to.
262
from landscape.package.facade import SmartFacade
264
package_store = PackageStore(config.store_filename)
265
package_facade = SmartFacade()
268
connector.disconnect()
269
# For some obscure reason our TwistedReactor.stop method calls
270
# reactor.crash() instead of reactor.stop(), which doesn't work
271
# here. Maybe TwistedReactor.stop should simply use reactor.stop().
272
reactor.call_later(0, reactor._reactor.stop)
274
def got_error(failure):
278
connector = RemoteBrokerConnector(reactor, config, retry_on_reconnect=True)
279
remote = LazyRemoteBroker(connector)
280
handler = cls(package_store, package_facade, remote, config)
282
result.addCallback(lambda x: handler.run())
283
result.addCallback(lambda x: finish())
284
result.addErrback(got_error)
285
reactor.call_when_running(lambda: result.callback(None))