~ubuntu-branches/ubuntu/karmic/spambayes/karmic

« back to all changes in this revision

Viewing changes to Outlook2000/tester.py

  • Committer: Bazaar Package Importer
  • Author(s): Jorge Bernal
  • Date: 2005-04-07 14:02:02 UTC
  • Revision ID: james.westby@ubuntu.com-20050407140202-mgyh6t7gn2dlrrw5
Tags: upstream-1.0.1
ImportĀ upstreamĀ versionĀ 1.0.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# unit tester for the Outlook addin.
 
2
#
 
3
# Note we are only attempting to test Outlook specific
 
4
# functionality, such as filters, etc.
 
5
#
 
6
# General process is to create test messages known to contain ham/spam
 
7
# keywords, and tracking their progress through the filters.  We also
 
8
# move this test message back around, and watch the incremental retrain
 
9
# in action.  Also checks that the message correctly remains classified
 
10
# after a message move.
 
11
from __future__ import generators
 
12
 
 
13
from win32com.client import constants
 
14
import sys
 
15
from time import sleep
 
16
import copy
 
17
import rfc822
 
18
import cStringIO
 
19
import threading
 
20
 
 
21
from spambayes.storage import STATE_KEY
 
22
 
 
23
import msgstore
 
24
 
 
25
from win32com.mapi import mapi, mapiutil
 
26
import pythoncom
 
27
 
 
28
HAM="ham"
 
29
SPAM="spam"
 
30
UNSURE="unsure"
 
31
 
 
32
TEST_SUBJECT = "SpamBayes addin auto-generated test message"
 
33
 
 
34
class TestFailure(Exception):
 
35
    pass
 
36
 
 
37
def TestFailed(msg):
 
38
    raise TestFailure(msg)
 
39
 
 
40
def AssertRaises(exception, func, *args):
 
41
    try:
 
42
        func(*args)
 
43
        raise TestFailed("Function '%s' should have raised '%r', but it worked!" % \
 
44
                         (func, exception))
 
45
    except:
 
46
        exc_type = sys.exc_info()[0]
 
47
        if exc_type == exception or issubclass(exc_type, exception):
 
48
            return
 
49
        raise
 
50
 
 
51
filter_event = threading.Event()
 
52
 
 
53
def WaitForFilters():
 
54
    # Must wait longer than normal, so when run with a timer we still work.
 
55
    filter_event.clear()
 
56
    for i in range(500):
 
57
        pythoncom.PumpWaitingMessages()
 
58
        if filter_event.isSet():
 
59
            break
 
60
        sleep(0.01)
 
61
 
 
62
def DictExtractor(bayes):
 
63
    for k, v in bayes.wordinfo.items():
 
64
        yield k, v
 
65
 
 
66
def DBExtractor(bayes):
 
67
    # We use bsddb3 now if we can
 
68
    try:
 
69
        import bsddb3 as bsddb
 
70
        bsddb_error = bsddb.db.DBNotFoundError
 
71
    except ImportError:
 
72
        import bsddb
 
73
        bsddb_error = bsddb.error
 
74
    key = bayes.dbm.first()[0]
 
75
    if key != STATE_KEY:
 
76
        yield key, bayes._wordinfoget(key)
 
77
    while True:
 
78
        try:
 
79
            key = bayes.dbm.next()[0]
 
80
        except bsddb.error:
 
81
            break
 
82
        except bsddb_error:
 
83
            break
 
84
        if key != STATE_KEY:
 
85
            yield key, bayes._wordinfoget(key)
 
86
 
 
87
# Find the top 'n' words in the Spam database that are clearly
 
88
# marked as either ham or spam.  Simply enumerates the
 
89
# bayes word list looking for any word with zero count in the
 
90
# non-requested category.
 
91
_top_ham = None
 
92
_top_spam = None
 
93
def FindTopWords(bayes, num, get_spam):
 
94
    global _top_spam, _top_ham
 
95
    if get_spam and _top_spam: return _top_spam
 
96
    if not get_spam and _top_ham: return _top_ham
 
97
    items = []
 
98
    try:
 
99
        bayes.db # bsddb style
 
100
        extractor = DBExtractor
 
101
    except AttributeError:
 
102
        extractor = DictExtractor
 
103
 
 
104
    for word, info in extractor(bayes):
 
105
        if info is None:
 
106
            break
 
