1
# -.- coding: utf-8 -.-
5
# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@gmail.com>
6
# Copyright © 2009-2010 Markus Korn <thekorn@gmx.de>
7
# Copyright © 2011 Collabora Ltd.
8
# By Siegfried-Angel Gevatter Pujals <siegfried@gevatter.com>
10
# This program is free software: you can redistribute it and/or modify
11
# it under the terms of the GNU Lesser General Public License as published by
12
# the Free Software Foundation, either version 2.1 of the License, or
13
# (at your option) any later version.
15
# This program is distributed in the hope that it will be useful,
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18
# GNU Lesser General Public License for more details.
20
# You should have received a copy of the GNU Lesser General Public License
21
# along with this program. If not, see <http://www.gnu.org/licenses/>.
30
from subprocess import Popen, PIPE
34
from dbus.mainloop.glib import DBusGMainLoop
35
DBusGMainLoop(set_as_default=True)
37
# Import local Zeitgeist modules
38
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
39
from zeitgeist.client import ZeitgeistDBusInterface, ZeitgeistClient
40
from zeitgeist.datamodel import Event, Subject, Interpretation, Manifestation, \
43
# Json handling is special in Python 2.5...
47
# maybe the user is using python < 2.6
48
import simplejson as json
52
def dict2event(d):
    """
    Build an Event (with its Subjects) from a decoded JSON dictionary.

    Every key is optional; a missing key is treated as the empty string.
    Text values are encoded to UTF-8 byte strings before being stored.

    NOTE(review): the `def` line, `ev = Event()`, the subject loop header
    and `return ev` were not visible in the reviewed text and are
    reconstructed from how the names are used -- confirm against VCS.
    """
    def _utf8(mapping, key):
        # Encode first, then str(): calling str() on a non-ASCII unicode
        # value raises UnicodeEncodeError under Python 2.
        return str(mapping.get(key, "").encode("UTF-8"))

    event_fields = ("interpretation", "manifestation", "actor", "origin",
                    "payload")
    subject_fields = ("uri", "current_uri", "interpretation",
                      "manifestation", "origin", "mimetype", "text",
                      "storage")

    ev = Event()
    ev[0][Event.Id] = d.get("id", "").encode("UTF-8")
    # The timestamp is stringified as-is (no encode step).
    ev.timestamp = str(d.get("timestamp", ""))
    for field in event_fields:
        setattr(ev, field, _utf8(d, field))

    for sd in d.get("subjects", []):
        subj = Subject()
        # current_uri used to be str()-ed *before* encoding, unlike every
        # other field; it now goes through the same helper.
        for field in subject_fields:
            setattr(subj, field, _utf8(sd, field))
        ev.append_subject(subj)
    return ev
74
def parse_events(path):
    """
    Read the JSON file at `path` and return a list with one converted
    event per entry (each entry is passed through dict2event).
    """
    # try/finally instead of `with` keeps this valid on very old
    # interpreters while still guaranteeing the handle is closed.
    handle = open(path)
    try:
        data = json.load(handle)
    finally:
        handle.close()
    # A list comprehension always yields a real list, whatever map() does.
    return [dict2event(entry) for entry in data]
79
def import_events(path, engine):
    """
    Load a collection of JSON event definitions into 'engine'.
    For example:

        import_events("test/data/single_event.js", self.engine)

    Returns whatever engine.insertEventsAndWait() returns for the
    parsed events.
    """
    events = parse_events(path)
    return engine.insertEventsAndWait(events)
88
def asyncTestMethod(mainloop):
    """
    Any callbacks happening in a MainLoopWithFailure must use
    this decorator for exceptions raised inside them to be visible.

    Exceptions raised by the wrapped callback are not propagated; they
    are reported through mainloop.fail() instead. The callback's return
    value is discarded.

    NOTE(review): the inner decorator layer, the `try:` and the call to
    the wrapped function were not visible in the reviewed text and are
    reconstructed from the two exception handlers -- confirm against VCS.
    """
    def wrap(f):
        def new_f(*args, **kwargs):
            try:
                f(*args, **kwargs)
            except AssertionError:
                # sys.exc_info() is valid on every Python version, unlike
                # either the `except X, e` or the `except X as e` spelling.
                mainloop.fail("Assertion failed: %s" % sys.exc_info()[1])
            except Exception:
                mainloop.fail("Unexpected exception: %s" % sys.exc_info()[1])
        return new_f
    return wrap
