110
110
self.node_id == other.node_id)
114
return "<Volume id %r, node_id %r, generation %r>" % (self.volume_id,
114
118
class Share(Volume):
115
119
"""A volume representing a Share."""
251
255
self.subscribed = subscribed
253
257
def __repr__(self):
254
return "<UDF id %r, real path %r>" % (self.id, self.path)
258
return "<UDF id %r, generation %r, real path %r>" % (self.id, self.generation, self.path)
257
261
def ancestors(self):
393
397
self.m.fs.create(path=root.path,
394
398
share_id=root.volume_id, is_dir=True)
396
def handle_SYS_ROOT_RECEIVED(self, root_id):
397
"""Got the root, map it to the root share."""
398
self.log.debug('handle_SYS_ROOT_RECEIVED(%s)', root_id)
399
self._got_root(root_id)
400
self.m.action_q.inquire_account_info()
401
self.m.action_q.inquire_free_space(request.ROOT)
402
self.refresh_volumes()
403
self.refresh_shares()
405
400
def _got_root(self, node_id, free_bytes=None):
406
401
"""Set the root node_id to the root share and mdobj."""
407
402
# only set the root if we don't have it
408
403
root = self.shares[request.ROOT]
409
404
if free_bytes is not None:
410
405
root.free_bytes = free_bytes
412
mdobj = self.m.fs.get_by_path(root.path)
413
if getattr(mdobj, 'node_id', None) is None:
414
self.m.fs.set_node_id(root.path, node_id)
406
mdobj = self.m.fs.get_by_path(root.path)
407
if not (root.node_id and mdobj.node_id):
408
self.m.fs.set_node_id(root.path, node_id)
415
409
root.node_id = node_id
416
410
self.shares[request.ROOT] = root
411
self.m.event_q.push('SYS_ROOT_RECEIVED', root.node_id, mdobj.mdid)
417
412
elif root.node_id != node_id:
418
413
self.m.event_q.push('SYS_ROOT_MISMATCH', root.node_id, node_id)
415
# the root node_id match and we already have it
416
self.m.event_q.push('SYS_ROOT_RECEIVED', root.node_id, mdobj.mdid)
420
418
def refresh_shares(self):
421
419
"""Request the list of shares to the server."""
439
437
udfs.append(vol.volume_id)
440
438
self._cleanup_volumes(shares=shares, udfs=udfs)
440
def handle_SV_VOLUME_NEW_GENERATION(self, volume_id, generation):
441
"""Handle SV_VOLUME_NEW_GENERATION."""
442
self.log.debug('handle_SV_VOLUME_NEW_GENERATION(%r, %r)',
443
volume_id, generation)
444
volume_id = str(volume_id) # be safe and use a str
446
volume = self.get_volume(volume_id)
447
except VolumeDoesNotExist:
448
self.log.warning('Got a SV_VOLUME_NEW_GENERATION for a missing '
449
'volume: %r with %r', volume_id, generation)
451
current_gen = volume.generation
452
if current_gen is None:
453
self.m.action_q.rescan_from_scratch(volume_id)
454
elif current_gen < generation:
455
# XXX: check if we want to impose a hard limit in the size of
456
# the delta and do a rescan from scratch if it's too big
457
self.m.action_q.get_delta(volume_id, current_gen)
458
# TODO/XXX: should we (VM) handle AQ_DELTA_ERROR?
459
elif current_gen >= generation:
460
self.log.info('Got SV_VOLUME_NEW_GENERATION(%r, %r) but volume'
461
' is at generation: %r',
462
volume_id, generation, current_gen)
464
def handle_AQ_DELTA_NOT_POSSIBLE(self, volume_id):
465
"""Handle AQ_DELTA_NOT_POSSIBLE."""
466
self.log.debug('handle_AQ_DELTA_NOT_POSSIBLE(%r)', volume_id)
467
volume_id = str(volume_id)
469
volume = self.get_volume(volume_id)
470
except VolumeDoesNotExist:
471
self.log.warning('Got a AQ_DELTA_NOT_POSSIBLE for a missing '
472
'volume: %r', volume_id)
474
self.log.info('Requesting a rescan from scratch for: %s', volume)
475
self.m.action_q.rescan_from_scratch(volume_id)
442
477
def server_rescan(self):
443
478
"""Do the 'server rescan'"""
444
479
d = self.m.action_q.query_volumes()
454
489
def _volumes_rescan_cb(self, volumes):
455
490
"""Handle the volumes list for server rescan"""
456
self.log.debug('handling volumes rescan')
491
self.log.debug('handling volumes rescan: %r', volumes)
458
493
for new_volume in volumes:
459
494
# TODO: all this block might need to be inside a try:except
466
501
self.log.debug('New volume! id: %r', volume_id)
467
502
volume = self._handle_new_volume(new_volume)
468
# we don't have the volume, use the initial generation
469
current_generation = 0
471
current_generation = volume.generation or 0 # None case
503
# if this new volume is a UDF, it will be skipped as
504
# isn't active until the local rescan finish.
505
# It's safe to skip it as a rescan_from_scratch is
506
# executed after the local rescan.
508
current_generation = volume.generation
510
# first check if it's active
511
if not volume.active:
512
self.log.info('Skipping inactive volume: %r', volume)
514
# handle root volume, to set root node_id
515
if isinstance(new_volume, RootVolume):
516
self._got_root(str(new_volume.node_id),
517
free_bytes=volume.free_bytes or None)
472
519
new_generation = new_volume.generation
473
# update the free_bytes on the volume
474
self.update_free_space(volume_id, new_volume.free_bytes)
475
if current_generation < new_generation:
520
self.log.debug('New generation %s of %s', volume,
522
if current_generation is None or \
523
current_generation < new_generation:
477
525
events.append(('SV_VOLUME_NEW_GENERATION',
478
526
dict(volume_id=volume_id,
668
716
self.shared[share_id] = share
670
718
def handle_AQ_ANSWER_SHARE_OK(self, share_id, answer):
671
""" Handle successfully accepting a share """
719
"""Handle successfully accepting a share."""
672
720
if answer == 'Yes':
673
721
share = self.shares[share_id]
722
share.accepted = True
723
self.shares[share.volume_id] = share
674
724
self._create_fsm_object(share.path, share.volume_id, share.node_id)
675
725
self._create_share_dir(share)
676
self.m.action_q.query([(share.volume_id, str(share.node_id), "")])
677
self.m.action_q.inquire_free_space(share.volume_id)
726
self.m.action_q.rescan_from_scratch(share.volume_id)
679
728
def add_share(self, a_share):
680
729
""" Add a share to the share list, and creates the fs mdobj. """
681
730
self.log.info('Adding new share with id: %s - path: %r',
682
731
a_share.volume_id, a_share.path)
683
# if the share is there, do nothing and return it
684
732
share = self.shares.get(a_share.volume_id)
685
733
is_new_share = share is None
686
734
if share is not None:
687
735
share.accepted = a_share.accepted
688
736
self.shares[share.volume_id] = share
690
share = self.shares[share.volume_id] = a_share
739
if is_new_share or not share.accepted:
740
# if it's a new share or isn't accepted set the generation to None
742
share.generation = None
744
self.shares[share.volume_id] = share
691
745
if share.accepted:
692
746
self._create_fsm_object(share.path, share.volume_id, share.node_id)
693
747
self._create_share_dir(share)
694
self.m.action_q.query([(share.volume_id, str(share.node_id), "")])
695
self.m.action_q.inquire_free_space(share.volume_id)
697
749
# push the event only if it's a new share
698
750
self.m.event_q.push('VM_SHARE_CREATED', share.volume_id)
711
763
self.log.warning("Update of free space requested, but there is "
712
764
"no such volume_id: %s", volume_id)
766
# same functionality, but other name to be called by EventQueue
767
handle_SYS_QUOTA_EXCEEDED = update_free_space
714
769
def get_free_space(self, volume_id):
715
770
"""Return the free_space for volume_id.
852
907
"""Add the udf to the VM metadata if isn't there.
854
909
If it's a new udf, create the directory, hook inotify
910
and request the full delta.
858
913
self.log.debug('add_udf: %s', udf)
859
914
if self.udfs.get(udf.volume_id, None) is None:
860
915
self.log.debug('udf not in metadata, adding it!')
916
# if it's a new UDF set the generation to None to force a
918
udf.generation = None
861
919
self.udfs[udf.volume_id] = udf
862
920
self._create_fsm_object(udf.path, udf.volume_id, udf.node_id)
863
921
# local and server rescan, this will add the inotify hooks
1063
1121
mdobj = self.m.fs.get_by_path(udf.path)
1064
1122
d = self.m.lr.scan_dir(mdobj.mdid, udf.path, udfmode=True)
1065
1123
def server_rescan(_):
1066
"""Do a query over all known nodes."""
1067
data = self.m.fs.get_for_server_rescan_by_path(udf.path)
1068
self.m.action_q.query(data)
1069
self.m.action_q.inquire_free_space(request.ROOT)
1124
"""Request the delta from the last known generation."""
1125
self.m.action_q.rescan_from_scratch(udf.volume_id)
1070
1126
d.addCallback(server_rescan)
1141
1197
def update_generation(self, volume_id, generation):
1142
1198
"""Update the generation of the specified volume."""
1199
self.log.debug('update_generation: %r, %r', volume_id, generation)
1143
1200
vol = self.get_volume(volume_id)
1144
1201
vol.generation = generation
1145
1202
if isinstance(vol, (Share, Root)):