~ubuntuone-control-tower/ubuntuone-client/trunk

« back to all changes in this revision

Viewing changes to canonical/ubuntuone/storage/syncdaemon/action_queue.py

  • Committer: Rodney Dawes
  • Date: 2009-05-12 13:36:05 UTC
  • Revision ID: rodney.dawes@canonical.com-20090512133605-6aqs6e8xnnmp5u1p
        Import the code
        Hook up lint/trial tests in setup.py
        Use icontool now instead of including the render script
        Add missing python-gnome2-desktop to package dependencies
        Update debian/rules to fix the icon cache issue

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# canonical.ubuntuone.storage.syncdaemon.action_queue - Action queue
 
2
#
 
3
# Author: John Lenton <john.lenton@canonical.com>
 
4
#
 
5
# Copyright 2009 Canonical Ltd.
 
6
#
 
7
# This program is free software: you can redistribute it and/or modify it
 
8
# under the terms of the GNU General Public License version 3, as published
 
9
# by the Free Software Foundation.
 
10
#
 
11
# This program is distributed in the hope that it will be useful, but
 
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
14
# PURPOSE.  See the GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License along
 
17
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
"""
 
19
The ActionQueue is where actions to be performed on the server are
 
20
queued up and then executed. The idea is that there are two queues,
 
21
one for metadata and another for content; the metadata queue has
 
22
priority over the content queue.
 
23
"""
 
24
from collections import deque, defaultdict
 
25
from functools import wraps, partial
 
26
import logging
 
27
import os
 
28
import random
 
29
import tempfile
 
30
import zlib
 
31
 
 
32
from zope.interface import implements
 
33
from twisted.internet import reactor, defer, ssl
 
34
from twisted.names import client as dns_client
 
35
from twisted.python.failure import Failure
 
36
 
 
37
import uuid
 
38
from canonical.ubuntuone.storage.protocol import protocol_pb2
 
39
from canonical.ubuntuone.storage.protocol.client import StorageClient, \
 
40
    StorageClientFactory
 
41
from canonical.ubuntuone.storage.syncdaemon.logger import mklog, TRACE
 
42
from canonical.ubuntuone.storage.syncdaemon.interfaces import IActionQueue, \
 
43
    IMarker
 
44
 
 
45
logger = logging.getLogger("ubuntuone.SyncDaemon.ActionQueue")
 
46
 
 
47
# I want something which repr() is "---" *without* the quotes :)
 
48
UNKNOWN = type('', (), {'__repr__': lambda _: '---'})()
 
49
 
 
50
def passit(func):
 
51
    """
 
52
    Pass the value on for the next deferred, while calling func with
 
53
    it.
 
54
    """
 
55
    @wraps(func)
 
56
    def wrapper(a):
 
57
        """
 
58
        Do it.
 
59
        """
 
60
        func(a)
 
61
        return a
 
62
    return wrapper
 
63
 
 
64
 
 
65
class UploadCompressionCancelled(Exception):
 
66
    """Compression of a file for upload cancelled."""
 
67
 
 
68
 
 
69
class RequestCleanedUp(Exception):
 
70
    """
 
71
    The request was cancelled by ActionQueue.cleanup()
 
72
    """
 
73
 
 
74
 
 
75
class NamedTemporaryFile(object):
 
76
    """
 
77
    Like tempfile.NamedTemporaryFile, but working in 2.5 also WRT the
 
78
    delete argument. Actually, one of these NamedTemporaryFile()s is
 
79
    the same as a tempfile.NamedTemporaryFile(delete=False) from 2.6.
 
80
 
 
81
    Or so the theory goes.
 
82
    """
 
83
    def __init__(self):
 
84
        fileno, self.name = tempfile.mkstemp()
 
85
        self._fd = os.fdopen(fileno, 'r+w')
 
86
 
 
87
    def __getattr__(self, attr):
 
88
        """proxy everything else (other than .name) on to self._fd"""
 
89
        return getattr(self._fd, attr)
 
90
 
 
91
 
 
92
class MultiProxy(list):
 
93
    """
 
94
    Proxy many objects of the same kind, like this:
 
95
 
 
96
    >>> m = MultiProxy(['foo', 'bar', 'baz'])
 
97
    >>> m.upper()
 
98
    MultiProxy(['FOO', 'BAR', 'BAZ'])
 
99
    """
 
100
 
 
101
    def __getattr__(self, attr):
 
102
        return MultiProxy(getattr(i, attr) for i in self)
 
103
 
 
104
    def __call__(self, *args, **kwargs):
 
105
        return MultiProxy(i(*args, **kwargs) for i in self)
 
106
 
 
107
    def __repr__(self):
 
108
        return 'MultiProxy(%s)' % (super(MultiProxy, self).__repr__(),)
 
109
 
 
110
 
 
111
class LoggingStorageClient(StorageClient):
 
112
    """ A subclass of StorageClient that adds logging to
 
113
    processMessage and sendMessage.
 
114
    """
 
115
 
 
116
    def __init__(self):
 
117
        """ setup logging and create the instance. """
 
118
        StorageClient.__init__(self)
 
119
        self.log = logging.getLogger('ubuntuone.SyncDaemon.StorageClient')
 
120
        # configure the handler level to be < than DEBUG
 
121
        self.log.setLevel(TRACE)
 
122
        self.log.debug = partial(self.log.log, TRACE)
 
123
 
 
124
    def processMessage(self, message):
 
125
        """ wrapper that logs the message and result. """
 
126
        # don't log the full message if it's of type BYTES
 
127
        if message.type == protocol_pb2.Message.BYTES:
 
128
            self.log.debug('start - processMessage: id: %s, type: %s',
 
129
                           message.id, message.type)
 
130
        else:
 
131
            self.log.debug('start - processMessage: %s',
 
132
                          str(message).replace("\n", " "))
 
133
        if message.id in self.requests:
 
134
            req = self.requests[message.id]
 
135
            req.deferred.addCallbacks(self.log_success, self.log_error)
 
136
        result = StorageClient.processMessage(self, message)
 
137
        self.log.debug('end - processMessage: id: %s - result: %s',
 
138
                       message.id, result)
 
139
        return result
 
140
 
 
141
    def log_error(self, failure):
 
142
        """ logging errback for requests """
 
143
        self.log.debug('request error: %s', failure)
 
144
        return failure
 
145
 
 
146
    def log_success(self, result):
 
147
        """ logging callback for requests """
 
148
        self.log.debug('request finished: %s', result)
 
149
        if getattr(result, '__dict__', None):
 
150
            self.log.debug('result.__dict__: %s', result.__dict__)
 
151
        return result
 
152
 
 
153
    def sendMessage(self, message):
 
154
        """ wrapper that logs the message and result. """
 
155
        # don't log the full message if it's of type BYTES
 
156
        if message.type == protocol_pb2.Message.BYTES:
 
157
            self.log.debug('start - sendMessage: id: %s, type: %s',
 
158
                           message.id, message.type)
 
159
        else:
 
160
            self.log.debug('start - sendMessage: %s',
 
161
                          str(message).replace("\n", " "))
 
162
        result = StorageClient.sendMessage(self, message)
 
