107
118
FILE_NOTIFY_CHANGE_LAST_WRITE | \
108
119
FILE_NOTIFY_CHANGE_SECURITY | \
109
120
FILE_NOTIFY_CHANGE_LAST_ACCESS
121
self.memento = MementoHandler()
122
self.memento.setLevel(logging.DEBUG)
124
self.paths_checked = []
125
old_is_dir = Watch._path_is_dir
127
def file_notify_information_wrapper(buf, data):
128
"""Wrapper that gets the events and adds them to the list."""
129
events = FILE_NOTIFY_INFORMATION(buf, data)
130
# we want to append the list because that is what will be logged.
131
# If we use extend we won't have the same logging because it will
132
# group all events in a single list which is not what the COM API
134
str_events = [(WINDOWS_ACTIONS_NAMES[action], path) for action, path in
136
self.raw_events.append(str_events)
139
def path_is_dir_wrapper(watch, path):
140
"""Wrapper that gets the checked paths."""
141
result = old_is_dir(watch, path)
142
self.paths_checked.append((path, result))
145
self.patch(filesystem_notifications, 'FILE_NOTIFY_INFORMATION',
146
file_notify_information_wrapper)
147
self.patch(filesystem_notifications.Watch, '_path_is_dir',
112
150
@defer.inlineCallbacks
113
def _perform_operations(self, path, mask, auto_add, actions, number_events):
151
def _perform_operations(self, path, mask, auto_add, actions,
114
153
"""Perform the file operations and returns the recorded events."""
115
154
handler = TestCaseHandler(number_events=number_events)
116
155
manager = WatchManager(handler)
117
156
yield manager.add_watch(os_helper.get_windows_valid_path(path), mask,
118
157
auto_add=auto_add)
158
# change the logger so that we can check the logs if we wanted
159
manager._wdm[0].log.addHandler(self.memento)
161
self.addCleanup(manager._wdm[0].log.removeHandler, self.memento)
119
162
# execute the actions
121
164
# process the recorded events
666
750
@defer.inlineCallbacks
def test_stop_watching_fired_when_watch_thread_finishes(self):
    """The deferred returned is fired when the watch thread finishes.

    Starts a watch, checks that a watch handle was set, then stops it
    and checks that the handle was reset to None.
    """
    test_path = self.mktemp("another_test_directory")
    # The last argument is None: this test only inspects
    # ``_watch_handle`` and never looks at delivered events.
    watch = Watch(1, test_path, self.mask, True, None)
    yield watch.start_watching()
    self.assertNotEqual(watch._watch_handle, None)
    yield watch.stop_watching()
    self.assertEqual(watch._watch_handle, None)
760
def test_is_path_dir_missing_no_subdir(self):
    """Test when the path does not exist and is not a subdir."""
    watched_root = self.mktemp("test_directory")
    # make every path look missing on disk
    self.patch(os.path, 'exists', lambda path: False)
    missing_path = u'\\\\?\\C:\\path\\to\\no\\dir'
    watcher = Watch(1, watched_root, self.mask, True, None)
    is_dir = watcher._path_is_dir(missing_path)
    self.assertFalse(is_dir)
768
def test_is_path_dir_missing_in_subdir(self):
    """Test when the path does not exist but is a recorded subdir."""
    watched_root = self.mktemp("test_directory")
    # make every path look missing on disk
    self.patch(os.path, 'exists', lambda path: False)
    missing_path = u'\\\\?\\C:\\path\\to\\no\\dir'
    watcher = Watch(1, watched_root, self.mask, True, None)
    # record the path as a subdir before asking about it
    watcher._subdirs.add(missing_path)
    is_dir = watcher._path_is_dir(missing_path)
    self.assertTrue(is_dir)
777
def test_is_path_dir_present_is_dir(self):
    """Test when the path exists on disk and is a directory."""
    watched_root = self.mktemp("test_directory")
    # make every path look like an existing directory
    self.patch(os.path, 'exists', lambda path: True)
    self.patch(os.path, 'isdir', lambda path: True)
    present_path = u'\\\\?\\C:\\path\\to\\no\\dir'
    watcher = Watch(1, watched_root, self.mask, True, None)
    watcher._subdirs.add(present_path)
    is_dir = watcher._path_is_dir(present_path)
    self.assertTrue(is_dir)
787
def test_is_path_dir_present_no_dir(self):
    """Test when the path exists on disk but is not a directory."""
    watched_root = self.mktemp("test_directory")
    # make every path look like an existing non-directory
    self.patch(os.path, 'exists', lambda path: True)
    self.patch(os.path, 'isdir', lambda path: False)
    present_path = u'\\\\?\\C:\\path\\to\\no\\dir'
    watcher = Watch(1, watched_root, self.mask, True, None)
    # the path is recorded as a subdir, yet the result must be False
    watcher._subdirs.add(present_path)
    is_dir = watcher._path_is_dir(present_path)
    self.assertFalse(is_dir)
797
def test_update_subdirs_create_not_present(self):
    """Test when we update on a create event and the path is not present.

    After ``_update_subdirs`` is called with the action mapped from
    ``IN_CREATE``, the path must be in ``_subdirs``.
    """
    path = u'\\\\?\\C:\\path\\to\\no\\dir'
    test_path = self.mktemp("test_directory")
    watch = Watch(1, test_path, self.mask, True, None)
    watch._update_subdirs(path, REVERSE_WINDOWS_ACTIONS[IN_CREATE])
    # assertIn reports both operands on failure, unlike assertTrue(x in y)
    self.assertIn(path, watch._subdirs)
805
def test_update_subdirs_create_present(self):
    """Test when we update on a create event and the path is present.

    The path is added to ``_subdirs`` beforehand; after the update it
    must still be there and the size of ``_subdirs`` must not change.
    """
    path = u'\\\\?\\C:\\path\\to\\no\\dir'
    test_path = self.mktemp("test_directory")
    watch = Watch(1, test_path, self.mask, True, None)
    watch._subdirs.add(path)
    old_length = len(watch._subdirs)
    watch._update_subdirs(path, REVERSE_WINDOWS_ACTIONS[IN_CREATE])
    # assertIn reports both operands on failure, unlike assertTrue(x in y)
    self.assertIn(path, watch._subdirs)
    self.assertEqual(old_length, len(watch._subdirs))
816
def test_update_subdirs_delete_not_present(self):
    """Test when we delete and the path is not present.

    Calling ``_update_subdirs`` with the action mapped from
    ``IN_DELETE`` for an unknown path must leave it out of ``_subdirs``.
    """
    path = u'\\\\?\\C:\\path\\to\\no\\dir'
    test_path = self.mktemp("test_directory")
    watch = Watch(1, test_path, self.mask, True, None)
    watch._update_subdirs(path, REVERSE_WINDOWS_ACTIONS[IN_DELETE])
    # assertNotIn reports both operands on failure
    self.assertNotIn(path, watch._subdirs)
824
def test_update_subdirs_delete_present(self):
    """Test when we delete and the path is present.

    The path is added to ``_subdirs`` beforehand; after the update with
    the action mapped from ``IN_DELETE`` it must no longer be there.
    """
    path = u'\\\\?\\C:\\path\\to\\no\\dir'
    test_path = self.mktemp("test_directory")
    watch = Watch(1, test_path, self.mask, True, None)
    watch._subdirs.add(path)
    watch._update_subdirs(path, REVERSE_WINDOWS_ACTIONS[IN_DELETE])
    # assertNotIn reports both operands on failure
    self.assertNotIn(path, watch._subdirs)
679
834
class TestWatchManager(BaseTwistedTestCase):
680
835
"""Test the watch manager."""
837
@defer.inlineCallbacks
683
839
"""Set each of the tests."""
684
super(TestWatchManager, self).setUp()
840
yield super(TestWatchManager, self).setUp()
685
841
self.parent_path = u'\\\\?\\C:\\' # a valid windows path
686
842
self.path = self.parent_path + u'path'
687
843
self.watch = Watch(1, self.path, None, True, None)
1067
1224
self.assertEqual(5, len(self.general.called_methods))
1068
1225
# assert that the ignores are called
1069
1226
self.assertEqual('is_ignored', self.general.called_methods[0][0])
1070
self.assertEqual(held_event.pathname, self.general.called_methods[0][1])
1227
self.assertEqual(held_event.pathname,
1228
self.general.called_methods[0][1])
1071
1229
self.assertEqual('is_ignored', self.general.called_methods[1][0])
1072
1230
self.assertEqual(event.pathname, self.general.called_methods[1][1])
1073
1231
# assert that we do request the share_id
1074
self.assertEqual('get_path_share_id', self.general.called_methods[2][0])
1232
self.assertEqual('get_path_share_id',
1233
self.general.called_methods[2][0])
1075
1234
self.assertEqual(os.path.split(event.pathname)[0],
1076
1235
self.general.called_methods[2][1],
1077
1236
'Get the share_id for event')
1078
self.assertEqual('get_path_share_id', self.general.called_methods[3][0])
1237
self.assertEqual('get_path_share_id',
1238
self.general.called_methods[3][0])
1079
1239
self.assertEqual(os.path.split(held_event.pathname)[0],
1080
1240
self.general.called_methods[3][1],
1081
1241
'Get the share_id for held event.')
1099
1260
self.assertEqual(5, len(self.general.called_methods))
1100
1261
# assert that the ignores are called
1101
1262
self.assertEqual('is_ignored', self.general.called_methods[0][0])
1102
self.assertEqual(held_event.pathname, self.general.called_methods[0][1])
1263
self.assertEqual(held_event.pathname,
1264
self.general.called_methods[0][1])
1103
1265
self.assertEqual('is_ignored', self.general.called_methods[1][0])
1104
1266
self.assertEqual(event.pathname, self.general.called_methods[1][1])
1105
1267
# assert that we do request the share_id
1106
self.assertEqual('get_path_share_id', self.general.called_methods[2][0])
1268
self.assertEqual('get_path_share_id',
1269
self.general.called_methods[2][0])
1107
1270
self.assertEqual(os.path.split(event.pathname)[0],
1108
1271
self.general.called_methods[2][1],
1109
1272
'Get the share_id for event')
1110
self.assertEqual('get_path_share_id', self.general.called_methods[3][0])
1273
self.assertEqual('get_path_share_id',
1274
self.general.called_methods[3][0])
1111
1275
self.assertEqual(os.path.split(held_event.pathname)[0],
1112
1276
self.general.called_methods[3][1],
1113
1277
'Get the share_id for held event.')