# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Getting Things GNOME! - a personal organizer for the GNOME desktop
# Copyright (c) 2008-2013 - Lionel Dricot & Bertrand Rousseau
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
""" Tests for the datastore """
25
import time
import unittest
import uuid
from random import randint

from gi.repository import GObject
from liblarch import Tree

import GTG
from GTG.backends.genericbackend import GenericBackend
from GTG.core import CoreConfig
from GTG.core.datastore import DataStore
35
def sleep_within_loop(duration):
    """Block for ``duration`` seconds while running a GObject main loop.

    Unlike ``time.sleep``, the main loop keeps dispatching its sources while
    waiting, so callbacks scheduled on it get a chance to run.

    @param duration: how long to wait, in seconds
    @returns: None
    """
    main_loop = GObject.MainLoop()
    # timeout_add takes whole milliseconds. main_loop.quit returns None, so
    # the timeout source is dropped after it fires once.
    GObject.timeout_add(int(duration * 1000), main_loop.quit)
    # NOTE: I am not sure why, but this always-rescheduled idle callback is
    # needed for the _process method of LibLarch to be run.
    idle_id = GObject.idle_add(lambda: True)
    try:
        main_loop.run()
    finally:
        # The idle callback returns True and would otherwise stay registered
        # on the default context after this loop has quit.
        GObject.source_remove(idle_id)
