7
from config import load_test as load_value
9
#Load OpenERP Client Library
13
from tests.openerplib import db
15
from scripts.common import Synchro, HQ, Coordo, Project
17
# Load test procedures
18
if sys.version_info >= (2, 7):
21
import unittest27 as unittest
23
from threading import Thread
24
from timeit import Timer
26
from datetime import datetime
37
# Base class (plain object, not a TestCase) that carries the list of
# databases to synchronize and the synchronization/failure-reporting logic.
# `clean` and `load` further down combine it with unittest.TestCase.
# NOTE(review): this excerpt is not contiguous -- the `def` lines of the
# methods that own the statements below are not visible here.
class synchronize(object):
38
# Databases handled by this class, imported from scripts.common.
databases = (HQ, Coordo, Project)
41
# Ask the database's 'sync.client.entity' object to synchronize; a falsy
# result is treated as a failed synchronization.
if not db.get('sync.client.entity').sync():
42
monitor = db.get('sync.monitor')
43
# Empty search domain, then 0, 1 and an '"end" desc' ordering -- presumably
# offset 0 / limit 1, i.e. the most recently ended monitor entry (TODO
# confirm against the openerplib search signature).
ids = monitor.search([], 0, 1, '"end" desc')
44
# Fail the running test, reporting the database name and the 'error' field
# read from the monitor entry found above.
self.fail('Synchronization process of database "%s" failed!\n%s' % (db.db_name,monitor.read(ids, ['error'])[0]['error']))
47
# Walks every database listed in `databases`; the loop body is not visible
# in this excerpt.
for db in self.databases:
52
# Test case that creates one analytic journal and `load_value` analytic
# lines in the Project database, recording what it created in the
# class-level `created_records` list (its definition is not visible in this
# excerpt; check_records reads it later).
class make_records(unittest.TestCase):
54
# Month/day/hour/minute stamp embedded in the record names below.
# NOTE(review): this shadows the builtin `id` inside the class body.
id = datetime.now().strftime('%m%d%H%M')
56
# NOTE(review): the dict literals that these entries belong to open and
# close on lines not visible here. test_20_make_records reads
# `self.template` and `self.journal`, so these are presumably their entries.
'name' : 'TEST_%s_' % id,
58
'amount_currency' : 1.0,
59
'document_date' : datetime.now().strftime('%Y-%m-%d'),
62
'name' : 'TEST_%s' % id,
65
# Creates the journal first, then one analytic line per index in
# range(load_value); each creation is asserted to return a truthy value.
def test_20_make_records(self):
66
Project.connect('admin')
67
journal_id = Project.get('account.analytic.journal').create(self.journal)
68
self.assertTrue(journal_id,
69
"Cannot create journal: %s in db %s" % (self.journal, Project.name))
70
# Remember the journal as a (model name, values) pair.
self.__class__.created_records.append( ('account.analytic.journal', self.journal) )
71
for i in range(load_value):
72
# Work on a copy so the shared template is not modified.
data = dict(self.template)
73
# Suffix the name with the loop index to distinguish the lines.
data['name'] += str(i)
74
# A copy of the values is recorded here, before the relational ids below
# are involved, so the recorded values do not contain them.
self.__class__.created_records.append( ('account.analytic.line', dict(data)) )
76
# NOTE(review): the statement that applies the following entries to `data`
# is not visible in this excerpt. Each lookup keeps the first id returned
# for the given name; `[0]` would raise IndexError on an empty result
# (assuming search returns a list). The three searches do not depend on
# `i` and appear to run once per iteration -- candidates for hoisting.
'journal_id' : journal_id,
77
'currency_id' : Project.get('res.currency').search([('name','=','EUR')], 0, 1)[0],
78
'account_id' : Project.get('account.analytic.account').search([('name','=','Dummy')], 0, 1)[0],
79
'general_account_id' : Project.get('account.account').search([('name','=','Medical')], 0, 1)[0],
81
self.assertTrue(Project.get('account.analytic.line').create(data),
82
"Cannot create record: %s in db %s" % (data, Project.name))
85
# Test case combining the `synchronize` base with unittest.TestCase; it is
# the first entry of `test_cases`.
class clean(synchronize, unittest.TestCase):
86
# NOTE(review): the body of this test is not visible in this excerpt.
def test_10_synchronize(self):
89
# Alias: the same function is exposed under a second test name, so the
# loader collects and runs it twice.
test_11_synchronize_twice = test_10_synchronize
92
class check_records(unittest.TestCase):
    """Check that the records registered by make_records reached the
    Coordo and HQ databases."""

    def test_10_check_record_existence(self):
        """Assert every registered (model, values) pair in both databases.

        The test is skipped when make_records.created_records is empty.
        For each pair, each database's ``test(model, values)`` helper must
        return a truthy value; judging by the failure message this is an
        existence check.
        """
        expected = make_records.created_records
        if not expected:
            self.skipTest("It seems no record has been created!")
        for model_name, values in expected:
            for target in (Coordo, HQ):
                found = target.test(model_name, values)
                self.assertTrue(
                    found,
                    "Cannot find record: %s in db %s" % (values, target.name))
