~jfb-tempo-consulting/unifield-wm/sync-env-py3

« back to all changes in this revision

Viewing changes to tests/openerplib.py

  • Committer: Samus CTO
  • Date: 2012-08-28 12:38:31 UTC
  • Revision ID: cto@openerp.com-20120828123831-9dhavvjktnrl2p8d
[INIT]

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from os import path
 
2
from time import sleep
 
3
import re
 
4
 
 
5
import csv
 
6
 
 
7
from xmlrpclib import Fault
 
8
openerplib = __import__('openerplib')
 
9
 
 
10
import config
 
11
 
 
12
try:
 
13
    import ipdb as pdb
 
14
except:
 
15
    import pdb
 
16
 
 
17
class db(object):
 
18
    def __init__(self, server, db_name, user=None, password=None, server_password=None):
 
19
        self.server = server
 
20
        try:
 
21
            self.server_password = config.db_password
 
22
        except:
 
23
            self.server_password = None
 
24
        finally:
 
25
            if server_password: self.server_password = server_password
 
26
        self.db_name = db_name
 
27
        self.service = server.get_service('db')
 
28
        if user: self.connect(user, password)
 
29
 
 
30
    def exec_workflow(self, model, method, ids):
 
31
        return self.server.get_service('object').exec_workflow(
 
32
            self.server.database, self.server.user_id, self.server.password,
 
33
            model, method, ids)
 
34
 
 
35
    def connect(self, login, password):
 
36
        self.server = openerplib.get_connection(hostname=self.server.connector.hostname, port=self.server.connector.port, database=self.db_name, login=login, password=password)
 
37
        return self
 
38
 
 
39
    def create_db(self, password, demo=False, lang='en_US', wait=False):
 
40
        if not self.server_password: raise Exception, "The server password is needed for this operation"
 
41
        self.id = self.service.create(self.server_password, self.db_name, demo, 'en_US', password)
 
42
        if wait: self.wait()
 
43
        return self
 
44
 
 
45
    create = create_db
 
46
 
 
47
    def wait(self):
 
48
        if not self.server_password: raise Exception, "The server password is needed for this operation"
 
49
        while True:
 
50
            try:
 
51
                running = self.service.get_progress(self.server_password, self.id)
 
52
                if not running: break
 
53
                sleep(1)
 
54
            except:
 
55
                break
 
56
        return self
 
57
 
 
58
    def drop(self):
 
59
        if not self.server_password: raise Exception, "The server password is needed for this operation"
 
60
        ## TODO does'n handle database current access prevent dropping
 
61
        #self.service.drop(self.server_password, self.db_name)
 
62
        try:
 
63
            self.service.drop(self.server_password, self.db_name)
 
64
        except Fault, e:
 
65
            if not re.search(r"database ([\"']).+\1 does not exist", e.faultCode): raise e
 
66
        finally:
 
67
            return self
 
68
 
 
69
    def get(self, model):
 
70
        return self.server.get_model(model)
 
71
 
 
72
    def wizard(self, model, data):
 
73
        return wizard(self, model, data)
 
74
 
 
75
    def ref(self, xml_id, index=0):
 
76
        return ref(xml_id).get(self)
 
77
    #def ref(self, model, xml_id, index=0):
 
78
    #    return ref(model, xml_id).get(self)
 
79
 
 
80
    def module(self, name, index=0):
 
81
        return module(self, name, index)
 
82
 
 
83
    def action_done(self, action_id):
 
84
        proxy = self.get('ir.actions.todo')
 
85
        ids = proxy.search([('action_id', '=', action_id)])
 
86
        if ids: proxy.write(ids, {'state' : 'done'})
 
87
 
 
88
    def user(self, login):
 
89
        return user(self, login)
 
90
 
 
91
    def group(self, name):
 
92
        return group(self, name)
 
93
 
 
94
    def search_data(self, model, data, domain=None, action=None, active='active', pdb=False):
 
95
        if isinstance(data, dict):
 
96
            searchDomain = [(field,'=',value) for (field,value) in data.items() if value]
 
97
        elif isinstance(data, (list,tuple)):
 
98
            searchDomain = list(data)
 
99
        else:
 
100
            raise NotImplementedError("Cannot transform %s (of type %s) to a domain (list)" % (data, type(data).__name__))
 
101
        if domain:
 
102
            searchDomain += list(domain)
 
103
        if active == 'unactive': searchDomain.append(('active','=',0))
 
104
        elif active == 'both': searchDomain.extend(['|',('active','=',0),('active','=',1)])
 
105
        elif not active == 'active': raise Exception, "'active' prameter must be one of these: active, unactive or both"
 
106
        proxy = self.get(model)
 
107
        ids = proxy.search(searchDomain)
 
108
        if not ids and pdb:
 
109
            pdb.set_trace()
 
