~ubuntu-branches/ubuntu/oneiric/ubuntuone-client/oneiric

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/volume_manager.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodrigo Moya
  • Date: 2010-08-04 10:08:20 UTC
  • mto: This revision was merged to the branch mainline in revision 41.
  • Revision ID: james.westby@ubuntu.com-20100804100820-m88dfedh2sa3hi3v
Tags: upstream-1.3.6
ImportĀ upstreamĀ versionĀ 1.3.6

Show diffs side-by-side

added added

removed removed

Lines of Context:
110
110
                  self.node_id == other.node_id)
111
111
        return result
112
112
 
 
113
    def __repr__(self):
 
114
        return "<Volume id %r, node_id %r, generation %r>" % (self.volume_id,
 
115
                                                              self.node_id,
 
116
                                                              self.generation)
113
117
 
114
118
class Share(Volume):
115
119
    """A volume representing a Share."""
251
255
        self.subscribed = subscribed
252
256
 
253
257
    def __repr__(self):
254
 
        return "<UDF id %r, real path %r>" % (self.id, self.path)
 
258
        return "<UDF id %r, generation %r, real path %r>" % (self.id, self.generation, self.path)
255
259
 
256
260
    @property
257
261
    def ancestors(self):
393
397
            self.m.fs.create(path=root.path,
394
398
                             share_id=root.volume_id, is_dir=True)
395
399
 
396
 
    def handle_SYS_ROOT_RECEIVED(self, root_id):
397
 
        """Got the root, map it to the  root share."""
398
 
        self.log.debug('handle_SYS_ROOT_RECEIVED(%s)', root_id)
399
 
        self._got_root(root_id)
400
 
        self.m.action_q.inquire_account_info()
401
 
        self.m.action_q.inquire_free_space(request.ROOT)
402
 
        self.refresh_volumes()
403
 
        self.refresh_shares()
404
 
 
405
400
    def _got_root(self, node_id, free_bytes=None):
406
401
        """Set the root node_id to the root share and mdobj."""
407
402
        # only set the root if we don't have it
408
403
        root = self.shares[request.ROOT]
409
404
        if free_bytes is not None:
410
405
            root.free_bytes = free_bytes
411
 
        if not root.node_id:
412
 
            mdobj = self.m.fs.get_by_path(root.path)
413
 
            if getattr(mdobj, 'node_id', None) is None:
414
 
                self.m.fs.set_node_id(root.path, node_id)
 
406
        mdobj = self.m.fs.get_by_path(root.path)
 
407
        if not (root.node_id and mdobj.node_id):
 
408
            self.m.fs.set_node_id(root.path, node_id)
415
409
            root.node_id = node_id
416
410
            self.shares[request.ROOT] = root
 
411
            self.m.event_q.push('SYS_ROOT_RECEIVED', root.node_id, mdobj.mdid)
417
412
        elif root.node_id != node_id:
418
413
            self.m.event_q.push('SYS_ROOT_MISMATCH', root.node_id, node_id)
 
414
        else:
 
415
            # the root node_id match and we already have it
 
416
            self.m.event_q.push('SYS_ROOT_RECEIVED', root.node_id, mdobj.mdid)
419
417
 
420
418
    def refresh_shares(self):
421
419
        """Request the list of shares to the server."""
439
437
                udfs.append(vol.volume_id)
440
438
        self._cleanup_volumes(shares=shares, udfs=udfs)
441
439
 
 
440
    def handle_SV_VOLUME_NEW_GENERATION(self, volume_id, generation):
 
441
        """Handle SV_VOLUME_NEW_GENERATION."""
 
442
        self.log.debug('handle_SV_VOLUME_NEW_GENERATION(%r, %r)',
 
443
                       volume_id, generation)
 
444
        volume_id = str(volume_id) # be safe and use a str
 
445
        try:
 
446
            volume = self.get_volume(volume_id)
 
447
        except VolumeDoesNotExist:
 
448
            self.log.warning('Got a SV_VOLUME_NEW_GENERATION for a missing '
 
449
                             'volume: %r with %r', volume_id, generation)
 
