1
# -*- coding: utf-8 -*-
3
# Copyright 2009-2011 Canonical Ltd.
5
# This program is free software: you can redistribute it and/or modify it
6
# under the terms of the GNU General Public License version 3, as published
7
# by the Free Software Foundation.
9
# This program is distributed in the hope that it will be useful, but
10
# WITHOUT ANY WARRANTY; without even the implied warranties of
11
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
12
# PURPOSE. See the GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License along
15
# with this program. If not, see <http://www.gnu.org/licenses/>.
17
"""Tests for the syncdaemon u1sdtool script."""
21
from operator import itemgetter
22
from StringIO import StringIO
24
from twisted.internet import defer
26
from contrib.testing.testcase import (
31
from ubuntuone.syncdaemon.volume_manager import (
36
from ubuntuone.platform.tools import (
48
show_waiting_metadata,
50
from tests.platform.test_tools import TestToolsBase
53
class U1SDToolTests(TestToolsBase):
54
"""Tests for u1sdtool output"""
56
def test_show_shares_empty(self):
57
"""test the output of --list-shared """
59
d = self.tool.get_shares()
60
d.addCallback(lambda result: show_shares(result, out))
62
"""check the output"""
63
self.assertEqual('No shares\n', out.getvalue())
67
@defer.inlineCallbacks
68
def test_show_shares(self):
69
"""test the output of --list-shared """
71
share_path = os.path.join(self.shares_dir, u"ñoño".encode('utf-8'))
72
share = Share(path=share_path, name=u"ñoño", volume_id='share_id',
73
access_level=ACCESS_LEVEL_RO, other_username='fake_user',
74
other_visible_name=u"ñoño", accepted=False,
76
yield self.main.vm.add_share(share)
77
expected = u"Shares list:\n id=share_id name=\xf1o\xf1o " + \
78
"accepted=False subscribed=False access_level=View " + \
80
result = yield self.tool.get_shares()
81
show_shares(result, out)
82
self.assertEqual(out.getvalue(), expected)
84
def test_show_shared_empty(self):
85
"""test the output of --list-shared """
87
d = self.tool.list_shared()
88
d.addCallback(lambda result: show_shared(result, out))
90
"""check the output"""
91
self.assertEqual('No shared\n', out.getvalue())
95
def test_show_shared(self):
96
"""test the output of --list-shared """
97
path = os.path.join(self.root_dir, u"ñoño".encode('utf-8'))
98
self.fs.create(path, "")
99
self.fs.set_node_id(path, "node_id")
100
# helper function, pylint: disable-msg=C0111
101
def fake_create_share(node_id, user, name, access_level, marker, path):
102
self.main.vm.handle_AQ_CREATE_SHARE_OK(share_id='share_id',
104
self.main.action_q.create_share = fake_create_share
105
self.main.vm.create_share(path, 'fake_user', 'shared_name',
108
expected = u"Shared list:\n id=share_id name=shared_name " + \
109
"accepted=False access_level=View to=fake_user " + \
110
"path=%s\n" % path.decode('utf-8')
111
d = self.tool.list_shared()
112
d.addCallback(lambda result: show_shared(result, out))
114
"""check the output"""
115
self.assertEqual(out.getvalue(), expected)
119
def test_show_path_info_unicode(self):
120
"""test the output of --info with unicode paths """
121
return self.generic_test_show_path_info_unicode('utf-8')
123
def test_show_path_info_unicode_pipe(self):
124
"""test the output of --info with unicode paths going to e.g. a pipe"""
125
return self.generic_test_show_path_info_unicode(None)
127
def generic_test_show_path_info_unicode(self, encoding):
128
"""generic test for the output of --info with unicode paths """
129
path = os.path.join(self.root_dir, u"ñoño".encode('utf-8'))
130
mdid = self.fs.create(path, "")
131
self.fs.set_node_id(path, "uuid1")
132
mdobj = self.fs.get_by_mdid(mdid)
133
self.fs.create_partial(mdobj.node_id, mdobj.share_id)
134
fh = self.fs.get_partial_for_writing(mdobj.node_id,
138
self.fs.commit_partial(mdobj.node_id, mdobj.share_id, "localhash")
139
self.fs.remove_partial("uuid1", "")
141
if encoding is not None:
142
path = path.decode(encoding)
144
path = path.decode('utf-8')
146
d = self.tool.get_metadata(path)
148
out.encoding = encoding
149
expected = """ File: %(path_info)s
152
info_created: %(info_created)s
153
info_is_partial: %(info_is_partial)s
154
info_last_downloaded: %(info_last_downloaded)s
155
info_last_partial_created: %(info_last_partial_created)s
156
info_last_partial_removed: %(info_last_partial_removed)s
157
info_node_id_assigned: %(info_node_id_assigned)s
159
local_hash: %(local_hash)s
163
server_hash: %(server_hash)s
164
share_id: %(share_id)s
168
# the callback, pylint: disable-msg=C0111
169
def callback(result):
170
if encoding is not None:
171
result.update(dict(path_info=path))
173
result.update(dict(path_info=path))
174
for k, v in result.iteritems():
175
self.assertIsInstance(v, unicode)
176
value = expected % result
177
self.assertEqual(out.getvalue(), value)
178
# helper callback, pylint: disable-msg=C0111
180
show_path_info(result, path, out)
183
d.addCallback(callback)
186
def test_show_current_transfers_empty(self):
187
"""test the output of --current_transfers option """
189
d = self.tool.get_current_uploads()
190
d.addCallback(lambda result: show_uploads(result, out))
191
d.addCallback(lambda _: self.tool.get_current_downloads())
192
d.addCallback(lambda result: show_downloads(result, out))
193
expected = u'Current uploads: 0\nCurrent downloads: 0\n'
195
"""check the output"""
196
self.assertEqual(out.getvalue(), expected)
200
@defer.inlineCallbacks
201
def test_show_current_transfers(self):
202
"""Test the --current_transfers output with transfers in progress."""
204
fake_download = FakeDownload('share_id', 'down_node_id')
205
fake_download.deflated_size = 10
206
fake_download.n_bytes_read = 1
207
fake_download.path = "down_path"
208
self.action_q.queue.waiting.append(fake_download)
211
fake_upload = FakeUpload('share_id', 'node_id')
212
fake_upload.deflated_size = 100
213
fake_upload.n_bytes_written = 10
214
fake_upload.path = "up_path"
215
self.action_q.queue.waiting.append(fake_upload)
218
expected = u"Current uploads:\n path: up_path\n " + \
219
"deflated size: 100\n bytes written: 10\nCurrent " + \
220
"downloads:\n path: down_path\n deflated size: " + \
221
"10\n bytes read: 1\n"
222
result = yield self.tool.get_current_uploads()
223
show_uploads(result, out)
225
result = yield self.tool.get_current_downloads()
226
show_downloads(result, out)
228
self.assertEqual(out.getvalue(), expected)
230
def test_show_state(self):
231
"""test the output of --status """
234
"State: QUEUE_MANAGER",
235
"connection: With User With Network",
236
"description: processing the commands pool",
237
"is_connected: True",
242
d = self.tool.get_status()
243
d.addCallback(lambda result: show_state(result, out))
245
"""check the output"""
246
info = [x.strip() for x in out.getvalue().split("\n") if x.strip()]
247
self.assertEqual(info, expected)
251
@defer.inlineCallbacks
252
def test_show_free_space(self):
253
"""Test the output of --free-space"""
254
share_path = os.path.join(self.main.shares_dir, 'share')
255
share = Share(path=share_path, volume_id='vol_id')
256
yield self.main.vm.add_share(share)
257
self.main.vm.update_free_space('vol_id', 12345)
260
expected = "Free space: 12345 bytes\n"
262
result = yield self.tool.free_space('vol_id')
263
show_free_space(result, out)
264
self.assertEqual(out.getvalue(), expected)
266
@defer.inlineCallbacks
267
def test_show_waiting_simple(self):
268
"""Test the output of --waiting-metadata"""
269
# inject the fake data
270
cmd1 = FakeCommand("", "node1", path='foo')
272
cmd2 = FakeCommand("", "node2")
274
self.action_q.queue.waiting.extend([cmd1, cmd2])
278
" FakeCommand(running=True, share_id='', "
279
"node_id='node1', path='foo', other='')\n"
280
" FakeCommand(running=False, share_id='', "
281
"node_id='node2', other='')\n"
284
result = yield self.tool.waiting()
285
show_waiting(result, out)
286
self.assertEqual(out.getvalue(), expected)
288
@defer.inlineCallbacks
289
def test_show_waiting_metadata(self):
290
"""Test the output of --waiting-metadata"""
291
# inject the fake data
292
cmd1 = FakeCommand("", "node1", path='p')
293
cmd2 = FakeCommand("", "node2")
294
self.action_q.queue.waiting.extend([cmd1, cmd2])
298
"Warning: this option is deprecated! Use '--waiting' instead\n"
299
" FakeCommand(running=True, share_id='', node_id='node1', "
300
"path='p', other='')\n"
301
" FakeCommand(running=True, share_id='', node_id='node2', "
305
result = yield self.tool.waiting_metadata()
306
show_waiting_metadata(result, out)
307
self.assertEqual(out.getvalue(), expected)
309
@defer.inlineCallbacks
310
def test_show_waiting_content(self):
311
"""Test the output of --waiting-content"""
312
class FakeCommand2(FakeUpload):
313
"""Fake command that goes in content queue."""
314
def __init__(self, *args):
315
FakeUpload.__init__(self, *args)
316
self.path = 'other_path'
318
# inject the fake data
319
self.action_q.queue.waiting.extend([
320
FakeUpload("", "node_id"),
321
FakeCommand2("", "node_id_2"),
322
FakeDownload("", "node_id_1"),
324
FakeCommand2("", "node_id_3")])
327
u"Warning: this option is deprecated! Use '--waiting' instead\n"
328
u"operation='FakeUpload' node_id='node_id' share_id='' "
329
u"path='upload_path'\n"
330
u"operation='FakeCommand2' node_id='node_id_2' share_id='' "
331
u"path='other_path'\n"
332
u"operation='FakeDownload' node_id='node_id_1' share_id='' "
333
u"path='download_path'\n"
334
u"operation='FakeCommand2' node_id='node_id_3' share_id='' "
335
u"path='other_path'\n"
338
result = yield self.tool.waiting_content()
339
yield show_waiting_content(result, out)
340
self.assertEqual(out.getvalue(), expected)
342
def test_show_folders_empty(self):
343
"""test the output of --list-folders """
345
d = self.tool.get_folders()
346
d.addCallback(lambda result: show_folders(result, out))
348
"""check the output"""
349
self.assertEqual('No folders\n', out.getvalue())
353
@defer.inlineCallbacks
354
def test_show_folders_subscribed(self):
355
"""Test the output of --list-folders."""
357
path = u'ñoño'.encode('utf-8')
358
suggested_path = os.path.join("~", u'ñoño')
360
udf = UDF("folder_id", "node_id", suggested_path, path,
362
yield self.main.vm.add_udf(udf)
363
expected = u"Folder list:\n id=folder_id subscribed=True " + \
365
result = yield self.tool.get_folders()
366
show_folders(result, out)
367
self.assertEqual(out.getvalue(), expected)
369
@defer.inlineCallbacks
370
def test_show_folders_unsubscribed(self):
371
"""Test the output of --list-folders with a unsubscribed folder."""
373
path = u'ñoño'.encode('utf-8')
374
suggested_path = os.path.join("~", u'ñoño')
376
udf = UDF("folder_id", "node_id", suggested_path, path,
378
yield self.main.vm.add_udf(udf)
379
self.main.vm.unsubscribe_udf(udf.id)
380
expected = u"Folder list:\n id=folder_id subscribed=False " + \
382
result = yield self.tool.get_folders()
383
show_folders(result, out)
384
self.assertEqual(out.getvalue(), expected)
386
@defer.inlineCallbacks
387
def generic_show_dirty_nodes(self, encoding, empty=False):
388
"""Test dirty nodes output."""
390
path1 = os.path.join(self.root_dir, u'ñoño-1'.encode('utf-8'))
391
self.main.fs.create(path1, "")
392
path2 = os.path.join(self.root_dir, u'ñoño-2'.encode('utf-8'))
393
mdid2 = self.main.fs.create(path2, "")
394
path3 = os.path.join(self.root_dir, "path3")
395
self.main.fs.create(path3, "")
396
path4 = os.path.join(self.root_dir, "path4")
397
mdid4 = self.main.fs.create(path4, "")
401
self.main.fs.set_by_mdid(mdid2, dirty=True)
402
self.main.fs.set_by_mdid(mdid4, dirty=True)
403
dirty_nodes = yield self.tool.get_dirty_nodes()
405
out.encoding = encoding
407
dirty_nodes.sort(key=itemgetter('mdid'))
408
show_dirty_nodes(dirty_nodes, out)
409
node_line_tpl = "mdid: %(mdid)s volume_id: %(share_id)s " + \
410
"node_id: %(node_id)s is_dir: %(is_dir)s path: %(path)s\n"
412
expected = " Dirty nodes:\n%s"
414
for mdid in sorted([mdid4, mdid2]):
415
mdobj = self.main.fs.get_by_mdid(mdid)
417
if encoding is not None:
418
d['path'] = d['path'].decode(encoding)
420
d['path'] = d['path'].decode('utf-8')
421
lines.append(node_line_tpl % d)
422
value = expected % ''.join(lines)
424
value = " No dirty nodes.\n"
425
self.assertEqual(out.getvalue(), value)
427
def test_show_dirty_nodes(self):
428
"""Test show_dirty_nodes with unicode paths."""
429
return self.generic_show_dirty_nodes("utf-8")
431
def test_show_dirty_nodes_pipe(self):
432
"""Test show_dirty_nodes with unicode paths going to e.g: a pipe"""
433
return self.generic_show_dirty_nodes(None)
435
def test_show_dirty_nodes_empty(self):
436
"""Test show_dirty_nodes with no dirty nodes."""
437
return self.generic_show_dirty_nodes(None, empty=True)