1
# -*- coding: utf-8 -*-
3
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
5
# Copyright 2009 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU General Public License version 3, as published
9
# by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
# PURPOSE. See the GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License along
17
# with this program. If not, see <http://www.gnu.org/licenses/>.
18
"""tests for the syncdaemon u1sdtool script """
20
from operator import itemgetter
21
from StringIO import StringIO
22
from tests.platform.linux.test_tools import TestToolsBase, FakeCommand
23
from ubuntuone.syncdaemon.action_queue import Upload
24
from ubuntuone.syncdaemon.volume_manager import Share, UDF
25
from ubuntuone.platform.linux.tools import (
35
show_waiting_metadata,
37
from twisted.internet import defer
40
class U1SDToolTests(TestToolsBase):
41
"""Tests for u1sdtool output"""
43
def test_show_shares_empty(self):
44
"""test the output of --list-shared """
46
d = self.tool.get_shares()
47
d.addCallback(lambda result: show_shares(result, out))
49
"""check the output"""
50
self.assertEquals('No shares\n', out.getvalue())
54
def test_show_shares(self):
55
"""test the output of --list-shared """
57
share_path = os.path.join(self.shares_dir, u"ñoño".decode('utf-8'))
58
share = Share(path=share_path, name=u"ñoño", volume_id='share_id',
59
access_level='View', other_username='fake_user',
60
other_visible_name=u"ñoño")
61
self.main.vm.add_share(share)
62
expected = u"Shares list:\n id=share_id name=\xf1o\xf1o " + \
63
"accepted=False access_level=View from=fake_user\n"
64
d = self.tool.get_shares()
65
d.addCallback(lambda result: show_shares(result, out))
67
"""check the output"""
68
self.assertEquals(out.getvalue(), expected)
72
def test_show_shared_empty(self):
73
"""test the output of --list-shared """
75
d = self.tool.list_shared()
76
d.addCallback(lambda result: show_shared(result, out))
78
"""check the output"""
79
self.assertEquals('No shared\n', out.getvalue())
83
def test_show_shared(self):
84
"""test the output of --list-shared """
85
path = os.path.join(self.root_dir, "ñoño")
86
self.fs_manager.create(path, "")
87
self.fs_manager.set_node_id(path, "node_id")
88
# helper function, pylint: disable-msg=C0111
89
def fake_create_share(node_id, user, name, access_level, marker, path):
90
self.main.vm.handle_AQ_CREATE_SHARE_OK(share_id='share_id',
92
self.main.action_q.create_share = fake_create_share
93
self.main.vm.create_share(path, 'fake_user', 'shared_name', 'View')
95
expected = u"Shared list:\n id=share_id name=shared_name " + \
96
"accepted=False access_level=View to=fake_user " + \
97
"path=%s\n" % path.decode('utf-8')
98
d = self.tool.list_shared()
99
d.addCallback(lambda result: show_shared(result, out))
101
"""check the output"""
102
self.assertEquals(out.getvalue(), expected)
106
def test_show_path_info_unicode(self):
107
"""test the output of --info with unicode paths """
108
return self.generic_test_show_path_info_unicode('utf-8')
110
def test_show_path_info_unicode_pipe(self):
111
"""test the output of --info with unicode paths going to e.g. a pipe"""
112
return self.generic_test_show_path_info_unicode(None)
114
def generic_test_show_path_info_unicode(self, encoding):
115
"""generic test for the output of --info with unicode paths """
116
path = os.path.join(self.root_dir, "ñoño")
117
mdid = self.fs_manager.create(path, "")
118
self.fs_manager.set_node_id(path, "uuid1")
119
mdobj = self.fs_manager.get_by_mdid(mdid)
120
self.fs_manager.create_partial(mdobj.node_id, mdobj.share_id)
121
fh = self.fs_manager.get_partial_for_writing(mdobj.node_id,
125
self.fs_manager.commit_partial(mdobj.node_id, mdobj.share_id,
127
self.fs_manager.remove_partial("uuid1", "")
128
d = self.tool.get_metadata(path)
130
out.encoding = encoding
131
expected = """ File: %(path_info)s
134
info_created: %(info_created)s
135
info_is_partial: %(info_is_partial)s
136
info_last_downloaded: %(info_last_downloaded)s
137
info_last_partial_created: %(info_last_partial_created)s
138
info_last_partial_removed: %(info_last_partial_removed)s
139
info_node_id_assigned: %(info_node_id_assigned)s
141
local_hash: %(local_hash)s
145
server_hash: %(server_hash)s
146
share_id: %(share_id)s
150
# the callback, pylint: disable-msg=C0111
151
def callback(result):
152
if encoding is not None:
153
result.update(dict(path_info=path.decode(encoding)))
155
result.update(dict(path_info=path))
156
for k, v in result.copy().items():
157
result[k] = v.decode('utf-8')
158
value = expected % result
159
self.assertEquals(out.getvalue(), value)
160
# helper callback, pylint: disable-msg=C0111
162
show_path_info(result, path, out)
165
d.addCallback(callback)
168
def test_show_current_transfers_empty(self):
169
"""test the output of --current_transfers option """
171
d = self.tool.get_current_uploads()
172
d.addCallback(lambda result: show_uploads(result, out))
173
d.addCallback(lambda _: self.tool.get_current_downloads())
174
d.addCallback(lambda result: show_downloads(result, out))
175
expected = u'Current uploads: 0\nCurrent downloads: 0\n'
177
"""check the output"""
178
self.assertEquals(out.getvalue(), expected)
182
def test_show_current_transfers(self):
183
"""test the output of --current_transfers option with transfers in
186
share_path = os.path.join(self.shares_dir, 'share')
187
self.main.vm.add_share(Share(path=share_path, volume_id='share_id'))
189
down_path = os.path.join(share_path, "down_path")
190
self.fs_manager.create(down_path, "share_id")
191
self.fs_manager.set_node_id(down_path, "down_node_id")
192
self.action_q.downloading[('share_id', 'down_node_id')] = dict(
193
deflated_size=10, size=100, n_bytes_read=1)
195
up_path = os.path.join(share_path, "up_path")
196
self.fs_manager.create(up_path, "share_id")
197
self.fs_manager.set_node_id(up_path, "node_id")
198
self.action_q.uploading[('share_id', 'node_id')] = dict(
199
deflated_size=100, n_bytes_written=10)
201
expected = u"Current uploads:\n path: %(up_path)s\n " + \
202
"deflated size: 100\n bytes written: 10\nCurrent " + \
203
"downloads:\n path: %(down_path)s\n deflated size: " + \
204
"10\n bytes read: 1\n"
205
expected = expected % dict(up_path=up_path, down_path=down_path)
206
d = self.tool.get_current_uploads()
207
d.addCallback(lambda result: show_uploads(result, out))
208
d.addCallback(lambda _: self.tool.get_current_downloads())
209
d.addCallback(lambda result: show_downloads(result, out))
211
"""check the output"""
212
self.assertEquals(out.getvalue(), expected)
216
def test_show_state(self):
217
"""test the output of --status """
220
"State: QUEUE_MANAGER",
221
"connection: With User With Network",
222
"description: processing the commands pool",
223
"is_connected: True",
228
d = self.tool.get_status()
229
d.addCallback(lambda result: show_state(result, out))
231
"""check the output"""
232
info = [x.strip() for x in out.getvalue().split("\n") if x.strip()]
233
self.assertEquals(info, expected)
237
@defer.inlineCallbacks
238
def test_show_waiting_metadata(self):
239
"""Test the output of --waiting-metadata"""
240
path_1 = os.path.join(self.root_dir, "bar")
241
self.fs_manager.create(path_1, "")
242
self.fs_manager.set_node_id(path_1, "node_id_1")
243
path_2 = os.path.join(self.root_dir, "foo_1")
244
self.fs_manager.create(path_2, "")
245
self.fs_manager.set_node_id(path_2, "node_id_2")
247
# inject the fake data
248
cmd1 = FakeCommand("", "node_id_1")
249
cmd2 = FakeCommand("", "node_id_2")
250
self.action_q.queue.waiting.extend([cmd1, cmd2])
252
temp = "FakeCommand(share_id='', node_id='%s', path='%s', other='')"
253
exp1 = temp % ("node_id_1", path_1)
254
exp2 = temp % ("node_id_2", path_2)
255
expected = " " + exp1 + "\n "+ exp2 + "\n"
257
result = yield self.tool.waiting_metadata()
258
show_waiting_metadata(result, out)
259
self.assertEquals(out.getvalue(), expected)
261
def test_show_waiting_content(self):
262
"""Test the output of --waiting-content"""
263
class FakeCommand1(FakeCommand, Upload):
264
"""Fake command that goes in content queue."""
265
def __init__(self, *args):
266
FakeCommand.__init__(self, *args)
268
class FakeCommand2(FakeCommand1):
269
"""Fake command that goes in content queue."""
271
path = os.path.join(self.root_dir, "foo")
272
self.fs_manager.create(path, "")
273
self.fs_manager.set_node_id(path, "node_id")
274
path_1 = os.path.join(self.root_dir, "bar")
275
self.fs_manager.create(path_1, "")
276
self.fs_manager.set_node_id(path_1, "node_id_1")
277
path_2 = os.path.join(self.root_dir, "foo_1")
278
self.fs_manager.create(path_2, "")
279
self.fs_manager.set_node_id(path_2, "node_id_2")
280
path_3 = os.path.join(self.root_dir, "bar_1")
281
self.fs_manager.create(path_3, "")
282
self.fs_manager.set_node_id(path_3, "node_id_3")
283
# inject the fake data
284
self.action_q.queue.waiting.extend([
285
FakeCommand1("", "node_id"),
286
FakeCommand2("", "node_id_2"),
287
FakeCommand1("", "node_id_1"),
288
FakeCommand2("", "node_id_3")])
290
expected = u"operation='FakeCommand1' node_id='node_id' share_id='' " + \
291
"path='%(path)s'\n" + \
292
"operation='FakeCommand2' node_id='node_id_2' share_id='' " + \
293
"path='%(path_2)s'\n" + \
294
"operation='FakeCommand1' node_id='node_id_1' share_id='' " + \
295
"path='%(path_1)s'\n" + \
296
"operation='FakeCommand2' node_id='node_id_3' share_id=''" + \
297
" path='%(path_3)s'\n"
298
expected = expected % dict(path=path, path_1=path_1,
299
path_2=path_2, path_3=path_3)
300
d = self.tool.waiting_content()
301
d.addCallback(lambda result: show_waiting_content(result, out))
303
"""check the output"""
304
self.assertEquals(out.getvalue(), expected)
308
def test_show_folders_empty(self):
309
"""test the output of --list-folders """
311
d = self.tool.get_folders()
312
d.addCallback(lambda result: show_folders(result, out))
314
"""check the output"""
315
self.assertEquals('No folders\n', out.getvalue())
319
@defer.inlineCallbacks
320
def test_show_folders_subscribed(self):
321
"""Test the output of --list-folders."""
323
path = u'ñoño'.encode('utf-8')
324
suggested_path = os.path.join("~", u'ñoño')
326
udf = UDF("folder_id", "node_id", suggested_path, path,
328
yield self.main.vm.add_udf(udf)
329
expected = u"Folder list:\n id=folder_id subscribed=True " + \
331
result = yield self.tool.get_folders()
332
show_folders(result, out)
333
self.assertEquals(out.getvalue(), expected)
335
@defer.inlineCallbacks
336
def test_show_folders_unsubscribed(self):
337
"""Test the output of --list-folders with a unsubscribed folder."""
339
path = u'ñoño'.encode('utf-8')
340
suggested_path = os.path.join("~", u'ñoño')
342
udf = UDF("folder_id", "node_id", suggested_path, path,
344
yield self.main.vm.add_udf(udf)
345
self.main.vm.unsubscribe_udf(udf.id)
346
expected = u"Folder list:\n id=folder_id subscribed=False " + \
348
result = yield self.tool.get_folders()
349
show_folders(result, out)
350
self.assertEquals(out.getvalue(), expected)
352
@defer.inlineCallbacks
353
def generic_show_dirty_nodes(self, encoding, empty=False):
354
"""Test dirty nodes output."""
356
path1 = os.path.join(self.root_dir, u'ñoño-1'.encode('utf-8'))
357
self.main.fs.create(path1, "")
358
path2 = os.path.join(self.root_dir, u'ñoño-2'.encode('utf-8'))
359
mdid2 = self.main.fs.create(path2, "")
360
path3 = os.path.join(self.root_dir, "path3")
361
self.main.fs.create(path3, "")
362
path4 = os.path.join(self.root_dir, "path4")
363
mdid4 = self.main.fs.create(path4, "")
367
self.main.fs.set_by_mdid(mdid2, dirty=True)
368
self.main.fs.set_by_mdid(mdid4, dirty=True)
369
dirty_nodes = yield self.tool.get_dirty_nodes()
371
out.encoding = encoding
373
dirty_nodes.sort(key=itemgetter('mdid'))
374
show_dirty_nodes(dirty_nodes, out)
375
node_line_tpl = "mdid: %(mdid)s volume_id: %(share_id)s " + \
376
"node_id: %(node_id)s is_dir: %(is_dir)s path: %(path)s\n"
378
expected = """ Dirty nodes:
381
for mdid in sorted([mdid4, mdid2]):
382
mdobj = self.main.fs.get_by_mdid(mdid)
383
lines.append(node_line_tpl % mdobj.__dict__)
384
value = expected % ''.join(lines)
386
value = """ No dirty nodes.\n"""
387
self.assertEquals(out.getvalue(), value)
389
def test_show_dirty_nodes(self):
390
"""Test show_dirty_nodes with unicode paths."""
391
return self.generic_show_dirty_nodes("utf-8")
393
def test_show_dirty_nodes_pipe(self):
394
"""Test show_dirty_nodes with unicode paths going to e.g: a pipe"""
395
return self.generic_show_dirty_nodes(None)
397
def test_show_dirty_nodes_empty(self):
398
"""Test show_dirty_nodes with no dirty nodes."""
399
return self.generic_show_dirty_nodes(None, empty=True)