450
        else:
 
451
            current_gen = volume.generation
 
452
            if current_gen is None:
 
453
                self.m.action_q.rescan_from_scratch(volume_id)
 
454
            elif current_gen < generation:
 
455
                # XXX: check if we want to impose a hard limit in the size of
 
456
                # the delta and do a rescan from scratch if it's too big
 
457
                self.m.action_q.get_delta(volume_id, current_gen)
 
458
                # TODO/XXX: should we (VM) handle AQ_DELTA_ERROR?
 
459
            elif current_gen >= generation:
 
460
                self.log.info('Got SV_VOLUME_NEW_GENERATION(%r, %r) but volume'
 
461
                              ' is at generation: %r',
 
462
                              volume_id, generation, current_gen)
 
463
 
 
464
    def handle_AQ_DELTA_NOT_POSSIBLE(self, volume_id):
 
465
        """Handle AQ_DELTA_NOT_POSSIBLE."""
 
466
        self.log.debug('handle_AQ_DELTA_NOT_POSSIBLE(%r)', volume_id)
 
467
        volume_id = str(volume_id)
 
468
        try:
 
469
            volume = self.get_volume(volume_id)
 
470
        except VolumeDoesNotExist:
 
471
            self.log.warning('Got a AQ_DELTA_NOT_POSSIBLE for a missing '
 
472
                             'volume: %r', volume_id)
 
473
        else:
 
474
            self.log.info('Requesting a rescan from scratch for: %s', volume)
 
475
            self.m.action_q.rescan_from_scratch(volume_id)
 
476
 
442
477
    def server_rescan(self):
443
478
        """Do the 'server rescan'"""
444
479
        d = self.m.action_q.query_volumes()
453
488
 
454
489
    def _volumes_rescan_cb(self, volumes):
455
490
        """Handle the volumes list for server rescan"""
456
 
        self.log.debug('handling volumes rescan')
 
491
        self.log.debug('handling volumes rescan: %r', volumes)
457
492
        events = []
458
493
        for new_volume in volumes:
459
494
            # TODO: all this block might need to be inside a try:except
465
500
                # A new volume!
466
501
                self.log.debug('New volume! id: %r', volume_id)
467
502
                volume = self._handle_new_volume(new_volume)
468
 
                # we don't have the volume, use the initial generation
469
 
                current_generation = 0
470
 
            else:
471
 
                current_generation = volume.generation or 0 # None case
 
503
                # if this new volume is a UDF, it will be skipped as
 
504
                # isn't active until the local rescan finish.
 
505
                # It's safe to skip it as a rescan_from_scratch is
 
506
                # executed after the local rescan.
 
507
            finally:
 
508
                current_generation = volume.generation
 
509
 
 
510
            # first check if it's active
 
511
            if not volume.active:
 
512
                self.log.info('Skipping inactive volume: %r', volume)
 
513
                continue
 
514
            # handle root volume, to set root node_id
 
515
            if isinstance(new_volume, RootVolume):
 
516
                self._got_root(str(new_volume.node_id),
 
517
                               free_bytes=volume.free_bytes or None)
 
518
 
472
519
            new_generation = new_volume.generation
473
 
            # update the free_bytes on the volume
474
 
            self.update_free_space(volume_id, new_volume.free_bytes)
475
 
            if current_generation < new_generation:
 
520
            self.log.debug('New generation %s of %s', volume,
 
521
                           new_generation)
 
522
            if current_generation is None or \
 
523
               current_generation < new_generation:
476
524
                # add the event
