# -*- coding: utf-8 -*-
#
# tests.syncdaemon.test_event_logging - test logging ZG events
#
# Author: Alejandro J. Cura <alecu@canonical.com>
#
# Copyright 2010 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""Test the event logging from SyncDaemon into Zeitgeist."""
27
import logging
import os
import shutil
import uuid

from twisted.internet import defer
from zeitgeist.datamodel import Interpretation, Manifestation

from contrib.testing.testcase import (
    FakeMain, FakeMainTestCase, BaseTwistedTestCase, MementoHandler)
from ubuntuone.platform.linux import get_udf_path
from ubuntuone.storageprotocol import client, delta
from ubuntuone.storageprotocol.request import ROOT
from ubuntuone.storageprotocol.sharersp import NotifyShareHolder
from ubuntuone.syncdaemon.action_queue import (
    RequestQueue, Upload, MakeFile, MakeDir)
from ubuntuone.syncdaemon.event_logging import (
    zglog, ZeitgeistListener, ACTOR_UBUNTUONE,
    EVENT_INTERPRETATION_U1_FOLDER_SHARED,
    EVENT_INTERPRETATION_U1_FOLDER_UNSHARED,
    EVENT_INTERPRETATION_U1_SHARE_ACCEPTED,
    EVENT_INTERPRETATION_U1_SHARE_UNACCEPTED,
    EVENT_INTERPRETATION_U1_CONFLICT_RENAME,
    EVENT_INTERPRETATION_U1_UDF_CREATED,
    EVENT_INTERPRETATION_U1_UDF_DELETED,
    EVENT_INTERPRETATION_U1_UDF_SUBSCRIBED,
    EVENT_INTERPRETATION_U1_UDF_UNSUBSCRIBED,
    MANIFESTATION_U1_CONTACT_DATA_OBJECT, DIRECTORY_MIMETYPE,
    INTERPRETATION_U1_CONTACT, URI_PROTOCOL_U1,
    STORAGE_DELETED, STORAGE_NETWORK, STORAGE_LOCAL)
from ubuntuone.syncdaemon.sync import Sync
from ubuntuone.syncdaemon.volume_manager import Share, Shared, UDF
from test_action_queue import ConnectedBaseTestCase
56
# Fixed, hard-coded UUID constant.
# NOTE(review): presumably a stable volume id for tests; it is not referenced
# in the visible part of this file -- confirm it is still needed.
VOLUME = uuid.UUID('12345678-1234-1234-1234-123456789abc')
59
class MockLogger(object):
    """A mock logger that stores whatever is logged into it."""

    def __init__(self):
        """Initialize this instance."""
        # Everything handed to log() is kept here, in arrival order, so the
        # tests can inspect it through ``listener.zg.events``.
        self.events = []

    # NOTE(review): the header of this method was lost in extraction; the
    # name ``log`` is assumed to match the logger being patched -- confirm.
    def log(self, event):
        """Store the event instead of sending it anywhere."""
        self.events.append(event)
70
def listen_for(event_q, event, callback, count=1, collect=False):
    """Setup a EQ listener for the specified event.

    A throw-away listener exposing ``handle_<event>`` is subscribed to
    ``event_q``.  When the event has been seen ``count`` times the listener
    unsubscribes itself and ``callback`` is called exactly once with:

    - the list of every ``(args, kwargs)`` seen, if ``collect`` is true;
    - ``(args, kwargs)`` of the last event, if it had keyword arguments;
    - ``args`` of the last event otherwise.

    Return the subscribed listener.

    NOTE(review): several short lines of this function were lost in
    extraction (the Listener initialiser, the hit counter, the ``collect``
    branches and the final ``return``); they were reconstructed from the
    names the surviving lines use -- confirm against version control.
    """
    class Listener(object):
        """A basic listener to handle the pushed event."""

        def __init__(self):
            """Start with no hits and nothing collected."""
            self.hits = 0
            self.events = []

        def _handle_event(self, *args, **kwargs):
            """Count the event and fire the callback on the last one."""
            self.hits += 1
            if collect:
                self.events.append((args, kwargs))
            if self.hits == count:
                # unsubscribe first, so the callback can never fire twice
                event_q.unsubscribe(self)
                if collect:
                    callback(self.events)
                elif kwargs:
                    callback((args, kwargs))
                else:
                    callback(args)

    listener = Listener()
    # the event queue dispatches by method name: handle_<EVENT_NAME>
    setattr(listener, 'handle_'+event, listener._handle_event)
    event_q.subscribe(listener)
    return listener
98
class ZeitgeistListenerTestCase(FakeMainTestCase):
    """Tests for ZeitgeistListener."""

    # NOTE(review): the ``def setUp(self):`` header was lost in extraction;
    # it is implied by the super().setUp() call below.
    def setUp(self):
        """Initialize this instance."""
        super(ZeitgeistListenerTestCase, self).setUp()
        # swap the real logger for the in-memory mock before the listener
        # is built, so everything it logs lands in self.listener.zg.events
        self.patch(zglog, "ZeitgeistLogger", MockLogger)
        self.listener = ZeitgeistListener(self.fs, self.vm)
        self.event_q.subscribe(self.listener)

    def _listen_for(self, *args, **kwargs):
        """Call listen_for() on the event queue of the fake main."""
        return listen_for(self.main.event_q, *args, **kwargs)