107
        if ":" in word:
 
108
            continue
 
109
        if get_spam:
 
110
            if info.hamcount==0:
 
111
                items.append((info.spamcount, word, info))
 
112
        else:
 
113
            if info.spamcount==0:
 
114
                items.append((info.hamcount, word, info))
 
115
    items.sort()
 
116
    items.reverse()
 
117
    # Throw an error if we don't have enough tokens - otherwise
 
118
    # the test itself may fail, which will be more confusing than this.
 
119
    if len(items) < num:
 
120
        TestFailed("Error: could not find %d words with Spam=%s - only found %d" % (num, get_spam, len(items)))
 
121
    ret = {}
 
122
    for n, word, info in items[:num]:
 
123
        ret[word]=copy.copy(info)
 
124
    if get_spam:
 
125
        _top_spam = ret
 
126
    else:
 
127
        _top_ham = ret
 
128
    return ret
 
129
 
 
130
# A little driver/manager for our tests
 
131
class Driver:
 
132
    def __init__(self, mgr):
 
133
        if mgr is None:
 
134
            import manager
 
135
            mgr = manager.GetManager()
 
136
        self.manager = mgr
 
137
        # Remember the "spam" folder.
 
138
        folder = mgr.message_store.GetFolder(mgr.config.filter.spam_folder_id)
 
139
        self.folder_spam = folder.GetOutlookItem()
 
140
        # Remember the "unsure" folder.
 
141
        folder = mgr.message_store.GetFolder(mgr.config.filter.unsure_folder_id)
 
142
        self.folder_unsure = folder.GetOutlookItem()
 
143
        # And the drafts folder where new messages are created.
 
144
        self.folder_drafts = mgr.outlook.Session.GetDefaultFolder(constants.olFolderDrafts)
 
145
 
 
146
    def GetWatchFolderGenerator(self):
 
147
        mgr = self.manager
 
148
        gen = mgr.message_store.GetFolderGenerator(
 
149
                                mgr.config.filter.watch_folder_ids,
 
150
                                mgr.config.filter.watch_include_sub)
 
151
        for f in gen:
 
152
            yield f, f.GetOutlookItem()
 
153
 
 
154
    def FindTestMessage(self, folder):
 
155
        subject = TEST_SUBJECT
 
156
        items = folder.Items
 
157
        return items.Find("[Subject] = '%s'" % (subject,))
 
158
 
 
159
    def CheckMessageFilteredFrom(self, folder):
 
160
        # For hotmail accounts, the message may take a little time to actually
 
161
        # be removed from the original folder (ie, it appears in the "dest"
 
162
        # folder before it vanished.
 
163
        for i in range(5):
 
164
            if self.FindTestMessage(folder) is None:
 
165
                break
 
166
            for j in range(10):
 
167
                sleep(.05)
 
168
        else:
 
169
            ms_folder = self.manager.message_store.GetFolder(folder)
 
170
            TestFailed("The test message remained in folder '%s'" % ms_folder.GetFQName())
 
171
 
 
172
    def _CleanTestMessageFromFolder(self, folder):
 
173
        subject = TEST_SUBJECT
 
174
        num = 0
 
175
        # imap/hotmail etc only soft delete, and I see no way to differentiate
 
176
        # force the user to purge them manually
 
177
        for i in range(50):
 
178
            msg = self.FindTestMessage(folder)
 
179
            if msg is None:
 
180
                break
 
181
            msg.Delete()
 
182
        else:
 
183
            raise TestFailed("Old test messages appear to still exist.  These may" \
 
184
                             "be 'soft-deleted' - you will need to purge them manually")
 
185
        if num:
 
186
            print "Cleaned %d test messages from folder '%s'" % (num, folder.Name)
 
187
 
 
188
    def CleanAllTestMessages(self):
 
189
        self._CleanTestMessageFromFolder(self.folder_spam)
 
190
        self._CleanTestMessageFromFolder(self.folder_unsure)
 
191
        self._CleanTestMessageFromFolder(self.folder_drafts)
 
192
        for msf, of in self.GetWatchFolderGenerator():
 
193
            self._CleanTestMessageFromFolder(of)
 
194
 
 
195
    def CreateTestMessageInFolder(self, spam_status, folder):
 
196
        msg, words = self.CreateTestMessage(spam_status)
 
197
        msg.Save() # Put into "Drafts".
 