110
        if action == 'unlink': return proxy.unlink(ids) if ids else True
 
111
        elif action == 'test': return bool(ids)
 
112
        else: return ids
 
113
        
 
114
   
 
115
    test = lambda self, model, data, **o: self.search_data(model, data, action='test', **o)
 
116
    clean = lambda self, model, data, **o: self.search_data(model, data, action='unlink', **o)
 
117
 
 
118
    def write(self, model, ids_or_domain, data):
 
119
        if all([isinstance(x, (tuple, list)) for x in ids_or_domain]):
 
120
            ids = self.get(model).search(ids_or_domain)
 
121
        elif isinstance(ids_or_domain, dict):
 
122
            ids = self.search_data(model, ids_or_domain)
 
123
        else:
 
124
            ids = ids_or_domain
 
125
        if ids and not self.get(model).write(ids, data):
 
126
            return 0
 
127
        return len(ids)
 
128
 
 
129
    def activate(self, model, domain, state=1):
 
130
        proxy = self.get(model)
 
131
        search_domain = [('active','=',1-state)] + domain
 
132
        ids = proxy.search(search_domain)
 
133
        proxy.write(ids, {'active':state})
 
134
        return len(ids)
 
135
 
 
136
    def desactivate(self, model, domain):
 
137
        return self.activate(model, domain, 0)
 
138
 
 
139
    def load_datas(self, filepath):
 
140
        datas = []
 
141
        if path.isfile(filepath):
 
142
            if filepath[-4:] == '.csv':
 
143
                ch = csv.reader(open(filepath, 'rb'), quotechar='"', delimiter=',')
 
144
                fields = ch.next()
 
145
                datetime_re = re.compile(r"^\d\d\d\d-\d\d-\d\d( \d\d:\d\d:\d\d)?$")
 
146
                for values in ch:
 
147
                    evaluated_values = []
 
148
                    for v in values:
 
149
                        try:
 
150
                            if datetime_re.match(v): raise Exception
 
151
                            evaluated_values += [eval(v)]
 
152
                        except:
 
153
                            evaluated_values += [eval('"""'+v+'"""')]
 
154
                    datas.append(dict(zip(fields, evaluated_values)))
 
155
            elif filepath[-4:] == '.xml':
 
156
                raise Exception, 'While importing XML file: Not yet implemented'
 
157
            else:
 
158
                raise Exception, 'Cannot import file "%s": unrecognized file type!' % (filepath,)
 
159
        else:
 
160
            raise IOError, 'Can\'t access to file "%s", check read permission!' % (filepath,)
 
161
        return datas
 
162
 
 
163
    def Import(self, model, datas, mode='init'):
 
164
        if type(datas) == dict:
 
165
            fields = datas.keys()
 
166
            datas = [datas.values()]
 
167
        elif not type(datas) == list:
 
168
            raise Exception, 'Cannot import data: datas must be a list of dict!'
 
169
        else:
 
170
            fields = []
 
171
            for i in datas:
 
172
                fields = set(list(fields) + i.keys())
 
173
            fields = list(fields)
 
174
            old_datas = datas
 
175
            datas = []
 
176
            for i in old_datas:
 
177
                datas.append([i.get(field, None) for field in fields])
 
178
        for data in datas:
 
179
            for i, v in enumerate(data):
 
180
                data[i] = str(data[i])
 
181
                #if v == None: data[i] = False
 
182
                #elif type(v) == bool: data[i] = 1 if v else 0
 
183
        if 'reconcile note' in fields: ipdb.set_trace()
 
184
        result, rows, warning_msg, dummy = self.get(model).import_data(fields, datas, mode)
 
185
        if result == -1:
 
186
            raise Exception, "Unable to import data: "+str(warning_msg)
 
187
        return self
 
188
 
 
189
class ref(object):
 
190
    def __init__(self, xml_id, index=0):
 
191
        self.xml_id = xml_id
 
192
        self.index = 0
 
193
 
 
194
    def get(self, db, refetch=False):
 
195
        try:
 
196
            if refetch: raise AttributeError
 
197
            return self.value
 
198
        except AttributeError:
 
199
            module, xml_id = self.xml_id.split('.')
 
200
            model, i = db.get('ir.model.data').get_object_reference(module, xml_id)
 
201
            self.value = db.get(model).read([i], ['id'])[0]
 
202
            return self.value['id']
 
203
 
 
204
class wizard(object):
 
205
    def __init__(self, db, model, data):
 
206
        for k, v in data.items():
 
207
            if isinstance(v, ref):
 
208
                data[k] = v.get(db)
 
209
        self.proxy = db.get(model)
 
210
        self.id = self.proxy.create(data)
 
211
 
 
212
    def __getattr__(self, attr):
 
213
        real_attr = getattr(self.proxy, attr)
 
214
        def foo(*args):
 
