~didrocks/ubuntuone-client/dont-suffer-zg-crash

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/action_queue.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2009-06-30 12:00:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090630120000-by806ovmw3193qe8
Tags: upstream-0.90.3
ImportĀ upstreamĀ versionĀ 0.90.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# ubuntuone.syncdaemon.action_queue - Action queue
 
2
#
 
3
# Author: John Lenton <john.lenton@canonical.com>
 
4
#
 
5
# Copyright 2009 Canonical Ltd.
 
6
#
 
7
# This program is free software: you can redistribute it and/or modify it
 
8
# under the terms of the GNU General Public License version 3, as published
 
9
# by the Free Software Foundation.
 
10
#
 
11
# This program is distributed in the hope that it will be useful, but
 
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
14
# PURPOSE.  See the GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License along
 
17
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
"""
 
19
The ActionQueue is where actions to be performed on the server are
 
20
queued up and then executed. The idea is that there are two queues,
 
21
one for metadata and another for content; the metadata queue has
 
22
priority over the content queue.
 
23
"""
 
24
from collections import deque, defaultdict
 
25
from functools import wraps, partial
 
26
import logging
 
27
import os
 
28
import random
 
29
import tempfile
 
30
import zlib
 
31
 
 
32
from zope.interface import implements
 
33
from twisted.internet import reactor, defer, ssl
 
34
from twisted.names import client as dns_client
 
35
from twisted.python.failure import Failure
 
36
 
 
37
import uuid
 
38
from ubuntuone.storageprotocol.context import get_ssl_context
 
39
from ubuntuone.storageprotocol import protocol_pb2
 
40
from ubuntuone.storageprotocol.client import StorageClient, \
 
41
    StorageClientFactory
 
42
from ubuntuone.syncdaemon.logger import mklog, TRACE
 
43
from ubuntuone.syncdaemon.interfaces import IActionQueue, \
 
44
    IMarker
 
45
 
 
46
logger = logging.getLogger("ubuntuone.SyncDaemon.ActionQueue")
 
47
 
 
48
# I want something which repr() is "---" *without* the quotes :)
 
49
UNKNOWN = type('', (), {'__repr__': lambda _: '---'})()
 
50
 
 
51
def passit(func):
 
52
    """
 
53
    Pass the value on for the next deferred, while calling func with
 
54
    it.
 
55
    """
 
56
    @wraps(func)
 
57
    def wrapper(a):
 
58
        """
 
59
        Do it.
 
60
        """
 
61
        func(a)
 
62
        return a
 
63
    return wrapper
 
64
 
 
65
 
 
66
class UploadCompressionCancelled(Exception):
 
67
    """Compression of a file for upload cancelled."""
 
68
 
 
69
 
 
70
class RequestCleanedUp(Exception):
 
71
    """
 
72
    The request was cancelled by ActionQueue.cleanup()
 
73
    """
 
74
 
 
75
 
 
76
class NamedTemporaryFile(object):
 
77
    """
 
78
    Like tempfile.NamedTemporaryFile, but working in 2.5 also WRT the
 
79
    delete argument. Actually, one of these NamedTemporaryFile()s is
 
80
    the same as a tempfile.NamedTemporaryFile(delete=False) from 2.6.
 
81
 
 
82
    Or so the theory goes.
 
83
    """
 
84
    def __init__(self):
 
85
        fileno, self.name = tempfile.mkstemp()
 
86
        self._fd = os.fdopen(fileno, 'r+w')
 
87
 
 
88
    def __getattr__(self, attr):
 
89
        """proxy everything else (other than .name) on to self._fd"""
 
90
        return getattr(self._fd, attr)
 
91
 
 
92
 
 
93
class MultiProxy(list):
 
94
    """
 
95
    Proxy many objects of the same kind, like this:
 
96
 
 
97
    >>> m = MultiProxy(['foo', 'bar', 'baz'])
 
98
    >>> m.upper()
 
99
    MultiProxy(['FOO', 'BAR', 'BAZ'])
 
100
    """
 
101
 
 
102
    def __getattr__(self, attr):
 
103
        return MultiProxy(getattr(i, attr) for i in self)
 
104
 
 
105
    def __call__(self, *args, **kwargs):
 
106
        return MultiProxy(i(*args, **kwargs) for i in self)
 
107
 
 
108
    def __repr__(self):
 
109
        return 'MultiProxy(%s)' % (super(MultiProxy, self).__repr__(),)
 
110
 
 
111
 
 
112
class LoggingStorageClient(StorageClient):
 
113
    """ A subclass of StorageClient that adds logging to
 
114
    processMessage and sendMessage.
 
115
    """
 
116
 
 
117
    def __init__(self):
 
118
        """ setup logging and create the instance. """
 
119
        StorageClient.__init__(self)
 
120
        self.log = logging.getLogger('ubuntuone.SyncDaemon.StorageClient')
 
121
        # configure the handler level to be < than DEBUG
 
122
        self.log.setLevel(TRACE)
 
123
        self.log.debug = partial(self.log.log, TRACE)
 
124
 
 
125
    def processMessage(self, message):
 
126
        """ wrapper that logs the message and result. """
 
127
        # don't log the full message if it's of type BYTES
 
128
        if message.type == protocol_pb2.Message.BYTES:
 
129
            self.log.debug('start - processMessage: id: %s, type: %s',
 
130
                           message.id, message.type)
 
131
        else:
 
132
            self.log.debug('start - processMessage: %s',
 
133
                          str(message).replace("\n", " "))
 
134
        if message.id in self.requests:
 
135
            req = self.requests[message.id]
 
136
            req.deferred.addCallbacks(self.log_success, self.log_error)
 
137
        result = StorageClient.processMessage(self, message)
 
138
        self.log.debug('end - processMessage: id: %s - result: %s',
 
139
                       message.id, result)
 
140
        return result
 
141
 
 
142
    def log_error(self, failure):
 
143
        """ logging errback for requests """
 
144
        self.log.debug('request error: %s', failure)
 
145
        return failure
 
146
 
 
147
    def log_success(self, result):
 
148
        """ logging callback for requests """
 
149
        self.log.debug('request finished: %s', result)
 
150
        if getattr(result, '__dict__', None):
 
151
            self.log.debug('result.__dict__: %s', result.__dict__)
 
152
        return result
 
153
 
 
154
    def sendMessage(self, message):
 
155
        """ wrapper that logs the message and result. """
 
156
        # don't log the full message if it's of type BYTES
 
157
        if message.type == protocol_pb2.Message.BYTES:
 
158
            self.log.debug('start - sendMessage: id: %s, type: %s',
 
159
                           message.id, message.type)
 
160
        else:
 
161
            self.log.debug('start - sendMessage: %s',
 
162
                          str(message).replace("\n", " "))
 
163
        result = StorageClient.sendMessage(self, message)
 
164
        self.log.debug('end - sendMessage: id: %s', message.id)
 
165
        return result
 
166
 
 
167
 
 
168
class ActionQueueProtocol(LoggingStorageClient):
 
169
    """
 
170
    This is the Action Queue version of the StorageClient protocol.
 
171
    """
 
172
    connection_state = 'disconnected'
 
173
    factory = None
 
174
 
 
175
    def connectionMade(self):
 
176
        """
 
177
        Called when a new connection is made.
 
178
        All the state is saved in the factory.
 
179
        """
 
180
        logger.debug("connection made")
 