163
        self.log.debug('end - sendMessage: id: %s', message.id)
 
164
        return result
 
165
 
 
166
 
 
167
class ActionQueueProtocol(LoggingStorageClient):
 
168
    """
 
169
    This is the Action Queue version of the StorageClient protocol.
 
170
    """
 
171
    connection_state = 'disconnected'
 
172
    factory = None
 
173
 
 
174
    def connectionMade(self):
 
175
        """
 
176
        Called when a new connection is made.
 
177
        All the state is saved in the factory.
 
178
        """
 
179
        logger.debug("connection made")
 
180
        self.connection_state = 'connected'
 
181
        self.factory.client = self
 
182
        self.set_node_state_callback(self.factory._node_state_callback)
 
183
        self.set_share_change_callback(self.factory._share_change_callback)
 
184
        self.set_share_answer_callback(self.factory._share_answer_callback)
 
185
        self.factory.event_queue.push('SYS_CONNECTION_MADE')
 
186
 
 
187
    def disconnect(self):
 
188
        """
 
189
        Close down the sockets
 
190
        """
 
191
        logger.debug("disconnected")
 
192
        if self.transport is not None:
 
193
            self.transport.loseConnection()
 
194
 
 
195
 
 
196
class Marker(str):
 
197
    """
 
198
    A uuid4-based marker class
 
199
    """
 
200
    implements(IMarker)
 
201
    def __new__(self):
 
202
        return super(Marker, self).__new__(self, uuid.uuid4())
 
203
 
 
204
 
 
205
class ZipQueue(object):
 
206
    """
 
207
    A queue of files to be compressed for upload
 
208
 
 
209
    Parts of this were shamelessly copied from
 
210
    twisted.internet.defer.DeferredSemaphore
 
211
 
 
212
    see bug #373984
 
213
    """
 
214
    def __init__(self):
 
215
        self.waiting = deque()
 
216
        self.tokens = self.limit = 10
 
217
 
 
218
    def acquire(self):
 
219
        """
 
220
        return a deferred which fires on token acquisition.
 
221
        """
 
222
        assert self.tokens >= 0, "Tokens should never be negative"
 
223
        d = defer.Deferred()
 
224
        if not self.tokens:
 
225
            self.waiting.append(d)
 
226
        else:
 
227
            self.tokens = self.tokens - 1
 
228
            d.callback(self)
 
229
        return d
 
230
 
 
231
    def release(self):
 
232
        """
 
233
        Release the token.
 
234
 
 
235
        Should be called by whoever did the acquire() when the shared
 
236
        resource is free.
 
237
        """
 
238
        assert self.tokens < self.limit, "Too many tokens!"
 
239
        self.tokens = self.tokens + 1
 
240
        if self.waiting:
 
241
            # someone is waiting to acquire token
 
242
            self.tokens = self.tokens - 1
 
243
            d = self.waiting.popleft()
 
244
            d.callback(self)
 
245
 
 
246
    def _compress(self, deferred, upload):
 
247
        """Compression background task."""
 
248
        try:
 
249
            fileobj = upload.fileobj_factory()
 
250
        except StandardError:
 
251
            # presumably the user deleted the file before we got to
 
252
            # upload it. Logging a warning just in case.
 
253
            upload.log.warn('unable to build fileobj'
 
254
                            ' (user deleted the file, maybe?)'
 
255
                            ' so cancelling the upload.')
 
256
            upload.cancel()
 
257
            fileobj = None
 
258
 
 
259
        filename = getattr(fileobj, 'name', '<?>')
 
260
 
 
261
        upload.log.debug('compressing', filename)
 
262
        try:
 
263
            # we need to compress the file completely to figure out its
 
264
            # compressed size. So streaming is out :(
 
265
            if upload.tempfile_factory is None:
 
266
                f = NamedTemporaryFile()
 
267
            else:
 
268
                f = upload.tempfile_factory()
 
269
            zipper = zlib.compressobj()
 
270
            while not upload.cancelled:
 
271
                data = fileobj.read(4096)
 
272
                if not data:
 
273
                    f.write(zipper.flush())
 
274
                    # no flush/sync because we don't need this to persist
 
275
                    # on disk; if the machine goes down, we'll lose it
 
276
                    # anyway (being in /tmp and all)
 
277
                    break
 
278
                f.write(zipper.compress(data))
 
279
            if upload.cancelled:
 
280
                raise UploadCompressionCancelled("Cancelled")
 
281
            upload.deflated_size = f.tell()
 
282
            # close the compressed file (thus, if you actually want to stream
 
283
            # it out, it must have a name so it can be reopnened)
 
284
            f.close()
 
285
            upload.tempfile = f
 
286
        except Exception, e: # pylint: disable-msg=W0703
 
287
            reactor.callFromThread(deferred.errback, e)
 
288
        else:
 
289
            reactor.callFromThread(deferred.callback, True)
 
290
 
 
291
    def zip(self, upload):
 
292
        """
 
293
        Acquire, do the compression in a thread, release.
 
294
        """
 
295
        d_zip = defer.Deferred()
 
296
        d_lck = self.acquire()
 
297
        d_lck.addCallback(
 
298
            lambda _: reactor.callFromThread(self._compress,
 
299
                                             d_zip, upload) or d_zip)
 
300
        d_lck.addCallback(lambda _: self.release())
 
301
 
 
302
        return d_lck
 
303
 
 
304
 
 
305
class RequestQueue(object):
 
306
    """
 
307
    RequestQueue is a queue that ensures that there is at most one
 
308
    request at a time 'on the wire', and that uses the action queue's
 
309
    states for its syncrhonization.
 
310
    """
 
311
    def __init__(self, name, action_queue):
 
312
        super(RequestQueue, self).__init__()
 
313
        self.name = name
 
314
        self.action_queue = action_queue
 
315
        self.waiting = deque()
 
316
        self.head = None
 
317
        self.paused = True
 
318
 
 
319
    def __len__(self):
 
320
        """return the length of the queue"""
 
321
        return len(self.waiting)
 
322
 
 
323
    def queue(self, func, *args, **kwargs):
 
324
        """
 
325
        Add a call to func to the queue.
 
326
        """
 
327
        d = defer.Deferred()
 
328
        d.addCallback(lambda _: defer.maybeDeferred(func, *args, **kwargs))
 
329
        self.waiting.append(d)
 
330
        if len(self.waiting) == 1 and not self.head:
 
331
            self.action_queue.event_queue.push('SYS_' + self.name
 
332
                                               + '_WAITING')
 
333
        return d
 
334
 
 
335
    def run(self):
 
336
        """
 
337
        Empty the queue.
 
338
        """
 
339
        if self.waiting:
 
340
            d = self.waiting.popleft()
 
341
            # self.head = d
 
342
            d.addBoth(lambda a: (self.run(), a)[1])
 
343
            d.callback(None)
 
344
        else:
 
345
            # self.head = None
 
346
            self.action_queue.event_queue.push('SYS_' + self.name + '_DONE')
 
347
 
 
348
class ContentQueue(RequestQueue):
 
349
    """
 
350
    A content queue is a queue of content requests (uploads and downloads).
 
351
    """
 
352
 
 
353
 
 
354
class MetaQueue(RequestQueue):
 
