~rmcbride/ubuntu/lucid/ubuntuone-client/fixucg

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/hash_queue.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2009-08-26 12:15:00 UTC
  • mfrom: (1.1.5 upstream)
  • Revision ID: james.westby@ubuntu.com-20090826121500-rivjec5146epys62
Tags: 0.93.0-0ubuntu1
* New upstream release.
  - Fix crash with KeyError in __getitem__ (LP: #404934)
  - Fix fatal error constant occurrences (LP: #414635)
  - Fix ability to reauthorize (LP: #406897)
  - Get rid of animated icon, and false activity status (LP: #414925)
* Update Standards-Version to 3.8.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
43
43
        self.logger = logging.getLogger('ubuntuone.SyncDaemon.HQ.hasher')
44
44
        self.end_mark = end_mark
45
45
        self.queue = queue
46
 
        self.push = functools.partial(event_queue.push, "HQ_HASH_NEW")
 
46
        self.push_ok = functools.partial(event_queue.push, "HQ_HASH_NEW")
 
47
        self.push_error = functools.partial(event_queue.push, "HQ_HASH_ERROR")
47
48
        # mutex to access _should_cancel and _hashing attributes
48
49
        self.mutex = threading.Lock()
49
50
        self._should_cancel = None
58
59
        while True:
59
60
            if self._stopped:
60
61
                break
61
 
            path = self.queue.get()
62
 
            if path is self.end_mark:
 
62
            info = self.queue.get()
 
63
            if info is self.end_mark:
63
64
                self._stopped = True
64
65
                self.queue.task_done()
65
66
                break
66
67
 
67
 
            self.logger.info("Hasher: got path to hash: %r" % path)
68
 
            result = self._hash(path)
 
68
            path, mdid = info
 
69
            m = "Hasher: got file to hash: path %r  mdid %s"
 
70
            self.logger.debug(m, path, mdid)
 
71
            try:
 
72
                result = self._hash(path)
 
73
            except (IOError, OSError), e:
 
74
                m = "Hasher: hash error %s  (path %r  mdid %s)"
 
75
                self.logger.debug(m, e, path, mdid)
 
76
                reactor.callFromThread(self.push_error, mdid)
 
77
            except StopHashing, e:
 
78
                self.logger.debug(str(e))
 
79
            else:
 
80
                m = "Hasher: path hash pushed:  path %r  hash %r"
 
81
                self.logger.debug(m, path, result)
 
82
                reactor.callFromThread(self.push_ok, path, *result)
 
83
 
69
84
            self.queue.task_done()
70
 
            if result:
71
 
                reactor.callFromThread(self.push, path, *result)
72
 
            self.logger.info("Hasher: path hash pushed:  path %r  hash %r"
73
 
                                                            % (path, result))
74
85
 
75
86
    def stop(self):
76
87
        """Stop the hasher (will be effective in the next loop if a hash
106
117
                    hasher.update(cont)
107
118
                    crc = crc32(cont, crc)
108
119
                    size += len(cont)
109
 
        except (IOError, OSError):
110
 
            return None
111
 
        except StopHashing, e:
112
 
            self.logger.info(str(e))
113
 
            return None
114
120
        finally:
115
121
            with self.mutex:
116
122
                self._hashing = None
143
149
        self.hasher.start()
144
150
        self.logger.info("HashQueue: _hasher started")
145
151
 
146
 
    def insert(self, path):
 
152
    def insert(self, path, mdid):
147
153
        '''Insert the path of a file to be hashed.'''
148
 
        self.logger.info("HashQueue: inserting path %r" % path)
 
154
        self.logger.info("HashQueue: inserting path %r  mdid %s", path, mdid)
149
155
        # only accept it if we are running
150
156
        if self._stopped:
151
157
            raise RuntimeError("The HashQueue was stopped")
152
158
        self.hasher.cancel_if_running(path)
153
 
        self._queue.put(path)
 
159
        self._queue.put((path, mdid))
154
160
 
155
161
    def shutdown(self):
156
162
        '''Shutdown all resources and clear the queue'''