181
        self.connection_state = 'connected'
 
182
        self.factory.client = self
 
183
        self.set_node_state_callback(self.factory._node_state_callback)
 
184
        self.set_share_change_callback(self.factory._share_change_callback)
 
185
        self.set_share_answer_callback(self.factory._share_answer_callback)
 
186
        self.factory.event_queue.push('SYS_CONNECTION_MADE')
 
187
 
 
188
    def disconnect(self):
 
189
        """
 
190
        Close down the sockets
 
191
        """
 
192
        logger.debug("disconnected")
 
193
        if self.transport is not None:
 
194
            self.transport.loseConnection()
 
195
 
 
196
    def connectionLost(self, failure):
 
197
        """
 
198
        The connection went down, for some reason (which might or
 
199
        might not be described in failure).
 
200
        """
 
201
        logger.warning('connection lost: %s' % failure.getErrorMessage())
 
202
        self.factory.event_queue.push('SYS_CONNECTION_LOST')
 
203
        LoggingStorageClient.connectionLost(self, failure)
 
204
 
 
205
 
 
206
class Marker(str):
 
207
    """
 
208
    A uuid4-based marker class
 
209
    """
 
210
    implements(IMarker)
 
211
    def __new__(self):
 
212
        return super(Marker, self).__new__(self, uuid.uuid4())
 
213
 
 
214
 
 
215
class ZipQueue(object):
 
216
    """
 
217
    A queue of files to be compressed for upload
 
218
 
 
219
    Parts of this were shamelessly copied from
 
220
    twisted.internet.defer.DeferredSemaphore
 
221
 
 
222
    see bug #373984
 
223
    """
 
224
    def __init__(self):
 
225
        self.waiting = deque()
 
226
        self.tokens = self.limit = 10
 
227
 
 
228
    def acquire(self):
 
229
        """
 
230
        return a deferred which fires on token acquisition.
 
231
        """
 
232
        assert self.tokens >= 0, "Tokens should never be negative"
 
233
        d = defer.Deferred()
 
234
        if not self.tokens:
 
235
            self.waiting.append(d)
 
236
        else:
 
237
            self.tokens = self.tokens - 1
 
238
            d.callback(self)
 
239
        return d
 
240
 
 
241
    def release(self):
 
242
        """
 
243
        Release the token.
 
244
 
 
245
        Should be called by whoever did the acquire() when the shared
 
246
        resource is free.
 
247
        """
 
248
        assert self.tokens < self.limit, "Too many tokens!"
 
249
        self.tokens = self.tokens + 1
 
250
        if self.waiting:
 
251
            # someone is waiting to acquire token
 
252
            self.tokens = self.tokens - 1
 
253
            d = self.waiting.popleft()
 
254
            d.callback(self)
 
255
 
 
256
    def _compress(self, deferred, upload):
 
257
        """Compression background task."""
 
258
        try:
 
259
            fileobj = upload.fileobj_factory()
 
260
        except StandardError:
 
261
            # presumably the user deleted the file before we got to
 
262
            # upload it. Logging a warning just in case.
 
263
            upload.log.warn('unable to build fileobj'
 
264
                            ' (user deleted the file, maybe?)'
 
265
                            ' so cancelling the upload.')
 
266
            upload.cancel()
 
267
            fileobj = None
 
268
 
 
269
        filename = getattr(fileobj, 'name', '<?>')
 
270
 
 
271
        upload.log.debug('compressing', filename)
 
272
        try:
 
273
            # we need to compress the file completely to figure out its
 
274
            # compressed size. So streaming is out :(
 
275
            if upload.tempfile_factory is None:
 
276
                f = NamedTemporaryFile()
 
277
            else:
 
278
                f = upload.tempfile_factory()
 
279
            zipper = zlib.compressobj()
 
280
            while not upload.cancelled:
 
281
                data = fileobj.read(4096)
 
282
                if not data:
 
283
                    f.write(zipper.flush())
 
284
                    # no flush/sync because we don't need this to persist
 
285
                    # on disk; if the machine goes down, we'll lose it
 
286
                    # anyway (being in /tmp and all)
 
287
                    break
 
288
                f.write(zipper.compress(data))
 
289
            if upload.cancelled:
 
290
                raise UploadCompressionCancelled("Cancelled")
 
291
            upload.deflated_size = f.tell()
 
292
            # close the compressed file (thus, if you actually want to stream
 
293
            # it out, it must have a name so it can be reopnened)
 
294
            f.close()
 
295
            upload.tempfile = f
 
296
        except Exception, e: # pylint: disable-msg=W0703
 
297
            reactor.callFromThread(deferred.errback, e)
 
298
        else:
 
299
            reactor.callFromThread(deferred.callback, True)
 
300
 
 
301
    def zip(self, upload):
 
302
        """
 
303
        Acquire, do the compression in a thread, release.
 
304
        """
 
305
        d_zip = defer.Deferred()
 
306
        d_lck = self.acquire()
 
307
        d_lck.addCallback(
 
308
            lambda _: reactor.callFromThread(self._compress,
 
309
                                             d_zip, upload) or d_zip)
 
310
        d_lck.addCallback(lambda _: self.release())
 
311
 
 
312
        return d_lck
 
313
 
 
314
 
 
315
class RequestQueue(object):
 
316
    """
 
317
    RequestQueue is a queue that ensures that there is at most one
 
318
    request at a time 'on the wire', and that uses the action queue's
 
319
    states for its syncrhonization.
 
320
    """
 
321
    def __init__(self, name, action_queue):
 
322
        super(RequestQueue, self).__init__()
 
323
        self.name = name
 
324
        self.action_queue = action_queue
 
325
        self.waiting = deque()
 
326
        self.head = None
 
327
        self.paused = True
 
328
 
 
329
    def __len__(self):
 
330
        """return the length of the queue"""
 
331
        return len(self.waiting)
 
332
 
 
333
    def queue(self, func, *args, **kwargs):
 
334
        """
 
335
        Add a call to func to the queue.
 
336
        """
 
337
        d = defer.Deferred()
 
338
        d.addCallback(lambda _: defer.maybeDeferred(func, *args, **kwargs))
 
339
        self.waiting.append(d)
 
340
        if len(self.waiting) == 1 and not self.head:
 
341
            self.action_queue.event_queue.push('SYS_' + self.name
 
342
                                               + '_WAITING')
 
343
        return d
 
344
 
 
345
    def run(self):
 
346
        """
 
347
        Empty the queue.
 
348
        """
 
349
        if self.waiting:
 
350
            d = self.waiting.popleft()
 
351
            d.addBoth(passit(lambda _: self.run()))
 
352
            d.callback(None)
 
353
        else:
 
354
            self.action_queue.event_queue.push('SYS_' + self.name + '_DONE')
 
355
 
 
356
class ContentQueue(RequestQueue):
 
357
    """
 
358
    A content queue is a queue of content requests (uploads and downloads).
 
359
    """
 
360
    def run(self):
 
361
        """
 
362
        Empty the queue.
 
363
        """
 
364
        if self.waiting:
 
365
            d = self.waiting.popleft()
 
366
            d.addBoth(passit(lambda _: self.action_queue.event_queue.push(
 
367
                        'SYS_' + self.name + '_DONE')))
 
368
            d.addBoth(passit(lambda _: self.action_queue.event_queue.push(
 
369
                        'SYS_' + self.name + '_WAITING')))
 