112
class ZeitgeistSharesTestCase(ZeitgeistListenerTestCase):
    """Tests for all Share-related zeitgeist events.

    NOTE(review): this class was rebuilt from a damaged extract.  Lines
    that had been dropped (deferred creation, ``yield d``, trailing call
    arguments) are marked individually below -- confirm them against
    version control.
    """

    def _create_shared_folder(self, node_id):
        """Create 'shared_path' in the root volume and return its path."""
        path = os.path.join(self.vm.root.path, 'shared_path')
        self.main.fs.create(path, "")
        self.main.fs.set_node_id(path, node_id)
        return path

    def _assert_share_event(self, event, interpretation, origin, username):
        """Check a share event, its folder subject and its contact subject.

        ``interpretation`` is the expected event interpretation, ``origin``
        the path the folder origin must end with, and ``username`` the user
        expected in the contact's ``mailto:`` uri.
        """
        self.assertEqual(event.interpretation, interpretation)
        self.assertEqual(event.manifestation,
                         Manifestation.USER_ACTIVITY)
        self.assertEqual(event.actor, ACTOR_UBUNTUONE)

        folder = event.subjects[0]
        self.assertTrue(folder.uri.startswith(URI_PROTOCOL_U1))
        self.assertEqual(folder.interpretation, Interpretation.FOLDER)
        self.assertEqual(folder.manifestation,
                         Manifestation.REMOTE_DATA_OBJECT)
        self.assertTrue(folder.origin.endswith(origin))
        self.assertEqual(folder.mimetype, DIRECTORY_MIMETYPE)
        self.assertEqual(folder.storage, STORAGE_NETWORK)

        other_user = event.subjects[1]
        self.assertEqual(other_user.uri, "mailto:" + username)
        self.assertEqual(other_user.interpretation, INTERPRETATION_U1_CONTACT)
        self.assertEqual(other_user.manifestation,
                         MANIFESTATION_U1_CONTACT_DATA_OBJECT)

    def test_share_created_with_username_is_logged(self):
        """A ShareCreated event is logged."""
        fake_username = "fake user"
        path = self._create_shared_folder("node id")

        def fake_create_share(node_id, user, name, access_level, marker):
            """Fake the creation of the share on the server."""
            self.assertIn(marker, self.vm.marker_share_map)
            share_id = self.fs.get_by_mdid(marker).share_id
            # NOTE(review): the arguments of this push were lost in
            # extraction; rebuilt from the names in scope -- confirm.
            self.main.event_q.push('AQ_CREATE_SHARE_OK',
                                   share_id=share_id, marker=marker)

        self.patch(self.main.action_q, "create_share", fake_create_share)
        self.vm.create_share(path, fake_username, 'shared_name', 'View')

        self.assert_folder_shared_is_logged(path, fake_username)

    def test_share_created_with_email_is_logged(self):
        """A ShareCreated event is logged."""
        fake_username = "fakeuser@somewhere.com"
        path = self._create_shared_folder("node id")

        def fake_create_share(node_id, user, name, access_level, marker):
            """Fake the creation of the share on the server."""
            self.assertIn(marker, self.vm.marker_share_map)
            # NOTE(review): the argument of this push was lost in
            # extraction; ``marker`` is the only candidate in scope --
            # confirm.
            self.main.event_q.push('AQ_SHARE_INVITATION_SENT',
                                   marker=marker)

        self.patch(self.main.action_q, "create_share", fake_create_share)
        self.vm.create_share(path, fake_username, 'shared_name', 'View')

        self.assert_folder_shared_is_logged(path, fake_username)

    def assert_folder_shared_is_logged(self, path, fake_username):
        """Assert that the FolderShared event was logged."""
        self.assertEqual(len(self.listener.zg.events), 1)
        self._assert_share_event(self.listener.zg.events[0],
                                 EVENT_INTERPRETATION_U1_FOLDER_SHARED,
                                 path, fake_username)

    @defer.inlineCallbacks
    def test_share_deleted_is_logged(self):
        """Test VolumeManager.delete_share."""
        sample_share_id = "share id"
        sample_node_id = "node id"
        fake_username = "fake user"
        path = self._create_shared_folder(sample_node_id)
        share = Shared(path=path, volume_id=sample_share_id,
                       node_id=sample_node_id, other_username=fake_username)
        self.vm.add_shared(share)

        def fake_delete_share(share_id):
            """Fake delete_share."""
            self.assertEqual(share_id, share.volume_id)
            self.main.event_q.push('AQ_DELETE_SHARE_OK', share_id)

        self.patch(self.main.action_q, 'delete_share', fake_delete_share)
        # NOTE(review): the creation of ``d`` and the ``yield d`` below were
        # lost in extraction; both are implied by ``d.callback`` -- confirm.
        d = defer.Deferred()
        self._listen_for('VM_SHARE_DELETED', d.callback, 1, collect=True)
        self.vm.delete_share(share.volume_id)
        yield d

        self.assertEqual(len(self.listener.zg.events), 1)
        self._assert_share_event(self.listener.zg.events[0],
                                 EVENT_INTERPRETATION_U1_FOLDER_UNSHARED,
                                 path, fake_username)

    def test_share_accepted_is_logged(self):
        """Test that an accepted share event is logged."""
        # initialize the root
        self.vm._got_root('root_uuid')
        fake_username = "fake user"
        path = os.path.join(self.vm.root.path, 'shared_path')
        self.main.fs.create(path, "")
        share_path = os.path.join(self.shares_dir, 'fake_share')
        share = Share(path=share_path, volume_id='volume_id',
                      node_id="node_id", other_username=fake_username)
        self.vm.add_share(share)

        self.assertEqual(len(self.listener.zg.events), 1)
        self._assert_share_event(self.listener.zg.events[0],
                                 EVENT_INTERPRETATION_U1_SHARE_ACCEPTED,
                                 share_path, fake_username)

    @defer.inlineCallbacks
    def test_share_unaccepted_is_logged(self):
        """Test that an unaccepted share event is logged."""
        fake_username = "fake user"
        # NOTE(review): ``d = defer.Deferred()``, the ``yield d`` and the
        # three middle arguments of from_params() were lost in extraction.
        # The username must be ``fake_username`` for the mailto: assertion
        # to hold; the subtree id and share name are placeholders -- confirm.
        d = defer.Deferred()

        share_path = os.path.join(self.main.shares_dir, 'share')
        holder = NotifyShareHolder.from_params(uuid.uuid4(),
                                               uuid.uuid4(),
                                               u'fake_share',
                                               fake_username,
                                               u'visible_name', 'Read')

        self.main.vm.add_share(Share.from_notify_holder(holder, share_path))
        self._listen_for('VM_VOLUME_DELETED', d.callback, 1, collect=True)
        self.main.event_q.push('SV_SHARE_DELETED', holder.share_id)
        yield d

        # the first event is the share being accepted by add_share()
        self.assertEqual(len(self.listener.zg.events), 2)
        self._assert_share_event(self.listener.zg.events[1],
                                 EVENT_INTERPRETATION_U1_SHARE_UNACCEPTED,
                                 share_path, fake_username)
