~jfb-tempo-consulting/unifield-wm/sync-env-py3

« back to all changes in this revision

Viewing changes to load.py

  • Committer: Samus CTO
  • Date: 2012-10-18 11:18:44 UTC
  • Revision ID: cto@openerp.com-20121018111844-u0ji3g3yuu6hkkna
[IMP] Load script test

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python2
 
2
 
 
3
import sys
 
4
 
 
5
#Load config file
 
6
import config
 
7
from config import load_test as load_value
 
8
 
 
9
#Load OpenERP Client Library
 
10
import openerplib
 
11
 
 
12
#from tests import *
 
13
from tests.openerplib import db
 
14
 
 
15
from scripts.common import Synchro, HQ, Coordo, Project
 
16
 
 
17
#Load tests procedures
 
18
if sys.version_info >= (2, 7):
 
19
    import unittest
 
20
else:
 
21
    import unittest27 as unittest
 
22
 
 
23
from threading import Thread
 
24
from timeit import Timer
 
25
from math import ceil
 
26
from datetime import datetime
 
27
 
 
28
try:
 
29
    import ipdb as pdb
 
30
except:
 
31
    import pdb
 
32
 
 
33
 
 
34
test_cases = []
 
35
 
 
36
 
 
37
class synchronize(object):
 
38
    databases = (HQ, Coordo, Project)
 
39
 
 
40
    def sync(self, db):
 
41
        if not db.get('sync.client.entity').sync():
 
42
            monitor = db.get('sync.monitor')
 
43
            ids = monitor.search([], 0, 1, '"end" desc')
 
44
            self.fail('Synchronization process of database "%s" failed!\n%s' % (db.db_name,monitor.read(ids, ['error'])[0]['error']))
 
45
 
 
46
    def sync_all(self):
 
47
        for db in self.databases:
 
48
            db.connect('admin')
 
49
            self.sync(db)
 
50
 
 
51
 
 
52
class make_records(unittest.TestCase):
 
53
    created_records = []
 
54
    id = datetime.now().strftime('%m%d%H%M')
 
55
    template = {
 
56
        'name' : 'TEST_%s_' % id,
 
57
        'amount' : 400.00,
 
58
        'amount_currency' : 1.0,
 
59
        'document_date' : datetime.now().strftime('%Y-%m-%d'),
 
60
    }
 
61
    journal = {
 
62
        'name' : 'TEST_%s' % id,
 
63
    }
 
64
 
 
65
    def test_20_make_records(self):
 
66
        Project.connect('admin')
 
67
        journal_id = Project.get('account.analytic.journal').create(self.journal)
 
68
        self.assertTrue(journal_id,
 
69
                   "Cannot create journal: %s in db %s" % (self.journal, Project.name))
 
70
        self.__class__.created_records.append( ('account.analytic.journal', self.journal) )
 
71
        for i in range(load_value):
 
72
            data = dict(self.template)
 
73
            data['name'] += str(i)
 
74
            self.__class__.created_records.append( ('account.analytic.line', dict(data)) )
 
75
            data.update({
 
76
                'journal_id' : journal_id,
 
77
                'currency_id' : Project.get('res.currency').search([('name','=','EUR')], 0, 1)[0],
 
78
                'account_id' : Project.get('account.analytic.account').search([('name','=','Dummy')], 0, 1)[0],
 
79
                'general_account_id' : Project.get('account.account').search([('name','=','Medical')], 0, 1)[0],
 
80
            })
 
81
            self.assertTrue(Project.get('account.analytic.line').create(data),
 
82
                       "Cannot create record: %s in db %s" % (data, Project.name))
 
83
 
 
84
 
 
85
class clean(synchronize, unittest.TestCase):
 
86
    def test_10_synchronize(self):
 
87
        self.sync_all()
 
88
 
 
89
    test_11_synchronize_twice = test_10_synchronize
 
90
 
 
91
 
 
92
class check_records(unittest.TestCase):
 
93
    def test_10_check_record_existence(self):
 
94
        if not make_records.created_records:
 
95
            self.skipTest("It seems no record has been created!")
 
96
        for model, rec in make_records.created_records:
 
97
            for db in (Coordo, HQ):
 
98
                self.assertTrue(db.test(model, rec),
 
99
                           "Cannot find record: %s in db %s" % (rec, db.name))
 
100
 
 
101
 
 
102
class load(synchronize, unittest.TestCase):
 
103
    logs = ""
 
104
 
 
105
    def measure_synchronize(self, db, message):
 
106
        t = Timer( lambda:self.sync(db) )
 
107
        delay = t.timeit(1)
 
108
        self.__class__.logs += (message+"\n") % (db.name, ceil(delay))
 
109
 
 
110
    def test_50_synchronize(self):
 
111
        for db in self.databases:
 
112
            db.connect('admin')
 
113
        self.measure_synchronize(Project, "%s pushed in %d seconds")
 
114
        threads = []
 
115
        for db in (Coordo, HQ):
 
116
            threads.append( Thread(target=lambda:self.measure_synchronize(db,
 
117
                "%s pulled in %d seconds")) )
 
118
            threads[-1].start()
 
119
        for t in threads:
 
120
            t.join()
 
121
 
 
122
    @classmethod
 
123
    def tearDownClass(cls):
 
124
        print cls.logs.strip()
 
125
 
 
126
test_cases = [clean,make_records,load,check_records]
 
127
 
 
128
 
 
129
def load_tests(loader, tests, pattern):
 
130
    suite = unittest.TestSuite()
 
131
    for test_class in test_cases:
 
132
        tests = loader.loadTestsFromTestCase(test_class)
 
133
        suite.addTests(tests)
 
134
    return suite
 
135
 
 
136
if __name__ == '__main__':
 
137
    unittest.main(failfast=True, verbosity=2)
 
138