~unifield-team/unifield-wm/us-808-sync

« back to all changes in this revision

Viewing changes to sync_server/update.py

  • Committer: jf
  • Date: 2015-11-23 16:39:39 UTC
  • mfrom: (601.1.14 unifield-sync)
  • Revision ID: jfb@tempo-consulting.fr-20151123163939-yf7d3zfy48vzpyzv
Tags: uf2.0rc1
US-703 [IMP] Sync server: remove puller
US-703 [IMP] Split data pulled ids
SP-190 [IMP] Data push: do not browse all records
SP-199 [IMP] Do not send multi delete updates for the same record
US-684 [IMP] Close and remove cursor used by sync process
Us-684 [IMP] Don't load db dump file in memory

lp:~unifield-team/unifield-wm/us-703-sync-qt

Changes in sync_server.sync_rule.csv:
remove duplicated rule to generate delete updates on:
    - account.target.costcenter #107, done @rule #113
    - res.currency.rate #128, done @rule #106
    - cost.center.distribution.line #202 and #203, done @rule #201
    - funding.pool.distribution.line #211, done @rule #210
    - account.cashbox.line #404, done @rule #402
    - account.move.line #417, done @rule #412
    - account.analytic.line #421 and #422, done @rule #420
    - financing.contract.format #450, done @rule @451
    - financing.contract.format.line #457, done @rule #452
    - supplier.catalogue #655, done @rule #650
    - supplier.catalogue.line #656, done @rule #655

Show diffs side-by-side

added added

removed removed

Lines of Context:
188
188
        if not cr.fetchone():
189
189
            cr.execute("CREATE INDEX sync_server_update_sequence_id_index on sync_server_update (sequence, id)")
190
190
 
191
 
    def __init__(self, pool, cr):
192
 
        self._cache_pullers = SavePullerCache(self)
193
 
        super(update, self).__init__(pool, cr)
 
191
#    def __init__(self, pool, cr):
 
192
#        self._cache_pullers = SavePullerCache(self)
 
193
#        super(update, self).__init__(pool, cr)
194
194
 
195
195
    def _save_puller(self, cr, uid, context=None):
196
 
        return self._cache_pullers.merge(cr, uid, context)
 
196
        return True
 
197
#        return self._cache_pullers.merge(cr, uid, context)
197
198
 
198
199
    def unfold_package(self, cr, uid, entity, packet, context=None):
199
200
        """
313
314
            return 0
314
315
        seq = self.browse(cr, uid, ids, context=context)[0].sequence
315
316
        return seq
316
 
    
317
 
    def get_update_to_send(self,cr, uid, entity, update_ids, recover=False, context=None):
318
 
        """
319
 
            Called by get_package during the client instance pull process.
320
 
            Return a list of browse_record with only the updates that really need to be send to the client.
321
 
            It filter according to the rules:
322
 
             - rule's directionality is 'up' and update source is a child of entity
323
 
             - rule's directionality is 'down' and update source is an ancestor of entity
324
 
             - rule's directionality is 'bidirectional' (synchronize every where)
325
 
             - rule's directionality is 'bi-private': only the ancestors of update's owner field value
326
 
               are allowed to get the update.
327
 
            Then, it checks the groups.
328
 
 
329
 
            @param cr : cr
330
 
            @param uid : uid
331
 
            @param entity : browse_record(sync.server.entity) : client instance entity
332
 
            @param update_ids : list of update ids
333
 
            @param recover : flag : if set to True, update of the same source than the entity who try to pull
334
 
                are sent to the entity. Default is False.
335
 
            @param context : context
336
 
 
337
 
            @return : list of browse record of the updates to send
338
 
        """
339
 
        update_to_send = []
340
 
        ancestor = self.pool.get('sync.server.entity')._get_ancestor(cr, uid, entity.id, context=context) 
341
 
        children = self.pool.get('sync.server.entity')._get_all_children(cr, uid, entity.id, context=context)
342
 
        for update in self.browse(cr, uid, update_ids, context=context):
343
 
            if update.rule_id.direction == 'bi-private':
344
 
                if update.is_deleted:
345
 
                    privates = [entity.id]
346
 
                elif not update.owner:
347
 
                    privates = []
348
 
                else:
349
 
                    privates = self.pool.get('sync.server.entity')._get_ancestor(cr, uid, update.owner.id, context=context) + \
350
 
                               [update.owner.id]
351
 
            else:
352
 
                privates = []
353
 
            if not update.rule_id:
354
 
                update_to_send.append(update)
355
 
            elif (update.rule_id.direction == 'up' and update.source.id in children) or \
356
 
               (update.rule_id.direction == 'down' and update.source.id in ancestor) or \
357
 
               (update.rule_id.direction == 'bidirectional') or \
358
 
               (entity.id in privates) or \
359
 
               (recover and entity.id == update.source.id):
360
 
                
361
 
                source_rules_ids = self.pool.get('sync_server.sync_rule')._get_groups_per_rule(cr, uid, update.source, context)
362
 
                s_group = source_rules_ids.get(update.rule_id.id, [])
363
 
                if any(group.id in s_group for group in entity.group_ids):
364
 
                    update_to_send.append(update)
365
 
        return update_to_send
366
317
 
367
318
    def get_package(self, cr, uid, entity, last_seq, offset, max_size, max_seq, recover=False, context=None):
368
319
        """