355
    """
 
356
    A meta queue is a queue of metadata-related requests.
 
357
    """
 
358
 
 
359
 
 
360
class DeferredMap(object):
 
361
    """
 
362
    A mapping of deferred values. Or a deferred map of values. Or a
 
363
    mapping that returns deferreds and then fires them when it has the
 
364
    value.
 
365
    """
 
366
    def __init__(self):
 
367
        self.waiting = defaultdict(list)
 
368
        self.failed = {}
 
369
        self.map = {}
 
370
 
 
371
    def get(self, key):
 
372
        """
 
373
        Get the value for the given key.
 
374
 
 
375
        This always returns a deferred; when we already know the value
 
376
        we return a `succeed`, and if we don't know the value because
 
377
        it failed we return a `fail`; otherwise we return a plain
 
378
        unfired `Deferred`, and add it to the list of deferreds to
 
379
        call when we actually get the value.
 
380
        """
 
381
        if key in self.map:
 
382
            return defer.succeed(self.map[key])
 
383
        if key in self.failed:
 
384
            return defer.fail(Exception(self.failed[key]))
 
385
        d = defer.Deferred()
 
386
        self.waiting[key].append(d)
 
387
        return d
 
388
 
 
389
    def set(self, key, value):
 
390
        """
 
391
        We've got the value for a key! Write it down in the map, and
 
392
        fire the waiting deferreds.
 
393
        """
 
394
        if key not in self.map:
 
395
            self.map[key] =  value
 
396
            for d in self.waiting.pop(key, ()):
 
397
                d.callback(value)
 
398
        elif self.map[key] != value:
 
399
            if key in self.map:
 
400
                raise KeyError("key is taken -- dunno what to do")
 
401
 
 
402
    def err(self, key, failure):
 
403
        """
 
404
        Something went terribly wrong in the process of getting a
 
405
        value. Break the news to the waiting deferreds.
 
406
        """
 
407
        self.failed[key] = failure.getErrorMessage()
 
408
        for d in self.waiting.pop(key, ()):
 
409
            d.errback(failure)
 
410
 
 
411
 
 
412
class UploadProgressWrapper(object):
 
413
    """
 
414
    A wrapper around the file-like object used for Uploads, with which
 
415
    we can keep track of the number of bytes that have been written to
 
416
    the store.
 
417
    """
 
418
    def __init__(self, fd, data_dict):
 
419
        """
 
420
        fd is the file-like object used for uploads. data_dict is the
 
421
        entry in the uploading dictionary.
 
422
        """
 
423
        self.fd = fd
 
424
        self.data_dict = data_dict
 
425
        self.n_bytes_read = 0
 
426
 
 
427
    def read(self, size=None):
 
428
        """
 
429
        read at most size bytes from the file-like object.
 
430
 
 
431
        Keep track of the number of bytes that have been read, and the
 
432
        number of bytes that have been written (assumed to be equal to
 
433
        the number of bytes read on the previews call to read). The
 
434
        latter is done directly in the data_dict.
 
435
        """
 
436
        self.data_dict['n_bytes_written'] = self.n_bytes_read
 
437
        data = self.fd.read(size)
 
438
        self.n_bytes_read += len(data)
 
439
        return data
 
440
 
 
441
    def __getattr__(self, attr):
 
442
        """
 
443
        Proxy all the rest.
 
444
        """
 
445
        return getattr(self.fd, attr)
 
446
 
 
447
 
 
448
class ActionQueue(StorageClientFactory, object):
 
449
    """
 
450
    This is the ActionQueue itself.
 
451
    """
 
452
    implements(IActionQueue)
 
453
    protocol = ActionQueueProtocol
 
454
 
 
455
    def __init__(self, event_queue, host, port, dns_srv,
 
456
                 use_ssl=False):
 
457
        self.event_queue = event_queue
 
458
        self.host = host
 
459
        self.port = port
 
460
        self.dns_srv = dns_srv
 
461
        self.use_ssl = use_ssl
 
462
 
 
463
        self.token = None
 
464
        self.client = None
 
465
        self.deferred = None
 
466
 
 
467
        self.content_queue = ContentQueue('CONTENT_QUEUE', self)
 
468
        self.meta_queue = MetaQueue('META_QUEUE', self)
 
469
        self.uuid_map = DeferredMap()
 
470
        self.zip_queue = ZipQueue()
 
471
 
 
472
        self.uploading = {}
 
473
        self.downloading = {}
 
474
 
 
475
        event_queue.subscribe(self)
 
476
 
 
477
    def handle_SYS_CONNECT(self, access_token):
 
478
        """
 
479
        Stow the access token away for later use
 
480
        """
 
481
        self.token = access_token
 
482
 
 
483
    def cleanup(self):
 
484
        """
 
485
        Cancel, clean up, and reschedule things that were in progress
 
486
        when a disconnection happened
 
487
        """
 
488
        self.event_queue.push('SYS_CLEANUP_STARTED')
 
489
        if self.client is not None:
 
490
            self.client.disconnect()
 
491
        for queue in self.meta_queue, self.content_queue:
 
492
            if queue.head is not None:
 
493
                queue.head.errback(RequestCleanedUp('Cleaned up'))
 
494
        self.event_queue.push('SYS_CLEANUP_FINISHED')
 
495
 
 
496
 
 
497
    def _node_state_callback(self, share_id, node_id, hash):
 
498
        """
 
499
        Called by the client when notified that node changed.
 
500
        """
 
501
        self.event_queue.push('SV_HASH_NEW',
 
502
                              share_id=share_id, node_id=node_id, hash=hash)
 
503
 
 
504
    def _share_change_callback(self, message, info):
 
505
        """
 
506
        Called by the client when notified that a share changed.
 
507
        """
 
508
        self.event_queue.push('SV_SHARE_CHANGED',
 
509
                              message=message, info=info)
 
510
 
 
511
    def _share_answer_callback(self, share_id, answer):
 
512
        """
 
513
        Called by the client when it gets a share answer notification.
 
514
        """
 
515
        self.event_queue.push('SV_SHARE_ANSWERED',
 
516
                              share_id=str(share_id), answer=answer)
 
517
 
 
518
    def _lookup_srv(self):
 
519
        """ do the SRV lookup and return a deferred whose callback is going
 
520
        to be called with (host, port). If we can't do the lookup, the default
 
521
        host, port is used.
 
522
        """
 
523
        def on_lookup_ok(results):
 
524
            """Get a random host from the SRV result."""
 
525
            logger.debug('SRV lookup done, choosing a server')
 
526
            records, auth, add = results
 
527
            if not records:
 
528
                raise ValueError("No available records.")
 
529
            # pick a random server
 
530
            record = random.choice(records)
 
531
            logger.debug('Using record: %r', record)
 
532
            if record.payload:
 
533
                return record.payload.target.name, record.payload.port
 
534
            else:
 
535
                logger.info('Empty SRV record, fallback to %r:%r',
 
536
                            self.host, self.port)
 
537
                return self.host, self.port
 
538
 
 
539
        def on_lookup_error(failure):
 
540
            """ return the default host/post on a DNS SRV lookup failure. """
 
