~ubuntu-branches/ubuntu/quantal/zeitgeist/quantal

« back to all changes in this revision

Viewing changes to test/dbus/testutils.py

  • Committer: Package Import Robot
  • Author(s): Didier Roche
  • Date: 2011-11-15 11:15:56 UTC
  • mfrom: (1.1.13)
  • Revision ID: package-import@ubuntu.com-20111115111556-4lmc5wdvjrsdm0ss
Tags: 0.8.99~alpha1-1ubuntu1
Upload to ubuntu the new zeitgeist rewritten in vala

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -.- coding: utf-8 -.-
 
2
 
 
3
# Zeitgeist
 
4
#
 
5
# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@gmail.com>
 
6
# Copyright © 2009-2010 Markus Korn <thekorn@gmx.de>
 
7
# Copyright © 2011 Collabora Ltd.
 
8
#             By Siegfried-Angel Gevatter Pujals <siegfried@gevatter.com>
 
9
#
 
10
# This program is free software: you can redistribute it and/or modify
 
11
# it under the terms of the GNU Lesser General Public License as published by
 
12
# the Free Software Foundation, either version 2.1 of the License, or
 
13
# (at your option) any later version.
 
14
#
 
15
# This program is distributed in the hope that it will be useful,
 
16
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
17
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
18
# GNU Lesser General Public License for more details.
 
19
#
 
20
# You should have received a copy of the GNU Lesser General Public License
 
21
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
22
 
 
23
import unittest
 
24
import os
 
25
import time
 
26
import sys
 
27
import signal
 
28
import tempfile
 
29
import shutil
 
30
from subprocess import Popen, PIPE
 
31
 
 
32
# DBus setup
 
33
import gobject
 
34
from dbus.mainloop.glib import DBusGMainLoop
 
35
DBusGMainLoop(set_as_default=True)
 
36
 
 
37
# Import local Zeitgeist modules
 
38
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
 
39
from zeitgeist.client import ZeitgeistDBusInterface, ZeitgeistClient
 
40
from zeitgeist.datamodel import Event, Subject, Interpretation, Manifestation, \
 
41
        TimeRange, NULL_EVENT
 
42
 
 
43
# Json handling is special in Python 2.5...
 
44
try:
 
45
        import json
 
46
except ImportError:
 
47
        # maybe the user is using python < 2.6
 
48
        import simplejson as json
 
49
 
 
50
def dict2event(d):
 
51
        ev = Event()
 
52
        ev[0][Event.Id] = d.get("id", "").encode("UTF-8")
 
53
        ev.timestamp = str(d.get("timestamp", ""))
 
54
        ev.interpretation = str(d.get("interpretation", "").encode("UTF-8"))
 
55
        ev.manifestation = str(d.get("manifestation", "").encode("UTF-8"))
 
56
        ev.actor = str(d.get("actor", "").encode("UTF-8"))
 
57
        ev.origin = str(d.get("origin", "").encode("UTF-8"))
 
58
        ev.payload = str(d.get("payload", "").encode("UTF-8"))
 
59
        
 
60
        subjects = d.get("subjects", [])
 
61
        for sd in subjects:
 
62
                subj = Subject()
 
63
                subj.uri = str(sd.get("uri", "").encode("UTF-8"))
 
64
                subj.current_uri = str(sd.get("current_uri", "")).encode("UTF-8")
 
65
                subj.interpretation = str(sd.get("interpretation", "").encode("UTF-8"))
 
66
                subj.manifestation = str(sd.get("manifestation", "").encode("UTF-8"))
 
67
                subj.origin = str(sd.get("origin", "").encode("UTF-8"))
 
68
                subj.mimetype = str(sd.get("mimetype", "").encode("UTF-8"))
 
69
                subj.text = str(sd.get("text", "").encode("UTF-8"))
 
70
                subj.storage = str(sd.get("storage", "").encode("UTF-8"))
 
71
                ev.append_subject(subj)
 
72
        return ev
 
73
        
 
74
def parse_events(path):
 
75
        data = json.load(file(path))
 
76
        events = map(dict2event, data)
 
77
        return events
 
78
 
 
79
def import_events(path, engine):
 
80
        """
 
81
        Load a collection of JSON event definitions into 'engine'. Fx:
 
82
        
 
83
                import_events("test/data/single_event.js", self.engine)
 
84
        """
 
85
        events = parse_events(path)
 
86
        return engine.insertEventsAndWait(events)
 
87
 
 
88
def asyncTestMethod(mainloop):
 
89
        """
 
90
        Any callbacks happening in a MainLoopWithFailure must use
 
91
        this decorator for exceptions raised inside them to be visible.
 
92
        """
 
93
        def wrap(f):
 
