~ubuntu-branches/ubuntu/oneiric/ubuntuone-client/oneiric

« back to all changes in this revision

Viewing changes to contrib/testing/testcase.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodrigo Moya
  • Date: 2010-06-23 23:08:15 UTC
  • mto: This revision was merged to the branch mainline in revision 34.
  • Revision ID: james.westby@ubuntu.com-20100623230815-4m3ugh10u9x9xzw5
Tags: upstream-1.3.2
Import upstream version 1.3.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
import shutil
26
26
import itertools
27
27
 
28
 
from ubuntuone.oauthdesktop.main import Login, LoginProcessor
 
28
from ubuntu_sso.main import Login, LoginProcessor
29
29
from ubuntuone.syncdaemon import (
30
30
    config,
31
31
    action_queue,
102
102
        """...that only knows how to go away"""
103
103
 
104
104
 
 
105
class FakeActionQueue(object):
 
106
    """Stub implementation."""
 
107
 
 
108
    implements(interfaces.IActionQueue)
 
109
 
 
110
    def __init__(self, eq, *args, **kwargs):
 
111
        """ Creates the instance """
 
112
        self.eq = self.event_queue = eq
 
113
        self.uploading = {}
 
114
        self.downloading = {}
 
115
        # pylint: disable-msg=C0103
 
116
        class UUID_Map(object):
 
117
            """mock uuid map"""
 
118
            def set(self, *args):
 
119
                """mock set method"""
 
120
                pass
 
121
 
 
122
        self.uuid_map = UUID_Map()
 
123
        self.content_queue = action_queue.ContentQueue('CONTENT_QUEUE', self)
 
124
        self.meta_queue = action_queue.RequestQueue('META_QUEUE', self)
 
125
 
 
126
        # throttling attributes
 
127
        self.readLimit = None
 
128
        self.writeLimit = None
 
129
        self.throttling_enabled = False
 
130
 
 
131
    def connect(self, host=None, port=None, user_ssl=False):
 
132
        """Stub implementation."""
 
133
        self.eq.push('SYS_CONNECTION_MADE')
 
134
 
 
135
    def disconnect(self):
 
136
        """Stub implementation."""
 
137
 
 
138
    def enable_throttling(self):
 
139
        """Stub implementation."""
 
140
        self.throttling_enabled = True
 
141
 
 
142
    def disable_throttling(self):
 
143
        """Stub implementation."""
 
144
        self.throttling_enabled = False
 
145
 
 
146
    def cancel_download(self, share_id, node_id):
 
147
        """Stub implementation."""
 
148
 
 
149
    def cancel_upload(self, share_id, node_id):
 
150
        """Stub implementation."""
 
151
 
 
152
    def download(self, share_id, node_id, server_hash, fileobj):
 
153
        """Stub implementation."""
 
154
 
 
155
    def upload(self, share_id, node_id, previous_hash, hash, crc32,
 
156
               size, deflated_size, fileobj):
 
157
        """Stub implementation."""
 
158
 
 
159
    def make_file(self, share_id, parent_id, name, marker):
 
160
        """Stub implementation."""
 
161
 
 
162
    def make_dir(self, share_id, parent_id, name, marker):
 
163
        """Stub implementation."""
 
164
 
 
165
    def move(self, share_id, node_id, old_parent_id, new_parent_id, new_name):
 
166
        """Stub implementation."""
 
167
 
 
168
    def unlink(self, share_id, node_id):
 
169
        """Stub implementation."""
 
170
 
 
171
    def query(self, items):
 
172
        """Stub implementation."""
 
173
 
 
174
    def listdir(self, share_id, node_id, server_hash, fileobj):
 
175
        """Stub implementation."""
 
176
 
 
177
    def list_shares(self):
 
178
        """Stub implementation."""
 
179
 
 
180
    def list_volumes(self):
 
181
        """ stub implementation """
 
182
        pass
 
183
 
 
184
    def answer_share(self, share_id, answer):
 
185
        """Stub implementation."""
 
186
        self.eq.push('AQ_ANSWER_SHARE_OK', share_id, answer)
 
187
 
 
188
    def create_share(self, *args):
 
189
        """ sutb implementation """
 
190
 
 
191
    def inquire_free_space(self, share_id):
 
192
        """Stub implementation."""
 
193
 
 
194
    def inquire_account_info(self):
 
195
        """Stub implementation."""
 
196
 
 
197
    def delete_volume(self, volume_id):
 
198
        """Stub implementation."""
 
199
 
 
200
    def change_public_access(self, share_id, node_id, is_public):
 
201
        """Stub implementation."""
 
202
 
 
203
 
105
204
class FakeMain(main.Main):
106
205
    """ A fake Main class to setup the tests """
107
206
 
 
207
    _fake_AQ_class = FakeActionQueue
 
208
    _fake_AQ_params = ()
 
209
 
108
210
    # don't call Main.__init__ we take care of creating a fake main and
109
211
    # all its attributes. pylint: disable-msg=W0231
110
212
    def __init__(self, root_dir, shares_dir, data_dir, partials_dir):
122
224
                                               self.partials_dir, self.vm)
