951
930
def make_file(self, share_id, parent_id, name, marker, path):
952
931
"""See .interfaces.IMetaQueue."""
953
932
return MakeFile(self.queue, share_id, parent_id,
954
name, marker, path).queue()
933
name, marker, path).go()
956
935
def make_dir(self, share_id, parent_id, name, marker, path):
957
936
"""See .interfaces.IMetaQueue."""
958
937
return MakeDir(self.queue, share_id, parent_id,
959
name, marker, path).queue()
938
name, marker, path).go()
961
940
def move(self, share_id, node_id, old_parent_id, new_parent_id,
962
941
new_name, path_from, path_to):
963
942
"""See .interfaces.IMetaQueue."""
964
943
return Move(self.queue, share_id, node_id, old_parent_id,
965
new_parent_id, new_name, path_from, path_to).queue()
944
new_parent_id, new_name, path_from, path_to).go()
967
946
def unlink(self, share_id, parent_id, node_id, path):
968
947
"""See .interfaces.IMetaQueue."""
969
return Unlink(self.queue, share_id, parent_id, node_id, path).queue()
948
return Unlink(self.queue, share_id, parent_id, node_id, path).go()
971
950
def inquire_free_space(self, share_id):
972
951
"""See .interfaces.IMetaQueue."""
973
return FreeSpaceInquiry(self.queue, share_id).queue()
952
return FreeSpaceInquiry(self.queue, share_id).go()
975
954
def inquire_account_info(self):
976
955
"""See .interfaces.IMetaQueue."""
977
return AccountInquiry(self.queue).queue()
956
return AccountInquiry(self.queue).go()
979
958
def list_shares(self):
980
959
"""See .interfaces.IMetaQueue."""
981
return ListShares(self.queue).queue()
960
return ListShares(self.queue).go()
983
962
def answer_share(self, share_id, answer):
984
963
"""See .interfaces.IMetaQueue."""
985
return AnswerShare(self.queue, share_id, answer).queue()
964
return AnswerShare(self.queue, share_id, answer).go()
987
966
def create_share(self, node_id, share_to, name, access_level,
989
968
"""See .interfaces.IMetaQueue."""
990
969
return CreateShare(self.queue, node_id, share_to, name,
991
access_level, marker, path).queue()
970
access_level, marker, path).go()
993
972
def delete_share(self, share_id):
994
973
"""See .interfaces.IMetaQueue."""
995
return DeleteShare(self.queue, share_id).queue()
974
return DeleteShare(self.queue, share_id).go()
997
976
def create_udf(self, path, name, marker):
998
977
"""See .interfaces.IMetaQueue."""
999
return CreateUDF(self.queue, path, name, marker).queue()
978
return CreateUDF(self.queue, path, name, marker).go()
1001
980
def list_volumes(self):
1002
981
"""See .interfaces.IMetaQueue."""
1003
return ListVolumes(self.queue).queue()
982
return ListVolumes(self.queue).go()
1005
984
def delete_volume(self, volume_id, path):
1006
985
"""See .interfaces.IMetaQueue."""
1007
return DeleteVolume(self.queue, volume_id, path).queue()
986
return DeleteVolume(self.queue, volume_id, path).go()
1009
988
def change_public_access(self, share_id, node_id, is_public):
1010
989
"""See .interfaces.IMetaQueue."""
1011
990
return ChangePublicAccess(self.queue, share_id,
1012
node_id, is_public).queue()
991
node_id, is_public).go()
1014
993
def get_public_files(self):
1015
994
"""See .interfaces.IMetaQueue."""
1016
return GetPublicFiles(self.queue).queue()
995
return GetPublicFiles(self.queue).go()
1018
997
def download(self, share_id, node_id, server_hash, path, fileobj_factory):
1019
998
"""See .interfaces.IContentQueue.download."""
1020
999
return Download(self.queue, share_id, node_id, server_hash,
1021
path, fileobj_factory).queue()
1000
path, fileobj_factory).go()
1023
1002
def upload(self, share_id, node_id, previous_hash, hash, crc32,
1024
1003
size, path, fileobj_factory, tempfile_factory=None):
1025
1004
"""See .interfaces.IContentQueue."""
1026
1005
return Upload(self.queue, share_id, node_id, previous_hash,
1027
1006
hash, crc32, size, path,
1028
fileobj_factory, tempfile_factory).queue()
1007
fileobj_factory, tempfile_factory).go()
1030
1009
def _cancel_op(self, share_id, node_id, cmdclass):
1031
1010
"""Generalized form of cancel_upload and cancel_download."""
1092
1070
is_runnable = True
1093
1071
uniqueness = None
1095
__slots__ = ('_queue', 'start_done', 'running', 'pathlock_release', 'log',
1096
'markers_resolved_deferred', 'action_queue', 'cancelled')
1073
__slots__ = ('_queue', 'running', 'pathlock_release', 'log',
1074
'markers_resolved_deferred', 'action_queue', 'cancelled',
1075
'wait_for_queue', 'wait_for_conditions')
1098
1077
def __init__(self, request_queue):
1099
1078
"""Initialize a command instance."""
1100
1079
self._queue = request_queue
1101
1080
self.action_queue = request_queue.action_queue
1102
self.start_done = False
1103
1081
self.running = False
1104
1082
self.log = None
1105
1083
self.markers_resolved_deferred = defer.Deferred()
1106
1084
self.pathlock_release = None
1107
1085
self.cancelled = False
1087
self.wait_for_queue = None
1088
self.wait_for_conditions = None
1109
1090
def to_dict(self):
1110
1091
"""Dump logged attributes to a dict."""
1111
1092
return dict((n, getattr(self, n, None)) for n in self.logged_attrs)
1163
1144
"""The command ended."""
1164
1145
self.running = False
1165
1146
self._queue.unqueue(self)
1166
if self.pathlock_release is not None:
1167
self.pathlock_release()
1169
def end_callback(self, arg):
1171
if not self.running:
1172
self.log.debug('not running, so no success')
1175
self.log.debug('success')
1176
self.handle_success(arg)
1179
def end_errback(self, failure):
1182
self.log.debug('not errbacking because cancelled')
1185
error_message = failure.getErrorMessage()
1186
if failure.check(*self.suppressed_error_messages):
1187
self.log.warn('failure: %s', error_message)
1189
self.log.error('failure: %s', error_message)
1190
self.log.debug('traceback follows:\n\n' + failure.getTraceback())
1193
if failure.check(*self.retryable_errors):
1194
if self._queue.active:
1195
reactor.callLater(0.1, self.run)
1197
self.log.debug('not retrying because queue not active')
1199
self.handle_failure(failure)
1202
def pre_queue_setup(self):
1203
"""Set up before the command gets really queued."""
1204
self.log = self.make_logger()
1208
"""Queue the command."""
1209
self.pre_queue_setup()
1210
self.log.debug('queueing')
1211
self._queue.queue(self)
1148
def _should_be_queued(self):
1149
"""Return True if the command should be queued."""
1213
1152
def cleanup(self):
1214
1153
"""Do whatever is needed to clean up from a failure.
1216
1155
For example, stop producers and others that aren't cleaned up
1217
appropriately on their own.
1156
appropriately on their own. Note that this may be called more
1219
1159
self.log.debug('cleanup')
1222
1162
"""Do the specialized pre-run setup."""
1223
1163
return defer.succeed(None)
1166
"""Unlock the command because the queue is back alive."""
1167
if self.wait_for_queue is not None:
1168
self.wait_for_queue.callback(True)
1169
self.wait_for_queue = None
1171
def check_conditions(self):
1172
"""If conditions are ok, run the command again."""
1173
if self.is_runnable and self.wait_for_conditions is not None:
1174
self.wait_for_conditions.callback(True)
1175
self.wait_for_conditions = None
1225
1177
@defer.inlineCallbacks
1226
def locked_run(self):
1179
"""Execute all the steps for a command."""
1180
# create the log and demark
1184
# queue if should, otherwise all is done
1185
if not self._should_be_queued():
1188
self.log.debug('queueing')
1189
self._queue.queue(self)
1228
1191
self.pathlock_release = yield self._acquire_pathlock()
1229
1192
if self.cancelled:
1230
self.pathlock_release()
1193
if self.pathlock_release is not None:
1194
self.log.debug('releasing the pathlock because of cancelled')
1195
self.pathlock_release()
1201
if self.pathlock_release is not None:
1202
self.pathlock_release()
1204
@defer.inlineCallbacks
1206
"""Run the command."""
1233
1207
self.running = True
1234
if self._queue.active:
1238
"""Continue running if was running before."""
1239
if self.running and self._queue.active:
1242
def check_conditions(self):
1243
"""If conditions are ok, run the command again."""
1244
if not self.running and self._queue.active:
1248
@defer.inlineCallbacks
1251
if not self.is_runnable:
1252
self.log.debug('not running because of conditions')
1253
self.running = False
1256
if not self._queue.active:
1257
self.log.debug('not retrying because queue not active')
1258
self.running = False
1263
self.log.debug('retrying')
1265
self.log.debug('starting')
1267
self.start_done = True
1208
self.log.debug('starting')
1213
self.log.debug('cancelled before trying to run')
1216
# if queue not active, wait for it and check again
1217
if not self._queue.active:
1218
self.log.debug('not running because of inactive queue')
1219
self.wait_for_queue = defer.Deferred()
1220
yield self.wait_for_queue
1223
if not self.is_runnable:
1224
self.log.debug('not running because of conditions')
1225
self.wait_for_conditions = defer.Deferred()
1226
yield self.wait_for_conditions
1268
1230
yield self.markers_resolved_deferred
1269
except Exception, e:
1270
yield self.end_errback(Failure(e))
1272
yield self._run_command()
1274
def _run_command(self):
1275
"""Really execute the command."""
1276
self.log.debug('running')
1278
d.addCallbacks(self.end_callback, self.end_errback)
1231
self.log.debug('running')
1232
result = yield self._run()
1234
except Exception, exc:
1236
self.log.debug('cancelled while running')
1238
if exc.__class__ in self.suppressed_error_messages:
1239
self.log.warn('failure: %s', exc)
1241
self.log.exception('failure: %s (traceback follows)', exc)
1244
if exc.__class__ in self.retryable_errors:
1245
self.log.debug('retrying')
1248
self.handle_failure(Failure(exc))
1251
self.log.debug('cancelled while running')
1253
self.log.debug('success')
1254
self.handle_success(result)
1256
# finish the command
1281
1260
def cancel(self):
1282
1261
"""Cancel the command."""