541
            logger.info("SRV lookup error, fallback to %r:%r \n%s",
 
542
                        self.host, self.port, failure.getTraceback())
 
543
            return self.host, self.port
 
544
 
 
545
        if self.dns_srv:
 
546
            # lookup the DNS SRV records
 
547
            d = dns_client.lookupService(self.dns_srv, timeout=[3, 2])
 
548
            d.addCallback(on_lookup_ok)
 
549
            d.addErrback(on_lookup_error)
 
550
            return d
 
551
        else:
 
552
            return defer.succeed((self.host, self.port))
 
553
 
 
554
    def connect(self):
 
555
        """
 
556
        Start the circus going.
 
557
        """
 
558
        self.deferred = defer.Deferred()
 
559
        d = self._lookup_srv()
 
560
        def _connect(result):
 
561
            """ do the real thing """
 
562
            host, port = result
 
563
            if self.use_ssl:
 
564
                reactor.connectSSL(host, port, self,
 
565
                                   ssl.ClientContextFactory())
 
566
            else:
 
567
                reactor.connectTCP(host, port, self)
 
568
        d.addCallback(_connect)
 
569
        return self.deferred
 
570
 
 
571
    def conectionFailed(self, reason=None):
 
572
        """
 
573
        Called when the connect() call fails
 
574
        """
 
575
        self.deferred.errback(reason)
 
576
 
 
577
    def get_root(self, marker):
 
578
        """
 
579
        Get the user's root uuid. Use the uuid_map, so the caller can
 
580
        use the marker in followup operations.
 
581
        """
 
582
        log = mklog(logger, 'get_root', '', marker, marker=marker)
 
583
        log.debug('starting')
 
584
        d = self.client.get_root()
 
585
        d.addCallbacks(*log.callbacks())
 
586
        d.addCallbacks(passit(lambda root: self.uuid_map.set(marker, root)),
 
587
                       passit(lambda f: self.uuid_map.err(marker, f)))
 
588
 
 
589
        return d
 
590
 
 
591
    def make_file(self, share_id, parent_id, name, marker):
 
592
        """
 
593
        See .interfaces.IMetaQueue
 
594
        """
 
595
        return MakeFile(self, share_id, parent_id, name, marker).start()
 
596
 
 
597
    def make_dir(self, share_id, parent_id, name, marker):
 
598
        """
 
599
        See .interfaces.IMetaQueue
 
600
        """
 
601
        return MakeDir(self, share_id, parent_id, name, marker).start()
 
602
 
 
603
    def move(self, share_id, node_id, old_parent_id, new_parent_id, new_name):
 
604
        """
 
605
        See .interfaces.IMetaQueue
 
606
        """
 
607
        return Move(self, share_id, node_id, old_parent_id,
 
608
                    new_parent_id, new_name).start()
 
609
 
 
610
    def unlink(self, share_id, parent_id, node_id):
 
611
        """
 
612
        See .interfaces.IMetaQueue
 
613
        """
 
614
        return Unlink(self, share_id, parent_id, node_id).start()
 
615
 
 
616
    def query(self, items):
 
617
        """
 
618
        See .interfaces.IMetaQueue
 
619
        """
 
620
        return Query(self, items).start()
 
621
 
 
622
    def list_shares(self):
 
623
        """
 
624
        List the shares; put the result on the event queue
 
625
        """
 
626
        return ListShares(self).start()
 
627
 
 
628
    def answer_share(self, share_id, answer):
 
629
        """
 
630
        Answer the offer of a share.
 
631
        """
 
632
        return AnswerShare(self, share_id, answer).start()
 
633
 
 
634
    def create_share(self, node_id, share_to, name, access_level, marker):
 
635
        """
 
636
        Share a node with somebody.
 
637
        """
 
638
        return CreateShare(self, node_id, share_to, name, access_level,
 
639
                           marker).start()
 
640
 
 
641
    def listdir(self, share_id, node_id, server_hash, fileobj_factory):
 
642
        """
 
643
        See .interfaces.IMetaQueue.listdir
 
644
        """
 
645
        return ListDir(self, share_id, node_id, server_hash,
 
646
                       fileobj_factory).start()
 
647
 
 
648
    def download(self, share_id, node_id, server_hash, fileobj_factory):
 
649
        """
 
650
        See .interfaces.IContentQueue.download
 
651
        """
 
652
        return Download(self, share_id, node_id, server_hash,
 
653
                        fileobj_factory).start()
 
654
 
 
655
    def upload(self, share_id, node_id, previous_hash, hash, crc32,
 
656
               size, fileobj_factory, tempfile_factory=None):
 
657
        """
 
658
        See .interfaces.IContentQueue
 
659
        """
 
660
        return Upload(self, share_id, node_id, previous_hash, hash,
 
661
                      crc32, size, fileobj_factory, tempfile_factory).start()
 
662
 
 
663
    def cancel_upload(self, share_id, node_id):
 
664
        """
 
665
        See .interfaces.IContentQueue
 
666
        """
 
667
        log = mklog(logger, 'cancel_upload', share_id, node_id,
 
668
                    share=share_id, node=node_id)
 
669
        log.debug('starting')
 
670
        if (share_id, node_id) in self.uploading:
 
671
            req = self.uploading[share_id, node_id].get('req')
 
672
            if req is not None:
 
673
                req.cancel()
 
674
                log.debug("cancelled")
 
675
        log.debug('finished')
 
676
 
 
677
    def cancel_download(self, share_id, node_id):
 
678
        """
 
679
        See .interfaces.IContentQueue
 
680
        """
 
681
        log = mklog(logger, 'cancel_download', share_id, node_id,
 
682
                    share=share_id, node=node_id)
 
683
        log.debug('starting')
 
684
        if (share_id, node_id) in self.downloading:
 
685
            req = self.downloading[share_id, node_id].get('req')
 
686
            if req is not None:
 
687
                req.cancel()
 
688
                log.debug("cancelled")
 
689
        log.debug('finished')
 
690
 
 
691
 
 
692
SKIP_THIS_ITEM = object()
 
693
 
 
694
# pylint: disable-msg=W0231
 
695
 
 
696
class ActionQueueCommand(object):
 
697
    """
 
698
    Base of all the action queue commands
 
699
    """
 
700
 
 
701
    # protobuf doesn't seem to have much introspectionable stuff
 
702
    # without going into private attributes
 
703
    known_error_messages = (set(protocol_pb2._ERROR_ERRORTYPE.values_by_name)
 
704
                            | set(['CANCELLED']))
 
705
    suppressed_error_messages = (known_error_messages
 
706
                                 - set(['INTERNAL_ERROR'])
 
707
                                 | set(['Cleaned up']))
 
708
    retryable_errors = set(['Cleaned up', 'TRY_AGAIN'])
 
709
 
 
710
    def demark(self, *maybe_markers):
 
711
        """
 
712
        Arrange to have maybe_markers realized
 
713
        """
 
714
        l = []
 
715
        for marker in maybe_markers:
 
716
            if IMarker.providedBy(marker):
 
717
                self.log.debug('waiting until we know the real value of %s'
 
718
                               % marker)
 
719
                d = self.action_queue.uuid_map.get(marker)
 