370
            d.callback(None)
 
371
        else:
 
372
            self.action_queue.event_queue.push('SYS_' + self.name + '_DONE')
 
373
 
 
374
 
 
375
class MetaQueue(RequestQueue):
 
376
    """
 
377
    A meta queue is a queue of metadata-related requests.
 
378
    """
 
379
 
 
380
 
 
381
class DeferredMap(object):
 
382
    """
 
383
    A mapping of deferred values. Or a deferred map of values. Or a
 
384
    mapping that returns deferreds and then fires them when it has the
 
385
    value.
 
386
    """
 
387
    def __init__(self):
 
388
        self.waiting = defaultdict(list)
 
389
        self.failed = {}
 
390
        self.map = {}
 
391
 
 
392
    def get(self, key):
 
393
        """
 
394
        Get the value for the given key.
 
395
 
 
396
        This always returns a deferred; when we already know the value
 
397
        we return a `succeed`, and if we don't know the value because
 
398
        it failed we return a `fail`; otherwise we return a plain
 
399
        unfired `Deferred`, and add it to the list of deferreds to
 
400
        call when we actually get the value.
 
401
        """
 
402
        if key in self.map:
 
403
            return defer.succeed(self.map[key])
 
404
        if key in self.failed:
 
405
            return defer.fail(Exception(self.failed[key]))
 
406
        d = defer.Deferred()
 
407
        self.waiting[key].append(d)
 
408
        return d
 
409
 
 
410
    def set(self, key, value):
 
411
        """
 
412
        We've got the value for a key! Write it down in the map, and
 
413
        fire the waiting deferreds.
 
414
        """
 
415
        if key not in self.map:
 
416
            self.map[key] =  value
 
417
            for d in self.waiting.pop(key, ()):
 
418
                d.callback(value)
 
419
        elif self.map[key] != value:
 
420
            if key in self.map:
 
421
                raise KeyError("key is taken -- dunno what to do")
 
422
 
 
423
    def err(self, key, failure):
 
424
        """
 
425
        Something went terribly wrong in the process of getting a
 
426
        value. Break the news to the waiting deferreds.
 
427
        """
 
428
        self.failed[key] = failure.getErrorMessage()
 
429
        for d in self.waiting.pop(key, ()):
 
430
            d.errback(failure)
 
431
 
 
432
 
 
433
class UploadProgressWrapper(object):
 
434
    """
 
435
    A wrapper around the file-like object used for Uploads, with which
 
436
    we can keep track of the number of bytes that have been written to
 
437
    the store.
 
438
    """
 
439
    def __init__(self, fd, data_dict):
 
440
        """
 
441
        fd is the file-like object used for uploads. data_dict is the
 
442
        entry in the uploading dictionary.
 
443
        """
 
444
        self.fd = fd
 
445
        self.data_dict = data_dict
 
446
        self.n_bytes_read = 0
 
447
 
 
448
    def read(self, size=None):
 
449
        """
 
450
        read at most size bytes from the file-like object.
 
451
 
 
452
        Keep track of the number of bytes that have been read, and the
 
453
        number of bytes that have been written (assumed to be equal to
 
454
        the number of bytes read on the previews call to read). The
 
455
        latter is done directly in the data_dict.
 
456
        """
 
457
        self.data_dict['n_bytes_written'] = self.n_bytes_read
 
458
        data = self.fd.read(size)
 
459
        self.n_bytes_read += len(data)
 
460
        return data
 
461
 
 
462
    def __getattr__(self, attr):
 
463
        """
 
464
        Proxy all the rest.
 
465
        """
 
466
        return getattr(self.fd, attr)
 
467
 
 
468
 
 
469
class ActionQueue(StorageClientFactory, object):
 
470
    """
 
471
    This is the ActionQueue itself.
 
472
    """
 
473
    implements(IActionQueue)
 
474
    protocol = ActionQueueProtocol
 
475
 
 
476
    def __init__(self, event_queue, host, port, dns_srv,
 
477
                 use_ssl=False, disable_ssl_verify=False):
 
478
        self.event_queue = event_queue
 
479
        self.host = host
 
480
        self.port = port
 
481
        self.dns_srv = dns_srv
 
482
        self.use_ssl = use_ssl
 
483
        self.disable_ssl_verify = disable_ssl_verify
 
484
 
 
485
        self.token = None
 
486
        self.client = None
 
487
        self.deferred = None
 
488
 
 
489
        self.content_queue = ContentQueue('CONTENT_QUEUE', self)
 
490
        self.meta_queue = MetaQueue('META_QUEUE', self)
 
491
        self.uuid_map = DeferredMap()
 
492
        self.zip_queue = ZipQueue()
 
493
 
 
494
        self.uploading = {}
 
495
        self.downloading = {}
 
496
 
 
497
        event_queue.subscribe(self)
 
498
 
 
499
    def handle_SYS_CONNECT(self, access_token):
 
500
        """
 
501
        Stow the access token away for later use
 
502
        """
 
503
        self.token = access_token
 
504
 
 
505
    def cleanup(self):
 
506
        """
 
507
        Cancel, clean up, and reschedule things that were in progress
 
508
        when a disconnection happened
 
509
        """
 
510
        self.event_queue.push('SYS_CLEANUP_STARTED')
 
511
        if self.client is not None:
 
512
            self.client.disconnect()
 
513
        for queue in self.meta_queue, self.content_queue:
 
514
            if queue.head is not None:
 
515
                queue.head.errback(RequestCleanedUp('Cleaned up'))
 
516
        self.event_queue.push('SYS_CLEANUP_FINISHED')
 
517
 
 
518
 
 
519
    def _node_state_callback(self, share_id, node_id, hash):
 
520
        """
 
521
        Called by the client when notified that node changed.
 
522
        """
 
523
        self.event_queue.push('SV_HASH_NEW',
 
524
                              share_id=share_id, node_id=node_id, hash=hash)
 
525
 
 
526
    def _share_change_callback(self, message, info):
 
527
        """
 
528
        Called by the client when notified that a share changed.
 
529
        """
 
530
        self.event_queue.push('SV_SHARE_CHANGED',
 
531
                              message=message, info=info)
 
532
 
 
533
    def _share_answer_callback(self, share_id, answer):
 
534
        """
 
535
        Called by the client when it gets a share answer notification.
 
536
        """
 
537
        self.event_queue.push('SV_SHARE_ANSWERED',
 
538
                              share_id=str(share_id), answer=answer)
 
539
 
 
540
    def _lookup_srv(self):
 
541
        """ do the SRV lookup and return a deferred whose callback is going
 
542
        to be called with (host, port). If we can't do the lookup, the default
 
543
        host, port is used.
 
544
        """
 
545
        def on_lookup_ok(results):
 
546
            """Get a random host from the SRV result."""
 
547
            logger.debug('SRV lookup done, choosing a server')
 
548
            records, auth, add = results
 
549
            if not records:
 
550
                raise ValueError("No available records.")
 
551
            # pick a random server
 
552
            record = random.choice(records)
 
553
            logger.debug('Using record: %r', record)
 
554
            if record.payload:
 
555
                return record.payload.target.name, record.payload.port
 
556
            else:
 
557
                logger.info('Empty SRV record, fallback to %r:%r',
 
558
                            self.host, self.port)
 
559
                return self.host, self.port
 
560
 
 
561
        def on_lookup_error(failure):
 