44
class TestDatastore(unittest.TestCase):
    """Tests for the DataStore object."""

    def setUp(self):
        """Create the environment for the tests: a fresh DataStore."""
        self.datastore = DataStore()
        self.requester = self.datastore.get_requester()

    def _register_fake_backend(self, backend):
        """Register ``backend`` in the datastore as a non-default backend."""
        self.datastore.register_backend({
            'backend': backend,
            'pid': str(uuid.uuid4()),
            GenericBackend.KEY_DEFAULT_BACKEND: False,
        })

    def _wait_until_disabled(self, backend, attempts=10, interval=0.1):
        """Poll ``backend`` until it reports being disabled.

        Gives up silently after ``attempts`` polls spaced ``interval``
        seconds apart; the caller asserts on the final state.
        NOTE(review): assumes the datastore disables backends off the
        calling thread, so a plain sleep is enough -- confirm.
        """
        for _ in range(attempts):
            if not backend.is_enabled():
                return
            time.sleep(interval)

    def test_task_factory(self):
        """Test the task_factory function."""
        # generate a Task with a random id
        tid = str(uuid.uuid4())
        task = self.datastore.task_factory(tid, newtask=True)
        self.assertIsInstance(task, GTG.core.task.Task)
        self.assertEqual(task.get_id(), tid)
        self.assertEqual(task.is_new(), True)
        tid = str(uuid.uuid4())
        task = self.datastore.task_factory(tid, newtask=False)
        self.assertEqual(task.is_new(), False)

    def test_new_task_and_has_task(self):
        """Test the new_task and has_task functions."""
        task = self.datastore.new_task()
        tid = task.get_id()
        self.assertIsInstance(tid, str)
        self.assertNotEqual(tid, '')
        self.assertTrue(task.is_new())
        self.assertTrue(self.datastore.has_task(tid))
        self.assertEqual(len(self.datastore.get_all_tasks()), 1)

    def test_get_all_tasks(self):
        """Test the get_all_tasks function."""
        task_ids = []
        for i in range(1, 10):
            task = self.datastore.new_task()
            task_ids.append(task.get_id())
            return_list = self.datastore.get_all_tasks()
            self.assertEqual(len(return_list), i)
            # the datastore gives no ordering guarantee here, so compare
            # sorted copies
            self.assertEqual(sorted(task_ids), sorted(return_list))

    def test_get_task(self):
        """Test the get_task function."""
        task = self.datastore.new_task()
        fetched = self.datastore.get_task(task.get_id())
        self.assertIsInstance(fetched, GTG.core.task.Task)
        self.assertEqual(fetched, task)

    def test_get_requester(self):
        """Test the get_requester function."""
        requester = self.datastore.get_requester()
        self.assertIsInstance(requester, GTG.core.requester.Requester)

    def test_get_tasks_tree(self):
        """Test the get_tasks_tree function."""
        tasks_tree = self.datastore.get_tasks_tree()
        self.assertIsInstance(tasks_tree, Tree)

    def test_push_task(self):
        """Test the push_task function."""
        task_ids = []
        for _ in range(1, 10):
            tid = str(uuid.uuid4())
            if tid not in task_ids:
                task_ids.append(tid)
            task = self.datastore.task_factory(tid)
            return_value1 = self.datastore.push_task(task)
            self.assertTrue(return_value1)
            # we do it twice, but it should be pushed only once if it's
            # working correctly (the second should be discarded)
            return_value2 = self.datastore.push_task(task)
            self.assertFalse(return_value2)
            stored_tasks = self.datastore.get_all_tasks()
            self.assertEqual(sorted(task_ids), sorted(stored_tasks))

    def test_register_backend(self):
        """Test the register_backend function.

        It also tests the get_all_backends and get_backend functions as a
        side effect.
        """
        # create a simple backend dictionary
        backend = FakeBackend(enabled=True)
        tasks_in_backend_count = randint(1, 20)
        for _ in range(tasks_in_backend_count):
            backend.fake_add_random_task()
        backend_dic = {'backend': backend, 'pid': 'a'}
        self.datastore.register_backend(backend_dic)
        all_backends = self.datastore.get_all_backends(disabled=True)
        self.assertEqual(len(all_backends), 1)
        registered_backend = self.datastore.get_backend(backend.get_id())
        self.assertEqual(backend.get_id(), registered_backend.get_id())
        self.assertIsInstance(registered_backend,
                              GTG.core.datastore.TaskSource)
        self.assertTrue(registered_backend.is_enabled())
        self.assertEqual(registered_backend.fake_get_initialized_count(), 1)
        # we give some time for the backend to push all its tasks
        sleep_within_loop(1)
        self.assertEqual(len(self.datastore.get_all_tasks()),
                         tasks_in_backend_count)

        # same test, disabled backend
        backend = FakeBackend(enabled=False)
        for _ in range(1, randint(2, 20)):
            backend.fake_add_random_task()
        backend_dic = {'backend': backend, 'pid': 'b'}
        self.datastore.register_backend(backend_dic)
        all_backends = self.datastore.get_all_backends(disabled=True)
        self.assertEqual(len(all_backends), 2)
        all_backends = self.datastore.get_all_backends(disabled=False)
        self.assertEqual(len(all_backends), 1)
        registered_backend = self.datastore.get_backend(backend.get_id())
        self.assertEqual(backend.get_id(), registered_backend.get_id())
        self.assertIsInstance(registered_backend,
                              GTG.core.datastore.TaskSource)
        self.assertFalse(registered_backend.is_enabled())
        self.assertEqual(registered_backend.fake_get_initialized_count(), 0)
        # we give some time for the backend to push all its tasks (it
        # shouldn't, since it's disabled, but we give time anyway)
        sleep_within_loop(1)
        self.assertEqual(len(self.datastore.get_all_tasks()),
                         tasks_in_backend_count)

    def test_set_backend_enabled(self):
        """Test the set_backend_enabled function."""
        enabled_backend = FakeBackend(enabled=True)
        disabled_backend = FakeBackend(enabled=False)
        self._register_fake_backend(enabled_backend)
        self._register_fake_backend(disabled_backend)
        # enabling an enabled backend
        self.datastore.set_backend_enabled(enabled_backend.get_id(), True)
        self.assertEqual(enabled_backend.fake_get_initialized_count(), 1)
        self.assertTrue(enabled_backend.is_enabled())
        # disabling a disabled backend
        self.datastore.set_backend_enabled(disabled_backend.get_id(), False)
        self.assertEqual(disabled_backend.fake_get_initialized_count(), 0)
        self.assertFalse(disabled_backend.is_enabled())
        # disabling an enabled backend
        self.datastore.set_backend_enabled(enabled_backend.get_id(), False)
        self.assertEqual(enabled_backend.fake_get_initialized_count(), 1)
        self._wait_until_disabled(enabled_backend)
        self.assertFalse(enabled_backend.is_enabled())
        # #enabling a disabled backend
        # self.datastore.set_backend_enabled(disabled_backend.get_id(), True)
        # self.assertEqual(disabled_backend.fake_get_initialized_count(), 1)
        # self.assertTrue(disabled_backend.is_enabled())

    def test_remove_backend(self):
        """Test the remove_backend function."""
        enabled_backend = FakeBackend(enabled=True)
        disabled_backend = FakeBackend(enabled=False)
        self._register_fake_backend(enabled_backend)
        self._register_fake_backend(disabled_backend)
        # removing an enabled backend
        self.datastore.remove_backend(enabled_backend.get_id())
        self._wait_until_disabled(enabled_backend)
        self.assertFalse(enabled_backend.is_enabled())
        self.assertEqual(
            len(self.datastore.get_all_backends(disabled=True)), 1)
        # removing a disabled backend
        self.datastore.remove_backend(disabled_backend.get_id())
        self.assertFalse(disabled_backend.is_enabled())
        self.assertEqual(
            len(self.datastore.get_all_backends(disabled=True)), 0)

    def test_flush_all_tasks(self):
        """Test the flush_all_tasks function."""
        # we add some tasks in the datastore
        tasks_in_datastore_count = 10  # randint(1, 20)
        for _ in range(tasks_in_datastore_count):
            self.datastore.new_task()
        datastore_stored_tids = self.datastore.get_all_tasks()
        self.assertEqual(tasks_in_datastore_count, len(datastore_stored_tids))

        # we enable a backend
        backend = FakeBackend(enabled=True)
        self.datastore.register_backend({'backend': backend, 'pid': 'a'})
        # we wait for the signal storm to wear off
        sleep_within_loop(2)
        # we sync
        self.datastore.get_backend(backend.get_id()).sync()
        # and we inject task in the backend
        tasks_in_backend_count = 5  # randint(1, 20)
        for _ in range(tasks_in_backend_count):
            backend.fake_add_random_task()
        backend_stored_tids = backend.fake_get_task_ids()
        self.assertEqual(tasks_in_backend_count, len(backend_stored_tids))
        self.datastore.flush_all_tasks(backend.get_id())
        # we wait for the signal storm to wear off
        sleep_within_loop(2)
        # we sync
        self.datastore.get_backend(backend.get_id()).sync()
        all_tasks_count = tasks_in_backend_count + tasks_in_datastore_count
        new_datastore_stored_tids = self.datastore.get_all_tasks()
        new_backend_stored_tids = backend.fake_get_task_ids()
        self.assertEqual(len(new_backend_stored_tids), all_tasks_count)
        self.assertEqual(len(new_datastore_stored_tids), all_tasks_count)
        # compare sorted copies so the lists owned by the datastore and the
        # backend are not reordered by the test
        self.assertEqual(sorted(new_backend_stored_tids),
                         sorted(new_datastore_stored_tids))
