# ubuntuone.syncdaemon.action_queue - Action queue
#
# Author: John Lenton <john.lenton@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""
The ActionQueue is where actions to be performed on the server are
queued up and then executed. The idea is that there are two queues,
one for metadata and another for content; the metadata queue has
priority over the content queue.
"""
import logging
import os
import random
import tempfile
import uuid
import zlib

from collections import deque, defaultdict
from functools import wraps, partial

from zope.interface import implements
from twisted.internet import reactor, defer, ssl
from twisted.names import client as dns_client
from twisted.python.failure import Failure

from ubuntuone.storageprotocol.context import get_ssl_context
from ubuntuone.storageprotocol import protocol_pb2
from ubuntuone.storageprotocol.client import StorageClient, \
    StorageClientFactory
from ubuntuone.syncdaemon.logger import mklog, TRACE
from ubuntuone.syncdaemon.interfaces import IActionQueue, \
    IMarker
logger = logging.getLogger("ubuntuone.SyncDaemon.ActionQueue")
48
# I want something which repr() is "---" *without* the quotes :)
# Used as a placeholder in log lines when a value isn't known yet.
UNKNOWN = type('', (), {'__repr__': lambda _: '---'})()
def passit(func):
    """
    Pass the value on for the next deferred, while calling func with
    it.

    NOTE(review): reconstructed from a corrupted source (only the
    docstring survived); verify against upstream history.
    """
    @wraps(func)
    def wrapper(a):
        """Call func with the value, then return the value unchanged."""
        func(a)
        return a
    return wrapper
class UploadCompressionCancelled(Exception):
    """Compression of a file for upload cancelled."""
class RequestCleanedUp(Exception):
    """
    The request was cancelled by ActionQueue.cleanup()
    """
class NamedTemporaryFile(object):
    """
    Like tempfile.NamedTemporaryFile, but working in 2.5 also WRT the
    delete argument. Actually, one of these NamedTemporaryFile()s is
    the same as a tempfile.NamedTemporaryFile(delete=False) from 2.6.

    Or so the theory goes.
    """

    def __init__(self):
        """Create the temp file and keep its name and file object."""
        fileno, self.name = tempfile.mkstemp()
        # FIX: the original used mode 'r+w', which is not a valid fdopen
        # mode; 'r+b' gives read/write binary access, which is what the
        # compression code writing zlib output needs.
        self._fd = os.fdopen(fileno, 'r+b')

    def __getattr__(self, attr):
        """proxy everything else (other than .name) on to self._fd"""
        return getattr(self._fd, attr)
class MultiProxy(list):
    """
    Proxy many objects of the same kind, like this:

    >>> m = MultiProxy(['foo', 'bar', 'baz'])
    >>> m.upper()
    MultiProxy(['FOO', 'BAR', 'BAZ'])
    """

    def __getattr__(self, attr):
        """Get the attribute on every proxied object."""
        return MultiProxy(getattr(i, attr) for i in self)

    def __call__(self, *args, **kwargs):
        """Call every proxied object with the same arguments."""
        return MultiProxy(i(*args, **kwargs) for i in self)

    def __repr__(self):
        """Show this is a MultiProxy of the underlying list."""
        return 'MultiProxy(%s)' % (super(MultiProxy, self).__repr__(),)
class LoggingStorageClient(StorageClient):
    """ A subclass of StorageClient that adds logging to
    processMessage and sendMessage.

    NOTE(review): reconstructed from a corrupted source (missing def
    lines and else branches restored); verify against upstream history.
    """

    def __init__(self):
        """ setup logging and create the instance. """
        StorageClient.__init__(self)
        self.log = logging.getLogger('ubuntuone.SyncDaemon.StorageClient')
        # configure the handler level to be < than DEBUG
        self.log.setLevel(TRACE)
        self.log.debug = partial(self.log.log, TRACE)

    def processMessage(self, message):
        """ wrapper that logs the message and result. """
        # don't log the full message if it's of type BYTES
        if message.type == protocol_pb2.Message.BYTES:
            self.log.debug('start - processMessage: id: %s, type: %s',
                           message.id, message.type)
        else:
            self.log.debug('start - processMessage: %s',
                           str(message).replace("\n", " "))
        if message.id in self.requests:
            req = self.requests[message.id]
            req.deferred.addCallbacks(self.log_success, self.log_error)
        result = StorageClient.processMessage(self, message)
        self.log.debug('end - processMessage: id: %s - result: %s',
                       message.id, result)
        return result

    def log_error(self, failure):
        """ logging errback for requests """
        self.log.debug('request error: %s', failure)
        # pass the failure through so later errbacks still see it
        return failure

    def log_success(self, result):
        """ logging callback for requests """
        self.log.debug('request finished: %s', result)
        if getattr(result, '__dict__', None):
            self.log.debug('result.__dict__: %s', result.__dict__)
        # pass the result through for the next callback
        return result

    def sendMessage(self, message):
        """ wrapper that logs the message and result. """
        # don't log the full message if it's of type BYTES
        if message.type == protocol_pb2.Message.BYTES:
            self.log.debug('start - sendMessage: id: %s, type: %s',
                           message.id, message.type)
        else:
            self.log.debug('start - sendMessage: %s',
                           str(message).replace("\n", " "))
        result = StorageClient.sendMessage(self, message)
        self.log.debug('end - sendMessage: id: %s', message.id)
        return result
class ActionQueueProtocol(LoggingStorageClient):
    """
    This is the Action Queue version of the StorageClient protocol.
    """
    # tracks whether this protocol instance currently has a connection
    connection_state = 'disconnected'

    def connectionMade(self):
        """
        Called when a new connection is made.
        All the state is saved in the factory.
        """
        logger.debug("connection made")
        self.connection_state = 'connected'
        self.factory.client = self
        self.set_node_state_callback(self.factory._node_state_callback)
        self.set_share_change_callback(self.factory._share_change_callback)
        self.set_share_answer_callback(self.factory._share_answer_callback)
        self.factory.event_queue.push('SYS_CONNECTION_MADE')

    def disconnect(self):
        """
        Close down the sockets
        """
        logger.debug("disconnected")
        if self.transport is not None:
            self.transport.loseConnection()

    def connectionLost(self, failure):
        """
        The connection went down, for some reason (which might or
        might not be described in failure).
        """
        logger.warning('connection lost: %s' % failure.getErrorMessage())
        self.factory.event_queue.push('SYS_CONNECTION_LOST')
        LoggingStorageClient.connectionLost(self, failure)
class Marker(str):
    """
    A uuid4-based marker class

    NOTE(review): the class header and interface declaration were lost
    in the source corruption and were reconstructed; verify against
    upstream history.
    """
    implements(IMarker)

    def __new__(cls):
        # a Marker is just a str whose value is a fresh uuid4
        return super(Marker, cls).__new__(cls, uuid.uuid4())
class ZipQueue(object):
217
A queue of files to be compressed for upload
219
Parts of this were shamelessly copied from
220
twisted.internet.defer.DeferredSemaphore
225
self.waiting = deque()
226
self.tokens = self.limit = 10
230
return a deferred which fires on token acquisition.
232
assert self.tokens >= 0, "Tokens should never be negative"
235
self.waiting.append(d)
237
self.tokens = self.tokens - 1
245
Should be called by whoever did the acquire() when the shared
248
assert self.tokens < self.limit, "Too many tokens!"
249
self.tokens = self.tokens + 1
251
# someone is waiting to acquire token
252
self.tokens = self.tokens - 1
253
d = self.waiting.popleft()
256
def _compress(self, deferred, upload):
257
"""Compression background task."""
259
fileobj = upload.fileobj_factory()
260
except StandardError:
261
# presumably the user deleted the file before we got to
262
# upload it. Logging a warning just in case.
263
upload.log.warn('unable to build fileobj'
264
' (user deleted the file, maybe?)'
265
' so cancelling the upload.')
269
filename = getattr(fileobj, 'name', '<?>')
271
upload.log.debug('compressing', filename)
273
# we need to compress the file completely to figure out its
274
# compressed size. So streaming is out :(
275
if upload.tempfile_factory is None:
276
f = NamedTemporaryFile()
278
f = upload.tempfile_factory()
279
zipper = zlib.compressobj()
280
while not upload.cancelled:
281
data = fileobj.read(4096)
283
f.write(zipper.flush())
284
# no flush/sync because we don't need this to persist
285
# on disk; if the machine goes down, we'll lose it
286
# anyway (being in /tmp and all)
288
f.write(zipper.compress(data))
290
raise UploadCompressionCancelled("Cancelled")
291
upload.deflated_size = f.tell()
292
# close the compressed file (thus, if you actually want to stream
293
# it out, it must have a name so it can be reopnened)
296
except Exception, e: # pylint: disable-msg=W0703
297
reactor.callFromThread(deferred.errback, e)
299
reactor.callFromThread(deferred.callback, True)
301
def zip(self, upload):
303
Acquire, do the compression in a thread, release.
305
d_zip = defer.Deferred()
306
d_lck = self.acquire()
308
lambda _: reactor.callFromThread(self._compress,
309
d_zip, upload) or d_zip)
310
d_lck.addCallback(lambda _: self.release())
315
class RequestQueue(object):
    """
    RequestQueue is a queue that ensures that there is at most one
    request at a time 'on the wire', and that uses the action queue's
    states for its syncrhonization.

    NOTE(review): reconstructed from a corrupted source (missing
    attribute initialization and run() skeleton restored); verify
    against upstream history.
    """

    def __init__(self, name, action_queue):
        super(RequestQueue, self).__init__()
        self.name = name
        self.action_queue = action_queue
        self.waiting = deque()
        # the request currently on the wire, if any
        self.head = None

    def __len__(self):
        """return the length of the queue"""
        return len(self.waiting)

    def queue(self, func, *args, **kwargs):
        """
        Add a call to func to the queue.
        """
        d = defer.Deferred()
        d.addCallback(lambda _: defer.maybeDeferred(func, *args, **kwargs))
        self.waiting.append(d)
        # only the transition from empty-and-idle announces WAITING
        if len(self.waiting) == 1 and not self.head:
            self.action_queue.event_queue.push('SYS_' + self.name
                                               + '_WAITING')
        return d

    def run(self):
        """
        Run the next request, if any; push DONE when the queue empties.
        """
        if self.waiting:
            d = self.waiting.popleft()
            d.addBoth(passit(lambda _: self.run()))
            d.callback(None)
        else:
            self.action_queue.event_queue.push('SYS_' + self.name + '_DONE')
class ContentQueue(RequestQueue):
    """
    A content queue is a queue of content requests (uploads and downloads).

    NOTE(review): reconstructed from a corrupted source; verify against
    upstream history.
    """

    def run(self):
        """
        Run the next content request, if any; push DONE when empty.
        """
        if self.waiting:
            d = self.waiting.popleft()
            d.addBoth(passit(lambda _: self.action_queue.event_queue.push(
                        'SYS_' + self.name + '_DONE')))
            d.addBoth(passit(lambda _: self.action_queue.event_queue.push(
                        'SYS_' + self.name + '_WAITING')))
            d.callback(None)
        else:
            self.action_queue.event_queue.push('SYS_' + self.name + '_DONE')
class MetaQueue(RequestQueue):
    """
    A meta queue is a queue of metadata-related requests.
    """
class DeferredMap(object):
    """
    A mapping of deferred values. Or a deferred map of values. Or a
    mapping that returns deferreds and then fires them when it has the
    values.

    NOTE(review): reconstructed from a corrupted source (missing
    __init__ and deferred creation lines restored); verify against
    upstream history.
    """

    def __init__(self):
        # values already known
        self.map = {}
        # keys that failed, mapped to their error message
        self.failed = {}
        # key -> list of deferreds waiting for the value
        self.waiting = defaultdict(list)

    def get(self, key):
        """
        Get the value for the given key.

        This always returns a deferred; when we already know the value
        we return a `succeed`, and if we don't know the value because
        it failed we return a `fail`; otherwise we return a plain
        unfired `Deferred`, and add it to the list of deferreds to
        call when we actually get the value.
        """
        if key in self.map:
            return defer.succeed(self.map[key])
        if key in self.failed:
            return defer.fail(Exception(self.failed[key]))
        d = defer.Deferred()
        self.waiting[key].append(d)
        return d

    def set(self, key, value):
        """
        We've got the value for a key! Write it down in the map, and
        fire the waiting deferreds.
        """
        if key not in self.map:
            self.map[key] = value
            for d in self.waiting.pop(key, ()):
                d.callback(value)
        elif self.map[key] != value:
            # a different value for a known key is a programming error
            raise KeyError("key is taken -- dunno what to do")

    def err(self, key, failure):
        """
        Something went terribly wrong in the process of getting a
        value. Break the news to the waiting deferreds.
        """
        self.failed[key] = failure.getErrorMessage()
        for d in self.waiting.pop(key, ()):
            d.errback(failure)
class UploadProgressWrapper(object):
    """
    A wrapper around the file-like object used for Uploads, with which
    we can keep track of the number of bytes that have been written to
    the store.
    """

    def __init__(self, fd, data_dict):
        """
        fd is the file-like object used for uploads. data_dict is the
        entry in the uploading dictionary.
        """
        self.fd = fd
        self.data_dict = data_dict
        self.n_bytes_read = 0

    def read(self, size=None):
        """
        read at most size bytes from the file-like object.

        Keep track of the number of bytes that have been read, and the
        number of bytes that have been written (assumed to be equal to
        the number of bytes read on the previews call to read). The
        latter is done directly in the data_dict.
        """
        self.data_dict['n_bytes_written'] = self.n_bytes_read
        data = self.fd.read(size)
        self.n_bytes_read += len(data)
        return data

    def __getattr__(self, attr):
        """proxy everything else on to self.fd"""
        return getattr(self.fd, attr)
class ActionQueue(StorageClientFactory, object):
    """
    This is the ActionQueue itself.

    NOTE(review): reconstructed from a corrupted source (missing
    __init__ attributes, ssl/dns branches and cancel bodies restored);
    verify against upstream history.
    """
    implements(IActionQueue)
    protocol = ActionQueueProtocol

    def __init__(self, event_queue, host, port, dns_srv,
                 use_ssl=False, disable_ssl_verify=False):
        self.event_queue = event_queue
        self.host = host
        self.port = port
        self.dns_srv = dns_srv
        self.use_ssl = use_ssl
        self.disable_ssl_verify = disable_ssl_verify

        self.token = None
        self.client = None
        self.deferred = None

        self.content_queue = ContentQueue('CONTENT_QUEUE', self)
        self.meta_queue = MetaQueue('META_QUEUE', self)
        self.uuid_map = DeferredMap()
        self.zip_queue = ZipQueue()

        # (share_id, node_id) -> transfer-progress dicts
        self.uploading = {}
        self.downloading = {}

        event_queue.subscribe(self)

    def handle_SYS_CONNECT(self, access_token):
        """
        Stow the access token away for later use
        """
        self.token = access_token

    def cleanup(self):
        """
        Cancel, clean up, and reschedule things that were in progress
        when a disconnection happened
        """
        self.event_queue.push('SYS_CLEANUP_STARTED')
        if self.client is not None:
            self.client.disconnect()
        for queue in self.meta_queue, self.content_queue:
            if queue.head is not None:
                queue.head.errback(RequestCleanedUp('Cleaned up'))
        self.event_queue.push('SYS_CLEANUP_FINISHED')

    def _node_state_callback(self, share_id, node_id, hash):
        """
        Called by the client when notified that node changed.
        """
        self.event_queue.push('SV_HASH_NEW',
                              share_id=share_id, node_id=node_id, hash=hash)

    def _share_change_callback(self, message, info):
        """
        Called by the client when notified that a share changed.
        """
        self.event_queue.push('SV_SHARE_CHANGED',
                              message=message, info=info)

    def _share_answer_callback(self, share_id, answer):
        """
        Called by the client when it gets a share answer notification.
        """
        self.event_queue.push('SV_SHARE_ANSWERED',
                              share_id=str(share_id), answer=answer)

    def _lookup_srv(self):
        """ do the SRV lookup and return a deferred whose callback is going
        to be called with (host, port). If we can't do the lookup, the default
        host, port is used.
        """
        def on_lookup_ok(results):
            """Get a random host from the SRV result."""
            logger.debug('SRV lookup done, choosing a server')
            records, auth, add = results
            if not records:
                raise ValueError("No available records.")
            # pick a random server
            record = random.choice(records)
            logger.debug('Using record: %r', record)
            if record.payload:
                return record.payload.target.name, record.payload.port
            else:
                logger.info('Empty SRV record, fallback to %r:%r',
                            self.host, self.port)
                return self.host, self.port

        def on_lookup_error(failure):
            """ return the default host/post on a DNS SRV lookup failure. """
            logger.info("SRV lookup error, fallback to %r:%r \n%s",
                        self.host, self.port, failure.getTraceback())
            return self.host, self.port

        if self.dns_srv:
            # lookup the DNS SRV records
            d = dns_client.lookupService(self.dns_srv, timeout=[3, 2])
            d.addCallback(on_lookup_ok)
            d.addErrback(on_lookup_error)
            return d
        else:
            return defer.succeed((self.host, self.port))

    def connect(self):
        """
        Start the circus going.
        """
        self.deferred = defer.Deferred()
        d = self._lookup_srv()
        def _connect(result):
            """ do the real thing """
            host, port = result
            if self.use_ssl:
                sslContext = get_ssl_context(self.disable_ssl_verify)
                reactor.connectSSL(host, port, self, sslContext)
            else:
                reactor.connectTCP(host, port, self)
        d.addCallback(_connect)
        return self.deferred

    def conectionFailed(self, reason=None):
        """
        Called when the connect() call fails

        NOTE(review): the misspelt name is preserved — callers may
        depend on it.
        """
        self.deferred.errback(reason)

    def get_root(self, marker):
        """
        Get the user's root uuid. Use the uuid_map, so the caller can
        use the marker in followup operations.
        """
        log = mklog(logger, 'get_root', '', marker, marker=marker)
        log.debug('starting')
        d = self.client.get_root()
        d.addCallbacks(*log.callbacks())
        d.addCallbacks(passit(lambda root: self.uuid_map.set(marker, root)),
                       passit(lambda f: self.uuid_map.err(marker, f)))
        return d

    def make_file(self, share_id, parent_id, name, marker):
        """
        See .interfaces.IMetaQueue
        """
        return MakeFile(self, share_id, parent_id, name, marker).start()

    def make_dir(self, share_id, parent_id, name, marker):
        """
        See .interfaces.IMetaQueue
        """
        return MakeDir(self, share_id, parent_id, name, marker).start()

    def move(self, share_id, node_id, old_parent_id, new_parent_id, new_name):
        """
        See .interfaces.IMetaQueue
        """
        return Move(self, share_id, node_id, old_parent_id,
                    new_parent_id, new_name).start()

    def unlink(self, share_id, parent_id, node_id):
        """
        See .interfaces.IMetaQueue
        """
        return Unlink(self, share_id, parent_id, node_id).start()

    def query(self, items):
        """
        See .interfaces.IMetaQueue
        """
        return Query(self, items).start()

    def list_shares(self):
        """
        List the shares; put the result on the event queue
        """
        return ListShares(self).start()

    def answer_share(self, share_id, answer):
        """
        Answer the offer of a share.
        """
        return AnswerShare(self, share_id, answer).start()

    def create_share(self, node_id, share_to, name, access_level, marker):
        """
        Share a node with somebody.
        """
        return CreateShare(self, node_id, share_to, name, access_level,
                           marker).start()

    def listdir(self, share_id, node_id, server_hash, fileobj_factory):
        """
        See .interfaces.IMetaQueue.listdir
        """
        return ListDir(self, share_id, node_id, server_hash,
                       fileobj_factory).start()

    def download(self, share_id, node_id, server_hash, fileobj_factory):
        """
        See .interfaces.IContentQueue.download
        """
        return Download(self, share_id, node_id, server_hash,
                        fileobj_factory).start()

    def upload(self, share_id, node_id, previous_hash, hash, crc32,
               size, fileobj_factory, tempfile_factory=None):
        """
        See .interfaces.IContentQueue
        """
        return Upload(self, share_id, node_id, previous_hash, hash,
                      crc32, size, fileobj_factory, tempfile_factory).start()

    def cancel_upload(self, share_id, node_id):
        """
        See .interfaces.IContentQueue
        """
        log = mklog(logger, 'cancel_upload', share_id, node_id,
                    share=share_id, node=node_id)
        log.debug('starting')
        if (share_id, node_id) in self.uploading:
            req = self.uploading[share_id, node_id].get('req')
            if req is not None:
                req.cancel()
                log.debug("cancelled")
        log.debug('finished')

    def cancel_download(self, share_id, node_id):
        """
        See .interfaces.IContentQueue
        """
        log = mklog(logger, 'cancel_download', share_id, node_id,
                    share=share_id, node=node_id)
        log.debug('starting')
        if (share_id, node_id) in self.downloading:
            req = self.downloading[share_id, node_id].get('req')
            if req is not None:
                req.cancel()
                log.debug("cancelled")
        log.debug('finished')
# sentinel value used by unwrap()/Query to drop items whose markers failed
SKIP_THIS_ITEM = object()

# pylint: disable-msg=W0231
class ActionQueueCommand(object):
    """
    Base of all the action queue commands

    NOTE(review): reconstructed from a corrupted source (missing
    method bodies and control flow restored); verify against upstream
    history.
    """

    # protobuf doesn't seem to have much introspectionable stuff
    # without going into private attributes
    known_error_messages = (set(protocol_pb2._ERROR_ERRORTYPE.values_by_name)
                            | set(['CANCELLED']))
    suppressed_error_messages = (known_error_messages
                                 - set(['INTERNAL_ERROR'])
                                 | set(['Cleaned up']))
    retryable_errors = set(['Cleaned up', 'TRY_AGAIN'])

    def demark(self, *maybe_markers):
        """
        Arrange to have maybe_markers realized
        """
        l = []
        for marker in maybe_markers:
            if IMarker.providedBy(marker):
                self.log.debug('waiting until we know the real value of %s'
                               % marker)
                d = self.action_queue.uuid_map.get(marker)
                d.addCallbacks(passit(lambda _:
                                          self.log.debug('got %s' % marker)),
                               passit(lambda f:
                                          self.log.error('failed %s' % marker)))
            else:
                d = defer.succeed(marker)
            l.append(d)
        dl = defer.DeferredList(l, fireOnOneErrback=True, consumeErrors=True)
        dl.addCallbacks(self.unwrap,
                        lambda f: f.value.subFailure)
        return dl

    def unwrap(self, results):
        """
        Unpack the values from the result of a DeferredList. If
        there's a failure, return it instead.
        """
        values = []
        for result in results:
            # result can be none if one of the callbacks failed
            # before the others were ready
            if result is not None:
                is_ok, value = result
                if not is_ok:
                    return value
                if value is not SKIP_THIS_ITEM:
                    values.append(value)
        return tuple(values)

    def end_callback(self, arg):
        """The request finished OK: clear the head and report success."""
        self._queue.head = None
        self.log.debug('success')
        return self.handle_success(arg)

    def end_errback(self, failure):
        """The request failed: log, maybe retry, or report the failure."""
        self._queue.head = None
        error_message = failure.getErrorMessage()
        if error_message not in self.suppressed_error_messages:
            self.log.error('failure', error_message)
            self.log.debug('traceback follows:\n\n' + failure.getTraceback())
        else:
            self.log.warn('failure', error_message)
        self.cleanup()
        if error_message in self.retryable_errors:
            reactor.callLater(0.1, self.retry)
        else:
            return self.handle_failure(failure)

    def start(self, _=None):
        """
        Get ready to run, and then run.
        The default implementation is for when there is no preparation necessary
        """
        d = self._start()
        d.addCallback(self.store_marker_result)
        d.addCallback(lambda _: self.log.debug('queueing in the %s'
                                               % self._queue.name))
        d.addCallbacks(lambda _: self._queue.queue(self.run),
                       self.handle_failure)
        return d

    def cleanup(self):
        """
        Do whatever is needed to clean up from a failure (such as stop
        producers and other such that aren't cleaned up appropriately
        on their own)

        The default implementation does absolutely nothing.
        """

    def _start(self):
        """
        Do the specialized pre-run setup
        """
        return defer.succeed(None)

    def store_marker_result(self, _):
        """
        Called when all the markers are realized.
        """

    def run(self):
        """Fire the actual request and hook up the end callbacks."""
        self.log.debug('starting')
        self._queue.head = d = self._run()
        d.addCallbacks(self.end_callback, self.end_errback)
        return d

    def handle_success(self, success):
        """
        Do anthing that's needed to handle success of the operation.
        """
        return success

    def handle_failure(self, failure):
        """
        Do anthing that's needed to handle failure of the operation.
        Note that cancellation and TRY_AGAIN are already handled.
        """
        return failure

    def retry(self):
        """
        Request cancelled or TRY_AGAIN. Well then, try again!
        """
        return self._queue.queue(self.run)
class ActionQueueMetaCommand(ActionQueueCommand):
    """
    Base of metadata-related commands (commands that are queued in the
    meta queue)
    """
    @property
    def _queue(self):
        """
        Get at the meta queue
        """
        return self.action_queue.meta_queue
class ActionQueueContentCommand(ActionQueueCommand):
    """
    Base of content-related commands (commands that are queued in the
    content queue)
    """
    @property
    def _queue(self):
        """
        Get at the content queue
        """
        return self.action_queue.content_queue
class MakeThing(ActionQueueMetaCommand):
892
Base of MakeFile and MakeDir
894
def __init__(self, action_queue, share_id, parent_id, name, marker):
895
self.action_queue = action_queue
896
self.share_id = share_id
897
self.parent_id = parent_id
898
# Unicode boundary! the name is Unicode in protocol and server, but
899
# here we use bytes for paths
900
self.name = name.decode("utf8")
902
self.log = mklog(logger, self.__class__.__name__, share_id, marker,
903
share=share_id, parent=parent_id, name=name,
908
Do the specialized pre-run setup
910
return self.demark(self.share_id, self.parent_id)
912
def store_marker_result(self, (share_id, parent_id)):
914
Called when all the markers are realized.
916
self.share_id = share_id
917
self.parent_id = parent_id
921
Do the actual running
923
maker = getattr(self.action_queue.client, self.client_method)
924
return maker(self.share_id,
928
def handle_success(self, success):
930
It worked! Push the event, and update the uuid map.
932
# note that we're not getting the new name from the answer
933
# message, if we would get it, we would have another Unicode
935
self.action_queue.event_queue.push(self.ok_event_name,
937
new_id=success.new_id)
938
if IMarker.providedBy(self.marker):
939
self.action_queue.uuid_map.set(self.marker, success.new_id)
942
def handle_failure(self, failure):
944
It didn't work! Push the event, and update the uuid map.
946
self.action_queue.event_queue.push(self.error_event_name,
948
error=failure.getErrorMessage())
949
if IMarker.providedBy(self.marker):
950
self.action_queue.uuid_map.err(self.marker,
954
class MakeFile(MakeThing):
    """
    Make a file
    """
    ok_event_name = 'AQ_FILE_NEW_OK'
    error_event_name = 'AQ_FILE_NEW_ERROR'
    client_method = 'make_file'
class MakeDir(MakeThing):
    """
    Make a directory
    """
    ok_event_name = 'AQ_DIR_NEW_OK'
    error_event_name = 'AQ_DIR_NEW_ERROR'
    client_method = 'make_dir'
class Move(ActionQueueMetaCommand):
974
Move a file or directory
976
def __init__(self, action_queue, share_id, node_id, old_parent_id,
977
new_parent_id, new_name):
978
self.action_queue = action_queue
979
self.share_id = share_id
980
self.node_id = node_id
981
self.old_parent_id = old_parent_id
982
self.new_parent_id = new_parent_id
983
# Unicode boundary! the name is Unicode in protocol and server, but
984
# here we use bytes for paths
985
self.new_name = new_name.decode("utf8")
986
self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
987
share=share_id, node=node_id,
988
old_parent=old_parent_id, new_parent=new_parent_id,
993
Do the specialized pre-run setup
995
return self.demark(self.share_id, self.node_id, self.new_parent_id)
997
def store_marker_result(self, (share_id, node_id, new_parent_id)):
999
Called when all the markers are realized.
1001
self.share_id = share_id
1002
self.node_id = node_id
1003
self.new_parent_id = new_parent_id
1007
Do the actual running
1009
return self.action_queue.client.move(self.share_id,
1013
def handle_success(self, success):
1015
It worked! Push the event.
1017
self.action_queue.event_queue.push('AQ_MOVE_OK',
1018
share_id=self.share_id,
1019
node_id=self.node_id)
1022
def handle_failure(self, failure):
1024
It didn't work! Push the event.
1026
self.action_queue.event_queue.push('AQ_MOVE_ERROR',
1027
error=failure.getErrorMessage(),
1028
share_id=self.share_id,
1029
node_id=self.node_id,
1030
old_parent_id=self.old_parent_id,
1031
new_parent_id=self.new_parent_id,
1032
new_name=self.new_name)
1035
class Unlink(ActionQueueMetaCommand):
1037
Unlink a file or dir
1039
def __init__(self, action_queue, share_id, parent_id, node_id):
1040
self.action_queue = action_queue
1041
self.share_id = share_id
1042
self.node_id = node_id
1043
self.parent_id = parent_id
1044
self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
1045
share=share_id, node=node_id, parent=parent_id)
1049
Do the specialized pre-run setup
1051
return self.demark(self.share_id, self.node_id, self.parent_id)
1053
def store_marker_result(self, (share_id, node_id, parent_id)):
1055
Called when all the markers are realized.
1057
self.share_id = share_id
1058
self.node_id = node_id
1059
self.parent_id = parent_id
1063
Do the actual running
1065
return self.action_queue.client.unlink(self.share_id, self.node_id)
1067
def handle_success(self, success):
1069
It worked! Push the event.
1071
self.action_queue.event_queue.push('AQ_UNLINK_OK',
1072
share_id=self.share_id,
1073
parent_id=self.parent_id,
1074
node_id=self.node_id)
1077
def handle_failure(self, failure):
1079
It didn't work! Push the event.
1081
self.action_queue.event_queue.push('AQ_UNLINK_ERROR',
1082
error=failure.getErrorMessage(),
1083
share_id=self.share_id,
1084
parent_id=self.parent_id,
1085
node_id=self.node_id)
1088
class Query(ActionQueueMetaCommand):
    """
    Ask about the freshness of server hashes

    NOTE(review): reconstructed from a corrupted source; verify against
    upstream history.
    """

    def __init__(self, action_queue, items):
        self.log = MultiProxy(
            [mklog(logger, '(unrolled) query', share, node,
                   share=share, node=node, hash=hash, index=i)
             for (i, (share, node, hash)) in enumerate(items)])
        self.action_queue = action_queue
        self.items = items

    def store_marker_result(self, items):
        """
        Called when all the markers are realized.
        """
        self.items = items

    def _start(self):
        """
        Do the specialized pre-run setup
        """
        # node_hash will (should?) never be a marker, but it's the
        # easiest way to keep the trio together: send it along for the
        # ride
        dl = []
        for item in self.items:
            d = self.demark(*item)
            d.addErrback(self.handle_single_failure, item)
            dl.append(d)
        d = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
        d.addCallbacks(self.unwrap)
        return d

    def handle_failure(self, failure):
        """
        It didn't work! Never mind.
        """
        pass

    def handle_single_failure(self, failure, item):
        """
        The only failure mode of a Query is for a query to be done
        using a marker that fails to realize.
        """
        self.action_queue.event_queue.push('AQ_QUERY_ERROR', item=item,
                                           error=failure.getErrorMessage())
        return SKIP_THIS_ITEM

    def _run(self):
        """
        Do the actual running
        """
        return self.action_queue.client.query(self.items)
class ListShares(ActionQueueMetaCommand):
    """
    List shares shared to me
    """

    def __init__(self, action_queue):
        self.action_queue = action_queue
        self.log = mklog(logger, 'list_shares', UNKNOWN, UNKNOWN)

    def _run(self):
        """
        Do the actual running
        """
        return self.action_queue.client.list_shares()

    def handle_success(self, success):
        """
        It worked! Push the event.
        """
        self.action_queue.event_queue.push('AQ_SHARES_LIST',
                                           shares_list=success)

    def handle_failure(self, failure):
        """
        It didn't work! Push the event.
        """
        self.action_queue.event_queue.push('AQ_LIST_SHARES_ERROR',
                                           error=failure.getErrorMessage())
class AnswerShare(ActionQueueMetaCommand):
    """
    Answer a share offer
    """

    def __init__(self, action_queue, share_id, answer):
        self.action_queue = action_queue
        self.share_id = share_id
        self.answer = answer
        self.log = mklog(logger, 'answer_share', share_id, UNKNOWN)

    def _run(self):
        """
        Do the actual running
        """
        return self.action_queue.client.accept_share(self.share_id, self.answer)
class CreateShare(ActionQueueMetaCommand):
1192
Offer a share to somebody
1194
def __init__(self, action_queue, node_id, share_to, name, access_level,
1196
self.action_queue = action_queue
1197
self.node_id = node_id
1198
self.share_to = share_to
1200
self.access_level = access_level
1201
self.marker = marker
1202
self.log = mklog(logger, self.__class__.__name__, UNKNOWN, node_id)
1204
def store_marker_result(self, (node_id,)):
1206
Called when all the markers are realized.
1208
self.node_id = node_id
1212
Do the specialized pre-run setup
1214
return self.demark(self.node_id)
1218
Do the actual running
1220
return self.action_queue.client.create_share(self.node_id,
1225
def handle_success(self, success):
1227
It worked! Push the event.
1229
self.action_queue.event_queue.push('AQ_CREATE_SHARE_OK',
1230
share_id=success.share_id,
1234
def handle_failure(self, failure):
1236
It didn't work! Push the event.
1238
self.action_queue.event_queue.push('AQ_CREATE_SHARE_ERROR',
1240
error=failure.getErrorMessage())
1243
class GetContentMixin(object):
1245
Base for ListDir and Download. It's a mixin (ugh) because
1246
otherwise things would be even more confusing
1248
def __init__(self, action_queue, share_id, node_id, server_hash,
1250
self.action_queue = action_queue
1251
self.share_id = share_id
1252
self.node_id = node_id
1253
self.server_hash = server_hash
1254
self.fileobj_factory = fileobj_factory
1256
self.gunzip = zlib.decompressobj()
1257
self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
1258
share=share_id, node=node_id, server_hash=server_hash,
1259
fileobj_factory=fileobj_factory)
1260
if (self.share_id, self.node_id) in self.action_queue.downloading:
1261
self.action_queue.cancel_download(self.share_id, self.node_id)
1265
Do the specialized pre-run setup
1267
return self.demark(self.node_id)
1269
def store_marker_result(self, (node_id,)):
1271
Called when all the markers are realized.
1273
self.node_id = node_id
1277
Do the actual running
1280
self.fileobj = self.fileobj_factory()
1281
except StandardError:
1282
return defer.fail(Failure('unable to build fileobj'
1283
' (file went away?)'
1284
' so aborting the download.'))
1285
self.action_queue.downloading[self.share_id,
1286
self.node_id] = {'n_bytes_read': 0}
1288
self.action_queue.event_queue.push('AQ_DOWNLOAD_STARTED',
1289
share_id=self.share_id,
1290
node_id=self.node_id,
1291
server_hash=self.server_hash)
1293
req = self.action_queue.client.get_content_request(
1294
self.share_id, self.node_id, self.server_hash,
1295
callback=self.cb, node_attr_callback=self.nacb)
1296
self.action_queue.downloading[self.share_id, self.node_id]['req'] = req
1298
d.addBoth(passit(lambda _:
1299
self.action_queue.downloading.pop((self.share_id,
1301
d.addErrback(passit(lambda _: self.reset_fileobj()))
1304
def handle_success(self, _):
1306
It worked! Push the event.
1308
self.action_queue.event_queue.push('AQ_DOWNLOAD_FINISHED',
1309
share_id=self.share_id,
1310
node_id=self.node_id,
1311
server_hash=self.server_hash)
1313
def handle_failure(self, failure):
1315
It didn't work! Push the event.
1317
self.action_queue.event_queue.push('AQ_DOWNLOAD_ERROR',
1318
error=failure.getErrorMessage(),
1319
share_id=self.share_id,
1320
node_id=self.node_id,
1321
server_hash=self.server_hash)
1323
def reset_fileobj(self):
    """Rewind and empty the file (i.e. get it ready to try again if
    necessary).

    No-op when no file object was ever built (fileobj_factory failed).
    """
    if self.fileobj is not None:
        self.fileobj.seek(0, 0)
        self.fileobj.truncate(0)
def cb(self, bytes):
    """A streaming decompressor.

    Called by the protocol with each chunk of compressed content:
    account for the raw (compressed) bytes read, inflate the chunk and
    write it out. (NB: the parameter shadows the ``bytes`` builtin;
    kept for caller/keyword compatibility.)
    """
    dloading = self.action_queue.downloading[self.share_id,
                                             self.node_id]
    dloading['n_bytes_read'] += len(bytes)
    self.fileobj.write(self.gunzip.decompress(bytes))
    self.fileobj.flush()     # not strictly necessary but nice to
                             # see the downloaded size
def nacb(self, **kwargs):
    """Set the node attrs in the 'currently downloading' dict."""
    self.action_queue.downloading[self.share_id,
                                  self.node_id].update(kwargs)
1352
def sync(self):
    """Flush the buffers and sync them to disk if possible.

    Flushes the decompressor's tail, then the file's buffers, fsyncs
    when the file object is a real OS-level file, and closes it.
    (NOTE(review): the original ``def`` line was lost in transit; the
    name ``sync`` is inferred from the docstring — confirm upstream.)
    """
    self.fileobj.write(self.gunzip.flush())
    self.fileobj.flush()
    if getattr(self.fileobj, 'fileno', None) is not None:
        # it's a real file, with a fileno! Let's sync its data
        # out to disk before closing.
        os.fsync(self.fileobj.fileno())
    self.fileobj.close()
1363
class ListDir(GetContentMixin, ActionQueueMetaCommand):
    """Get a listing of a directory's contents."""