562
            """ return the default host/post on a DNS SRV lookup failure. """
 
563
            logger.info("SRV lookup error, fallback to %r:%r \n%s",
 
564
                        self.host, self.port, failure.getTraceback())
 
565
            return self.host, self.port
 
566
 
 
567
        if self.dns_srv:
 
568
            # lookup the DNS SRV records
 
569
            d = dns_client.lookupService(self.dns_srv, timeout=[3, 2])
 
570
            d.addCallback(on_lookup_ok)
 
571
            d.addErrback(on_lookup_error)
 
572
            return d
 
573
        else:
 
574
            return defer.succeed((self.host, self.port))
 
575
 
 
576
    def connect(self):
 
577
        """
 
578
        Start the circus going.
 
579
        """
 
580
        self.deferred = defer.Deferred()
 
581
        d = self._lookup_srv()
 
582
        def _connect(result):
 
583
            """ do the real thing """
 
584
            host, port = result
 
585
            sslContext = get_ssl_context(self.disable_ssl_verify)
 
586
 
 
587
            if self.use_ssl:
 
588
                reactor.connectSSL(host, port, self, sslContext)
 
589
            else:
 
590
                reactor.connectTCP(host, port, self)
 
591
        d.addCallback(_connect)
 
592
        return self.deferred
 
593
 
 
594
    def conectionFailed(self, reason=None):
 
595
        """
 
596
        Called when the connect() call fails
 
597
        """
 
598
        self.deferred.errback(reason)
 
599
 
 
600
    def get_root(self, marker):
 
601
        """
 
602
        Get the user's root uuid. Use the uuid_map, so the caller can
 
603
        use the marker in followup operations.
 
604
        """
 
605
        log = mklog(logger, 'get_root', '', marker, marker=marker)
 
606
        log.debug('starting')
 
607
        d = self.client.get_root()
 
608
        d.addCallbacks(*log.callbacks())
 
609
        d.addCallbacks(passit(lambda root: self.uuid_map.set(marker, root)),
 
610
                       passit(lambda f: self.uuid_map.err(marker, f)))
 
611
 
 
612
        return d
 
613
 
 
614
    def make_file(self, share_id, parent_id, name, marker):
 
615
        """
 
616
        See .interfaces.IMetaQueue
 
617
        """
 
618
        return MakeFile(self, share_id, parent_id, name, marker).start()
 
619
 
 
620
    def make_dir(self, share_id, parent_id, name, marker):
 
621
        """
 
622
        See .interfaces.IMetaQueue
 
623
        """
 
624
        return MakeDir(self, share_id, parent_id, name, marker).start()
 
625
 
 
626
    def move(self, share_id, node_id, old_parent_id, new_parent_id, new_name):
 
627
        """
 
628
        See .interfaces.IMetaQueue
 
629
        """
 
630
        return Move(self, share_id, node_id, old_parent_id,
 
631
                    new_parent_id, new_name).start()
 
632
 
 
633
    def unlink(self, share_id, parent_id, node_id):
 
634
        """
 
635
        See .interfaces.IMetaQueue
 
636
        """
 
637
        return Unlink(self, share_id, parent_id, node_id).start()
 
638
 
 
639
    def query(self, items):
 
640
        """
 
641
        See .interfaces.IMetaQueue
 
642
        """
 
643
        return Query(self, items).start()
 
644
 
 
645
    def list_shares(self):
 
646
        """
 
647
        List the shares; put the result on the event queue
 
648
        """
 
649
        return ListShares(self).start()
 
650
 
 
651
    def answer_share(self, share_id, answer):
 
652
        """
 
653
        Answer the offer of a share.
 
654
        """
 
655
        return AnswerShare(self, share_id, answer).start()
 
656
 
 
657
    def create_share(self, node_id, share_to, name, access_level, marker):
 
658
        """
 
659
        Share a node with somebody.
 
660
        """
 
661
        return CreateShare(self, node_id, share_to, name, access_level,
 
662
                           marker).start()
 
663
 
 
664
    def listdir(self, share_id, node_id, server_hash, fileobj_factory):
 
665
        """
 
666
        See .interfaces.IMetaQueue.listdir
 
667
        """
 
668
        return ListDir(self, share_id, node_id, server_hash,
 
669
                       fileobj_factory).start()
 
670
 
 
671
    def download(self, share_id, node_id, server_hash, fileobj_factory):
 
672
        """
 
673
        See .interfaces.IContentQueue.download
 
674
        """
 
675
        return Download(self, share_id, node_id, server_hash,
 
676
                        fileobj_factory).start()
 
677
 
 
678
    def upload(self, share_id, node_id, previous_hash, hash, crc32,
 
679
               size, fileobj_factory, tempfile_factory=None):
 
680
        """
 
681
        See .interfaces.IContentQueue
 
682
        """
 
683
        return Upload(self, share_id, node_id, previous_hash, hash,
 
684
                      crc32, size, fileobj_factory, tempfile_factory).start()
 
685
 
 
686
    def cancel_upload(self, share_id, node_id):
 
687
        """
 
688
        See .interfaces.IContentQueue
 
689
        """
 
690
        log = mklog(logger, 'cancel_upload', share_id, node_id,
 
691
                    share=share_id, node=node_id)
 
692
        log.debug('starting')
 
693
        if (share_id, node_id) in self.uploading:
 
694
            req = self.uploading[share_id, node_id].get('req')
 
695
            if req is not None:
 
696
                req.cancel()
 
697
                log.debug("cancelled")
 
698
        log.debug('finished')
 
699
 
 
700
    def cancel_download(self, share_id, node_id):
 
701
        """
 
702
        See .interfaces.IContentQueue
 
703
        """
 
704
        log = mklog(logger, 'cancel_download', share_id, node_id,
 
705
                    share=share_id, node=node_id)
 
706
        log.debug('starting')
 
707
        if (share_id, node_id) in self.downloading:
 
708
            req = self.downloading[share_id, node_id].get('req')
 
709
            if req is not None:
 
710
                req.cancel()
 
711
                log.debug("cancelled")
 
712
        log.debug('finished')
 
713
 
 
714
 
 
715
SKIP_THIS_ITEM = object()
 
716
 
 
717
# pylint: disable-msg=W0231
 
718
 
 
719
class ActionQueueCommand(object):
 
720
    """
 
721
    Base of all the action queue commands
 
722
    """
 
723
 
 
724
    # protobuf doesn't seem to have much introspectionable stuff
 
725
    # without going into private attributes
 
726
    known_error_messages = (set(protocol_pb2._ERROR_ERRORTYPE.values_by_name)
 
727
                            | set(['CANCELLED']))
 
728
    suppressed_error_messages = (known_error_messages
 
729
                                 - set(['INTERNAL_ERROR'])
 
730
                                 | set(['Cleaned up']))
 
731
    retryable_errors = set(['Cleaned up', 'TRY_AGAIN'])
 
732
 
 
733
    def demark(self, *maybe_markers):
 
734
        """
 
735
        Arrange to have maybe_markers realized
 
736
        """
 
737
        l = []
 
738
        for marker in maybe_markers:
 
739
            if IMarker.providedBy(marker):
 
740
                self.log.debug('waiting until we know the real value of %s'
 
741
                               % marker)
 
742
                d = self.action_queue.uuid_map.get(marker)
 
