~unifield-team/unifield-wm/us-808-sync

« back to all changes in this revision

Viewing changes to sync_client/sync_client.py

  • Committer: jf
  • Date: 2015-11-23 16:39:39 UTC
  • mfrom: (601.1.14 unifield-sync)
  • Revision ID: jfb@tempo-consulting.fr-20151123163939-yf7d3zfy48vzpyzv
Tags: uf2.0rc1
US-703 [IMP] Sync server: remove puller
US-703 [IMP] Split data pulled ids
SP-190 [IMP] Data push: do not browse all records
SP-199 [IMP] Do not send multi delete updates for the same record
US-684 [IMP] Close and remove cursor used by sync process
Us-684 [IMP] Don't load db dump file in memory

lp:~unifield-team/unifield-wm/us-703-sync-qt

Changes in sync_server.sync_rule.csv:
remove duplicated rule to generate delete updates on:
    - account.target.costcenter #107, done @rule #113
    - res.currency.rate #128, done @rule #106
    - cost.center.distribution.line #202 and #203, done @rule #201
    - funding.pool.distribution.line #211, done @rule #210
    - account.cashbox.line #404, done @rule #402
    - account.move.line #417, done @rule #412
    - account.analytic.line #421 and #422, done @rule #420
    - financing.contract.format #450, done @rule @451
    - financing.contract.format.line #457, done @rule #452
    - supplier.catalogue #655, done @rule #650
    - supplier.catalogue.line #656, done @rule #655

Show diffs side-by-side

added added

removed removed

Lines of Context:
89
89
        except:
90
90
            pass
91
91
        finally:
92
 
            cr.close()
 
92
            cr.close(True)
93
93
 
94
94
def sync_subprocess(step='status', defaults_logger={}):
95
95
    def decorator(fn):
247
247
                if make_log:
248
248
                    logger.close()
249
249
                    if self.sync_cursor is not None:
250
 
                        self.sync_cursor.close()
 
250
                        self.sync_cursor.close(True)
251
251
                else:
252
252
                    logger.write()
253
253
            return res
541
541
        if not res[0]: raise Exception, res[1]
542
542
 
543
543
        updates_count = self.retrieve_update(cr, uid, max_packet_size, recover=recover, context=context)
544
 
        self._logger.info("::::::::The instance " + entity.name + " pulled: " + str(res[1]) + " messages and " + str(updates_count) + " updates.")        
 
544
        self._logger.info("::::::::The instance " + entity.name + " pulled: " + str(res[1]) + " messages and " + str(updates_count) + " updates.")
545
545
        updates_executed = self.execute_updates(cr, uid, context=context)
546
546
        if updates_executed == 0 and updates_count > 0:
547
547
            self._logger.warning("No update to execute, this case should never occurs.")
577
577
        last = (last_seq >= max_seq)
578
578
        updates_count = 0
579
579
        logger_index = None
 
580
        proxy = self.pool.get("sync.client.sync_server_connection").get_connection(cr, uid, "sync.server.sync_manager")
580
581
        while not last:
581
 
            proxy = self.pool.get("sync.client.sync_server_connection").get_connection(cr, uid, "sync.server.sync_manager")
582
582
            res = proxy.get_update(entity.identifier, self._hardware_id, last_seq, offset, max_packet_size, max_seq, recover)
583
583
            if res and res[0]:
584
584
                updates_count += updates.unfold_package(cr, uid, res[1], context=context)
616
616
 
617
617
        # Get a list of updates to execute
618
618
        # Warning: execution order matter
619
 
        update_ids = updates.search(cr, uid, [('run', '=', False)], order='id asc', context=context)
 
619
        update_ids = updates.search(cr, uid, [('run', '=', False)], order='sequence_number, rule_sequence, id asc', context=context)
620
620
        update_count = len(update_ids)
621
621
        if not update_count: return 0
622
622
 
623
 
        # Sort updates by rule_sequence
624
 
        whole = updates.browse(cr, uid, update_ids, context=context)
625
 
        update_groups = dict()
626
 
 
627
 
        for update in whole:
628
 
            group_key = (update.sequence_number, update.rule_sequence)
629
 
            try:
630
 
                update_groups[group_key].append(update.id)
631
 
            except KeyError:
632
 
                update_groups[group_key] = [update.id]
633
 
 
634
623
        try:
635
624
            if logger: logger_index = logger.append()
636
625
            done = []
637
626
            imported, deleted = 0, 0
638
 
            for rule_seq in sorted(update_groups.keys()):
639
 
                update_ids = update_groups[rule_seq]
640
 
                while update_ids:
641
 
                    to_do, update_ids = update_ids[:MAX_EXECUTED_UPDATES], update_ids[MAX_EXECUTED_UPDATES:]
642
 
                    messages, imported_executed, deleted_executed = \
643
 
                        updates.execute_update(cr, uid,
644
 
                            to_do,
645
 
                            priorities=priorities_stuff,
646
 
                            context=context)
647
 
                    imported += imported_executed
648
 
                    deleted += deleted_executed
649
 
                    # Do nothing with messages
650
 
                    done.extend(to_do)
651
 
                    if logger:
652
 
                        logger.replace(logger_index, _("Update(s) processed: %d import updates + %d delete updates on %d updates") \
653
 
                                                     % (imported, deleted, update_count))
654
 
                        logger.write()
655
 
                    # intermittent commit
656
 
                    if len(done) >= MAX_EXECUTED_UPDATES:
657
 
                        done[:] = []
658
 
                        cr.commit()
 
627
            while update_ids:
 
628
                to_do, update_ids = update_ids[:MAX_EXECUTED_UPDATES], update_ids[MAX_EXECUTED_UPDATES:]
 
629
                messages, imported_executed, deleted_executed = \
 
630
                    updates.execute_update(cr, uid,
 
631
                        to_do,
 
632
                        priorities=priorities_stuff,
 
633
                        context=context)
 
634
                imported += imported_executed
 
635
                deleted += deleted_executed
 
636
                # Do nothing with messages
 
637
                done += to_do
 
638
                if logger:
 
639
                    logger.replace(logger_index, _("Update(s) processed: %d import updates + %d delete updates on %d updates") \
 
640
                                                 % (imported, deleted, update_count))
 
641
                    logger.write()
 
642
                # intermittent commit
 
643
                if len(done) >= MAX_EXECUTED_UPDATES:
 
644
                    done[:] = []
 
645
                    cr.commit()
659
646
        finally:
660
647
            cr.commit()
661
648
 
923
910
 
924
911
        return True
925
912
 
 
913
 
926
914
    @sync_process()
927
915
    def sync_withbackup(self, cr, uid, context=None):
928
916
        """
991
979
            #except StandardError:
992
980
            #    return False
993
981
            self.aborting = True
994
 
            self.sync_cursor.close()
 
982
            self.sync_cursor.close(True)
995
983
        return True
996
984
 
997
985
Entity()