198
        assert self.FindTestMessage(self.folder_drafts) is not None
 
199
        # Move it to the specified folder
 
200
        msg.Move(folder)
 
201
        # And now find it in the specified folder
 
202
        return self.FindTestMessage(folder), words
 
203
 
 
204
    def CreateTestMessage(self, spam_status):
 
205
        words = {}
 
206
        bayes = self.manager.classifier_data.bayes
 
207
        if spam_status != SPAM:
 
208
            words.update(FindTopWords(bayes, 50, False))
 
209
        if spam_status != HAM:
 
210
            words.update(FindTopWords(bayes, 50, True))
 
211
        # Create a new blank message with our words
 
212
        msg = self.manager.outlook.CreateItem(0)
 
213
        msg.Body = "\n".join(words.keys())
 
214
        msg.Subject = TEST_SUBJECT
 
215
        return msg, words
 
216
 
 
217
def check_words(words, bayes, spam_offset, ham_offset):
 
218
    for word, existing_info in words.items():
 
219
        new_info = bayes._wordinfoget(word)
 
220
        if existing_info.spamcount+spam_offset != new_info.spamcount or \
 
221
           existing_info.hamcount+ham_offset != new_info.hamcount:
 
222
            TestFailed("Word check for '%s failed. "
 
223
                       "old spam/ham=%d/%d, new spam/ham=%d/%d,"
 
224
                       "spam_offset=%d, ham_offset=%d" % \
 
225
                       (word,
 
226
                        existing_info.spamcount, existing_info.hamcount,
 
227
                        new_info.spamcount, new_info.hamcount,
 
228
                        spam_offset, ham_offset))
 
229
 
 
230
# The tests themselves.
 
231
# The "spam" test is huge - we do standard filter tests, but
 
232
# also do incremental retrain tests.
 
233
def TestSpamFilter(driver):
 
234
    bayes = driver.manager.classifier_data.bayes
 
235
    nspam = bayes.nspam
 
236
    nham = bayes.nham
 
237
    original_bayes = copy.copy(driver.manager.classifier_data.bayes)
 
238
    # for each watch folder, create a spam message, and do the training thang
 
239
    for msf_watch, folder_watch in driver.GetWatchFolderGenerator():
 
240
        print "Performing Spam test on watch folder '%s'..." % msf_watch.GetFQName()
 
241
        # Create a spam message in the Inbox - it should get immediately filtered
 
242
        msg, words = driver.CreateTestMessageInFolder(SPAM, folder_watch)
 
243
        # sleep to ensure filtering.
 
244
        WaitForFilters()
 
245
        # It should no longer be in the Inbox.
 
246
        driver.CheckMessageFilteredFrom(folder_watch)
 
247
        # It should be in the "sure is spam" folder.
 
248
        spam_msg = driver.FindTestMessage(driver.folder_spam)
 
249
        if spam_msg is None:
 
250
            TestFailed("The test message vanished from the Inbox, but didn't appear in Spam")
 
251
        # Check that none of the above caused training.
 
252
        if nspam != bayes.nspam:
 
253
            TestFailed("Something caused a new spam message to appear")
 
254
        if nham != bayes.nham:
 
255
            TestFailed("Something caused a new ham message to appear")
 
256
        check_words(words, bayes, 0, 0)
 
257
 
 
258
        # Now move the message back to the inbox - it should get trained.
 
259
        store_msg = driver.manager.message_store.GetMessage(spam_msg)
 
260
        import train
 
261
        if train.been_trained_as_ham(store_msg, driver.manager.classifier_data):
 
262
            TestFailed("This new spam message should not have been trained as ham yet")
 
263
        if train.been_trained_as_spam(store_msg, driver.manager.classifier_data):
 
264
            TestFailed("This new spam message should not have been trained as spam yet")
 
265
        spam_msg.Move(folder_watch)
 
266
        WaitForFilters()
 
267
        spam_msg = driver.FindTestMessage(folder_watch)
 
268
        if spam_msg is None:
 
269
            TestFailed("The message appears to have been filtered out of the watch folder")
 
270
        store_msg = driver.manager.message_store.GetMessage(spam_msg)
 
271
        need_untrain = True
 
272
        try:
 
273
            if nspam != bayes.nspam:
 
274
                TestFailed("There were not the same number of spam messages after a re-train")
 
275
            if nham+1 != bayes.nham:
 