311
class ZeitgeistUDFsTestCase(ZeitgeistListenerTestCase):
    """Tests for all UDFs-related zeitgeist events.

    NOTE(review): this class was rebuilt from a damaged extract.  Lines
    that had been dropped (method headers, deferred creation, ``yield d``,
    trailing call arguments) are marked individually below -- confirm them
    against version control.
    """

    # NOTE(review): ``def setUp(self):`` / ``def tearDown(self):`` headers
    # were lost in extraction; they are implied by the super() calls.
    def setUp(self):
        """Initialize this test instance."""
        super(ZeitgeistUDFsTestCase, self).setUp()
        self.home_dir = self.mktemp('ubuntuonehacker')
        # point $HOME at a scratch dir so '~' expands inside the sandbox
        self._old_home = os.environ['HOME']
        os.environ['HOME'] = self.home_dir

    def tearDown(self):
        """Finalize this test instance."""
        self.rmtree(self.home_dir)
        os.environ['HOME'] = self._old_home
        super(ZeitgeistUDFsTestCase, self).tearDown()

    def _create_udf(self, id, node_id, suggested_path, subscribed=True):
        """Create an UDF and return it."""
        path = get_udf_path(suggested_path)
        # make sure suggested_path is unicode
        if isinstance(suggested_path, str):
            suggested_path = suggested_path.decode('utf-8')
        udf = UDF(str(id), str(node_id), suggested_path, path, subscribed)
        # NOTE(review): the return statement was lost in extraction; callers
        # read ``.volume_id`` and ``.path`` from the result -- confirm.
        return udf

    def _request_udf_creation(self, path, volume_id, node_id):
        """Ask the volume manager for a new UDF against a faked server.

        ``action_q.create_udf`` is replaced by a fake that answers with
        AQ_CREATE_UDF_OK.  Return a deferred that fires when
        VM_UDF_CREATED is pushed, or errbacks if the marker handed to the
        server is not the full path of the UDF.
        """
        d = defer.Deferred()

        def create_udf(path, name, marker):
            """Fake create_udf."""
            # check that the marker is the full path to the udf
            expanded_path = os.path.expanduser(path.encode('utf-8'))
            udf_path = os.path.join(expanded_path, name.encode('utf-8'))
            if str(marker) != udf_path:
                d.errback(ValueError("marker != path - "
                                     "marker: %r path: %r" % (marker,
                                                               udf_path)))
            # NOTE(review): only ``volume_id=`` survived extraction; the
            # node_id and marker arguments were rebuilt from the names in
            # scope -- confirm.
            self.main.event_q.push("AQ_CREATE_UDF_OK", volume_id=volume_id,
                                   node_id=node_id, marker=marker)

        self.patch(self.main.action_q, "create_udf", create_udf)
        self._listen_for('VM_UDF_CREATED', d.callback, 1, collect=True)
        self.vm.create_udf(path.encode('utf-8'))
        return d

    def _assert_udf_event(self, event, interpretation, manifestation,
                          storage):
        """Check what all UDF events share and return the folder subject.

        ``manifestation`` and ``storage`` are the values expected on the
        folder subject; its uri and origin are left to the caller.
        """
        self.assertEqual(event.interpretation, interpretation)
        self.assertEqual(event.manifestation,
                         Manifestation.USER_ACTIVITY)
        self.assertEqual(event.actor, ACTOR_UBUNTUONE)

        folder = event.subjects[0]
        self.assertEqual(folder.interpretation, Interpretation.FOLDER)
        self.assertEqual(folder.manifestation, manifestation)
        self.assertEqual(folder.mimetype, DIRECTORY_MIMETYPE)
        self.assertEqual(folder.storage, storage)
        return folder

    @defer.inlineCallbacks
    def test_udf_create_is_logged(self):
        """Test for Folders.create."""
        path = os.path.join(self.home_dir, u'ƱoƱo'.encode('utf-8'))
        # NOTE(review): the assignment of the volume id was lost in
        # extraction (the surviving code referenced the builtin ``id``);
        # a fresh uuid is assumed -- confirm.
        volume_id = uuid.uuid4()
        node_id = uuid.uuid4()

        yield self._request_udf_creation(path, volume_id, node_id)

        self.assertEqual(len(self.listener.zg.events), 1)
        folder = self._assert_udf_event(self.listener.zg.events[0],
                                        EVENT_INTERPRETATION_U1_UDF_CREATED,
                                        Manifestation.REMOTE_DATA_OBJECT,
                                        STORAGE_NETWORK)
        self.assertTrue(folder.uri.startswith(URI_PROTOCOL_U1))
        self.assertTrue(folder.origin.endswith(path))

    @defer.inlineCallbacks
    def test_udf_delete_is_logged(self):
        """Test for Folders.delete."""
        # NOTE(review): volume id assignment reconstructed, see
        # test_udf_create_is_logged -- confirm.
        volume_id = uuid.uuid4()
        node_id = uuid.uuid4()
        path = os.path.join(self.home_dir, u'ƱoƱo'.encode('utf-8'))

        yield self._request_udf_creation(path, volume_id, node_id)

        def delete_volume(path):
            """Fake delete_volume."""
            self.main.event_q.push("AQ_DELETE_VOLUME_OK",
                                   volume_id=volume_id)

        self.patch(self.main.action_q, "delete_volume", delete_volume)

        self.vm.delete_volume(str(volume_id))

        # only the creation has been logged so far
        self.assertEqual(len(self.listener.zg.events), 1)

        # NOTE(review): the deferred and its yield were lost in extraction;
        # implied by ``d.callback`` -- confirm.
        d = defer.Deferred()
        self._listen_for('VM_VOLUME_DELETED', d.callback, 1, collect=True)
        yield d

        self.assertEqual(len(self.listener.zg.events), 2)
        folder = self._assert_udf_event(self.listener.zg.events[1],
                                        EVENT_INTERPRETATION_U1_UDF_DELETED,
                                        Manifestation.DELETED_RESOURCE,
                                        STORAGE_DELETED)
        self.assertTrue(folder.uri.startswith(URI_PROTOCOL_U1))
        self.assertTrue(folder.origin.endswith(path))

    @defer.inlineCallbacks
    def test_udf_subscribe_is_logged(self):
        """Test for Folders.subscribe."""
        suggested_path = u'~/ƱoƱo'
        # NOTE(review): the last argument of this call was lost in
        # extraction; an unsubscribed UDF is assumed since the test then
        # subscribes it -- confirm.
        udf = self._create_udf(uuid.uuid4(), 'node_id', suggested_path,
                               subscribed=False)
        yield self.main.vm.add_udf(udf)
        # NOTE(review): deferred creation and yield reconstructed -- confirm.
        d = defer.Deferred()
        self._listen_for('VM_UDF_SUBSCRIBED', d.callback, 1, collect=True)
        self.vm.subscribe_udf(udf.volume_id)
        yield d

        self.assertEqual(len(self.listener.zg.events), 2)
        folder = self._assert_udf_event(
            self.listener.zg.events[1],
            EVENT_INTERPRETATION_U1_UDF_SUBSCRIBED,
            Manifestation.FILE_DATA_OBJECT, STORAGE_LOCAL)
        self.assertTrue(folder.uri.endswith(udf.path))
        self.assertTrue(folder.origin.startswith(URI_PROTOCOL_U1))

    @defer.inlineCallbacks
    def test_udf_unsubscribe_is_logged(self):
        """Test for Folders.unsubscribe."""
        suggested_path = u'~/ƱoƱo'
        # NOTE(review): the last argument of this call was lost in
        # extraction; a subscribed UDF is assumed since the test then
        # unsubscribes it -- confirm.
        udf = self._create_udf(uuid.uuid4(), 'node_id', suggested_path,
                               subscribed=True)
        yield self.main.vm.add_udf(udf)
        # NOTE(review): deferred creation and yield reconstructed -- confirm.
        d = defer.Deferred()
        self._listen_for('VM_UDF_UNSUBSCRIBED', d.callback, 1, collect=True)
        self.vm.unsubscribe_udf(udf.volume_id)
        yield d

        self.assertEqual(len(self.listener.zg.events), 2)
        folder = self._assert_udf_event(
            self.listener.zg.events[1],
            EVENT_INTERPRETATION_U1_UDF_UNSUBSCRIBED,
            Manifestation.DELETED_RESOURCE, STORAGE_DELETED)
        self.assertTrue(folder.uri.endswith(udf.path))
        self.assertTrue(folder.origin.startswith(URI_PROTOCOL_U1))
