4
from cStringIO import StringIO
6
from twisted.internet import reactor
7
from twisted.internet.defer import Deferred, fail
9
from landscape.lib.lock import lock_path
11
from landscape.deployment import Configuration
12
from landscape.broker.remote import RemoteBroker
14
from landscape.package.taskhandler import PackageTaskHandler, run_task_handler
15
from landscape.package.facade import SmartFacade
16
from landscape.package.store import PackageStore
17
from landscape.package.tests.helpers import SmartFacadeHelper
19
from landscape.tests.helpers import (
20
LandscapeIsolatedTest, LandscapeTest, RemoteBrokerHelper)
21
from landscape.tests.mocker import ANY, ARGS, MATCH
24
def ISTYPE(match_type):
    """Build a MATCH() matcher that accepts only exact instances of a type.

    The check compares type identity, so an instance of a subclass of
    match_type is rejected.
    """
    def has_exact_type(arg):
        return type(arg) is match_type

    return MATCH(has_exact_type)
28
# Test case exercising PackageTaskHandler and run_task_handler().
# NOTE(review): this listing has lost its indentation and carries bare-number
# lines that look like leftover line numbers. The super().setUp() call below
# implies a "def setUp(self):" header that is not present here -- restore it
# from the original file.
class PackageTaskHandlerTest(LandscapeIsolatedTest):
30
# Helpers requested for this test case; presumably they provide the
# self.facade and self.remote attributes used below -- TODO confirm.
helpers = [SmartFacadeHelper, RemoteBrokerHelper]
33
super(PackageTaskHandlerTest, self).setUp()
35
# Store built on a path obtained from self.makeFile().
self.store = PackageStore(self.makeFile())
37
# The handler under test, built from self.store, self.facade and self.remote.
self.handler = PackageTaskHandler(self.store, self.facade, self.remote)
39
def test_ensure_channels_reloaded(self):
    """ensure_channels_reloaded() loads packages once and only once."""
    facade = self.facade
    handler = self.handler

    # Nothing is visible through the facade until the first call.
    self.assertEquals(len(facade.get_packages()), 0)
    handler.ensure_channels_reloaded()
    self.assertEquals(len(facade.get_packages()), 3)

    # Calling it once more won't reload channels again: a flag set on a
    # package before the second call must still be set afterwards.
    facade.get_packages_by_name("name1")[0].installed = True
    handler.ensure_channels_reloaded()
    package = facade.get_packages_by_name("name1")[0]
    self.assertTrue(package.installed)
50
# NOTE(review): no "def" line precedes these statements in this listing; they
# look like the body of a separate test for handler.run() -- confirm against
# the original file (a mocker replay step also appears to be missing).
# Record an expected handle_tasks() call on the patched handler and give it
# a sentinel return value.
handler_mock = self.mocker.patch(self.handler)
51
handler_mock.handle_tasks()
52
self.mocker.result("WAYO!")
56
# run() is expected to hand back exactly what handle_tasks() returned.
self.assertEquals(self.handler.run(), "WAYO!")
58
# Asserts that handle_tasks() works through the queue in order: a task is only
# recorded (and removed from the store) once every earlier task has finished,
# even when the per-task Deferreds fire out of order.
# NOTE(review): "stash" is used but never assigned in this listing, and the
# handle_task() stand-in has no visible return statement -- lines appear to be
# missing; restore them from the original file.
def test_handle_tasks(self):
59
queue_name = PackageTaskHandler.queue_name
61
# Queue three tasks whose data (0, 1, 2) is used as an index into "results".
self.store.add_task(queue_name, 0)
62
self.store.add_task(queue_name, 1)
63
self.store.add_task(queue_name, 2)
65
# One unfired Deferred per task, so the test decides when each one completes.
results = [Deferred() for i in range(3)]
68
# Stand-in for handle_task(): appends the task's data to "stash" when the
# Deferred associated with that task fires.
def handle_task(task):
69
result = results[task.data]
70
result.addCallback(lambda x: stash.append(task.data))
73
# Route any handle_task() call on the patched handler to the stand-in above.
handler_mock = self.mocker.patch(self.handler)
74
handler_mock.handle_task(ANY)
75
self.mocker.call(handle_task)
79
handle_tasks_result = self.handler.handle_tasks()
81
self.assertEquals(stash, [])
83
# Firing the Deferred for task 1 first must change nothing: task 0 is still
# the next task in the store.
results[1].callback(None)
84
self.assertEquals(stash, [])
85
self.assertEquals(self.store.get_next_task(queue_name).data, 0)
87
# Once task 0 fires, both 0 and 1 are recorded and task 2 becomes the next one.
results[0].callback(None)
88
self.assertEquals(stash, [0, 1])
89
self.assertFalse(handle_tasks_result.called)
90
self.assertEquals(self.store.get_next_task(queue_name).data, 2)
92
# Firing the last task completes the overall Deferred and empties the queue.
results[2].callback(None)
93
self.assertEquals(stash, [0, 1, 2])
94
self.assertTrue(handle_tasks_result.called)
95
self.assertEquals(self.store.get_next_task(queue_name), None)
97
# With nothing queued, handle_tasks() returns a Deferred that already fired.
handle_tasks_result = self.handler.handle_tasks()
98
self.assertTrue(handle_tasks_result.called)
100
# Asserts that a failure signalled while handling a task reaches the errback
# chain of the Deferred returned by handle_tasks().
# NOTE(review): "result" and "stash" are used but never assigned in this
# listing, and the handle_task() stand-in has no visible return statement --
# lines appear to be missing; restore them from the original file.
def test_handle_tasks_hooks_errback(self):
101
queue_name = PackageTaskHandler.queue_name
103
self.store.add_task(queue_name, 0)
105
# Private exception type so the assertion below cannot match an unrelated error.
class MyException(Exception): pass
107
# Stand-in for handle_task(): fails "result" with MyException.
def handle_task(task):
109
result.errback(MyException())
112
# Route any handle_task() call on the patched handler to the stand-in above.
handler_mock = self.mocker.patch(self.handler)
113
handler_mock.handle_task(ANY)
114
self.mocker.call(handle_task)
118
handle_tasks_result = self.handler.handle_tasks()
119
# Collect whatever is passed to the errback so it can be inspected.
handle_tasks_result.addErrback(stash.append)
121
# Exactly one item was delivered, and its .type is MyException.
self.assertEquals(len(stash), 1)
122
self.assertEquals(stash[0].type, MyException)
124
def test_default_handle_task(self):
    """handle_task(None) on the handler returns an already-fired Deferred."""
    outcome = self.handler.handle_task(None)

    returned_deferred = isinstance(outcome, Deferred)
    self.assertTrue(returned_deferred)
    # The Deferred must already have been called back, not merely created.
    self.assertTrue(outcome.called)
