1
# canonical.ubuntuone.storage.syncdaemon.volume_manager - manages volumes
3
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
5
# Copyright 2009 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU General Public License version 3, as published
9
# by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
# PURPOSE. See the GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License along
17
# with this program. If not, see <http://www.gnu.org/licenses/>.
18
""" The all mighty Volume Manager """
19
from __future__ import with_statement
25
from contextlib import contextmanager
27
from canonical.ubuntuone.storage.syncdaemon.marker import MDMarker
28
from canonical.ubuntuone.storage.syncdaemon import file_shelf
32
"""Representas a share or mount point"""
34
def __init__(self, path, share_id='', name=None, access_level='View',
35
accepted=False, other_username=None, other_visible_name=None,
37
""" Creates the instance.
39
The received path should be 'bytes'
44
self.path = os.path.normpath(path)
45
self.id = str(share_id)
46
self.access_level = access_level
47
self.accepted = accepted
49
self.other_username = other_username
50
self.other_visible_name = other_visible_name
51
self.subtree = subtree
54
def from_response(cls, share_response, path):
55
""" Creates a Share instance from a ShareResponse.
57
The received path should be 'bytes'
59
share = cls(path, str(share_response.id), share_response.name,
60
share_response.access_level, share_response.accepted,
61
share_response.other_username,
62
share_response.other_visible_name, share_response.subtree)
66
def from_notify_holder(cls, share_notify, path):
67
""" Creates a Share instance from a NotifyShareHolder.
69
The received path should be 'bytes'
71
share = cls(path, share_id=str(share_notify.share_id),
72
name=share_notify.share_name,
73
access_level=share_notify.access_level,
74
other_username=share_notify.from_username,
75
other_visible_name=share_notify.from_visible_name,
76
subtree=share_notify.subtree)
80
""" check the access_level of this share,
81
returns True if it's 'Modify'.
83
return self.access_level == 'Modify'
86
class VolumeManager(object):
87
"""Manages shares and mount points."""
91
def __init__(self, main):
92
"""Create the instance and populate the shares/d attributes
93
from the metadata (if it exists).
95
self.log = logging.getLogger('ubuntuone.SyncDaemon.VM')
97
data_dir = os.path.join(self.m.data_dir, 'vm')
98
version_file = os.path.join(data_dir, '.version')
99
shares_dir = os.path.join(data_dir, 'shares')
100
shared_dir = os.path.join(data_dir, 'shared')
101
if os.path.exists(data_dir) and not os.path.exists(version_file) \
102
and not os.path.exists(shares_dir):
103
self.upgrade_shelf_layout(data_dir, shares_dir)
104
# write down the version file if it don't exists
105
if not os.path.exists(version_file):
106
if not os.path.exists(os.path.dirname(version_file)):
107
os.makedirs(os.path.dirname(version_file))
108
with open(version_file, 'w') as fd:
109
fd.write(VolumeManager.CACHE_VERSION)
110
if not os.path.exists(shares_dir):
111
os.makedirs(shares_dir)
112
if not os.path.exists(shared_dir):
113
os.makedirs(shared_dir)
114
self.shares = ShareFileShelf(shares_dir)
115
self.shared = ShareFileShelf(shared_dir)
116
if self.shares.get('') is None:
117
self.root = Share(self.m.root_dir)
119
self.root = self.shares['']
120
self.root.access_level = 'Modify'
121
self.root.path = self.m.root_dir
122
self.shares[''] = self.root
123
with allow_writes(os.path.dirname(self.m.shares_dir)):
124
if not os.path.exists(self.m.shares_dir):
125
os.makedirs(self.m.shares_dir)
127
os.chmod(self.m.shares_dir, 0555)
128
self.marker_share_map = {}
129
self.list_shares_retries = 0
130
self.retries_limit = 5
133
""" Creates the root mdid. """
134
self._create_share_dir(self.root)
136
mdobj = self.m.fs.get_by_path(self.root.path)
138
mdobj = self.m.fs.create(path=self.root.path,
139
share_id='', is_dir=True)
140
self.m.fs.set_by_path(path=self.root.path,
141
local_hash=None, server_hash=None)
143
def on_server_root(self, root):
144
"""Asociate server root"""
145
self.log.debug('init_root(%s)', root)
146
mdobj = self.m.fs.get_by_path(self.root.path)
147
if getattr(mdobj, 'node_id', None) is None:
148
self.m.fs.set_node_id(self.root.path, root)
149
share = self.shares['']
150
self.root.subtree = share.subtree = root
151
self.shares[''] = share
152
self.refresh_shares()
155
def refresh_shares(self):
156
""" Reuqest the list of shares to the server. """
157
# request the list of shares
158
self.m.action_q.list_shares()
160
def handle_SYS_STATE_CHANGED(self, state):
162
The system changed state. If we don't know our root's uuid yet,
163
and the AQ is online, ask for it.
165
if state.is_connected and self.root.subtree is None:
167
mdobj = self.m.fs.get_by_path(self.root.path)
170
mdid = self.m.fs.create(path=self.root.path, share_id='',
172
self.m.fs.set_by_path(path=self.root.path,
173
local_hash=None, server_hash=None)
174
self.m.get_root(MDMarker(mdid))
176
def handle_AQ_SHARES_LIST(self, shares_list):
177
""" handle AQ_SHARES_LIST event """
178
for share in shares_list.shares:
179
share_id = getattr(share, 'id', getattr(share, 'share_id', None))
180
self.log.debug('new share %r: id=%s, name=%r',
181
share.direction, share_id, share.name)
183
self.list_shares_retries = 0
184
for a_share in shares_list.shares:
185
if a_share.direction == "to_me":
186
dir_name = self._build_dirname(a_share.name,
187
a_share.other_visible_name)
188
path = os.path.join(self.m.shares_dir, dir_name)
189
share = Share.from_response(a_share, path)
190
self.add_share(share)
191
elif a_share.direction == "from_me":
193
mdobj = self.m.fs.get_by_node_id("", a_share.subtree)
194
path = self.m.fs.get_abspath(mdobj.share_id, mdobj.path)
196
# we don't have the file/md of this shared subtree yet
197
# for the moment ignore this share
198
self.log.warning("we got a share with 'from_me' direction,"
199
" but don't have the node_id in the metadata yet")
201
share = Share.from_response(a_share, path)
202
self.add_shared(share)
204
def _build_dirname(self, share_name, visible_name):
205
'''Builds the root path using the share information.'''
206
dir_name = share_name + u' from ' + visible_name
208
# Unicode boundary! the name is Unicode in protocol and server,
209
# but here we use bytes for paths
210
dir_name = dir_name.encode("utf8")
214
def handle_AQ_LIST_SHARES_ERROR(self, error):
215
""" handle AQ_LIST_SHARES_ERROR event """
216
# just call list_shares again, until we reach the retry limit
217
if self.list_shares_retries <= self.retries_limit:
218
self.m.action_q.list_shares()
219
self.list_shares_retries += 1
221
def handle_SV_SHARE_CHANGED(self, message, share):
222
""" handle SV_SHARE_CHANGED event """
223
if message == 'changed':
224
if share.share_id not in self.shares:
225
self.log.debug("New share notification, share_id: %s",
227
dir_name = self._build_dirname(share.share_name,
228
share.from_visible_name)
229
path = os.path.join(self.m.shares_dir, dir_name)
230
share = Share.from_notify_holder(share, path)
231
self.add_share(share)
233
self.log.debug('share changed! %s', share.share_id)
234
self.share_changed(share)
235
elif message == 'deleted':
236
self.log.debug('share deleted! %s', share.share_id)
237
self.share_deleted(share.share_id)
239
def handle_AQ_CREATE_SHARE_OK(self, share_id, marker):
240
""" handle AQ_CREATE_SHARE_OK event. """
241
share = self.marker_share_map.get(marker)
243
self.m.action_q.list_shares()
246
self.add_shared(share)
247
if marker in self.marker_share_map:
248
del self.marker_share_map[marker]
250
def handle_AQ_CREATE_SHARE_ERROR(self, marker, error):
251
""" handle AQ_CREATE_SHARE_ERROR event. """
252
if marker in self.marker_share_map:
253
del self.marker_share_map[marker]
255
def handle_SV_SHARE_ANSWERED(self, share_id, answer):
256
""" handle SV_SHARE_ANSWERED event. """
257
share = self.shared.get(share_id, None)
259
# oops, we got an answer for a share we don't have,
260
# probably created from the web.
261
# refresh the share list
262
self.refresh_shares()
264
share.accepted = True if answer == 'Yes' else False
265
self.shared[share_id] = share
267
def add_share(self, share):
268
""" Add a share to the share list, and creates the fs mdobj. """
269
self.log.info('Adding new share with id: %s - path: %r',
270
share.id, share.path)
271
self.shares[share.id] = share
273
self._create_fsm_object(share)
274
self._create_share_dir(share)
275
self.m.action_q.query([(share.id, str(share.subtree), "")])
277
def accept_share(self, share_id, answer):
278
""" Calls AQ.accept_share with answer ('Yes'/'No')."""
279
self.log.debug("Accept share, with id: %s - answer: %s ",
281
share = self.shares[share_id]
282
share.accepted = answer
283
self.shares[share_id] = share
284
answer_str = "Yes" if answer else "No"
285
d = self.m.action_q.answer_share(share_id, answer_str)
286
def answer_ok(result):
287
""" create the share, fsm object, and request a query. """
289
self._create_fsm_object(share)
290
self._create_share_dir(share)
291
self.m.action_q.query([(share.id, str(share.subtree), "")])
292
d.addCallback(answer_ok)
295
def share_deleted(self, share_id):
296
""" process the share deleted event. """
297
# XXX: verterok: shares deletion is not fully implemented,
298
# the files aren't deleted from disk
299
self.log.debug("Share (id: %s) deleted. ", share_id)
300
share = self.shares.get(share_id, None)
302
# we don't have this share, ignore it
303
self.log.warning("Got a share deleted notification (%r), "
304
"but don't have the share", share_id)
306
self._delete_fsm_object(share)
307
del self.shares[share_id]
309
def share_changed(self, share_holder):
310
""" process the share changed event """
311
share = self.shares.get(share_holder.share_id, None)
313
# we don't have this share, ignore it
314
self.log.warning("Got a share changed notification (%r), "
315
"but don't have the share", share_holder.share_id)
317
share.access_level = share_holder.access_level
318
self.shares[share_holder.share_id] = share
320
def _create_share_dir(self, share):
321
""" Creates the share root dir, and set the permissions. """
322
# XXX: verterok: This is going to be moved into fsm
323
# if the share don't exists, create it
324
if not os.path.exists(share.path):
325
with allow_writes(os.path.dirname(share.path)):
327
# add the watch after the mkdir
328
if share.can_write():
329
self.log.debug('adding inotify watch to: %s', share.path)
330
self.m.event_q.inotify_add_watch(share.path)
331
# if it's a ro share, change the perms
332
if not share.can_write():
333
os.chmod(share.path, 0555)
335
def _create_fsm_object(self, share):
336
""" Creates the mdobj for this share in fs manager. """
338
self.m.fs.get_by_path(share.path)
340
self.m.fs.create(path=share.path, share_id=share.id, is_dir=True)
341
self.m.fs.set_node_id(share.path, share.subtree)
342
self.m.fs.set_by_path(path=share.path, **dict(local_hash=None,
345
def _delete_fsm_object(self, share):
346
""" Deletes the share from fsm. """
347
#XXX: not implemented yet, this ;ll delegate most of the work in fsm
350
def create_share(self, path, username, name, access_level):
351
""" create a share for the specified path, username, name """
352
self.log.debug('create share(%r, %s, %s, %s)',
353
path, username, name, access_level)
354
mdobj = self.m.fs.get_by_path(path)
355
mdid = self.m.fs.get_by_node_id(mdobj.share_id, mdobj.node_id)
356
marker = MDMarker(mdid)
357
share = Share(self.m.fs.get_abspath("", mdobj.path), share_id=marker,
358
name=name, access_level=access_level,
359
other_username=username, other_visible_name=None,
360
subtree=mdobj.node_id)
361
self.marker_share_map[marker] = share
362
self.m.action_q.create_share(mdobj.node_id, username, name,
363
access_level, marker)
365
def add_shared(self, share):
366
""" Add a share with direction == from_me """
367
self.log.info('New shared subtree: id: %s - path: %r',
368
share.id, share.path)
369
current_share = self.shared.get(share.id)
370
if current_share is None:
371
self.shared[share.id] = share
373
for k in share.__dict__:
374
setattr(current_share, k, getattr(share, k))
375
self.shared[share.id] = current_share
377
def upgrade_shelf_layout(self, data_dir, shares_dir):
378
""" Upgrade the shelf layout"""
379
self.log.debug('Upgrading the share shelf layout')
380
# the shelf already exists, and don't have a .version file
381
# first backup the old data
382
backup = os.path.join(data_dir, '0.bkp')
383
if not os.path.exists(backup):
385
for dirname, dirs, files in os.walk(data_dir):
386
if dirname == data_dir:
388
if dir != os.path.basename(backup):
389
shutil.move(os.path.join(dirname, dir),
390
os.path.join(backup, dir))
391
# regenerate the shelf using the new layout using the backup as src
392
old_shelf = ShareFileShelf(backup)
393
os.makedirs(shares_dir)
394
new_shelf = ShareFileShelf(shares_dir)
395
for key in old_shelf.keys():
396
new_shelf[key] = old_shelf[key]
400
def allow_writes(path):
401
""" a very simple context manager to allow writting in RO dirs. """
402
prev_mod = stat.S_IMODE(os.stat(path).st_mode)
405
os.chmod(path, prev_mod)
408
class ShareFileShelf(file_shelf.FileShelf):
409
""" Custom file shelf that allow '' as key, it's replaced by the string:
413
def __init__(self, *args, **kwargs):
414
""" Create the instance. """
415
super(ShareFileShelf, self).__init__(*args, **kwargs)
416
self.key = 'root_node_id'
418
def key_file(self, key):
419
""" override default key_file, to handle key == ''"""
422
return super(ShareFileShelf, self).key_file(key)
425
""" override default keys, to handle key == ''"""
426
for key in super(ShareFileShelf, self).keys():