276
                TestFailed("There was not one more ham messages after a re-train")
 
277
            if train.been_trained_as_spam(store_msg, driver.manager.classifier_data):
 
278
                TestFailed("This new spam message should not have been trained as spam yet")
 
279
            if not train.been_trained_as_ham(store_msg, driver.manager.classifier_data):
 
280
                TestFailed("This new spam message should have been trained as ham now")
 
281
            # word infos should have one extra ham
 
282
            check_words(words, bayes, 0, 1)
 
283
            # Now move it back to the Spam folder.
 
284
            # This should see the message un-trained as ham, and re-trained as Spam
 
285
            spam_msg.Move(driver.folder_spam)
 
286
            WaitForFilters()
 
287
            spam_msg = driver.FindTestMessage(driver.folder_spam)
 
288
            if spam_msg is None:
 
289
                TestFailed("Could not find the message in the Spam folder")
 
290
            store_msg = driver.manager.message_store.GetMessage(spam_msg)
 
291
            if nspam +1 != bayes.nspam:
 
292
                TestFailed("There should be one more spam now")
 
293
            if nham != bayes.nham:
 
294
                TestFailed("There should be the same number of hams again")
 
295
            if not train.been_trained_as_spam(store_msg, driver.manager.classifier_data):
 
296
                TestFailed("This new spam message should have been trained as spam by now")
 
297
            if train.been_trained_as_ham(store_msg, driver.manager.classifier_data):
 
298
                TestFailed("This new spam message should have been un-trained as ham")
 
299
            # word infos should have one extra spam, no extra ham
 
300
            check_words(words, bayes, 1, 0)
 
301
            # Move the message to another folder, and make sure we still
 
302
            # identify it correctly as having been trained.
 
303
            # Move to the "unsure" folder, just cos we know about it, and
 
304
            # we know that no special watching of this folder exists.
 
305
            spam_msg.Move(driver.folder_unsure)
 
306
            spam_msg = driver.FindTestMessage(driver.folder_unsure)
 
307
            if spam_msg is None:
 
308
                TestFailed("Could not find the message in the Unsure folder")
 
309
            store_msg = driver.manager.message_store.GetMessage(spam_msg)
 
310
            if not train.been_trained_as_spam(store_msg, driver.manager.classifier_data):
 
311
                TestFailed("Message was not identified as Spam after moving")
 
312
 
 
313
            # word infos still be 'spam'
 
314
            check_words(words, bayes, 1, 0)
 
315
 
 
316
            # Now undo the damage we did.
 
317
            was_spam = train.untrain_message(store_msg, driver.manager.classifier_data)
 
318
            if not was_spam:
 
319
                TestFailed("Untraining this message did not indicate it was spam")
 
320
            if train.been_trained_as_spam(store_msg, driver.manager.classifier_data) or \
 
321
               train.been_trained_as_ham(store_msg, driver.manager.classifier_data):
 
322
                TestFailed("Untraining this message kept it has ham/spam")
 
323
            need_untrain = False
 
324
        finally:
 
325
            if need_untrain:
 
326
                train.untrain_message(store_msg, driver.manager.classifier_data)
 
327
 
 
328
        # Check all the counts are back where we started.
 
329
        if nspam != bayes.nspam:
 
330
            TestFailed("Spam count didn't get back to the same")
 
331
        if nham != bayes.nham:
 
332
            TestFailed("Ham count didn't get back to the same")
 
333
        check_words(words, bayes, 0, 0)
 
334
 
 
335
        if bayes.wordinfo != original_bayes.wordinfo:
 
336
            TestFailed("The bayes object's 'wordinfo' did not compare the same at the end of all this!")
 
337
        if bayes.probcache != original_bayes.probcache:
 
338
            TestFailed("The bayes object's 'probcache' did not compare the same at the end of all this!")
 
339
 
 
340
        spam_msg.Delete()
 
341
    print "Created a Spam message, and saw it get filtered and trained."
 
342
 
 
343
def _DoTestHamTrain(driver, folder1, folder2):
 
344
    # [ 780612 ] Outlook incorrectly trains on moved messages
 
345
    # Should not train when previously classified message is moved by the user
 
346
    # from one watch folder to another.
 
347
    bayes = driver.manager.classifier_data.bayes
 
348
    nham = bayes.nham
 
349
    nspam = bayes.nspam
 