123
225
        self.event_q = event_queue.EventQueue(self.fs)
124
226
        self.fs.register_eq(self.event_q)
125
 
        self.action_q = FakeActionQueue(self.event_q, main=self)
 
227
        self.action_q = self._fake_AQ_class(self.event_q, self,
 
228
                                            *self._fake_AQ_params)
126
229
        self.state_manager = main.StateManager(self, 2)
127
230
        self.event_q.subscribe(self.vm)
128
231
        self.vm.init_root()
341
444
            return defer.succeed(True)
342
445
 
343
446
 
344
 
class FakeActionQueue(object):
345
 
    """Stub implementation."""
346
 
 
347
 
    implements(interfaces.IActionQueue)
348
 
 
349
 
    def __init__(self, eq, *args, **kwargs):
350
 
        """ Creates the instance """
351
 
        self.eq = self.event_queue = eq
352
 
        self.uploading = {}
353
 
        self.downloading = {}
354
 
        # pylint: disable-msg=C0103
355
 
        class UUID_Map(object):
356
 
            """mock uuid map"""
357
 
            def set(self, *args):
358
 
                """mock set method"""
359
 
                pass
360
 
 
361
 
        self.uuid_map = UUID_Map()
362
 
        self.content_queue = action_queue.ContentQueue('CONTENT_QUEUE', self)
363
 
        self.meta_queue = action_queue.RequestQueue('META_QUEUE', self)
364
 
 
365
 
        # throttling attributes
366
 
        self.readLimit = None
367
 
        self.writeLimit = None
368
 
        self.throttling_enabled = False
369
 
 
370
 
    def connect(self, host=None, port=None, user_ssl=False):
371
 
        """Stub implementation."""
372
 
        self.eq.push('SYS_CONNECTION_MADE')
373
 
 
374
 
    def disconnect(self):
375
 
        """Stub implementation."""
376
 
 
377
 
    def enable_throttling(self):
378
 
        """Stub implementation."""
379
 
        self.throttling_enabled = True
380
 
 
381
 
    def disable_throttling(self):
382
 
        """Stub implementation."""
383
 
        self.throttling_enabled = False
384
 
 
385
 
    def cancel_download(self, share_id, node_id):
386
 
        """Stub implementation."""
387
 
 
388
 
    def cancel_upload(self, share_id, node_id):
389
 
        """Stub implementation."""
390
 
 
391
 
    def download(self, share_id, node_id, server_hash, fileobj):
392
 
        """Stub implementation."""
393
 
 
394
 
    def upload(self, share_id, node_id, previous_hash, hash, crc32,
395
 
               size, deflated_size, fileobj):
396
 
        """Stub implementation."""
397
 
 
398
 
    def make_file(self, share_id, parent_id, name, marker):
399
 
        """Stub implementation."""
400
 
 
401
 
    def make_dir(self, share_id, parent_id, name, marker):
402
 
        """Stub implementation."""
403
 
 
404
 
    def move(self, share_id, node_id, old_parent_id, new_parent_id, new_name):
405
 
        """Stub implementation."""
406
 
 
407
 
    def unlink(self, share_id, node_id):
408
 
        """Stub implementation."""
409
 
 
410
 
    def query(self, items):
411
 
        """Stub implementation."""
412
 
 
413
 
    def listdir(self, share_id, node_id, server_hash, fileobj):
414
 
        """Stub implementation."""
415
 
 
416
 
    def list_shares(self):
417
 
        """Stub implementation."""
418
 
 
419
 
    def list_volumes(self):
420
 
        """ stub implementation """
421
 
        pass
422
 
 
423
 
    def answer_share(self, share_id, answer):
424
 
        """Stub implementation."""
425
 
        self.eq.push('AQ_ANSWER_SHARE_OK', share_id, answer)
426
 
 
427
 
    def create_share(self, *args):
428
 
        """ sutb implementation """
429
 
 
430
 
    def inquire_free_space(self, share_id):
431
 
        """Stub implementation."""
432
 
 
433
 
    def inquire_account_info(self):
434
 
        """Stub implementation."""
435
 
 
436
 
    def delete_volume(self, volume_id):
437
 
        """Stub implementation."""
438
 
 
439
 
    def change_public_access(self, share_id, node_id, is_public):
440
 
        """Stub implementation."""
441
 
 
442
 
 
443
447
class MementoHandler(logging.Handler):
444
448
    """ A handler class which store logging records in a list """
445
449