~dobey/ubuntuone-client/fix-headers-1-6

« back to all changes in this revision

Viewing changes to canonical/ubuntuone/storage/syncdaemon/hash_queue.py

  • Committer: Rodney Dawes
  • Date: 2009-05-12 13:36:05 UTC
  • Revision ID: rodney.dawes@canonical.com-20090512133605-6aqs6e8xnnmp5u1p
        Import the code
        Hook up lint/trial tests in setup.py
        Use icontool now instead of including the render script
        Add missing python-gnome2-desktop to package dependencies
        Update debian/rules to fix the icon cache issue

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# canonical.ubuntuone.storage.syncdaemon.hash_queue - hash queues
 
2
#
 
3
# Author: Facundo Batista <facundo@canonical.com>
 
4
#
 
5
# Copyright 2009 Canonical Ltd.
 
6
#
 
7
# This program is free software: you can redistribute it and/or modify it
 
8
# under the terms of the GNU General Public License version 3, as published
 
9
# by the Free Software Foundation.
 
10
#
 
11
# This program is distributed in the hope that it will be useful, but
 
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
14
# PURPOSE.  See the GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License along
 
17
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
'''Module that implements the Hash Queue machinery.'''
 
19
 
 
20
from __future__ import with_statement
 
21
 
 
22
import logging
 
23
import threading
 
24
import functools
 
25
import Queue
 
26
 
 
27
from twisted.internet import reactor
 
28
 
 
29
from canonical.ubuntuone.storage.protocol.hash import \
 
30
    content_hash_factory, crc32
 
31
 
 
32
 
 
33
class _Hasher(threading.Thread):
 
34
    '''Class that lives in another thread, hashing all night long.'''
 
35
    def __init__(self, queue, end_mark, event_queue):
 
36
        self.logger = logging.getLogger('ubuntuone.SyncDaemon.HQ.hasher')
 
37
        self.end_mark = end_mark
 
38
        self.queue = queue
 
39
        self.push = functools.partial(event_queue.push, "HQ_HASH_NEW")
 
40
        threading.Thread.__init__(self)
 
41
 
 
42
    def run(self):
 
43
        '''Run the thread.'''
 
44
        while True:
 
45
            path = self.queue.get()
 
46
            if path is self.end_mark:
 
47
                break
 
48
 
 
49
            self.logger.info("Hasher: got path to hash: %r" % path)
 
50
            result = self._hash(path)
 
51
            if result:
 
52
                reactor.callFromThread(self.push, path, *result)
 
53
            self.logger.info("Hasher: path hash pushed:  path %r  hash %r"
 
54
                                                            % (path, result))
 
55
 
 
56
    def _hash(self, path):
 
57
        '''Actually hashes a file.'''
 
58
        hasher = content_hash_factory()
 
59
        crc = 0
 
60
        size = 0
 
61
        try:
 
62
            with open(path) as fh:
 
63
                while True:
 
64
                    cont = fh.read(65536)
 
65
                    if not cont:
 
66
                        break
 
67
                    hasher.update(cont)
 
68
                    crc = crc32(cont, crc)
 
69
                    size += len(cont)
 
70
        except IOError:
 
71
            return None
 
72
 
 
73
        return hasher.content_hash(), crc, size
 
74
 
 
75
class HashQueue(object):
 
76
    '''Interface between the real Hasher and the rest of the world.'''
 
77
 
 
78
    def __init__(self, event_queue):
 
79
        self.logger = logging.getLogger('ubuntuone.SyncDaemon.HQ')
 
80
        self._queue = Queue.Queue()
 
81
        self._end_mark = object()
 
82
        t = _Hasher(self._queue, self._end_mark, event_queue)
 
83
        t.setDaemon(True)
 
84
        t.start()
 
85
        self.logger.info("HashQueue: _hasher started")
 
86
 
 
87
    def insert(self, path):
 
88
        '''Insert the path of a file to be hashed.'''
 
89
        self.logger.info("HashQueue: inserting path %r" % path)
 
90
        self._queue.put(path)
 
91
 
 
92
    def shutdown(self):
 
93
        '''Shutdown all resources.'''
 
94
        self._queue.put(self._end_mark)
 
95
        self.logger.info("HashQueue: _hasher stopped")
 
96
 
 
97
    def empty(self):
 
98
        '''Return whether we are empty or not'''
 
99
        return self._queue.empty()
 
100
 
 
101
    def __len__(self):
 
102
        '''Return the length of the queue'''
 
103
        return self._queue.qsize()