434
385
        timed_out = False
435
386
        start_time = time.time()
436
387
        self._logger.info("::::::::[%s] Data pull get package:: last_seq = %s, max_seq = %s, offset = %s, max_size = %s" % (entity.name, last_seq, max_seq, offset, max_size))
 
388
 
 
389
        ancestor = self.pool.get('sync.server.entity')._get_ancestor(cr, uid, entity.id, context=context) 
 
390
        children = self.pool.get('sync.server.entity')._get_all_children(cr, uid, entity.id, context=context)
 
391
        cache_rule = {}
437
392
        while not ids or not update_to_send:
438
393
            if time.time() - start_time > 500:
439
394
                timed_out = True
443
398
            ids = map(lambda x:x[0], cr.fetchall())
444
399
            if not ids and update_master is None:
445
400
                break
446
 
            for update in self.get_update_to_send(cr, uid, entity, ids, recover, context):
447
 
                if update_master is None:
448
 
                    update_master = update
449
 
                    update_to_send.append(update_master)
450
 
                elif update.model == update_master.model and \
451
 
                   update.rule_id.id == update_master.rule_id.id and \
452
 
                   update.source.id == update_master.source.id and \
453
 
                   update.is_deleted == update_master.is_deleted and \
454
 
                   len(update_to_send) < max_size:
455
 
                    update_to_send.append(update)
 
401
 
 
402
            for update_to_compute in self.browse(cr, uid, ids, context=context):
 
403
                update = False
 
404
                if update_to_compute.rule_id.direction == 'bi-private':
 
405
                    if update_to_compute.is_deleted:
 
406
                        privates = [entity.id]
 
407
                    elif not update_to_compute.owner:
 
408
                        privates = []
 
409
                    else:
 
410
                        privates = self.pool.get('sync.server.entity')._get_ancestor(cr, uid, update_to_compute.owner.id, context=context) + \
 
411
                                   [update_to_compute.owner.id]
456
412
                else:
457
 
                    ids = ids[:ids.index(update.id)]
458
 
                    break
 
413
                    privates = []
 
414
 
 
415
                if not update_to_compute.rule_id:
 
416
                    update = update_to_compute
 
417
                elif (update_to_compute.rule_id.direction == 'up' and update_to_compute.source.id in children) or \
 
418
                   (update_to_compute.rule_id.direction == 'down' and update_to_compute.source.id in ancestor) or \
 
419
                   (update_to_compute.rule_id.direction == 'bidirectional') or \
 
420
                   (entity.id in privates) or \
 
421
                   (recover and entity.id == update_to_compute.source.id):
 
422
 
 
423
                    if update_to_compute.source not in cache_rule:
 
424
                        cache_rule[update_to_compute.source] = self.pool.get('sync_server.sync_rule')._get_groups_per_rule(cr, uid, update_to_compute.source, context)
 
425
                    s_group = cache_rule[update_to_compute.source].get(update_to_compute.rule_id.id, [])
 
426
                    if any(group.id in s_group for group in entity.group_ids):
 
427
                        update = update_to_compute
 
428
 
 
429
                if update:
 
430
                    if update_master is None:
 
431
                        update_master = update
 
432
                        update_to_send.append(update_master)
 
433
                    elif update.model == update_master.model and \
 
434
                       update.rule_id.id == update_master.rule_id.id and \
 
435
                       update.source.id == update_master.source.id and \
 
436
                       update.is_deleted == update_master.is_deleted and \
 
437
                       len(update_to_send) < max_size:
 
438
                        update_to_send.append(update)
 
439
                    else:
 
440
                        ids = ids[:ids.index(update.id)]
 
441
                        break
459
442
            offset += len(ids)
460
443
        if timed_out and not update_to_send:
461
444
            # send a fake update to keep to connection open
475
458
            return None
476
459
 
477
460
        # Point of no return
478
 
        self._cache_pullers.add(entity, update_to_send)
 
461
#        self._cache_pullers.add(entity, update_to_send)
 
462
        for update in iter(update_to_send):
 
463
            self.pool.get('sync.server.puller_logs').create(cr, 1, {
 
464
                'update_id': update.id,
 
465
                'entity_id': entity.id,
 
466
            }, context=context)
479
467
 
480
468
        ## Package template
481
469
        data = {