~nataliabidart/ubuntuone-client/stable-3-0-update-2.99.92

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/action_queue.py

  • Committer: Rodney Dawes
  • Date: 2012-02-16 14:33:30 UTC
  • mfrom: (1170.1.1 update-from-trunk)
  • Revision ID: rodney.dawes@canonical.com-20120216143330-2hqwkybphi3ngw5z
[Rodney Dawes]

  - Use new twsited gireactor when available, and convert tests to use it always

[Facundo Batista]

  - Add an on-disk FIFO queue
  - Get the file object on the moment it is used in Upload and Download.
  - Compare paths in the test.
  - Put exceeding ops in an offload queue, pick them up and execute them later when operations finish.

[Natalia Bidart]

  - Skip hanging test suite.

[Brian Curtin]

  - Exported validate_path_from_folder from volume_manager instead of ubuntuone-control-panel's backend.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# ubuntuone.syncdaemon.action_queue - Action queue
 
1
# -*- coding: utf8 -*-
2
2
#
3
 
# Copyright 2009-2011 Canonical Ltd.
 
3
# Copyright 2009-2012 Canonical Ltd.
4
4
#
5
5
# This program is free software: you can redistribute it and/or modify it
6
6
# under the terms of the GNU General Public License version 3, as published
17
17
"""Queue and execute operations on the server."""
18
18
 
19
19
import base64
20
 
import simplejson
 
20
import inspect
21
21
import logging
22
22
import os
23
23
import random
24
24
import re
 
25
import simplejson
25
26
import tempfile
26
27
import traceback
27
28
import uuid
55
56
from ubuntuone.syncdaemon.interfaces import IActionQueue, IMarker
56
57
from ubuntuone.syncdaemon.logger import mklog, TRACE
57
58
from ubuntuone.syncdaemon.volume_manager import ACCESS_LEVEL_RW
58
 
from ubuntuone.syncdaemon import config
 
59
from ubuntuone.syncdaemon import config, offload_queue
59
60
 
60
61
logger = logging.getLogger("ubuntuone.SyncDaemon.ActionQueue")
61
62
 
454
455
                reactor.callFromThread(deferred.callback, True)
455
456
 
456
457
    @defer.inlineCallbacks
457
 
    def zip(self, upload):
 
458
    def zip(self, upload, fileobj_factory):
458
459
        """Acquire, do the compression in a thread, release."""
459
460
        deferred = defer.Deferred()
460
461
 
461
462
        yield self.acquire()
462
463
        try:
463
464
            try:
464
 
                fileobj = upload.fileobj_factory()
 
465
                fileobj = fileobj_factory()
465
466
            except StandardError, e:
466
467
                # maybe the user deleted the file before we got to upload it
467
468
                upload.log.warn("Unable to build fileobj (%s: '%s') so "
697
698
        self.uuid_map = DeferredMap()
698
699
        self.zip_queue = ZipQueue()
699
700
        self.conditions_locker = ConditionsLocker()
 
701
        self.disk_queue = offload_queue.OffloadQueue()
700
702
 
701
703
        self.estimated_free_space = {}
702
704
        event_queue.subscribe(self)
703
705
 
 
706
        # data for the offloaded queue
 
707
        user_config = config.get_user_config()
 
708
        self.memory_pool_limit = user_config.get_memory_pool_limit()
 
709
        self.commands = dict((x, y) for x, y in globals().iteritems()
 
710
                             if inspect.isclass(y) and
 
711
                                            issubclass(y, ActionQueueCommand))
 
712
 
704
713
    def check_conditions(self):
705
714
        """Check conditions in the locker, to release all the waiting ops."""
706
715
        self.conditions_locker.check_conditions()
1034
1043
            event_ok=None, handle_exception=False)
1035
1044
        defer.returnValue(result.volumes)
1036
1045
 
 
1046
    @defer.inlineCallbacks
 
1047
    def _really_execute(self, command_class, *args, **kwargs):
 
1048
        """Actually queue and execute the operation."""
 
1049
        cmd = command_class(self.queue, *args, **kwargs)
 
1050
 
 
1051
        # queue if should, otherwise all is done
 
1052
        if cmd.should_be_queued():
 
1053
            cmd.log.debug('queueing')
 
1054
            self.queue.queue(cmd)
 
1055
            yield cmd.go()
 
1056
            self.queue.unqueue(cmd)
 
1057
 
 
1058
    @defer.inlineCallbacks
 
1059
    def execute(self, command_class, *args, **kwargs):
 
1060
        """Execute a command only if there's room in memory to handle it."""
 
1061
        if len(self.queue) > self.memory_pool_limit:
 
1062
            # not enough room in memory, store it in the offloaded queue
 