350
 
 
351
    # Create a ham message in the Inbox - it wont get filtered if the other
 
352
    # tests pass, but we do need to wait for it to be scored.
 
353
    msg, words = driver.CreateTestMessageInFolder(HAM, folder1)
 
354
    # sleep to ensure filtering.
 
355
    WaitForFilters()
 
356
    # It should still be in the Inbox.
 
357
    if driver.FindTestMessage(folder1) is None:
 
358
        TestFailed("The test ham message appeared to have been filtered!")
 
359
 
 
360
    # Manually move it to folder2
 
361
    msg.Move(folder2)
 
362
    # sleep to any processing in this folder.
 
363
    WaitForFilters()
 
364
    # re-find it in folder2
 
365
    msg = driver.FindTestMessage(folder2)
 
366
    if driver.FindTestMessage(folder2) is None:
 
367
        TestFailed("Couldn't find the ham message we just moved")
 
368
 
 
369
    if nspam != bayes.nspam or nham != bayes.nham:
 
370
        TestFailed("Move of existing ham caused a train")
 
371
    msg.Delete()
 
372
 
 
373
def _DoTestHamFilter(driver, folder):
 
374
    # Create a ham message in the Inbox - it should not get filtered
 
375
    msg, words = driver.CreateTestMessageInFolder(HAM, folder)
 
376
    # sleep to ensure filtering.
 
377
    WaitForFilters()
 
378
    # It should still be in the Inbox.
 
379
    if driver.FindTestMessage(folder) is None:
 
380
        TestFailed("The test ham message appeared to have been filtered!")
 
381
    msg.Delete()
 
382
 
 
383
def TestHamFilter(driver):
 
384
    # Execute the 'ham' test in every folder we watch.
 
385
    mgr = driver.manager
 
386
    gen = mgr.message_store.GetFolderGenerator(
 
387
                        mgr.config.filter.watch_folder_ids,
 
388
                        mgr.config.filter.watch_include_sub)
 
389
    num = 0
 
390
    folders = []
 
391
    for f in gen:
 
392
        print "Running ham filter tests on folder '%s'" % f.GetFQName()
 
393
        f = f.GetOutlookItem()
 
394
        _DoTestHamFilter(driver, f)
 
395
        num += 1
 
396
        folders.append(f)
 
397
    # Now test incremental train logic, between all these folders.
 
398
    if len(folders)<2:
 
399
        print "NOTE: Can't do incremental training tests as only 1 watch folder is in place"
 
400
    else:
 
401
        for f in folders:
 
402
            # 'targets' is a list of all folders except this
 
403
            targets = folders[:]
 
404
            targets.remove(f)
 
405
            for t in targets:
 
406
                _DoTestHamTrain(driver, f, t)
 
407
    print "Created a Ham message, and saw it remain in place (in %d watch folders.)" % num
 
408
 
 
409
def TestUnsureFilter(driver):
 
410
    # Create a spam message in the Inbox - it should get immediately filtered
 
411
    for msf_watch, folder_watch in driver.GetWatchFolderGenerator():
 
412
        print "Performing Spam test on watch folder '%s'..." % msf_watch.GetFQName()
 
413
        msg, words = driver.CreateTestMessageInFolder(UNSURE, folder_watch)
 
414
        # sleep to ensure filtering.
 
415
        WaitForFilters()
 
416
        # It should no longer be in the Inbox.
 
417
        driver.CheckMessageFilteredFrom(folder_watch)
 
418
        # It should be in the "unsure" folder.
 
419
        spam_msg = driver.FindTestMessage(driver.folder_unsure)
 
420
        if spam_msg is None:
 
421
            TestFailed("The test message vanished from the Inbox, but didn't appear in Unsure")
 
422
        spam_msg.Delete()
 
423
    print "Created an unsure message, and saw it get filtered"
 
424
 
 
425
def run_tests(manager):
 
426
    "Filtering tests"
 
427
    driver = Driver(manager)
 
428
    manager.Save() # necessary after a full retrain
 
429
    assert driver.manager.config.filter.enabled, "Filtering must be enabled for these tests"
 
430
    assert driver.manager.config.training.train_recovered_spam and \
 
431
           driver.manager.config.training.train_manual_spam, "Incremental training must be enabled for these tests"
 
432
    driver.CleanAllTestMessages()
 
433
    TestSpamFilter(driver)
 
