~rmcbride/ubuntu/lucid/ubuntuone-client/fixucg

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/volume_manager.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2009-06-30 12:00:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090630120000-by806ovmw3193qe8
Tags: upstream-0.90.3
ImportĀ upstreamĀ versionĀ 0.90.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# ubuntuone.syncdaemon.volume_manager - manages volumes
 
2
#
 
3
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
 
4
#
 
5
# Copyright 2009 Canonical Ltd.
 
6
#
 
7
# This program is free software: you can redistribute it and/or modify it
 
8
# under the terms of the GNU General Public License version 3, as published
 
9
# by the Free Software Foundation.
 
10
#
 
11
# This program is distributed in the hope that it will be useful, but
 
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
14
# PURPOSE.  See the GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License along
 
17
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
""" The all mighty Volume Manager """
 
19
from __future__ import with_statement
 
20
 
 
21
import logging
 
22
import os
 
23
import shutil
 
24
import stat
 
25
from contextlib import contextmanager
 
26
from itertools import ifilter
 
27
 
 
28
from ubuntuone.syncdaemon.marker import MDMarker
 
29
from ubuntuone.syncdaemon import file_shelf
 
30
 
 
31
 
 
32
class Share(object):
 
33
    """Representas a share or mount point"""
 
34
 
 
35
    def __init__(self, path, share_id='', name=None, access_level='View',
 
36
                 accepted=False, other_username=None, other_visible_name=None,
 
37
                 subtree=None):
 
38
        """ Creates the instance.
 
39
 
 
40
        The received path should be 'bytes'
 
41
        """
 
42
        if path is None:
 
43
            self.path = None
 
44
        else:
 
45
            self.path = os.path.normpath(path)
 
46
        self.id = str(share_id)
 
47
        self.access_level = access_level
 
48
        self.accepted = accepted
 
49
        self.name = name
 
50
        self.other_username = other_username
 
51
        self.other_visible_name = other_visible_name
 
52
        self.subtree = subtree
 
53
 
 
54
    @classmethod
 
55
    def from_response(cls, share_response, path):
 
56
        """ Creates a Share instance from a ShareResponse.
 
57
 
 
58
        The received path should be 'bytes'
 
59
        """
 
60
        share = cls(path, str(share_response.id), share_response.name,
 
61
                    share_response.access_level, share_response.accepted,
 
62
                    share_response.other_username,
 
63
                    share_response.other_visible_name, share_response.subtree)
 
64
        return share
 
65
 
 
66
    @classmethod
 
67
    def from_notify_holder(cls, share_notify, path):
 
68
        """ Creates a Share instance from a NotifyShareHolder.
 
69
 
 
70
        The received path should be 'bytes'
 
71
        """
 
72
        share = cls(path, share_id=str(share_notify.share_id),
 
73
                    name=share_notify.share_name,
 
74
                    access_level=share_notify.access_level,
 
75
                    other_username=share_notify.from_username,
 
76
                    other_visible_name=share_notify.from_visible_name,
 
77
                    subtree=share_notify.subtree)
 
78
        return share
 
79
 
 
80
    def can_write(self):
 
81
        """ check the access_level of this share,
 
82
        returns True if it's 'Modify'.
 
83
        """
 
84
        return self.access_level == 'Modify'
 
85
 
 
86
 
 
87
class VolumeManager(object):
 
88
    """Manages shares and mount points."""
 
89
 
 
90
    CACHE_VERSION = '1'
 
91
 
 
92
    def __init__(self, main):
 
93
        """Create the instance and populate the shares/d attributes
 
94
        from the metadata (if it exists).
 
95
        """
 
96
        self.log = logging.getLogger('ubuntuone.SyncDaemon.VM')
 
97
        self.m = main
 
98
        data_dir = os.path.join(self.m.data_dir, 'vm')
 
99
        version_file = os.path.join(data_dir, '.version')
 
100
        shares_dir = os.path.join(data_dir, 'shares')
 
101
        shared_dir = os.path.join(data_dir, 'shared')
 
102
        if os.path.exists(data_dir) and not os.path.exists(version_file) \
 
103
           and not os.path.exists(shares_dir):
 
104
            self.upgrade_shelf_layout(data_dir, shares_dir)
 
105
        # write down the version file if it don't exists
 