1063
            logger.debug('offload push: %s %s %s', command_class.__name__, args, kwargs)
 
1064
            self.disk_queue.push((command_class.__name__, args, kwargs))
 
1065
            return
 
1066
 
 
1067
        # normal case, just instantiate the command and let it go
 
1068
        yield self._really_execute(command_class, *args, **kwargs)
 
1069
 
 
1070
        # command just finished... check to queue more offloaded ones
 
1071
        while len(self.queue) < self.memory_pool_limit and len(self.disk_queue) > 0:
 
1072
            command_class_name, args, kwargs = self.disk_queue.pop()
 
1073
            logger.debug('offload pop: %s %s %s', command_class_name, args, kwargs)
 
1074
            command_class = self.commands[command_class_name]
 
1075
            yield self._really_execute(command_class, *args, **kwargs)
 
1076
 
1037
1077
    def make_file(self, share_id, parent_id, name, marker, path):
1038
1078
        """See .interfaces.IMetaQueue."""
1039
 
        return MakeFile(self.queue, share_id, parent_id,
1040
 
                        name, marker, path).go()
 
1079
        self.execute(MakeFile, share_id, parent_id, name, marker, path)
1041
1080
 
1042
1081
    def make_dir(self, share_id, parent_id, name, marker, path):
1043
1082
        """See .interfaces.IMetaQueue."""
1044
 
        return MakeDir(self.queue, share_id, parent_id,
1045
 
                       name, marker, path).go()
 
1083
        self.execute(MakeDir, share_id, parent_id, name, marker, path)
1046
1084
 
1047
1085
    def move(self, share_id, node_id, old_parent_id, new_parent_id,
1048
1086
             new_name, path_from, path_to):
1049
1087
        """See .interfaces.IMetaQueue."""
1050
 
        return Move(self.queue, share_id, node_id, old_parent_id,
1051
 
                    new_parent_id, new_name, path_from, path_to).go()
 
1088
        self.execute(Move, share_id, node_id, old_parent_id,
 
1089
                     new_parent_id, new_name, path_from, path_to)
1052
1090
 
1053
1091
    def unlink(self, share_id, parent_id, node_id, path, is_dir):
1054
1092
        """See .interfaces.IMetaQueue."""
1055
 
        return Unlink(self.queue, share_id, parent_id, node_id, path,
1056
 
                      is_dir).go()
 
1093
        self.execute(Unlink, share_id, parent_id, node_id, path, is_dir)
1057
1094
 
1058
1095
    def inquire_free_space(self, share_id):
1059
1096
        """See .interfaces.IMetaQueue."""
1060
 
        return FreeSpaceInquiry(self.queue, share_id).go()
 
1097
        self.execute(FreeSpaceInquiry, share_id)
1061
1098
 
1062
1099
    def inquire_account_info(self):
1063
1100
        """See .interfaces.IMetaQueue."""
1064
 
        return AccountInquiry(self.queue).go()
 
1101
        self.execute(AccountInquiry)
1065
1102
 
1066
1103
    def list_shares(self):
1067
1104
        """See .interfaces.IMetaQueue."""
1068
 
        return ListShares(self.queue).go()
 
1105
        self.execute(ListShares)
1069
1106
 
1070
1107
    def answer_share(self, share_id, answer):
1071
1108
        """See .interfaces.IMetaQueue."""
1072
 
        return AnswerShare(self.queue, share_id, answer).go()
 
1109
        self.execute(AnswerShare, share_id, answer)
1073
1110
 
1074
1111
    def create_share(self, node_id, share_to, name, access_level,
1075
1112
                     marker, path):
1076
1113
        """See .interfaces.IMetaQueue."""
1077
 
        return CreateShare(self.queue, node_id, share_to, name,
1078
 
                           access_level, marker, path).go()
 
1114
        self.execute(CreateShare, node_id, share_to, name,
 
1115
                     access_level, marker, path)
1079
1116
 
1080
1117
    def delete_share(self, share_id):
1081
1118
        """See .interfaces.IMetaQueue."""
1082
 
        return DeleteShare(self.queue, share_id).go()
 
1119
        self.execute(DeleteShare, share_id)
1083
1120
 
1084
1121
    def create_udf(self, path, name, marker):
1085
1122
        """See .interfaces.IMetaQueue."""
1086
 
        return CreateUDF(self.queue, path, name, marker).go()
 
1123
        self.execute(CreateUDF, path, name, marker)
1087
1124
 
1088
1125
    def list_volumes(self):
1089
1126
        """See .interfaces.IMetaQueue."""
1090
 
        return ListVolumes(self.queue).go()
 
1127
        self.execute(ListVolumes)
