1
# canonical.ubuntuone.storage.syncdaemon.hash_queue - hash queues
3
# Author: Facundo Batista <facundo@canonical.com>
5
# Copyright 2009 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU General Public License version 3, as published
9
# by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
# PURPOSE. See the GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License along
17
# with this program. If not, see <http://www.gnu.org/licenses/>.
18
'''Module that implements the Hash Queue machinery.'''
20
from __future__ import with_statement
27
from twisted.internet import reactor
29
from canonical.ubuntuone.storage.protocol.hash import \
30
content_hash_factory, crc32
33
class _Hasher(threading.Thread):
# Worker thread: reads paths from a queue, hashes each file with _hash(),
# and hands the result to the event queue as an "HQ_HASH_NEW" event.
34
'''Class that lives in another thread, hashing all night long.'''
35
# queue       -- source of the paths to hash (read with .get() below).
# end_mark    -- sentinel; each item read is compared to it by identity.
# event_queue -- only its push() method is used, pre-bound to the
#                "HQ_HASH_NEW" event name.
def __init__(self, queue, end_mark, event_queue):
36
self.logger = logging.getLogger('ubuntuone.SyncDaemon.HQ.hasher')
37
self.end_mark = end_mark
# NOTE(review): the code further down reads self.queue, but no assignment
# of the `queue` argument to self.queue is visible here -- confirm it is set.
39
# self.push(path, hash, crc, size) is then event_queue.push("HQ_HASH_NEW", ...)
self.push = functools.partial(event_queue.push, "HQ_HASH_NEW")
40
# The Thread base class is initialised after the attributes above are set.
threading.Thread.__init__(self)
45
# Take the next item off the queue; presumably a Queue.Queue, since that is
# what HashQueue passes in, so this would block until an item arrives.
path = self.queue.get()
46
# Identity check against the sentinel.
# NOTE(review): the body of this branch is not visible here; presumably it
# ends the worker loop -- confirm.
if path is self.end_mark:
49
# The message is formatted eagerly with % before logger.info() is called.
self.logger.info("Hasher: got path to hash: %r" % path)
50
# result is the (content hash, crc, size) tuple returned by _hash().
result = self._hash(path)
52
# Hand the push over to the reactor thread rather than calling it from this
# worker thread; *result expands to the hash, crc and size arguments.
reactor.callFromThread(self.push, path, *result)
53
# NOTE(review): the format arguments and closing parenthesis of the call
# below are not visible in this view -- confirm against the full file.
self.logger.info("Hasher: path hash pushed: path %r hash %r"
56
# Returns a 3-tuple: (hasher.content_hash(), crc, size) for the file at `path`.
def _hash(self, path):
57
'''Actually hashes a file.'''
58
hasher = content_hash_factory()
62
# NOTE(review): the file is opened in the default (text) mode; since the
# content is hashed, 'rb' would avoid any newline translation on platforms
# where text and binary mode differ -- confirm intent.
with open(path) as fh:
68
# Running CRC: the previous crc value is fed back in with each chunk `cont`.
# NOTE(review): the read that produces `cont`, and the initial values of
# `crc` and `size`, are not visible in this view.
crc = crc32(cont, crc)
73
return hasher.content_hash(), crc, size
75
class HashQueue(object):
# Owns the queue and the sentinel shared with the _Hasher worker, and exposes
# the operations the rest of the program uses to feed and query that queue.
76
'''Interface between the real Hasher and the rest of the world.'''
78
# event_queue is passed straight through to the _Hasher worker.
def __init__(self, event_queue):
79
self.logger = logging.getLogger('ubuntuone.SyncDaemon.HQ')
80
self._queue = Queue.Queue()
81
# A fresh object() is unique, so it cannot be confused with a real path
# when the worker compares items by identity.
self._end_mark = object()
82
# NOTE(review): `t` is only a local name and no start() call is visible
# here, although the log line below reports the hasher as started -- confirm.
t = _Hasher(self._queue, self._end_mark, event_queue)
85
self.logger.info("HashQueue: _hasher started")
87
# path -- path of the file to hash.
# NOTE(review): only the log call is visible; the statement that actually
# puts `path` on self._queue is not shown in this view -- confirm.
def insert(self, path):
88
'''Insert the path of a file to be hashed.'''
89
# The message is formatted eagerly with % before logger.info() is called.
self.logger.info("HashQueue: inserting path %r" % path)
93
# NOTE(review): the def line this docstring belongs to is not visible here.
'''Shutdown all resources.'''
94
# Queue the sentinel; the worker recognises it by identity. Items already
# queued ahead of it are taken off the queue first (Queue.Queue is FIFO).
self._queue.put(self._end_mark)
95
# NOTE(review): no join on the worker thread is visible, so "stopped" is
# logged as soon as the sentinel is queued -- confirm this is intended.
self.logger.info("HashQueue: _hasher stopped")
98
# NOTE(review): the def line this docstring belongs to is not visible here.
'''Return whether we are empty or not'''
99
# Queue.empty() is a point-in-time answer; with a worker thread consuming
# the queue it can be stale by the time the caller uses it.
return self._queue.empty()
102
# NOTE(review): the def line this docstring belongs to is not visible here.
'''Return the length of the queue'''
103
# Queue.qsize() is documented as an approximate size.
return self._queue.qsize()