281
def test_suite():
    """Return a unittest suite holding every TestDatastore test."""
    return unittest.TestLoader().loadTestsFromTestCase(TestDatastore)
284
class FakeBackend(unittest.TestCase):
    """
    Mimics the behavior of a simple backend. Just used for testing.

    It keeps a plain list of task ids and counts how many times it has been
    initialized, so tests can inspect what was done to it.
    """

    def __init__(self, enabled=True):
        """Create a backend with a random id and no tasks.

        @param enabled: whether the backend reports itself as enabled
        """
        # NOTE(review): unittest.TestCase.__init__ is deliberately not
        # called here; the class only serves as a stand-in object.
        self.enabled = enabled
        self.initialized_count = 0
        self.tasks_ids = []
        self.backend_id = str(uuid.uuid4())
        self.is_default_backend = False

    def is_enabled(self):
        """Return True if the backend is currently enabled."""
        return self.enabled

    def initialize(self):
        """Record one more initialization of this backend."""
        self.initialized_count += 1

    def queue_set_task(self, task):
        """Store the id of ``task`` unless it is already known."""
        if task.get_id() not in self.tasks_ids:
            self.tasks_ids.append(task.get_id())

    def has_task(self, task_id):
        """Return True if ``task_id`` is stored in this backend."""
        return task_id in self.tasks_ids

    def queue_remove_task(self, task_id):
        """Forget ``task_id``; raises ValueError if it is not stored."""
        self.tasks_ids.remove(task_id)

    def get_id(self):
        """Return the random identifier of this backend."""
        return self.backend_id

    def start_get_tasks(self):
        """Push every stored task id into the registered datastore."""
        for task_id in self.tasks_ids:
            self.datastore.push_task(self.datastore.task_factory(task_id))

    def quit(self, disabled=False):
        """Stop the backend; it stays enabled unless ``disabled`` is True."""
        self.enabled = not disabled

    def is_default(self):
        """Return True if this backend is flagged as the default one."""
        return self.is_default_backend

    def set_parameter(self, param_name, param_value):
        """Accept a parameter update and ignore it."""
        pass

    def get_attached_tags(self):
        """Return the tags this backend is attached to (all tasks)."""
        return [CoreConfig.ALLTASKS_TAG]

    def register_datastore(self, datastore):
        """Remember the datastore that tasks are pushed into."""
        self.datastore = datastore

    ##########################################################################
    # The following are used just for testing, they're not present inside a
    # normal backend
    ##########################################################################
    def fake_get_initialized_count(self):
        """Return how many times initialize() has been called."""
        return self.initialized_count

    def fake_get_task_ids(self):
        """Return the live list of task ids held by this backend."""
        return self.tasks_ids

    def fake_add_random_task(self):
        """Add a new random task id directly to the backend."""
        self.tasks_ids.append(str(uuid.uuid4()))