720
                d.addCallbacks(passit(lambda _:
 
721
                                          self.log.debug('got %s' % marker)),
 
722
                               passit(lambda f:
 
723
                                          self.log.error('failed %s' % marker)))
 
724
            else:
 
725
                d = defer.succeed(marker)
 
726
            l.append(d)
 
727
        dl = defer.DeferredList(l, fireOnOneErrback=True, consumeErrors=True)
 
728
        dl.addCallbacks(self.unwrap,
 
729
                        lambda f: f.value.subFailure)
 
730
        return dl
 
731
 
 
732
    @staticmethod
 
733
    def unwrap(results):
 
734
        """
 
735
        Unpack the values from the result of a DeferredList. If
 
736
        there's a failure, return it instead.
 
737
        """
 
738
        values = []
 
739
        for result in results:
 
740
            # result can be none if one of the callbacks failed
 
741
            # before the others were ready
 
742
            if result is not None:
 
743
                is_ok, value = result
 
744
                if not is_ok:
 
745
                    # a failure!
 
746
                    return value
 
747
                if value is not SKIP_THIS_ITEM:
 
748
                    values.append(value)
 
749
        return values
 
750
 
 
751
 
 
752
    def end_callback(self, arg):
 
753
        """
 
754
        It worked!
 
755
        """
 
756
        self._queue.head = None
 
757
        self.log.debug('success')
 
758
        return self.handle_success(arg)
 
759
 
 
760
    def end_errback(self, failure):
 
761
        """
 
762
        It failed!
 
763
        """
 
764
        self._queue.head = None
 
765
        error_message = failure.getErrorMessage()
 
766
        if error_message not in self.suppressed_error_messages:
 
767
            self.log.error('failure', error_message)
 
768
            self.log.debug('traceback follows:\n\n' + failure.getTraceback())
 
769
        else:
 
770
            self.log.warn('failure', error_message)
 
771
        if error_message in self.retryable_errors:
 
772
            reactor.callLater(0.1, self.retry)
 
773
        else:
 
774
            return self.handle_failure(failure)
 
775
 
 
776
    def start(self, _=None):
 
777
        """
 
778
        Get ready to run, and then run.
 
779
 
 
780
        The default implementation is for when there is no preparation necessary
 
781
        """
 
782
        d = self._start()
 
783
        d.addCallback(self.store_marker_result)
 
784
        d.addCallback(lambda _: self.log.debug('queueing in the %s'
 
785
                                               % self._queue.name))
 
786
        d.addCallbacks(lambda _: self._queue.queue(self.run),
 
787
                       self.handle_failure)
 
788
        return d
 
789
 
 
790
    def _start(self):
 
791
        """
 
792
        Do the specialized pre-run setup
 
793
        """
 
794
        return defer.succeed(None)
 
795
 
 
796
    def store_marker_result(self, _):
 
797
        """
 
798
        Called when all the markers are realized.
 
799
        """
 
800
 
 
801
    def run(self):
 
802
        """
 
803
        Do the deed.
 
804
        """
 
805
        self.log.debug('starting')
 
806
        self._queue.head = d = self._run()
 
807
        d.addCallbacks(self.end_callback, self.end_errback)
 
808
        return d
 
809
 
 
810
    def handle_success(self, success):
 
811
        """
 
812
        Do anthing that's needed to handle success of the operation.
 
813
        """
 
814
        return success
 
815
 
 
816
    def handle_failure(self, failure):
 
817
        """
 
818
        Do anthing that's needed to handle failure of the operation.
 
819
        Note that cancellation and TRY_AGAIN are already handled.
 
820
        """
 
821
        return failure
 
822
 
 
823
    def retry(self):
 
824
        """
 
825
        Request cancelled or TRY_AGAIN. Well then, try again!
 
826
        """
 
827
        return self._queue.queue(self.run)
 
828
 
 
829
 
 
830
 
 
831
class ActionQueueMetaCommand(ActionQueueCommand):
 
832
    """
 
833
    Base of metadata-related commands (commands that are queued in the
 
834
    meta queue)
 
835
    """
 
836
    @property
 
837
    def _queue(self):
 
838
        """
 
839
        Get at the meta queue
 
840
        """
 
841
        return self.action_queue.meta_queue
 
842
 
 
843
 
 
844
class ActionQueueContentCommand(ActionQueueCommand):
 
845
    """
 
846
    Base of content-related commands (commands that are queued in the
 
847
    content queue)
 
848
    """
 
849
    @property
 
850
    def _queue(self):
 
851
        """
 
852
        Get at the content queue
 
853
        """
 
854
        return self.action_queue.content_queue
 
855
 
 
856
 
 
857
class MakeThing(ActionQueueMetaCommand):
 
858
    """
 
859
    Base of MakeFile and MakeDir
 
860
    """
 
861
    def __init__(self, action_queue, share_id, parent_id, name, marker):
 
862
        self.action_queue = action_queue
 
863
        self.share_id = share_id
 
864
        self.parent_id = parent_id
 
865
        # Unicode boundary! the name is Unicode in protocol and server, but
 
866
        # here we use bytes for paths
 
867
        self.name = name.decode("utf8")
 
868
        self.marker = marker
 
869
        self.log = mklog(logger, self.__class__.__name__, share_id, marker,
 
870
                         share=share_id, parent=parent_id, name=name,
 
871
                         marker=marker)
 
872
 
 
873
    def _start(self):
 
874
        """
 
875
        Do the specialized pre-run setup
 
876
        """
 
877
        return self.demark(self.share_id, self.parent_id)
 
878
 
 
879
    def store_marker_result(self, (share_id, parent_id)):
 
880
        """
 
881
        Called when all the markers are realized.
 
882
        """
 
883
        self.share_id = share_id
 
884
        self.parent_id = parent_id
 
885
 
 
886
    def _run(self):
 
887
        """
 
888
        Do the actual running
 
889
        """
 
890
        maker = getattr(self.action_queue.client, self.client_method)
 
891
        return maker(self.share_id,
 
892
                     self.parent_id,
 
893
                     self.name)
 
894
 
 
895
    def handle_success(self, success):
 
896
        """
 
897
        It worked! Push the event, and update the uuid map.
 
898
        """
 
899
        # note that we're not getting the new name from the answer
 
900
        # message, if we would get it, we would have another Unicode
 
901
        # boundary with it
 
902
        self.action_queue.event_queue.push(self.ok_event_name,
 
903
                                           marker=self.marker,
 
904
                                           new_id=success.new_id)
 
905
        if IMarker.providedBy(self.marker):
 
906
            self.action_queue.uuid_map.set(self.marker, success.new_id)
 
907
        return success
 
908
 
 
909
    def handle_failure(self, failure):
 
910
        """
 
911
        It didn't work! Push the event, and update the uuid map.
 
912
        """
 
913
        self.action_queue.event_queue.push(self.error_event_name,
 
914
                                           marker=self.marker,
 
915
                                           error=failure.getErrorMessage())
 
916
        if IMarker.providedBy(self.marker):
 
917
            self.action_queue.uuid_map.err(self.marker,
 
918
                                           failure)
 
919
 
 
920
 
 
921
class MakeFile(MakeThing):
 
