# canonical.ubuntuone.storage.syncdaemon.action_queue - Action queue
#
# Author: John Lenton <john.lenton@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""The ActionQueue is where actions to be performed on the server are
queued up and then executed.

The idea is that there are two queues, one for metadata and another
for content; the metadata queue has priority over the content queue.
"""
24
from collections import deque, defaultdict
25
from functools import wraps, partial
32
from zope.interface import implements
33
from twisted.internet import reactor, defer, ssl
34
from twisted.names import client as dns_client
35
from twisted.python.failure import Failure
38
from canonical.ubuntuone.storage.protocol import protocol_pb2
39
from canonical.ubuntuone.storage.protocol.client import StorageClient, \
41
from canonical.ubuntuone.storage.syncdaemon.logger import mklog, TRACE
42
from canonical.ubuntuone.storage.syncdaemon.interfaces import IActionQueue, \
45
logger = logging.getLogger("ubuntuone.SyncDaemon.ActionQueue")
47
# I want something which repr() is "---" *without* the quotes :)
# (used as a placeholder in log lines when a share/node id is not known)
UNKNOWN = type('', (), {'__repr__': lambda _: '---'})()
52
Pass the value on for the next deferred, while calling func with
65
class UploadCompressionCancelled(Exception):
    """Compression of a file for upload cancelled."""
69
class RequestCleanedUp(Exception):
    """The request was cancelled by ActionQueue.cleanup()."""
75
class NamedTemporaryFile(object):
    """A named temporary file that survives being closed by name.

    Like tempfile.NamedTemporaryFile, but working in 2.5 also WRT the
    delete argument. Actually, one of these NamedTemporaryFile()s is
    the same as a tempfile.NamedTemporaryFile(delete=False) from 2.6.

    Or so the theory goes.
    """

    def __init__(self):
        # mkstemp creates the file on disk and hands us the fd; keep the
        # name around so the file can be reopened/unlinked later.
        fileno, self.name = tempfile.mkstemp()
        # 'r+b' (read/write, binary): the original said 'r+w', which is
        # not a valid stdio mode and only worked because glibc ignores
        # the stray 'w'.
        self._fd = os.fdopen(fileno, 'r+b')

    def __getattr__(self, attr):
        """Proxy everything else (other than .name) on to self._fd."""
        return getattr(self._fd, attr)
92
class MultiProxy(list):
    """Proxy many objects of the same kind, like this:

    >>> m = MultiProxy(['foo', 'bar', 'baz'])
    >>> m.upper()
    MultiProxy(['FOO', 'BAR', 'BAZ'])
    """

    def __getattr__(self, attr):
        """Get the attribute from every proxied object."""
        return MultiProxy(getattr(i, attr) for i in self)

    def __call__(self, *args, **kwargs):
        """Call every proxied object with the same arguments."""
        return MultiProxy(i(*args, **kwargs) for i in self)

    def __repr__(self):
        """Make it clear in the repr that this is a proxy."""
        return 'MultiProxy(%s)' % (super(MultiProxy, self).__repr__(),)
111
class LoggingStorageClient(StorageClient):
    """A subclass of StorageClient that adds logging to
    processMessage and sendMessage.
    """

    def __init__(self):
        """Set up logging and create the instance."""
        StorageClient.__init__(self)
        self.log = logging.getLogger('ubuntuone.SyncDaemon.StorageClient')
        # configure the handler level to be < than DEBUG
        self.log.setLevel(TRACE)
        self.log.debug = partial(self.log.log, TRACE)

    def processMessage(self, message):
        """Wrapper that logs the message and result."""
        # don't log the full message if it's of type BYTES
        if message.type == protocol_pb2.Message.BYTES:
            self.log.debug('start - processMessage: id: %s, type: %s',
                           message.id, message.type)
        else:
            self.log.debug('start - processMessage: %s',
                           str(message).replace("\n", " "))
        if message.id in self.requests:
            req = self.requests[message.id]
            req.deferred.addCallbacks(self.log_success, self.log_error)
        result = StorageClient.processMessage(self, message)
        self.log.debug('end - processMessage: id: %s - result: %s',
                       message.id, result)
        return result

    def log_error(self, failure):
        """Logging errback for requests."""
        self.log.debug('request error: %s', failure)
        # pass the failure along the errback chain
        return failure

    def log_success(self, result):
        """Logging callback for requests."""
        self.log.debug('request finished: %s', result)
        if getattr(result, '__dict__', None):
            self.log.debug('result.__dict__: %s', result.__dict__)
        # pass the result along the callback chain
        return result

    def sendMessage(self, message):
        """Wrapper that logs the message and result."""
        # don't log the full message if it's of type BYTES
        if message.type == protocol_pb2.Message.BYTES:
            self.log.debug('start - sendMessage: id: %s, type: %s',
                           message.id, message.type)
        else:
            self.log.debug('start - sendMessage: %s',
                           str(message).replace("\n", " "))
        result = StorageClient.sendMessage(self, message)
        self.log.debug('end - sendMessage: id: %s', message.id)
        return result
167
class ActionQueueProtocol(LoggingStorageClient):
    """This is the Action Queue version of the StorageClient protocol."""

    # current state of the connection; flipped in connectionMade
    connection_state = 'disconnected'

    def connectionMade(self):
        """Called when a new connection is made.

        All the state is saved in the factory.
        """
        logger.debug("connection made")
        self.connection_state = 'connected'
        # let the factory reach this live protocol instance
        self.factory.client = self
        # wire the server-notification callbacks through to the factory
        self.set_node_state_callback(self.factory._node_state_callback)
        self.set_share_change_callback(self.factory._share_change_callback)
        self.set_share_answer_callback(self.factory._share_answer_callback)
        self.factory.event_queue.push('SYS_CONNECTION_MADE')

    def disconnect(self):
        """Close down the sockets."""
        logger.debug("disconnected")
        if self.transport is not None:
            self.transport.loseConnection()
198
A uuid4-based marker class
202
return super(Marker, self).__new__(self, uuid.uuid4())
205
class ZipQueue(object):
207
A queue of files to be compressed for upload
209
Parts of this were shamelessly copied from
210
twisted.internet.defer.DeferredSemaphore
215
self.waiting = deque()
216
self.tokens = self.limit = 10
220
return a deferred which fires on token acquisition.
222
assert self.tokens >= 0, "Tokens should never be negative"
225
self.waiting.append(d)
227
self.tokens = self.tokens - 1
235
Should be called by whoever did the acquire() when the shared
238
assert self.tokens < self.limit, "Too many tokens!"
239
self.tokens = self.tokens + 1
241
# someone is waiting to acquire token
242
self.tokens = self.tokens - 1
243
d = self.waiting.popleft()
246
def _compress(self, deferred, upload):
247
"""Compression background task."""
249
fileobj = upload.fileobj_factory()
250
except StandardError:
251
# presumably the user deleted the file before we got to
252
# upload it. Logging a warning just in case.
253
upload.log.warn('unable to build fileobj'
254
' (user deleted the file, maybe?)'
255
' so cancelling the upload.')
259
filename = getattr(fileobj, 'name', '<?>')
261
upload.log.debug('compressing', filename)
263
# we need to compress the file completely to figure out its
264
# compressed size. So streaming is out :(
265
if upload.tempfile_factory is None:
266
f = NamedTemporaryFile()
268
f = upload.tempfile_factory()
269
zipper = zlib.compressobj()
270
while not upload.cancelled:
271
data = fileobj.read(4096)
273
f.write(zipper.flush())
274
# no flush/sync because we don't need this to persist
275
# on disk; if the machine goes down, we'll lose it
276
# anyway (being in /tmp and all)
278
f.write(zipper.compress(data))
280
raise UploadCompressionCancelled("Cancelled")
281
upload.deflated_size = f.tell()
282
# close the compressed file (thus, if you actually want to stream
283
# it out, it must have a name so it can be reopnened)
286
except Exception, e: # pylint: disable-msg=W0703
287
reactor.callFromThread(deferred.errback, e)
289
reactor.callFromThread(deferred.callback, True)
291
def zip(self, upload):
293
Acquire, do the compression in a thread, release.
295
d_zip = defer.Deferred()
296
d_lck = self.acquire()
298
lambda _: reactor.callFromThread(self._compress,
299
d_zip, upload) or d_zip)
300
d_lck.addCallback(lambda _: self.release())
305
class RequestQueue(object):
307
RequestQueue is a queue that ensures that there is at most one
308
request at a time 'on the wire', and that uses the action queue's
309
states for its syncrhonization.
311
def __init__(self, name, action_queue):
312
super(RequestQueue, self).__init__()
314
self.action_queue = action_queue
315
self.waiting = deque()
320
"""return the length of the queue"""
321
return len(self.waiting)
323
def queue(self, func, *args, **kwargs):
325
Add a call to func to the queue.
328
d.addCallback(lambda _: defer.maybeDeferred(func, *args, **kwargs))
329
self.waiting.append(d)
330
if len(self.waiting) == 1 and not self.head:
331
self.action_queue.event_queue.push('SYS_' + self.name
340
d = self.waiting.popleft()
342
d.addBoth(lambda a: (self.run(), a)[1])
346
self.action_queue.event_queue.push('SYS_' + self.name + '_DONE')
348
class ContentQueue(RequestQueue):
    """A content queue is a queue of content requests
    (uploads and downloads).
    """
354
class MetaQueue(RequestQueue):
    """A meta queue is a queue of metadata-related requests."""
360
class DeferredMap(object):
    """A mapping of deferred values. Or a deferred map of values. Or a
    mapping that returns deferreds and then fires them when it has the
    values.
    """

    def __init__(self):
        # known values
        self.map = {}
        # error messages for keys whose resolution failed
        self.failed = {}
        # deferreds waiting for each still-unknown key
        self.waiting = defaultdict(list)

    def get(self, key):
        """Get the value for the given key.

        This always returns a deferred; when we already know the value
        we return a `succeed`, and if we don't know the value because
        it failed we return a `fail`; otherwise we return a plain
        unfired `Deferred`, and add it to the list of deferreds to
        call when we actually get the value.
        """
        if key in self.map:
            return defer.succeed(self.map[key])
        if key in self.failed:
            return defer.fail(Exception(self.failed[key]))
        d = defer.Deferred()
        self.waiting[key].append(d)
        return d

    def set(self, key, value):
        """We've got the value for a key! Write it down in the map, and
        fire the waiting deferreds.
        """
        if key not in self.map:
            self.map[key] = value
            for d in self.waiting.pop(key, ()):
                d.callback(value)
        elif self.map[key] != value:
            # conflicting values for the same key: bail out loudly
            raise KeyError("key is taken -- dunno what to do")

    def err(self, key, failure):
        """Something went terribly wrong in the process of getting a
        value. Break the news to the waiting deferreds.
        """
        self.failed[key] = failure.getErrorMessage()
        for d in self.waiting.pop(key, ()):
            d.errback(failure)
412
class UploadProgressWrapper(object):
    """A wrapper around the file-like object used for Uploads, with which
    we can keep track of the number of bytes that have been written to
    the server.
    """

    def __init__(self, fd, data_dict):
        """fd is the file-like object used for uploads. data_dict is the
        entry in the uploading dictionary.
        """
        self.fd = fd
        self.data_dict = data_dict
        # running total of bytes handed out by read()
        self.n_bytes_read = 0

    def read(self, size=None):
        """Read at most size bytes from the file-like object.

        Keep track of the number of bytes that have been read, and the
        number of bytes that have been written (assumed to be equal to
        the number of bytes read on the previous call to read). The
        latter is done directly in the data_dict.
        """
        self.data_dict['n_bytes_written'] = self.n_bytes_read
        data = self.fd.read(size)
        self.n_bytes_read += len(data)
        return data

    def __getattr__(self, attr):
        """Proxy all the rest on to the wrapped file object."""
        return getattr(self.fd, attr)
448
class ActionQueue(StorageClientFactory, object):
450
This is the ActionQueue itself.
452
implements(IActionQueue)
453
protocol = ActionQueueProtocol
455
def __init__(self, event_queue, host, port, dns_srv,
457
self.event_queue = event_queue
460
self.dns_srv = dns_srv
461
self.use_ssl = use_ssl
467
self.content_queue = ContentQueue('CONTENT_QUEUE', self)
468
self.meta_queue = MetaQueue('META_QUEUE', self)
469
self.uuid_map = DeferredMap()
470
self.zip_queue = ZipQueue()
473
self.downloading = {}
475
event_queue.subscribe(self)
477
def handle_SYS_CONNECT(self, access_token):
479
Stow the access token away for later use
481
self.token = access_token
485
Cancel, clean up, and reschedule things that were in progress
486
when a disconnection happened
488
self.event_queue.push('SYS_CLEANUP_STARTED')
489
if self.client is not None:
490
self.client.disconnect()
491
for queue in self.meta_queue, self.content_queue:
492
if queue.head is not None:
493
queue.head.errback(RequestCleanedUp('Cleaned up'))
494
self.event_queue.push('SYS_CLEANUP_FINISHED')
497
def _node_state_callback(self, share_id, node_id, hash):
499
Called by the client when notified that node changed.
501
self.event_queue.push('SV_HASH_NEW',
502
share_id=share_id, node_id=node_id, hash=hash)
504
def _share_change_callback(self, message, info):
506
Called by the client when notified that a share changed.
508
self.event_queue.push('SV_SHARE_CHANGED',
509
message=message, info=info)
511
def _share_answer_callback(self, share_id, answer):
513
Called by the client when it gets a share answer notification.
515
self.event_queue.push('SV_SHARE_ANSWERED',
516
share_id=str(share_id), answer=answer)
518
def _lookup_srv(self):
519
""" do the SRV lookup and return a deferred whose callback is going
520
to be called with (host, port). If we can't do the lookup, the default
523
def on_lookup_ok(results):
524
"""Get a random host from the SRV result."""
525
logger.debug('SRV lookup done, choosing a server')
526
records, auth, add = results
528
raise ValueError("No available records.")
529
# pick a random server
530
record = random.choice(records)
531
logger.debug('Using record: %r', record)
533
return record.payload.target.name, record.payload.port
535
logger.info('Empty SRV record, fallback to %r:%r',
536
self.host, self.port)
537
return self.host, self.port
539
def on_lookup_error(failure):
540
""" return the default host/post on a DNS SRV lookup failure. """
541
logger.info("SRV lookup error, fallback to %r:%r \n%s",
542
self.host, self.port, failure.getTraceback())
543
return self.host, self.port
546
# lookup the DNS SRV records
547
d = dns_client.lookupService(self.dns_srv, timeout=[3, 2])
548
d.addCallback(on_lookup_ok)
549
d.addErrback(on_lookup_error)
552
return defer.succeed((self.host, self.port))
556
Start the circus going.
558
self.deferred = defer.Deferred()
559
d = self._lookup_srv()
560
def _connect(result):
561
""" do the real thing """
564
reactor.connectSSL(host, port, self,
565
ssl.ClientContextFactory())
567
reactor.connectTCP(host, port, self)
568
d.addCallback(_connect)
571
def conectionFailed(self, reason=None):
573
Called when the connect() call fails
575
self.deferred.errback(reason)
577
def get_root(self, marker):
579
Get the user's root uuid. Use the uuid_map, so the caller can
580
use the marker in followup operations.
582
log = mklog(logger, 'get_root', '', marker, marker=marker)
583
log.debug('starting')
584
d = self.client.get_root()
585
d.addCallbacks(*log.callbacks())
586
d.addCallbacks(passit(lambda root: self.uuid_map.set(marker, root)),
587
passit(lambda f: self.uuid_map.err(marker, f)))
591
def make_file(self, share_id, parent_id, name, marker):
593
See .interfaces.IMetaQueue
595
return MakeFile(self, share_id, parent_id, name, marker).start()
597
def make_dir(self, share_id, parent_id, name, marker):
599
See .interfaces.IMetaQueue
601
return MakeDir(self, share_id, parent_id, name, marker).start()
603
def move(self, share_id, node_id, old_parent_id, new_parent_id, new_name):
605
See .interfaces.IMetaQueue
607
return Move(self, share_id, node_id, old_parent_id,
608
new_parent_id, new_name).start()
610
def unlink(self, share_id, parent_id, node_id):
612
See .interfaces.IMetaQueue
614
return Unlink(self, share_id, parent_id, node_id).start()
616
def query(self, items):
618
See .interfaces.IMetaQueue
620
return Query(self, items).start()
622
def list_shares(self):
624
List the shares; put the result on the event queue
626
return ListShares(self).start()
628
def answer_share(self, share_id, answer):
630
Answer the offer of a share.
632
return AnswerShare(self, share_id, answer).start()
634
def create_share(self, node_id, share_to, name, access_level, marker):
636
Share a node with somebody.
638
return CreateShare(self, node_id, share_to, name, access_level,
641
def listdir(self, share_id, node_id, server_hash, fileobj_factory):
643
See .interfaces.IMetaQueue.listdir
645
return ListDir(self, share_id, node_id, server_hash,
646
fileobj_factory).start()
648
def download(self, share_id, node_id, server_hash, fileobj_factory):
650
See .interfaces.IContentQueue.download
652
return Download(self, share_id, node_id, server_hash,
653
fileobj_factory).start()
655
def upload(self, share_id, node_id, previous_hash, hash, crc32,
656
size, fileobj_factory, tempfile_factory=None):
658
See .interfaces.IContentQueue
660
return Upload(self, share_id, node_id, previous_hash, hash,
661
crc32, size, fileobj_factory, tempfile_factory).start()
663
def cancel_upload(self, share_id, node_id):
665
See .interfaces.IContentQueue
667
log = mklog(logger, 'cancel_upload', share_id, node_id,
668
share=share_id, node=node_id)
669
log.debug('starting')
670
if (share_id, node_id) in self.uploading:
671
req = self.uploading[share_id, node_id].get('req')
674
log.debug("cancelled")
675
log.debug('finished')
677
def cancel_download(self, share_id, node_id):
679
See .interfaces.IContentQueue
681
log = mklog(logger, 'cancel_download', share_id, node_id,
682
share=share_id, node=node_id)
683
log.debug('starting')
684
if (share_id, node_id) in self.downloading:
685
req = self.downloading[share_id, node_id].get('req')
688
log.debug("cancelled")
689
log.debug('finished')
692
# Sentinel returned by per-item errbacks so unwrap() can drop that item
# from a batched result.
SKIP_THIS_ITEM = object()
694
# pylint: disable-msg=W0231
696
class ActionQueueCommand(object):
698
Base of all the action queue commands
701
# protobuf doesn't seem to have much introspectionable stuff
702
# without going into private attributes
703
known_error_messages = (set(protocol_pb2._ERROR_ERRORTYPE.values_by_name)
704
| set(['CANCELLED']))
705
suppressed_error_messages = (known_error_messages
706
- set(['INTERNAL_ERROR'])
707
| set(['Cleaned up']))
708
retryable_errors = set(['Cleaned up', 'TRY_AGAIN'])
710
def demark(self, *maybe_markers):
712
Arrange to have maybe_markers realized
715
for marker in maybe_markers:
716
if IMarker.providedBy(marker):
717
self.log.debug('waiting until we know the real value of %s'
719
d = self.action_queue.uuid_map.get(marker)
720
d.addCallbacks(passit(lambda _:
721
self.log.debug('got %s' % marker)),
723
self.log.error('failed %s' % marker)))
725
d = defer.succeed(marker)
727
dl = defer.DeferredList(l, fireOnOneErrback=True, consumeErrors=True)
728
dl.addCallbacks(self.unwrap,
729
lambda f: f.value.subFailure)
735
Unpack the values from the result of a DeferredList. If
736
there's a failure, return it instead.
739
for result in results:
740
# result can be none if one of the callbacks failed
741
# before the others were ready
742
if result is not None:
743
is_ok, value = result
747
if value is not SKIP_THIS_ITEM:
752
def end_callback(self, arg):
756
self._queue.head = None
757
self.log.debug('success')
758
return self.handle_success(arg)
760
def end_errback(self, failure):
764
self._queue.head = None
765
error_message = failure.getErrorMessage()
766
if error_message not in self.suppressed_error_messages:
767
self.log.error('failure', error_message)
768
self.log.debug('traceback follows:\n\n' + failure.getTraceback())
770
self.log.warn('failure', error_message)
771
if error_message in self.retryable_errors:
772
reactor.callLater(0.1, self.retry)
774
return self.handle_failure(failure)
776
def start(self, _=None):
778
Get ready to run, and then run.
780
The default implementation is for when there is no preparation necessary
783
d.addCallback(self.store_marker_result)
784
d.addCallback(lambda _: self.log.debug('queueing in the %s'
786
d.addCallbacks(lambda _: self._queue.queue(self.run),
792
Do the specialized pre-run setup
794
return defer.succeed(None)
796
def store_marker_result(self, _):
798
Called when all the markers are realized.
805
self.log.debug('starting')
806
self._queue.head = d = self._run()
807
d.addCallbacks(self.end_callback, self.end_errback)
810
def handle_success(self, success):
812
Do anthing that's needed to handle success of the operation.
816
def handle_failure(self, failure):
818
Do anthing that's needed to handle failure of the operation.
819
Note that cancellation and TRY_AGAIN are already handled.
825
Request cancelled or TRY_AGAIN. Well then, try again!
827
return self._queue.queue(self.run)
831
class ActionQueueMetaCommand(ActionQueueCommand):
    """Base of metadata-related commands (commands that are queued in the
    meta queue).
    """
    @property
    def _queue(self):
        """Get at the meta queue."""
        return self.action_queue.meta_queue
844
class ActionQueueContentCommand(ActionQueueCommand):
    """Base of content-related commands (commands that are queued in the
    content queue).
    """
    @property
    def _queue(self):
        """Get at the content queue."""
        return self.action_queue.content_queue
857
class MakeThing(ActionQueueMetaCommand):
859
Base of MakeFile and MakeDir
861
def __init__(self, action_queue, share_id, parent_id, name, marker):
862
self.action_queue = action_queue
863
self.share_id = share_id
864
self.parent_id = parent_id
865
# Unicode boundary! the name is Unicode in protocol and server, but
866
# here we use bytes for paths
867
self.name = name.decode("utf8")
869
self.log = mklog(logger, self.__class__.__name__, share_id, marker,
870
share=share_id, parent=parent_id, name=name,
875
Do the specialized pre-run setup
877
return self.demark(self.share_id, self.parent_id)
879
def store_marker_result(self, (share_id, parent_id)):
881
Called when all the markers are realized.
883
self.share_id = share_id
884
self.parent_id = parent_id
888
Do the actual running
890
maker = getattr(self.action_queue.client, self.client_method)
891
return maker(self.share_id,
895
def handle_success(self, success):
897
It worked! Push the event, and update the uuid map.
899
# note that we're not getting the new name from the answer
900
# message, if we would get it, we would have another Unicode
902
self.action_queue.event_queue.push(self.ok_event_name,
904
new_id=success.new_id)
905
if IMarker.providedBy(self.marker):
906
self.action_queue.uuid_map.set(self.marker, success.new_id)
909
def handle_failure(self, failure):
911
It didn't work! Push the event, and update the uuid map.
913
self.action_queue.event_queue.push(self.error_event_name,
915
error=failure.getErrorMessage())
916
if IMarker.providedBy(self.marker):
917
self.action_queue.uuid_map.err(self.marker,
921
class MakeFile(MakeThing):
    """Make a file."""
    ok_event_name = 'AQ_FILE_NEW_OK'
    error_event_name = 'AQ_FILE_NEW_ERROR'
    client_method = 'make_file'
930
class MakeDir(MakeThing):
    """Make a directory."""
    ok_event_name = 'AQ_DIR_NEW_OK'
    error_event_name = 'AQ_DIR_NEW_ERROR'
    client_method = 'make_dir'
939
class Move(ActionQueueMetaCommand):
941
Move a file or directory
943
def __init__(self, action_queue, share_id, node_id, old_parent_id,
944
new_parent_id, new_name):
945
self.action_queue = action_queue
946
self.share_id = share_id
947
self.node_id = node_id
948
self.old_parent_id = old_parent_id
949
self.new_parent_id = new_parent_id
950
# Unicode boundary! the name is Unicode in protocol and server, but
951
# here we use bytes for paths
952
self.new_name = new_name.decode("utf8")
953
self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
954
share=share_id, node=node_id,
955
old_parent=old_parent_id, new_parent=new_parent_id,
960
Do the specialized pre-run setup
962
return self.demark(self.share_id, self.node_id, self.new_parent_id)
964
def store_marker_result(self, (share_id, node_id, new_parent_id)):
966
Called when all the markers are realized.
968
self.share_id = share_id
969
self.node_id = node_id
970
self.new_parent_id = new_parent_id
974
Do the actual running
976
return self.action_queue.client.move(self.share_id,
980
def handle_success(self, success):
982
It worked! Push the event.
984
self.action_queue.event_queue.push('AQ_MOVE_OK',
985
share_id=self.share_id,
986
node_id=self.node_id)
989
def handle_failure(self, failure):
991
It didn't work! Push the event.
993
self.action_queue.event_queue.push('AQ_MOVE_ERROR',
994
error=failure.getErrorMessage(),
995
share_id=self.share_id,
996
node_id=self.node_id,
997
old_parent_id=self.old_parent_id,
998
new_parent_id=self.new_parent_id,
999
new_name=self.new_name)
1002
class Unlink(ActionQueueMetaCommand):
1004
Unlink a file or dir
1006
def __init__(self, action_queue, share_id, parent_id, node_id):
1007
self.action_queue = action_queue
1008
self.share_id = share_id
1009
self.node_id = node_id
1010
self.parent_id = parent_id
1011
self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
1012
share=share_id, node=node_id, parent=parent_id)
1016
Do the specialized pre-run setup
1018
return self.demark(self.share_id, self.node_id, self.parent_id)
1020
def store_marker_result(self, (share_id, node_id, parent_id)):
1022
Called when all the markers are realized.
1024
self.share_id = share_id
1025
self.node_id = node_id
1026
self.parent_id = parent_id
1030
Do the actual running
1032
return self.action_queue.client.unlink(self.share_id, self.node_id)
1034
def handle_success(self, success):
1036
It worked! Push the event.
1038
self.action_queue.event_queue.push('AQ_UNLINK_OK',
1039
share_id=self.share_id,
1040
parent_id=self.parent_id,
1041
node_id=self.node_id)
1044
def handle_failure(self, failure):
1046
It didn't work! Push the event.
1048
self.action_queue.event_queue.push('AQ_UNLINK_ERROR',
1049
error=failure.getErrorMessage(),
1050
share_id=self.share_id,
1051
parent_id=self.parent_id,
1052
node_id=self.node_id)
1055
class Query(ActionQueueMetaCommand):
    """Ask about the freshness of server hashes."""

    def __init__(self, action_queue, items):
        # one log per queried item, proxied together
        self.log = MultiProxy(
            [mklog(logger, '(unrolled) query', share, node,
                   share=share, node=node, hash=hash, index=i)
             for (i, (share, node, hash)) in enumerate(items)])
        self.action_queue = action_queue
        self.items = items

    def store_marker_result(self, items):
        """Called when all the markers are realized."""
        self.items = items

    def _start(self):
        """Do the specialized pre-run setup."""
        # node_hash will (should?) never be a marker, but it's the
        # easiest way to keep the trio together: send it along for the
        # ride.
        dl = []
        for item in self.items:
            d = self.demark(*item)
            d.addErrback(self.handle_single_failure, item)
            dl.append(d)
        d = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
        d.addCallbacks(self.unwrap)
        return d

    def handle_failure(self, failure):
        """It didn't work! Never mind."""
        pass

    def handle_single_failure(self, failure, item):
        """The only failure mode of a Query is for a query to be done
        using a marker that fails to realize.
        """
        self.action_queue.event_queue.push('AQ_QUERY_ERROR', item=item,
                                           error=failure.getErrorMessage())
        # tell unwrap() to drop this item from the batch
        return SKIP_THIS_ITEM

    def _run(self):
        """Do the actual running."""
        return self.action_queue.client.query(self.items)
1111
class ListShares(ActionQueueMetaCommand):
    """List shares shared to me."""

    def __init__(self, action_queue):
        self.action_queue = action_queue
        self.log = mklog(logger, 'list_shares', UNKNOWN, UNKNOWN)

    def _run(self):
        """Do the actual running."""
        return self.action_queue.client.list_shares()

    def handle_success(self, success):
        """It worked! Push the event."""
        self.action_queue.event_queue.push('AQ_SHARES_LIST',
                                           shares_list=success)

    def handle_failure(self, failure):
        """It didn't work! Push the event."""
        self.action_queue.event_queue.push('AQ_LIST_SHARES_ERROR',
                                           error=failure.getErrorMessage())
1140
class AnswerShare(ActionQueueMetaCommand):
    """Answer a share offer."""

    def __init__(self, action_queue, share_id, answer):
        self.action_queue = action_queue
        self.share_id = share_id
        self.answer = answer
        self.log = mklog(logger, 'answer_share', share_id, UNKNOWN)

    def _run(self):
        """Do the actual running."""
        return self.action_queue.client.accept_share(self.share_id, self.answer)
1157
class CreateShare(ActionQueueMetaCommand):
1159
Offer a share to somebody
1161
def __init__(self, action_queue, node_id, share_to, name, access_level,
1163
self.action_queue = action_queue
1164
self.node_id = node_id
1165
self.share_to = share_to
1167
self.access_level = access_level
1168
self.marker = marker
1169
self.log = mklog(logger, self.__class__.__name__, UNKNOWN, node_id)
1171
def store_marker_result(self, (node_id,)):
1173
Called when all the markers are realized.
1175
self.node_id = node_id
1179
Do the specialized pre-run setup
1181
return self.demark(self.node_id)
1185
Do the actual running
1187
return self.action_queue.client.create_share(self.node_id,
1192
def handle_success(self, success):
1194
It worked! Push the event.
1196
self.action_queue.event_queue.push('AQ_CREATE_SHARE_OK',
1197
share_id=success.share_id,
1201
def handle_failure(self, failure):
1203
It didn't work! Push the event.
1205
self.action_queue.event_queue.push('AQ_CREATE_SHARE_ERROR',
1207
error=failure.getErrorMessage())
1210
class GetContentMixin(object):
1212
Base for ListDir and Download. It's a mixin (ugh) because
1213
otherwise things would be even more confusing
1215
def __init__(self, action_queue, share_id, node_id, server_hash,
1217
self.action_queue = action_queue
1218
self.share_id = share_id
1219
self.node_id = node_id
1220
self.server_hash = server_hash
1221
self.fileobj_factory = fileobj_factory
1223
self.gunzip = zlib.decompressobj()
1224
self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
1225
share=share_id, node=node_id, server_hash=server_hash,
1226
fileobj_factory=fileobj_factory)
1227
if (self.share_id, self.node_id) in self.action_queue.downloading:
1228
self.action_queue.cancel_download(self.share_id, self.node_id)
1232
Do the specialized pre-run setup
1234
return self.demark(self.node_id)
1236
def store_marker_result(self, (node_id,)):
1238
Called when all the markers are realized.
1240
self.node_id = node_id
1244
Do the actual running
1247
self.fileobj = self.fileobj_factory()
1248
except StandardError:
1249
return defer.fail(Failure('unable to build fileobj'
1250
' (file went away?)'
1251
' so aborting the download.'))
1252
self.action_queue.downloading[self.share_id,
1253
self.node_id] = {'n_bytes_read': 0}
1255
self.action_queue.event_queue.push('AQ_DOWNLOAD_STARTED',
1256
share_id=self.share_id,
1257
node_id=self.node_id,
1258
server_hash=self.server_hash)
1260
req = self.action_queue.client.get_content_request(
1261
self.share_id, self.node_id, self.server_hash,
1262
callback=self.cb, node_attr_callback=self.nacb)
1263
self.action_queue.downloading[self.share_id, self.node_id]['req'] = req
1265
d.addBoth(passit(lambda _:
1266
self.action_queue.downloading.pop((self.share_id,
1268
d.addErrback(passit(lambda _: self.reset_fileobj()))
1271
def handle_success(self, _):
1273
It worked! Push the event.
1275
self.action_queue.event_queue.push('AQ_DOWNLOAD_FINISHED',
1276
share_id=self.share_id,
1277
node_id=self.node_id,
1278
server_hash=self.server_hash)
1280
def handle_failure(self, failure):
1282
It didn't work! Push the event.
1284
self.action_queue.event_queue.push('AQ_DOWNLOAD_ERROR',
1285
error=failure.getErrorMessage(),
1286
share_id=self.share_id,
1287
node_id=self.node_id,
1288
server_hash=self.server_hash)
1290
def reset_fileobj(self):
1292
Rewind and empty the file (i.e. get it ready to try again if
1295
if self.fileobj is not None:
1296
self.fileobj.seek(0, 0)
1297
self.fileobj.truncate(0)
1299
def cb(self, bytes):
1301
A streaming decompressor
1303
dloading = self.action_queue.downloading[self.share_id,
1305
dloading['n_bytes_read'] += len(bytes)
1306
self.fileobj.write(self.gunzip.decompress(bytes))
1307
self.fileobj.flush() # not strictly necessary but nice to
1308
# see the downloaded size
1310
def nacb(self, **kwargs):
1312
set the node attrs in the 'currently downloading' dict
1314
self.action_queue.downloading[self.share_id,
1315
self.node_id].update(kwargs)
1319
Flush the buffers and sync them to disk if possible
1321
self.fileobj.write(self.gunzip.flush())
1322
self.fileobj.flush()
1323
if getattr(self.fileobj, 'fileno', None) is not None:
1324
# it's a real file, with a fileno! Let's sync its data
1326
os.fsync(self.fileobj.fileno())
1327
self.fileobj.close()
1330
class ListDir(GetContentMixin, ActionQueueMetaCommand):
    """Get a listing of a directory's contents."""
1335
class Download(GetContentMixin, ActionQueueContentCommand):
    """Get the contents of a file."""
1340
class Upload(ActionQueueContentCommand):
1342
Upload stuff to a file
1344
retryable_errors = (ActionQueueContentCommand.retryable_errors
1345
| set(['UPLOAD_IN_PROGRESS']))
1347
def __init__(self, action_queue, share_id, node_id, previous_hash, hash,
1348
crc32, size, fileobj_factory, tempfile_factory):
1349
self.action_queue = action_queue
1350
self.share_id = share_id
1351
self.node_id = node_id
1352
self.previous_hash = previous_hash
1356
self.fileobj_factory = fileobj_factory
1357
self.tempfile_factory = tempfile_factory
1358
self.deflated_size = None
1359
self.tempfile = None
1360
self.cancelled = False
1361
self.upload_req = None
1362
self.log = mklog(logger, 'upload', share_id, node_id, share=share_id,
1363
node=node_id, previous_hash=previous_hash,
1364
hash=hash, crc32=crc32, size=size,
1365
fileobj_factory=fileobj_factory)
1366
if (self.share_id, self.node_id) in self.action_queue.uploading:
1367
self.action_queue.cancel_upload(self.share_id, self.node_id)
1370
"""Cancel the upload."""
1371
self.cancelled = True
1372
if self.upload_req is not None:
1373
self.upload_req.cancel()
1377
Do the specialized pre-run setup
1379
d = defer.Deferred()
1381
uploading = {"hash": self.hash, "req": self}
1382
self.action_queue.uploading[self.share_id, self.node_id] = uploading
1384
d = self.action_queue.zip_queue.zip(self)
1385
d.addCallback(lambda _: self.demark(self.node_id))
1388
def store_marker_result(self, (node_id,)):
1390
Called when all the markers are realized.
1392
self.node_id = node_id
1396
Do the actual running
1398
uploading = {"hash": self.hash, "deflated_size": self.deflated_size,
1400
self.action_queue.uploading[self.share_id, self.node_id] = uploading
1402
self.action_queue.event_queue.push('AQ_UPLOAD_STARTED',
1403
share_id=self.share_id,
1404
node_id=self.node_id,
1407
if getattr(self.tempfile, 'name', None) is not None:
1408
self.tempfile = open(self.tempfile.name)
1409
f = UploadProgressWrapper(self.tempfile, uploading)
1410
req = self.action_queue.client.put_content_request(
1411
self.share_id, self.node_id, self.previous_hash, self.hash,
1412
self.crc32, self.size, self.deflated_size, f)
1413
self.upload_req = req
1415
d.addBoth(passit(lambda _:
1416
self.action_queue.uploading.pop((self.share_id,
1418
d.addBoth(passit(lambda _: self.tempfile.close()))
1421
def handle_success(self, _):
1423
It worked! Push the event.
1425
if getattr(self.tempfile, 'name', None) is not None:
1426
os.unlink(self.tempfile.name)
1427
self.action_queue.event_queue.push('AQ_UPLOAD_FINISHED',
1428
share_id=self.share_id,
1429
node_id=self.node_id,
1432
def handle_failure(self, failure):
1434
It didn't work! Push the event.
1436
if getattr(self.tempfile, 'name', None) is not None:
1437
os.unlink(self.tempfile.name)
1438
self.action_queue.event_queue.push('AQ_UPLOAD_ERROR',
1439
error=failure.getErrorMessage(),
1440
share_id=self.share_id,
1441
node_id=self.node_id,