500
class ZeitgeistRemoteFileSyncTestCase(ConnectedBaseTestCase):
    """File sync events are logged into Zeitgeist.

    NOTE(review): this class was rebuilt from a damaged extract.  Lines
    that had been dropped (method headers, list brackets, deferred
    creation, ``yield d``, trailing call arguments) are marked individually
    below -- confirm them against version control.
    """

    # NOTE(review): ``def setUp(self):`` / ``def tearDown(self):`` headers
    # were lost in extraction; they are implied by the parent calls.
    def setUp(self):
        """Initialize this test instance."""
        ConnectedBaseTestCase.setUp(self)
        self.rq = request_queue = RequestQueue(name='FOO',
                                               action_queue=self.action_queue)

        class MyUpload(Upload):
            """Just to allow monkeypatching."""

        self.command = MyUpload(request_queue, share_id=self.share_id,
                                node_id='a_node_id', previous_hash='prev_hash',
                                hash='yadda', crc32=0, size=0,
                                fileobj_factory=lambda: None,
                                tempfile_factory=lambda: None)
        self.command.pre_queue_setup()  # create the logger
        self.fsm = self.action_queue.main.fs
        self.vm = self.action_queue.main.vm
        self.patch(zglog, "ZeitgeistLogger", MockLogger)
        self.listener = ZeitgeistListener(self.fsm, self.vm)
        self.action_queue.event_queue.subscribe(self.listener)
        self.root_id = "roootid"
        self.sync = Sync(main=self.main)

    def tearDown(self):
        """Finalize this test instance."""
        ConnectedBaseTestCase.tearDown(self)

    def _create_node(self, name):
        """Create ``name`` in the root volume, bound to 'a_node_id'."""
        path = os.path.join(self.vm.root.path, name)
        self.fsm.create(path, "")
        self.fsm.set_node_id(path, "a_node_id")

    def _finish_upload(self):
        """Make the upload command succeed; return the expected event."""
        # create a request and fill it with successful information
        request = client.PutContent(self.action_queue.client, self.share_id,
                                    'node', 'prvhash', 'newhash', 'crc32',
                                    'size', 'deflated', 'fd')
        request.new_generation = 13

        # trigger success in the command
        self.command.handle_success(request)

        kwargs = dict(share_id=self.command.share_id, node_id='a_node_id',
                      hash='yadda', new_generation=13)
        return ('AQ_UPLOAD_FINISHED', (), kwargs)

    def _assert_single_event(self, interpretation, subject_interpretation,
                             subject_manifestation, origin, mimetype,
                             storage):
        """Check that exactly one event was logged, with these values.

        ``interpretation`` is expected on the event; every other argument
        is expected on its first subject (``origin`` as a suffix).
        """
        self.assertEqual(len(self.listener.zg.events), 1)
        event = self.listener.zg.events[0]

        self.assertEqual(event.interpretation, interpretation)
        self.assertEqual(event.manifestation,
                         Manifestation.SCHEDULED_ACTIVITY)
        self.assertEqual(event.actor, ACTOR_UBUNTUONE)

        subject = event.subjects[0]
        self.assertTrue(subject.uri.startswith(URI_PROTOCOL_U1))
        self.assertEqual(subject.interpretation, subject_interpretation)
        self.assertEqual(subject.manifestation, subject_manifestation)
        self.assertTrue(subject.origin.endswith(origin))
        self.assertEqual(subject.mimetype, mimetype)
        self.assertEqual(subject.storage, storage)

    def test_syncdaemon_creates_file_on_server_is_logged(self):
        """Files created by SyncDaemon on the server are logged."""
        filename = "filename.mp3"
        self._create_node(filename)

        # NOTE(review): the trailing arguments of this request were lost in
        # extraction; placeholder parent/name values are assumed -- confirm.
        request = client.MakeFile(self.action_queue.client, self.share_id,
                                  'parent', 'name')
        request.new_id = 'a_node_id'
        request.new_generation = 13

        # create a command and trigger it success
        cmd = MakeFile(self.rq, self.share_id, 'parent', filename, 'marker')
        res = cmd.handle_success(request)
        # assertIs instead of a bare assert: still checked under ``-O``
        self.assertIs(res, request)

        upload_finished = self._finish_upload()

        # check for successful event
        info = dict(marker='marker', new_id='a_node_id', new_generation=13,
                    volume_id=self.share_id)
        events = [
            ('AQ_FILE_NEW_OK', (), info),
            upload_finished,
        ]
        self.assertEqual(events, self.command.action_queue.event_queue.events)

        self._assert_single_event(Interpretation.CREATE_EVENT,
                                  Interpretation.AUDIO,
                                  Manifestation.REMOTE_DATA_OBJECT,
                                  filename, "audio/mpeg", STORAGE_NETWORK)

    def test_syncdaemon_creates_dir_on_server_is_logged(self):
        """Dirs created by SyncDaemon on the server are logged."""
        # NOTE(review): the assignment of ``dirname`` was lost in
        # extraction; any plain directory name works here -- confirm.
        dirname = "dirname"
        self._create_node(dirname)

        # NOTE(review): trailing request arguments reconstructed, see
        # test_syncdaemon_creates_file_on_server_is_logged -- confirm.
        request = client.MakeDir(self.action_queue.client, self.share_id,
                                 'parent', 'name')
        request.new_id = 'a_node_id'
        request.new_generation = 13

        # create a command and trigger it success
        cmd = MakeDir(self.rq, self.share_id, 'parent', dirname, 'marker')
        res = cmd.handle_success(request)
        self.assertIs(res, request)

        # check for successful event
        info = dict(marker='marker', new_id='a_node_id', new_generation=13,
                    volume_id=self.share_id)
        events = [
            ('AQ_DIR_NEW_OK', (), info),
        ]
        self.assertEqual(events, self.command.action_queue.event_queue.events)

        self._assert_single_event(Interpretation.CREATE_EVENT,
                                  Interpretation.FOLDER,
                                  Manifestation.REMOTE_DATA_OBJECT,
                                  dirname, DIRECTORY_MIMETYPE,
                                  STORAGE_NETWORK)

    def test_syncdaemon_modifies_on_server_is_logged(self):
        """Files modified by SyncDaemon on the server are logged."""
        filename = "filename.mp3"
        self._create_node(filename)

        # check for successful event
        events = [self._finish_upload()]
        self.assertEqual(events, self.command.action_queue.event_queue.events)

        self._assert_single_event(Interpretation.MODIFY_EVENT,
                                  Interpretation.AUDIO,
                                  Manifestation.REMOTE_DATA_OBJECT,
                                  filename, "audio/mpeg", STORAGE_NETWORK)

    @defer.inlineCallbacks
    def test_syncdaemon_deletes_file_on_server_is_logged(self):
        """Files deleted by SD on the server are logged."""
        # NOTE(review): deferred creation and ``yield d`` were lost in
        # extraction; implied by ``d.callback`` -- confirm.
        d = defer.Deferred()
        listen_for(self.main.event_q, 'AQ_UNLINK_OK', d.callback)

        path = os.path.join(self.main.vm.root.path, "filename.mp3")
        self.main.fs.create(path, "")
        self.main.fs.set_node_id(path, "node_id")
        self.main.event_q.push("AQ_UNLINK_OK", share_id="",
                               parent_id="parent_id",
                               node_id="node_id", new_generation=13)
        yield d

        self._assert_single_event(Interpretation.DELETE_EVENT,
                                  Interpretation.AUDIO,
                                  Manifestation.DELETED_RESOURCE,
                                  "filename.mp3", "audio/mpeg",
                                  STORAGE_DELETED)

    @defer.inlineCallbacks
    def test_syncdaemon_deletes_dir_on_server_is_logged(self):
        """Dirs deleted by SD on the server are logged."""
        # NOTE(review): deferred creation and ``yield d`` reconstructed --
        # confirm.
        d = defer.Deferred()
        listen_for(self.main.event_q, 'AQ_UNLINK_OK', d.callback)

        path = os.path.join(self.main.vm.root.path, "folder name")
        self.main.fs.create(path, "", is_dir=True)
        self.main.fs.set_node_id(path, "node_id")
        self.main.event_q.push("AQ_UNLINK_OK", share_id="",
                               parent_id="parent_id",
                               node_id="node_id", new_generation=13)
        yield d

        self._assert_single_event(Interpretation.DELETE_EVENT,
                                  Interpretation.FOLDER,
                                  Manifestation.DELETED_RESOURCE,
                                  "folder name", DIRECTORY_MIMETYPE,
                                  STORAGE_DELETED)