215
            return real_attr([self.id], *args)
 
216
        return foo
 
217
 
 
218
    def data(self, *attrs):
 
219
        data = self.proxy.read([self.id], attrs)
 
220
        if len(attrs) == 1: return data[0][attrs[0]]
 
221
        else: return data[0]
 
222
 
 
223
class module(object):
 
224
    def __init__(self, db, name, index=0):
 
225
        self.db = db
 
226
        self.module_proxy = db.get('ir.module.module')
 
227
        if name == 'all':
 
228
            name = 'base'
 
229
            #self.ids = self.module_proxy.search([('state','=','installed')])
 
230
            #if not self.ids:
 
231
            #    raise Exception, "No module installed!"
 
232
        #else:
 
233
        self.ids = self.module_proxy.search([('name','=',name)])
 
234
        if not self.ids:
 
235
            raise Exception, "Unable to find module %s!" % (name,)
 
236
 
 
237
    def remove(self):
 
238
        self.expect = 'uninstalled'
 
239
        self.module_proxy.button_uninstall(self.ids)
 
240
        return self
 
241
 
 
242
    uninstall = remove
 
243
 
 
244
    def install(self):
 
245
        self.expect = 'installed'
 
246
        self.module_proxy.button_install(self.ids)
 
247
        return self
 
248
 
 
249
    def upgrade(self):
 
250
        self.expect = 'installed'
 
251
        self.module_proxy.button_upgrade(self.ids)
 
252
        return self
 
253
 
 
254
    def do(self):
 
255
        self.db.get('base.module.upgrade').upgrade_module([])
 
256
        if self.module_proxy.search([('id','in',self.ids),('state','=',self.expect)], 0, False, False, True) == len(self.ids):
 
257
            raise Exception, "Modules modifications not applied"
 
258
        return self
 
259
 
 
260
class user(object):
 
261
    def __init__(self, db, login):
 
262
        self.db = db
 
263
        self.users = db.get('res.users')
 
264
        self.login = login
 
265
        try: self.id = self.users.search([('login','=',login)])[0]
 
266
        except: self.id = None
 
267
 
 
268
    def delGroups(self, *groups):
 
269
        if not self.id: raise Exception, 'Cannot remove groups to unknown user: '+self.login
 
270
        group_ids = [i.id if type(i) == group else group(self.db, i).id for i in groups]
 
271
        if None in group_ids: raise Exception, 'Cannot find some groups in this list: '+', '.join(groups)
 
272
        group_changes = [(3,i,) for i in group_ids]
 
273
        self.users.write([self.id],{'groups_id':group_changes})
 
274
        return self
 
275
 
 
276
    def addGroups(self, *groups):
 
277
        if not self.id: raise Exception, 'Cannot add groups to unknown user: '+self.login
 
278
        group_ids = [i.id if type(i) == group else group(self.db, i).id for i in groups]
 
279
        if None in group_ids: raise Exception, 'Cannot find some groups in this list: '+', '.join(groups)
 
280
        group_changes = [(4,i,) for i in group_ids]
 
281
        self.users.write([self.id],{'groups_id':group_changes})
 
282
        return self
 
283
 
 
284
    def add(self, password, name=None):
 
285
        if not self.id:
 
286
            self.id = self.users.create({'login':self.login,'name':(name or self.login),'password':password,})
 
287
        return self
 
288
 
 
289
    def exists(self):
 
290
        return bool(self.id)
 
291
 
 
292
class group(object):
 
293
    def __init__(self, db, name):
 
294
        self.db = db
 
295
        self.groups = db.get('res.groups')
 
296
        self.name = name
 
297
        try: self.id = self.groups.search([('name','=',name)])[0]
 
298
        except: self.id = None
 
299
 
 
300
    def delUsers(self, *users):
 
301
        if not self.id: self.add()
 
302
        user_ids = [i.id if type(i) == user else user(self.db, i).id for i in users]
 
303
        if None in user_ids: raise Exception, 'Cannot find some users in this list: '+', '.join(users)
 
304
        user_changes = [(3,i,) for i in user_ids]
 
305
        self.groups.write([self.id],{'users':user_changes})
 
306
        return self
 
307
 
 
308
    def addUsers(self, *users):
 
309
        if not self.id: self.add()
 
310
        user_ids = [i.id if type(i) == user else user(self.db, i).id for i in users]
 
311
        if None in user_ids: raise Exception, 'Cannot find some users in this list: '+', '.join(users)
 
312
        user_changes = [(4,i,) for i in user_ids]
 
313
        self.groups.write([self.id],{'users':user_changes})
 
314
        return self
 
315
 
 
316
    def add(self):
 
317
        if not self.id:
 
318
            self.id = self.groups.create({'name':self.name,})
 
319
        return self
 
320
 
 
321
    def exists(self):
 
322
        return bool(self.id)
 
323