class Download(GetContentMixin, ActionQueueContentCommand):
    """Get the contents of a file."""
class Upload(ActionQueueContentCommand):
    """Upload stuff to a file."""

    # an upload already in flight for the same node is retried rather
    # than failed outright
    retryable_errors = (ActionQueueContentCommand.retryable_errors
                        | set(['UPLOAD_IN_PROGRESS']))
def __init__(self, action_queue, share_id, node_id, previous_hash, hash,
             crc32, size, fileobj_factory, tempfile_factory):
    """Store the upload's parameters and displace any upload already
    in flight for the same (share, node).

    The ``hash``, ``crc32`` and ``size`` assignments (used later by
    _start/_run) are restored here — they were dropped from the
    garbled source.
    """
    self.action_queue = action_queue
    self.share_id = share_id
    self.node_id = node_id
    self.previous_hash = previous_hash
    self.hash = hash
    self.crc32 = crc32
    self.size = size
    self.fileobj_factory = fileobj_factory
    self.tempfile_factory = tempfile_factory
    self.deflated_size = None   # filled in once the zip queue compresses us
    self.tempfile = None
    self.cancelled = False
    self.upload_req = None      # set by _run once the request is fired
    self.log = mklog(logger, 'upload', share_id, node_id, share=share_id,
                     node=node_id, previous_hash=previous_hash,
                     hash=hash, crc32=crc32, size=size,
                     fileobj_factory=fileobj_factory)
    if (self.share_id, self.node_id) in self.action_queue.uploading:
        self.action_queue.cancel_upload(self.share_id, self.node_id)
