43
43
self.logger = logging.getLogger('ubuntuone.SyncDaemon.HQ.hasher')
44
44
self.end_mark = end_mark
46
self.push = functools.partial(event_queue.push, "HQ_HASH_NEW")
46
self.push_ok = functools.partial(event_queue.push, "HQ_HASH_NEW")
47
self.push_error = functools.partial(event_queue.push, "HQ_HASH_ERROR")
47
48
# mutex to access _should_cancel and _hashing attributes
48
49
self.mutex = threading.Lock()
49
50
self._should_cancel = None
61
path = self.queue.get()
62
if path is self.end_mark:
62
info = self.queue.get()
63
if info is self.end_mark:
63
64
self._stopped = True
64
65
self.queue.task_done()
67
self.logger.info("Hasher: got path to hash: %r" % path)
68
result = self._hash(path)
69
m = "Hasher: got file to hash: path %r mdid %s"
70
self.logger.debug(m, path, mdid)
72
result = self._hash(path)
73
except (IOError, OSError), e:
74
m = "Hasher: hash error %s (path %r mdid %s)"
75
self.logger.debug(m, e, path, mdid)
76
reactor.callFromThread(self.push_error, mdid)
77
except StopHashing, e:
78
self.logger.debug(str(e))
80
m = "Hasher: path hash pushed: path %r hash %r"
81
self.logger.debug(m, path, result)
82
reactor.callFromThread(self.push_ok, path, *result)
69
84
self.queue.task_done()
71
reactor.callFromThread(self.push, path, *result)
72
self.logger.info("Hasher: path hash pushed: path %r hash %r"
76
87
"""Stop the hasher (will be effective in the next loop if a hash
143
149
self.hasher.start()
144
150
self.logger.info("HashQueue: _hasher started")
146
def insert(self, path):
152
def insert(self, path, mdid):
147
153
'''Insert the path of a file to be hashed.'''
148
self.logger.info("HashQueue: inserting path %r" % path)
154
self.logger.info("HashQueue: inserting path %r mdid %s", path, mdid)
149
155
# only accept it if we are running
150
156
if self._stopped:
151
157
raise RuntimeError("The HashQueue was stopped")
152
158
self.hasher.cancel_if_running(path)
153
self._queue.put(path)
159
self._queue.put((path, mdid))
155
161
def shutdown(self):
156
162
'''Shutdown all resources and clear the queue'''