104
class RemoteTestCase (unittest.TestCase):
106
Helper class to implement unit tests against a
107
remote Zeitgeist process
111
# Called as RemoteTestCase._get_pid(...) with a single argument, so it
# has to be a static method.
@staticmethod
def _get_pid(matching_string):
    """
    Return the output of `ps aux | grep matching_string`.

    Despite the name, the result is the raw text of the matching process
    listing lines, not a numeric pid. The grep process itself may show
    up in that listing.
    """
    p1 = Popen(["ps", "aux"], stdout=PIPE, stderr=PIPE)
    p2 = Popen(["grep", matching_string], stdin=p1.stdout, stderr=PIPE, stdout=PIPE)
    # Drop our copy of the pipe's read end so that ps gets SIGPIPE if
    # grep exits early (standard shell-pipeline replacement idiom).
    p1.stdout.close()
    output = p2.communicate()[0]
    # Reap ps so it does not linger as a zombie for the rest of the run.
    p1.stderr.close()
    p1.wait()
    return output
117
def _safe_start_subprocess(cmd, env, timeout=1, error_callback=None):
118
""" starts `cmd` in a subprocess and check after `timeout`
119
if everything goes well"""
120
args = { 'env': env }
121
if not '--verbose-subprocess' in sys.argv:
122
args['stderr'] = PIPE
123
args['stdout'] = PIPE
124
process = Popen(cmd, **args)
125
# give the process some time to wake up
127
error = process.poll()
130
error = "'%s' exits with error %i." %(cmd, error)
132
error += " *** %s" %error_callback(*process.communicate())
133
raise RuntimeError(error)
137
def _safe_start_daemon(env=None, timeout=1):
139
env = os.environ.copy()
141
def error_callback(stdout, stderr):
142
if "--replace" in stderr:
143
return "%r | %s" %(stderr, RemoteTestCase._get_pid(
144
"./src/zeitgeist-daemon").replace("\n", "|"))
148
return RemoteTestCase._safe_start_subprocess(
149
("./src/zeitgeist-daemon", "--no-datahub"), env, timeout, error_callback
152
def __init__(self, methodName):
153
super(RemoteTestCase, self).__init__(methodName)
157
def spawn_daemon(self):
    """
    Start the daemon with the environment in self.env and keep the
    resulting process handle in self.daemon.
    """
    self.daemon = self._safe_start_daemon(env=self.env)
160
def kill_daemon(self, kill_signal=signal.SIGKILL):
    """
    Send `kill_signal` (SIGKILL by default) to the process stored in
    self.daemon, wait for it to terminate and return the value of its
    wait() call.
    """
    os.kill(self.daemon.pid, kill_signal)
    return self.daemon.wait()
165
assert self.daemon is None
166
assert self.client is None
167
self.env = os.environ.copy()
168
self.datapath = tempfile.mkdtemp(prefix="zeitgeist.datapath.")
170
"ZEITGEIST_DATABASE_PATH": ":memory:",
171
"ZEITGEIST_DATA_PATH": self.datapath,
172
"XDG_CACHE_HOME": os.path.join(self.datapath, "cache"),
176
# hack to clear the state of the interface
177
ZeitgeistDBusInterface._ZeitgeistDBusInterface__shared_state = {}
178
self.client = ZeitgeistClient()
181
assert self.daemon is not None
182
assert self.client is not None
184
if 'ZEITGEIST_TESTS_KEEP_TMP' in os.environ:
185
print '\n\nAll temporary files have been preserved in %s\n' \
188
shutil.rmtree(self.datapath)
190
def insertEventsAndWait(self, events):
192
Insert a set of events and spin a mainloop until the async reply
193
is back and return the result - which should be a list of ids.
195
This method is basically just a hack to invoke an async method
196
in a blocking manner.
198
mainloop = self.create_mainloop()
201
def collect_ids_and_quit(ids):
205
self.client.insert_events(events,
206
ids_reply_handler=collect_ids_and_quit)
210
def findEventIdsAndWait(self, event_templates, **kwargs):
212
Do search based on event_templates and spin a mainloop until
213
the async reply is back and return the result - which should be
216
This method is basically just a hack to invoke an async method
217
in a blocking manner.
219
mainloop = self.create_mainloop()
222
def collect_ids_and_quit(ids):
226
self.client.find_event_ids_for_templates(event_templates,
227
collect_ids_and_quit,
232
def getEventsAndWait(self, event_ids):
234
Request a set of full events and spin a mainloop until the
235
async reply is back and return the result - which should be a
238
This method is basically just a hack to invoke an async method
239
in a blocking manner.
241
mainloop = self.create_mainloop()
244
def collect_events_and_quit(events):
247
event[0][0] = int(event.id)
248
result.extend(events)
251
self.client.get_events(event_ids, collect_events_and_quit)
255
def findEventsForTemplatesAndWait(self, event_templates, **kwargs):
257
Execute ZeitgeistClient.find_events_for_templates in a blocking manner.
259
mainloop = self.create_mainloop()
262
def collect_events_and_quit(events):
263
result.extend(events)
266
self.client.find_events_for_templates(
267
event_templates, collect_events_and_quit, **kwargs)
271
def findEventsForValuesAndWait(self, *args, **kwargs):
273
Execute ZeitgeistClient.find_events_for_value in a blocking manner.
275
mainloop = self.create_mainloop()
278
def collect_events_and_quit(events):
279
result.extend(events)
282
self.client.find_events_for_values(
283
collect_events_and_quit, *args, **kwargs)
287
def deleteEventsAndWait(self, event_ids):
289
Delete events given by their id and run a loop until the result
290
containing a timetuple describing the interval of changes is
293
This method is basically just a hack to invoke an async method
294
in a blocking manner.
296
mainloop = self.create_mainloop()
299
def collect_timestamp_and_quit(timestamps):
300
result.append(timestamps)
303
self.client.delete_events(event_ids, collect_timestamp_and_quit)
307
def findRelatedAndWait(self, subject_uris, num_events, result_type):
309
Find related subject uris to given uris and return them.
311
This method is basically just a hack to invoke an async method
312
in a blocking manner.
314
mainloop = self.create_mainloop()
317
def callback(uri_list):
318
result.extend(uri_list)
321
self.client.find_related_uris_for_uris(subject_uris, callback,
322
num_events=num_events, result_type=result_type)
327
def create_mainloop(timeout=5):
329
class MainLoopWithFailure(object):
331
Remember to wrap callbacks using the asyncTestMethod decorator.
335
self._mainloop = gobject.MainLoop()
338
def __getattr__(self, name):
339
return getattr(self._mainloop, name)
341
def fail(self, message):
343
self.failure_message = message
347
assert self.failed is False
350
raise AssertionError, self.failure_message
352
mainloop = MainLoopWithFailure()
353
if timeout is not None:
355
mainloop.fail("Timed out -- "
356
"operations not completed in reasonable time.")
357
return False # stop timeout from being called again
359
# Add an arbitrary timeout so this test won't block if it fails
360
gobject.timeout_add_seconds(timeout, cb_timeout)
365
def get_plain_event(ev):
367
Ensure that an Event instance is a Plain Old Python Object (popo),
368
without DBus wrappings etc.
372
for subject in ev.subjects:
373
if not subject.current_uri:
374
subject.current_uri = subject.uri
376
popo.append(map(unicode, ev[0]))
377
popo.append([map(unicode, subj) for subj in ev[1]])
378
# We need the check here so that if D-Bus gives us an empty
379
# byte array we don't serialize the text "dbus.Array(...)".
380
popo.append(str(ev[2]) if ev[2] else u'')
383
def assertEventsEqual(self, ev1, ev2):
    """
    Assert that two events are equal after converting both to plain
    Python objects with get_plain_event().

    When exactly one of the two events carries an id, both ids are
    blanked before comparing, so that an event without an id can be
    compared with its stored counterpart.

    NOTE(review): indentation was lost in the reviewed text; the final
    assertEqual is assumed to run unconditionally -- confirm against VCS.
    """
    ev1 = self.get_plain_event(Event(ev1))
    ev2 = self.get_plain_event(Event(ev2))
    if ev1 is not NULL_EVENT and ev2 is not NULL_EVENT:
        # True when one id is set and the other is empty.
        if bool(ev1[0][0]) != bool(ev2[0][0]):
            ev1[0][0] = ev2[0][0] = ""  # delete IDs
    self.assertEqual(ev1, ev2)
391
class DBusPrivateMessageBus(object):
395
os.environ.update({"DISPLAY": self.DISPLAY})
396
devnull = file("/dev/null", "w")
397
self.display = Popen(
398
["Xvfb", self.DISPLAY, "-screen", "0", "1024x768x8"],
399
stderr=devnull, stdout=devnull
401
# give the display some time to wake up
403
err = self.display.poll()
405
raise RuntimeError("Could not start Xvfb on display %s, got err=%i" %(self.DISPLAY, err))
406
dbus = Popen(["dbus-launch"], stdout=PIPE)
408
self.dbus_config = dict(l.split("=", 1) for l in dbus.communicate()[0].split("\n") if l)
409
os.environ.update(self.dbus_config)
411
def run(self, ignore_errors=False):
420
os.kill(self.display.pid, signal.SIGKILL)
422
pid = int(self.dbus_config["DBUS_SESSION_BUS_PID"])
423
os.kill(pid, signal.SIGKILL)
429
def quit(self, ignore_errors=False):