17
14
from zeitgeist.client import ZeitgeistDBusInterface, ZeitgeistClient
18
15
from zeitgeist.datamodel import (Event, Subject, Interpretation, Manifestation,
19
16
TimeRange, StorageState)
22
19
from testutils import parse_events
24
21
class ZeitgeistRemoteAPITest(testutils.RemoteTestCase):
163
160
self.assertEquals(2, len(result))
162
def testMonitorDeleteNonExistingEvent(self):
164
mainloop = gobject.MainLoop()
165
events = parse_events("test/data/five_events.js")
168
# We want this timeout - we should not get informed
169
# about deletions of non-existing events
173
def notify_insert_handler(time_range, events):
174
event_ids = map(lambda ev : ev.id, events)
175
self.client.delete_events([9999999])
177
def notify_delete_handler(time_range, event_ids):
179
# TestCase.fail() accepts a single message argument; passing the payload as
# a second positional argument raises TypeError instead of reporting the
# failure, so interpolate it into the message first.
self.fail("Notified about deletion of non-existing events %s" % (events,))
182
self.client.install_monitor(TimeRange(125, 145), [],
183
notify_insert_handler, notify_delete_handler)
185
gobject.timeout_add_seconds(5, timeout)
186
self.client.insert_events(events)
189
def testTwoMonitorsDeleteEvents(self):
192
mainloop = gobject.MainLoop()
193
events = parse_events("test/data/five_events.js")
197
self.fail("Test case timed out")
201
if len(result1) == 2 and len(result2) == 2:
204
def notify_insert_handler1(time_range, events):
205
event_ids = map(lambda ev : ev.id, events)
206
self.client.delete_events(event_ids)
208
def notify_delete_handler1(time_range, event_ids):
209
result1.extend(event_ids)
212
def notify_delete_handler2(time_range, event_ids):
213
result2.extend(event_ids)
216
self.client.install_monitor(TimeRange(125, 145), [],
217
notify_insert_handler1, notify_delete_handler1)
219
self.client.install_monitor(TimeRange(125, 145), [],
220
lambda x, y: x, notify_delete_handler2)
222
self.client.insert_events(events)
223
gobject.timeout_add_seconds (5, timeout)
226
self.assertEquals(2, len(result1))
227
# The second monitor must have been told about both deletions as well
# (the preceding assertion already covers result1).
self.assertEquals(2, len(result2))
165
229
def testMonitorInstallRemoval(self):
167
231
mainloop = gobject.MainLoop()
225
289
def callback(uris):
227
self.assertEquals(uris, ["i2", "i1", "i5", "i3"])
291
self.assertEquals(uris, ["i2", "i1", "i3", "i5"])
229
293
result = self.client.find_related_uris_for_uris(["i4"], callback, num_events=4, result_type=0)
242
306
result = self.client.find_events_for_values(callback, actor="firefox", num_events=1)
309
def testDataSourcesRegistry(self):
    """Smoke-test that the DataSourceRegistry extension is available.

    Only the presence of the extension is exercised: the test passes as
    long as the lookup and the GetDataSources() call do not raise. Checking
    the returned values would require testutils.py to use a
    ZEITGEIST_DATA_PATH other than ~/.local/share.
    """
    data_source_registry = ZeitgeistDBusInterface().get_extension(
        "DataSourceRegistry", "data_source_registry")
    # The return value is deliberately ignored; see the docstring.
    data_source_registry.GetDataSources()
246
317
if __name__ == "__main__":