743
                d.addCallbacks(passit(lambda _:
 
744
                                          self.log.debug('got %s' % marker)),
 
745
                               passit(lambda f:
 
746
                                          self.log.error('failed %s' % marker)))
 
747
            else:
 
748
                d = defer.succeed(marker)
 
749
            l.append(d)
 
750
        dl = defer.DeferredList(l, fireOnOneErrback=True, consumeErrors=True)
 
751
        dl.addCallbacks(self.unwrap,
 
752
                        lambda f: f.value.subFailure)
 
753
        return dl
 
754
 
 
755
    @staticmethod
 
756
    def unwrap(results):
 
757
        """
 
758
        Unpack the values from the result of a DeferredList. If
 
759
        there's a failure, return it instead.
 
760
        """
 
761
        values = []
 
762
        for result in results:
 
763
            # result can be none if one of the callbacks failed
 
764
            # before the others were ready
 
765
            if result is not None:
 
766
                is_ok, value = result
 
767
                if not is_ok:
 
768
                    # a failure!
 
769
                    return value
 
770
                if value is not SKIP_THIS_ITEM:
 
771
                    values.append(value)
 
772
        return values
 
773
 
 
774
 
 
775
    def end_callback(self, arg):
 
776
        """
 
777
        It worked!
 
778
        """
 
779
        self._queue.head = None
 
780
        self.log.debug('success')
 
781
        return self.handle_success(arg)
 
782
 
 
783
    def end_errback(self, failure):
 
784
        """
 
785
        It failed!
 
786
        """
 
787
        self._queue.head = None
 
788
        error_message = failure.getErrorMessage()
 
789
        if error_message not in self.suppressed_error_messages:
 
790
            self.log.error('failure', error_message)
 
791
            self.log.debug('traceback follows:\n\n' + failure.getTraceback())
 
792
        else:
 
793
            self.log.warn('failure', error_message)
 
794
        self.cleanup()
 
795
        if error_message in self.retryable_errors:
 
796
            reactor.callLater(0.1, self.retry)
 
797
        else:
 
798
            return self.handle_failure(failure)
 
799
 
 
800
    def start(self, _=None):
 
801
        """
 
802
        Get ready to run, and then run.
 
803
 
 
804
        The default implementation is for when there is no preparation necessary
 
805
        """
 
806
        d = self._start()
 
807
        d.addCallback(self.store_marker_result)
 
808
        d.addCallback(lambda _: self.log.debug('queueing in the %s'
 
809
                                               % self._queue.name))
 
810
        d.addCallbacks(lambda _: self._queue.queue(self.run),
 
811
                       self.handle_failure)
 
812
        return d
 
813
 
 
814
    def cleanup(self):
 
815
        """
 
816
        Do whatever is needed to clean up from a failure (such as stop
 
817
        producers and other such that aren't cleaned up appropriately
 
818
        on their own)
 
819
 
 
820
        The default implementation does absolutely nothing.
 
821
        """
 
822
 
 
823
    def _start(self):
 
824
        """
 
825
        Do the specialized pre-run setup
 
826
        """
 
827
        return defer.succeed(None)
 
828
 
 
829
    def store_marker_result(self, _):
 
830
        """
 
831
        Called when all the markers are realized.
 
832
        """
 
833
 
 
834
    def run(self):
 
835
        """
 
836
        Do the deed.
 
837
        """
 
838
        self.log.debug('starting')
 
839
        self._queue.head = d = self._run()
 
840
        d.addCallbacks(self.end_callback, self.end_errback)
 
841
        return d
 
842
 
 
843
    def handle_success(self, success):
 
844
        """
 
845
        Do anthing that's needed to handle success of the operation.
 
846
        """
 
847
        return success
 
848
 
 
849
    def handle_failure(self, failure):
 
850
        """
 
851
        Do anthing that's needed to handle failure of the operation.
 
852
        Note that cancellation and TRY_AGAIN are already handled.
 
853
        """
 
854
        return failure
 
855
 
 
856
    def retry(self):
 
857
        """
 
858
        Request cancelled or TRY_AGAIN. Well then, try again!
 
859
        """
 
860
        return self._queue.queue(self.run)
 
861
 
 
862
 
 
863
 
 
864
class ActionQueueMetaCommand(ActionQueueCommand):
 
865
    """
 
866
    Base of metadata-related commands (commands that are queued in the
 
867
    meta queue)
 
868
    """
 
869
    @property
 
870
    def _queue(self):
 
871
        """
 
872
        Get at the meta queue
 
873
        """
 
874
        return self.action_queue.meta_queue
 
875
 
 
876
 
 
877
class ActionQueueContentCommand(ActionQueueCommand):
 
878
    """
 
879
    Base of content-related commands (commands that are queued in the
 
880
    content queue)
 
881
    """
 
882
    @property
 
883
    def _queue(self):
 
884
        """
 
885
        Get at the content queue
 
886
        """
 
887
        return self.action_queue.content_queue
 
888
 
 
889
 
 
890
class MakeThing(ActionQueueMetaCommand):
 
891
    """
 
892
    Base of MakeFile and MakeDir
 
893
    """
 
894
    def __init__(self, action_queue, share_id, parent_id, name, marker):
 
895
        self.action_queue = action_queue
 
896
        self.share_id = share_id
 
897
        self.parent_id = parent_id
 
898
        # Unicode boundary! the name is Unicode in protocol and server, but
 
899
        # here we use bytes for paths
 
900
        self.name = name.decode("utf8")
 
901
        self.marker = marker
 
902
        self.log = mklog(logger, self.__class__.__name__, share_id, marker,
 
903
                         share=share_id, parent=parent_id, name=name,
 
904
                         marker=marker)
 
905
 
 
906
    def _start(self):
 
907
        """
 
908
        Do the specialized pre-run setup
 
909
        """
 
910
        return self.demark(self.share_id, self.parent_id)
 
911
 
 
912
    def store_marker_result(self, (share_id, parent_id)):
 
913
        """
 
914
        Called when all the markers are realized.
 
915
        """
 
916
        self.share_id = share_id
 
917
        self.parent_id = parent_id
 
918
 
 
919
    def _run(self):
 
920
        """
 
921
        Do the actual running
 
922
        """
 
923
        maker = getattr(self.action_queue.client, self.client_method)
 
924
        return maker(self.share_id,
 
925
                     self.parent_id,
 
926
                     self.name)
 
927
 
 
928
    def handle_success(self, success):
 
929
        """
 
930
        It worked! Push the event, and update the uuid map.
 
931
        """
 
932
        # note that we're not getting the new name from the answer
 
933
        # message, if we would get it, we would have another Unicode
 
934
        # boundary with it
 
935
        self.action_queue.event_queue.push(self.ok_event_name,
 
936
                                           marker=self.marker,
 
937
                                           new_id=success.new_id)
 
938
        if IMarker.providedBy(self.marker):
 
939
            self.action_queue.uuid_map.set(self.marker, success.new_id)
 
940
        return success
 
941
 
 
942
    def handle_failure(self, failure):
 
943
        """
 
944
        It didn't work! Push the event, and update the uuid map.
 
945
        """
 
946
        self.action_queue.event_queue.push(self.error_event_name,
 
947
                                           marker=self.marker,
 
948
                                           error=failure.getErrorMessage())
 
949
        if IMarker.providedBy(self.marker):
 
950
            self.action_queue.uuid_map.err(self.marker,
 
951
                                           failure)
 
952
 
 
953
 
 
954
class MakeFile(MakeThing):
 