106
        if not os.path.exists(version_file):
 
107
            if not os.path.exists(os.path.dirname(version_file)):
 
108
                os.makedirs(os.path.dirname(version_file))
 
109
            with open(version_file, 'w') as fd:
 
110
                fd.write(VolumeManager.CACHE_VERSION)
 
111
        if not os.path.exists(shares_dir):
 
112
            os.makedirs(shares_dir)
 
113
        if not os.path.exists(shared_dir):
 
114
            os.makedirs(shared_dir)
 
115
        self.shares = ShareFileShelf(shares_dir)
 
116
        self.shared = ShareFileShelf(shared_dir)
 
117
        if self.shares.get('') is None:
 
118
            self.root = Share(self.m.root_dir)
 
119
        else:
 
120
            self.root = self.shares['']
 
121
        self.root.access_level = 'Modify'
 
122
        self.root.path = self.m.root_dir
 
123
        self.shares[''] = self.root
 
124
        with allow_writes(os.path.dirname(self.m.shares_dir)):
 
125
            if not os.path.exists(self.m.shares_dir):
 
126
                os.makedirs(self.m.shares_dir)
 
127
                # make it read only
 
128
            os.chmod(self.m.shares_dir, 0555)
 
129
        self.marker_share_map = {}
 
130
        self.list_shares_retries = 0
 
131
        self.retries_limit = 5
 
132
 
 
133
    def init_root(self):
 
134
        """ Creates the root mdid. """
 
135
        self._create_share_dir(self.root)
 
136
        try:
 
137
            mdobj = self.m.fs.get_by_path(self.root.path)
 
138
        except KeyError:
 
139
            mdobj = self.m.fs.create(path=self.root.path,
 
140
                                     share_id='', is_dir=True)
 
141
            self.m.fs.set_by_path(path=self.root.path,
 
142
                                  local_hash=None, server_hash=None)
 
143
 
 
144
    def on_server_root(self, root):
 
145
        """Asociate server root"""
 
146
        self.log.debug('init_root(%s)', root)
 
147
        mdobj = self.m.fs.get_by_path(self.root.path)
 
148
        if getattr(mdobj, 'node_id', None) is None:
 
149
            self.m.fs.set_node_id(self.root.path, root)
 
150
        share = self.shares['']
 
151
        self.root.subtree = share.subtree = root
 
152
        self.shares[''] = share
 
153
        self.refresh_shares()
 
154
        return mdobj.mdid
 
155
 
 
156
    def refresh_shares(self):
 
157
        """ Reuqest the list of shares to the server. """
 
158
        # request the list of shares
 
159
        self.m.action_q.list_shares()
 
160
 
 
161
    def handle_SYS_STATE_CHANGED(self, state):
 
162
        """
 
163
        The system changed state. If we don't know our root's uuid yet,
 
164
        and the AQ is online, ask for it.
 
165
        """
 
166
        if state.is_connected and self.root.subtree is None:
 
167
            try:
 
168
                mdobj = self.m.fs.get_by_path(self.root.path)
 
169
                mdid = mdobj.mdid
 
170
            except KeyError:
 
171
                mdid = self.m.fs.create(path=self.root.path, share_id='',
 
172
                                         is_dir=True)
 
173
                self.m.fs.set_by_path(path=self.root.path,
 
174
                                      local_hash=None, server_hash=None)
 
175
            self.m.get_root(MDMarker(mdid))
 
176
 
 
177
    def handle_AQ_SHARES_LIST(self, shares_list):
 
178
        """ handle AQ_SHARES_LIST event """
 
179
        self.log.debug('handling shares list: ')
 
180
        self.list_shares_retries = 0
 
181
        shares = []
 
182
        shared = []
 
183
        for a_share in shares_list.shares:
 
184
            share_id = getattr(a_share, 'id',
 
185
                               getattr(a_share, 'share_id', None))
 
186
            self.log.debug('share %r: id=%s, name=%r', a_share.direction,
 
187
                           share_id, a_share.name)
 
188
            if a_share.direction == "to_me":
 
189
                dir_name = self._build_dirname(a_share.name,
 
190
                                               a_share.other_visible_name)
 
191
                path = os.path.join(self.m.shares_dir, dir_name)
 
192
                share = Share.from_response(a_share, path)
 
