~jfb-tempo-consulting/unifield-wm/sync-env-py3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python2

import sys

#Load config file
import config
from config import load_test as load_value

#Load OpenERP Client Library
import openerplib

#from tests import *
from tests.openerplib import db

from scripts.common import Synchro, HQ, Coordo, Project

#Load tests procedures
if sys.version_info >= (2, 7):
    import unittest
else:
    import unittest27 as unittest

from threading import Thread
from timeit import Timer
from math import ceil
from datetime import datetime

try:
    import ipdb as pdb
except:
    import pdb


test_cases = []


class synchronize(object):
    databases = (HQ, Coordo, Project)

    def sync(self, db):
        if not db.get('sync.client.entity').sync():
            monitor = db.get('sync.monitor')
            ids = monitor.search([], 0, 1, '"end" desc')
            self.fail('Synchronization process of database "%s" failed!\n%s' % (db.db_name,monitor.read(ids, ['error'])[0]['error']))

    def sync_all(self):
        for db in self.databases:
            db.connect('admin')
            self.sync(db)


class make_records(unittest.TestCase):
    created_records = []
    id = datetime.now().strftime('%m%d%H%M')
    template = {
        'name' : 'TEST_%s_' % id,
        'amount' : 400.00,
        'amount_currency' : 1.0,
        'document_date' : datetime.now().strftime('%Y-%m-%d'),
    }
    journal = {
        'name' : 'TEST_%s' % id,
    }

    def test_20_make_records(self):
        Project.connect('admin')
        journal_id = Project.get('account.analytic.journal').create(self.journal)
        self.assertTrue(journal_id,
                   "Cannot create journal: %s in db %s" % (self.journal, Project.name))
        self.__class__.created_records.append( ('account.analytic.journal', self.journal) )
        for i in range(load_value):
            data = dict(self.template)
            data['name'] += str(i)
            self.__class__.created_records.append( ('account.analytic.line', dict(data)) )
            data.update({
                'journal_id' : journal_id,
                'currency_id' : Project.get('res.currency').search([('name','=','EUR')], 0, 1)[0],
                'account_id' : Project.get('account.analytic.account').search([('name','=','Dummy')], 0, 1)[0],
                'general_account_id' : Project.get('account.account').search([('name','=','Medical')], 0, 1)[0],
            })
            self.assertTrue(Project.get('account.analytic.line').create(data),
                       "Cannot create record: %s in db %s" % (data, Project.name))


class clean(synchronize, unittest.TestCase):
    def test_10_synchronize(self):
        self.sync_all()

    test_11_synchronize_twice = test_10_synchronize


class check_records(unittest.TestCase):
    def test_10_check_record_existence(self):
        if not make_records.created_records:
            self.skipTest("It seems no record has been created!")
        for model, rec in make_records.created_records:
            for db in (Coordo, HQ):
                self.assertTrue(db.test(model, rec),
                           "Cannot find record: %s in db %s" % (rec, db.name))


class load(synchronize, unittest.TestCase):
    logs = ""

    def measure_synchronize(self, db, message):
        t = Timer( lambda:self.sync(db) )
        delay = t.timeit(1)
        self.__class__.logs += (message+"\n") % (db.name, ceil(delay))

    def test_50_synchronize(self):
        for db in self.databases:
            db.connect('admin')
        self.measure_synchronize(Project, "%s pushed in %d seconds")
        threads = []
        for db in (Coordo, HQ):
            threads.append( Thread(target=lambda:self.measure_synchronize(db,
                "%s pulled in %d seconds")) )
            threads[-1].start()
        for t in threads:
            t.join()

    @classmethod
    def tearDownClass(cls):
        print(cls.logs.strip())

test_cases = [clean,make_records,load,check_records]


def load_tests(loader, tests, pattern):
    suite = unittest.TestSuite()
    for test_class in test_cases:
        tests = loader.loadTestsFromTestCase(test_class)
        suite.addTests(tests)
    return suite

if __name__ == '__main__':
    unittest.main(failfast=True, verbosity=2)