955
    """
 
956
    Make a file
 
957
    """
 
958
    ok_event_name = 'AQ_FILE_NEW_OK'
 
959
    error_event_name = 'AQ_FILE_NEW_ERROR'
 
960
    client_method = 'make_file'
 
961
 
 
962
 
 
963
class MakeDir(MakeThing):
 
964
    """
 
965
    Make a directory
 
966
    """
 
967
    ok_event_name = 'AQ_DIR_NEW_OK'
 
968
    error_event_name = 'AQ_DIR_NEW_ERROR'
 
969
    client_method = 'make_dir'
 
970
 
 
971
 
 
972
class Move(ActionQueueMetaCommand):
 
973
    """
 
974
    Move a file or directory
 
975
    """
 
976
    def __init__(self, action_queue, share_id, node_id, old_parent_id,
 
977
                 new_parent_id, new_name):
 
978
        self.action_queue = action_queue
 
979
        self.share_id = share_id
 
980
        self.node_id = node_id
 
981
        self.old_parent_id = old_parent_id
 
982
        self.new_parent_id = new_parent_id
 
983
        # Unicode boundary! the name is Unicode in protocol and server, but
 
984
        # here we use bytes for paths
 
985
        self.new_name = new_name.decode("utf8")
 
986
        self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
 
987
                         share=share_id, node=node_id,
 
988
                         old_parent=old_parent_id, new_parent=new_parent_id,
 
989
                         new_name=new_name)
 
990
 
 
991
    def _start(self):
 
992
        """
 
993
        Do the specialized pre-run setup
 
994
        """
 
995
        return self.demark(self.share_id, self.node_id, self.new_parent_id)
 
996
 
 
997
    def store_marker_result(self, (share_id, node_id, new_parent_id)):
 
998
        """
 
999
        Called when all the markers are realized.
 
1000
        """
 
1001
        self.share_id = share_id
 
1002
        self.node_id = node_id
 
1003
        self.new_parent_id = new_parent_id
 
1004
 
 
1005
    def _run(self):
 
1006
        """
 
1007
        Do the actual running
 
1008
        """
 
1009
        return self.action_queue.client.move(self.share_id,
 
1010
                                             self.node_id,
 
1011
                                             self.new_parent_id,
 
1012
                                             self.new_name)
 
1013
    def handle_success(self, success):
 
1014
        """
 
1015
        It worked! Push the event.
 
1016
        """
 
1017
        self.action_queue.event_queue.push('AQ_MOVE_OK',
 
1018
                                           share_id=self.share_id,
 
1019
                                           node_id=self.node_id)
 
1020
        return success
 
1021
 
 
1022
    def handle_failure(self, failure):
 
1023
        """
 
1024
        It didn't work! Push the event.
 
1025
        """
 
1026
        self.action_queue.event_queue.push('AQ_MOVE_ERROR',
 
1027
                                           error=failure.getErrorMessage(),
 
1028
                                           share_id=self.share_id,
 
1029
                                           node_id=self.node_id,
 
1030
                                           old_parent_id=self.old_parent_id,
 
1031
                                           new_parent_id=self.new_parent_id,
 
1032
                                           new_name=self.new_name)
 
1033
 
 
1034
 
 
1035
class Unlink(ActionQueueMetaCommand):
 
1036
    """
 
1037
    Unlink a file or dir
 
1038
    """
 
1039
    def __init__(self, action_queue, share_id, parent_id, node_id):
 
1040
        self.action_queue = action_queue
 
1041
        self.share_id = share_id
 
1042
        self.node_id = node_id
 
1043
        self.parent_id = parent_id
 
1044
        self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
 
1045
                         share=share_id, node=node_id, parent=parent_id)
 
1046
 
 
1047
    def _start(self):
 
1048
        """
 
1049
        Do the specialized pre-run setup
 
1050
        """
 
1051
        return self.demark(self.share_id, self.node_id, self.parent_id)
 
1052
 
 
1053
    def store_marker_result(self, (share_id, node_id, parent_id)):
 
1054
        """
 
1055
        Called when all the markers are realized.
 
1056
        """
 
1057
        self.share_id = share_id
 
1058
        self.node_id = node_id
 
1059
        self.parent_id = parent_id
 
1060
 
 
1061
    def _run(self):
 
1062
        """
 
1063
        Do the actual running
 
1064
        """
 
1065
        return self.action_queue.client.unlink(self.share_id, self.node_id)
 
1066
 
 
1067
    def handle_success(self, success):
 
1068
        """
 
1069
        It worked! Push the event.
 
1070
        """
 
1071
        self.action_queue.event_queue.push('AQ_UNLINK_OK',
 
1072
                                           share_id=self.share_id,
 
1073
                                           parent_id=self.parent_id,
 
1074
                                           node_id=self.node_id)
 
1075
        return success
 
1076
 
 
1077
    def handle_failure(self, failure):
 
1078
        """
 
1079
        It didn't work! Push the event.
 
1080
        """
 
1081
        self.action_queue.event_queue.push('AQ_UNLINK_ERROR',
 
1082
                                           error=failure.getErrorMessage(),
 
1083
                                           share_id=self.share_id,
 
1084
                                           parent_id=self.parent_id,
 
1085
                                           node_id=self.node_id)
 
1086
 
 
1087
 
 
1088
class Query(ActionQueueMetaCommand):
 
1089
    """
 
1090
    Ask about the freshness of server hashes
 
1091
    """
 
1092
    def __init__(self, action_queue, items):
 
1093
        self.log = MultiProxy(
 
1094
            [mklog(logger, '(unrolled) query', share, node,
 
1095
                   share=share, node=node, hash=hash, index=i)
 
1096
             for (i, (share, node, hash)) in enumerate(items)])
 
1097
        self.action_queue = action_queue
 
1098
        self.items = items
 
1099
 
 
1100
    def store_marker_result(self, items):
 
1101
        """
 
1102
        Called when all the markers are realized.
 
1103
        """
 
1104
        self.items = items
 
1105
 
 
1106
    def _start(self):
 
1107
        """
 
1108
        Do the specialized pre-run setup
 
1109
        """
 
1110
        # node_hash will (should?) never be a marker, but it's the
 
1111
        # easiest way to keep the trio together: send it along for the
 
1112
        # trip
 
1113
        dl = []
 
1114
        for item in self.items:
 
1115
            d = self.demark(*item)
 
1116
            d.addErrback(self.handle_single_failure, item)
 
1117
            dl.append(d)
 
1118
        d = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
 
1119
        d.addCallbacks(self.unwrap)
 
1120
        return d
 
1121
 
 
1122
    def handle_failure(self, failure):
 
1123
        """
 
1124
        It didn't work! Never mind.
 
1125
        """
 
1126
        pass
 
1127
 
 
1128
    def handle_single_failure(self, failure, item):
 
1129
        """
 
1130
        The only failure mode of a Query is for a query to be done
 
1131
        using a marker that fails to realize.
 
1132
        """
 
1133
        self.action_queue.event_queue.push('AQ_QUERY_ERROR', item=item,
 
1134
                                           error=failure.getErrorMessage())
 
1135
        return SKIP_THIS_ITEM
 
1136
 
 
1137
    def _run(self):
 
1138
        """
 
1139
        Do the actual running
 
1140
        """
 
1141
        return self.action_queue.client.query(self.items)
 