94
                def new_f(*args, **kwargs):
 
95
                        try:
 
96
                                f(*args, **kwargs)
 
97
                        except AssertionError, e:
 
98
                                mainloop.fail("Assertion failed: %s" % e)
 
99
                        except Exception, e:
 
100
                                mainloop.fail("Unexpected exception: %s" % e)
 
101
                return new_f
 
102
        return wrap
 
103
 
 
104
class RemoteTestCase (unittest.TestCase):
 
105
        """
 
106
        Helper class to implement unit tests against a
 
107
        remote Zeitgeist process
 
108
        """
 
109
        
 
110
        @staticmethod
 
111
        def _get_pid(matching_string):
 
112
                p1 = Popen(["ps", "aux"], stdout=PIPE, stderr=PIPE)
 
113
                p2 = Popen(["grep", matching_string], stdin=p1.stdout, stderr=PIPE, stdout=PIPE)
 
114
                return p2.communicate()[0]
 
115
                
 
116
        @staticmethod
 
117
        def _safe_start_subprocess(cmd, env, timeout=1, error_callback=None):
 
118
                """ starts `cmd` in a subprocess and check after `timeout`
 
119
                if everything goes well"""
 
120
                args = { 'env': env }
 
121
                if not '--verbose-subprocess' in sys.argv:
 
122
                        args['stderr'] = PIPE
 
123
                        args['stdout'] = PIPE
 
124
                process = Popen(cmd, **args)
 
125
                # give the process some time to wake up
 
126
                time.sleep(timeout)
 
127
                error = process.poll()
 
128
                if error:
 
129
                        cmd = " ".join(cmd)
 
130
                        error = "'%s' exits with error %i." %(cmd, error)
 
131
                        if error_callback:
 
132
                                error += " *** %s" %error_callback(*process.communicate())
 
133
                        raise RuntimeError(error)
 
134
                return process
 
135
                
 
136
        @staticmethod
 
137
        def _safe_start_daemon(env=None, timeout=1):
 
138
                if env is None:
 
139
                        env = os.environ.copy()
 
140
                        
 
141
                def error_callback(stdout, stderr):
 
142
                        if "--replace" in stderr:
 
143
                                return "%r | %s" %(stderr, RemoteTestCase._get_pid(
 
144
                                        "./src/zeitgeist-daemon").replace("\n", "|"))
 
145
                        else:
 
146
                                return stderr
 
147
                        
 
148
                return RemoteTestCase._safe_start_subprocess(
 
149
                        ("./src/zeitgeist-daemon", "--no-datahub"), env, timeout, error_callback
 
150
                )
 
151
        
 
152
        def __init__(self, methodName):
 
153
                super(RemoteTestCase, self).__init__(methodName)
 
154
                self.daemon = None
 
155
                self.client = None
 
156
        
 
157
        def spawn_daemon(self):
 
158
                self.daemon = self._safe_start_daemon(env=self.env)
 
159
        
 
160
        def kill_daemon(self, kill_signal=signal.SIGKILL):
 
161
                os.kill(self.daemon.pid, kill_signal)
 
162
                return self.daemon.wait()
 
163
                
 
164
        def setUp(self):
 
165
                assert self.daemon is None
 
166
                assert self.client is None
 
167
                self.env = os.environ.copy()
 
168
                self.datapath = tempfile.mkdtemp(prefix="zeitgeist.datapath.")
 
169
                self.env.update({
 
170
                        "ZEITGEIST_DATABASE_PATH": ":memory:",
 
171
                        "ZEITGEIST_DATA_PATH": self.datapath,
 
172
                        "XDG_CACHE_HOME": os.path.join(self.datapath, "cache"),
 
173
                })
 
174
                self.spawn_daemon()
 
175
                
 
176
                # hack to clear the state of the interface
 
177
                ZeitgeistDBusInterface._ZeitgeistDBusInterface__shared_state = {}
 
178
                self.client = ZeitgeistClient()
 
179
        
 
180
        def tearDown(self):
 
181
                assert self.daemon is not None
 
182
                assert self.client is not None
 
183
                self.kill_daemon()
 
184
                if 'ZEITGEIST_TESTS_KEEP_TMP' in os.environ:
 
185
                        print '\n\nAll temporary files have been preserved in %s\n' \
 
186
                                % self.datapath
 
187
                else:
 
188
                        shutil.rmtree(self.datapath)
 
189
        
 
190
        def insertEventsAndWait(self, events):
 
191
                """
 
192
                Insert a set of events and spin a mainloop until the async reply
 
193
                is back and return the result - which should be a list of ids.
 
194
                
 
195
                This method is basically just a hack to invoke an async method
 
196
                in a blocking manner.
 
197
                """
 
198
                mainloop = self.create_mainloop()
 
