336
336
# check that the share is in the fsm metadata
337
337
self.assertEqual(mdobj.node_id, share.node_id)
338
338
self.assertEqual(mdobj.share_id, share.volume_id)
339
self.assertTrue(share.subscribed)
339
self.assertTrue(self.vm.shares[share.volume_id].subscribed)
340
340
self.assertFalse(self.main.event_q.has_watch(share.path))
342
342
@defer.inlineCallbacks
375
375
self.assertEqual(2, len(self.vm.shares))
376
self.assertTrue(share.subscribed)
376
self.assertTrue(self.vm.shares[share.volume_id].subscribed)
377
377
self.assertTrue(self.main.event_q.has_watch(share.path))
379
379
@defer.inlineCallbacks
1582
1582
share_created_d = defer.Deferred()
1583
1583
vol_new_gen_d = defer.Deferred()
1585
# patch aq.rescan_from_scratch in order to intercept the calls
1586
root_from_scratch_d = defer.Deferred()
1587
share_from_scratch_d = defer.Deferred()
1588
from_scratch_deferreds = {'':root_from_scratch_d,
1589
str(share_id):share_from_scratch_d}
1590
self.patch(self.main.action_q, 'rescan_from_scratch',
1591
lambda vol_id: from_scratch_deferreds.pop(vol_id).callback(vol_id))
1585
1593
self.vm.refresh_volumes = lambda: self.fail('refresh_volumes called!')
1586
1594
# listen for VM_SHARE_CREATED event for the new share
1587
1595
self._listen_for('VM_SHARE_CREATED', share_created_d.callback)
1588
1596
if auto_subscribe:
1589
expected_events = [{'generation': 10, 'volume_id': str(share_id)},
1590
{'generation': 17, 'volume_id': ''}]
1591
self.patch(self.vm, '_scan_share',
1592
lambda *a, **kw: defer.succeed(None))
1597
expected_events = [{'generation': 17, 'volume_id': ''}]
1593
1598
self._listen_for('SV_VOLUME_NEW_GENERATION',
1594
vol_new_gen_d.callback, 2, collect=True)
1599
vol_new_gen_d.callback, 1, collect=True)
1596
1601
expected_events = {'generation': 17, 'volume_id': ''}
1597
1602
self.patch(self.vm, '_scan_share', lambda *a, **kw: self.fail(a))
1605
1610
yield share_created_d
1606
1611
events = yield vol_new_gen_d
1608
1612
self.assertEqual(events, expected_events)
1614
vol_id = yield root_from_scratch_d
1615
self.assertEqual(vol_id, '')
1617
vol_id = yield share_from_scratch_d
1618
self.assertEqual(vol_id, str(share_id))
1611
1621
"""The test itself."""
1612
1622
self.assertEqual(2, len(self.vm.shares)) # the share and the root
1629
1639
# root was already checked on test_handle_AQ_LIST_VOLUMES_root
1640
# patch aq.rescan_from_scratch in order to intercept the calls
1641
root_from_scratch_d = defer.Deferred()
1642
share_from_scratch_d = defer.Deferred()
1643
from_scratch_deferreds = {'':root_from_scratch_d,
1644
str(share_id):share_from_scratch_d}
1645
self.patch(self.main.action_q, 'rescan_from_scratch',
1646
lambda vol_id: from_scratch_deferreds.pop(vol_id).callback(vol_id))
1631
1648
# now send the same list again and check using a custom home
1632
1649
with environ('HOME', self.home_dir):
1633
1650
self.vm.handle_AQ_LIST_VOLUMES(response)
1652
vol_id = yield root_from_scratch_d
1653
self.assertEqual(vol_id, '')
1655
vol_id = yield share_from_scratch_d
1656
self.assertEqual(vol_id, str(share_id))
1637
1660
@defer.inlineCallbacks
1661
1684
udf_created_d = defer.Deferred()
1662
1685
vol_new_gen_d = defer.Deferred()
1687
# patch aq.rescan_from_scratch in order to intercept the calls
1688
root_from_scratch_d = defer.Deferred()
1689
share_from_scratch_d = defer.Deferred()
1690
from_scratch_deferreds = {'':root_from_scratch_d,
1691
str(udf_id):share_from_scratch_d}
1692
self.patch(self.main.action_q, 'rescan_from_scratch',
1693
lambda vol_id: from_scratch_deferreds.pop(vol_id).callback(vol_id))
1664
1695
# listen for VM_UDF_CREATED event for the new UDF
1665
1696
self._listen_for('VM_UDF_CREATED', udf_created_d.callback)
1666
1697
if auto_subscribe:
1667
expected_events = [{'generation': 23, 'volume_id': str(udf_id)},
1668
{'generation': 17, 'volume_id': ''}]
1669
self.patch(self.vm, '_scan_udf', defer.succeed)
1698
expected_events = [{'generation': 17, 'volume_id': ''}]
1670
1699
self._listen_for('SV_VOLUME_NEW_GENERATION',
1671
vol_new_gen_d.callback, 2, collect=True)
1700
vol_new_gen_d.callback, 1, collect=True)
1673
1702
expected_events = {'generation': 17, 'volume_id': ''}
1674
1703
self.patch(self.vm, '_scan_udf', self.fail)
1682
1711
yield udf_created_d
1683
1712
events = yield vol_new_gen_d
1685
1713
self.assertEqual(events, expected_events)
1715
vol_id = yield root_from_scratch_d
1716
self.assertEqual(vol_id, '')
1718
vol_id = yield share_from_scratch_d
1719
self.assertEqual(vol_id, str(udf_id))
1688
1722
"""The test itself."""
1689
1723
self.assertEqual(1, len(self.vm.udfs)) # the new udf
1730
1764
old_home = os.environ['HOME']
1731
1765
os.environ['HOME'] = self.home_dir
1732
1766
self.addCleanup(lambda: os.environ.__setitem__('HOME', old_home))
1767
self.main.event_q.push('SYS_INIT_DONE')
1734
1769
def test_get_udf_path_name(self):
1735
1770
"""Test for _get_udf_path_name."""
2160
2195
self.assertEquals(udf.node_id, str(node_id))
2161
2196
self.assertEquals(0, len(self.vm.marker_udf_map))
2162
2197
self.assertTrue(self.vm.udfs[str(udf_id)])
2163
self.assertTrue(udf.subscribed)
2198
self.assertTrue(self.vm.udfs[str(udf_id)].subscribed)
2164
2199
self.assertTrue(os.path.exists(udf.path))
2165
2200
d.addCallback(check)
2322
2357
self.assertEquals(udf.volume_id, str(udf_id))
2323
2358
self.assertIn(str(udf_id), self.vm.udfs)
2324
2359
if auto_subscribe:
2325
self.assertTrue(udf.subscribed)
2360
self.assertTrue(self.vm.udfs[udf.id].subscribed)
2326
2361
self.assertTrue(os.path.exists(udf.path))
2327
2362
# check that rescan_from_scratch is called
2328
2363
vol_id = yield rescan_cb
2329
2364
self.assertEqual(vol_id, udf.volume_id)
2331
self.assertFalse(udf.subscribed)
2366
self.assertFalse(self.vm.udfs[udf.id].subscribed)
2332
2367
self.assertFalse(os.path.exists(udf.path))
2334
2369
def test_handle_SV_VOLUME_CREATED_udf_subscribe(self):
2818
2853
# patch the fake action queue
2819
2854
self.main.action_q.query_volumes = lambda: defer.succeed(response)
2820
self.patch(self.vm, '_scan_volume', lambda *a, **kw: defer.succeed(a))
2856
# patch aq.rescan_from_scratch in order to intercept the calls
2857
root_from_scratch_d = defer.Deferred()
2858
share_from_scratch_d = defer.Deferred()
2859
from_scratch_deferreds = {'':root_from_scratch_d,
2860
str(share_id):share_from_scratch_d}
2861
self.patch(self.main.action_q, 'rescan_from_scratch',
2862
lambda vol_id: from_scratch_deferreds.pop(vol_id).callback(vol_id))
2822
2864
vol_rescan_d = defer.Deferred()
2823
2865
self._listen_for('SV_VOLUME_NEW_GENERATION',
2824
vol_rescan_d.callback, 2, collect=True)
2866
vol_rescan_d.callback, 1, collect=True)
2825
2867
server_rescan_d = defer.Deferred()
2826
2868
self._listen_for('SYS_SERVER_RESCAN_DONE', server_rescan_d.callback)
2827
2869
with environ('HOME', self.home_dir):
2830
2872
yield server_rescan_d
2831
2873
events = yield vol_rescan_d
2833
expected_events = [{'generation': 17, 'volume_id': str(share_id)},
2834
{'generation': 1, 'volume_id': ''}]
2874
expected_events = [{'generation': 1, 'volume_id': ''}]
2835
2875
self.assertEqual(expected_events, events)
2877
vol_id = yield root_from_scratch_d
2878
self.assertEqual(vol_id, '')
2879
vol_id = yield share_from_scratch_d
2880
self.assertEqual(vol_id, str(share_id))
2837
2882
@defer.inlineCallbacks
2838
2883
def test_server_rescan_with_udf_autosubscribe(self):
2839
2884
"""Test the server_rescan method."""
2850
2895
# patch the fake action queue
2851
2896
self.main.action_q.query_volumes = lambda: defer.succeed(response)
2853
self.patch(self.vm, '_scan_volume', defer.succeed)
2898
# patch aq.rescan_from_scratch in order to intercept the calls
2899
root_from_scratch_d = defer.Deferred()
2900
udf_from_scratch_d = defer.Deferred()
2901
from_scratch_deferreds = {'':root_from_scratch_d,
2902
str(udf_id):udf_from_scratch_d}
2903
self.patch(self.main.action_q, 'rescan_from_scratch',
2904
lambda vol_id: from_scratch_deferreds.pop(vol_id).callback(vol_id))
2907
self.patch(self.main.lr, 'scan_dir', lambda *a, **k: None)
2855
2909
vol_rescan_d = defer.Deferred()
2856
2910
self._listen_for('SV_VOLUME_NEW_GENERATION',
2857
vol_rescan_d.callback, 2, collect=True)
2911
vol_rescan_d.callback, 1, collect=True)
2858
2912
server_rescan_d = defer.Deferred()
2859
2913
self._listen_for('SYS_SERVER_RESCAN_DONE', server_rescan_d.callback)
2860
2914
with environ('HOME', self.home_dir):
2861
2915
yield self.vm.server_rescan()
2863
yield server_rescan_d
2864
2918
events = yield vol_rescan_d
2866
expected_events = [{'generation': 13, 'volume_id': str(udf_id)},
2867
{'generation': 1, 'volume_id': ''}]
2920
expected_events = [{'generation': 1, 'volume_id': ''}]
2868
2921
self.assertEqual(expected_events, events)
2922
vol_id = yield root_from_scratch_d
2923
self.assertEqual(vol_id, '')
2924
vol_id = yield udf_from_scratch_d
2925
self.assertEqual(vol_id, str(udf_id))
2926
yield server_rescan_d
2870
2928
@defer.inlineCallbacks
2871
2929
def test_server_rescan_with_autosubscribe(self):
2886
2944
# patch the fake action queue
2887
2945
self.main.action_q.query_volumes = lambda: defer.succeed(response)
2888
self.patch(self.vm, '_scan_volume', lambda *a, **kw: defer.succeed(a))
2947
# patch aq.rescan_from_scratch in order to intercept the calls
2948
root_from_scratch_d = defer.Deferred()
2949
share_from_scratch_d = defer.Deferred()
2950
udf_from_scratch_d = defer.Deferred()
2951
from_scratch_deferreds = {'':root_from_scratch_d,
2952
str(share_id):share_from_scratch_d,
2953
str(udf_id):udf_from_scratch_d}
2954
self.patch(self.main.action_q, 'rescan_from_scratch',
2955
lambda vol_id: from_scratch_deferreds.pop(vol_id).callback(vol_id))
2958
self.patch(self.main.lr, 'scan_dir', lambda *a, **k: None)
2890
2960
vol_rescan_d = defer.Deferred()
2891
2961
self._listen_for('SV_VOLUME_NEW_GENERATION',
2892
vol_rescan_d.callback, 3, collect=True)
2962
vol_rescan_d.callback, 1, collect=True)
2893
2963
server_rescan_d = defer.Deferred()
2894
2964
self._listen_for('SYS_SERVER_RESCAN_DONE', server_rescan_d.callback)
2895
2965
with environ('HOME', self.home_dir):
2898
2968
yield server_rescan_d
2899
2969
events = yield vol_rescan_d
2901
expected_events = [{'generation': 17, 'volume_id': str(share_id)},
2902
{'generation': 13, 'volume_id': str(udf_id)},
2903
{'generation': 1, 'volume_id': ''}]
2971
expected_events = [{'generation': 1, 'volume_id': ''}]
2904
2972
self.assertEqual(expected_events, events)
2974
vol_id = yield root_from_scratch_d
2975
self.assertEqual(vol_id, '')
2976
vol_id = yield share_from_scratch_d
2977
self.assertEqual(vol_id, str(share_id))
2978
vol_id = yield udf_from_scratch_d
2979
self.assertEqual(vol_id, str(udf_id))
2906
2981
@defer.inlineCallbacks
2907
2982
def test_server_rescan_error(self):
2908
2983
"""Test the server_rescan method."""
3040
3115
root_id = uuid.uuid4()
3041
3116
root_volume = volumes.RootVolume(root_id, 1, 500)
3042
3117
response = [share_volume, udf_volume, root_volume]
3043
3119
d = defer.Deferred()
3044
self._listen_for('SV_VOLUME_NEW_GENERATION', d.callback, 2, True)
3120
self._listen_for('SV_VOLUME_NEW_GENERATION', d.callback, 1, True)
3045
3121
udf_d = defer.Deferred()
3046
3122
self._listen_for('VM_UDF_CREATED', udf_d.callback)
3047
3123
with environ('HOME', self.home_dir):
3055
3131
events_dict = dict((event['volume_id'], event['generation'])
3056
3132
for event in events)
3057
self.assertIn(str(share_id), events_dict)
3058
3133
# new udfs server rescan is triggered after local rescan.
3059
3134
self.assertNotIn(str(udf_id), events_dict)
3060
3135
self.assertIn(request.ROOT, events_dict)
3061
self.assertEqual(1, events_dict[str(share_id)])
3062
3136
self.assertEqual(1, events_dict[request.ROOT])
3063
3137
# set the local metadata generation to new value
3064
3138
share = self.vm.shares[str(share_id)]