1142
 
 
1143
 
 
1144
class ListShares(ActionQueueMetaCommand):
 
1145
    """
 
1146
    List shares shared to me
 
1147
    """
 
1148
    def __init__(self, action_queue):
 
1149
        self.action_queue = action_queue
 
1150
        self.log = mklog(logger, 'list_shares', UNKNOWN, UNKNOWN)
 
1151
 
 
1152
    def _run(self):
 
1153
        """
 
1154
        Do the actual running
 
1155
        """
 
1156
        return self.action_queue.client.list_shares()
 
1157
 
 
1158
    def handle_success(self, success):
 
1159
        """
 
1160
        It worked! Push the event.
 
1161
        """
 
1162
        self.action_queue.event_queue.push('AQ_SHARES_LIST',
 
1163
                                           shares_list=success)
 
1164
 
 
1165
    def handle_failure(self, failure):
 
1166
        """
 
1167
        It didn't work! Push the event.
 
1168
        """
 
1169
        self.action_queue.event_queue.push('AQ_LIST_SHARES_ERROR',
 
1170
                                           error=failure.getErrorMessage())
 
1171
 
 
1172
 
 
1173
class AnswerShare(ActionQueueMetaCommand):
 
1174
    """
 
1175
    Answer a share offer
 
1176
    """
 
1177
    def __init__(self, action_queue, share_id, answer):
 
1178
        self.action_queue = action_queue
 
1179
        self.share_id = share_id
 
1180
        self.answer = answer
 
1181
        self.log = mklog(logger, 'answer_share', share_id, UNKNOWN)
 
1182
 
 
1183
    def _run(self):
 
1184
        """
 
1185
        Do the actual running
 
1186
        """
 
1187
        return self.action_queue.client.accept_share(self.share_id, self.answer)
 
1188
 
 
1189
 
 
1190
class CreateShare(ActionQueueMetaCommand):
 
1191
    """
 
1192
    Offer a share to somebody
 
1193
    """
 
1194
    def __init__(self, action_queue, node_id, share_to, name, access_level,
 
1195
                 marker):
 
1196
        self.action_queue = action_queue
 
1197
        self.node_id = node_id
 
1198
        self.share_to = share_to
 
1199
        self.name = name
 
1200
        self.access_level = access_level
 
1201
        self.marker = marker
 
1202
        self.log = mklog(logger, self.__class__.__name__, UNKNOWN, node_id)
 
1203
 
 
1204
    def store_marker_result(self, (node_id,)):
 
1205
        """
 
1206
        Called when all the markers are realized.
 
1207
        """
 
1208
        self.node_id = node_id
 
1209
 
 
1210
    def _start(self):
 
1211
        """
 
1212
        Do the specialized pre-run setup
 
1213
        """
 
1214
        return self.demark(self.node_id)
 
1215
 
 
1216
    def _run(self):
 
1217
        """
 
1218
        Do the actual running
 
1219
        """
 
1220
        return self.action_queue.client.create_share(self.node_id,
 
1221
                                                     self.share_to,
 
1222
                                                     self.name,
 
1223
                                                     self.access_level)
 
1224
 
 
1225
    def handle_success(self, success):
 
1226
        """
 
1227
        It worked! Push the event.
 
1228
        """
 
1229
        self.action_queue.event_queue.push('AQ_CREATE_SHARE_OK',
 
1230
                                           share_id=success.share_id,
 
1231
                                           marker=self.marker)
 
1232
        return success
 
1233
 
 
1234
    def handle_failure(self, failure):
 
1235
        """
 
1236
        It didn't work! Push the event.
 
1237
        """
 
1238
        self.action_queue.event_queue.push('AQ_CREATE_SHARE_ERROR',
 
1239
                                           marker=self.marker,
 
1240
                                           error=failure.getErrorMessage())
 
1241
 
 
1242
 
 
1243
class GetContentMixin(object):
 
1244
    """
 
1245
    Base for ListDir and Download. It's a mixin (ugh) because
 
1246
    otherwise things would be even more confusing
 
1247
    """
 
1248
    def __init__(self, action_queue, share_id, node_id, server_hash,
 
1249
                 fileobj_factory):
 
1250
        self.action_queue = action_queue
 
1251
        self.share_id = share_id
 
1252
        self.node_id = node_id
 
1253
        self.server_hash = server_hash
 
1254
        self.fileobj_factory = fileobj_factory
 
1255
        self.fileobj = None
 
1256
        self.gunzip = zlib.decompressobj()
 
1257
        self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
 
1258
                         share=share_id, node=node_id, server_hash=server_hash,
 
1259
                         fileobj_factory=fileobj_factory)
 
1260
        if (self.share_id, self.node_id) in self.action_queue.downloading:
 
1261
            self.action_queue.cancel_download(self.share_id, self.node_id)
 
1262
 
 
1263
    def _start(self):
 
1264
        """
 
1265
        Do the specialized pre-run setup
 
1266
        """
 
1267
        return self.demark(self.node_id)
 
1268
 
 
1269
    def store_marker_result(self, (node_id,)):
 
1270
        """
 
1271
        Called when all the markers are realized.
 
1272
        """
 
1273
        self.node_id = node_id
 
1274
 
 
1275
    def _run(self):
 
1276
        """
 
1277
        Do the actual running
 
1278
        """
 
1279
        try:
 
1280
            self.fileobj = self.fileobj_factory()
 
1281
        except StandardError:
 
1282
            return defer.fail(Failure('unable to build fileobj'
 
1283
                                      ' (file went away?)'
 
1284
                                      ' so aborting the download.'))
 
1285
        self.action_queue.downloading[self.share_id,
 
1286
                                      self.node_id] = {'n_bytes_read': 0}
 
1287
 
 
1288
        self.action_queue.event_queue.push('AQ_DOWNLOAD_STARTED',
 
1289
                                           share_id=self.share_id,
 
1290
                                           node_id=self.node_id,
 
1291
                                           server_hash=self.server_hash)
 
1292
 
 
1293
        req = self.action_queue.client.get_content_request(
 
1294
            self.share_id, self.node_id, self.server_hash,
 
1295
            callback=self.cb, node_attr_callback=self.nacb)
 
1296
        self.action_queue.downloading[self.share_id, self.node_id]['req'] = req
 
1297
        d = req.deferred
 
1298
        d.addBoth(passit(lambda _:
 
1299
                             self.action_queue.downloading.pop((self.share_id,
 
1300
                                                                self.node_id))))
 
1301
        d.addErrback(passit(lambda _: self.reset_fileobj()))
 
1302
        return d
 
1303
 
 
1304
    def handle_success(self, _):
 
1305
        """
 
1306
        It worked! Push the event.
 
1307
        """
 
1308
        self.action_queue.event_queue.push('AQ_DOWNLOAD_FINISHED',
 
1309
                                           share_id=self.share_id,
 
1310
                                           node_id=self.node_id,
 
1311
                                           server_hash=self.server_hash)
 
1312
 
 
1313
    def handle_failure(self, failure):
 
1314
        """
 
1315
        It didn't work! Push the event.
 
1316
        """
 
1317
        self.action_queue.event_queue.push('AQ_DOWNLOAD_ERROR',
 
1318
                                           error=failure.getErrorMessage(),
 
1319
                                           share_id=self.share_id,
 
1320
                                           node_id=self.node_id,
 
1321
                                           server_hash=self.server_hash)
 
1322
 
 
1323
    def reset_fileobj(self):
 