434
    TestUnsureFilter(driver)
 
435
    TestHamFilter(driver)
 
436
    driver.CleanAllTestMessages()
 
437
 
 
438
def run_filter_tests(manager):
 
439
    # setup config to save info with the message, and test
 
440
    apply_with_new_config(manager,
 
441
                          {"Filter.timer_enabled": False,
 
442
                           "Filter.save_spam_info" : True,
 
443
                          },
 
444
                          run_tests, manager)
 
445
 
 
446
    apply_with_new_config(manager,
 
447
                          {"Filter.timer_enabled": True,
 
448
                           "Filter.save_spam_info" : True,
 
449
                          },
 
450
                          run_tests, manager)
 
451
    apply_with_new_config(manager,
 
452
                          {"Filter.timer_enabled": False,
 
453
                           "Filter.save_spam_info" : False,
 
454
                          },
 
455
                          run_tests, manager)
 
456
 
 
457
    apply_with_new_config(manager,
 
458
                          {"Filter.timer_enabled": True,
 
459
                           "Filter.save_spam_info" : False,
 
460
                          },
 
461
                          run_tests, manager)
 
462
 
 
463
def apply_with_new_config(manager, new_config_dict, func, *args):
 
464
    old_config = {}
 
465
    friendly_opts = []
 
466
    for name, val in new_config_dict.items():
 
467
        sect_name, opt_name = name.split(".")
 
468
        old_config[sect_name, opt_name] = manager.options.get(sect_name, opt_name)
 
469
        manager.options.set(sect_name, opt_name, val)
 
470
        friendly_opts.append("%s=%s" % (name, val))
 
471
    manager.addin.FiltersChanged() # to ensure correct filtler in place
 
472
    try:
 
473
        test_name = getattr(func, "__doc__", None)
 
474
        if not test_name: test_name = func.__name__
 
475
        print "*" * 10, "Running '%s' with %s" % (test_name, ", ".join(friendly_opts))
 
476
        func(*args)
 
477
    finally:
 
478
        for (sect_name, opt_name), val in old_config.items():
 
479
            manager.options.set(sect_name, opt_name, val)
 
480
 
 
481
###############################################################################
 
482
# "Non-filter" tests are those that don't require us to create messages and
 
483
# see them get filtered.
 
484
def run_nonfilter_tests(manager):
 
485
    # And now some other 'sanity' checks.
 
486
    # Check messages we are unable to score.
 
487
    # Must enable the filtering code for this test
 
488
    msgstore.test_suite_running = False
 
489
    try:
 
490
        print "Scanning all your good mail and spam for some sanity checks..."
 
491
        num_found = num_looked = 0
 
492
        num_without_headers = num_without_body = num_without_html_body = 0
 
493
        for folder_ids, include_sub in [
 
494
            (manager.config.filter.watch_folder_ids, manager.config.filter.watch_include_sub),
 
495
            ([manager.config.filter.spam_folder_id], False),
 
496
            ]:
 
497
            for folder in manager.message_store.GetFolderGenerator(folder_ids, include_sub):
 
498
                for message in folder.GetMessageGenerator(False):
 
499
                    # If not ipm.note, then no point reporting - but any
 
500
                    # ipm.note messages we don't want to filter should be
 
501
                    # reported.
 
502
                    num_looked += 1
 
503
                    if num_looked % 500 == 0: print " scanned", num_looked, "messages..."
 
504
                    if not message.IsFilterCandidate() and \
 
505
                        message.msgclass.lower().startswith("ipm.note"):
 
506
                        if num_found == 0:
 
507
                            print "*" * 80
 
508
                            print "WARNING: We found the following messages in your folders that would not be filtered by the addin"
 
509
                            print "If any of these messages should be filtered, we have a bug!"
 
510
                        num_found += 1
 
511
                        print " %s/%s" % (folder.name, message.subject)
 
512
                    headers, body, html_body = message._GetMessageTextParts()
 
513
                    if not headers: num_without_headers += 1
 
514
                    if not body: num_without_body += 1
 
515
                    # for HTML, we only check multi-part
 
516
                    temp_obj = rfc822.Message(cStringIO.StringIO(headers+"\n\n"))
 
517
                    content_type = temp_obj.get("content-type", '')
 
518
                    if content_type.lower().startswith("multipart"):
 
519
                        if not html_body: num_without_html_body += 1
 