102
# Test case that times synchronizations and accumulates the timings in the
# class-level `logs` string (its initial value is not visible in this
# excerpt), which tearDownClass prints.
class load(synchronize, unittest.TestCase):
105
# Times self.sync(db) and appends one formatted line to `logs`.
# `message` must contain a %s placeholder (database name) followed by a %d
# placeholder (seconds), as the callers below pass.
def measure_synchronize(self, db, message):
106
t = Timer( lambda:self.sync(db) )
108
# NOTE(review): the statement assigning `delay` and the import of `ceil`
# are not visible in this excerpt.
# NOTE(review): this method is also used as a Thread target below; `+=` on
# a shared class attribute is a read-modify-write, so concurrent updates
# could lose a line -- confirm whether that matters here.
self.__class__.logs += (message+"\n") % (db.name, ceil(delay))
110
def test_50_synchronize(self):
111
# NOTE(review): the body of this loop is not visible in this excerpt.
for db in self.databases:
113
# Measure the Project database synchronization ("pushed").
self.measure_synchronize(Project, "%s pushed in %d seconds")
115
# One thread per pulling database ("pulled"). `threads` is created, and
# presumably started/joined, on lines not visible here.
# NOTE(review): the lambda looks `db` up when it is called, not when it is
# created. Unless each thread runs before the loop advances, both threads
# would measure the last database (HQ). Binding it as a default
# (`lambda db=db: ...`) would avoid that -- confirm against the elided lines.
for db in (Coordo, HQ):
116
threads.append( Thread(target=lambda:self.measure_synchronize(db,
117
"%s pulled in %d seconds")) )
123
# Takes `cls`, so presumably decorated with @classmethod on a line not
# visible here. Prints the accumulated timings using the Python 2 print
# statement.
def tearDownClass(cls):
124
print cls.logs.strip()
126
# Test case classes, in the order load_tests() adds them to the suite.
test_cases = [
    clean,
    make_records,
    load,
    check_records,
]
129
def load_tests(loader, tests, pattern):
    """Build the module's suite for unittest's ``load_tests`` protocol.

    The test case classes are added in the order given by the module-level
    ``test_cases`` list, instead of the loader's default ordering.

    :param loader: the TestLoader used to collect each class's tests
    :param tests: tests the loader collected by default (ignored)
    :param pattern: discovery pattern (unused; part of the protocol)
    :return: a TestSuite holding the tests of every class in ``test_cases``
    """
    suite = unittest.TestSuite()
    for test_class in test_cases:
        # Use the result directly; do not rebind the `tests` parameter.
        suite.addTests(loader.loadTestsFromTestCase(test_class))
    # The protocol requires the suite to be returned; without this the
    # loader would receive None instead of the tests built above.
    return suite
136
# Script entry point: run the module's tests through unittest's command-line
# runner, stopping at the first failure and using verbose output.
if __name__ == '__main__':
    unittest.main(verbosity=2, failfast=True)