~saurabhanandiit/gtg/exportFixed

« back to all changes in this revision

Viewing changes to GTG/tests/test_datastore.py

Merge of my work on liblarch newbase and all the backends ported to liblarch
(which mainly means porting the datastore).
One failing test, will check it.

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
 
24
24
import unittest
25
25
import uuid
26
 
import random
27
26
import time
 
27
from random import randint
28
28
 
29
29
import GTG
30
30
from GTG.core.datastore import DataStore
32
32
from GTG.core import CoreConfig
33
33
 
34
34
 
35
 
 
36
35
class TestDatastore(unittest.TestCase):
37
36
    '''
38
37
    Tests for the DataStore object.
39
38
    '''
40
39
 
41
 
    
42
40
    def setUp(self):
43
41
        '''
44
42
        Creates the environment for the tests
53
51
        '''
54
52
        #generate a Task with a random id
55
53
        tid = str(uuid.uuid4())
56
 
        task = self.datastore.task_factory(tid, newtask = True)
 
54
        task = self.datastore.task_factory(tid, newtask=True)
57
55
        self.assertTrue(isinstance(task, GTG.core.task.Task))
58
56
        self.assertEqual(task.get_id(), tid)
59
57
        self.assertEqual(task.is_new(), True)
60
58
        tid = str(uuid.uuid4())
61
 
        task = self.datastore.task_factory(tid, newtask = False)
 
59
        task = self.datastore.task_factory(tid, newtask=False)
62
60
        self.assertEqual(task.is_new(), False)
63
61
 
64
62
    def test_new_task_and_has_task(self):
81
79
        for i in xrange(1, 10):
82
80
            task = self.datastore.new_task()
83
81
            task_ids.append(task.get_id())
84
 
            return_list =self.datastore.get_all_tasks()
 
82
            return_list = self.datastore.get_all_tasks()
85
83
            self.assertEqual(len(return_list), i)
86
84
            task_ids.sort()
87
85
            return_list.sort()
97
95
                                   GTG.core.task.Task))
98
96
        self.assertEqual(self.datastore.get_task(task.get_id()), task)
99
97
 
100
 
 
101
 
    def test_get_tagstore(self):
102
 
        '''
103
 
        Tests the get_tagstore function
104
 
        '''
105
 
        tagstore = self.datastore.get_tagstore()
106
 
        self.assertTrue(isinstance(tagstore, GTG.core.tagstore.TagStore))
107
 
 
108
98
    def test_get_requester(self):
109
99
        '''
110
100
        Tests the get_requester function
117
107
        Tests the get_tasks_tree function
118
108
        '''
119
109
        tasks_tree = self.datastore.get_tasks_tree()
120
 
        self.assertTrue(isinstance(tasks_tree, GTG.core.tree.Tree))
 
110
        self.assertTrue(isinstance(tasks_tree, GTG.tools.liblarch.Tree))
121
111
 
122
112
    def test_push_task(self):
123
113
        '''
146
136
        get_all_backends and get_backend function as a side effect
147
137
        '''
148
138
        #create a simple backend dictionary
149
 
        backend = FakeBackend(enabled = True)
150
 
        tasks_in_backend_count = int(random.random() * 20)
 
139
        backend = FakeBackend(enabled=True)
 
140
        tasks_in_backend_count = randint(1, 20)
151
141
        for temp in xrange(0, tasks_in_backend_count):
152
142
            backend.fake_add_random_task()
153
143
        backend_dic = {'backend': backend, 'pid': 'a'}
154
144
        self.datastore.register_backend(backend_dic)
155
 
        all_backends = self.datastore.get_all_backends(disabled = True)
 
145
        all_backends = self.datastore.get_all_backends(disabled=True)
156
146
        self.assertEqual(len(all_backends), 1)
157
147
        registered_backend = self.datastore.get_backend(backend.get_id())
158
148
        self.assertEqual(backend.get_id(), registered_backend.get_id())
166
156
                         tasks_in_backend_count)
167
157
 
168
158
        #same test, disabled backend
169
 
        backend = FakeBackend(enabled = False)
170
 
        for temp in xrange(1, int(random.random() * 20)):
 
159
        backend = FakeBackend(enabled=False)
 
160
        for temp in xrange(1, randint(2, 20)):
171
161
            backend.fake_add_random_task()
172
 
        backend_dic = {'backend': backend, 'pid':'b'}
 
162
        backend_dic = {'backend': backend, 'pid': 'b'}
173
163
        self.datastore.register_backend(backend_dic)
174
 
        all_backends = self.datastore.get_all_backends(disabled = True)
 
164
        all_backends = self.datastore.get_all_backends(disabled=True)
175
165
        self.assertEqual(len(all_backends), 2)
176
 
        all_backends = self.datastore.get_all_backends(disabled = False)
 
166
        all_backends = self.datastore.get_all_backends(disabled=False)
177
167
        self.assertEqual(len(all_backends), 1)
178
168
        registered_backend = self.datastore.get_backend(backend.get_id())
179
169
        self.assertEqual(backend.get_id(), registered_backend.get_id())
191
181
        '''
192
182
        Tests the set_backend_enabled function
193
183
        '''