477
525
                events.append(('SV_VOLUME_NEW_GENERATION',
478
526
                               dict(volume_id=volume_id,
668
716
            self.shared[share_id] = share
669
717
 
670
718
    def handle_AQ_ANSWER_SHARE_OK(self, share_id, answer):
671
 
        """ Handle successfully accepting a share """
 
719
        """Handle successfully accepting a share."""
672
720
        if answer == 'Yes':
673
721
            share = self.shares[share_id]
 
722
            share.accepted = True
 
723
            self.shares[share.volume_id] = share
674
724
            self._create_fsm_object(share.path, share.volume_id, share.node_id)
675
725
            self._create_share_dir(share)
676
 
            self.m.action_q.query([(share.volume_id, str(share.node_id), "")])
677
 
            self.m.action_q.inquire_free_space(share.volume_id)
 
726
            self.m.action_q.rescan_from_scratch(share.volume_id)
678
727
 
679
728
    def add_share(self, a_share):
680
729
        """ Add a share to the share list, and creates the fs mdobj. """
681
730
        self.log.info('Adding new share with id: %s - path: %r',
682
731
                      a_share.volume_id, a_share.path)
683
 
        # if the share is there, do nothing and return it
684
732
        share = self.shares.get(a_share.volume_id)
685
733
        is_new_share = share is None
686
734
        if share is not None:
687
735
            share.accepted = a_share.accepted
688
736
            self.shares[share.volume_id] = share
689
737
        else:
690
 
            share = self.shares[share.volume_id] = a_share
 
738
            share = a_share
 
739
        if is_new_share or not share.accepted:
 
740
            # if it's a new share or isn't accepted set the generation to None
 
741
            # to force a rescan
 
742
            share.generation = None
 
743
        # store the share
 
744
        self.shares[share.volume_id] = share
691
745
        if share.accepted:
692
746
            self._create_fsm_object(share.path, share.volume_id, share.node_id)
693
747
            self._create_share_dir(share)
694
 
            self.m.action_q.query([(share.volume_id, str(share.node_id), "")])
695
 
            self.m.action_q.inquire_free_space(share.volume_id)
696
748
        if is_new_share:
697
749
            # push the event only if it's a new share
698
750
            self.m.event_q.push('VM_SHARE_CREATED', share.volume_id)
711
763
            self.log.warning("Update of free space requested, but there is "
712
764
                             "no such volume_id: %s", volume_id)
713
765
 
 
766
    # same functionality, but other name to be called by EventQueue
 
767
    handle_SYS_QUOTA_EXCEEDED = update_free_space
 
768
 
714
769
    def get_free_space(self, volume_id):
715
770
        """Return the free_space for volume_id.
716
771
 
852
907
        """Add the udf to the VM metadata if isn't there.
853
908
 
854
909
        If it's a new udf, create the directory, hook inotify
855
 
        and execute a query.
 
910
        and request the full delta.
856
911
 
857
912
        """
858
913
        self.log.debug('add_udf: %s', udf)
859
914
        if self.udfs.get(udf.volume_id, None) is None:
860
915
            self.log.debug('udf not in metadata, adding it!')
 
916
            # if it's a new UDF set the generation to None to force a
 
917
            # rescan
 
918
            udf.generation = None
861
919
            self.udfs[udf.volume_id] = udf
862
920
            self._create_fsm_object(udf.path, udf.volume_id, udf.node_id)
863
921
            # local and server rescan, this will add the inotify hooks
1063
1121
        mdobj = self.m.fs.get_by_path(udf.path)
1064
1122
        d = self.m.lr.scan_dir(mdobj.mdid, udf.path, udfmode=True)
1065
1123
        def server_rescan(_):
1066
 
            """Do a query over all known nodes."""
1067
 
            data = self.m.fs.get_for_server_rescan_by_path(udf.path)
1068
 
            self.m.action_q.query(data)
1069
 
            self.m.action_q.inquire_free_space(request.ROOT)
 
1124
            """Request the delta from the last known generation."""
 
1125
            self.m.action_q.rescan_from_scratch(udf.volume_id)
1070
1126
        d.addCallback(server_rescan)
1071
1127
        return d
1072
1128
 
1140
1196
 
1141
1197
    def update_generation(self, volume_id, generation):
1142
1198
        """Update the generation of the specified volume."""
 
1199
        self.log.debug('update_generation: %r, %r', volume_id, generation)
1143
1200
        vol = self.get_volume(volume_id)
1144
1201
        vol.generation = generation
1145
1202
        if isinstance(vol, (Share, Root)):