520
 
 
521
        print "Checked %d items, %d non-filterable items found" % (num_looked, num_found)
 
522
        print "of these items, %d had no headers, %d had no text body and %d had no HTML" % \
 
523
                (num_without_headers, num_without_body, num_without_html_body)
 
524
    finally:
 
525
        msgstore.test_suite_running = True
 
526
 
 
527
def run_invalid_id_tests(manager):
 
528
    # Do some tests with invalid message and folder IDs.
 
529
    print "Doing some 'invalid ID' tests - you should see a couple of warning, but no errors or tracebacks"
 
530
    id_no_item = ('0000','0000') # this ID is 'valid' - but there will be no such item
 
531
    id_invalid = ('xxxx','xxxx') # this ID is 'invalid' in that the hex-bin conversion fails
 
532
    id_empty1 = ('','')
 
533
    id_empty2 = ()
 
534
    bad_ids = id_no_item, id_invalid, id_empty1, id_empty2
 
535
    for id in bad_ids:
 
536
        AssertRaises(msgstore.MsgStoreException, manager.message_store.GetMessage, id)
 
537
    # Test 'GetFolderGenerator' works with invalid ids.
 
538
    for id in bad_ids:
 
539
        AssertRaises(msgstore.MsgStoreException, manager.message_store.GetFolder, id)
 
540
        ids = manager.config.filter.watch_folder_ids[:]
 
541
        ids.append(id)
 
542
        found = 0
 
543
        for f in manager.message_store.GetFolderGenerator(ids, False):
 
544
            found += 1
 
545
        if found > len(manager.config.filter.watch_folder_ids):
 
546
            raise TestFailed("Seemed to find the extra folder")
 
547
        names = manager.FormatFolderNames(ids, False)
 
548
        if names.find("<unknown") < 0:
 
549
            raise TestFailed("Couldn't find unknown folder in names '%s'" % names)
 
550
    print "Finished 'invalid ID' tests"
 
551
 
 
552
###############################################################################
 
553
# "Failure" tests - execute some tests while provoking the msgstore to simulate
 
554
# various MAPI errors.  Although not complete, it does help exercise our code
 
555
# paths through the code.
 
556
def _restore_mapi_failure():
 
557
    msgstore.test_suite_failure = None
 
558
    msgstore.test_suite_failure_request = None
 
559
 
 
560
def _setup_for_mapi_failure(checkpoint, hr, fail_count = None):
 
561
    assert msgstore.test_suite_running, "msgstore should already know its running"
 
562
    assert not msgstore.test_suite_failure, "should already have torn down previous failure"
 
563
    msgstore.test_suite_failure = pythoncom.com_error, \
 
564
                         (hr, "testsuite generated error", None, -1)
 
565
    msgstore.test_suite_failure_request = checkpoint
 
566
    msgstore.test_suite_failure_count = fail_count
 
567
 
 
568
def _setup_mapi_notfound_failure(checkpoint):
 
569
    _setup_for_mapi_failure(checkpoint, mapi.MAPI_E_NOT_FOUND)
 
570
 
 
571
def _do_single_failure_ham_test(driver, checkpoint, hr, fail_count = None):
 
572
    _do_single_failure_test(driver, True, checkpoint, hr, fail_count)
 
573
 
 
574
def _do_single_failure_spam_test(driver, checkpoint, hr, fail_count = None):
 
575
    _do_single_failure_test(driver, False, checkpoint, hr, fail_count)
 
576
 
 
577
def _do_single_failure_test(driver, is_ham, checkpoint, hr, fail_count):
 
578
    print "-> Testing MAPI error '%s' in %s" % (mapiutil.GetScodeString(hr),
 
579
                                              checkpoint)
 
580
    # message moved after we have ID, but before opening.
 
581
    for msf, folder in driver.GetWatchFolderGenerator():
 
582
        print "Testing in folder '%s'" % msf.GetFQName()
 
583
        if is_ham:
 
584
            msg, words = driver.CreateTestMessageInFolder(HAM, folder)
 
585
        else:
 
586
            msg, words = driver.CreateTestMessageInFolder(SPAM, folder)
 
587
        try:
 
588
            _setup_for_mapi_failure(checkpoint, hr, fail_count)
 
589
            try:
 
590
                # sleep to ensure filtering.
 
591
                WaitForFilters()
 
592
            finally:
 