193
                shares.append(share.id)
 
194
                self.add_share(share)
 
195
            elif a_share.direction == "from_me":
 
196
                try:
 
197
                    mdobj = self.m.fs.get_by_node_id("", a_share.subtree)
 
198
                    path = self.m.fs.get_abspath(mdobj.share_id, mdobj.path)
 
199
                except KeyError:
 
200
                    # we don't have the file/md of this shared subtree yet
 
201
                    # for the moment ignore this share
 
202
                    self.log.warning("we got a share with 'from_me' direction,"
 
203
                            " but don't have the node_id in the metadata yet")
 
204
                    path = None
 
205
                share = Share.from_response(a_share, path)
 
206
                shared.append(share.id)
 
207
                self.add_shared(share)
 
208
 
 
209
        # housekeeping of the shares and shared shelf's each time we get the
 
210
        # list of shares
 
211
        self.log.debug('deleting dead shares')
 
212
        for share in ifilter(lambda item: item and item not in shares,
 
213
                             self.shares):
 
214
            self.log.debug('deleting share: id=%s', share)
 
215
            self.share_deleted(share)
 
216
        for share in ifilter(lambda item: item and item not in shared,
 
217
                             self.shared):
 
218
            self.log.debug('deleting shared: id=%s', share)
 
219
            del self.shared[share]
 
220
 
 
221
    def _build_dirname(self, share_name, visible_name):
 
222
        '''Builds the root path using the share information.'''
 
223
        dir_name = share_name + u' from ' + visible_name
 
224
 
 
225
        # Unicode boundary! the name is Unicode in protocol and server,
 
226
        # but here we use bytes for paths
 
227
        dir_name = dir_name.encode("utf8")
 
228
        return dir_name
 
229
 
 
230
    def handle_AQ_LIST_SHARES_ERROR(self, error):
 
231
        """ handle AQ_LIST_SHARES_ERROR event """
 
232
        # just call list_shares again, until we reach the retry limit
 
233
        if self.list_shares_retries <= self.retries_limit:
 
234
            self.m.action_q.list_shares()
 
235
            self.list_shares_retries += 1
 
236
 
 
237
    def handle_SV_SHARE_CHANGED(self, message, share):
 
238
        """ handle SV_SHARE_CHANGED event """
 
239
        if message == 'changed':
 
240
            if share.share_id not in self.shares:
 
241
                self.log.debug("New share notification, share_id: %s",
 
242
                         share.share_id)
 
243
                dir_name = self._build_dirname(share.share_name,
 
244
                                               share.from_visible_name)
 
245
                path = os.path.join(self.m.shares_dir, dir_name)
 
246
                share = Share.from_notify_holder(share, path)
 
247
                self.add_share(share)
 
248
            else:
 
249
                self.log.debug('share changed! %s', share.share_id)
 
250
                self.share_changed(share)
 
251
        elif message == 'deleted':
 
252
            self.log.debug('share deleted! %s', share.share_id)
 
253
            self.share_deleted(share.share_id)
 
254
 
 
255
    def handle_AQ_CREATE_SHARE_OK(self, share_id, marker):
 
256
        """ handle AQ_CREATE_SHARE_OK event. """
 
257
        share = self.marker_share_map.get(marker)
 
258
        if share is None:
 
259
            self.m.action_q.list_shares()
 
260
        else:
 
261
            share.id = share_id
 
262
            self.add_shared(share)
 
263
            if marker in self.marker_share_map:
 
264
                del self.marker_share_map[marker]
 
265
 
 
266
    def handle_AQ_CREATE_SHARE_ERROR(self, marker, error):
 
267
        """ handle AQ_CREATE_SHARE_ERROR event. """
 
268
        if marker in self.marker_share_map:
 
269
            del self.marker_share_map[marker]
 
270
 
 
271
    def handle_SV_SHARE_ANSWERED(self, share_id, answer):
 
272
        """ handle SV_SHARE_ANSWERED event. """
 
273
        share = self.shared.get(share_id, None)
 
274
        if share is None:
 
275
            # oops, we got an answer for a share we don't have,
 
276
            # probably created from the web.
 
277
            # refresh the share list
 
278
            self.refresh_shares()
 
279
        else:
 
280
            share.accepted = True if answer == 'Yes' else False
 