1324
        """
 
1325
        Rewind and empty the file (i.e. get it ready to try again if
 
1326
        necessary)
 
1327
        """
 
1328
        if self.fileobj is not None:
 
1329
            self.fileobj.seek(0, 0)
 
1330
            self.fileobj.truncate(0)
 
1331
 
 
1332
    def cb(self, bytes):
 
1333
        """
 
1334
        A streaming decompressor
 
1335
        """
 
1336
        dloading = self.action_queue.downloading[self.share_id,
 
1337
                                                 self.node_id]
 
1338
        dloading['n_bytes_read'] += len(bytes)
 
1339
        self.fileobj.write(self.gunzip.decompress(bytes))
 
1340
        self.fileobj.flush()     # not strictly necessary but nice to
 
1341
                                 # see the downloaded size
 
1342
 
 
1343
    def nacb(self, **kwargs):
 
1344
        """
 
1345
        set the node attrs in the 'currently downloading' dict
 
1346
        """
 
1347
        self.action_queue.downloading[self.share_id,
 
1348
                                      self.node_id].update(kwargs)
 
1349
 
 
1350
    def sync(self, _):
 
1351
        """
 
1352
        Flush the buffers and sync them to disk if possible
 
1353
        """
 
1354
        self.fileobj.write(self.gunzip.flush())
 
1355
        self.fileobj.flush()
 
1356
        if getattr(self.fileobj, 'fileno', None) is not None:
 
1357
            # it's a real file, with a fileno! Let's sync its data
 
1358
            # out to disk
 
1359
            os.fsync(self.fileobj.fileno())
 
1360
        self.fileobj.close()
 
1361
 
 
1362
 
 
1363
class ListDir(GetContentMixin, ActionQueueMetaCommand):
 
1364
    """
 
1365
    Get a listing of a directory's contents
 
1366
    """
 
1367
 
 
1368
class Download(GetContentMixin, ActionQueueContentCommand):
 
1369
    """
 
1370
    Get the contents of a file.
 
1371
    """
 
1372
 
 
1373
class Upload(ActionQueueContentCommand):
 
1374
    """
 
1375
    Upload stuff to a file
 
1376
    """
 
1377
    retryable_errors = (ActionQueueContentCommand.retryable_errors
 
1378
                        | set(['UPLOAD_IN_PROGRESS']))
 
1379
 
 
1380
    def __init__(self, action_queue, share_id, node_id, previous_hash, hash,
 
1381
                 crc32, size, fileobj_factory, tempfile_factory):
 
1382
        self.action_queue = action_queue
 
1383
        self.share_id = share_id
 
1384
        self.node_id = node_id
 
1385
        self.previous_hash = previous_hash
 
1386
        self.hash = hash
 
1387
        self.crc32 = crc32
 
1388
        self.size = size
 
1389
        self.fileobj_factory = fileobj_factory
 
1390
        self.tempfile_factory = tempfile_factory
 
1391
        self.deflated_size = None
 
1392
        self.tempfile = None
 
1393
        self.cancelled = False
 
1394
        self.upload_req = None
 
1395
        self.log = mklog(logger, 'upload', share_id, node_id, share=share_id,
 
1396
                         node=node_id, previous_hash=previous_hash,
 
1397
                         hash=hash, crc32=crc32, size=size,
 
1398
                         fileobj_factory=fileobj_factory)
 
1399
        if (self.share_id, self.node_id) in self.action_queue.uploading:
 
1400
            self.action_queue.cancel_upload(self.share_id, self.node_id)
 
1401
 
 
1402
    def cancel(self):
 
1403
        """Cancel the upload."""
 
1404
        self.cancelled = True
 
1405
        if self.upload_req is not None:
 
1406
            self.upload_req.cancel()
 
1407
 
 
1408
    def cleanup(self):
 
1409
        """
 
1410
        Cleanup: stop the producer.
 
1411
        """
 
1412
        if self.upload_req.producer is not None:
 
1413
            self.upload_req.producer.stopProducing()
 
1414
 
 
1415
    def _start(self):
 
1416
        """
 
1417
        Do the specialized pre-run setup
 
1418
        """
 
1419
        d = defer.Deferred()
 
1420
 
 
1421
        uploading = {"hash": self.hash, "req": self}
 
1422
        self.action_queue.uploading[self.share_id, self.node_id] = uploading
 
1423
 
 
1424
        d = self.action_queue.zip_queue.zip(self)
 
1425
        d.addCallback(lambda _: self.demark(self.node_id))
 
1426
        return d
 
1427
 
 
1428
    def store_marker_result(self, (node_id,)):
 
1429
        """
 
1430
        Called when all the markers are realized.
 
1431
        """
 
1432
        # update action_queue.uploading with the real node_id
 
1433
        uploading = self.action_queue.uploading.pop((self.share_id,
 
1434
                                                     self.node_id))
 
1435
        self.node_id = node_id
 
1436
        self.action_queue.uploading[self.share_id, node_id] = uploading
 
1437
 
 
1438
 
 
1439
    def _run(self):
 
1440
        """
 
1441
        Do the actual running
 
1442
        """
 
1443
        uploading = {"hash": self.hash, "deflated_size": self.deflated_size,
 
1444
                     "req": self}
 
1445
        self.action_queue.uploading[self.share_id, self.node_id] = uploading
 
1446
 
 
1447
        self.action_queue.event_queue.push('AQ_UPLOAD_STARTED',
 
1448
                                           share_id=self.share_id,
 
1449
                                           node_id=self.node_id,
 
1450
                                           hash=self.hash)
 
1451
 
 
1452
        if getattr(self.tempfile, 'name', None) is not None:
 
1453
            self.tempfile = open(self.tempfile.name)
 
1454
        f = UploadProgressWrapper(self.tempfile, uploading)
 
1455
        req = self.action_queue.client.put_content_request(
 
1456
            self.share_id, self.node_id, self.previous_hash, self.hash,
 
1457
            self.crc32, self.size, self.deflated_size, f)
 
1458
        self.upload_req = req
 
1459
        d = req.deferred
 
1460
        d.addBoth(passit(lambda _:
 
1461
                             self.action_queue.uploading.pop((self.share_id,
 
1462
                                                              self.node_id))))
 
1463
        d.addBoth(passit(lambda _: self.tempfile.close()))
 
1464
        return d
 
1465
 
 
1466
    def handle_success(self, _):
 
1467
        """
 
1468
        It worked! Push the event.
 
1469
        """
 
1470
        if getattr(self.tempfile, 'name', None) is not None:
 
1471
            os.unlink(self.tempfile.name)
 
1472
        self.action_queue.event_queue.push('AQ_UPLOAD_FINISHED',
 
1473
                                           share_id=self.share_id,
 
1474
                                           node_id=self.node_id,
 
1475
                                           hash=self.hash)
 
1476
 
 
1477
    def handle_failure(self, failure):
 
1478
        """
 
1479
        It didn't work! Push the event.
 
1480
        """
 
1481
        if getattr(self.tempfile, 'name', None) is not None:
 
1482
            os.unlink(self.tempfile.name)
 
1483
        self.action_queue.event_queue.push('AQ_UPLOAD_ERROR',
 
1484
                                           error=failure.getErrorMessage(),
 
1485
                                           share_id=self.share_id,
 
1486
                                           node_id=self.node_id,
 
1487
                                           hash=self.hash)