922
    """
 
923
    Make a file
 
924
    """
 
925
    ok_event_name = 'AQ_FILE_NEW_OK'
 
926
    error_event_name = 'AQ_FILE_NEW_ERROR'
 
927
    client_method = 'make_file'
 
928
 
 
929
 
 
930
class MakeDir(MakeThing):
 
931
    """
 
932
    Make a directory
 
933
    """
 
934
    ok_event_name = 'AQ_DIR_NEW_OK'
 
935
    error_event_name = 'AQ_DIR_NEW_ERROR'
 
936
    client_method = 'make_dir'
 
937
 
 
938
 
 
939
class Move(ActionQueueMetaCommand):
 
940
    """
 
941
    Move a file or directory
 
942
    """
 
943
    def __init__(self, action_queue, share_id, node_id, old_parent_id,
 
944
                 new_parent_id, new_name):
 
945
        self.action_queue = action_queue
 
946
        self.share_id = share_id
 
947
        self.node_id = node_id
 
948
        self.old_parent_id = old_parent_id
 
949
        self.new_parent_id = new_parent_id
 
950
        # Unicode boundary! the name is Unicode in protocol and server, but
 
951
        # here we use bytes for paths
 
952
        self.new_name = new_name.decode("utf8")
 
953
        self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
 
954
                         share=share_id, node=node_id,
 
955
                         old_parent=old_parent_id, new_parent=new_parent_id,
 
956
                         new_name=new_name)
 
957
 
 
958
    def _start(self):
 
959
        """
 
960
        Do the specialized pre-run setup
 
961
        """
 
962
        return self.demark(self.share_id, self.node_id, self.new_parent_id)
 
963
 
 
964
    def store_marker_result(self, (share_id, node_id, new_parent_id)):
 
965
        """
 
966
        Called when all the markers are realized.
 
967
        """
 
968
        self.share_id = share_id
 
969
        self.node_id = node_id
 
970
        self.new_parent_id = new_parent_id
 
971
 
 
972
    def _run(self):
 
973
        """
 
974
        Do the actual running
 
975
        """
 
976
        return self.action_queue.client.move(self.share_id,
 
977
                                             self.node_id,
 
978
                                             self.new_parent_id,
 
979
                                             self.new_name)
 
980
    def handle_success(self, success):
 
981
        """
 
982
        It worked! Push the event.
 
983
        """
 
984
        self.action_queue.event_queue.push('AQ_MOVE_OK',
 
985
                                           share_id=self.share_id,
 
986
                                           node_id=self.node_id)
 
987
        return success
 
988
 
 
989
    def handle_failure(self, failure):
 
990
        """
 
991
        It didn't work! Push the event.
 
992
        """
 
993
        self.action_queue.event_queue.push('AQ_MOVE_ERROR',
 
994
                                           error=failure.getErrorMessage(),
 
995
                                           share_id=self.share_id,
 
996
                                           node_id=self.node_id,
 
997
                                           old_parent_id=self.old_parent_id,
 
998
                                           new_parent_id=self.new_parent_id,
 
999
                                           new_name=self.new_name)
 
1000
 
 
1001
 
 
1002
class Unlink(ActionQueueMetaCommand):
 
1003
    """
 
1004
    Unlink a file or dir
 
1005
    """
 
1006
    def __init__(self, action_queue, share_id, parent_id, node_id):
 
1007
        self.action_queue = action_queue
 
1008
        self.share_id = share_id
 
1009
        self.node_id = node_id
 
1010
        self.parent_id = parent_id
 
1011
        self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
 
1012
                         share=share_id, node=node_id, parent=parent_id)
 
1013
 
 
1014
    def _start(self):
 
1015
        """
 
1016
        Do the specialized pre-run setup
 
1017
        """
 
1018
        return self.demark(self.share_id, self.node_id, self.parent_id)
 
1019
 
 
1020
    def store_marker_result(self, (share_id, node_id, parent_id)):
 
1021
        """
 
1022
        Called when all the markers are realized.
 
1023
        """
 
1024
        self.share_id = share_id
 
1025
        self.node_id = node_id
 
1026
        self.parent_id = parent_id
 
1027
 
 
1028
    def _run(self):
 
1029
        """
 
1030
        Do the actual running
 
1031
        """
 
1032
        return self.action_queue.client.unlink(self.share_id, self.node_id)
 
1033
 
 
1034
    def handle_success(self, success):
 
1035
        """
 
1036
        It worked! Push the event.
 
1037
        """
 
1038
        self.action_queue.event_queue.push('AQ_UNLINK_OK',
 
1039
                                           share_id=self.share_id,
 
1040
                                           parent_id=self.parent_id,
 
1041
                                           node_id=self.node_id)
 
1042
        return success
 
1043
 
 
1044
    def handle_failure(self, failure):
 
1045
        """
 
1046
        It didn't work! Push the event.
 
1047
        """
 
1048
        self.action_queue.event_queue.push('AQ_UNLINK_ERROR',
 
1049
                                           error=failure.getErrorMessage(),
 
1050
                                           share_id=self.share_id,
 
1051
                                           parent_id=self.parent_id,
 
1052
                                           node_id=self.node_id)
 
1053
 
 
1054
 
 
1055
class Query(ActionQueueMetaCommand):
 
1056
    """
 
1057
    Ask about the freshness of server hashes
 
1058
    """
 
1059
    def __init__(self, action_queue, items):
 
1060
        self.log = MultiProxy(
 
1061
            [mklog(logger, '(unrolled) query', share, node,
 
1062
                   share=share, node=node, hash=hash, index=i)
 
1063
             for (i, (share, node, hash)) in enumerate(items)])
 
1064
        self.action_queue = action_queue
 
1065
        self.items = items
 
1066
 
 
1067
    def store_marker_result(self, items):
 
1068
        """
 
1069
        Called when all the markers are realized.
 
1070
        """
 
1071
        self.items = items
 
1072
 
 
1073
    def _start(self):
 
1074
        """
 
1075
        Do the specialized pre-run setup
 
1076
        """
 
1077
        # node_hash will (should?) never be a marker, but it's the
 
1078
        # easiest way to keep the trio together: send it along for the
 
1079
        # trip
 
1080
        dl = []
 
1081
        for item in self.items:
 
1082
            d = self.demark(*item)
 
1083
            d.addErrback(self.handle_single_failure, item)
 
1084
            dl.append(d)
 
1085
        d = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
 
1086
        d.addCallbacks(self.unwrap)
 
1087
        return d
 
1088
 
 
1089
    def handle_failure(self, failure):
 
1090
        """
 
1091
        It didn't work! Never mind.
 
1092
        """
 
1093
        pass
 
1094
 
 
1095
    def handle_single_failure(self, failure, item):
 
1096
        """
 
1097
        The only failure mode of a Query is for a query to be done
 
1098
        using a marker that fails to realize.
 
1099
        """
 
1100
        self.action_queue.event_queue.push('AQ_QUERY_ERROR', item=item,
 
1101
                                           error=failure.getErrorMessage())
 
1102
        return SKIP_THIS_ITEM
 
1103
 
 
1104
    def _run(self):
 
1105
        """
 
1106
        Do the actual running
 
1107
        """
 