281
            self.shared[share_id] = share
 
282
 
 
283
    def add_share(self, share):
 
284
        """ Add a share to the share list, and creates the fs mdobj. """
 
285
        self.log.info('Adding new share with id: %s - path: %r',
 
286
                      share.id, share.path)
 
287
        if share.id in self.shares:
 
288
            del self.shares[share.id]
 
289
        self.shares[share.id] = share
 
290
        if share.accepted:
 
291
            self._create_fsm_object(share)
 
292
            self._create_share_dir(share)
 
293
            self.m.action_q.query([(share.id, str(share.subtree), "")])
 
294
 
 
295
    def accept_share(self, share_id, answer):
 
296
        """ Calls AQ.accept_share with answer ('Yes'/'No')."""
 
297
        self.log.debug("Accept share, with id: %s - answer: %s ",
 
298
                       share_id, answer)
 
299
        share = self.shares[share_id]
 
300
        share.accepted = answer
 
301
        self.shares[share_id] =  share
 
302
        answer_str = "Yes" if answer else "No"
 
303
        d = self.m.action_q.answer_share(share_id, answer_str)
 
304
        def answer_ok(result):
 
305
            """ create the share, fsm object, and request a query. """
 
306
            if answer:
 
307
                self._create_fsm_object(share)
 
308
                self._create_share_dir(share)
 
309
                self.m.action_q.query([(share.id, str(share.subtree), "")])
 
310
        d.addCallback(answer_ok)
 
311
        return d
 
312
 
 
313
    def share_deleted(self, share_id):
 
314
        """ process the share deleted event. """
 
315
        self.log.debug("Share (id: %s) deleted. ", share_id)
 
316
        share = self.shares.get(share_id, None)
 
317
        if share is None:
 
318
            # we don't have this share, ignore it
 
319
            self.log.warning("Got a share deleted notification (%r), "
 
320
                             "but don't have the share", share_id)
 
321
        else:
 
322
            self._delete_fsm_object(share)
 
323
            del self.shares[share_id]
 
324
 
 
325
    def share_changed(self, share_holder):
 
326
        """ process the share changed event """
 
327
        share = self.shares.get(share_holder.share_id, None)
 
328
        if share is None:
 
329
            # we don't have this share, ignore it
 
330
            self.log.warning("Got a share changed notification (%r), "
 
331
                             "but don't have the share", share_holder.share_id)
 
332
        else:
 
333
            share.access_level = share_holder.access_level
 
334
            self.shares[share_holder.share_id] = share
 
335
 
 
336
    def _create_share_dir(self, share):
 
337
        """ Creates the share root dir, and set the permissions. """
 
338
        # XXX: verterok: This is going to be moved into fsm
 
339
        # if the share don't exists, create it
 
340
        if not os.path.exists(share.path):
 
341
            with allow_writes(os.path.dirname(share.path)):
 
342
                os.mkdir(share.path)
 
343
            # add the watch after the mkdir
 
344
            if share.can_write():
 
345
                self.log.debug('adding inotify watch to: %s', share.path)
 
346
                self.m.event_q.inotify_add_watch(share.path)
 
347
        # if it's a ro share, change the perms
 
348
        if not share.can_write():
 
349
            os.chmod(share.path, 0555)
 
350
 
 
351
    def _create_fsm_object(self, share):
 
352
        """ Creates the mdobj for this share in fs manager. """
 
353
        try:
 
354
            self.m.fs.get_by_path(share.path)
 
355
        except KeyError:
 
356
            self.m.fs.create(path=share.path, share_id=share.id, is_dir=True)
 
357
            self.m.fs.set_node_id(share.path, share.subtree)
 
358
            self.m.fs.set_by_path(path=share.path, **dict(local_hash=None,
 
359
                                                          server_hash=None,))
 
360
 
 
361
    def _delete_fsm_object(self, share):
 
362
        """ Deletes the share and it files/folders metadata from fsm. """
 
363
        #XXX: partially implemented, this should be moved into fsm?.
 
364
        # should delete all the files in the share?
 
365
        if share.can_write():
 
366
            try:
 
367
                self.m.event_q.inotify_rm_watch(share.path)
 
368
            except (ValueError, RuntimeError, TypeError), e:
 
369
                # pyinotify has an ugly error management, if we can call
 