129
# End-to-end check of run_task_handler(): records the expected calls on mocked
# collaborators, runs it, fires the returned Deferred, and then inspects the
# arguments the handler class was constructed with and where the store lives.
# NOTE(review): several statements in this listing are cut short (unclosed
# mocker.replace() and run_task_handler() calls) and "handler_args" is never
# assigned -- lines appear to be missing; restore them from the original file
# before relying on this test.
def test_run_task_handler(self):
131
# This is a slightly lengthy one, so bear with me.
133
data_path = self.makeDir()
135
# Prepare the mock objects.
136
lock_path_mock = self.mocker.replace("landscape.lib.lock.lock_path",
138
install_mock = self.mocker.replace("twisted.internet."
139
"glib2reactor.install")
140
reactor_mock = self.mocker.replace("twisted.internet.reactor",
142
init_logging_mock = self.mocker.replace("landscape.deployment"
145
HandlerMock = self.mocker.proxy(PackageTaskHandler)
147
# The goal of this method is to perform a sequence of tasks
148
# where the ordering is important.
151
# As the very first thing, install the twisted glib2 reactor
152
# so that we can use DBUS safely.
155
# Then, we must acquire a lock as the same task handler should
156
# never have two instances running in parallel. The 'default'
157
# below comes from the queue_name attribute.
158
lock_path_mock(os.path.join(data_path, "package/default.lock"))
160
# Once locking is done, it's safe to start logging without
161
# corrupting the file. We don't want any output unless it's
162
# breaking badly, so the quiet option should be set.
163
init_logging_mock(ISTYPE(Configuration), "package-default")
165
# Then, it must create an instance of the TaskHandler subclass
166
# passed in as a parameter. We'll keep track of the arguments
167
# given and verify them later.
169
handler_mock = HandlerMock(ANY, ANY, ANY)
170
self.mocker.passthrough() # Let the real constructor run for testing.
171
self.mocker.call(lambda *args: handler_args.extend(args))
173
# Finally, the task handler must be run, and will return a deferred.
174
# We'll return a real deferred so that we can call it back and test
175
# whatever was hooked in as well.
176
deferred = Deferred()
178
self.mocker.result(deferred)
180
# With all of that done, the Twisted reactor must be run, so that
181
# deferred tasks are correctly performed.
184
self.mocker.unorder()
186
# The following tasks are hooked in as callbacks of our deferred.
187
# We must use callLater() so that stop() won't happen before run().
188
reactor_mock.callLater(0, "STOP METHOD")
190
self.mocker.result("STOP METHOD")
192
# We also expect the umask to be set appropriately before running the
194
umask = self.mocker.replace("os.umask")
197
# Okay, the whole playground is set.
202
result = run_task_handler(HandlerMock,
203
["--data-path", data_path,
206
# reactor.stop() wasn't run yet, so it must fail right now.
207
self.assertRaises(AssertionError, self.mocker.verify)
209
# Fire the returned deferred so that whatever was hooked into it runs.
210
result.callback(None)
215
# Put reactor back in place before returning.
218
# The constructor arguments captured through handler_args, in call order.
store, facade, broker = handler_args
220
# Verify that the arguments to the task handler constructor were correct.
221
self.assertEquals(type(store), PackageStore)
222
self.assertEquals(type(facade), SmartFacade)
223
self.assertEquals(type(broker), RemoteBroker)
225
# Let's see if the store path is where it should be: data written through
# the captured store must be readable from a second store opened on
# <data_path>/package/database.
226
filename = os.path.join(data_path, "package/database")
227
store.add_available([1, 2, 3])
228
other_store = PackageStore(filename)
229
self.assertEquals(other_store.get_available(), [1, 2, 3])
231
# Asserts that run_task_handler() exits with a SystemExit whose message
# contains "default is already running" when the lock file is already held.
# NOTE(review): the "try:" and "else:" lines that the "except" clause and the
# trailing self.fail() imply are not present in this listing -- restore them
# from the original file.
def test_run_task_handler_when_already_locked(self):
232
data_path = self.makeDir()
234
install_mock = self.mocker.replace("twisted.internet."
235
"glib2reactor.install")
240
# Take the lock ourselves, at the same path the other run_task_handler test
# expects to be locked, before invoking run_task_handler().
os.mkdir(os.path.join(data_path, "package"))
241
lock_path(os.path.join(data_path, "package/default.lock"))
244
run_task_handler(PackageTaskHandler, ["--data-path", data_path])
245
except SystemExit, e:
246
self.assertIn("default is already running", str(e))
248
# Presumably reached only when no SystemExit was raised -- see note above.
self.fail("SystemExit not raised")
250
# Asserts that, with "--quiet" given and the lock file already held,
# run_task_handler() exits with a SystemExit whose message is empty.
# NOTE(review): the "try:" and "else:" lines that the "except" clause and the
# trailing self.fail() imply are not present in this listing -- restore them
# from the original file.
def test_run_task_handler_when_already_locked_and_quiet_option(self):
251
data_path = self.makeDir()
253
install_mock = self.mocker.replace("twisted.internet."
254
"glib2reactor.install")
259
# Take the lock ourselves before invoking run_task_handler().
os.mkdir(os.path.join(data_path, "package"))
260
lock_path(os.path.join(data_path, "package/default.lock"))
263
run_task_handler(PackageTaskHandler,
264
["--data-path", data_path, "--quiet"])
265
except SystemExit, e:
266
# Quiet mode: the exit carries no message at all.
self.assertEquals(str(e), "")
268
# Presumably reached only when no SystemExit was raised -- see note above.
self.fail("SystemExit not raised")
270
# Asserts that when the handler's run() returns a failed Deferred, the
# exception name ends up in self.logfile and reactor.stop() is called.
# NOTE(review): "done" is never assigned in this listing and some calls are cut
# short (unclosed mocker.replace() and run_task_handler() argument lists) --
# lines appear to be missing; restore them from the original file.
def test_errors_in_tasks_are_printed_and_exit_program(self):
271
# Replace collaborators that are not the subject of this test.
272
install_mock = self.mocker.replace("twisted.internet."
273
"glib2reactor.install")
275
reactor_mock = self.mocker.proxy(reactor)
276
init_logging_mock = self.mocker.replace("landscape.deployment"
279
init_logging_mock(ARGS)
282
# Get a deferred which will fire when the reactor is stopped, so the
283
# test runs until the reactor is stopped.
285
self.expect(reactor_mock.stop()).call(lambda: done.callback(None))
287
# Private exception type, so the log assertion cannot match an unrelated error.
class MyException(Exception): pass
289
# Tell the log helper that errors of this type are expected here.
self.log_helper.ignore_errors(MyException)
291
# Simulate a task handler which errors out.
292
handler_factory_mock = self.mocker.proxy(PackageTaskHandler)
293
handler_mock = handler_factory_mock(ARGS)
294
self.expect(handler_mock.run()).result(fail(MyException("Hey error")))
298
# Run the real entry point against the mocked handler factory and reactor.
300
result = run_task_handler(handler_factory_mock,
301
["--data-path", self.data_path,
303
reactor=reactor_mock)
305
# Callback run once "done" fires: the exception name must be in the log.
def everything_stopped(result):
306
self.assertIn("MyException", self.logfile.getvalue())
308
# Return the deferred with the check attached; presumably the test runner
# waits on it -- TODO confirm.
return done.addCallback(everything_stopped)