1108
        return self.action_queue.client.query(self.items)
 
1109
 
 
1110
 
 
1111
class ListShares(ActionQueueMetaCommand):
 
1112
    """
 
1113
    List shares shared to me
 
1114
    """
 
1115
    def __init__(self, action_queue):
 
1116
        self.action_queue = action_queue
 
1117
        self.log = mklog(logger, 'list_shares', UNKNOWN, UNKNOWN)
 
1118
 
 
1119
    def _run(self):
 
1120
        """
 
1121
        Do the actual running
 
1122
        """
 
1123
        return self.action_queue.client.list_shares()
 
1124
 
 
1125
    def handle_success(self, success):
 
1126
        """
 
1127
        It worked! Push the event.
 
1128
        """
 
1129
        self.action_queue.event_queue.push('AQ_SHARES_LIST',
 
1130
                                           shares_list=success)
 
1131
 
 
1132
    def handle_failure(self, failure):
 
1133
        """
 
1134
        It didn't work! Push the event.
 
1135
        """
 
1136
        self.action_queue.event_queue.push('AQ_LIST_SHARES_ERROR',
 
1137
                                           error=failure.getErrorMessage())
 
1138
 
 
1139
 
 
1140
class AnswerShare(ActionQueueMetaCommand):
 
1141
    """
 
1142
    Answer a share offer
 
1143
    """
 
1144
    def __init__(self, action_queue, share_id, answer):
 
1145
        self.action_queue = action_queue
 
1146
        self.share_id = share_id
 
1147
        self.answer = answer
 
1148
        self.log = mklog(logger, 'answer_share', share_id, UNKNOWN)
 
1149
 
 
1150
    def _run(self):
 
1151
        """
 
1152
        Do the actual running
 
1153
        """
 
1154
        return self.action_queue.client.accept_share(self.share_id, self.answer)
 
1155
 
 
1156
 
 
1157
class CreateShare(ActionQueueMetaCommand):
 
1158
    """
 
1159
    Offer a share to somebody
 
1160
    """
 
1161
    def __init__(self, action_queue, node_id, share_to, name, access_level,
 
1162
                 marker):
 
1163
        self.action_queue = action_queue
 
1164
        self.node_id = node_id
 
1165
        self.share_to = share_to
 
1166
        self.name = name
 
1167
        self.access_level = access_level
 
1168
        self.marker = marker
 
1169
        self.log = mklog(logger, self.__class__.__name__, UNKNOWN, node_id)
 
1170
 
 
1171
    def store_marker_result(self, (node_id,)):
 
1172
        """
 
1173
        Called when all the markers are realized.
 
1174
        """
 
1175
        self.node_id = node_id
 
1176
 
 
1177
    def _start(self):
 
1178
        """
 
1179
        Do the specialized pre-run setup
 
1180
        """
 
1181
        return self.demark(self.node_id)
 
1182
 
 
1183
    def _run(self):
 
1184
        """
 
1185
        Do the actual running
 
1186
        """
 
1187
        return self.action_queue.client.create_share(self.node_id,
 
1188
                                                     self.share_to,
 
1189
                                                     self.name,
 
1190
                                                     self.access_level)
 
1191
 
 
1192
    def handle_success(self, success):
 
1193
        """
 
1194
        It worked! Push the event.
 
1195
        """
 
1196
        self.action_queue.event_queue.push('AQ_CREATE_SHARE_OK',
 
1197
                                           share_id=success.share_id,
 
1198
                                           marker=self.marker)
 
1199
        return success
 
1200
 
 
1201
    def handle_failure(self, failure):
 
1202
        """
 
1203
        It didn't work! Push the event.
 
1204
        """
 
1205
        self.action_queue.event_queue.push('AQ_CREATE_SHARE_ERROR',
 
1206
                                           marker=self.marker,
 
1207
                                           error=failure.getErrorMessage())
 
1208
 
 
1209
 
 
1210
class GetContentMixin(object):
 
1211
    """
 
1212
    Base for ListDir and Download. It's a mixin (ugh) because
 
1213
    otherwise things would be even more confusing
 
1214
    """
 
1215
    def __init__(self, action_queue, share_id, node_id, server_hash,
 
1216
                 fileobj_factory):
 
1217
        self.action_queue = action_queue
 
1218
        self.share_id = share_id
 
1219
        self.node_id = node_id
 
1220
        self.server_hash = server_hash
 
1221
        self.fileobj_factory = fileobj_factory
 
1222
        self.fileobj = None
 
1223
        self.gunzip = zlib.decompressobj()
 
1224
        self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
 
1225
                         share=share_id, node=node_id, server_hash=server_hash,
 
1226
                         fileobj_factory=fileobj_factory)
 
1227
        if (self.share_id, self.node_id) in self.action_queue.downloading:
 
1228
            self.action_queue.cancel_download(self.share_id, self.node_id)
 
1229
 
 
1230
    def _start(self):
 
1231
        """
 
1232
        Do the specialized pre-run setup
 
1233
        """
 
1234
        return self.demark(self.node_id)
 
1235
 
 
1236
    def store_marker_result(self, (node_id,)):
 
1237
        """
 
1238
        Called when all the markers are realized.
 
1239
        """
 
1240
        self.node_id = node_id
 
1241
 
 
1242
    def _run(self):
 
1243
        """
 
1244
        Do the actual running
 
1245
        """
 
1246
        try:
 
1247
            self.fileobj = self.fileobj_factory()
 
1248
        except StandardError:
 
1249
            return defer.fail(Failure('unable to build fileobj'
 
1250
                                      ' (file went away?)'
 
1251
                                      ' so aborting the download.'))
 
1252
        self.action_queue.downloading[self.share_id,
 
1253
                                      self.node_id] = {'n_bytes_read': 0}
 
1254
 
 
1255
        self.action_queue.event_queue.push('AQ_DOWNLOAD_STARTED',
 
1256
                                           share_id=self.share_id,
 
1257
                                           node_id=self.node_id,
 
1258
                                           server_hash=self.server_hash)
 
1259
 
 
1260
        req = self.action_queue.client.get_content_request(
 
1261
            self.share_id, self.node_id, self.server_hash,
 
1262
            callback=self.cb, node_attr_callback=self.nacb)
 
1263
        self.action_queue.downloading[self.share_id, self.node_id]['req'] = req
 
1264
        d = req.deferred
 
1265
        d.addBoth(passit(lambda _:
 
1266
                             self.action_queue.downloading.pop((self.share_id,
 
1267
                                                                self.node_id))))
 
1268
        d.addErrback(passit(lambda _: self.reset_fileobj()))
 
1269
        return d
 
1270
 
 
1271
    def handle_success(self, _):
 
1272
        """
 
1273
        It worked! Push the event.
 
1274
        """
 
1275
        self.action_queue.event_queue.push('AQ_DOWNLOAD_FINISHED',
 
1276
                                           share_id=self.share_id,
 
1277
                                           node_id=self.node_id,
 
1278
                                           server_hash=self.server_hash)
 
1279
 
 
1280
    def handle_failure(self, failure):
 
1281
        """
 
1282
        It didn't work! Push the event.
 
1283
        """
 