736
class ZeitgeistLocalFileSyncTestCase(BaseTwistedTestCase):
737
"""Zeitgeist events coming from the server."""
741
"""Initialize this instance."""
742
BaseTwistedTestCase.setUp(self)
743
self.root = self.mktemp('root')
744
self.shares = self.mktemp('shares')
745
self.data = self.mktemp('data')
746
self.partials_dir = self.mktemp('partials_dir')
747
self.handler = MementoHandler()
748
self.handler.setLevel(logging.ERROR)
749
FakeMain._sync_class = Sync
750
self.main = FakeMain(root_dir=self.root, shares_dir=self.shares,
752
partials_dir=self.partials_dir)
753
self._logger = logging.getLogger('ubuntuone.SyncDaemon')
754
self._logger.addHandler(self.handler)
756
self.root_id = root_id = "roootid"
757
self.main.vm._got_root(root_id)
758
self.filemp3delta = delta.FileInfoDelta(
759
generation=5, is_live=True, file_type=delta.FILE,
760
parent_id=self.root_id, share_id=ROOT, node_id=uuid.uuid4(),
761
name=u"fileƱ.mp3", is_public=False, content_hash="hash",
762
crc32=1, size=10, last_modified=0)
764
self.dirdelta = delta.FileInfoDelta(
765
generation=6, is_live=True, file_type=delta.DIRECTORY,
766
parent_id=root_id, share_id=ROOT, node_id=uuid.uuid4(),
767
name=u"directory_Ʊ", is_public=False, content_hash="hash",
768
crc32=1, size=10, last_modified=0)
770
self.patch(zglog, "ZeitgeistLogger", MockLogger)
771
self.listener = ZeitgeistListener(self.main.fs, self.main.vm)
772
self.main.event_q.subscribe(self.listener)
775
"""Clean up this instance."""
776
self._logger.removeHandler(self.handler)
778
FakeMain._sync_class = None
779
shutil.rmtree(self.root)
780
shutil.rmtree(self.shares)
781
shutil.rmtree(self.data)
782
for record in self.handler.records:
783
exc_info = getattr(record, 'exc_info', None)
784
if exc_info is not None:
785
raise exc_info[0], exc_info[1], exc_info[2]
786
BaseTwistedTestCase.tearDown(self)
788
@defer.inlineCallbacks
789
def test_syncdaemon_creates_file_locally_is_logged(self):
790
"""Files created locally by SyncDaemon are logged."""
792
d2 = defer.Deferred()
793
listen_for(self.main.event_q, 'SV_FILE_NEW', d.callback)
794
listen_for(self.main.event_q, 'AQ_DOWNLOAD_FINISHED', d2.callback)
796
deltas = [ self.filemp3delta ]
797
kwargs = dict(volume_id=ROOT, delta_content=deltas, end_generation=11,
798
full=True, free_bytes=10)
799
self.main.sync.handle_AQ_DELTA_OK(**kwargs)
801
# check that the file is created
802
node = self.main.fs.get_by_node_id(ROOT, self.filemp3delta.node_id)
803
self.assertEqual(node.path, self.filemp3delta.name.encode('utf8'))
804
self.assertEqual(node.is_dir, False)
805
self.assertEqual(node.generation, self.filemp3delta.generation)
807
yield d # wait for SV_FILE_NEW
810
share_id=self.filemp3delta.share_id,
811
node_id=self.filemp3delta.node_id,
812
server_hash="server hash")
813
self.main.event_q.push("AQ_DOWNLOAD_FINISHED", **dlargs)
815
yield d2 # wait for AQ_DOWNLOAD_FINISHED
817
self.assertEqual(len(self.listener.zg.events), 1)
818
event = self.listener.zg.events[0]
820
self.assertEqual(event.interpretation,
821
Interpretation.CREATE_EVENT)
822
self.assertEqual(event.manifestation,
823
Manifestation.WORLD_ACTIVITY)
824
self.assertEqual(event.actor, ACTOR_UBUNTUONE)
826
local_file = event.subjects[0]
827
self.assertTrue(local_file.uri.endswith(self.filemp3delta.name))
828
self.assertEqual(local_file.interpretation, Interpretation.AUDIO)
829
self.assertEqual(local_file.manifestation,
830
Manifestation.FILE_DATA_OBJECT)
831
self.assertTrue(local_file.origin.startswith(URI_PROTOCOL_U1))
832
self.assertEqual(local_file.mimetype, "audio/mpeg")
833
self.assertEqual(local_file.storage, STORAGE_LOCAL)
835
@defer.inlineCallbacks
836
def test_syncdaemon_creates_dir_locally_is_logged(self):
837
"""Dirs created locally by SyncDaemon are logged."""
839
listen_for(self.main.event_q, 'SV_DIR_NEW', d.callback)
841
deltas = [ self.dirdelta ]
842
kwargs = dict(volume_id=ROOT, delta_content=deltas, end_generation=11,
843
full=True, free_bytes=10)
844
self.main.sync.handle_AQ_DELTA_OK(**kwargs)
846
# check that the dir is created
847
node = self.main.fs.get_by_node_id(ROOT, self.dirdelta.node_id)
848
self.assertEqual(node.path, self.dirdelta.name.encode('utf8'))
849
self.assertEqual(node.is_dir, True)
850
self.assertEqual(node.generation, self.dirdelta.generation)
852
yield d # wait for SV_DIR_NEW
854
self.assertEqual(len(self.listener.zg.events), 1)
855
event = self.listener.zg.events[0]
857
self.assertEqual(event.interpretation,
858
Interpretation.CREATE_EVENT)
859
self.assertEqual(event.manifestation,
860
Manifestation.WORLD_ACTIVITY)
861
self.assertEqual(event.actor, ACTOR_UBUNTUONE)
863
local_file = event.subjects[0]
864
self.assertTrue(local_file.uri.endswith(self.dirdelta.name))
865
self.assertEqual(local_file.interpretation, Interpretation.FOLDER)
866
self.assertEqual(local_file.manifestation,
867
Manifestation.FILE_DATA_OBJECT)
868
self.assertTrue(local_file.origin.startswith(URI_PROTOCOL_U1))
869
self.assertEqual(local_file.mimetype, DIRECTORY_MIMETYPE)
870
self.assertEqual(local_file.storage, STORAGE_LOCAL)
872
@defer.inlineCallbacks
def test_syncdaemon_modifies_locally_is_logged(self):
    """Files modified locally by SyncDaemon are logged.

    The file is first created through a delta; once it is taken out of
    the listener's list of newly created files, an AQ_DOWNLOAD_FINISHED
    for the same node is pushed and the logged event is inspected.
    """
    # one deferred per event the test has to wait for
    d = defer.Deferred()
    d2 = defer.Deferred()
    listen_for(self.main.event_q, 'SV_FILE_NEW', d.callback)
    listen_for(self.main.event_q, 'AQ_DOWNLOAD_FINISHED', d2.callback)

    deltas = [self.filemp3delta]
    kwargs = dict(volume_id=ROOT, delta_content=deltas, end_generation=11,
                  full=True, free_bytes=10)
    self.main.sync.handle_AQ_DELTA_OK(**kwargs)

    # check that the file is modified
    node = self.main.fs.get_by_node_id(ROOT, self.filemp3delta.node_id)
    self.assertEqual(node.path, self.filemp3delta.name.encode('utf8'))
    self.assertEqual(node.is_dir, False)
    self.assertEqual(node.generation, self.filemp3delta.generation)

    yield d  # wait for SV_FILE_NEW

    # remove from the recent list
    local_file_id = (self.filemp3delta.share_id, self.filemp3delta.node_id)
    self.listener.newly_created_local_files.remove(local_file_id)

    dlargs = dict(
        share_id=self.filemp3delta.share_id,
        node_id=self.filemp3delta.node_id,
        server_hash="server hash")
    self.main.event_q.push("AQ_DOWNLOAD_FINISHED", **dlargs)

    yield d2  # wait for AQ_DOWNLOAD_FINISHED

    self.assertEqual(len(self.listener.zg.events), 1)
    event = self.listener.zg.events[0]

    self.assertEqual(event.interpretation,
                     Interpretation.MODIFY_EVENT)
    self.assertEqual(event.manifestation,
                     Manifestation.WORLD_ACTIVITY)
    self.assertEqual(event.actor, ACTOR_UBUNTUONE)

    local_file = event.subjects[0]
    self.assertTrue(local_file.uri.endswith(self.filemp3delta.name))
    self.assertEqual(local_file.interpretation, Interpretation.AUDIO)
    self.assertEqual(local_file.manifestation,
                     Manifestation.FILE_DATA_OBJECT)
    self.assertTrue(local_file.origin.startswith(URI_PROTOCOL_U1))
    self.assertEqual(local_file.mimetype, "audio/mpeg")
    self.assertEqual(local_file.storage, STORAGE_LOCAL)