593
                _restore_mapi_failure()
 
594
            if driver.FindTestMessage(folder) is None:
 
595
                TestFailed("We appear to have filtered a message even though we forced 'not found' failure")
 
596
        finally:
 
597
            if msg is not None:
 
598
                msg.Delete()
 
599
    print "<- Finished MAPI error '%s' in %s" % (mapiutil.GetScodeString(hr),
 
600
                                                 checkpoint)
 
601
 
 
602
def do_failure_tests(manager):
 
603
    # We setup msgstore to fail for us, then try a few tests.  The idea is to
 
604
    # ensure we gracefully degrade in these failures.
 
605
    # We set verbosity to min of 1, as this helps us see how the filters handle
 
606
    # the errors.
 
607
    driver = Driver(manager)
 
608
    driver.CleanAllTestMessages()
 
609
    old_verbose = manager.verbose
 
610
    manager.verbose = max(1, old_verbose)
 
611
    try:
 
612
        _do_single_failure_ham_test(driver, "MAPIMsgStoreMsg._EnsureObject", mapi.MAPI_E_NOT_FOUND)
 
613
        _do_single_failure_ham_test(driver, "MAPIMsgStoreMsg.SetField", -2146644781)
 
614
        _do_single_failure_ham_test(driver, "MAPIMsgStoreMsg.Save", -2146644781)
 
615
        _do_single_failure_ham_test(driver, "MAPIMsgStoreMsg.Save",
 
616
                                    mapi.MAPI_E_OBJECT_CHANGED, fail_count=1)
 
617
        # SetReadState???
 
618
        _do_single_failure_spam_test(driver, "MAPIMsgStoreMsg._DoCopyMove", mapi.MAPI_E_TABLE_TOO_BIG)
 
619
 
 
620
    finally:
 
621
        manager.verbose = old_verbose
 
622
 
 
623
def run_failure_tests(manager):
 
624
    "Forced MAPI failure tests"
 
625
    apply_with_new_config(manager,
 
626
                          {"Filter.timer_enabled": True,
 
627
                          },
 
628
                          do_failure_tests, manager)
 
629
    apply_with_new_config(manager,
 
630
                          {"Filter.timer_enabled": False,
 
631
                          },
 
632
                          do_failure_tests, manager)
 
633
 
 
634
def filter_message_with_event(msg, mgr, all_actions=True):
 
635
    import filter
 
636
    ret = filter._original_filter_message(msg, mgr, all_actions)
 
637
    if ret != "Failed":
 
638
        filter_event.set() # only set if it works
 
639
    return ret
 
640
 
 
641
def test(manager):
 
642
    from dialogs import SetWaitCursor
 
643
    SetWaitCursor(1)
 
644
 
 
645
    import filter
 
646
    if "_original_filter_message" not in filter.__dict__:
 
647
        filter._original_filter_message = filter.filter_message
 
648
        filter.filter_message = filter_message_with_event
 
649
 
 
650
    try: # restore the plugin config at exit.
 
651
        assert not msgstore.test_suite_running, "already running??"
 
652
        msgstore.test_suite_running = True
 
653
        assert not manager.test_suite_running, "already running??"
 
654
        manager.test_suite_running = True
 
655
        run_filter_tests(manager)
 
656
        run_failure_tests(manager)
 
657
        run_invalid_id_tests(manager)
 
658
        # non-filter tests take alot of time - ask if you want to do them
 
659
        if manager.AskQuestion("Do you want to run the non-filter tests?" \
 
660
                               "\r\n\r\nThese may take some time"):
 
661
            run_nonfilter_tests(manager)
 
662
        print "*" * 20
 
663
        print "Test suite finished without error!"
 
664
        print "*" * 20
 
665
    finally:
 
666
        print "Restoring standard configuration..."
 
667
        # Always restore configuration to how we started.
 
668
        msgstore.test_suite_running = False
 
669
        manager.test_suite_running = False
 
670
        manager.LoadConfig()
 
671
        manager.addin.FiltersChanged() # restore original filters.
 
672
        manager.addin.ProcessMissedMessages()
 
673
        SetWaitCursor(0)
 
674
 
 
675
if __name__=='__main__':
 
676
    print "NOTE: This will NOT work from the command line"
 
677
    print "(it nearly will, and is useful for debugging the tests"
 
678
    print "themselves, so we will run them anyway!)"
 
679
    test()