1284
        self.action_queue.event_queue.push('AQ_DOWNLOAD_ERROR',
 
1285
                                           error=failure.getErrorMessage(),
 
1286
                                           share_id=self.share_id,
 
1287
                                           node_id=self.node_id,
 
1288
                                           server_hash=self.server_hash)
 
1289
 
 
1290
    def reset_fileobj(self):
 
1291
        """
 
1292
        Rewind and empty the file (i.e. get it ready to try again if
 
1293
        necessary)
 
1294
        """
 
1295
        if self.fileobj is not None:
 
1296
            self.fileobj.seek(0, 0)
 
1297
            self.fileobj.truncate(0)
 
1298
 
 
1299
    def cb(self, bytes):
 
1300
        """
 
1301
        A streaming decompressor
 
1302
        """
 
1303
        dloading = self.action_queue.downloading[self.share_id,
 
1304
                                                 self.node_id]
 
1305
        dloading['n_bytes_read'] += len(bytes)
 
1306
        self.fileobj.write(self.gunzip.decompress(bytes))
 
1307
        self.fileobj.flush()     # not strictly necessary but nice to
 
1308
                                 # see the downloaded size
 
1309
 
 
1310
    def nacb(self, **kwargs):
 
1311
        """
 
1312
        set the node attrs in the 'currently downloading' dict
 
1313
        """
 
1314
        self.action_queue.downloading[self.share_id,
 
1315
                                      self.node_id].update(kwargs)
 
1316
 
 
1317
    def sync(self, _):
 
1318
        """
 
1319
        Flush the buffers and sync them to disk if possible
 
1320
        """
 
1321
        self.fileobj.write(self.gunzip.flush())
 
1322
        self.fileobj.flush()
 
1323
        if getattr(self.fileobj, 'fileno', None) is not None:
 
1324
            # it's a real file, with a fileno! Let's sync its data
 
1325
            # out to disk
 
1326
            os.fsync(self.fileobj.fileno())
 
1327
        self.fileobj.close()
 
1328
 
 
1329
 
 
1330
class ListDir(GetContentMixin, ActionQueueMetaCommand):
 
1331
    """
 
1332
    Get a listing of a directory's contents
 
1333
    """
 
1334
 
 
1335
class Download(GetContentMixin, ActionQueueContentCommand):
 
1336
    """
 
1337
    Get the contents of a file.
 
1338
    """
 
1339
 
 
1340
class Upload(ActionQueueContentCommand):
 
1341
    """
 
1342
    Upload stuff to a file
 
1343
    """
 
1344
    retryable_errors = (ActionQueueContentCommand.retryable_errors
 
1345
                        | set(['UPLOAD_IN_PROGRESS']))
 
1346
 
 
1347
    def __init__(self, action_queue, share_id, node_id, previous_hash, hash,
 
1348
                 crc32, size, fileobj_factory, tempfile_factory):
 
1349
        self.action_queue = action_queue
 
1350
        self.share_id = share_id
 
1351
        self.node_id = node_id
 
1352
        self.previous_hash = previous_hash
 
1353
        self.hash = hash
 
1354
        self.crc32 = crc32
 
1355
        self.size = size
 
1356
        self.fileobj_factory = fileobj_factory
 
1357
        self.tempfile_factory = tempfile_factory
 
1358
        self.deflated_size = None
 
1359
        self.tempfile = None
 
1360
        self.cancelled = False
 
1361
        self.upload_req = None
 
1362
        self.log = mklog(logger, 'upload', share_id, node_id, share=share_id,
 
1363
                         node=node_id, previous_hash=previous_hash,
 
1364
                         hash=hash, crc32=crc32, size=size,
 
1365
                         fileobj_factory=fileobj_factory)
 
1366
        if (self.share_id, self.node_id) in self.action_queue.uploading:
 
1367
            self.action_queue.cancel_upload(self.share_id, self.node_id)
 
1368
 
 
1369
    def cancel(self):
 
1370
        """Cancel the upload."""
 
1371
        self.cancelled = True
 
1372
        if self.upload_req is not None:
 
1373
            self.upload_req.cancel()
 
1374
 
 
1375
    def _start(self):
 
1376
        """
 
1377
        Do the specialized pre-run setup
 
1378
        """
 
1379
        d = defer.Deferred()
 
1380
 
 
1381
        uploading = {"hash": self.hash, "req": self}
 
1382
        self.action_queue.uploading[self.share_id, self.node_id] = uploading
 
1383
 
 
1384
        d = self.action_queue.zip_queue.zip(self)
 
1385
        d.addCallback(lambda _: self.demark(self.node_id))
 
1386
        return d
 
1387
 
 
1388
    def store_marker_result(self, (node_id,)):
 
1389
        """
 
1390
        Called when all the markers are realized.
 
1391
        """
 
1392
        self.node_id = node_id
 
1393
 
 
1394
    def _run(self):
 
1395
        """
 
1396
        Do the actual running
 
1397
        """
 
1398
        uploading = {"hash": self.hash, "deflated_size": self.deflated_size,
 
1399
                     "req": self}
 
1400
        self.action_queue.uploading[self.share_id, self.node_id] = uploading
 
1401
 
 
1402
        self.action_queue.event_queue.push('AQ_UPLOAD_STARTED',
 
1403
                                           share_id=self.share_id,
 
1404
                                           node_id=self.node_id,
 
1405
                                           hash=self.hash)
 
1406
 
 
1407
        if getattr(self.tempfile, 'name', None) is not None:
 
1408
            self.tempfile = open(self.tempfile.name)
 
1409
        f = UploadProgressWrapper(self.tempfile, uploading)
 
1410
        req = self.action_queue.client.put_content_request(
 
1411
            self.share_id, self.node_id, self.previous_hash, self.hash,
 
1412
            self.crc32, self.size, self.deflated_size, f)
 
1413
        self.upload_req = req
 
1414
        d = req.deferred
 
1415
        d.addBoth(passit(lambda _:
 
1416
                             self.action_queue.uploading.pop((self.share_id,
 
1417
                                                              self.node_id))))
 
1418
        d.addBoth(passit(lambda _: self.tempfile.close()))
 
1419
        return d
 
1420
 
 
1421
    def handle_success(self, _):
 
1422
        """
 
1423
        It worked! Push the event.
 
1424
        """
 
1425
        if getattr(self.tempfile, 'name', None) is not None:
 
1426
            os.unlink(self.tempfile.name)
 
1427
        self.action_queue.event_queue.push('AQ_UPLOAD_FINISHED',
 
1428
                                           share_id=self.share_id,
 
1429
                                           node_id=self.node_id,
 
1430
                                           hash=self.hash)
 
1431
 
 
1432
    def handle_failure(self, failure):
 
1433
        """
 
1434
        It didn't work! Push the event.
 
1435
        """
 
1436
        if getattr(self.tempfile, 'name', None) is not None:
 
1437
            os.unlink(self.tempfile.name)
 
1438
        self.action_queue.event_queue.push('AQ_UPLOAD_ERROR',
 
1439
                                           error=failure.getErrorMessage(),
 
1440
                                           share_id=self.share_id,
 
1441
                                           node_id=self.node_id,
 
1442
                                           hash=self.hash)