@defer.inlineCallbacks
def test_syncdaemon_deletes_file_locally_is_logged(self):
    """Files deleted locally by SyncDaemon are logged.

    Registers a file in the filesystem manager, pushes SV_FILE_DELETED
    for its node and inspects the only event logged.
    """
    # this deferred is passed to listen_for as the SV_FILE_DELETED callback
    d = defer.Deferred()
    listen_for(self.main.event_q, 'SV_FILE_DELETED', d.callback)

    filename = self.filemp3delta.name.encode("utf-8")
    path = os.path.join(self.main.vm.root.path, filename)
    self.main.fs.create(path, "")
    self.main.fs.set_node_id(path, "node_id")
    self.main.event_q.push("SV_FILE_DELETED", volume_id="",
                           node_id="node_id", is_dir=False)

    # without this the assertions would run before the event is handled
    yield d  # wait for SV_FILE_DELETED

    self.assertEqual(len(self.listener.zg.events), 1)
    event = self.listener.zg.events[0]

    self.assertEqual(event.interpretation,
                     Interpretation.DELETE_EVENT)
    self.assertEqual(event.manifestation,
                     Manifestation.WORLD_ACTIVITY)
    self.assertEqual(event.actor, ACTOR_UBUNTUONE)

    local_file = event.subjects[0]
    self.assertTrue(local_file.uri.endswith(self.filemp3delta.name))
    self.assertEqual(local_file.interpretation, Interpretation.AUDIO)
    self.assertEqual(local_file.manifestation,
                     Manifestation.DELETED_RESOURCE)
    self.assertTrue(local_file.origin.startswith(URI_PROTOCOL_U1))
    self.assertEqual(local_file.mimetype, "audio/mpeg")
    self.assertEqual(local_file.storage, STORAGE_DELETED)