194
 
        enabled_backend = FakeBackend(enabled = True)
195
 
        disabled_backend = FakeBackend(enabled = False)
 
184
        enabled_backend = FakeBackend(enabled=True)
 
185
        disabled_backend = FakeBackend(enabled=False)
196
186
        self.datastore.register_backend({'backend': enabled_backend, \
197
187
                                'pid': str(uuid.uuid4()), \
198
188
                                GenericBackend.KEY_DEFAULT_BACKEND: False})
216
206
#        self.datastore.set_backend_enabled(disabled_backend.get_id(), True)
217
207
#        self.assertEqual(disabled_backend.fake_get_initialized_count(), 1)
218
208
#        self.assertTrue(disabled_backend.is_enabled())
219
 
            
 
209
 
220
210
    def test_remove_backend(self):
221
211
        '''
222
212
        Tests the remove_backend function
223
213
        '''
224
 
        enabled_backend = FakeBackend(enabled = True)
225
 
        disabled_backend = FakeBackend(enabled = False)
 
214
        enabled_backend = FakeBackend(enabled=True)
 
215
        disabled_backend = FakeBackend(enabled=False)
226
216
        self.datastore.register_backend({'backend': enabled_backend, \
227
217
                                'pid': str(uuid.uuid4()), \
228
218
                                GenericBackend.KEY_DEFAULT_BACKEND: False})
232
222
        #removing an enabled backend
233
223
        self.datastore.remove_backend(enabled_backend.get_id())
234
224
        self.assertFalse(enabled_backend.is_enabled())
235
 
        self.assertTrue(enabled_backend.fake_is_purged())
236
225
        self.assertEqual( \
237
 
            len(self.datastore.get_all_backends(disabled = True)), 1)
 
226
            len(self.datastore.get_all_backends(disabled=True)), 1)
238
227
        #removing a disabled backend
239
228
        self.datastore.remove_backend(disabled_backend.get_id())
240
229
        self.assertFalse(disabled_backend.is_enabled())
241
 
        self.assertTrue(disabled_backend.fake_is_purged())
242
230
        self.assertEqual( \
243
 
            len(self.datastore.get_all_backends(disabled = True)), 0)
 
231
            len(self.datastore.get_all_backends(disabled=True)), 0)
244
232
 
245
233
    def test_flush_all_tasks(self):
246
234
        '''
247
235
        Tests the flush_all_tasks function
248
236
        '''
249
237
        #we add some tasks in the datastore
250
 
        tasks_in_datastore_count = 10 #int(random.random() * 20)
 
238
        tasks_in_datastore_count = 10 #randint(1, 20)
251
239
        for temp in xrange(0, tasks_in_datastore_count):
252
240
            self.datastore.new_task()
253
241
        datastore_stored_tids = self.datastore.get_all_tasks()
254
242
        self.assertEqual(tasks_in_datastore_count, len(datastore_stored_tids))
255
243
 
256
244
        #we enable a backend
257
 
        backend = FakeBackend(enabled = True)
 
245
        backend = FakeBackend(enabled=True)
258
246
        self.datastore.register_backend({'backend': backend, 'pid': 'a'})
259
247
        #we wait for the signal storm to wear off
260
 
        time.sleep(5)
 
248
        time.sleep(2)
261
249
        #we sync
262
250
        self.datastore.get_backend(backend.get_id()).sync()
263
251
        #and we inject task in the backend
264
 
        tasks_in_backend_count = 5 #int(random.random() * 20)
 
252
        tasks_in_backend_count = 5 #randint(1, 20)
265
253
        for temp in xrange(0, tasks_in_backend_count):
266
254
            backend.fake_add_random_task()
267
255
        backend_stored_tids = backend.fake_get_task_ids()
281
269
        self.assertEqual(new_backend_stored_tids, new_datastore_stored_tids)
282
270
 
283
271
 
284
 
 
285
272
def test_suite():
286
 
    CoreConfig().set_data_dir("./test_data")
287
 
    CoreConfig().set_conf_dir("./test_data")
288
273
    return unittest.TestLoader().loadTestsFromTestCase(TestDatastore)
289
274
 
290
275
 
291
 
 
292
276
class FakeBackend(unittest.TestCase):
293
277
    '''
294
278
    Mimics the behavior of a simple backend. Just used for testing
295
279
    '''
296
280
 
297
 
    def __init__(self, enabled = True):
 
281
    def __init__(self, enabled=True):
298
282
        self.enabled = enabled
299
283
        self.initialized_count = 0
300
284
        self.tasks_ids = []
324
308
        for task_id in self.tasks_ids:
325
309
            self.datastore.push_task(self.datastore.task_factory(task_id))
326
310
 
327
 
    def quit(self, disabled = False):
 
311
    def quit(self, disabled=False):
328
312
        self.enabled = not disabled
329
313
 
330
 
    def purge(self):
331
 
        self.purged = True
332
 
 
333
314
    def is_default(self):
334
315
        return True
335
316
 
356
337
    def fake_add_random_task(self):
357
338
        self.tasks_ids.append(str(uuid.uuid4()))
358
339
 
359
 
    def fake_is_purged(self):
360
 
        return self.purged