199
                result = []
 
200
                
 
201
                def collect_ids_and_quit(ids):
 
202
                        result.extend(ids)
 
203
                        mainloop.quit()
 
204
                        
 
205
                self.client.insert_events(events,
 
206
                                        ids_reply_handler=collect_ids_and_quit)
 
207
                mainloop.run()
 
208
                return result
 
209
        
 
210
        def findEventIdsAndWait(self, event_templates, **kwargs):
 
211
                """
 
212
                Do search based on event_templates and spin a mainloop until
 
213
                the async reply is back and return the result - which should be
 
214
                a list of ids.
 
215
                
 
216
                This method is basically just a hack to invoke an async method
 
217
                in a blocking manner.
 
218
                """
 
219
                mainloop = self.create_mainloop()
 
220
                result = []
 
221
                
 
222
                def collect_ids_and_quit(ids):
 
223
                        result.extend(ids)
 
224
                        mainloop.quit()
 
225
                        
 
226
                self.client.find_event_ids_for_templates(event_templates,
 
227
                                                        collect_ids_and_quit,
 
228
                                                        **kwargs)
 
229
                mainloop.run()
 
230
                return result
 
231
        
 
232
        def getEventsAndWait(self, event_ids):
 
233
                """
 
234
                Request a set of full events and spin a mainloop until the
 
235
                async reply is back and return the result - which should be a
 
236
                list of Events.
 
237
                
 
238
                This method is basically just a hack to invoke an async method
 
239
                in a blocking manner.
 
240
                """
 
241
                mainloop = self.create_mainloop()
 
242
                result = []
 
243
                
 
244
                def collect_events_and_quit(events):
 
245
                        for event in events:
 
246
                                if event:
 
247
                                        event[0][0] = int(event.id)
 
248
                        result.extend(events)
 
249
                        mainloop.quit()
 
250
                        
 
251
                self.client.get_events(event_ids, collect_events_and_quit)
 
252
                mainloop.run()
 
253
                return result
 
254
        
 
255
        def findEventsForTemplatesAndWait(self, event_templates, **kwargs):
 
256
                """
 
257
                Execute ZeitgeistClient.find_events_for_templates in a blocking manner.
 
258
                """
 
259
                mainloop = self.create_mainloop()
 
260
                result = []
 
261
                
 
262
                def collect_events_and_quit(events):
 
263
                        result.extend(events)
 
264
                        mainloop.quit()
 
265
                
 
266
                self.client.find_events_for_templates(
 
267
                        event_templates, collect_events_and_quit, **kwargs)
 
268
                mainloop.run()
 
269
                return result
 
270
 
 
271
        def findEventsForValuesAndWait(self, *args, **kwargs):
 
272
                """
 
273
                Execute ZeitgeistClient.find_events_for_value in a blocking manner.
 
274
                """
 
275
                mainloop = self.create_mainloop()
 
276
                result = []
 
277
                
 
278
                def collect_events_and_quit(events):
 
279
                        result.extend(events)
 
280
                        mainloop.quit()
 
281
                
 
282
                self.client.find_events_for_values(
 
283
                        collect_events_and_quit, *args, **kwargs)
 
284
                mainloop.run()
 
285
                return result
 
286
        
 
287
        def deleteEventsAndWait(self, event_ids):
 
288
                """
 
289
                Delete events given by their id and run a loop until the result 
 
290
                containing a timetuple describing the interval of changes is
 
291
                returned.
 
292
                
 
293
                This method is basically just a hack to invoke an async method
 
294
                in a blocking manner.
 
295
                """
 
296
                mainloop = self.create_mainloop()
 
297
                result = []
 
298
                
 
299
                def collect_timestamp_and_quit(timestamps):
 
300
                        result.append(timestamps)
 
301
                        mainloop.quit()
 
302
                
 
303
                self.client.delete_events(event_ids, collect_timestamp_and_quit)
 
304
                mainloop.run()
 
305
                return result[0]
 
306
                
 
307
        def findRelatedAndWait(self, subject_uris, num_events, result_type):
 
308
                """
 
309
                Find related subject uris to given uris and return them.
 
310
                
 
311
                This method is basically just a hack to invoke an async method
 
312
                in a blocking manner.
 
313
                """
 
314
                mainloop = self.create_mainloop()
 
315
                result = []
 
316
                
 
317
                def callback(uri_list):
 
318
                        result.extend(uri_list)
 
319
                        mainloop.quit()
 
320
                
 
321
                self.client.find_related_uris_for_uris(subject_uris, callback,
 
322
                        num_events=num_events, result_type=result_type)
 
323
                mainloop.run()
 
324
                return result
 
325
        
 
326
        @staticmethod
 
