1034
1043
event_ok=None, handle_exception=False)
1035
1044
defer.returnValue(result.volumes)
1046
@defer.inlineCallbacks
1047
def _really_execute(self, command_class, *args, **kwargs):
1048
"""Actually queue and execute the operation."""
1049
cmd = command_class(self.queue, *args, **kwargs)
1051
# queue if should, otherwise all is done
1052
if cmd.should_be_queued():
1053
cmd.log.debug('queueing')
1054
self.queue.queue(cmd)
1056
self.queue.unqueue(cmd)
1058
@defer.inlineCallbacks
1059
def execute(self, command_class, *args, **kwargs):
1060
"""Execute a command only if there's room in memory to handle it."""
1061
if len(self.queue) > self.memory_pool_limit:
1062
# not enough room in memory, store it in the offloaded queue
1063
logger.debug('offload push: %s %s %s', command_class.__name__, args, kwargs)
1064
self.disk_queue.push((command_class.__name__, args, kwargs))
1067
# normal case, just instantiate the command and let it go
1068
yield self._really_execute(command_class, *args, **kwargs)
1070
# command just finished... check to queue more offloaded ones
1071
while len(self.queue) < self.memory_pool_limit and len(self.disk_queue) > 0:
1072
command_class_name, args, kwargs = self.disk_queue.pop()
1073
logger.debug('offload pop: %s %s %s', command_class_name, args, kwargs)
1074
command_class = self.commands[command_class_name]
1075
yield self._really_execute(command_class, *args, **kwargs)
1037
1077
def make_file(self, share_id, parent_id, name, marker, path):
1038
1078
"""See .interfaces.IMetaQueue."""
1039
return MakeFile(self.queue, share_id, parent_id,
1040
name, marker, path).go()
1079
self.execute(MakeFile, share_id, parent_id, name, marker, path)
1042
1081
def make_dir(self, share_id, parent_id, name, marker, path):
1043
1082
"""See .interfaces.IMetaQueue."""
1044
return MakeDir(self.queue, share_id, parent_id,
1045
name, marker, path).go()
1083
self.execute(MakeDir, share_id, parent_id, name, marker, path)
1047
1085
def move(self, share_id, node_id, old_parent_id, new_parent_id,
1048
1086
new_name, path_from, path_to):
1049
1087
"""See .interfaces.IMetaQueue."""
1050
return Move(self.queue, share_id, node_id, old_parent_id,
1051
new_parent_id, new_name, path_from, path_to).go()
1088
self.execute(Move, share_id, node_id, old_parent_id,
1089
new_parent_id, new_name, path_from, path_to)
1053
1091
def unlink(self, share_id, parent_id, node_id, path, is_dir):
1054
1092
"""See .interfaces.IMetaQueue."""
1055
return Unlink(self.queue, share_id, parent_id, node_id, path,
1093
self.execute(Unlink, share_id, parent_id, node_id, path, is_dir)
1058
1095
def inquire_free_space(self, share_id):
1059
1096
"""See .interfaces.IMetaQueue."""
1060
return FreeSpaceInquiry(self.queue, share_id).go()
1097
self.execute(FreeSpaceInquiry, share_id)
1062
1099
def inquire_account_info(self):
1063
1100
"""See .interfaces.IMetaQueue."""
1064
return AccountInquiry(self.queue).go()
1101
self.execute(AccountInquiry)
1066
1103
def list_shares(self):
1067
1104
"""See .interfaces.IMetaQueue."""
1068
return ListShares(self.queue).go()
1105
self.execute(ListShares)
1070
1107
def answer_share(self, share_id, answer):
1071
1108
"""See .interfaces.IMetaQueue."""
1072
return AnswerShare(self.queue, share_id, answer).go()
1109
self.execute(AnswerShare, share_id, answer)
1074
1111
def create_share(self, node_id, share_to, name, access_level,
1076
1113
"""See .interfaces.IMetaQueue."""
1077
return CreateShare(self.queue, node_id, share_to, name,
1078
access_level, marker, path).go()
1114
self.execute(CreateShare, node_id, share_to, name,
1115
access_level, marker, path)
1080
1117
def delete_share(self, share_id):
1081
1118
"""See .interfaces.IMetaQueue."""
1082
return DeleteShare(self.queue, share_id).go()
1119
self.execute(DeleteShare, share_id)
1084
1121
def create_udf(self, path, name, marker):
1085
1122
"""See .interfaces.IMetaQueue."""
1086
return CreateUDF(self.queue, path, name, marker).go()
1123
self.execute(CreateUDF, path, name, marker)
1088
1125
def list_volumes(self):
1089
1126
"""See .interfaces.IMetaQueue."""
1090
return ListVolumes(self.queue).go()
1127
self.execute(ListVolumes)
1092
1129
def delete_volume(self, volume_id, path):
1093
1130
"""See .interfaces.IMetaQueue."""
1094
return DeleteVolume(self.queue, volume_id, path).go()
1131
self.execute(DeleteVolume, volume_id, path)
1096
1133
def change_public_access(self, share_id, node_id, is_public):
1097
1134
"""See .interfaces.IMetaQueue."""
1098
return ChangePublicAccess(self.queue, share_id,
1099
node_id, is_public).go()
1135
self.execute(ChangePublicAccess, share_id, node_id, is_public)
1101
1137
def get_public_files(self):
1102
1138
"""See .interfaces.IMetaQueue."""
1103
return GetPublicFiles(self.queue).go()
1139
self.execute(GetPublicFiles)
1105
def download(self, share_id, node_id, server_hash, path, fileobj_factory):
1141
def download(self, share_id, node_id, server_hash, path):
1106
1142
"""See .interfaces.IContentQueue.download."""
1107
return Download(self.queue, share_id, node_id, server_hash,
1108
path, fileobj_factory).go()
1143
self.execute(Download, share_id, node_id, server_hash, path)
1110
1145
def upload(self, share_id, node_id, previous_hash, hash, crc32,
1111
size, path, fileobj_factory, upload_id=None):
1146
size, path, upload_id=None):
1112
1147
"""See .interfaces.IContentQueue."""
1113
return Upload(self.queue, share_id, node_id, previous_hash,
1114
hash, crc32, size, path, fileobj_factory,
1115
upload_id=upload_id).go()
1148
self.execute(Upload, share_id, node_id, previous_hash, hash, crc32,
1149
size, path, upload_id=upload_id)
1117
1151
def _cancel_op(self, share_id, node_id, cmdclass):
1118
1152
"""Generalized form of cancel_upload and cancel_download."""