102
102
"""...that only knows how to go away"""
105
class FakeActionQueue(object):
106
"""Stub implementation."""
108
implements(interfaces.IActionQueue)
110
def __init__(self, eq, *args, **kwargs):
111
""" Creates the instance """
112
self.eq = self.event_queue = eq
114
self.downloading = {}
115
# pylint: disable-msg=C0103
116
class UUID_Map(object):
118
def set(self, *args):
119
"""mock set method"""
122
self.uuid_map = UUID_Map()
123
self.content_queue = action_queue.ContentQueue('CONTENT_QUEUE', self)
124
self.meta_queue = action_queue.RequestQueue('META_QUEUE', self)
126
# throttling attributes
127
self.readLimit = None
128
self.writeLimit = None
129
self.throttling_enabled = False
131
def connect(self, host=None, port=None, user_ssl=False):
132
"""Stub implementation."""
133
self.eq.push('SYS_CONNECTION_MADE')
135
def disconnect(self):
136
"""Stub implementation."""
138
def enable_throttling(self):
139
"""Stub implementation."""
140
self.throttling_enabled = True
142
def disable_throttling(self):
143
"""Stub implementation."""
144
self.throttling_enabled = False
146
def cancel_download(self, share_id, node_id):
147
"""Stub implementation."""
149
def cancel_upload(self, share_id, node_id):
150
"""Stub implementation."""
152
def download(self, share_id, node_id, server_hash, fileobj):
153
"""Stub implementation."""
155
def upload(self, share_id, node_id, previous_hash, hash, crc32,
156
size, deflated_size, fileobj):
157
"""Stub implementation."""
159
def make_file(self, share_id, parent_id, name, marker):
160
"""Stub implementation."""
162
def make_dir(self, share_id, parent_id, name, marker):
163
"""Stub implementation."""
165
def move(self, share_id, node_id, old_parent_id, new_parent_id, new_name):
166
"""Stub implementation."""
168
def unlink(self, share_id, node_id):
169
"""Stub implementation."""
171
def query(self, items):
172
"""Stub implementation."""
174
def listdir(self, share_id, node_id, server_hash, fileobj):
175
"""Stub implementation."""
177
def list_shares(self):
178
"""Stub implementation."""
180
def list_volumes(self):
181
""" stub implementation """
184
def answer_share(self, share_id, answer):
185
"""Stub implementation."""
186
self.eq.push('AQ_ANSWER_SHARE_OK', share_id, answer)
188
def create_share(self, *args):
189
""" sutb implementation """
191
def inquire_free_space(self, share_id):
192
"""Stub implementation."""
194
def inquire_account_info(self):
195
"""Stub implementation."""
197
def delete_volume(self, volume_id):
198
"""Stub implementation."""
200
def change_public_access(self, share_id, node_id, is_public):
201
"""Stub implementation."""
105
204
class FakeMain(main.Main):
106
205
""" A fake Main class to setup the tests """
207
_fake_AQ_class = FakeActionQueue
108
210
# don't call Main.__init__ we take care of creating a fake main and
109
211
# all its attributes. pylint: disable-msg=W0231
110
212
def __init__(self, root_dir, shares_dir, data_dir, partials_dir):
341
444
return defer.succeed(True)
344
class FakeActionQueue(object):
345
"""Stub implementation."""
347
implements(interfaces.IActionQueue)
349
def __init__(self, eq, *args, **kwargs):
350
""" Creates the instance """
351
self.eq = self.event_queue = eq
353
self.downloading = {}
354
# pylint: disable-msg=C0103
355
class UUID_Map(object):
357
def set(self, *args):
358
"""mock set method"""
361
self.uuid_map = UUID_Map()
362
self.content_queue = action_queue.ContentQueue('CONTENT_QUEUE', self)
363
self.meta_queue = action_queue.RequestQueue('META_QUEUE', self)
365
# throttling attributes
366
self.readLimit = None
367
self.writeLimit = None
368
self.throttling_enabled = False
370
def connect(self, host=None, port=None, user_ssl=False):
371
"""Stub implementation."""
372
self.eq.push('SYS_CONNECTION_MADE')
374
def disconnect(self):
375
"""Stub implementation."""
377
def enable_throttling(self):
378
"""Stub implementation."""
379
self.throttling_enabled = True
381
def disable_throttling(self):
382
"""Stub implementation."""
383
self.throttling_enabled = False
385
def cancel_download(self, share_id, node_id):
386
"""Stub implementation."""
388
def cancel_upload(self, share_id, node_id):
389
"""Stub implementation."""
391
def download(self, share_id, node_id, server_hash, fileobj):
392
"""Stub implementation."""
394
def upload(self, share_id, node_id, previous_hash, hash, crc32,
395
size, deflated_size, fileobj):
396
"""Stub implementation."""
398
def make_file(self, share_id, parent_id, name, marker):
399
"""Stub implementation."""
401
def make_dir(self, share_id, parent_id, name, marker):
402
"""Stub implementation."""
404
def move(self, share_id, node_id, old_parent_id, new_parent_id, new_name):
405
"""Stub implementation."""
407
def unlink(self, share_id, node_id):
408
"""Stub implementation."""
410
def query(self, items):
411
"""Stub implementation."""
413
def listdir(self, share_id, node_id, server_hash, fileobj):
414
"""Stub implementation."""
416
def list_shares(self):
417
"""Stub implementation."""
419
def list_volumes(self):
420
""" stub implementation """
423
def answer_share(self, share_id, answer):
424
"""Stub implementation."""
425
self.eq.push('AQ_ANSWER_SHARE_OK', share_id, answer)
427
def create_share(self, *args):
428
""" sutb implementation """
430
def inquire_free_space(self, share_id):
431
"""Stub implementation."""
433
def inquire_account_info(self):
434
"""Stub implementation."""
436
def delete_volume(self, volume_id):
437
"""Stub implementation."""
439
def change_public_access(self, share_id, node_id, is_public):
440
"""Stub implementation."""
443
447
class MementoHandler(logging.Handler):
444
448
""" A handler class which store logging records in a list """