327
        def create_mainloop(timeout=5):
 
328
                
 
329
                class MainLoopWithFailure(object):
 
330
                        """
 
331
                        Remember to wrap callbacks using the asyncTestMethod decorator.
 
332
                        """
 
333
                        
 
334
                        def __init__(self):
 
335
                                self._mainloop = gobject.MainLoop()
 
336
                                self.failed = False
 
337
                        
 
338
                        def __getattr__(self, name):
 
339
                                return getattr(self._mainloop, name)
 
340
                        
 
341
                        def fail(self, message):
 
342
                                self.failed = True
 
343
                                self.failure_message = message
 
344
                                mainloop.quit()
 
345
                        
 
346
                        def run(self):
 
347
                                assert self.failed is False
 
348
                                self._mainloop.run()
 
349
                                if self.failed:
 
350
                                        raise AssertionError, self.failure_message
 
351
                
 
352
                mainloop = MainLoopWithFailure()
 
353
                if timeout is not None:
 
354
                        def cb_timeout():
 
355
                                mainloop.fail("Timed out -- "
 
356
                                        "operations not completed in reasonable time.")
 
357
                                return False # stop timeout from being called again
 
358
                        
 
359
                        # Add an arbitrary timeout so this test won't block if it fails
 
360
                        gobject.timeout_add_seconds(timeout, cb_timeout)
 
361
                
 
362
                return mainloop
 
363
        
 
364
        @staticmethod
 
365
        def get_plain_event(ev):
 
366
                """
 
367
                Ensure that an Event instance is a Plain Old Python Object (popo),
 
368
                without DBus wrappings etc.
 
369
                """
 
370
                if not ev:
 
371
                        return NULL_EVENT
 
372
                for subject in ev.subjects:
 
373
                        if not subject.current_uri:
 
374
                                subject.current_uri = subject.uri
 
375
                popo = []
 
376
                popo.append(map(unicode, ev[0]))
 
377
                popo.append([map(unicode, subj) for subj in ev[1]])
 
378
                # We need the check here so that if D-Bus gives us an empty
 
379
                # byte array we don't serialize the text "dbus.Array(...)".
 
380
                popo.append(str(ev[2]) if ev[2] else u'')
 
381
                return popo
 
382
        
 
383
        def assertEventsEqual(self, ev1, ev2):
 
384
                ev1 = self.get_plain_event(Event(ev1))
 
385
                ev2 = self.get_plain_event(Event(ev2))
 
386
                if ev1 is not NULL_EVENT and ev2 is not NULL_EVENT:
 
387
                        if (ev1[0][0] and not ev2[0][0]) or (ev2[0][0] and not ev1[0][0]):
 
388
                                ev1[0][0] = ev2[0][0] = "" # delete IDs
 
389
                self.assertEqual(ev1, ev2)
 
390
 
 
391
class DBusPrivateMessageBus(object):
 
392
        DISPLAY = ":27"
 
393
 
 
394
        def _run(self):
 
395
                os.environ.update({"DISPLAY": self.DISPLAY})
 
396
                devnull = file("/dev/null", "w")
 
397
                self.display = Popen(
 
398
                        ["Xvfb", self.DISPLAY, "-screen", "0", "1024x768x8"],
 
399
                        stderr=devnull, stdout=devnull
 
400
                )
 
401
                # give the display some time to wake up
 
402
                time.sleep(1)
 
403
                err = self.display.poll()
 
404
                if err:
 
405
                        raise RuntimeError("Could not start Xvfb on display %s, got err=%i" %(self.DISPLAY, err))
 
406
                dbus = Popen(["dbus-launch"], stdout=PIPE)
 
407
                time.sleep(1)
 
408
                self.dbus_config = dict(l.split("=", 1) for l in dbus.communicate()[0].split("\n") if l)
 
409
                os.environ.update(self.dbus_config)
 
410
                
 
411
        def run(self, ignore_errors=False):
 
412
                try:
 
413
                        return self._run()
 
414
                except Exception, e:
 
415
                        if ignore_errors:
 
416
                                return e
 
417
                        raise
 
418
 
 
419
        def _quit(self):
 
420
                os.kill(self.display.pid, signal.SIGKILL)
 
421
                self.display.wait()
 
422
                pid = int(self.dbus_config["DBUS_SESSION_BUS_PID"])
 
423
                os.kill(pid, signal.SIGKILL)
 
424
                try:
 
425
                        os.waitpid(pid, 0)
 
426
                except OSError:
 
427
                        pass
 
428
                        
 
429
        def quit(self, ignore_errors=False):
 
430
                try:
 
431
                        return self._quit()
 
432
                except Exception, e:
 
433
                        if ignore_errors:
 
434
                                return e
 
435
                        raise