1091
1128
 
1092
1129
    def delete_volume(self, volume_id, path):
1093
1130
        """See .interfaces.IMetaQueue."""
1094
 
        return DeleteVolume(self.queue, volume_id, path).go()
 
1131
        self.execute(DeleteVolume, volume_id, path)
1095
1132
 
1096
1133
    def change_public_access(self, share_id, node_id, is_public):
1097
1134
        """See .interfaces.IMetaQueue."""
1098
 
        return ChangePublicAccess(self.queue, share_id,
1099
 
                                  node_id, is_public).go()
 
1135
        self.execute(ChangePublicAccess, share_id, node_id, is_public)
1100
1136
 
1101
1137
    def get_public_files(self):
1102
1138
        """See .interfaces.IMetaQueue."""
1103
 
        return GetPublicFiles(self.queue).go()
 
1139
        self.execute(GetPublicFiles)
1104
1140
 
1105
 
    def download(self, share_id, node_id, server_hash, path, fileobj_factory):
 
1141
    def download(self, share_id, node_id, server_hash, path):
1106
1142
        """See .interfaces.IContentQueue.download."""
1107
 
        return Download(self.queue, share_id, node_id, server_hash,
1108
 
                        path, fileobj_factory).go()
 
1143
        self.execute(Download, share_id, node_id, server_hash, path)
1109
1144
 
1110
1145
    def upload(self, share_id, node_id, previous_hash, hash, crc32,
1111
 
               size, path, fileobj_factory, upload_id=None):
 
1146
               size, path, upload_id=None):
1112
1147
        """See .interfaces.IContentQueue."""
1113
 
        return Upload(self.queue, share_id, node_id, previous_hash,
1114
 
                      hash, crc32, size, path, fileobj_factory,
1115
 
                      upload_id=upload_id).go()
 
1148
        self.execute(Upload, share_id, node_id, previous_hash, hash, crc32,
 
1149
                     size, path, upload_id=upload_id)
1116
1150
 
1117
1151
    def _cancel_op(self, share_id, node_id, cmdclass):
1118
1152
        """Generalized form of cancel_upload and cancel_download."""
1138
1172
 
1139
1173
    def get_delta(self, volume_id, generation):
1140
1174
        """See .interfaces.IMetaQueue."""
1141
 
        return GetDelta(self.queue, volume_id, generation).go()
 
1175
        self.execute(GetDelta, volume_id, generation)
1142
1176
 
1143
1177
    def rescan_from_scratch(self, volume_id):
1144
1178
        """See .interfaces.IMetaQueue."""
1145
 
        return GetDeltaFromScratch(self.queue, volume_id).go()
 
1179
        self.execute(GetDeltaFromScratch, volume_id)
1146
1180
 
1147
1181
    def handle_SYS_ROOT_RECEIVED(self, root_id, mdid):
1148
1182
        """Demark the root node_id."""
1206
1240
        # we need to issue all the DeferredMap.get's right now, to be
1207
1241
        # dereferenced later
1208
1242
        waiting_structure = []
 
1243
        fsm = self.action_queue.main.fs
1209
1244
        for name in self.possible_markers:
1210
1245
            marker = getattr(self, name)
1211
1246
 
1212
1247
            # if a marker, get the real value; if not, it's already there, so
1213
1248
            # no action needed
1214
1249
            if IMarker.providedBy(marker):
1215
 
                self.log.debug("waiting for the real value of %r", marker)
1216
 
                d = self.action_queue.uuid_map.get(marker)
 
1250
                # we now it's a mdid, we may already have the marker
 
1251
                # in the metadata
 
1252
                try:
 
1253
                    mdobj = fsm.get_by_mdid(str(marker))
 
1254
                except KeyError:
 
1255
                    # node is not longer there, we don't care
 
1256
                    continue
 
1257
 
 
1258
                if mdobj.node_id is None:
 
1259
                    msg = "waiting for the real value of %r"
 
1260
                    d = self.action_queue.uuid_map.get(marker)
 
1261
                else:
 
1262
                    msg = "shortcutting the real value of %r"
 
1263
                    d = defer.succeed(mdobj.node_id)
 
1264
                self.log.debug(msg, marker)
1217
1265
                waiting_structure.append((name, marker, d))
1218
1266
 
1219
1267
        # now, we wait for all the dereferencings... if any
1244
1292
    def finish(self):
1245
1293
        """The command ended."""
1246
1294
        self.running = False
1247
 
        self._queue.unqueue(self)
 
1295
 
 
1296
    def should_be_queued(self):
 
1297
        """Check if the command should be queued."""
 
1298
        # create the log
 
1299
        self.make_logger()
 
1300
        return self._should_be_queued()