1403
"""Cancel the upload."""
1404
self.cancelled = True
1405
if self.upload_req is not None:
1406
self.upload_req.cancel()
1410
def cleanup(self):
    """Cleanup: stop the producer.

    Guards against upload_req still being None (it is initialized to
    None and only set in _run), so cleanup before the request is fired
    no longer raises AttributeError.
    """
    if self.upload_req is not None and self.upload_req.producer is not None:
        self.upload_req.producer.stopProducing()
def _start(self):
    """Do the specialized pre-run setup.

    Register this upload in the 'uploading' map (so a newer Upload of
    the same node can displace it), queue the file for compression,
    then resolve the node_id marker. Returns the chained deferred.
    (The original's dead ``d = defer.Deferred()``, immediately
    rebound, is removed.)
    """
    uploading = {"hash": self.hash, "req": self}
    self.action_queue.uploading[self.share_id, self.node_id] = uploading

    d = self.action_queue.zip_queue.zip(self)
    d.addCallback(lambda _: self.demark(self.node_id))
    return d
def store_marker_result(self, marker_result):
    """Called when all the markers are realized.

    :param marker_result: a 1-tuple holding the server-assigned node id.

    Re-keys the action_queue.uploading entry from the marker to the
    real node_id. (Plain-parameter unpacking replaces the py2-only
    tuple-parameter syntax, removed by PEP 3113.)
    """
    (node_id,) = marker_result
    # update action_queue.uploading with the real node_id
    uploading = self.action_queue.uploading.pop((self.share_id,
                                                 self.node_id))
    self.node_id = node_id
    self.action_queue.uploading[self.share_id, node_id] = uploading