@defer.inlineCallbacks
def test_syncdaemon_deletes_dir_locally_is_logged(self):
    """Dirs deleted locally by SyncDaemon are logged.

    Registers a directory in the filesystem manager, pushes
    SV_FILE_DELETED with ``is_dir=True`` for its node and inspects the
    only event logged.
    """
    # this deferred is passed to listen_for as the SV_FILE_DELETED callback
    d = defer.Deferred()
    listen_for(self.main.event_q, 'SV_FILE_DELETED', d.callback)

    path = os.path.join(self.main.vm.root.path, "folder name")
    self.main.fs.create(path, "", is_dir=True)
    self.main.fs.set_node_id(path, "node_id")
    self.main.event_q.push("SV_FILE_DELETED", volume_id="",
                           node_id="node_id", is_dir=True)

    # without this the assertions would run before the event is handled
    yield d  # wait for SV_FILE_DELETED

    self.assertEqual(len(self.listener.zg.events), 1)
    event = self.listener.zg.events[0]

    self.assertEqual(event.interpretation,
                     Interpretation.DELETE_EVENT)
    self.assertEqual(event.manifestation,
                     Manifestation.WORLD_ACTIVITY)
    self.assertEqual(event.actor, ACTOR_UBUNTUONE)

    local_folder = event.subjects[0]
    self.assertTrue(local_folder.uri.endswith("folder name"))
    self.assertEqual(local_folder.interpretation, Interpretation.FOLDER)
    self.assertEqual(local_folder.manifestation,
                     Manifestation.DELETED_RESOURCE)
    self.assertTrue(local_folder.origin.startswith(URI_PROTOCOL_U1))
    self.assertEqual(local_folder.mimetype, DIRECTORY_MIMETYPE)
    self.assertEqual(local_folder.storage, STORAGE_DELETED)
@defer.inlineCallbacks
def test_file_sync_conflict_is_logged(self):
    """Files renamed because of conflict are logged.

    Creates a real file under the root volume, moves it to conflict
    through the filesystem manager and inspects the only event logged.
    """
    # this deferred is passed to listen_for as the FSM_FILE_CONFLICT
    # callback
    d = defer.Deferred()
    listen_for(self.main.event_q, 'FSM_FILE_CONFLICT', d.callback)

    testfile = os.path.join(self.main.vm.root.path, 'sample.mp3')
    mdid = self.main.fs.create(testfile, "")
    self.main.fs.set_node_id(testfile, "uuid")
    with open(testfile, "w") as fh:
        fh.write("this is music!")

    self.main.fs.move_to_conflict(mdid)

    # without this the assertions would run before the event is handled
    yield d  # wait for FSM_FILE_CONFLICT

    self.assertEqual(len(self.listener.zg.events), 1)
    event = self.listener.zg.events[0]

    self.assertEqual(event.interpretation,
                     EVENT_INTERPRETATION_U1_CONFLICT_RENAME)
    self.assertEqual(event.manifestation,
                     Manifestation.WORLD_ACTIVITY)
    self.assertEqual(event.actor, ACTOR_UBUNTUONE)

    local_file = event.subjects[0]
    # the subject points at the renamed file, its origin at the old name
    new_name = testfile + self.main.fs.CONFLICT_SUFFIX
    self.assertTrue(local_file.uri.endswith(new_name))
    self.assertEqual(local_file.interpretation, Interpretation.AUDIO)
    self.assertEqual(local_file.manifestation,
                     Manifestation.FILE_DATA_OBJECT)
    self.assertTrue(local_file.origin.endswith(testfile))
    self.assertEqual(local_file.mimetype, "audio/mpeg")
    self.assertEqual(local_file.storage, STORAGE_LOCAL)