1248
1301
 
1249
1302
    def _should_be_queued(self):
1250
1303
        """Return True if the command should be queued."""
1272
1325
    @defer.inlineCallbacks
1273
1326
    def go(self):
1274
1327
        """Execute all the steps for a command."""
1275
 
        # create the log
1276
 
        self.make_logger()
1277
 
 
1278
 
        # queue if should, otherwise all is done
1279
 
        if not self._should_be_queued():
1280
 
            return
1281
 
 
1282
 
        self.log.debug('queueing')
1283
 
        self._queue.queue(self)
1284
 
 
1285
1328
        # set up basic marker failure handler and demark
1286
1329
        def f(failure):
1287
1330
            self.log.debug("failing because marker failed: %s", failure)
2209
2252
class Download(ActionQueueCommand):
2210
2253
    """Get the contents of a file."""
2211
2254
 
2212
 
    __slots__ = ('share_id', 'node_id', 'server_hash', 'fileobj_factory',
 
2255
    __slots__ = ('share_id', 'node_id', 'server_hash',
2213
2256
                 'fileobj', 'gunzip', 'path', 'download_req', 'tx_semaphore',
2214
2257
                 'deflated_size', 'n_bytes_read_last', 'n_bytes_read')
2215
2258
    logged_attrs = ActionQueueCommand.logged_attrs + (
2216
2259
                    'share_id', 'node_id', 'server_hash', 'path')
2217
2260
    possible_markers = 'node_id',
2218
2261
 
2219
 
    def __init__(self, request_queue, share_id, node_id, server_hash, path,
2220
 
                 fileobj_factory):
 
2262
    def __init__(self, request_queue, share_id, node_id, server_hash, path):
2221
2263
        super(Download, self).__init__(request_queue)
2222
2264
        self.share_id = share_id
2223
2265
        self.node_id = node_id
2224
2266
        self.server_hash = server_hash
2225
 
        self.fileobj_factory = fileobj_factory
2226
2267
        self.fileobj = None
2227
2268
        self.gunzip = None
2228
2269
        self.path = path
2286
2327
        """Do the actual running."""
2287
2328
        # start or reset the file object, and get a new decompressor
2288
2329
        if self.fileobj is None:
 
2330
            fsm = self.action_queue.main.fs
2289
2331
            try:
2290
 
                self.fileobj = self.fileobj_factory()
 
2332
                self.fileobj = fsm.get_partial_for_writing(self.node_id,
 
2333
                                                           self.share_id)
2291
2334
            except StandardError:
2292
2335
                self.log.debug(traceback.format_exc())
2293
2336
                msg = DefaultException('unable to build fileobj'
2375
2418
    """Upload stuff to a file."""
2376
2419
 
2377
2420
    __slots__ = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
2378
 
                 'size', 'fileobj_factory', 'magic_hash',
2379
 
                 'deflated_size', 'tempfile', 'upload_req',
2380
 
                 'tx_semaphore', 'n_bytes_written_last', 'path',
 
2421
                 'size', 'magic_hash', 'deflated_size', 'tempfile',
 
2422
                 'tx_semaphore', 'n_bytes_written_last', 'path', 'upload_req',
2381
2423
                 'n_bytes_written', 'upload_id')
2382
2424
 
2383
2425
    logged_attrs = ActionQueueCommand.logged_attrs + (
2388
2430
    possible_markers = 'node_id',
2389
2431
 
2390
2432
    def __init__(self, request_queue, share_id, node_id, previous_hash, hash,
2391
 
                 crc32, size, path, fileobj_factory, upload_id=None):
 
2433
                 crc32, size, path, upload_id=None):
2392
2434
        super(Upload, self).__init__(request_queue)
2393
2435
        self.share_id = share_id
2394
2436
        self.node_id = node_id
2396
2438
        self.hash = hash
2397
2439
        self.crc32 = crc32
2398
2440
        self.size = size
2399
 
        self.fileobj_factory = fileobj_factory
2400
2441
        self.upload_id = upload_id
2401
2442
        self.tempfile = None
2402
2443
        self.path = path
2477
2518
            return
2478
2519
        self.log.debug('semaphore acquired')
2479
2520
 
2480
 
        yield self.action_queue.zip_queue.zip(self)
 
2521
        fsm = self.action_queue.main.fs
 
2522
        mdobj = fsm.get_by_node_id(self.share_id, self.node_id)
 
2523
        fileobj_factory = lambda: fsm.open_file(mdobj.mdid)
 
2524
        yield self.action_queue.zip_queue.zip(self, fileobj_factory)
2481
2525
 
2482
2526
    def finish(self):
2483
2527
        """Release the semaphore if already acquired."""