370
                # it that, :(. We handle this here because it's possible
 
371
                # and correct that the path is not there anymore
 
372
                self.log.warning("Error %s when trying to remove the watch"
 
373
                                 " on %r", e, share.path)
 
374
        # delete all the metadata but dont touch the files/folders
 
375
        for path, is_dir in self.m.fs.get_paths_starting_with(share.path):
 
376
            self.m.fs.delete_metadata(path)
 
377
 
 
378
    def create_share(self, path, username, name, access_level):
 
379
        """ create a share for the specified path, username, name """
 
380
        self.log.debug('create share(%r, %s, %s, %s)',
 
381
                       path, username, name, access_level)
 
382
        mdobj = self.m.fs.get_by_path(path)
 
383
        mdid = self.m.fs.get_by_node_id(mdobj.share_id, mdobj.node_id)
 
384
        marker = MDMarker(mdid)
 
385
        share = Share(self.m.fs.get_abspath("", mdobj.path), share_id=marker,
 
386
                      name=name, access_level=access_level,
 
387
                      other_username=username, other_visible_name=None,
 
388
                      subtree=mdobj.node_id)
 
389
        self.marker_share_map[marker] = share
 
390
        self.m.action_q.create_share(mdobj.node_id, username, name,
 
391
                                     access_level, marker)
 
392
 
 
393
    def add_shared(self, share):
 
394
        """ Add a share with direction == from_me """
 
395
        self.log.info('New shared subtree: id: %s - path: %r',
 
396
                      share.id, share.path)
 
397
        current_share = self.shared.get(share.id)
 
398
        if current_share is None:
 
399
            self.shared[share.id] = share
 
400
        else:
 
401
            for k in share.__dict__:
 
402
                setattr(current_share, k, getattr(share, k))
 
403
            self.shared[share.id] = current_share
 
404
 
 
405
    def upgrade_shelf_layout(self, data_dir, shares_dir):
 
406
        """ Upgrade the shelf layout"""
 
407
        self.log.debug('Upgrading the share shelf layout')
 
408
        # the shelf already exists, and don't have a .version file
 
409
        # first backup the old data
 
410
        backup = os.path.join(data_dir, '0.bkp')
 
411
        if not os.path.exists(backup):
 
412
            os.makedirs(backup)
 
413
        for dirname, dirs, files in os.walk(data_dir):
 
414
            if dirname == data_dir:
 
415
                for dir in dirs:
 
416
                    if dir != os.path.basename(backup):
 
417
                        shutil.move(os.path.join(dirname, dir),
 
418
                                    os.path.join(backup, dir))
 
419
        # regenerate the shelf using the new layout using the backup as src
 
420
        old_shelf = ShareFileShelf(backup)
 
421
        os.makedirs(shares_dir)
 
422
        new_shelf = ShareFileShelf(shares_dir)
 
423
        for key in old_shelf.keys():
 
424
            new_shelf[key] = old_shelf[key]
 
425
 
 
426
 
 
427
@contextmanager
 
428
def allow_writes(path):
 
429
    """ a very simple context manager to allow writting in RO dirs. """
 
430
    prev_mod = stat.S_IMODE(os.stat(path).st_mode)
 
431
    os.chmod(path, 0755)
 
432
    yield
 
433
    os.chmod(path, prev_mod)
 
434
 
 
435
 
 
436
class ShareFileShelf(file_shelf.FileShelf):
 
437
    """ Custom file shelf that allow '' as key, it's replaced by the string:
 
438
    root_node_id.
 
439
    """
 
440
 
 
441
    def __init__(self, *args, **kwargs):
 
442
        """ Create the instance. """
 
443
        super(ShareFileShelf, self).__init__(*args, **kwargs)
 
444
        self.key = 'root_node_id'
 
445
 
 
446
    def key_file(self, key):
 
447
        """ override default key_file, to handle key == ''"""
 
448
        if key == '':
 
449
            key = self.key
 
450
        return super(ShareFileShelf, self).key_file(key)
 
451
 
 
452
    def keys(self):
 
453
        """ override default keys, to handle key == ''"""
 
454
        for key in super(ShareFileShelf, self).keys():
 
455
            if key == self.key:
 
456
                yield ''
 
457
            else:
 
458
                yield key
 
459