@defer.inlineCallbacks
def test_dir_sync_conflict_is_logged(self):
    """Dirs renamed because of conflict are logged.

    Registers a directory under the root volume, moves it to conflict
    through the filesystem manager and inspects the only event logged.
    """
    # this deferred is passed to listen_for as the FSM_DIR_CONFLICT callback
    d = defer.Deferred()
    listen_for(self.main.event_q, 'FSM_DIR_CONFLICT', d.callback)

    testdir = os.path.join(self.main.vm.root.path, 'sampledir')
    mdid = self.main.fs.create(testdir, "", is_dir=True)
    self.main.fs.set_node_id(testdir, "uuid")
    # The file conflict test writes its node to disk before conflicting
    # it; do the same for the directory.
    # NOTE(review): assumes move_to_conflict renames the on-disk path and
    # that fs.create does not make the directory itself - confirm.
    if not os.path.exists(testdir):
        os.mkdir(testdir)

    self.main.fs.move_to_conflict(mdid)

    # without this the assertions would run before the event is handled
    yield d  # wait for FSM_DIR_CONFLICT

    self.assertEqual(len(self.listener.zg.events), 1)
    event = self.listener.zg.events[0]

    self.assertEqual(event.interpretation,
                     EVENT_INTERPRETATION_U1_CONFLICT_RENAME)
    self.assertEqual(event.manifestation,
                     Manifestation.WORLD_ACTIVITY)
    self.assertEqual(event.actor, ACTOR_UBUNTUONE)

    local_file = event.subjects[0]
    # the subject points at the renamed dir, its origin at the old name
    new_name = testdir + self.main.fs.CONFLICT_SUFFIX
    self.assertTrue(local_file.uri.endswith(new_name))
    self.assertEqual(local_file.interpretation, Interpretation.FOLDER)
    self.assertEqual(local_file.manifestation,
                     Manifestation.FILE_DATA_OBJECT)
    self.assertTrue(local_file.origin.endswith(testdir))
    self.assertEqual(local_file.mimetype, DIRECTORY_MIMETYPE)
    self.assertEqual(local_file.storage, STORAGE_LOCAL)
class ZeitgeistPublicFilesTestCase(ZeitgeistListenerTestCase):
    """Public files events are logged into Zeitgeist."""

    @defer.inlineCallbacks
    def test_publish_url_is_logged(self):
        """Publishing a file with a url is logged.

        Registers a file inside a share, pushes AQ_CHANGE_PUBLIC_ACCESS_OK
        for it and inspects the second event logged.
        """
        # NOTE(review): these three values are reconstructed; share_id
        # matches the volume_id of the share added below - confirm.
        share_id = "share"
        node_id = "node_id"
        is_public = True
        public_url = 'http://example.com/foo.mp3'

        share_path = os.path.join(self.shares_dir, 'share')
        self.main.vm.add_share(Share(path=share_path, volume_id='share',
                                     other_username='other username'))
        path = os.path.join(share_path, "foo.mp3")
        self.main.fs.create(path, str(share_id))
        self.main.fs.set_node_id(path, str(node_id))

        d = defer.Deferred()
        self._listen_for('AQ_CHANGE_PUBLIC_ACCESS_OK', d.callback)
        self.main.event_q.push('AQ_CHANGE_PUBLIC_ACCESS_OK',
                               share_id=share_id, node_id=node_id,
                               is_public=is_public, public_url=public_url)
        # without this the assertions would run before the event is handled
        yield d  # wait for AQ_CHANGE_PUBLIC_ACCESS_OK

        # two events are expected here; the publication is the second one
        self.assertEqual(len(self.listener.zg.events), 2)
        event = self.listener.zg.events[1]

        self.assertEqual(event.interpretation,
                         Interpretation.CREATE_EVENT)
        self.assertEqual(event.manifestation,
                         Manifestation.USER_ACTIVITY)
        self.assertEqual(event.actor, ACTOR_UBUNTUONE)

        public_file = event.subjects[0]
        self.assertEqual(public_file.uri, public_url)
        self.assertEqual(public_file.interpretation, Interpretation.AUDIO)
        self.assertEqual(public_file.manifestation,
                         Manifestation.REMOTE_DATA_OBJECT)
        self.assertTrue(public_file.origin.endswith(node_id))
        self.assertEqual(public_file.mimetype, "audio/mpeg")
        self.assertEqual(public_file.storage, STORAGE_NETWORK)

    @defer.inlineCallbacks
    def test_unpublish_url_is_logged(self):
        """Unpublishing a file with a url is logged.

        Registers a file inside a share, pushes AQ_CHANGE_PUBLIC_ACCESS_OK
        with ``is_public`` false and inspects the second event logged.
        """
        # NOTE(review): these three values are reconstructed; share_id
        # matches the volume_id of the share added below - confirm.
        share_id = "share"
        node_id = "node_id"
        is_public = False
        public_url = 'http://example.com/foo.mp3'

        share_path = os.path.join(self.shares_dir, 'share')
        self.main.vm.add_share(Share(path=share_path, volume_id='share',
                                     other_username='other username'))
        path = os.path.join(share_path, "foo.mp3")
        self.main.fs.create(path, str(share_id))
        self.main.fs.set_node_id(path, str(node_id))

        d = defer.Deferred()
        self._listen_for('AQ_CHANGE_PUBLIC_ACCESS_OK', d.callback)
        self.main.event_q.push('AQ_CHANGE_PUBLIC_ACCESS_OK',
                               share_id=share_id, node_id=node_id,
                               is_public=is_public, public_url=public_url)
        # without this the assertions would run before the event is handled
        yield d  # wait for AQ_CHANGE_PUBLIC_ACCESS_OK

        # two events are expected here; the unpublication is the second one
        self.assertEqual(len(self.listener.zg.events), 2)
        event = self.listener.zg.events[1]

        self.assertEqual(event.interpretation,
                         Interpretation.DELETE_EVENT)
        self.assertEqual(event.manifestation,
                         Manifestation.USER_ACTIVITY)
        self.assertEqual(event.actor, ACTOR_UBUNTUONE)

        public_file = event.subjects[0]
        self.assertEqual(public_file.uri, public_url)
        self.assertEqual(public_file.interpretation, Interpretation.AUDIO)
        self.assertEqual(public_file.manifestation,
                         Manifestation.DELETED_RESOURCE)
        self.assertTrue(public_file.origin.endswith(node_id))
        self.assertEqual(public_file.mimetype, "audio/mpeg")
        self.assertEqual(public_file.storage, STORAGE_DELETED)