def _run(self):
    """Do the actual running.

    Refresh the 'uploading' entry with the deflated size, announce
    AQ_UPLOAD_STARTED, reopen the compressed temp file from the start,
    and fire the PutContent request. Returns the request's deferred.
    """
    uploading = {"hash": self.hash, "deflated_size": self.deflated_size,
                 "req": self}
    self.action_queue.uploading[self.share_id, self.node_id] = uploading

    self.action_queue.event_queue.push('AQ_UPLOAD_STARTED',
                                       share_id=self.share_id,
                                       node_id=self.node_id,
                                       hash=self.hash)

    if getattr(self.tempfile, 'name', None) is not None:
        # the compressed copy lives in a named temp file: reopen it so
        # reads start from the beginning
        self.tempfile = open(self.tempfile.name)
    f = UploadProgressWrapper(self.tempfile, uploading)
    req = self.action_queue.client.put_content_request(
        self.share_id, self.node_id, self.previous_hash, self.hash,
        self.crc32, self.size, self.deflated_size, f)
    self.upload_req = req
    d = req.deferred
    # whatever happens, drop the 'uploading' entry and close the temp file
    d.addBoth(passit(lambda _:
                         self.action_queue.uploading.pop((self.share_id,
                                                          self.node_id))))
    d.addBoth(passit(lambda _: self.tempfile.close()))
    return d
def handle_success(self, _):
    """It worked! Remove the temp file and push AQ_UPLOAD_FINISHED."""
    if getattr(self.tempfile, 'name', None) is not None:
        os.unlink(self.tempfile.name)
    self.action_queue.event_queue.push('AQ_UPLOAD_FINISHED',
                                       share_id=self.share_id,
                                       node_id=self.node_id,
                                       hash=self.hash)
def handle_failure(self, failure):
1479
It didn't work! Push the event.
1481
if getattr(self.tempfile, 'name', None) is not None:
1482
os.unlink(self.tempfile.name)
1483
self.action_queue.event_queue.push('AQ_UPLOAD_ERROR',
1484
error=failure.getErrorMessage(),
1485
share_id=self.share_id,
1486
node_id=self.node_id,